from fastapi import Request, HTTPException, status from fastapi.responses import JSONResponse from app.utils.auth import decode_token from app.database import SessionLocal from app.models import User from starlette.datastructures import Headers import re PUBLIC_PATHS = [ r'^/docs$', r'^/redoc$', r'^/openapi.json$', r'^/auth/token$', r'^/auth/login$', r'^/users/$', r'^/health$', ] async def get_current_user(token: str): """ Get the current user from a JWT token """ payload = decode_token(token) db = SessionLocal() try: user = db.query(User).filter(User.username == payload["username"]).first() if user is None: raise HTTPException(status_code=404, detail="User not found") return user finally: db.close() async def verify_token(request: Request, call_next): """ Middleware to verify JWT tokens """ # Check if the path is public path = request.url.path for pattern in PUBLIC_PATHS: if re.match(pattern, path): return await call_next(request) # Get the Authorization header auth_header = request.headers.get("Authorization") if not auth_header: return JSONResponse( status_code=status.HTTP_401_UNAUTHORIZED, content={"detail": "Not authenticated"}, headers={"WWW-Authenticate": "Bearer"}, ) # Extract the token try: scheme, token = auth_header.split() if scheme.lower() != "bearer": return JSONResponse( status_code=status.HTTP_401_UNAUTHORIZED, content={"detail": "Invalid authentication scheme"}, headers={"WWW-Authenticate": "Bearer"}, ) except ValueError: return JSONResponse( status_code=status.HTTP_401_UNAUTHORIZED, content={"detail": "Invalid token format"}, headers={"WWW-Authenticate": "Bearer"}, ) # Verify the token and get the current user try: user = await get_current_user(token) # Add the user to the request state request.state.user = user return await call_next(request) except HTTPException as e: return JSONResponse( status_code=e.status_code, content={"detail": e.detail}, headers=e.headers, )