134 lines
3.5 KiB
Python
134 lines
3.5 KiB
Python
from typing import Any
|
|
from pathlib import Path
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException, status, Response
|
|
from fastapi.responses import FileResponse
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.api.deps import get_db, get_current_active_user
|
|
from app.models.user import User
|
|
from app.services.song import get_song
|
|
|
|
router = APIRouter()
|
|
|
|
|
|
@router.get("/song/{song_id}", response_class=Response)
|
|
async def stream_song(
|
|
song_id: int,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(get_current_active_user),
|
|
) -> Any:
|
|
"""
|
|
Stream a song by ID. Requires authentication.
|
|
"""
|
|
song = get_song(db, song_id=song_id)
|
|
if not song:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail="Song not found",
|
|
)
|
|
|
|
# Construct file path
|
|
file_path = Path(song.file_path)
|
|
|
|
# Check if file exists
|
|
if not file_path.exists():
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail="Audio file not found",
|
|
)
|
|
|
|
# Return file response for streaming
|
|
return FileResponse(
|
|
path=file_path,
|
|
filename=f"{song.title}.mp3",
|
|
media_type="audio/mpeg"
|
|
)
|
|
|
|
|
|
@router.get("/album/cover/{album_id}")
|
|
async def get_album_cover(
|
|
album_id: int,
|
|
db: Session = Depends(get_db),
|
|
) -> Any:
|
|
"""
|
|
Get album cover image by album ID.
|
|
"""
|
|
from app.services.album import get_album
|
|
|
|
album = get_album(db, album_id=album_id)
|
|
if not album or not album.cover_image_path:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail="Album cover not found",
|
|
)
|
|
|
|
# Construct file path
|
|
file_path = Path(album.cover_image_path)
|
|
|
|
# Check if file exists
|
|
if not file_path.exists():
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail="Cover image file not found",
|
|
)
|
|
|
|
# Determine media type based on file extension
|
|
extension = file_path.suffix.lower()
|
|
media_type = "image/jpeg" # Default
|
|
if extension == ".png":
|
|
media_type = "image/png"
|
|
elif extension == ".gif":
|
|
media_type = "image/gif"
|
|
elif extension == ".webp":
|
|
media_type = "image/webp"
|
|
|
|
# Return file response
|
|
return FileResponse(
|
|
path=file_path,
|
|
media_type=media_type
|
|
)
|
|
|
|
|
|
@router.get("/artist/image/{artist_id}")
|
|
async def get_artist_image(
|
|
artist_id: int,
|
|
db: Session = Depends(get_db),
|
|
) -> Any:
|
|
"""
|
|
Get artist image by artist ID.
|
|
"""
|
|
from app.services.artist import get_artist
|
|
|
|
artist = get_artist(db, artist_id=artist_id)
|
|
if not artist or not artist.image_path:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail="Artist image not found",
|
|
)
|
|
|
|
# Construct file path
|
|
file_path = Path(artist.image_path)
|
|
|
|
# Check if file exists
|
|
if not file_path.exists():
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail="Image file not found",
|
|
)
|
|
|
|
# Determine media type based on file extension
|
|
extension = file_path.suffix.lower()
|
|
media_type = "image/jpeg" # Default
|
|
if extension == ".png":
|
|
media_type = "image/png"
|
|
elif extension == ".gif":
|
|
media_type = "image/gif"
|
|
elif extension == ".webp":
|
|
media_type = "image/webp"
|
|
|
|
# Return file response
|
|
return FileResponse(
|
|
path=file_path,
|
|
media_type=media_type
|
|
) |