Automated Action 794d172f85 Add user authentication system with login, signup, and JWT tokens
- Added user model and schema definitions
- Implemented JWT token authentication
- Created endpoints for user registration and login
- Added secure password hashing with bcrypt
- Set up SQLite database with SQLAlchemy
- Created Alembic migrations
- Added user management endpoints
- Included health check endpoint

generated with BackendIM... (backend.im)
2025-05-11 22:51:17 +00:00

79 lines
3.0 KiB
Python

from fastapi import Depends, HTTPException, status
from sqlalchemy.orm import Session
from typing import Optional
from datetime import timedelta
from jose import JWTError, jwt
from fastapi.security import OAuth2PasswordBearer
from app.db.database import get_db
from app.db.repositories import user_repository
from app.models.user import User
from app.schemas.user import UserCreate, UserLogin, TokenData
from app.core.security import verify_password, get_password_hash, create_access_token, ALGORITHM
from app.core.config import settings
# OAuth2 password-bearer scheme used as a dependency by
# AuthService.get_current_user to obtain the token; tokenUrl is the login
# route under the configured API prefix (settings.API_V1_STR).
oauth2_scheme = OAuth2PasswordBearer(tokenUrl=f"{settings.API_V1_STR}/auth/login")
class AuthService:
    """Authentication operations: credential checks, registration, tokens.

    All persistence goes through ``user_repository``; password hashing and
    token creation are delegated to ``app.core.security``.
    """

    def authenticate_user(self, db: Session, email: str, password: str) -> Optional[User]:
        """Return the user matching ``email`` if ``password`` verifies.

        Args:
            db: Database session passed to the repository lookup.
            email: Email address used to look up the user.
            password: Plain-text password checked against the stored hash.

        Returns:
            The user, or ``None`` when no user has that email or the
            password does not verify.
        """
        user = user_repository.get_by_email(db, email)
        if not user:
            return None
        if not verify_password(password, user.hashed_password):
            return None
        return user

    def register_user(self, db: Session, user_in: UserCreate) -> User:
        """Create a new user after checking email and username are unused.

        Args:
            db: Database session passed to the repository calls.
            user_in: Registration payload; its ``password`` is hashed and
                stored as ``hashed_password`` instead of being persisted.

        Returns:
            Whatever ``user_repository.create`` returns for the new record.

        Raises:
            HTTPException: 400 when the email is already registered or the
                username is already taken.
        """
        # Check if email already exists
        if user_repository.get_by_email(db, user_in.email):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Email already registered",
            )

        # Check if username already exists
        if user_repository.get_by_username(db, user_in.username):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Username already taken",
            )

        # Create new user: the plain password is excluded from the dumped
        # payload and replaced by its hash.
        hashed_password = get_password_hash(user_in.password)
        user_data = user_in.model_dump(exclude={"password"})
        user_data["hashed_password"] = hashed_password
        return user_repository.create(db, user_data)

    def create_token(self, user: User) -> dict:
        """Build the bearer-token response body for ``user``.

        The token subject is ``user.id`` and its lifetime is
        ``settings.ACCESS_TOKEN_EXPIRE_MINUTES`` minutes.

        Returns:
            A dict with keys ``access_token`` and ``token_type``
            (always ``"bearer"``).
        """
        access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
        access_token = create_access_token(
            subject=user.id, expires_delta=access_token_expires
        )
        return {"access_token": access_token, "token_type": "bearer"}

    def get_current_user(self, db: Session = Depends(get_db), token: str = Depends(oauth2_scheme)) -> User:
        """Resolve the user identified by the ``sub`` claim of ``token``.

        Args:
            db: Database session (injected via ``get_db``).
            token: Encoded JWT (injected via ``oauth2_scheme``).

        Returns:
            The user whose id equals the integer value of the ``sub`` claim.

        Raises:
            HTTPException: 401 when the token cannot be decoded, has no
                ``sub`` claim, carries a ``sub`` that is not a valid
                integer id, or refers to a user that does not exist.
        """
        credentials_exception = HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not validate credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )
        try:
            payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[ALGORITHM])
            user_id: Optional[str] = payload.get("sub")
            if user_id is None:
                raise credentials_exception
            token_data = TokenData(user_id=int(user_id))
        except (JWTError, ValueError) as exc:
            # ValueError covers a non-numeric "sub" (int() failure); without
            # it a malformed subject would surface as an unhandled 500
            # instead of a 401.
            raise credentials_exception from exc

        user = user_repository.get_by_id(db, token_data.user_id)
        if user is None:
            raise credentials_exception
        return user
# Module-level AuthService instance, created when this module is imported.
auth_service = AuthService()