"""HTTP endpoints for reading, recording and seeding usage statistics."""

from typing import Any, Dict, List

from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy import func
from sqlalchemy.orm import Session

from app.db.session import get_db
from app.models.usage_stat import UsageStat
from app.schemas.usage_stat import (
    UsageStat as UsageStatSchema,
    UsageStatCreate,
    UsageStatUpdate,
)

router = APIRouter()


def _latest_stats_query(db: Session):
    """Build a query selecting the newest ``UsageStat`` row of every metric.

    A grouped subquery finds ``max(created_at)`` per ``metric_name``; the
    outer query joins back on both columns to fetch the full rows.

    NOTE: if several rows of one metric share the same maximum
    ``created_at``, the join matches all of them.
    """
    newest = (
        db.query(
            UsageStat.metric_name,
            func.max(UsageStat.created_at).label("max_created_at"),
        )
        .group_by(UsageStat.metric_name)
        .subquery()
    )
    return db.query(UsageStat).join(
        newest,
        (UsageStat.metric_name == newest.c.metric_name)
        & (UsageStat.created_at == newest.c.max_created_at),
    )


@router.get("/", response_model=List[UsageStatSchema])
def get_usage_stats(db: Session = Depends(get_db)):
    """Return the most recent entry of each metric."""
    return _latest_stats_query(db).all()


# Registered before "/{metric_name}" so that "summary" is not captured as a
# metric name by the parameterised route.
@router.get("/summary")
def get_stats_summary(db: Session = Depends(get_db)) -> Dict[str, Any]:
    """Return the most recent entry of each metric, keyed by metric name.

    Each value is a dict with the keys ``value``, ``description`` and
    ``last_updated`` taken from the row's ``metric_value``, ``description``
    and ``created_at``.
    """
    return {
        stat.metric_name: {
            "value": stat.metric_value,
            "description": stat.description,
            "last_updated": stat.created_at,
        }
        for stat in _latest_stats_query(db).all()
    }


@router.get("/{metric_name}")
def get_metric_history(
    metric_name: str,
    limit: int = 10,
    db: Session = Depends(get_db),
):
    """Return up to ``limit`` entries of one metric, newest first.

    Raises:
        HTTPException: 422 if ``limit`` is smaller than 1; 404 if no entry
            exists for ``metric_name``.
    """
    # Reject non-positive limits up front: 0 would otherwise be reported as
    # "Metric not found", and a negative LIMIT is backend-dependent.
    if limit < 1:
        raise HTTPException(
            status_code=422, detail="limit must be a positive integer"
        )

    stats = (
        db.query(UsageStat)
        .filter(UsageStat.metric_name == metric_name)
        .order_by(UsageStat.created_at.desc())
        .limit(limit)
        .all()
    )
    if not stats:
        raise HTTPException(status_code=404, detail="Metric not found")
    return stats


@router.post("/", response_model=UsageStatSchema)
def create_usage_stat(
    stat: UsageStatCreate,
    db: Session = Depends(get_db),
):
    """Persist a new entry built from the request payload and return it."""
    db_stat = UsageStat(**stat.dict())
    db.add(db_stat)
    db.commit()
    # Reload so the returned object carries the values stored by the database.
    db.refresh(db_stat)
    return db_stat


@router.put("/{metric_name}", response_model=UsageStatSchema)
def update_usage_stat(
    metric_name: str,
    stat_update: UsageStatUpdate,
    db: Session = Depends(get_db),
):
    """Record a new value for an existing metric.

    The previous entry is left untouched; a new row is inserted instead so
    the history of the metric is preserved. Fields that are ``None`` in
    ``stat_update`` are copied from the metric's most recent entry.

    Raises:
        HTTPException: 404 if no entry exists for ``metric_name``.
    """
    latest_stat = (
        db.query(UsageStat)
        .filter(UsageStat.metric_name == metric_name)
        .order_by(UsageStat.created_at.desc())
        .first()
    )
    if not latest_stat:
        raise HTTPException(status_code=404, detail="Metric not found")

    # "is not None" (rather than truthiness) keeps explicit falsy updates
    # such as 0 or "" instead of silently falling back to the old value.
    if stat_update.metric_value is not None:
        metric_value = stat_update.metric_value
    else:
        metric_value = latest_stat.metric_value

    if stat_update.description is not None:
        description = stat_update.description
    else:
        description = latest_stat.description

    new_stat = UsageStat(
        metric_name=metric_name,
        metric_value=metric_value,
        description=description,
    )
    db.add(new_stat)
    db.commit()
    db.refresh(new_stat)
    return new_stat


@router.post("/seed")
def seed_usage_stats(db: Session = Depends(get_db)):
    """Insert the default metrics that do not exist yet.

    A metric is skipped when at least one entry with its name is already
    stored. All inserts are committed together.

    Returns:
        A dict whose ``message`` lists the names of the metrics created.
    """
    default_stats = [
        {
            "metric_name": "total_users",
            "metric_value": 150000,
            "description": "Total registered users",
        },
        {
            "metric_name": "content_created",
            "metric_value": 2500000,
            "description": "Total pieces of content created",
        },
        {
            "metric_name": "active_creators",
            "metric_value": 45000,
            "description": "Active content creators this month",
        },
        {
            "metric_name": "businesses_served",
            "metric_value": 12000,
            "description": "Businesses using our platform",
        },
        {
            "metric_name": "conversion_rate",
            "metric_value": 15,
            "description": "Average conversion rate increase (%)",
        },
        {
            "metric_name": "time_saved",
            "metric_value": 80,
            "description": "Average time saved per project (%)",
        },
    ]

    created_stats = []
    for stat_data in default_stats:
        # Only create the metric if no entry with this name exists.
        existing = (
            db.query(UsageStat)
            .filter(UsageStat.metric_name == stat_data["metric_name"])
            .first()
        )
        if existing:
            continue
        db.add(UsageStat(**stat_data))
        created_stats.append(stat_data["metric_name"])

    db.commit()
    return {"message": f"Created stats for: {', '.join(created_stats)}"}