From 0cce38bbdbc8c9f59543b2ec0217a989c2a11102 Mon Sep 17 00:00:00 2001 From: Backend IM Bot Date: Thu, 20 Mar 2025 14:33:19 +0000 Subject: [PATCH] Update code in endpoints/login.post.py --- endpoints/login.post.py | 128 +++++++++++++++++++++++++++++++++++----- 1 file changed, 114 insertions(+), 14 deletions(-) diff --git a/endpoints/login.post.py b/endpoints/login.post.py index df5aa08..2b12b99 100644 --- a/endpoints/login.post.py +++ b/endpoints/login.post.py @@ -1,25 +1,125 @@ -from fastapi import APIRouter, Depends, HTTPException -from core.auth import get_current_user_dummy +from datetime import datetime, timedelta +from typing import Optional +from fastapi import APIRouter, Depends, HTTPException, Response +from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm +from jose import JWTError, jwt +from passlib.context import CryptContext +from pydantic import BaseModel from core.database import fake_users_db +# Constants +SECRET_KEY = "your-secret-key-here" +ALGORITHM = "HS256" +ACCESS_TOKEN_EXPIRE_MINUTES = 30 + +# Password hashing +pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") +oauth2_scheme = OAuth2PasswordBearer(tokenUrl="login") + router = APIRouter() -@router.post("/login") -async def login_demo( - username: str = "demo", - password: str = "password" +# Models +class Token(BaseModel): + access_token: str + token_type: str + +class TokenData(BaseModel): + username: Optional[str] = None + +class User(BaseModel): + username: str + email: Optional[str] = None + disabled: Optional[bool] = None + +class UserInDB(User): + hashed_password: str + +# Helper functions +def verify_password(plain_password, hashed_password): + return pwd_context.verify(plain_password, hashed_password) + +def get_password_hash(password): + return pwd_context.hash(password) + +def get_user(db, username: str): + if username in db: + user_dict = db[username] + return UserInDB(**user_dict) + return None + +def authenticate_user(fake_db, username: str, password: str): + user = get_user(fake_db, username) + if not user: + return False + if not verify_password(password, user.hashed_password): + return False + return user + +def create_access_token(data: dict, expires_delta: Optional[timedelta] = None): + to_encode = data.copy() + if expires_delta: + expire = datetime.utcnow() + expires_delta + else: + expire = datetime.utcnow() + timedelta(minutes=15) + to_encode.update({"exp": expire}) + encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) + return encoded_jwt + +@router.post("/login", response_model=Token) +async def login( + response: Response, + form_data: OAuth2PasswordRequestForm = Depends() ): - """Demo login endpoint""" - user = fake_users_db.get(username) - if not user or user["password"] != password: - raise HTTPException(status_code=400, detail="Invalid credentials") + """Login endpoint with JWT token generation and cookie storage""" + user = authenticate_user(fake_users_db, form_data.username, form_data.password) + if not user: + raise HTTPException( + status_code=401, + detail="Incorrect username or password", + headers={"WWW-Authenticate": "Bearer"}, + ) + access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) + access_token = create_access_token( + data={"sub": user.username}, expires_delta=access_token_expires + ) + + # Set JWT token in HTTP-only cookie + response.set_cookie( + key="access_token", + value=f"Bearer {access_token}", + httponly=True, + max_age=1800, + expires=1800, + secure=True, + samesite="lax" + ) return { - "message": "Login successful (demo)", - "user": username, - "token": "dummy_jwt_token_123", + "message": "Login successful", + "access_token": access_token, + "token_type": "bearer", + "user": user.username, "features": { "rate_limit": 100, - "expires_in": 3600 + "expires_in": ACCESS_TOKEN_EXPIRE_MINUTES * 60 } } + +async def get_current_user(token: str = Depends(oauth2_scheme)): + credentials_exception = HTTPException( + status_code=401, + detail="Could not validate credentials", + headers={"WWW-Authenticate": "Bearer"}, + ) + try: + payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) + username: str = payload.get("sub") + if username is None: + raise credentials_exception + token_data = TokenData(username=username) + except JWTError: + raise credentials_exception + user = get_user(fake_users_db, username=token_data.username) + if user is None: + raise credentials_exception + return user \ No newline at end of file