Automated Action 1d54b4ec09 Implement user authentication service with FastAPI
- Set up project structure and dependencies
- Create SQLAlchemy database models
- Set up Alembic for database migrations
- Implement user registration and login endpoints
- Add JWT token authentication
- Create middleware for protected routes
- Add health check endpoint
- Update README with documentation

generated with BackendIM... (backend.im)
2025-05-13 16:59:17 +00:00

83 lines
2.3 KiB
Python

from fastapi import Request, HTTPException, status
from fastapi.responses import JSONResponse
from app.utils.auth import decode_token
from app.database import SessionLocal
from app.models import User
from starlette.datastructures import Headers
import re
# Regular expressions for request paths that bypass token verification.
# verify_token tests each pattern against request.url.path with re.match,
# so every entry is anchored with ^...$ to accept only the exact path.
PUBLIC_PATHS = [
    r'^/docs$',
    r'^/redoc$',
    # The dot is escaped so it matches a literal "." only; an unescaped "."
    # would also make paths such as "/openapiXjson" public.
    r'^/openapi\.json$',
    r'^/auth/token$',
    r'^/auth/login$',
    r'^/users/$',
    r'^/health$',
]
async def get_current_user(token: str) -> User:
    """
    Get the current user from a JWT token.

    Args:
        token: Encoded JWT, passed unchanged to ``decode_token``.

    Returns:
        The ``User`` row whose ``username`` equals the ``username`` claim
        of the decoded token.

    Raises:
        HTTPException: 401 if the decoded payload carries no ``username``
            claim; 404 if no user with that username exists. Anything
            raised by ``decode_token`` itself propagates unchanged.
    """
    payload = decode_token(token)

    # A payload without the claim (or one that is not subscriptable at all)
    # used to surface as a bare KeyError/TypeError. Callers that only handle
    # HTTPException would let that escape as an unhandled error, so report
    # it as an authentication failure instead.
    try:
        username = payload["username"]
    except (KeyError, TypeError):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid token payload",
            headers={"WWW-Authenticate": "Bearer"},
        ) from None

    # NOTE(review): this looks like a synchronous SQLAlchemy session used
    # inside a coroutine, which would block the event loop for the duration
    # of the query -- confirm against app.database.
    db = SessionLocal()
    try:
        user = db.query(User).filter(User.username == username).first()
        if user is None:
            raise HTTPException(status_code=404, detail="User not found")
        return user
    finally:
        # Always release the session, whether the lookup succeeded or raised.
        db.close()
def _unauthorized(detail):
    """
    Build the 401 JSON response returned for a rejected Authorization header.
    """
    return JSONResponse(
        status_code=status.HTTP_401_UNAUTHORIZED,
        content={"detail": detail},
        headers={"WWW-Authenticate": "Bearer"},
    )


async def verify_token(request: Request, call_next):
    """
    Middleware that requires a bearer token on every non-public path.

    Requests whose path matches an entry of ``PUBLIC_PATHS`` are forwarded
    untouched. All other requests must carry an ``Authorization`` header of
    the form ``Bearer <token>``; the resolved user is stored on
    ``request.state.user`` before the request is forwarded.

    Args:
        request: The incoming request.
        call_next: Awaitable that forwards the request and returns the
            downstream response.

    Returns:
        The downstream response, or a JSON error response: 401 for a
        missing or malformed header, otherwise the status and detail of the
        ``HTTPException`` raised while resolving the user or forwarding.
    """
    request_path = request.url.path

    # Public endpoints skip authentication entirely.
    if any(re.match(public, request_path) for public in PUBLIC_PATHS):
        return await call_next(request)

    authorization = request.headers.get("Authorization")
    if not authorization:
        return _unauthorized("Not authenticated")

    # The header must be exactly two whitespace-separated parts.
    parts = authorization.split()
    if len(parts) != 2:
        return _unauthorized("Invalid token format")

    scheme, token = parts
    if scheme.lower() != "bearer":
        return _unauthorized("Invalid authentication scheme")

    try:
        # Expose the authenticated user to downstream handlers.
        request.state.user = await get_current_user(token)
        return await call_next(request)
    except HTTPException as exc:
        return JSONResponse(
            status_code=exc.status_code,
            content={"detail": exc.detail},
            headers=exc.headers,
        )