from typing import List, Optional, Dict, Any from sqlalchemy.orm import Session from app.models.actor import Actor from app.models.movie_actor import MovieActor from app.api.schemas.actor import ActorCreate, ActorUpdate def get(db: Session, actor_id: int) -> Optional[Actor]: """ Get an actor by ID. """ return db.query(Actor).filter(Actor.id == actor_id).first() def get_multi( db: Session, skip: int = 0, limit: int = 100, filters: Optional[Dict[str, Any]] = None ) -> List[Actor]: """ Get multiple actors with pagination and optional filters. """ query = db.query(Actor) # Apply filters if provided if filters: if name := filters.get("name"): query = query.filter(Actor.name.ilike(f"%{name}%")) if movie_id := filters.get("movie_id"): query = query.join(MovieActor).filter(MovieActor.movie_id == movie_id) # Sort by name query = query.order_by(Actor.name) # Get total count before pagination total = query.count() return query.offset(skip).limit(limit).all(), total def create(db: Session, actor_in: ActorCreate) -> Actor: """ Create a new actor. """ db_actor = Actor(**actor_in.dict()) db.add(db_actor) db.commit() db.refresh(db_actor) return db_actor def update( db: Session, db_actor: Actor, actor_in: ActorUpdate ) -> Actor: """ Update an actor. """ update_data = actor_in.dict(exclude_unset=True) for field, value in update_data.items(): setattr(db_actor, field, value) db.commit() db.refresh(db_actor) return db_actor def delete(db: Session, actor_id: int) -> bool: """ Delete an actor by ID. """ db_actor = db.query(Actor).filter(Actor.id == actor_id).first() if db_actor: # Delete associated movie-actor relationships db.query(MovieActor).filter(MovieActor.actor_id == actor_id).delete() # Delete the actor db.delete(db_actor) db.commit() return True return False