From e30ae5b30dd35485ff0ba7464f240070c246ce68 Mon Sep 17 00:00:00 2001 From: Backend IM Bot Date: Thu, 20 Mar 2025 14:03:40 +0000 Subject: [PATCH] Update code in endpoints/countries.get.py --- endpoints/countries.get.py | 146 +++++++++++++++++++++++++++++++------ 1 file changed, 123 insertions(+), 23 deletions(-) diff --git a/endpoints/countries.get.py b/endpoints/countries.get.py index 9275b01..301325e 100644 --- a/endpoints/countries.get.py +++ b/endpoints/countries.get.py @@ -1,29 +1,129 @@ -from fastapi import APIRouter, Depends, HTTPException -from core.database import fake_users_db +from datetime import datetime, timedelta +from typing import Optional +from fastapi import APIRouter, Depends, HTTPException, Response +from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm +from jose import JWTError, jwt +from passlib.context import CryptContext +from pydantic import BaseModel router = APIRouter() -@router.get("/countries") -async def african_countries_handler(): - """Get list of all African countries""" - african_countries = [ - "Algeria", "Angola", "Benin", "Botswana", "Burkina Faso", "Burundi", - "Cameroon", "Cape Verde", "Central African Republic", "Chad", "Comoros", - "Congo", "Democratic Republic of the Congo", "Djibouti", "Egypt", - "Equatorial Guinea", "Eritrea", "Ethiopia", "Gabon", "Gambia", "Ghana", - "Guinea", "Guinea-Bissau", "Ivory Coast", "Kenya", "Lesotho", "Liberia", - "Libya", "Madagascar", "Malawi", "Mali", "Mauritania", "Mauritius", - "Morocco", "Mozambique", "Namibia", "Niger", "Nigeria", "Rwanda", - "Sao Tome and Principe", "Senegal", "Seychelles", "Sierra Leone", - "Somalia", "South Africa", "South Sudan", "Sudan", "Swaziland", - "Tanzania", "Togo", "Tunisia", "Uganda", "Zambia", "Zimbabwe" - ] +# Constants +SECRET_KEY = "your-secret-key-here" +ALGORITHM = "HS256" +ACCESS_TOKEN_EXPIRE_MINUTES = 30 + +# Password hashing +pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") +oauth2_scheme = OAuth2PasswordBearer(tokenUrl="login") + +# Models +class User(BaseModel): + username: str + email: str + full_name: Optional[str] = None + disabled: Optional[bool] = False + +class UserInDB(User): + hashed_password: str + +class Token(BaseModel): + access_token: str + token_type: str + +class TokenData(BaseModel): + username: Optional[str] = None + +# Helper functions +def verify_password(plain_password, hashed_password): + return pwd_context.verify(plain_password, hashed_password) + +def get_password_hash(password): + return pwd_context.hash(password) + +def get_user(username: str): + if username in fake_users_db: + user_dict = fake_users_db[username] + return UserInDB(**user_dict) + return None + +def authenticate_user(username: str, password: str): + user = get_user(username) + if not user: + return False + if not verify_password(password, user.hashed_password): + return False + return user + +def create_access_token(data: dict, expires_delta: Optional[timedelta] = None): + to_encode = data.copy() + if expires_delta: + expire = datetime.utcnow() + expires_delta + else: + expire = datetime.utcnow() + timedelta(minutes=15) + to_encode.update({"exp": expire}) + encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) + return encoded_jwt + +async def get_current_user(token: str = Depends(oauth2_scheme)): + credentials_exception = HTTPException( + status_code=401, + detail="Could not validate credentials", + headers={"WWW-Authenticate": "Bearer"}, + ) + try: + payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) + username: str = payload.get("sub") + if username is None: + raise credentials_exception + token_data = TokenData(username=username) + except JWTError: + raise credentials_exception + user = get_user(username=token_data.username) + if user is None: + raise credentials_exception + return user + +# Login endpoint +@router.post("/login", response_model=Token) +async def login(response: Response, form_data: OAuth2PasswordRequestForm = Depends()): + """Authenticate user and return JWT token""" + user = authenticate_user(form_data.username, form_data.password) + if not user: + raise HTTPException( + status_code=401, + detail="Incorrect username or password", + headers={"WWW-Authenticate": "Bearer"}, + ) + access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) + access_token = create_access_token( + data={"sub": user.username}, expires_delta=access_token_expires + ) + + # Set cookie + response.set_cookie( + key="access_token", + value=f"Bearer {access_token}", + httponly=True, + max_age=1800, + expires=1800, + secure=True, + samesite="lax" + ) return { - "message": "African countries retrieved successfully", - "data": african_countries, - "metadata": { - "total_countries": len(african_countries), - "continent": "Africa" + "access_token": access_token, + "token_type": "bearer", + "message": "Login successful", + "user": { + "username": user.username, + "email": user.email, + "full_name": user.full_name } - } \ No newline at end of file + } + +# Protected route example +@router.get("/me", response_model=User) +async def read_users_me(current_user: User = Depends(get_current_user)): + """Get current user information""" + return current_user \ No newline at end of file