Add CRUD endpoints for songs, albums, artists, and playlists

This commit is contained in:
Automated Action 2025-06-05 05:56:48 +00:00
parent a07f19f5cc
commit 3614aa6487
13 changed files with 1096 additions and 6 deletions

View File

@ -1,15 +1,14 @@
from fastapi import APIRouter
# Import individual routers here
from app.api.v1.endpoints import users, auth
# from app.api.v1.endpoints import songs, albums, artists, playlists
from app.api.v1.endpoints import users, auth, songs, albums, artists, playlists
api_router = APIRouter()
# Include all routers
api_router.include_router(users.router, prefix="/users", tags=["users"])
api_router.include_router(auth.router, prefix="/auth", tags=["auth"])
# api_router.include_router(songs.router, prefix="/songs", tags=["songs"])
# api_router.include_router(albums.router, prefix="/albums", tags=["albums"])
# api_router.include_router(artists.router, prefix="/artists", tags=["artists"])
# api_router.include_router(playlists.router, prefix="/playlists", tags=["playlists"])
api_router.include_router(songs.router, prefix="/songs", tags=["songs"])
api_router.include_router(albums.router, prefix="/albums", tags=["albums"])
api_router.include_router(artists.router, prefix="/artists", tags=["artists"])
api_router.include_router(playlists.router, prefix="/playlists", tags=["playlists"])

View File

@ -0,0 +1,123 @@
from typing import Any, List, Optional
from fastapi import APIRouter, Depends, HTTPException, status, Query
from sqlalchemy.orm import Session
from app.api.deps import get_db, get_current_active_superuser
from app.models.user import User
from app.schemas.album import Album, AlbumCreate, AlbumUpdate
from app.services.album import (
create_album,
delete_album,
get_album,
get_albums,
update_album,
)
from app.services.artist import get_artist
router = APIRouter()
@router.get("/", response_model=List[Album])
def read_albums(
db: Session = Depends(get_db),
skip: int = 0,
limit: int = 100,
artist_id: Optional[int] = Query(None, description="Filter by artist ID"),
search: Optional[str] = Query(None, description="Search by album title"),
) -> Any:
"""
Retrieve albums.
"""
albums = get_albums(db, skip=skip, limit=limit, artist_id=artist_id, search=search)
return albums
@router.post("/", response_model=Album)
def create_new_album(
*,
db: Session = Depends(get_db),
album_in: AlbumCreate,
current_user: User = Depends(get_current_active_superuser),
) -> Any:
"""
Create new album. Only superusers can create albums.
"""
# Check if artist exists
artist = get_artist(db, artist_id=album_in.artist_id)
if not artist:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Artist not found",
)
album = create_album(db, obj_in=album_in)
return album
@router.get("/{album_id}", response_model=Album)
def read_album(
*,
db: Session = Depends(get_db),
album_id: int,
) -> Any:
"""
Get album by ID.
"""
album = get_album(db, album_id=album_id)
if not album:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Album not found",
)
return album
@router.put("/{album_id}", response_model=Album)
def update_album_endpoint(
*,
db: Session = Depends(get_db),
album_id: int,
album_in: AlbumUpdate,
current_user: User = Depends(get_current_active_superuser),
) -> Any:
"""
Update an album. Only superusers can update albums.
"""
album = get_album(db, album_id=album_id)
if not album:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Album not found",
)
# If updating artist_id, check if artist exists
if album_in.artist_id is not None and album_in.artist_id != album.artist_id:
artist = get_artist(db, artist_id=album_in.artist_id)
if not artist:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Artist not found",
)
album = update_album(db, db_obj=album, obj_in=album_in)
return album
@router.delete("/{album_id}", status_code=status.HTTP_204_NO_CONTENT, response_model=None)
def delete_album_endpoint(
*,
db: Session = Depends(get_db),
album_id: int,
current_user: User = Depends(get_current_active_superuser),
) -> Any:
"""
Delete an album. Only superusers can delete albums.
"""
album = get_album(db, album_id=album_id)
if not album:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Album not found",
)
delete_album(db, id=album_id)
return None

View File

