Automated Action 91405a6195 Implement authentication service with FastAPI and SQLite
- Setup project structure and dependencies
- Create SQLite database with SQLAlchemy models
- Initialize Alembic for database migrations
- Implement JWT-based authentication utilities
- Create API endpoints for signup, login, and logout
- Add health check endpoint
- Implement authentication middleware for protected routes
- Update README with setup and usage instructions
- Add linting with Ruff
2025-05-17 17:33:29 +00:00

113 lines
3.3 KiB
Python

from datetime import timedelta
from typing import Any
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from app.core.auth import get_current_user
from app.core.config import settings
from app.core.security import create_access_token, get_password_hash, verify_password
from app.db.session import get_db
from app.models.user import User
from app.schemas.token import Token
from app.schemas.user import User as UserSchema
from app.schemas.user import UserCreate
router = APIRouter()
@router.post("/auth/signup", response_model=UserSchema)
def create_user(
user_in: UserCreate,
db: Session = Depends(get_db)
) -> Any:
"""Create a new user
"""
# Check if user with this email or username already exists
user = db.query(User).filter(
(User.email == user_in.email) | (User.username == user_in.username)
).first()
if user:
if user.email == user_in.email:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Email already registered"
)
else:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Username already taken"
)
# Create new user
user = User(
email=user_in.email,
username=user_in.username,
hashed_password=get_password_hash(user_in.password),
is_active=True
)
db.add(user)
db.commit()
db.refresh(user)
return user
@router.post("/auth/login", response_model=Token)
def login_access_token(
form_data: OAuth2PasswordRequestForm = Depends(),
db: Session = Depends(get_db)
) -> Any:
"""Get the JWT for a user with data from OAuth2 request form body
"""
# Try to authenticate with username
user = db.query(User).filter(User.username == form_data.username).first()
# If user not found by username, try email
if not user:
user = db.query(User).filter(User.email == form_data.username).first()
if not user or not verify_password(form_data.password, user.hashed_password):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
# If user is not active
if not user.is_active:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Inactive user"
)
access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
return {
"access_token": create_access_token(
user.id, expires_delta=access_token_expires
),
"token_type": "bearer",
}
@router.post("/auth/logout", status_code=status.HTTP_204_NO_CONTENT, response_model=None)
def logout(
_: User = Depends(get_current_user),
) -> None:
"""Logout a user (JWT token should be invalidated client-side)
"""
# JWT is stateless, so nothing to do on server-side for logout
# Client should remove the token
return None
@router.get("/users/me", response_model=UserSchema)
def read_users_me(
current_user: User = Depends(get_current_user),
) -> Any:
"""Get current user
"""
return current_user