from typing import Any

from fastapi import APIRouter, Body, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from jose import jwt, JWTError
from pydantic import ValidationError
from sqlalchemy.orm import Session

from app.api.deps import get_current_user
from app.core.config import settings
from app.core.security import (
    create_access_token,
    create_refresh_token,
    TOKEN_TYPE_REFRESH,
)
from app.crud.user import authenticate_user, get_user
from app.db.session import get_db
from app.schemas.token import Token, RefreshToken
from app.schemas.user import User

router = APIRouter()


def _unauthorized(detail: str) -> HTTPException:
    """Build the 401 response shared by the login and refresh endpoints."""
    return HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail=detail,
        headers={"WWW-Authenticate": "Bearer"},
    )


def _user_id_from_refresh_token(token: str) -> int:
    """Decode a refresh token and return the integer user ID from its ``sub``.

    Args:
        token: Encoded JWT supplied by the client.

    Returns:
        The ``sub`` claim converted to ``int``.

    Raises:
        HTTPException: 401 if the token cannot be decoded, is not a refresh
            token, has no ``sub`` claim, or its ``sub`` is not an integer.
    """
    # Only the decode call is guarded so that unrelated failures later in the
    # request are not misreported as "Invalid refresh token".
    try:
        payload = jwt.decode(
            token,
            settings.SECRET_KEY,
            algorithms=[settings.ALGORITHM],
        )
    except (JWTError, ValidationError) as err:
        raise _unauthorized("Invalid refresh token") from err

    if payload.get("type") != TOKEN_TYPE_REFRESH:
        raise _unauthorized("Invalid token type")

    subject = payload.get("sub")
    if subject is None:
        raise _unauthorized("Invalid token")

    # A non-numeric subject previously escaped as an unhandled ValueError
    # (HTTP 500); treat it as an invalid token instead.
    try:
        return int(subject)
    except (TypeError, ValueError) as err:
        raise _unauthorized("Invalid token") from err


@router.post("/login", response_model=Token)
async def login_access_token(
    db: Session = Depends(get_db),
    form_data: OAuth2PasswordRequestForm = Depends(),
) -> Any:
    """
    OAuth2 compatible token login, get an access token for future requests.

    Args:
        db: Database session
        form_data: Form containing username (can be email or username) and password

    Returns:
        Access token, refresh token and the token type

    Raises:
        HTTPException: 401 if neither the username nor the email lookup
            authenticates with the supplied password.
    """
    # The submitted identifier is tried as a username first, then as an email.
    user = authenticate_user(
        db, username=form_data.username, password=form_data.password
    )
    if not user:
        user = authenticate_user(
            db, email=form_data.username, password=form_data.password
        )
    if not user:
        raise _unauthorized("Incorrect username/email or password")

    # NOTE(review): unlike /refresh-token, this endpoint does not check
    # user.is_active - confirm whether authenticate_user already rejects
    # inactive accounts.

    # Both tokens use the default expiration of their factory functions.
    access_token = create_access_token(subject=user.id)
    refresh_token = create_refresh_token(subject=user.id)

    return {
        "access_token": access_token,
        "token_type": "bearer",
        "refresh_token": refresh_token,
    }


@router.post("/refresh-token", response_model=Token)
async def refresh_token(
    db: Session = Depends(get_db),
    refresh_token_in: RefreshToken = Body(...),
) -> Any:
    """
    Refresh access token using a refresh token.

    Args:
        db: Database session
        refresh_token_in: Refresh token

    Returns:
        New access token and refresh token

    Raises:
        HTTPException: 401 if the refresh token is invalid, 404 if the user
            it names does not exist, 400 if that user is inactive.
    """
    user_id = _user_id_from_refresh_token(refresh_token_in.refresh_token)

    user = get_user(db, user_id=user_id)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found",
        )

    if not user.is_active:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Inactive user",
        )

    # A fresh refresh token is issued alongside the new access token.
    access_token = create_access_token(subject=user.id)
    new_refresh_token = create_refresh_token(subject=user.id)

    return {
        "access_token": access_token,
        "token_type": "bearer",
        "refresh_token": new_refresh_token,
    }


@router.get("/me", response_model=User)
async def read_users_me(
    current_user: User = Depends(get_current_user),
) -> Any:
    """
    Get current user information.

    Args:
        current_user: Current user from authentication

    Returns:
        Current user information
    """
    return current_user