from typing import Any, List from fastapi import APIRouter, Depends, HTTPException, status from sqlalchemy.orm import Session from app import crud, models, schemas from app.api import deps router = APIRouter() @router.get("/", response_model=List[schemas.User]) def read_users( db: Session = Depends(deps.get_db), skip: int = 0, limit: int = 100, current_user: models.User = Depends(deps.get_current_admin_user), ) -> Any: """ Retrieve users. Admin only. """ users = crud.user.get_all(db, skip=skip, limit=limit) return users @router.post("/", response_model=schemas.User) def create_user( *, db: Session = Depends(deps.get_db), user_in: schemas.UserCreate, current_user: models.User = Depends(deps.get_current_admin_user), ) -> Any: """ Create new user. Admin only. """ user = crud.user.get_by_email(db, email=user_in.email) if user: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Email already registered", ) user = crud.user.get_by_username(db, username=user_in.username) if user: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Username already registered", ) user = crud.user.create(db, obj_in=user_in) return user @router.get("/{user_id}", response_model=schemas.User) def read_user( user_id: str, current_user: models.User = Depends(deps.get_current_active_user), db: Session = Depends(deps.get_db), ) -> Any: """ Get a specific user by id. """ user = crud.user.get_by_id(db, user_id=user_id) if user == current_user: return user if not crud.user.is_admin(current_user): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions to access this resource", ) if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found", ) return user @router.put("/{user_id}", response_model=schemas.User) def update_user( *, db: Session = Depends(deps.get_db), user_id: str, user_in: schemas.UserUpdate, current_user: models.User = Depends(deps.get_current_active_user), ) -> Any: """ Update a user. """ user = crud.user.get_by_id(db, user_id=user_id) if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found", ) # Check permissions if user.id != current_user.id and not crud.user.is_admin(current_user): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions to perform this action", ) # Prevent non-admin users from changing their role if user_in.role is not None and user_in.role != user.role and not crud.user.is_admin(current_user): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions to change the role", ) # If updating email, check it's not already taken if user_in.email is not None and user_in.email != user.email: existing_user = crud.user.get_by_email(db, email=user_in.email) if existing_user: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Email already registered", ) # If updating username, check it's not already taken if user_in.username is not None and user_in.username != user.username: existing_user = crud.user.get_by_username(db, username=user_in.username) if existing_user: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Username already registered", ) user = crud.user.update(db, db_obj=user, obj_in=user_in) return user @router.delete("/{user_id}", response_model=schemas.User) def delete_user( *, db: Session = Depends(deps.get_db), user_id: str, current_user: models.User = Depends(deps.get_current_admin_user), ) -> Any: """ Delete a user. Admin only. """ user = crud.user.get_by_id(db, user_id=user_id) if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found", ) # Prevent admins from deleting themselves if user.id == current_user.id: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Users cannot delete their own account", ) user = crud.user.delete(db, user_id=user_id) return user