@ -0,0 +1,111 @@
from typing import Any, List, Optional
from fastapi import APIRouter, Depends, HTTPException, status, Query
from sqlalchemy.orm import Session
from app.api.deps import get_db, get_current_active_superuser
from app.models.user import User
from app.schemas.artist import Artist, ArtistCreate, ArtistUpdate
from app.services.artist import (
create_artist,
delete_artist,
get_artist,
get_artist_by_name,
get_artists,
update_artist,
)
router = APIRouter()
@router.get("/", response_model=List[Artist])
def read_artists(
db: Session = Depends(get_db),
skip: int = 0,
limit: int = 100,
search: Optional[str] = Query(None, description="Search by artist name"),
) -> Any:
"""
Retrieve artists.
"""
artists = get_artists(db, skip=skip, limit=limit, search=search)
return artists
@router.post("/", response_model=Artist)
def create_new_artist(
*,
db: Session = Depends(get_db),
artist_in: ArtistCreate,
current_user: User = Depends(get_current_active_superuser),
) -> Any:
"""
Create new artist. Only superusers can create artists.
"""
artist = get_artist_by_name(db, name=artist_in.name)
if artist:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="An artist with this name already exists.",
)
artist = create_artist(db, obj_in=artist_in)
return artist
@router.get("/{artist_id}", response_model=Artist)
def read_artist(
*,
db: Session = Depends(get_db),
artist_id: int,
) -> Any:
"""
Get artist by ID.
"""
artist = get_artist(db, artist_id=artist_id)
if not artist:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Artist not found",
)
return artist
@router.put("/{artist_id}", response_model=Artist)
def update_artist_endpoint(
*,
db: Session = Depends(get_db),
artist_id: int,
artist_in: ArtistUpdate,
current_user: User = Depends(get_current_active_superuser),
) -> Any:
"""
Update an artist. Only superusers can update artists.
"""
artist = get_artist(db, artist_id=artist_id)
if not artist:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Artist not found",
)
artist = update_artist(db, db_obj=artist, obj_in=artist_in)
return artist
@router.delete("/{artist_id}", status_code=status.HTTP_204_NO_CONTENT, response_model=None)
def delete_artist_endpoint(
*,
db: Session = Depends(get_db),
artist_id: int,
current_user: User = Depends(get_current_active_superuser),
) -> Any:
"""
Delete an artist. Only superusers can delete artists.
"""
artist = get_artist(db, artist_id=artist_id)
if not artist:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Artist not found",
)
delete_artist(db, id=artist_id)
return None

View File

