"""Database helpers for reading, creating, updating, deleting and authenticating users."""

from datetime import datetime, timedelta
from typing import List, Optional

from sqlalchemy.orm import Session

from app.core.config import settings
from app.models.role import Role
from app.models.user import User
from app.models.user_role import UserRole
from app.schemas.user import UserCreate, UserUpdate
from app.utils.security import get_password_hash, verify_password


def get_user_by_id(db: Session, user_id: int) -> Optional[User]:
    """Return the user whose primary key is ``user_id``, or ``None`` if no row matches."""
    return db.query(User).filter(User.id == user_id).first()


def get_user_by_email(db: Session, email: str) -> Optional[User]:
    """Return the first user whose email equals ``email``, or ``None`` if no row matches."""
    return db.query(User).filter(User.email == email).first()


def get_users(db: Session, skip: int = 0, limit: int = 100) -> List[User]:
    """Return one page of users.

    Args:
        db: Session used to run the query.
        skip: Number of rows to skip (SQL ``OFFSET``).
        limit: Maximum number of rows to return (SQL ``LIMIT``).
    """
    return db.query(User).offset(skip).limit(limit).all()


def _get_or_create_default_role(db: Session) -> Role:
    """Return the role named "user", adding it to the session if it is missing.

    A newly created role is flushed rather than committed, so its ``id`` is
    available while the caller keeps control of the transaction.
    """
    role = db.query(Role).filter(Role.name == "user").first()
    if role is None:
        role = Role(name="user", description="Regular user with limited access")
        db.add(role)
        db.flush()
    return role


def create_user(db: Session, user_in: UserCreate) -> Optional[User]:
    """Create a user and link it to the default "user" role.

    The user row, the default role (when it has to be created) and the
    user/role link are written in a single commit, so a failure while
    assigning the role cannot leave a committed user without a role.

    Args:
        db: Session used for all queries and writes.
        user_in: Payload providing ``email``, ``password``, ``first_name``
            and ``last_name``.

    Returns:
        The refreshed ``User``, or ``None`` when a user with the same email
        already exists.
    """
    if get_user_by_email(db, email=user_in.email):
        return None

    db_user = User(
        email=user_in.email,
        hashed_password=get_password_hash(user_in.password),
        first_name=user_in.first_name,
        last_name=user_in.last_name,
        is_active=True,
        is_verified=False,  # New accounts start unverified; see verify_user().
    )
    db.add(db_user)
    # Flush (not commit) so db_user.id is populated for the role link below.
    db.flush()

    default_role = _get_or_create_default_role(db)
    db.add(UserRole(user_id=db_user.id, role_id=default_role.id))

    db.commit()
    db.refresh(db_user)
    return db_user


def update_user(db: Session, user_id: int, user_in: UserUpdate) -> Optional[User]:
    """Apply the explicitly-set fields of ``user_in`` to an existing user.

    Args:
        db: Session used for the lookup and the write.
        user_id: Primary key of the user to update.
        user_in: Partial update; only fields the caller set are applied.

    Returns:
        The refreshed ``User``, or ``None`` when ``user_id`` does not exist.
    """
    user = get_user_by_id(db, user_id=user_id)
    if not user:
        return None

    update_data = user_in.dict(exclude_unset=True)

    # The plaintext password is never stored: it is removed from the generic
    # field loop and written as a hash instead.
    if "password" in update_data:
        password = update_data.pop("password")
        # An explicit ``password=None`` is treated as "leave unchanged"
        # instead of being handed to the hasher.
        if password is not None:
            user.hashed_password = get_password_hash(password)

    for field, value in update_data.items():
        setattr(user, field, value)

    user.updated_at = datetime.utcnow()
    db.add(user)
    db.commit()
    db.refresh(user)
    return user


def delete_user(db: Session, user_id: int) -> bool:
    """Delete the user with ``user_id``.

    Returns:
        ``True`` when a user was found and deleted, ``False`` when no such
        user exists.
    """
    user = get_user_by_id(db, user_id=user_id)
    if not user:
        return False

    db.delete(user)
    db.commit()
    return True


def verify_user(db: Session, user_id: int) -> Optional[User]:
    """Set ``is_verified`` to ``True`` on a user and bump ``updated_at``.

    Returns:
        The refreshed ``User``, or ``None`` when ``user_id`` does not exist.
    """
    user = get_user_by_id(db, user_id=user_id)
    if not user:
        return None

    user.is_verified = True
    user.updated_at = datetime.utcnow()
    db.add(user)
    db.commit()
    db.refresh(user)
    return user


def authenticate_user(db: Session, email: str, password: str) -> Optional[User]:
    """Return the user matching ``email`` if the credentials are acceptable.

    Returns:
        The ``User`` when it exists, is active, and ``password`` verifies
        against the stored hash; ``None`` in every other case.
    """
    user = get_user_by_email(db, email=email)
    if not user:
        return None
    if not user.is_active:
        return None
    if not verify_password(password, user.hashed_password):
        return None
    return user