from typing import Optional, List

from sqlalchemy.orm import Session

from app.core.security import get_password_hash, verify_password
from app.models.user import User
from app.models.news import UserPreference
from app.schemas.user import UserCreate, UserUpdate


def get_user(db: Session, user_id: int) -> Optional[User]:
    """Get a user by ID.

    Returns the first ``User`` whose ``id`` equals ``user_id``, or ``None``
    when no row matches.
    """
    return db.query(User).filter(User.id == user_id).first()


def get_user_by_email(db: Session, email: str) -> Optional[User]:
    """Get a user by email.

    Returns the first ``User`` whose ``email`` equals ``email``, or ``None``
    when no row matches.
    """
    return db.query(User).filter(User.email == email).first()


def get_user_by_username(db: Session, username: str) -> Optional[User]:
    """Get a user by username.

    Returns the first ``User`` whose ``username`` equals ``username``, or
    ``None`` when no row matches.
    """
    return db.query(User).filter(User.username == username).first()


def get_users(db: Session, skip: int = 0, limit: int = 100) -> List[User]:
    """Get a page of users.

    Args:
        db: The database session.
        skip: Number of rows to skip before the page starts.
        limit: Maximum number of rows to return.

    Returns:
        Up to ``limit`` users, ordered by ``User.id``.
    """
    # OFFSET/LIMIT without ORDER BY gives no guarantee about which rows land
    # on which page; ordering by the id column makes pagination stable.
    return db.query(User).order_by(User.id).offset(skip).limit(limit).all()


def create_user(db: Session, user_in: UserCreate) -> User:
    """Create a new user together with a default ``UserPreference`` row.

    The user and its preference row are written in a single transaction, so
    a failure never leaves a user behind without preferences.

    Args:
        db: The database session.
        user_in: The data for the new user, including the plaintext password.

    Returns:
        The newly created, refreshed user.

    Raises:
        ValueError: If the email is already registered or the username is
            already taken.
    """
    if get_user_by_email(db, email=user_in.email):
        raise ValueError("Email already registered")
    if get_user_by_username(db, username=user_in.username):
        raise ValueError("Username already taken")

    # The plaintext password is never stored; only its hash is.
    user_data = user_in.model_dump(exclude={"password"})
    db_user = User(**user_data, hashed_password=get_password_hash(user_in.password))

    try:
        db.add(db_user)
        # Flush sends the INSERT so db_user.id is populated, without ending
        # the transaction; the preference row can then join the same commit.
        db.flush()
        db.add(UserPreference(user_id=db_user.id))
        db.commit()
    except Exception:
        # Leave the session usable for the caller, then surface the error.
        db.rollback()
        raise

    db.refresh(db_user)
    return db_user


def update_user(db: Session, db_user: User, user_in: UserUpdate) -> User:
    """Update a user's information.

    Only fields explicitly set on ``user_in`` are applied. A supplied
    password is stored as ``hashed_password``; the plaintext is never set on
    the model.

    Args:
        db: The database session.
        db_user: The user to modify.
        user_in: The fields to change.

    Returns:
        The updated, refreshed user.
    """
    update_data = user_in.model_dump(exclude_unset=True)

    if "password" in update_data:
        new_password = update_data.pop("password")
        # An explicit ``None`` means "no new password"; hashing it would
        # either fail or store a meaningless hash, so it is ignored.
        if new_password is not None:
            update_data["hashed_password"] = get_password_hash(new_password)

    for field, value in update_data.items():
        setattr(db_user, field, value)

    db.add(db_user)
    db.commit()
    db.refresh(db_user)
    return db_user


def authenticate_user(db: Session, username: str, password: str) -> Optional[User]:
    """
    Authenticate a user.

    Args:
        db: The database session.
        username: The username or email.
        password: The plaintext password.

    Returns:
        The user if authentication is successful, None otherwise.
    """
    # Look the identifier up as a username first, then fall back to email.
    user = get_user_by_username(db, username=username) or get_user_by_email(
        db, email=username
    )
    if not user:
        return None
    if not verify_password(password, user.hashed_password):
        return None
    return user


def is_active(user: User) -> bool:
    """Check if a user is active (returns ``user.is_active``)."""
    return user.is_active


def is_superuser(user: User) -> bool:
    """Check if a user is a superuser (returns ``user.is_superuser``)."""
    return user.is_superuser


def get_user_preference(db: Session, user_id: int) -> Optional[UserPreference]:
    """Get a user's preferences.

    Returns the first ``UserPreference`` whose ``user_id`` equals
    ``user_id``, or ``None`` when no row matches.
    """
    return db.query(UserPreference).filter(UserPreference.user_id == user_id).first()


def update_user_preference(
    db: Session,
    user_id: int,
    keywords: Optional[str] = None,
    sources: Optional[str] = None,
    categories: Optional[str] = None,
    countries: Optional[str] = None,
    languages: Optional[str] = None
) -> UserPreference:
    """Update a user's preferences, creating the row if it does not exist.

    Args:
        db: The database session.
        user_id: ID of the user whose preferences are written.
        keywords: New value for ``keywords``; ``None`` leaves an existing
            value unchanged.
        sources: New value for ``sources``; ``None`` leaves an existing
            value unchanged.
        categories: New value for ``categories``; ``None`` leaves an
            existing value unchanged.
        countries: New value for ``countries``; ``None`` leaves an existing
            value unchanged.
        languages: New value for ``languages``; ``None`` leaves an existing
            value unchanged.

    Returns:
        The created or updated, refreshed preference row.
    """
    requested = {
        "keywords": keywords,
        "sources": sources,
        "categories": categories,
        "countries": countries,
        "languages": languages,
    }

    user_preference = get_user_preference(db, user_id)
    if not user_preference:
        # No row yet: create one with exactly the values given (None included).
        user_preference = UserPreference(user_id=user_id, **requested)
        db.add(user_preference)
    else:
        # Existing row: only overwrite the fields the caller supplied.
        for field, value in requested.items():
            if value is not None:
                setattr(user_preference, field, value)

    db.commit()
    db.refresh(user_preference)
    return user_preference