@ -0,0 +1,227 @@
from typing import Any, List, Optional
from fastapi import APIRouter, Depends, HTTPException, status, Query
from sqlalchemy.orm import Session
from app.api.deps import get_db, get_current_active_user
from app.models.user import User
from app.schemas.playlist import (
Playlist,
PlaylistCreate,
PlaylistUpdate,
PlaylistWithSongs,
PlaylistSongAdd,
PlaylistSongRemove
)
from app.services.playlist import (
create_playlist,
delete_playlist,
get_playlist,
get_playlists_by_user,
get_public_playlists,
update_playlist,
add_song_to_playlist,
remove_song_from_playlist
)
from app.services.song import get_song
router = APIRouter()
@router.get("/", response_model=List[Playlist])
def read_playlists(
db: Session = Depends(get_db),
skip: int = 0,
limit: int = 100,
search: Optional[str] = Query(None, description="Search by playlist name"),
current_user: User = Depends(get_current_active_user),
) -> Any:
"""
Retrieve user's playlists.
"""
playlists = get_playlists_by_user(
db,
user_id=current_user.id,
skip=skip,
limit=limit,
search=search
)
return playlists
@router.get("/public", response_model=List[Playlist])
def read_public_playlists(
db: Session = Depends(get_db),
skip: int = 0,
limit: int = 100,
search: Optional[str] = Query(None, description="Search by playlist name"),
) -> Any:
"""
Retrieve public playlists.
"""
playlists = get_public_playlists(db, skip=skip, limit=limit, search=search)
return playlists
@router.post("/", response_model=Playlist)
def create_new_playlist(
*,
db: Session = Depends(get_db),
playlist_in: PlaylistCreate,
current_user: User = Depends(get_current_active_user),
) -> Any:
"""
Create new playlist.
"""
playlist = create_playlist(db, obj_in=playlist_in, user_id=current_user.id)
return playlist
@router.get("/{playlist_id}", response_model=PlaylistWithSongs)
def read_playlist(
*,
db: Session = Depends(get_db),
playlist_id: int,
current_user: Optional[User] = Depends(get_current_active_user),
) -> Any:
"""
Get playlist by ID with songs.
"""
playlist = get_playlist(db, playlist_id=playlist_id)
if not playlist:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Playlist not found",
)
# Check if user has access to the playlist
if not playlist.is_public and playlist.user_id != current_user.id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not enough permissions to access this playlist",
)
return playlist
@router.put("/{playlist_id}", response_model=Playlist)
def update_playlist_endpoint(
*,
db: Session = Depends(get_db),
playlist_id: int,
playlist_in: PlaylistUpdate,
current_user: User = Depends(get_current_active_user),
) -> Any:
"""
Update a playlist.
"""
playlist = get_playlist(db, playlist_id=playlist_id)
if not playlist:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Playlist not found",
)
# Check if user owns the playlist
if playlist.user_id != current_user.id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not enough permissions to modify this playlist",
)
playlist = update_playlist(db, db_obj=playlist, obj_in=playlist_in)
return playlist
@router.delete("/{playlist_id}", status_code=status.HTTP_204_NO_CONTENT, response_model=None)
def delete_playlist_endpoint(
*,
db: Session = Depends(get_db),
playlist_id: int,
current_user: User = Depends(get_current_active_user),
) -> Any:
"""
Delete a playlist.
"""
playlist = get_playlist(db, playlist_id=playlist_id)
if not playlist:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Playlist not found",
)
# Check if user owns the playlist
if playlist.user_id != current_user.id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not enough permissions to delete this playlist",
)
delete_playlist(db, id=playlist_id)
return None
@router.post("/{playlist_id}/songs", response_model=PlaylistWithSongs)
def add_song_to_playlist_endpoint(
*,
db: Session = Depends(get_db),
playlist_id: int,
song_data: PlaylistSongAdd,
current_user: User = Depends(get_current_active_user),
) -> Any:
"""
Add a song to a playlist.
"""
playlist = get_playlist(db, playlist_id=playlist_id)
if not playlist:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Playlist not found",
)
# Check if user owns the playlist
if playlist.user_id != current_user.id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not enough permissions to modify this playlist",
)
# Check if song exists
song = get_song(db, song_id=song_data.song_id)
if not song:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Song not found",
)
playlist = add_song_to_playlist(db, playlist_id=playlist_id, song_id=song_data.song_id)
return playlist
@router.delete("/{playlist_id}/songs", response_model=PlaylistWithSongs)
def remove_song_from_playlist_endpoint(
*,
db: Session = Depends(get_db),
playlist_id: int,
song_data: PlaylistSongRemove,
current_user: User = Depends(get_current_active_user),
) -> Any:
"""
Remove a song from a playlist.
"""
playlist = get_playlist(db, playlist_id=playlist_id)
if not playlist:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Playlist not found",
)
# Check if user owns the playlist
if playlist.user_id != current_user.id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not enough permissions to modify this playlist",
)
playlist = remove_song_from_playlist(db, playlist_id=playlist_id, song_id=song_data.song_id)
return playlist

View File

