from typing import Annotated, Any, List from fastapi import APIRouter, Depends, HTTPException, status from sqlalchemy.orm import Session from app.api.dependencies import get_current_active_superuser, get_db from app.crud import user from app.models.user import User from app.schemas.user import User as UserSchema from app.schemas.user import UserCreate, UserUpdate router = APIRouter() @router.get("/", response_model=List[UserSchema]) def read_users( db: Annotated[Session, Depends(get_db)], current_user: Annotated[User, Depends(get_current_active_superuser)], skip: int = 0, limit: int = 100, ) -> Any: """ Retrieve users. """ users = user.get_multi(db, skip=skip, limit=limit) return users @router.post("/", response_model=UserSchema, status_code=status.HTTP_201_CREATED) def create_user( *, db: Annotated[Session, Depends(get_db)], user_in: UserCreate, ) -> Any: """ Create new user. """ existing_user = user.get_by_email(db, email=user_in.email) if existing_user: raise HTTPException( status_code=400, detail="The user with this email already exists in the system.", ) existing_user = user.get_by_username(db, username=user_in.username) if existing_user: raise HTTPException( status_code=400, detail="The user with this username already exists in the system.", ) return user.create(db, obj_in=user_in) @router.get("/{user_id}", response_model=UserSchema) def read_user_by_id( user_id: int, db: Annotated[Session, Depends(get_db)], ) -> Any: """ Get a specific user by id. """ db_user = user.get(db, id=user_id) if db_user is None: raise HTTPException( status_code=404, detail="User not found", ) return db_user @router.put("/{user_id}", response_model=UserSchema) def update_user( *, db: Annotated[Session, Depends(get_db)], user_id: int, user_in: UserUpdate, current_user: Annotated[User, Depends(get_current_active_superuser)], ) -> Any: """ Update a user. """ db_user = user.get(db, id=user_id) if db_user is None: raise HTTPException( status_code=404, detail="User not found", ) return user.update(db, db_obj=db_user, obj_in=user_in) @router.delete("/{user_id}", response_model=UserSchema) def delete_user( *, db: Annotated[Session, Depends(get_db)], user_id: int, current_user: Annotated[User, Depends(get_current_active_superuser)], ) -> Any: """ Delete a user. """ db_user = user.get(db, id=user_id) if db_user is None: raise HTTPException( status_code=404, detail="User not found", ) return user.remove(db, id=user_id)