from typing import Optional from datetime import datetime, timedelta from fastapi import APIRouter, Depends, HTTPException, status, Response from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm from jose import JWTError, jwt from passlib.context import CryptContext from pydantic import BaseModel from core.database import fake_users_db # JWT and Password Hashing Setup SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7" ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = 30 pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") # JWT Helper Functions def verify_password(plain_password, hashed_password): return pwd_context.verify(plain_password, hashed_password) def get_password_hash(password): return pwd_context.hash(password) def authenticate_user(username: str, password: str): user = fake_users_db.get(username) if not user: return False if not verify_password(password, user["password"]): return False return user def create_access_token(data: dict, expires_delta: Optional[timedelta] = None): to_encode = data.copy() if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(minutes=15) to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt # Data Models class Token(BaseModel): access_token: str token_type: str class TokenData(BaseModel): username: Optional[str] = None # Router Setup router = APIRouter() # Login Endpoint @router.post("/token", response_model=Token) async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()): """Login endpoint to get access token""" user = authenticate_user(form_data.username, form_data.password) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password", headers={"WWW-Authenticate": "Bearer"}, ) access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) access_token = create_access_token( data={"sub": user["username"]}, expires_delta=access_token_expires ) response = {"access_token": access_token, "token_type": "bearer"} # Store token in cookie response = Response(**response) response.set_cookie(key="access_token", value=access_token, httponly=True) return response # Helper function to get current user from token async def get_current_user(token: str = Depends(oauth2_scheme)): credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) username: str = payload.get("sub") if username is None: raise credentials_exception token_data = TokenData(username=username) except JWTError: raise credentials_exception user = fake_users_db.get(token_data.username) if user is None: raise credentials_exception return user # Authenticated user endpoint @router.get("/users/me/", response_model=dict) async def read_users_me(current_user: dict = Depends(get_current_user)): """Fetch current authenticated user""" return { "message": "User found", "data": current_user } ``` This code provides the following functionality: 1. **Login Endpoint**: `/token` endpoint to authenticate a user and return an access token. The token is stored in a browser cookie. 2. **User Endpoint**: `/users/me` endpoint to fetch the current authenticated user's data using the access token. 3. **JWT Implementation**: JSON Web Token (JWT) implementation for generating and verifying access tokens. 4. **Password Hashing**: Password hashing and verification using the `passlib` library. 5. **Helper Functions**: Functions for user authentication, password verification, token generation, and getting the current user from the token. 6. **Data Models**: Pydantic models for Token and TokenData. Note: This code uses a fake user database (`fake_users_db`) for demonstration purposes. In a real application, you would replace it with an actual database implementation.