2025-06-05 10:07:49 +00:00

85 lines
2.6 KiB
Python

from datetime import timedelta
from typing import Any
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from app import crud, schemas
from app.api import deps
from app.core import security
from app.core.config import settings
from app.schemas.auth import LoginRequest
# Router on which the login endpoints in this module are registered.
router = APIRouter()
@router.post("/login", response_model=schemas.Token)
async def login_access_token(
    db: Session = Depends(deps.get_db), form_data: OAuth2PasswordRequestForm = Depends()
) -> Any:
    """
    OAuth2 compatible token login, get an access token for future requests
    using username/password.

    The OAuth2 form only carries a single ``username`` field, so its value is
    treated as an email address when it contains ``"@"`` and as a username
    otherwise.

    Args:
        db: Database session injected through ``deps.get_db``.
        form_data: OAuth2 password form providing ``username`` and ``password``.

    Returns:
        A dict with ``access_token`` and ``token_type`` (``"bearer"``),
        returned under ``response_model=schemas.Token``.

    Raises:
        HTTPException: 401 when ``crud.user.authenticate`` returns a falsy
            value, or when ``crud.user.is_active`` is falsy for the user.
    """
    # NOTE(review): the crud calls below are made without ``await`` on a
    # synchronous SQLAlchemy ``Session`` inside an ``async def`` handler;
    # presumably they perform blocking I/O on the event loop -- confirm and
    # consider a plain ``def`` handler.
    identifier = form_data.username

    # Pick the lookup keyword from the shape of the identifier.
    if "@" in identifier:
        user = crud.user.authenticate(
            db, email=identifier, password=form_data.password
        )
    else:
        user = crud.user.authenticate(
            db, username=identifier, password=form_data.password
        )

    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username/email or password",
            # A 401 response must carry a WWW-Authenticate challenge.
            headers={"WWW-Authenticate": "Bearer"},
        )
    if not crud.user.is_active(user):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Inactive user",
            headers={"WWW-Authenticate": "Bearer"},
        )

    access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
    return {
        "access_token": security.create_access_token(
            user.id, expires_delta=access_token_expires
        ),
        "token_type": "bearer",
    }
@router.post("/login/flexible", response_model=schemas.Token)
async def login_flexible(
    login_data: LoginRequest, db: Session = Depends(deps.get_db)
) -> Any:
    """
    Login with either email or username.

    The ``email``, ``username`` and ``password`` fields of the request body
    are forwarded together to ``crud.user.authenticate``.

    Args:
        login_data: Request body providing ``email``, ``username`` and
            ``password``.
        db: Database session injected through ``deps.get_db``.

    Returns:
        A dict with ``access_token`` and ``token_type`` (``"bearer"``),
        returned under ``response_model=schemas.Token``.

    Raises:
        HTTPException: 401 when ``crud.user.authenticate`` returns a falsy
            value, or when ``crud.user.is_active`` is falsy for the user.
    """
    # NOTE(review): this handler does not check that at least one of
    # email/username was supplied; presumably ``LoginRequest`` or
    # ``crud.user.authenticate`` handles that case -- confirm.
    user = crud.user.authenticate(
        db,
        email=login_data.email,
        username=login_data.username,
        password=login_data.password,
    )

    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username/email or password",
            # A 401 response must carry a WWW-Authenticate challenge.
            headers={"WWW-Authenticate": "Bearer"},
        )
    if not crud.user.is_active(user):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Inactive user",
            headers={"WWW-Authenticate": "Bearer"},
        )

    access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
    return {
        "access_token": security.create_access_token(
            user.id, expires_delta=access_token_expires
        ),
        "token_type": "bearer",
    }