from typing import Any

from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session

from app.auth import (
    create_access_token,
    create_refresh_token,
    get_current_user,
    get_password_hash,
    verify_password,
    verify_token,
)
from app.db.session import get_db
from app.models.user import User
from app.schemas.token import Token, RefreshToken
from app.schemas.user import User as UserSchema, UserCreate, UserUpdate

# All routes below are mounted under the "/auth" prefix.
# NOTE(review): the handlers are `async def` but use the synchronous
# SQLAlchemy `Session` type; presumably DB calls block the event loop while
# they run -- confirm against `get_db` before changing either side.
router = APIRouter(prefix="/auth", tags=["Authentication"])


@router.post("/register", response_model=UserSchema)
async def register_user(
    user_in: UserCreate,
    db: Session = Depends(get_db),
) -> Any:
    """
    Register a new user.

    Looks up existing users by email and then by username; if either lookup
    finds a row the request is rejected. Otherwise the password is hashed
    with ``get_password_hash`` and a new ``User`` row is committed.

    Returns:
        The newly created ``User`` (refreshed from the database).

    Raises:
        HTTPException: 400 if the email or the username is already in use.
    """
    # NOTE(review): these checks and the insert below are not atomic, so two
    # concurrent requests can both pass them. Uniqueness then depends on DB
    # constraints, which are not visible from this module -- confirm.

    # Check if user with given email already exists
    user = db.query(User).filter(User.email == user_in.email).first()
    if user:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="A user with this email already exists",
        )

    # Check if user with given username already exists
    user = db.query(User).filter(User.username == user_in.username).first()
    if user:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="A user with this username already exists",
        )

    # Create new user; only the hash of the password is stored.
    hashed_password = get_password_hash(user_in.password)
    db_user = User(
        email=user_in.email,
        username=user_in.username,
        hashed_password=hashed_password,
        first_name=user_in.first_name,
        last_name=user_in.last_name,
        is_active=user_in.is_active,
    )
    db.add(db_user)
    db.commit()
    db.refresh(db_user)

    return db_user


@router.post("/login", response_model=Token)
async def login(
    form_data: OAuth2PasswordRequestForm = Depends(),
    db: Session = Depends(get_db),
) -> Any:
    """
    Get access token for user.

    The form's ``username`` field is matched against ``User.username`` first
    and, if nothing is found, against ``User.email``.

    Returns:
        A dict with ``access_token``, ``refresh_token`` and ``token_type``.

    Raises:
        HTTPException: 401 if no user matches or the password does not
            verify; 400 if the matched user is not active.
    """
    # Try to find user by username
    user = db.query(User).filter(User.username == form_data.username).first()

    # If user not found by username, try by email
    if not user:
        user = db.query(User).filter(User.email == form_data.username).first()

    # If user still not found or password is incorrect, raise error.
    # Both cases share one message so the response does not reveal which
    # of the two checks failed.
    if not user or not verify_password(form_data.password, user.hashed_password):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    # If user is not active, raise error
    if not user.is_active:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Inactive user",
        )

    # Generate access and refresh tokens
    access_token = create_access_token(user.id)
    refresh_token = create_refresh_token(user.id)

    return {
        "access_token": access_token,
        "refresh_token": refresh_token,
        "token_type": "bearer",
    }


@router.post("/refresh", response_model=Token)
async def refresh_token(
    refresh_token_data: RefreshToken,
    db: Session = Depends(get_db),
) -> Any:
    """
    Refresh access token.

    Verifies the submitted refresh token, loads the user whose id is stored
    in the token's ``sub`` field, and issues a new access/refresh token pair.

    Returns:
        A dict with ``access_token``, ``refresh_token`` and ``token_type``.

    Raises:
        HTTPException: 401 if the token fails verification or its ``sub``
            is not an integer id; 404 if no user has that id; 400 if the
            user is not active.
    """
    invalid_token = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid refresh token",
        headers={"WWW-Authenticate": "Bearer"},
    )

    # NOTE(review): whether verify_token distinguishes refresh tokens from
    # access tokens is not visible here -- confirm in app.auth.
    token_data = verify_token(refresh_token_data.refresh_token)
    if not token_data:
        raise invalid_token

    # A token can verify and still carry a missing or non-numeric subject;
    # treat that as an invalid token instead of letting int() surface as a
    # 500 Internal Server Error.
    try:
        user_id = int(token_data.sub)
    except (TypeError, ValueError):
        raise invalid_token from None

    user = db.query(User).filter(User.id == user_id).first()
    if not user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found",
        )

    if not user.is_active:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Inactive user",
        )

    # Generate new tokens (local names avoid shadowing this function's name)
    new_access_token = create_access_token(user.id)
    new_refresh_token = create_refresh_token(user.id)

    return {
        "access_token": new_access_token,
        "refresh_token": new_refresh_token,
        "token_type": "bearer",
    }


@router.get("/me", response_model=UserSchema)
async def get_me(
    current_user: User = Depends(get_current_user),
) -> Any:
    """
    Get current user information.

    Returns:
        The ``User`` resolved by the ``get_current_user`` dependency.
    """
    return current_user


@router.put("/me", response_model=UserSchema)
async def update_me(
    user_in: UserUpdate,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
) -> Any:
    """
    Update current user information.

    Only fields explicitly present in the request body are applied
    (``model_dump(exclude_unset=True)``). A changed email or username is
    first checked against existing users.

    Returns:
        The updated ``User`` (refreshed from the database).

    Raises:
        HTTPException: 400 if the new email or username is already in use.
    """
    # Check if email already exists
    if user_in.email and user_in.email != current_user.email:
        user = db.query(User).filter(User.email == user_in.email).first()
        if user:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="A user with this email already exists",
            )

    # Check if username already exists
    if user_in.username and user_in.username != current_user.username:
        user = db.query(User).filter(User.username == user_in.username).first()
        if user:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="A user with this username already exists",
            )

    update_data = user_in.model_dump(exclude_unset=True)

    # The model stores `hashed_password` (see register_user), so a plaintext
    # `password` must never be copied onto the model as-is.
    # NOTE(review): UserUpdate is not visible here; this branch only takes
    # effect if that schema exposes a `password` field -- confirm.
    if "password" in update_data:
        new_password = update_data.pop("password")
        if new_password:
            current_user.hashed_password = get_password_hash(new_password)

    # Update user data
    for field, value in update_data.items():
        setattr(current_user, field, value)

    db.add(current_user)
    db.commit()
    db.refresh(current_user)

    return current_user