Automated Action 9ed9654f1e Enhance Anime Information API with Advanced Features
- Add advanced search filters (year range, score, studio, source)
- Create detailed statistics endpoint for anime collection
- Implement enhanced pagination metadata for better navigation
- Add sorting options for search results
- Enhance anime model with additional fields (season info, ratings, etc.)
- Add ability to search anime by character attributes
- Create migration for new anime model fields
- Update README with detailed documentation of new features
2025-05-17 22:10:44 +00:00

462 lines
16 KiB
Python

from typing import List, Optional, Dict, Any, Union
from sqlalchemy import desc, extract, func
from sqlalchemy.orm import Session
from app.crud.base import CRUDBase
from app.models.anime import Anime
from app.models.anime_genre import AnimeGenre
from app.models.genre import Genre
from app.models.character import Character
from app.schemas.anime import AnimeCreate, AnimeUpdate
class CRUDAnime(CRUDBase[Anime, AnimeCreate, AnimeUpdate]):
def create_with_genres(
self, db: Session, *, obj_in: AnimeCreate
) -> Anime:
genre_ids = obj_in.genre_ids
anime_data = obj_in.dict(exclude={"genre_ids"})
db_obj = Anime(**anime_data)
db.add(db_obj)
db.commit()
db.refresh(db_obj)
# Add genres
if genre_ids:
for genre_id in genre_ids:
anime_genre = AnimeGenre(anime_id=db_obj.id, genre_id=genre_id)
db.add(anime_genre)
db.commit()
return db_obj
def update_with_genres(
self,
db: Session,
*,
db_obj: Anime,
obj_in: Union[AnimeUpdate, Dict[str, Any]]
) -> Anime:
if isinstance(obj_in, dict):
update_data = obj_in
genre_ids = update_data.pop("genre_ids", None)
else:
update_data = obj_in.dict(exclude_unset=True)
genre_ids = update_data.pop("genre_ids", None) if "genre_ids" in update_data else None
# Update anime attributes
for field in update_data:
setattr(db_obj, field, update_data[field])
# Update genres if provided
if genre_ids is not None:
# Remove existing genre links
db.query(AnimeGenre).filter(AnimeGenre.anime_id == db_obj.id).delete()
# Add new genre links
for genre_id in genre_ids:
anime_genre = AnimeGenre(anime_id=db_obj.id, genre_id=genre_id)
db.add(anime_genre)
db.add(db_obj)
db.commit()
db.refresh(db_obj)
return db_obj
def get_with_genres(self, db: Session, id: int):
"""Get anime with its genres populated"""
anime = db.query(Anime).filter(Anime.id == id).first()
if anime:
# Load genres (to be implemented with relationships)
pass
return anime
def search(
self,
db: Session,
*,
title: Optional[str] = None,
genre_id: Optional[int] = None,
status: Optional[str] = None,
year_from: Optional[int] = None,
year_to: Optional[int] = None,
score_min: Optional[float] = None,
score_max: Optional[float] = None,
source: Optional[str] = None,
studio: Optional[str] = None,
sort_by: Optional[str] = None,
sort_order: Optional[str] = "asc",
skip: int = 0,
limit: int = 100
) -> List[Anime]:
query = db.query(Anime)
# Apply text filters
if title:
query = query.filter(Anime.title.ilike(f"%{title}%"))
if genre_id:
query = query.join(AnimeGenre).filter(AnimeGenre.genre_id == genre_id)
if status:
query = query.filter(Anime.status == status)
if source:
query = query.filter(Anime.source.ilike(f"%{source}%"))
if studio:
query = query.filter(Anime.studio.ilike(f"%{studio}%"))
# Apply numeric range filters
if year_from:
query = query.filter(extract('year', Anime.aired_from) >= year_from)
if year_to:
query = query.filter(extract('year', Anime.aired_from) <= year_to)
if score_min is not None:
query = query.filter(Anime.score >= score_min)
if score_max is not None:
query = query.filter(Anime.score <= score_max)
# Apply sorting
if sort_by:
# Map sort_by parameter to model attribute
sort_attr = getattr(Anime, sort_by, None)
if sort_attr is not None:
if sort_order and sort_order.lower() == "desc":
query = query.order_by(desc(sort_attr))
else:
query = query.order_by(sort_attr)
return query.offset(skip).limit(limit).all()
def search_count(
self,
db: Session,
*,
title: Optional[str] = None,
genre_id: Optional[int] = None,
status: Optional[str] = None,
year_from: Optional[int] = None,
year_to: Optional[int] = None,
score_min: Optional[float] = None,
score_max: Optional[float] = None,
source: Optional[str] = None,
studio: Optional[str] = None
) -> int:
query = db.query(Anime)
# Apply text filters
if title:
query = query.filter(Anime.title.ilike(f"%{title}%"))
if genre_id:
query = query.join(AnimeGenre).filter(AnimeGenre.genre_id == genre_id)
if status:
query = query.filter(Anime.status == status)
if source:
query = query.filter(Anime.source.ilike(f"%{source}%"))
if studio:
query = query.filter(Anime.studio.ilike(f"%{studio}%"))
# Apply numeric range filters
if year_from:
query = query.filter(extract('year', Anime.aired_from) >= year_from)
if year_to:
query = query.filter(extract('year', Anime.aired_from) <= year_to)
if score_min is not None:
query = query.filter(Anime.score >= score_min)
if score_max is not None:
query = query.filter(Anime.score <= score_max)
return query.count()
def get_statistics(self, db: Session) -> Dict[str, Any]:
"""Get statistics about the anime collection"""
# Count total anime
total_anime = db.query(func.count(Anime.id)).scalar() or 0
# Calculate averages
avg_score = db.query(func.avg(Anime.score)).scalar()
avg_episodes = db.query(func.avg(Anime.episodes)).scalar()
# Status distribution
status_distribution = []
status_counts = db.query(
Anime.status,
func.count(Anime.id).label('count')
).group_by(Anime.status).all()
for status, count in status_counts:
if status: # Skip None values
status_distribution.append({
"name": status,
"count": count
})
# Source distribution
source_distribution = []
source_counts = db.query(
Anime.source,
func.count(Anime.id).label('count')
).group_by(Anime.source).all()
for source, count in source_counts:
if source: # Skip None values
source_distribution.append({
"name": source,
"count": count
})
# Studio distribution
studio_distribution = []
studio_counts = db.query(
Anime.studio,
func.count(Anime.id).label('count')
).group_by(Anime.studio).all()
for studio, count in studio_counts:
if studio: # Skip None values
studio_distribution.append({
"name": studio,
"count": count
})
# Year distribution
year_distribution = []
year_counts = db.query(
extract('year', Anime.aired_from).label('year'),
func.count(Anime.id).label('count')
).group_by('year').order_by('year').all()
for year, count in year_counts:
if year: # Skip None values
year_distribution.append({
"year": int(year),
"count": count
})
# Score distribution
score_distribution = []
# Define score ranges (0-1, 1-2, ..., 9-10)
score_ranges = [(i, i+1) for i in range(0, 10)]
for low, high in score_ranges:
count = db.query(func.count(Anime.id)).filter(
Anime.score >= low,
Anime.score < high if high < 10 else Anime.score <= high
).scalar() or 0
score_distribution.append({
"score_range": f"{low}-{high}",
"count": count
})
# Top genres
top_genres = []
genre_counts = db.query(
Genre.id,
Genre.name,
func.count(AnimeGenre.anime_id).label('count')
).join(
AnimeGenre, Genre.id == AnimeGenre.genre_id
).group_by(
Genre.id, Genre.name
).order_by(
desc('count')
).limit(10).all()
for genre_id, name, count in genre_counts:
top_genres.append({
"id": genre_id,
"name": name,
"count": count
})
return {
"total_anime": total_anime,
"avg_score": float(avg_score) if avg_score else None,
"avg_episodes": float(avg_episodes) if avg_episodes else None,
"status_distribution": status_distribution,
"source_distribution": source_distribution,
"studio_distribution": studio_distribution,
"year_distribution": year_distribution,
"score_distribution": score_distribution,
"top_genres": top_genres
}
def search_by_character(
self,
db: Session,
*,
character_name: Optional[str] = None,
character_role: Optional[str] = None,
character_gender: Optional[str] = None,
include_anime_filters: bool = False,
# Standard anime filters
title: Optional[str] = None,
genre_id: Optional[int] = None,
status: Optional[str] = None,
year_from: Optional[int] = None,
year_to: Optional[int] = None,
score_min: Optional[float] = None,
score_max: Optional[float] = None,
source: Optional[str] = None,
studio: Optional[str] = None,
sort_by: Optional[str] = None,
sort_order: Optional[str] = "asc",
skip: int = 0,
limit: int = 100
) -> List[Anime]:
"""
Search for anime based on character attributes, with optional anime filtering
"""
# Start by finding matching characters
char_query = db.query(Character.anime_id)
if character_name:
char_query = char_query.filter(Character.name.ilike(f"%{character_name}%"))
if character_role:
char_query = char_query.filter(Character.role.ilike(f"%{character_role}%"))
if character_gender:
char_query = char_query.filter(Character.gender.ilike(f"%{character_gender}%"))
# Get the anime IDs with matching characters
anime_ids = [row[0] for row in char_query.distinct().all()]
if not anime_ids:
return [] # No matching characters found
# Query anime with matching IDs
query = db.query(Anime).filter(Anime.id.in_(anime_ids))
# Apply additional anime filters if requested
if include_anime_filters:
if title:
query = query.filter(Anime.title.ilike(f"%{title}%"))
if genre_id:
query = query.join(AnimeGenre).filter(AnimeGenre.genre_id == genre_id)
if status:
query = query.filter(Anime.status == status)
if source:
query = query.filter(Anime.source.ilike(f"%{source}%"))
if studio:
query = query.filter(Anime.studio.ilike(f"%{studio}%"))
if year_from:
query = query.filter(extract('year', Anime.aired_from) >= year_from)
if year_to:
query = query.filter(extract('year', Anime.aired_from) <= year_to)
if score_min is not None:
query = query.filter(Anime.score >= score_min)
if score_max is not None:
query = query.filter(Anime.score <= score_max)
# Apply sorting
if sort_by:
sort_attr = getattr(Anime, sort_by, None)
if sort_attr is not None:
if sort_order and sort_order.lower() == "desc":
query = query.order_by(desc(sort_attr))
else:
query = query.order_by(sort_attr)
return query.offset(skip).limit(limit).all()
def search_by_character_count(
self,
db: Session,
*,
character_name: Optional[str] = None,
character_role: Optional[str] = None,
character_gender: Optional[str] = None,
include_anime_filters: bool = False,
# Standard anime filters
title: Optional[str] = None,
genre_id: Optional[int] = None,
status: Optional[str] = None,
year_from: Optional[int] = None,
year_to: Optional[int] = None,
score_min: Optional[float] = None,
score_max: Optional[float] = None,
source: Optional[str] = None,
studio: Optional[str] = None
) -> int:
"""
Count anime based on character attributes, with optional anime filtering
"""
# Start by finding matching characters
char_query = db.query(Character.anime_id)
if character_name:
char_query = char_query.filter(Character.name.ilike(f"%{character_name}%"))
if character_role:
char_query = char_query.filter(Character.role.ilike(f"%{character_role}%"))
if character_gender:
char_query = char_query.filter(Character.gender.ilike(f"%{character_gender}%"))
# Get the anime IDs with matching characters
anime_ids = [row[0] for row in char_query.distinct().all()]
if not anime_ids:
return 0 # No matching characters found
# Query anime with matching IDs
query = db.query(Anime).filter(Anime.id.in_(anime_ids))
# Apply additional anime filters if requested
if include_anime_filters:
if title:
query = query.filter(Anime.title.ilike(f"%{title}%"))
if genre_id:
query = query.join(AnimeGenre).filter(AnimeGenre.genre_id == genre_id)
if status:
query = query.filter(Anime.status == status)
if source:
query = query.filter(Anime.source.ilike(f"%{source}%"))
if studio:
query = query.filter(Anime.studio.ilike(f"%{studio}%"))
if year_from:
query = query.filter(extract('year', Anime.aired_from) >= year_from)
if year_to:
query = query.filter(extract('year', Anime.aired_from) <= year_to)
if score_min is not None:
query = query.filter(Anime.score >= score_min)
if score_max is not None:
query = query.filter(Anime.score <= score_max)
return query.count()
anime = CRUDAnime(Anime)