Automated Action 0ceeef31a6 Add user authentication system with JWT tokens
- Add user model with relationship to tasks
- Implement JWT token authentication
- Create user registration and login endpoints
- Update task endpoints to filter by current user
- Add Alembic migration for user table
- Update documentation with authentication details
2025-05-16 12:40:03 +00:00

96 lines
2.7 KiB
Python

from datetime import timedelta
from typing import Any
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from app import crud
from app.api.deps import get_db, get_current_active_user
from app.core.config import settings
from app.core.security import create_access_token
from app.models.user import User
from app.schemas.user import User as UserSchema
from app.schemas.user import UserCreate, Token
router = APIRouter()
@router.post("/register", response_model=UserSchema)
def register(
*,
db: Session = Depends(get_db),
user_in: UserCreate,
) -> Any:
"""
Register a new user.
"""
# Check if user with this email already exists
user = crud.user.get_by_email(db, email=user_in.email)
if user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="A user with this email already exists.",
)
# Check if user with this username already exists
user = crud.user.get_by_username(db, username=user_in.username)
if user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="A user with this username already exists.",
)
# Create new user
user = crud.user.create(db, obj_in=user_in)
return user
@router.post("/login", response_model=Token)
def login(
db: Session = Depends(get_db),
form_data: OAuth2PasswordRequestForm = Depends(),
) -> Any:
"""
OAuth2 compatible token login, get an access token for future requests.
"""
user = crud.user.authenticate(
db, email_or_username=form_data.username, password=form_data.password
)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect email/username or password",
headers={"WWW-Authenticate": "Bearer"},
)
if not crud.user.is_active(user):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Inactive user",
)
access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
return {
"access_token": create_access_token(
subject=user.id, expires_delta=access_token_expires
),
"token_type": "bearer",
}
@router.post("/test-token", response_model=UserSchema)
def test_token(current_user: User = Depends(get_current_active_user)) -> Any:
"""
Test access token.
"""
return current_user
@router.get("/me", response_model=UserSchema)
def read_users_me(current_user: User = Depends(get_current_active_user)) -> Any:
"""
Get current user.
"""
return current_user