from datetime import timedelta
from typing import Any, Dict

from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session

from app.core import security
from app.core.config import settings
from app.core.deps import get_current_user
from app.db.session import get_db
from app.models.user import User
from app.schemas.token import Token
from app.schemas.user import User as UserSchema
from app.schemas.user import UserCreate
from app.services import user as user_service

router = APIRouter()


def _issue_token(subject: Any) -> Dict[str, Any]:
    """Build the bearer-token payload returned by the login and register routes."""
    access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
    return {
        "access_token": security.create_access_token(
            subject, expires_delta=access_token_expires
        ),
        "token_type": "bearer",
    }


@router.post("/login", response_model=Token)
def login_access_token(
    db: Session = Depends(get_db),
    form_data: OAuth2PasswordRequestForm = Depends(),
) -> Any:
    """
    OAuth2 compatible token login, get an access token for future requests.

    Responds with 401 when the credentials are rejected and with 400 when
    the account exists but is not active.
    """
    user = user_service.authenticate(
        db, username=form_data.username, password=form_data.password
    )
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            # A 401 response must carry a challenge telling the client which
            # authentication scheme to use (RFC 7235, section 3.1).
            headers={"WWW-Authenticate": "Bearer"},
        )
    if not user_service.is_active(user):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Inactive user",
        )
    return _issue_token(user.id)


@router.post("/register", response_model=Token)
def register_user(
    *,
    db: Session = Depends(get_db),
    form_data: OAuth2PasswordRequestForm = Depends(),
) -> Any:
    """
    Register a new user and return an access token.

    Responds with 400 when the username is already taken.
    """
    # NOTE(review): this lookup and the create below are two separate calls,
    # so two concurrent requests could both pass the check. Presumably the
    # database enforces uniqueness on the username -- confirm.
    existing_user = user_service.get_by_username(db, username=form_data.username)
    if existing_user:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="A user with this username already exists",
        )

    # The username is the primary identifier, but UserCreate is also given an
    # email. OAuth2PasswordRequestForm has no email field, so a placeholder
    # address is derived from the username.
    # NOTE(review): the placeholder is not an address the user supplied; a
    # dedicated registration schema would be needed to collect a real one.
    user_in = UserCreate(
        username=form_data.username,
        email=f"{form_data.username}@example.com",
        password=form_data.password,
    )
    user = user_service.create(db, obj_in=user_in)

    return _issue_token(user.id)


@router.get("/me", response_model=UserSchema)
def read_users_me(
    current_user: User = Depends(get_current_user),
) -> Any:
    """
    Get current user.
    """
    # The get_current_user dependency supplies the user object; it is
    # returned unchanged and serialized through the UserSchema response model.
    return current_user