from datetime import timedelta from typing import Any from fastapi import APIRouter, Depends, HTTPException, status from fastapi.security import OAuth2PasswordRequestForm from sqlalchemy.orm import Session from app.core.config import settings from app.core.security import create_access_token from app.crud.user import user as user_crud from app.core.deps import get_db from app.schemas.token import Token from app.schemas.user import User router = APIRouter() @router.post("/login", response_model=Token) def login_access_token( db: Session = Depends(get_db), form_data: OAuth2PasswordRequestForm = Depends() ) -> Any: """ OAuth2 compatible token login, get an access token for future requests. """ user = user_crud.authenticate( db, email=form_data.username, password=form_data.password ) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect email or password", headers={"WWW-Authenticate": "Bearer"}, ) elif not user_crud.is_active(user): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Inactive user" ) access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES) return { "access_token": create_access_token( user.id, expires_delta=access_token_expires ), "token_type": "bearer", } @router.post("/register", response_model=User) def register_user( *, db: Session = Depends(get_db), email: str, password: str, full_name: str = None, ) -> Any: """ Register a new user. """ user = user_crud.get_by_email(db, email=email) if user: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="A user with this email already exists in the system", ) from app.schemas.user import UserCreate user_in = UserCreate( email=email, password=password, full_name=full_name, is_superuser=False, is_active=True, ) user = user_crud.create(db, obj_in=user_in) return user