"""User endpoints: list, fetch, update and delete users."""

from typing import List

from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session

from app.db.database import get_db
from app.db.repositories import user_repository
from app.models.user import User as UserModel
from app.schemas.user import User, UserUpdate
from app.services.auth_service import auth_service

router = APIRouter()


def _require_self(current_user: UserModel, user_id: int) -> None:
    """Raise 403 unless ``user_id`` is the authenticated user's own id."""
    if current_user.id != user_id:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Not enough permissions",
        )


def _get_user_or_404(db: Session, user_id: int):
    """Return the repository's user for ``user_id``; raise 404 if None."""
    db_user = user_repository.get_by_id(db, user_id)
    if db_user is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found",
        )
    return db_user


@router.get("/", response_model=List[User])
def read_users(
    skip: int = 0,
    limit: int = 100,
    db: Session = Depends(get_db),
    current_user: UserModel = Depends(auth_service.get_current_user),
):
    """
    Retrieve users.

    Only authenticated users can see the list. ``skip`` and ``limit`` page
    through the results and must both be non-negative.
    """
    # Reject negative paging values here instead of forwarding them to the
    # repository. NOTE(review): how user_repository.list treats negative
    # values is not visible from this module -- confirm that a 400 is the
    # desired contract for API clients.
    if skip < 0 or limit < 0:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="skip and limit must be non-negative",
        )
    return user_repository.list(db, skip=skip, limit=limit)


@router.get("/{user_id}", response_model=User)
def read_user(
    user_id: int,
    db: Session = Depends(get_db),
    current_user: UserModel = Depends(auth_service.get_current_user),
):
    """
    Get a specific user by id.

    Responds with 404 when no user with that id exists.
    """
    return _get_user_or_404(db, user_id)


@router.put("/{user_id}", response_model=User)
def update_user(
    user_id: int,
    user_in: UserUpdate,
    db: Session = Depends(get_db),
    current_user: UserModel = Depends(auth_service.get_current_user),
):
    """
    Update a user.

    Users can only update their own information (403 otherwise). Responds
    with 404 when the user does not exist and with 400 when the requested
    email or username already belongs to another account.
    """
    # The permission check runs before the lookup, so a caller targeting
    # someone else's id gets 403 whether or not that id exists.
    _require_self(current_user, user_id)
    db_user = _get_user_or_404(db, user_id)

    # Only query for a clash when the email is actually being changed.
    if user_in.email and user_in.email != db_user.email:
        if user_repository.get_by_email(db, user_in.email):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Email already registered",
            )

    # Same rule for the username.
    if user_in.username and user_in.username != db_user.username:
        if user_repository.get_by_username(db, user_in.username):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Username already taken",
            )

    # exclude_unset=True keeps fields the client did not send out of the
    # update payload.
    user_data = user_in.model_dump(exclude_unset=True)
    return user_repository.update(db, db_user, user_data)


@router.delete("/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_user(
    user_id: int,
    db: Session = Depends(get_db),
    current_user: UserModel = Depends(auth_service.get_current_user),
):
    """
    Delete a user.

    Users can only delete their own account (403 otherwise). Responds with
    404 when the user does not exist and with 204 and no body on success.
    """
    _require_self(current_user, user_id)
    db_user = _get_user_or_404(db, user_id)
    user_repository.delete(db, db_user)
    return None