from datetime import datetime, timedelta, timezone
from typing import Optional

from fastapi import APIRouter, Depends, HTTPException, Response, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import jwt, JWTError
from passlib.context import CryptContext
from pydantic import BaseModel, EmailStr, validator

from core.config import settings
from core.database import fake_users_db

router = APIRouter()

# Password hashing utility
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# JWT utilities
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/login")
SECRET_KEY = settings.SECRET_KEY
ALGORITHM = settings.ALGORITHM
ACCESS_TOKEN_EXPIRE_MINUTES = settings.ACCESS_TOKEN_EXPIRE_MINUTES
REFRESH_TOKEN_EXPIRE_MINUTES = settings.REFRESH_TOKEN_EXPIRE_MINUTES


# User model
class User(BaseModel):
    """User record built from a ``fake_users_db`` entry.

    Only the fields declared here are populated by ``User(**record)``.
    """

    username: str
    email: EmailStr
    disabled: bool = False

    @validator('username')
    def username_alphanumeric(cls, v):
        """Reject usernames containing non-alphanumeric characters."""
        # Raise explicitly instead of using `assert`: assertions are removed
        # when Python runs with -O, which would silently disable this check.
        if not v.isalnum():
            raise ValueError('Username must be alphanumeric')
        return v


# Token data model
class TokenData(BaseModel):
    """Data carried by a token; ``username`` may be absent."""

    username: Optional[str] = None


# Utility functions
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Return True if ``plain_password`` matches ``hashed_password``."""
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password: str) -> str:
    """Hash ``password`` with the configured password context."""
    return pwd_context.hash(password)


def get_user(username: str) -> Optional[User]:
    """Look up ``username`` in ``fake_users_db``.

    Returns:
        A ``User`` built from the stored record, or ``None`` when the
        username has no (truthy) record.
    """
    user_data = fake_users_db.get(username)
    if not user_data:
        return None
    return User(**user_data)


def authenticate_user(username: str, password: str):
    """Check a username/password pair against the stored credentials.

    Returns:
        The matching ``User`` on success, otherwise ``False``.
    """
    user = get_user(username)
    if not user:
        return False

    # `User` declares no password field, so the stored hash is not available
    # on the model instance (reading `user.password` raises AttributeError).
    # Read it from the raw record instead.
    # NOTE(review): the record schema is not visible here; "password" mirrors
    # the attribute the previous code read and "hashed_password" is a common
    # alternative -- confirm which key fake_users_db actually uses.
    record = fake_users_db.get(username) or {}
    stored_hash = record.get("password") or record.get("hashed_password")
    if not stored_hash or not verify_password(password, stored_hash):
        return False
    return user


def _create_token(
    data: dict,
    expires_delta: Optional[timedelta],
    default_lifetime: timedelta,
) -> str:
    """Encode a copy of ``data`` as a JWT with an ``exp`` claim added."""
    to_encode = data.copy()
    # A missing (or zero) expires_delta falls back to the default lifetime.
    lifetime = expires_delta or default_lifetime
    # Timezone-aware "now": datetime.utcnow() is deprecated and returns a
    # naive value that is easy to misinterpret as local time.
    to_encode.update({"exp": datetime.now(timezone.utc) + lifetime})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)


def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    """Create a signed access token.

    Args:
        data: Claims to embed; the dict is copied, not mutated.
        expires_delta: Token lifetime; defaults to 15 minutes when omitted.

    Returns:
        The encoded JWT.
    """
    return _create_token(data, expires_delta, timedelta(minutes=15))


def create_refresh_token(data: dict, expires_delta: Optional[timedelta] = None):
    """Create a signed refresh token.

    Args:
        data: Claims to embed; the dict is copied, not mutated.
        expires_delta: Token lifetime; defaults to 7 days when omitted.

    Returns:
        The encoded JWT.
    """
    # NOTE(review): access and refresh tokens share key, algorithm and claim
    # shape, so nothing in the token distinguishes one kind from the other.
    return _create_token(data, expires_delta, timedelta(days=7))


@router.post("/login")
async def login_person(response: Response, form_data: OAuth2PasswordRequestForm = Depends()):
    """Demo login endpoint.

    Authenticates the submitted form credentials, issues an access token and
    a refresh token, sets both as HttpOnly cookies, and also returns them in
    the JSON body.

    Raises:
        HTTPException: 401 when the credentials are rejected.
    """
    user = authenticate_user(form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    refresh_token_expires = timedelta(minutes=REFRESH_TOKEN_EXPIRE_MINUTES)
    refresh_token = create_refresh_token(
        data={"sub": user.username}, expires_delta=refresh_token_expires
    )

    # Give each cookie the same lifetime as the token it carries so the
    # browser stops sending it once the token has expired.
    # NOTE(review): consider secure=True when this is served over HTTPS.
    response.set_cookie(
        key="access_token",
        value=f"Bearer {access_token}",
        httponly=True,
        max_age=int(access_token_expires.total_seconds()),
    )
    response.set_cookie(
        key="refresh_token",
        value=f"Bearer {refresh_token}",
        httponly=True,
        max_age=int(refresh_token_expires.total_seconds()),
    )

    # NOTE(review): the tokens are also returned in the body, which makes
    # them readable by client-side script despite the HttpOnly cookies.
    return {
        "message": "Login successful",
        "access_token": access_token,
        "refresh_token": refresh_token,
    }