Automated Action 90c1cdef34 Setup News Aggregation Service
- Fix code linting issues
- Update README with detailed documentation
- Configure database paths for the current environment
- Create necessary directory structure

The News Aggregation Service is now ready to use with FastAPI and SQLite.
2025-05-27 18:50:11 +00:00

152 lines
4.5 KiB
Python

from typing import Optional, List
from sqlalchemy.orm import Session
from app.core.security import get_password_hash, verify_password
from app.models.user import User
from app.models.news import UserPreference
from app.schemas.user import UserCreate, UserUpdate
def get_user(db: Session, user_id: int) -> Optional[User]:
"""Get a user by ID."""
return db.query(User).filter(User.id == user_id).first()
def get_user_by_email(db: Session, email: str) -> Optional[User]:
"""Get a user by email."""
return db.query(User).filter(User.email == email).first()
def get_user_by_username(db: Session, username: str) -> Optional[User]:
"""Get a user by username."""
return db.query(User).filter(User.username == username).first()
def get_users(db: Session, skip: int = 0, limit: int = 100) -> List[User]:
"""Get a list of users."""
return db.query(User).offset(skip).limit(limit).all()
def create_user(db: Session, user_in: UserCreate) -> User:
"""Create a new user."""
# Check if user already exists
db_user = get_user_by_email(db, email=user_in.email)
if db_user:
raise ValueError("Email already registered")
db_user = get_user_by_username(db, username=user_in.username)
if db_user:
raise ValueError("Username already taken")
# Create the user
user_data = user_in.model_dump(exclude={"password"})
db_user = User(**user_data, hashed_password=get_password_hash(user_in.password))
db.add(db_user)
db.commit()
db.refresh(db_user)
# Create default user preferences
user_preference = UserPreference(user_id=db_user.id)
db.add(user_preference)
db.commit()
return db_user
def update_user(db: Session, db_user: User, user_in: UserUpdate) -> User:
"""Update a user's information."""
# Update user fields
update_data = user_in.model_dump(exclude_unset=True)
if "password" in update_data:
hashed_password = get_password_hash(update_data["password"])
del update_data["password"]
update_data["hashed_password"] = hashed_password
for field, value in update_data.items():
setattr(db_user, field, value)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
def authenticate_user(db: Session, username: str, password: str) -> Optional[User]:
"""
Authenticate a user.
Args:
db: The database session.
username: The username or email.
password: The plaintext password.
Returns:
The user if authentication is successful, None otherwise.
"""
# Try to find user by username first
user = get_user_by_username(db, username=username)
# If not found, try by email
if not user:
user = get_user_by_email(db, email=username)
if not user:
return None
if not verify_password(password, user.hashed_password):
return None
return user
def is_active(user: User) -> bool:
"""Check if a user is active."""
return user.is_active
def is_superuser(user: User) -> bool:
"""Check if a user is a superuser."""
return user.is_superuser
def get_user_preference(db: Session, user_id: int) -> Optional[UserPreference]:
"""Get a user's preferences."""
return db.query(UserPreference).filter(UserPreference.user_id == user_id).first()
def update_user_preference(
db: Session, user_id: int, keywords: Optional[str] = None,
sources: Optional[str] = None, categories: Optional[str] = None,
countries: Optional[str] = None, languages: Optional[str] = None
) -> UserPreference:
"""Update a user's preferences."""
user_preference = get_user_preference(db, user_id)
if not user_preference:
# Create preference if it doesn't exist
user_preference = UserPreference(
user_id=user_id,
keywords=keywords,
sources=sources,
categories=categories,
countries=countries,
languages=languages,
)
db.add(user_preference)
else:
# Update existing preference
if keywords is not None:
user_preference.keywords = keywords
if sources is not None:
user_preference.sources = sources
if categories is not None:
user_preference.categories = categories
if countries is not None:
user_preference.countries = countries
if languages is not None:
user_preference.languages = languages
db.commit()
db.refresh(user_preference)
return user_preference