from typing import Any, Optional

from fastapi import APIRouter, Body, Depends, HTTPException
from fastapi.encoders import jsonable_encoder
from sqlalchemy.orm import Session

from app.core.security import get_password_hash
from app.db.deps import get_current_active_user, get_db
from app.models.user import User
from app.schemas.user import User as UserSchema
from app.schemas.user import UserCreate, UserUpdate

router = APIRouter()


def _raise_if_email_taken(db: Session, email: str) -> None:
    """Raise HTTP 400 if a user row with the given email already exists."""
    # NOTE(review): this is a check-then-write; it is not atomic with the
    # later commit. Whether the email column also carries a unique constraint
    # is not visible from this module -- confirm against the User model.
    if db.query(User).filter(User.email == email).first():
        raise HTTPException(
            status_code=400,
            detail="The user with this email already exists in the system.",
        )


@router.get("/me", response_model=UserSchema)
def read_user_me(
    current_user: User = Depends(get_current_active_user),
) -> Any:
    """
    Get current user.
    """
    # The dependency resolves the caller; the object is returned unchanged and
    # serialized through the UserSchema response model.
    return current_user


@router.put("/me", response_model=UserSchema)
def update_user_me(
    *,
    db: Session = Depends(get_db),
    password: Optional[str] = Body(None),
    full_name: Optional[str] = Body(None),
    email: Optional[str] = Body(None),
    current_user: User = Depends(get_current_active_user),
) -> Any:
    """
    Update own user.

    Any of password, full_name and email may be omitted; omitted fields keep
    their current value. Responds with 400 if the new email is already used
    by an existing user.
    """
    # Seed the update schema from the stored user, then overlay whichever
    # fields were actually supplied in the request body (None == not supplied).
    current_user_data = jsonable_encoder(current_user)
    user_in = UserUpdate(**current_user_data)
    if password is not None:
        user_in.password = password
    if full_name is not None:
        user_in.full_name = full_name
    if email is not None:
        user_in.email = email

    # Only query for a clash when the email is really changing.
    if email and email != current_user.email:
        _raise_if_email_taken(db, email)

    # Apply the changes. Truthiness checks mean empty strings are ignored
    # rather than written to the row.
    if user_in.password:
        # Only the hash is stored on the model, never the plain password.
        current_user.hashed_password = get_password_hash(user_in.password)
    if user_in.full_name:
        current_user.full_name = user_in.full_name
    if user_in.email:
        current_user.email = user_in.email
    # is_active is never taken from the request body here; user_in.is_active
    # comes from the UserUpdate built out of the user's own data above.
    if user_in.is_active is not None:
        current_user.is_active = user_in.is_active

    db.add(current_user)
    db.commit()
    # Reload the instance from the database before returning it.
    db.refresh(current_user)
    return current_user


@router.post("", response_model=UserSchema)
def create_user(
    *,
    db: Session = Depends(get_db),
    user_in: UserCreate,
) -> Any:
    """
    Create new user.

    Responds with 400 if a user with the same email already exists.
    """
    _raise_if_email_taken(db, user_in.email)

    # Build the row with the hashed password; the plain password from the
    # request is not stored.
    user = User(
        email=user_in.email,
        hashed_password=get_password_hash(user_in.password),
        full_name=user_in.full_name,
        is_active=user_in.is_active,
    )
    db.add(user)
    db.commit()
    # Reload the instance from the database before returning it.
    db.refresh(user)
    return user