from typing import Any, List from fastapi import APIRouter, Depends, HTTPException, status from fastapi.encoders import jsonable_encoder from sqlalchemy.orm import Session from app.api.deps import get_current_active_superuser, get_current_user from app.core.security import get_password_hash from app.db.session import get_db from app.models.user import User from app.schemas.user import User as UserSchema from app.schemas.user import UserCreate, UserUpdate router = APIRouter() @router.get("/", response_model=List[UserSchema]) def read_users( db: Session = Depends(get_db), skip: int = 0, limit: int = 100, current_user: User = Depends(get_current_active_superuser), ) -> Any: """ Retrieve users. Only superusers can access this endpoint. """ users = db.query(User).offset(skip).limit(limit).all() return users @router.post("/", response_model=UserSchema) def create_user( *, db: Session = Depends(get_db), user_in: UserCreate, current_user: User = Depends(get_current_active_superuser), ) -> Any: """ Create new user. Only superusers can create new users. """ # Check if user with this email already exists user = db.query(User).filter(User.email == user_in.email).first() if user: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="The user with this email already exists in the system", ) # Create new user user = User( email=user_in.email, hashed_password=get_password_hash(user_in.password), full_name=user_in.full_name, is_active=user_in.is_active, is_superuser=False, ) db.add(user) db.commit() db.refresh(user) return user @router.get("/me", response_model=UserSchema) def read_user_me( current_user: User = Depends(get_current_user), ) -> Any: """ Get current user. """ return current_user @router.put("/me", response_model=UserSchema) def update_user_me( *, db: Session = Depends(get_db), user_in: UserUpdate, current_user: User = Depends(get_current_user), ) -> Any: """ Update own user. """ user_data = jsonable_encoder(current_user) # Update user data update_data = user_in.model_dump(exclude_unset=True) # Hash password if it's provided if "password" in update_data and update_data["password"]: update_data["hashed_password"] = get_password_hash(update_data.pop("password")) # Update user fields for field in user_data: if field in update_data: setattr(current_user, field, update_data[field]) db.add(current_user) db.commit() db.refresh(current_user) return current_user @router.get("/{user_id}", response_model=UserSchema) def read_user_by_id( user_id: int, current_user: User = Depends(get_current_active_superuser), db: Session = Depends(get_db), ) -> Any: """ Get a specific user by id. Only superusers can access other users. """ user = db.query(User).filter(User.id == user_id).first() if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found", ) return user @router.put("/{user_id}", response_model=UserSchema) def update_user( *, db: Session = Depends(get_db), user_id: int, user_in: UserUpdate, current_user: User = Depends(get_current_active_superuser), ) -> Any: """ Update a user. Only superusers can update users. """ user = db.query(User).filter(User.id == user_id).first() if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found", ) # Update user data update_data = user_in.model_dump(exclude_unset=True) # Hash password if it's provided if "password" in update_data and update_data["password"]: update_data["hashed_password"] = get_password_hash(update_data.pop("password")) # Update user fields for field in update_data: if hasattr(user, field): setattr(user, field, update_data[field]) db.add(user) db.commit() db.refresh(user) return user