from typing import Any, List from fastapi import APIRouter, Depends, HTTPException, status from sqlalchemy.orm import Session from app.core.security import get_current_user, get_password_hash from app.db.session import get_db from app.models.user import User from app.schemas.user import User as UserSchema from app.schemas.user import UserUpdate router = APIRouter() @router.get("/me", response_model=UserSchema) def read_user_me( db: Session = Depends(get_db), current_user_id: str = Depends(get_current_user), ) -> Any: """ Get current user. """ user = db.query(User).filter(User.id == current_user_id).first() if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found", ) return user @router.put("/me", response_model=UserSchema) def update_user_me( *, db: Session = Depends(get_db), current_user_id: str = Depends(get_current_user), user_in: UserUpdate, ) -> Any: """ Update current user. """ user = db.query(User).filter(User.id == current_user_id).first() if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found", ) update_data = user_in.dict(exclude_unset=True) if "password" in update_data and update_data["password"]: update_data["hashed_password"] = get_password_hash(update_data["password"]) del update_data["password"] for field, value in update_data.items(): setattr(user, field, value) db.add(user) db.commit() db.refresh(user) return user @router.get("/{user_id}", response_model=UserSchema) def read_user_by_id( user_id: str, db: Session = Depends(get_db), current_user_id: str = Depends(get_current_user), ) -> Any: """ Get a specific user by id. """ user = db.query(User).filter(User.id == current_user_id).first() if not user or not user.is_admin: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions", ) user = db.query(User).filter(User.id == user_id).first() if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found", ) return user @router.get("/", response_model=List[UserSchema]) def read_users( db: Session = Depends(get_db), current_user_id: str = Depends(get_current_user), skip: int = 0, limit: int = 100, ) -> Any: """ Retrieve users. Only admin users can access this endpoint. """ user = db.query(User).filter(User.id == current_user_id).first() if not user or not user.is_admin: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions", ) users = db.query(User).offset(skip).limit(limit).all() return users