Automated Action 0186fc8e70 Create movie database backend with FastAPI and SQLite
This commit implements a simple movie database backend inspired by IMDb. It includes:
- API endpoints for movies, actors, directors and genres
- SQLAlchemy models with relationships
- Alembic migrations
- Pydantic schemas for request/response validation
- Search and filtering functionality
- Health check endpoint
- Complete documentation
2025-05-19 20:28:07 +00:00

156 lines
4.6 KiB
Python

from typing import Any, Dict, List, Optional, Tuple

from sqlalchemy import desc, extract, or_
from sqlalchemy.orm import Session

from app.api.schemas.movie import MovieCreate, MovieUpdate
from app.models.movie import Movie
from app.models.movie_actor import MovieActor
from app.models.movie_genre import MovieGenre
def get(db: Session, movie_id: int) -> Optional[Movie]:
    """
    Fetch a single movie by its ``id``.

    Returns the first ``Movie`` whose ``id`` equals ``movie_id``, or ``None``
    when there is no such row.
    """
    matching = db.query(Movie).filter(Movie.id == movie_id)
    return matching.first()
def get_multi(
    db: Session,
    skip: int = 0,
    limit: int = 100,
    filters: Optional[Dict[str, Any]] = None
) -> Tuple[List[Movie], int]:
    """
    Get one page of movies together with the total number of matches.

    Args:
        db: Database session used to run the queries.
        skip: Number of matching rows to skip before the page starts.
        limit: Maximum number of rows in the returned page.
        filters: Optional mapping that may contain any of the keys
            ``title`` (case-insensitive substring match), ``director_id``,
            ``genre_id``, ``actor_id``, ``min_rating`` and ``year``.
            A missing or falsy value (``None``, ``0``, ``""``) means the
            corresponding filter is not applied.

    Returns:
        A ``(movies, total)`` tuple: the requested page ordered by rating
        (highest first), then title, then id, and the number of rows that
        match the filters before pagination.
    """
    query = db.query(Movie)

    if filters:
        if title := filters.get("title"):
            query = query.filter(Movie.title.ilike(f"%{title}%"))
        if director_id := filters.get("director_id"):
            query = query.filter(Movie.director_id == director_id)
        if genre_id := filters.get("genre_id"):
            query = query.join(MovieGenre).filter(MovieGenre.genre_id == genre_id)
        if actor_id := filters.get("actor_id"):
            query = query.join(MovieActor).filter(MovieActor.actor_id == actor_id)
        if min_rating := filters.get("min_rating"):
            query = query.filter(Movie.rating >= min_rating)
        if year := filters.get("year"):
            query = query.filter(extract('year', Movie.release_date) == year)

    # Count on the unordered query: ordering cannot change the number of
    # rows, so there is no reason to make the count subquery sort.
    total = query.count()

    # Movie.id is the final sort key so that rows with equal rating and
    # title keep a fixed order; otherwise consecutive pages could repeat or
    # skip rows.
    ordered = query.order_by(desc(Movie.rating), Movie.title, Movie.id)
    movies = ordered.offset(skip).limit(limit).all()
    return movies, total
def create(db: Session, movie_in: MovieCreate) -> Movie:
    """
    Create a new movie together with its genre and actor links.

    The movie row and all of its ``MovieGenre`` / ``MovieActor`` rows are
    written in a single transaction, so a failure while inserting a link
    does not leave a committed movie without its associations.

    Args:
        db: Database session used for the inserts.
        movie_in: Incoming data; ``genre_ids`` and ``actor_ids`` are turned
            into link rows, every other field is passed to ``Movie``.

    Returns:
        The newly created ``Movie``, refreshed from the database.

    Raises:
        Exception: Any error raised while flushing or committing is
            re-raised after the session has been rolled back.
    """
    movie_data = movie_in.dict(exclude={"genre_ids", "actor_ids"})
    db_movie = Movie(**movie_data)

    try:
        db.add(db_movie)
        # Flush (not commit) so the database assigns db_movie.id while the
        # transaction stays open for the link rows below.
        db.flush()

        db.add_all(
            [
                MovieGenre(movie_id=db_movie.id, genre_id=genre_id)
                for genre_id in (movie_in.genre_ids or [])
            ]
        )
        db.add_all(
            [
                MovieActor(movie_id=db_movie.id, actor_id=actor_id)
                for actor_id in (movie_in.actor_ids or [])
            ]
        )

        db.commit()
    except Exception:
        # Leave the session usable for the caller and discard partial work.
        db.rollback()
        raise

    db.refresh(db_movie)
    return db_movie
def update(
    db: Session,
    db_movie: Movie,
    movie_in: MovieUpdate
) -> Movie:
    """
    Apply the changes in ``movie_in`` to an existing movie.

    Only fields that were explicitly set on ``movie_in`` are copied onto
    ``db_movie``. When ``genre_ids`` or ``actor_ids`` is not ``None``, the
    movie's existing link rows of that kind are deleted and replaced by the
    given ids (an empty list therefore clears them). Everything is committed
    once at the end and the refreshed movie is returned.
    """
    changes = movie_in.dict(exclude={"genre_ids", "actor_ids"}, exclude_unset=True)
    for name in changes:
        setattr(db_movie, name, changes[name])

    # Replace genre links when a new set was supplied
    if movie_in.genre_ids is not None:
        db.query(MovieGenre).filter(MovieGenre.movie_id == db_movie.id).delete()
        db.add_all(
            [
                MovieGenre(movie_id=db_movie.id, genre_id=gid)
                for gid in movie_in.genre_ids
            ]
        )

    # Replace actor links when a new set was supplied
    if movie_in.actor_ids is not None:
        db.query(MovieActor).filter(MovieActor.movie_id == db_movie.id).delete()
        db.add_all(
            [
                MovieActor(movie_id=db_movie.id, actor_id=aid)
                for aid in movie_in.actor_ids
            ]
        )

    db.commit()
    db.refresh(db_movie)
    return db_movie
def delete(db: Session, movie_id: int) -> bool:
    """
    Remove a movie and its genre/actor link rows.

    Returns ``True`` when a movie with ``movie_id`` was found and deleted,
    ``False`` when no such movie exists (nothing is changed in that case).
    """
    target = db.query(Movie).filter(Movie.id == movie_id).first()
    if not target:
        return False

    # Clear the link tables first (genres, then actors), then the movie row
    for link_model in (MovieGenre, MovieActor):
        db.query(link_model).filter(link_model.movie_id == movie_id).delete()

    db.delete(target)
    db.commit()
    return True
def search(db: Session, query: str, limit: int = 10) -> List[Movie]:
    """
    Search for movies whose title or overview contains ``query``.

    Matching uses ``ILIKE`` with the text wrapped in ``%`` wildcards, i.e. a
    case-insensitive substring match on either column.

    Args:
        db: Database session used to run the query.
        query: Text to look for.
        limit: Maximum number of movies to return.

    Returns:
        Up to ``limit`` matching movies ordered by rating (highest first),
        then title, then id.
    """
    pattern = f"%{query}%"
    matches = db.query(Movie).filter(
        or_(
            Movie.title.ilike(pattern),
            Movie.overview.ilike(pattern),
        )
    )
    # Without an ORDER BY the database may return any `limit` rows; use the
    # same ordering as the listing endpoint so results are repeatable.
    ordered = matches.order_by(desc(Movie.rating), Movie.title, Movie.id)
    return ordered.limit(limit).all()