from typing import Any, List from fastapi import APIRouter, Body, Depends, HTTPException, status from fastapi.encoders import jsonable_encoder from pydantic import EmailStr from sqlalchemy.orm import Session from app.api.deps import get_current_active_superuser, get_current_active_user, get_db from app.core.security import get_password_hash from app.models.user import User from app.schemas.user import User as UserSchema, UserCreate, UserUpdate router = APIRouter() @router.get("/", response_model=List[UserSchema]) def read_users( db: Session = Depends(get_db), skip: int = 0, limit: int = 100, current_user: User = Depends(get_current_active_superuser), ) -> Any: """ Retrieve users. """ users = db.query(User).offset(skip).limit(limit).all() return users @router.post("/", response_model=UserSchema) def create_user( *, db: Session = Depends(get_db), user_in: UserCreate, current_user: User = Depends(get_current_active_superuser), ) -> Any: """ Create new user. """ user = db.query(User).filter(User.email == user_in.email).first() if user: raise HTTPException( status_code=400, detail="The user with this email already exists in the system.", ) user = User( email=user_in.email, hashed_password=get_password_hash(user_in.password), full_name=user_in.full_name, is_superuser=user_in.is_superuser, is_active=user_in.is_active, ) db.add(user) db.commit() db.refresh(user) return user @router.put("/me", response_model=UserSchema) def update_user_me( *, db: Session = Depends(get_db), password: str = Body(None), full_name: str = Body(None), email: EmailStr = Body(None), current_user: User = Depends(get_current_active_user), ) -> Any: """ Update own user. """ current_user_data = jsonable_encoder(current_user) user_in = UserUpdate(**current_user_data) if password is not None: user_in.password = password if full_name is not None: user_in.full_name = full_name if email is not None: user_in.email = email user = current_user if password is not None: user.hashed_password = get_password_hash(password) if full_name is not None: user.full_name = full_name if email is not None: user.email = email db.add(user) db.commit() db.refresh(user) return user @router.get("/me", response_model=UserSchema) def read_user_me( current_user: User = Depends(get_current_active_user), ) -> Any: """ Get current user. """ return current_user @router.get("/{user_id}", response_model=UserSchema) def read_user_by_id( user_id: int, current_user: User = Depends(get_current_active_user), db: Session = Depends(get_db), ) -> Any: """ Get a specific user by id. """ user = db.query(User).filter(User.id == user_id).first() if user == current_user: return user if not current_user.is_superuser: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="The user doesn't have enough privileges", ) if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="The user with this id does not exist in the system", ) return user @router.put("/{user_id}", response_model=UserSchema) def update_user( *, db: Session = Depends(get_db), user_id: int, user_in: UserUpdate, current_user: User = Depends(get_current_active_superuser), ) -> Any: """ Update a user. """ user = db.query(User).filter(User.id == user_id).first() if not user: raise HTTPException( status_code=404, detail="The user with this id does not exist in the system", ) user_data = jsonable_encoder(user) update_data = user_in.dict(exclude_unset=True) for field in user_data: if field in update_data: setattr(user, field, update_data[field]) if user_in.password: user.hashed_password = get_password_hash(user_in.password) db.add(user) db.commit() db.refresh(user) return user