from typing import Any, List from fastapi import APIRouter, Body, Depends, HTTPException from fastapi.encoders import jsonable_encoder from sqlalchemy.orm import Session from app import models, schemas from app.api import deps from app.core.security import get_password_hash router = APIRouter() @router.get("/", response_model=List[schemas.User]) def read_users( db: Session = Depends(deps.get_db), skip: int = 0, limit: int = 100, current_user: models.User = Depends(deps.get_current_active_admin), ) -> Any: """ Retrieve users. """ users = db.query(models.User).offset(skip).limit(limit).all() return users @router.get("/me", response_model=schemas.User) def read_user_me( current_user: models.User = Depends(deps.get_current_active_user), ) -> Any: """ Get current user. """ return current_user @router.put("/me", response_model=schemas.User) def update_user_me( *, db: Session = Depends(deps.get_db), password: str = Body(None), full_name: str = Body(None), email: str = Body(None), current_user: models.User = Depends(deps.get_current_active_user), ) -> Any: """ Update own user. """ current_user_data = jsonable_encoder(current_user) user_in = schemas.UserUpdate(**current_user_data) if password is not None: user_in.password = password if full_name is not None: user_in.full_name = full_name if email is not None: user_in.email = email if user_in.password: hashed_password = get_password_hash(user_in.password) current_user.password = hashed_password if user_in.full_name: current_user.full_name = user_in.full_name if user_in.email: # Check if email already exists user = db.query(models.User).filter( models.User.email == user_in.email, models.User.id != current_user.id ).first() if user: raise HTTPException( status_code=400, detail="A user with this email already exists.", ) current_user.email = user_in.email db.add(current_user) db.commit() db.refresh(current_user) return current_user @router.get("/{user_id}", response_model=schemas.User) def read_user_by_id( user_id: int, current_user: models.User = Depends(deps.get_current_active_user), db: Session = Depends(deps.get_db), ) -> Any: """ Get a specific user by id. """ user = db.query(models.User).filter(models.User.id == user_id).first() if user == current_user: return user if not current_user.is_admin: raise HTTPException( status_code=400, detail="The user doesn't have enough privileges" ) if not user: raise HTTPException( status_code=404, detail="The user with this id does not exist in the system", ) return user @router.delete("/{user_id}", response_model=schemas.User) def delete_user( user_id: int, db: Session = Depends(deps.get_db), current_user: models.User = Depends(deps.get_current_active_admin), ) -> Any: """ Delete a user. """ user = db.query(models.User).filter(models.User.id == user_id).first() if not user: raise HTTPException( status_code=404, detail="The user with this id does not exist in the system", ) if user.id == current_user.id: raise HTTPException( status_code=400, detail="Users cannot delete themselves", ) db.delete(user) db.commit() return user