@ -0,0 +1,169 @@
from typing import Any, List, Optional
from fastapi import APIRouter, Depends, HTTPException, status, Query
from sqlalchemy.orm import Session
from app.api.deps import get_db, get_current_active_superuser
from app.models.user import User
from app.schemas.song import Song, SongCreate, SongUpdate
from app.services.song import (
create_song,
delete_song,
get_song,
get_songs,
update_song,
)
from app.services.artist import get_artist
from app.services.album import get_album
router = APIRouter()
@router.get("/", response_model=List[Song])
def read_songs(
db: Session = Depends(get_db),
skip: int = 0,
limit: int = 100,
artist_id: Optional[int] = Query(None, description="Filter by artist ID"),
album_id: Optional[int] = Query(None, description="Filter by album ID"),
search: Optional[str] = Query(None, description="Search by song title"),
) -> Any:
"""
Retrieve songs.
"""
songs = get_songs(
db,
skip=skip,
limit=limit,
artist_id=artist_id,
album_id=album_id,
search=search
)
return songs
@router.post("/", response_model=Song)
def create_new_song(
*,
db: Session = Depends(get_db),
song_in: SongCreate,
current_user: User = Depends(get_current_active_superuser),
) -> Any:
"""
Create new song. Only superusers can create songs.
"""
# Check if artist exists
artist = get_artist(db, artist_id=song_in.artist_id)
if not artist:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Artist not found",
)
# Check if album exists if album_id is provided
if song_in.album_id:
album = get_album(db, album_id=song_in.album_id)
if not album:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Album not found",
)
# Check if album belongs to the artist
if album.artist_id != song_in.artist_id:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Album doesn't belong to the specified artist",
)
song = create_song(db, obj_in=song_in)
return song
@router.get("/{song_id}", response_model=Song)
def read_song(
*,
db: Session = Depends(get_db),
song_id: int,
) -> Any:
"""
Get song by ID.
"""
song = get_song(db, song_id=song_id)
if not song:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Song not found",
)
return song
@router.put("/{song_id}", response_model=Song)
def update_song_endpoint(
*,
db: Session = Depends(get_db),
song_id: int,
song_in: SongUpdate,
current_user: User = Depends(get_current_active_superuser),
) -> Any:
"""
Update a song. Only superusers can update songs.
"""
song = get_song(db, song_id=song_id)
if not song:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Song not found",
)
# Check if updating artist_id and if artist exists
if song_in.artist_id is not None and song_in.artist_id != song.artist_id:
artist = get_artist(db, artist_id=song_in.artist_id)
if not artist:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Artist not found",
)
# Check if updating album_id and if album exists
if song_in.album_id is not None and song_in.album_id != song.album_id:
if song_in.album_id == 0: # Remove from album
song_in.album_id = None
else:
album = get_album(db, album_id=song_in.album_id)
if not album:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Album not found",
)
# Check if album belongs to the artist
artist_id = song_in.artist_id if song_in.artist_id is not None else song.artist_id
if album.artist_id != artist_id:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Album doesn't belong to the song's artist",
)
song = update_song(db, db_obj=song, obj_in=song_in)
return song
@router.delete("/{song_id}", status_code=status.HTTP_204_NO_CONTENT, response_model=None)
def delete_song_endpoint(
*,
db: Session = Depends(get_db),
song_id: int,
current_user: User = Depends(get_current_active_superuser),
) -> Any:
"""
Delete a song. Only superusers can delete songs.
"""
song = get_song(db, song_id=song_id)
if not song:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Song not found",
)
delete_song(db, id=song_id)
return None

42
app/schemas/album.py Normal file
View File

@ -0,0 +1,42 @@
from typing import Optional
from datetime import date
from pydantic import BaseModel
# Shared properties
class AlbumBase(BaseModel):
title: Optional[str] = None
release_date: Optional[date] = None
cover_image_path: Optional[str] = None
description: Optional[str] = None
artist_id: Optional[int] = None
# Properties to receive on album creation
class AlbumCreate(AlbumBase):
title: str
artist_id: int
# Properties to receive on album update
class AlbumUpdate(AlbumBase):
pass
class AlbumInDBBase(AlbumBase):
id: int
title: str
artist_id: int
class Config:
from_attributes = True
# Properties to return to client
class Album(AlbumInDBBase):
pass
# Properties stored in DB
class AlbumInDB(AlbumInDBBase):
pass

37
app/schemas/artist.py Normal file
View File

@ -0,0 +1,37 @@
from typing import Optional
from pydantic import BaseModel
# Shared properties
class ArtistBase(BaseModel):
name: Optional[str] = None
bio: Optional[str] = None
image_path: Optional[str] = None
# Properties to receive on artist creation
class ArtistCreate(ArtistBase):
name: str
# Properties to receive on artist update
class ArtistUpdate(ArtistBase):
pass
class ArtistInDBBase(ArtistBase):
id: int
name: str
class Config:
from_attributes = True
# Properties to return to client
class Artist(ArtistInDBBase):
pass
# Properties stored in DB
class ArtistInDB(ArtistInDBBase):
pass

55
app/schemas/playlist.py Normal file
View File

@ -0,0 +1,55 @@
from typing import Optional, List
from pydantic import BaseModel
from app.schemas.song import Song
# Shared properties
class PlaylistBase(BaseModel):
name: Optional[str] = None
description: Optional[str] = None
is_public: Optional[bool] = True
cover_image_path: Optional[str] = None
# Properties to receive on playlist creation
class PlaylistCreate(PlaylistBase):
name: str
# Properties to receive on playlist update
class PlaylistUpdate(PlaylistBase):
pass
class PlaylistInDBBase(PlaylistBase):
id: int
name: str
user_id: int
class Config:
from_attributes = True
# Properties to return to client
class Playlist(PlaylistInDBBase):
pass
# Properties to return with songs included
class PlaylistWithSongs(Playlist):
songs: List[Song] = []
# Properties stored in DB
class PlaylistInDB(PlaylistInDBBase):
pass
# For adding/removing songs from playlist
class PlaylistSongAdd(BaseModel):
song_id: int
class PlaylistSongRemove(BaseModel):
song_id: int

