from typing import Any, Dict, Iterable, List, Optional, Tuple

from sqlalchemy.orm import Session
from sqlalchemy import desc, extract, or_

from app.models.movie import Movie
from app.models.movie_actor import MovieActor
from app.models.movie_genre import MovieGenre
from app.api.schemas.movie import MovieCreate, MovieUpdate


def _link_genres(db: Session, movie_id: int, genre_ids: Iterable[int]) -> None:
    """Stage one MovieGenre row per genre ID for the given movie (no commit)."""
    db.add_all(
        [MovieGenre(movie_id=movie_id, genre_id=genre_id) for genre_id in genre_ids]
    )


def _link_actors(db: Session, movie_id: int, actor_ids: Iterable[int]) -> None:
    """Stage one MovieActor row per actor ID for the given movie (no commit)."""
    db.add_all(
        [MovieActor(movie_id=movie_id, actor_id=actor_id) for actor_id in actor_ids]
    )


def get(db: Session, movie_id: int) -> Optional[Movie]:
    """
    Get a movie by ID.

    Returns the matching Movie, or None when no row has that ID.
    """
    return db.query(Movie).filter(Movie.id == movie_id).first()


def get_multi(
    db: Session,
    skip: int = 0,
    limit: int = 100,
    filters: Optional[Dict[str, Any]] = None,
) -> Tuple[List[Movie], int]:
    """
    Get multiple movies with pagination and optional filters.

    Recognised filter keys: "title" (case-insensitive substring match),
    "director_id", "genre_id", "actor_id", "min_rating" and "year"
    (year of the release date). A key whose value is missing or falsy
    (None, 0, "") is ignored.

    Returns a ``(movies, total)`` tuple: the requested page, sorted by
    rating (highest first) and then by title, together with the number of
    movies matching the filters before pagination.
    """
    query = db.query(Movie)

    # Apply filters if provided
    if filters:
        if title := filters.get("title"):
            query = query.filter(Movie.title.ilike(f"%{title}%"))

        if director_id := filters.get("director_id"):
            query = query.filter(Movie.director_id == director_id)

        if genre_id := filters.get("genre_id"):
            query = query.join(MovieGenre).filter(MovieGenre.genre_id == genre_id)

        if actor_id := filters.get("actor_id"):
            query = query.join(MovieActor).filter(MovieActor.actor_id == actor_id)

        if min_rating := filters.get("min_rating"):
            query = query.filter(Movie.rating >= min_rating)

        if year := filters.get("year"):
            query = query.filter(extract("year", Movie.release_date) == year)

    # Count before ordering and pagination: sorting cannot change the total.
    total = query.count()

    # Sort by rating (highest first) then by title
    query = query.order_by(desc(Movie.rating), Movie.title)

    return query.offset(skip).limit(limit).all(), total


def create(db: Session, movie_in: MovieCreate) -> Movie:
    """
    Create a new movie.

    The movie row and its genre/actor association rows are written in a
    single transaction, so a failure while linking genres or actors does
    not leave a movie persisted without its associations.

    Returns the refreshed Movie instance.
    """
    movie_data = movie_in.dict(exclude={"genre_ids", "actor_ids"})
    db_movie = Movie(**movie_data)
    db.add(db_movie)

    # Flush (rather than commit) so the INSERT is emitted and the primary
    # key becomes available for the association rows below.
    db.flush()

    # Add genres if any
    if movie_in.genre_ids:
        _link_genres(db, db_movie.id, movie_in.genre_ids)

    # Add actors if any
    if movie_in.actor_ids:
        _link_actors(db, db_movie.id, movie_in.actor_ids)

    db.commit()
    db.refresh(db_movie)
    return db_movie


def update(
    db: Session,
    db_movie: Movie,
    movie_in: MovieUpdate,
) -> Movie:
    """
    Update a movie.

    Only fields explicitly set on ``movie_in`` are written to ``db_movie``.
    When ``genre_ids`` / ``actor_ids`` is not None, the existing
    associations of that kind are deleted and replaced by the given IDs
    (an empty list removes them all); None leaves them untouched.

    Returns the refreshed Movie instance.
    """
    # Update movie attributes
    update_data = movie_in.dict(
        exclude={"genre_ids", "actor_ids"}, exclude_unset=True
    )
    for field, value in update_data.items():
        setattr(db_movie, field, value)

    # Replace genres if provided
    if movie_in.genre_ids is not None:
        db.query(MovieGenre).filter(MovieGenre.movie_id == db_movie.id).delete()
        _link_genres(db, db_movie.id, movie_in.genre_ids)

    # Replace actors if provided
    if movie_in.actor_ids is not None:
        db.query(MovieActor).filter(MovieActor.movie_id == db_movie.id).delete()
        _link_actors(db, db_movie.id, movie_in.actor_ids)

    db.commit()
    db.refresh(db_movie)
    return db_movie


def delete(db: Session, movie_id: int) -> bool:
    """
    Delete a movie by ID.

    The movie's genre and actor association rows are removed first, then
    the movie itself, all in one commit.

    Returns True when a movie was deleted, False when no movie has that ID.
    """
    db_movie = get(db, movie_id)
    if db_movie is None:
        return False

    # Delete associated movie-genre relationships
    db.query(MovieGenre).filter(MovieGenre.movie_id == movie_id).delete()

    # Delete associated movie-actor relationships
    db.query(MovieActor).filter(MovieActor.movie_id == movie_id).delete()

    # Delete the movie
    db.delete(db_movie)
    db.commit()
    return True


def search(db: Session, query: str, limit: int = 10) -> List[Movie]:
    """
    Search for movies by title or overview.

    Performs a case-insensitive substring match of ``query`` against both
    columns and returns at most ``limit`` movies. No ordering is applied.

    NOTE(review): ``query`` is interpolated into the LIKE pattern as-is, so
    any ``%`` or ``_`` it contains acts as a wildcard -- confirm this is
    acceptable for callers passing raw user input.
    """
    search_query = f"%{query}%"
    return (
        db.query(Movie)
        .filter(
            or_(
                Movie.title.ilike(search_query),
                Movie.overview.ilike(search_query),
            )
        )
        .limit(limit)
        .all()
    )