from typing import List, Optional, Union, Dict, Any from sqlalchemy.orm import Session from app.models.playlist import Playlist from app.models.song import Song from app.schemas.playlist import PlaylistCreate, PlaylistUpdate def get_playlist(db: Session, playlist_id: int) -> Optional[Playlist]: return db.query(Playlist).filter(Playlist.id == playlist_id).first() def get_playlists_by_user( db: Session, user_id: int, skip: int = 0, limit: int = 100, search: Optional[str] = None ) -> List[Playlist]: query = db.query(Playlist).filter(Playlist.user_id == user_id) if search: query = query.filter(Playlist.name.ilike(f"%{search}%")) return query.offset(skip).limit(limit).all() def get_public_playlists( db: Session, skip: int = 0, limit: int = 100, search: Optional[str] = None ) -> List[Playlist]: query = db.query(Playlist).filter(Playlist.is_public.is_(True)) if search: query = query.filter(Playlist.name.ilike(f"%{search}%")) return query.offset(skip).limit(limit).all() def create_playlist(db: Session, obj_in: PlaylistCreate, user_id: int) -> Playlist: db_obj = Playlist( name=obj_in.name, description=obj_in.description, is_public=obj_in.is_public, cover_image_path=obj_in.cover_image_path, user_id=user_id, ) db.add(db_obj) db.commit() db.refresh(db_obj) return db_obj def update_playlist( db: Session, *, db_obj: Playlist, obj_in: Union[PlaylistUpdate, Dict[str, Any]] ) -> Playlist: if isinstance(obj_in, dict): update_data = obj_in else: update_data = obj_in.dict(exclude_unset=True) for field in update_data: if hasattr(db_obj, field): setattr(db_obj, field, update_data[field]) db.add(db_obj) db.commit() db.refresh(db_obj) return db_obj def delete_playlist(db: Session, *, id: int) -> None: playlist = db.query(Playlist).get(id) if playlist: db.delete(playlist) db.commit() def add_song_to_playlist(db: Session, *, playlist_id: int, song_id: int) -> Playlist: playlist = get_playlist(db, playlist_id=playlist_id) song = db.query(Song).filter(Song.id == song_id).first() if playlist and song: # Check if song is already in playlist if song not in playlist.songs: playlist.songs.append(song) db.commit() db.refresh(playlist) return playlist def remove_song_from_playlist(db: Session, *, playlist_id: int, song_id: int) -> Playlist: playlist = get_playlist(db, playlist_id=playlist_id) song = db.query(Song).filter(Song.id == song_id).first() if playlist and song: # Check if song is in playlist if song in playlist.songs: playlist.songs.remove(song) db.commit() db.refresh(playlist) return playlist