from typing import Any from fastapi import APIRouter, Depends, HTTPException, status from sqlalchemy.orm import Session from app.core.auth import get_current_active_user from app.core.security import get_password_hash from app.db.session import get_db from app.models.user import User from app.schemas.user import User as UserSchema from app.schemas.user import UserUpdate router = APIRouter() @router.get("/me", response_model=UserSchema) def read_user_me( current_user: User = Depends(get_current_active_user), ) -> Any: """ Get current user. """ return current_user @router.put("/me", response_model=UserSchema) def update_user_me( *, db: Session = Depends(get_db), user_in: UserUpdate, current_user: User = Depends(get_current_active_user), ) -> Any: """ Update own user. """ # Check if username is being updated and if it's already taken if user_in.username and user_in.username != current_user.username: user = db.query(User).filter(User.username == user_in.username).first() if user: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Username already registered", ) # Check if email is being updated and if it's already taken if user_in.email and user_in.email != current_user.email: user = db.query(User).filter(User.email == user_in.email).first() if user: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Email already registered", ) # Update user details user_data = user_in.dict(exclude_unset=True) if user_in.password: user_data["hashed_password"] = get_password_hash(user_in.password) del user_data["password"] for key, value in user_data.items(): setattr(current_user, key, value) db.add(current_user) db.commit() db.refresh(current_user) return current_user