2025-06-05 07:32:18 +00:00

126 lines
3.2 KiB
Python

from typing import Any, Dict, Optional, Union
from fastapi import HTTPException, status
from sqlalchemy.orm import Session
from app.models.user import User
from app.schemas.user import UserCreate, UserUpdate
from app.services.security import get_password_hash, verify_password
def get_user(db: Session, user_id: int) -> Optional[User]:
"""
Get a user by ID.
"""
return db.query(User).filter(User.id == user_id).first()
def get_user_by_email(db: Session, email: str) -> Optional[User]:
"""
Get a user by email.
"""
return db.query(User).filter(User.email == email).first()
def get_user_by_username(db: Session, username: str) -> Optional[User]:
"""
Get a user by username.
"""
return db.query(User).filter(User.username == username).first()
def get_users(
db: Session, skip: int = 0, limit: int = 100
) -> list[User]:
"""
Get multiple users.
"""
return db.query(User).offset(skip).limit(limit).all()
def create_user(db: Session, user_in: UserCreate) -> User:
"""
Create a new user.
"""
# Check if user with this email already exists
user = get_user_by_email(db, email=user_in.email)
if user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="A user with this email already exists",
)
# Check if user with this username already exists
user = get_user_by_username(db, username=user_in.username)
if user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="A user with this username already exists",
)
# Create new user
hashed_password = get_password_hash(user_in.password)
db_user = User(
email=user_in.email,
username=user_in.username,
hashed_password=hashed_password,
full_name=user_in.full_name,
is_active=user_in.is_active,
)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
def update_user(
db: Session, *, db_user: User, user_in: Union[UserUpdate, Dict[str, Any]]
) -> User:
"""
Update a user.
"""
if isinstance(user_in, dict):
update_data = user_in
else:
update_data = user_in.model_dump(exclude_unset=True)
# If password is being updated, hash it
if update_data.get("password"):
hashed_password = get_password_hash(update_data["password"])
del update_data["password"]
update_data["hashed_password"] = hashed_password
# Update user
for field, value in update_data.items():
setattr(db_user, field, value)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
def authenticate_user(db: Session, *, email: str, password: str) -> Optional[User]:
"""
Authenticate a user.
"""
user = get_user_by_email(db, email=email)
if not user:
return None
if not verify_password(password, user.hashed_password):
return None
return user
def is_active_user(user: User) -> bool:
"""
Check if user is active.
"""
return user.is_active
def is_superuser(user: User) -> bool:
"""
Check if user is superuser.
"""
return user.is_superuser