
- Set up SQLite database configuration and directory structure - Configure Alembic for proper SQLite migrations - Add initial model schemas and API endpoints - Fix OAuth2 authentication - Implement proper code formatting with Ruff
76 lines
2.2 KiB
Python
76 lines
2.2 KiB
Python
from typing import Any
|
|
|
|
from fastapi import APIRouter, Depends
|
|
from fastapi.security import OAuth2PasswordRequestForm
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app import crud
|
|
from app.api import deps
|
|
from app.core.exceptions import BadRequestException, UnauthorizedException
|
|
from app.core.security import create_access_token, verify_password
|
|
from app.db.session import get_db
|
|
from app.schemas.token import Token
|
|
from app.schemas.user import User, UserCreate
|
|
|
|
|
|
# Router instance that the route decorators in this module register against.
router = APIRouter()
|
|
|
|
|
|
@router.post("/register", response_model=User)
def register_user(user_in: UserCreate, db: Session = Depends(get_db)) -> Any:
    """
    Create a new user account from the submitted payload.

    The email is checked first, then the username; the first one that is
    already taken aborts the request before anything is created.

    Args:
        user_in: Registration payload; its ``email`` and ``username`` are
            used for the uniqueness lookups and the whole object is handed
            to ``crud.create_user``.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        The value returned by ``crud.create_user``, sent back through the
        ``User`` response model.

    Raises:
        BadRequestException: If a record is found for the email or for the
            username.
    """
    # NOTE(review): the lookups and the insert are separate calls; whether a
    # concurrent duplicate is rejected depends on constraints not visible here.
    email_match = crud.get_by_email(db, email=user_in.email)
    if email_match:
        raise BadRequestException(detail="A user with this email already exists")

    username_match = crud.get_by_username(db, username=user_in.username)
    if username_match:
        raise BadRequestException(detail="A user with this username already exists")

    return crud.create_user(db, obj_in=user_in)
|
|
|
|
|
|
@router.post("/login", response_model=Token)
def login(
    db: Session = Depends(get_db),
    form_data: OAuth2PasswordRequestForm = Depends(),
) -> Any:
    """
    Exchange login credentials for an access token.

    The form's ``username`` field is tried as an email address first and,
    if that finds nothing, as a username.

    Args:
        db: Database session supplied by the ``get_db`` dependency.
        form_data: Submitted login form; ``username`` and ``password`` are
            read from it.

    Returns:
        A dict with ``access_token`` (created with the user's ``id`` as
        subject) and ``token_type`` set to ``"bearer"``.

    Raises:
        UnauthorizedException: If no user matches or the password does not
            verify. Both cases share one message.
    """
    identifier = form_data.username

    # ``or`` short-circuits, so the username lookup only runs when the email
    # lookup came back empty.
    account = crud.get_by_email(db, email=identifier) or crud.get_by_username(
        db, username=identifier
    )

    # The password is only verified when an account was actually found.
    if not account or not verify_password(form_data.password, account.hashed_password):
        raise UnauthorizedException(detail="Incorrect email/username or password")

    return {
        "access_token": create_access_token(subject=account.id),
        "token_type": "bearer",
    }
|
|
|
|
|
|
@router.get("/me", response_model=User)
def read_users_me(current_user: User = Depends(deps.get_current_active_user)) -> Any:
    """
    Return the user behind the current request.

    Args:
        current_user: User resolved by the ``deps.get_current_active_user``
            dependency.

    Returns:
        ``current_user`` unchanged, sent back through the ``User`` response
        model.
    """
    return current_user