from datetime import timedelta from typing import Any from fastapi import APIRouter, Depends, HTTPException, status from fastapi.security import OAuth2PasswordRequestForm from sqlalchemy.orm import Session from app.core import security from app.core.config import settings from app.core.deps import get_current_user from app.db.session import get_db from app.models.user import User from app.schemas.token import Token from app.schemas.user import User as UserSchema from app.schemas.user import UserCreate from app.services import user as user_service router = APIRouter() @router.post("/login", response_model=Token) def login_access_token( db: Session = Depends(get_db), form_data: OAuth2PasswordRequestForm = Depends(), ) -> Any: """ OAuth2 compatible token login, get an access token for future requests """ user = user_service.authenticate(db, email=form_data.username, password=form_data.password) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect email or password", ) elif not user_service.is_active(user): raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Inactive user", ) access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES) return { "access_token": security.create_access_token( user.id, expires_delta=access_token_expires ), "token_type": "bearer", } @router.post("/register", response_model=Token) def register_user( *, db: Session = Depends(get_db), form_data: OAuth2PasswordRequestForm = Depends(), ) -> Any: """ Register a new user and return an access token """ user = user_service.get_by_email(db, email=form_data.username) if user: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="A user with this email already exists", ) # Create user user_in = UserCreate(email=form_data.username, password=form_data.password) user = user_service.create(db, obj_in=user_in) # Generate token access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES) return { "access_token": security.create_access_token( user.id, expires_delta=access_token_expires ), "token_type": "bearer", } @router.get("/me", response_model=UserSchema) def read_users_me( current_user: User = Depends(get_current_user), ) -> Any: """ Get current user """ return current_user