44
app/schemas/song.py Normal file
View File

@ -0,0 +1,44 @@
from typing import Optional
from pydantic import BaseModel
# Shared properties
class SongBase(BaseModel):
title: Optional[str] = None
duration: Optional[float] = None
file_path: Optional[str] = None
track_number: Optional[int] = None
artist_id: Optional[int] = None
album_id: Optional[int] = None
# Properties to receive on song creation
class SongCreate(SongBase):
title: str
file_path: str
artist_id: int
# Properties to receive on song update
class SongUpdate(SongBase):
pass
class SongInDBBase(SongBase):
id: int
title: str
file_path: str
artist_id: int
class Config:
from_attributes = True
# Properties to return to client
class Song(SongInDBBase):
pass
# Properties stored in DB
class SongInDB(SongInDBBase):
pass

61
app/services/album.py Normal file
View File

@ -0,0 +1,61 @@
from typing import List, Optional, Union, Dict, Any
from sqlalchemy.orm import Session
from app.models.album import Album
from app.schemas.album import AlbumCreate, AlbumUpdate
def get_album(db: Session, album_id: int) -> Optional[Album]:
return db.query(Album).filter(Album.id == album_id).first()
def get_albums(
db: Session,
skip: int = 0,
limit: int = 100,
artist_id: Optional[int] = None,
search: Optional[str] = None
) -> List[Album]:
query = db.query(Album)
if artist_id:
query = query.filter(Album.artist_id == artist_id)
if search:
query = query.filter(Album.title.ilike(f"%{search}%"))
return query.offset(skip).limit(limit).all()
def create_album(db: Session, obj_in: AlbumCreate) -> Album:
db_obj = Album(
title=obj_in.title,
release_date=obj_in.release_date,
cover_image_path=obj_in.cover_image_path,
description=obj_in.description,
artist_id=obj_in.artist_id,
)
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
def update_album(
db: Session, *, db_obj: Album, obj_in: Union[AlbumUpdate, Dict[str, Any]]
) -> Album:
if isinstance(obj_in, dict):
update_data = obj_in
else:
update_data = obj_in.dict(exclude_unset=True)
for field in update_data:
if hasattr(db_obj, field):
setattr(db_obj, field, update_data[field])
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
def delete_album(db: Session, *, id: int) -> None:
album = db.query(Album).get(id)
if album:
db.delete(album)
db.commit()

57
app/services/artist.py Normal file
View File

@ -0,0 +1,57 @@
from typing import List, Optional, Union, Dict, Any
from sqlalchemy.orm import Session
from app.models.artist import Artist
from app.schemas.artist import ArtistCreate, ArtistUpdate
def get_artist(db: Session, artist_id: int) -> Optional[Artist]:
return db.query(Artist).filter(Artist.id == artist_id).first()
def get_artist_by_name(db: Session, name: str) -> Optional[Artist]:
return db.query(Artist).filter(Artist.name == name).first()
def get_artists(
db: Session, skip: int = 0, limit: int = 100, search: Optional[str] = None
) -> List[Artist]:
query = db.query(Artist)
if search:
query = query.filter(Artist.name.ilike(f"%{search}%"))
return query.offset(skip).limit(limit).all()
def create_artist(db: Session, obj_in: ArtistCreate) -> Artist:
db_obj = Artist(
name=obj_in.name,
bio=obj_in.bio,
image_path=obj_in.image_path,
)
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
def update_artist(
db: Session, *, db_obj: Artist, obj_in: Union[ArtistUpdate, Dict[str, Any]]
) -> Artist:
if isinstance(obj_in, dict):
update_data = obj_in
else:
update_data = obj_in.dict(exclude_unset=True)
for field in update_data:
if hasattr(db_obj, field):
setattr(db_obj, field, update_data[field])
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
def delete_artist(db: Session, *, id: int) -> None:
artist = db.query(Artist).get(id)
if artist:
db.delete(artist)
db.commit()

100
app/services/playlist.py Normal file
View File

