from typing import Any, List, Optional

from fastapi import APIRouter, Body, Depends, HTTPException
from fastapi.encoders import jsonable_encoder
from pydantic import EmailStr
from sqlalchemy.orm import Session

from app.core.deps import get_current_active_superuser, get_current_active_user, get_db
from app.models.user import User
from app.schemas.user import User as UserSchema
from app.schemas.user import UserCreate, UserUpdate
from app.services import user as user_service

router = APIRouter()


def _ensure_username_available(db: Session, username: str) -> None:
    """Raise HTTP 400 if ``get_by_username`` finds a user for ``username``."""
    if user_service.get_by_username(db, username=username):
        raise HTTPException(
            status_code=400,
            detail="A user with this username already exists",
        )


def _ensure_email_available(db: Session, email: str) -> None:
    """Raise HTTP 400 if ``get_by_email`` finds a user for ``email``."""
    if user_service.get_by_email(db, email=email):
        raise HTTPException(
            status_code=400,
            detail="A user with this email already exists",
        )


@router.get("/", response_model=List[UserSchema])
def read_users(
    db: Session = Depends(get_db),
    skip: int = 0,
    limit: int = 100,
    current_user: User = Depends(get_current_active_superuser),
) -> Any:
    """
    Retrieve users.

    Returns at most ``limit`` users after skipping the first ``skip``.
    """
    # ``current_user`` is unused in the body; declaring the dependency is what
    # gates this endpoint behind ``get_current_active_superuser``.
    # NOTE(review): the query has no explicit ORDER BY, so page contents may
    # not be stable between calls -- confirm and add an ordering if needed.
    users = db.query(User).offset(skip).limit(limit).all()
    return users


@router.post("/", response_model=UserSchema)
def create_user(
    *,
    db: Session = Depends(get_db),
    user_in: UserCreate,
    current_user: User = Depends(get_current_active_superuser),
) -> Any:
    """
    Create new user.

    Responds with 400 if the username or the email is already taken.
    """
    # The username is checked before the email, so a request that conflicts on
    # both reports the username conflict.
    _ensure_username_available(db, user_in.username)
    _ensure_email_available(db, user_in.email)
    return user_service.create(db, obj_in=user_in)


@router.put("/me", response_model=UserSchema)
def update_user_me(
    *,
    db: Session = Depends(get_db),
    password: Optional[str] = Body(None),
    full_name: Optional[str] = Body(None),
    username: Optional[str] = Body(None),
    email: Optional[EmailStr] = Body(None),
    current_user: User = Depends(get_current_active_user),
) -> Any:
    """
    Update own user.

    Only the fields present in the request body are changed. Responds with 400
    if the new username or email already belongs to another user.
    """
    # Start from the current record so omitted fields keep their values.
    current_user_data = jsonable_encoder(current_user)
    user_in = UserUpdate(**current_user_data)

    if password is not None:
        user_in.password = password
    if full_name is not None:
        user_in.full_name = full_name

    # Uniqueness is only checked when the value actually changes; otherwise
    # the lookup would find the caller's own record and reject the request.
    if username is not None and username != current_user.username:
        _ensure_username_available(db, username)
        user_in.username = username
    if email is not None and email != current_user.email:
        _ensure_email_available(db, email)
        user_in.email = email

    return user_service.update(db, db_obj=current_user, obj_in=user_in)


@router.get("/me", response_model=UserSchema)
def read_user_me(
    current_user: User = Depends(get_current_active_user),
) -> Any:
    """
    Get current user.
    """
    return current_user


@router.get("/{user_id}", response_model=UserSchema)
def read_user_by_id(
    user_id: str,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db),
) -> Any:
    """
    Get a specific user by id.

    Users may read their own record; reading any other record requires
    superuser privileges (400 otherwise). Responds with 404 if a superuser
    requests an id that does not exist.
    """
    user = user_service.get_by_id(db, user_id=user_id)
    if user == current_user:
        return user
    # The privilege check runs before the existence check so that
    # non-superusers get the same response whether or not the id exists.
    if not user_service.is_superuser(current_user):
        raise HTTPException(
            status_code=400, detail="The user doesn't have enough privileges"
        )
    # Without this guard a missing user would be returned as-is and fail
    # response-model validation instead of producing a clean 404.
    if not user:
        raise HTTPException(
            status_code=404,
            detail="The user with this id does not exist in the system",
        )
    return user


@router.put("/{user_id}", response_model=UserSchema)
def update_user(
    *,
    db: Session = Depends(get_db),
    user_id: str,
    user_in: UserUpdate,
    current_user: User = Depends(get_current_active_superuser),
) -> Any:
    """
    Update a user.

    Responds with 404 if the user does not exist, and with 400 if the new
    username or email already belongs to another user.
    """
    user = user_service.get_by_id(db, user_id=user_id)
    if not user:
        raise HTTPException(
            status_code=404,
            detail="The user with this id does not exist in the system",
        )

    # Apply the same uniqueness rules as create_user / update_user_me, and
    # only when the value changes so the target's own record is not a conflict.
    if user_in.username is not None and user_in.username != user.username:
        _ensure_username_available(db, user_in.username)
    if user_in.email is not None and user_in.email != user.email:
        _ensure_email_available(db, user_in.email)

    return user_service.update(db, db_obj=user, obj_in=user_in)