Automated Action 8575427c43 Implement user authentication flow
- Setup project structure with FastAPI
- Create user models with SQLAlchemy
- Implement JWT authentication
- Create auth endpoints (register, login, me)
- Add health endpoint
- Generate Alembic migrations
- Update documentation

Generated with BackendIM... (backend.im)
2025-05-12 17:00:50 +00:00

89 lines
2.6 KiB
Python

from datetime import timedelta
from typing import Any
from fastapi import APIRouter, Body, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from app.api.deps import get_current_active_user
from app.core.config import settings
from app.core.security import create_access_token
from app.db.session import get_db
from app.models.user import User
from app.schemas.auth import TokenResponse
from app.schemas.user import User as UserSchema, UserCreate
from app.services import user as user_service
# Router for this module; the decorators below attach the /auth/register,
# /auth/login and /auth/me endpoints to it.
router = APIRouter()
@router.post("/auth/register", response_model=UserSchema)
def register(
    *,
    db: Session = Depends(get_db),
    user_in: UserCreate,
) -> Any:
    """
    Create a new user account from the submitted payload.

    The email is looked up first, then the username; the first lookup that
    finds an existing record aborts the request. When neither lookup matches,
    the payload is passed to ``user_service.create`` and its result returned.

    Raises:
        HTTPException: 400 with detail "Email already registered" when the
            email lookup returns a user, or 400 with detail "Username already
            taken" when the username lookup returns a user.
    """
    # Guard: the email must not belong to an existing account.
    if user_service.get_by_email(db, email=user_in.email):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Email already registered",
        )

    # Guard: the username must not belong to an existing account.
    if user_service.get_by_username(db, username=user_in.username):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Username already taken",
        )

    # Both checks passed; hand the payload to the service layer.
    return user_service.create(db, obj_in=user_in)
@router.post("/auth/login", response_model=TokenResponse)
def login(
    db: Session = Depends(get_db),
    form_data: OAuth2PasswordRequestForm = Depends(),
) -> Any:
    """
    OAuth2 compatible token login: exchange form credentials for an access token.

    The submitted username and password are passed to
    ``user_service.authenticate``. On success, a token is created for the
    user's id with a lifetime of ``settings.ACCESS_TOKEN_EXPIRE_MINUTES``
    minutes and returned together with the token type "bearer".

    Raises:
        HTTPException: 401 (with a ``WWW-Authenticate: Bearer`` header) when
            authentication yields no user; 400 when the user is not active
            according to ``user_service.is_active``.
    """
    account = user_service.authenticate(
        db,
        username=form_data.username,
        password=form_data.password,
    )
    if not account:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    if not user_service.is_active(account):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Inactive user"
        )

    # Token lifetime comes from application settings.
    lifetime = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
    token = create_access_token(account.id, expires_delta=lifetime)
    return {"access_token": token, "token_type": "bearer"}
@router.get("/auth/me", response_model=UserSchema)
def read_current_user(
    current_user: User = Depends(get_current_active_user),
) -> Any:
    """
    Return the user supplied by the ``get_current_active_user`` dependency.

    The endpoint performs no work of its own; it hands back the injected
    user object unchanged.
    """
    return current_user