@ -0,0 +1,100 @@
from typing import List, Optional, Union, Dict, Any
from sqlalchemy.orm import Session
from app.models.playlist import Playlist
from app.models.song import Song
from app.schemas.playlist import PlaylistCreate, PlaylistUpdate
def get_playlist(db: Session, playlist_id: int) -> Optional[Playlist]:
return db.query(Playlist).filter(Playlist.id == playlist_id).first()
def get_playlists_by_user(
db: Session,
user_id: int,
skip: int = 0,
limit: int = 100,
search: Optional[str] = None
) -> List[Playlist]:
query = db.query(Playlist).filter(Playlist.user_id == user_id)
if search:
query = query.filter(Playlist.name.ilike(f"%{search}%"))
return query.offset(skip).limit(limit).all()
def get_public_playlists(
db: Session,
skip: int = 0,
limit: int = 100,
search: Optional[str] = None
) -> List[Playlist]:
query = db.query(Playlist).filter(Playlist.is_public.is_(True))
if search:
query = query.filter(Playlist.name.ilike(f"%{search}%"))
return query.offset(skip).limit(limit).all()
def create_playlist(db: Session, obj_in: PlaylistCreate, user_id: int) -> Playlist:
db_obj = Playlist(
name=obj_in.name,
description=obj_in.description,
is_public=obj_in.is_public,
cover_image_path=obj_in.cover_image_path,
user_id=user_id,
)
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
def update_playlist(
db: Session, *, db_obj: Playlist, obj_in: Union[PlaylistUpdate, Dict[str, Any]]
) -> Playlist:
if isinstance(obj_in, dict):
update_data = obj_in
else:
update_data = obj_in.dict(exclude_unset=True)
for field in update_data:
if hasattr(db_obj, field):
setattr(db_obj, field, update_data[field])
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
def delete_playlist(db: Session, *, id: int) -> None:
playlist = db.query(Playlist).get(id)
if playlist:
db.delete(playlist)
db.commit()
def add_song_to_playlist(db: Session, *, playlist_id: int, song_id: int) -> Playlist:
playlist = get_playlist(db, playlist_id=playlist_id)
song = db.query(Song).filter(Song.id == song_id).first()
if playlist and song:
# Check if song is already in playlist
if song not in playlist.songs:
playlist.songs.append(song)
db.commit()
db.refresh(playlist)
return playlist
def remove_song_from_playlist(db: Session, *, playlist_id: int, song_id: int) -> Playlist:
playlist = get_playlist(db, playlist_id=playlist_id)
song = db.query(Song).filter(Song.id == song_id).first()
if playlist and song:
# Check if song is in playlist
if song in playlist.songs:
playlist.songs.remove(song)
db.commit()
db.refresh(playlist)
return playlist

65
app/services/song.py Normal file
View File

@ -0,0 +1,65 @@
from typing import List, Optional, Union, Dict, Any
from sqlalchemy.orm import Session
from app.models.song import Song
from app.schemas.song import SongCreate, SongUpdate
def get_song(db: Session, song_id: int) -> Optional[Song]:
return db.query(Song).filter(Song.id == song_id).first()
def get_songs(
db: Session,
skip: int = 0,
limit: int = 100,
artist_id: Optional[int] = None,
album_id: Optional[int] = None,
search: Optional[str] = None
) -> List[Song]:
query = db.query(Song)
if artist_id:
query = query.filter(Song.artist_id == artist_id)
if album_id:
query = query.filter(Song.album_id == album_id)
if search:
query = query.filter(Song.title.ilike(f"%{search}%"))
return query.offset(skip).limit(limit).all()
def create_song(db: Session, obj_in: SongCreate) -> Song:
db_obj = Song(
title=obj_in.title,
duration=obj_in.duration,
file_path=obj_in.file_path,
track_number=obj_in.track_number,
artist_id=obj_in.artist_id,
album_id=obj_in.album_id,
)
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
def update_song(
db: Session, *, db_obj: Song, obj_in: Union[SongUpdate, Dict[str, Any]]
) -> Song:
if isinstance(obj_in, dict):
update_data = obj_in
else:
update_data = obj_in.dict(exclude_unset=True)
for field in update_data:
if hasattr(db_obj, field):
setattr(db_obj, field, update_data[field])
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
def delete_song(db: Session, *, id: int) -> None:
song = db.query(Song).get(id)
if song:
db.delete(song)
db.commit()