Automated Action da59077885 Switch from email to username for authentication
- Add username field to User model
- Update authentication endpoints to use username instead of email
- Create migration for adding username column to user table
- Update user services to handle username validation and uniqueness
- Maintain email for compatibility, but make username the primary identifier
2025-05-26 13:17:17 +00:00

92 lines
2.9 KiB
Python

from datetime import timedelta
from typing import Any
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from app.core import security
from app.core.config import settings
from app.core.deps import get_current_user
from app.db.session import get_db
from app.models.user import User
from app.schemas.token import Token
from app.schemas.user import User as UserSchema
from app.schemas.user import UserCreate
from app.services import user as user_service
router = APIRouter()
@router.post("/login", response_model=Token)
def login_access_token(
db: Session = Depends(get_db),
form_data: OAuth2PasswordRequestForm = Depends(),
) -> Any:
"""
OAuth2 compatible token login, get an access token for future requests
"""
user = user_service.authenticate(db, username=form_data.username, password=form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
)
elif not user_service.is_active(user):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Inactive user",
)
access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
return {
"access_token": security.create_access_token(
user.id, expires_delta=access_token_expires
),
"token_type": "bearer",
}
@router.post("/register", response_model=Token)
def register_user(
*,
db: Session = Depends(get_db),
form_data: OAuth2PasswordRequestForm = Depends(),
) -> Any:
"""
Register a new user and return an access token
"""
existing_user = user_service.get_by_username(db, username=form_data.username)
if existing_user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="A user with this username already exists",
)
# Create user - using username as the primary identifier, but also require an email
# For OAuth2PasswordRequestForm, we'll use the username field from the form
# and set a default email based on the username
user_in = UserCreate(
username=form_data.username,
email=f"{form_data.username}@example.com", # Default email since OAuth2 form doesn't have email field
password=form_data.password
)
user = user_service.create(db, obj_in=user_in)
# Generate token
access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
return {
"access_token": security.create_access_token(
user.id, expires_delta=access_token_expires
),
"token_type": "bearer",
}
@router.get("/me", response_model=UserSchema)
def read_users_me(
current_user: User = Depends(get_current_user),
) -> Any:
"""
Get current user
"""
return current_user