Automated Action 1d54b4ec09 Implement user authentication service with FastAPI
- Set up project structure and dependencies
- Create SQLAlchemy database models
- Set up Alembic for database migrations
- Implement user registration and login endpoints
- Add JWT token authentication
- Create middleware for protected routes
- Add health check endpoint
- Update README with documentation

generated with BackendIM... (backend.im)
2025-05-13 16:59:17 +00:00

85 lines
2.5 KiB
Python

from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from datetime import timedelta
from app import models, schemas
from app.database import get_db
from app.utils.auth import (
verify_password,
create_access_token,
ACCESS_TOKEN_EXPIRE_MINUTES,
)
router = APIRouter(
prefix="/auth",
tags=["authentication"],
responses={401: {"description": "Unauthorized"}},
)
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="auth/token")
def authenticate_user(username: str, password: str, db: Session):
"""
Authenticate a user with username and password
"""
user = db.query(models.User).filter(models.User.username == username).first()
if not user:
return False
if not verify_password(password, user.hashed_password):
return False
return user
@router.post("/token", response_model=schemas.Token)
async def login_for_access_token(
form_data: OAuth2PasswordRequestForm = Depends(),
db: Session = Depends(get_db)
):
"""
OAuth2 compatible token login, returns an access token
"""
user = authenticate_user(form_data.username, form_data.password, db)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username, "user_id": user.id},
expires_delta=access_token_expires,
)
return {"access_token": access_token, "token_type": "bearer"}
@router.post("/login", response_model=schemas.Token)
async def login(
login_data: schemas.LoginRequest,
db: Session = Depends(get_db)
):
"""
Regular login endpoint, returns an access token
"""
user = authenticate_user(login_data.username, login_data.password, db)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username, "user_id": user.id},
expires_delta=access_token_expires,
)
return {"access_token": access_token, "token_type": "bearer"}