From b8415b87f5219178a83af625e549164add199536 Mon Sep 17 00:00:00 2001 From: Backend IM Bot Date: Thu, 20 Mar 2025 20:39:58 +0100 Subject: [PATCH] Update code in endpoints/them.get.py --- endpoints/them.get.py | 115 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 115 insertions(+) create mode 100644 endpoints/them.get.py diff --git a/endpoints/them.get.py b/endpoints/them.get.py new file mode 100644 index 0000000..a764d41 --- /dev/null +++ b/endpoints/them.get.py @@ -0,0 +1,115 @@ +from typing import Optional +from datetime import datetime, timedelta +from fastapi import APIRouter, Depends, HTTPException, status, Response +from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm +from jose import JWTError, jwt +from passlib.context import CryptContext +from pydantic import BaseModel +from core.database import fake_users_db + +# JWT and Password Hashing Setup +SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7" +ALGORITHM = "HS256" +ACCESS_TOKEN_EXPIRE_MINUTES = 30 + +pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") +oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") + +# JWT Helper Functions +def verify_password(plain_password, hashed_password): + return pwd_context.verify(plain_password, hashed_password) + +def get_password_hash(password): + return pwd_context.hash(password) + +def authenticate_user(username: str, password: str): + user = fake_users_db.get(username) + if not user: + return False + if not verify_password(password, user["password"]): + return False + return user + +def create_access_token(data: dict, expires_delta: Optional[timedelta] = None): + to_encode = data.copy() + if expires_delta: + expire = datetime.utcnow() + expires_delta + else: + expire = datetime.utcnow() + timedelta(minutes=15) + to_encode.update({"exp": expire}) + encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) + return encoded_jwt + +# Data Models +class Token(BaseModel): + access_token: str + token_type: str + +class TokenData(BaseModel): + username: Optional[str] = None + +# Router Setup +router = APIRouter() + +# Login Endpoint +@router.post("/token", response_model=Token) +async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()): + """Login endpoint to get access token""" + user = authenticate_user(form_data.username, form_data.password) + if not user: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Incorrect username or password", + headers={"WWW-Authenticate": "Bearer"}, + ) + access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) + access_token = create_access_token( + data={"sub": user["username"]}, expires_delta=access_token_expires + ) + response = {"access_token": access_token, "token_type": "bearer"} + + # Store token in cookie + response = Response(**response) + response.set_cookie(key="access_token", value=access_token, httponly=True) + return response + +# Helper function to get current user from token +async def get_current_user(token: str = Depends(oauth2_scheme)): + credentials_exception = HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Could not validate credentials", + headers={"WWW-Authenticate": "Bearer"}, + ) + try: + payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) + username: str = payload.get("sub") + if username is None: + raise credentials_exception + token_data = TokenData(username=username) + except JWTError: + raise credentials_exception + user = fake_users_db.get(token_data.username) + if user is None: + raise credentials_exception + return user + +# Authenticated user endpoint +@router.get("/users/me/", response_model=dict) +async def read_users_me(current_user: dict = Depends(get_current_user)): + """Fetch current authenticated user""" + return { + "message": "User found", + "data": current_user + } +``` + +This code provides the following functionality: + +1. **Login Endpoint**: `/token` endpoint to authenticate a user and return an access token. The token is stored in a browser cookie. +2. **User Endpoint**: `/users/me` endpoint to fetch the current authenticated user's data using the access token. +3. **JWT Implementation**: JSON Web Token (JWT) implementation for generating and verifying access tokens. +4. **Password Hashing**: Password hashing and verification using the `passlib` library. +5. **Helper Functions**: Functions for user authentication, password verification, token generation, and getting the current user from the token. +6. **Data Models**: Pydantic models for Token and TokenData. + +Note: This code uses a fake user database (`fake_users_db`) for demonstration purposes. In a real application, you would replace it with an actual database implementation. \ No newline at end of file