
- Fix code linting issues - Update README with detailed documentation - Configure database paths for the current environment - Create the necessary directory structure. The News Aggregation Service is now ready to use with FastAPI and SQLite.
152 lines
4.5 KiB
Python
152 lines
4.5 KiB
Python
from typing import Optional, List
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.core.security import get_password_hash, verify_password
|
|
from app.models.user import User
|
|
from app.models.news import UserPreference
|
|
from app.schemas.user import UserCreate, UserUpdate
|
|
|
|
|
|
def get_user(db: Session, user_id: int) -> Optional[User]:
    """Look up a single user by its ``id`` column.

    Args:
        db: Session used to run the query.
        user_id: Value compared against ``User.id``.

    Returns:
        The first matching ``User``, or ``None`` when the query yields no row.
    """
    id_matches = User.id == user_id
    lookup = db.query(User).filter(id_matches)
    return lookup.first()
|
|
|
|
|
|
def get_user_by_email(db: Session, email: str) -> Optional[User]:
    """Look up a single user by its ``email`` column.

    Args:
        db: Session used to run the query.
        email: Value compared for equality against ``User.email``.

    Returns:
        The first matching ``User``, or ``None`` when the query yields no row.
    """
    email_matches = User.email == email
    lookup = db.query(User).filter(email_matches)
    return lookup.first()
|
|
|
|
|
|
def get_user_by_username(db: Session, username: str) -> Optional[User]:
    """Look up a single user by its ``username`` column.

    Args:
        db: Session used to run the query.
        username: Value compared for equality against ``User.username``.

    Returns:
        The first matching ``User``, or ``None`` when the query yields no row.
    """
    username_matches = User.username == username
    lookup = db.query(User).filter(username_matches)
    return lookup.first()
|
|
|
|
|
|
def get_users(db: Session, skip: int = 0, limit: int = 100) -> List[User]:
    """Return one page of users in a stable order.

    Args:
        db: Session used to run the query.
        skip: Number of rows to skip before the page starts.
        limit: Maximum number of rows to return.

    Returns:
        A list of ``User`` rows, ordered by ``User.id``.
    """
    # OFFSET/LIMIT without ORDER BY leaves the row order up to the database,
    # so consecutive pages could overlap or skip rows. Ordering by id pins it.
    return db.query(User).order_by(User.id).offset(skip).limit(limit).all()
|
|
|
|
|
|
def create_user(db: Session, user_in: UserCreate) -> User:
    """Create a user together with its default preference row.

    The user and the preference row are written in a single transaction, so a
    failure while creating the preferences does not leave a user behind
    without them.

    Args:
        db: Session used for the lookups and the inserts.
        user_in: Incoming user data; its ``password`` field is hashed and
            stored as ``hashed_password``, all other fields are passed to
            ``User`` unchanged.

    Returns:
        The newly created ``User``, refreshed from the database.

    Raises:
        ValueError: If the email is already registered or the username is
            already taken.
    """
    # Reject duplicates up front so callers get a specific message.
    if get_user_by_email(db, email=user_in.email):
        raise ValueError("Email already registered")

    if get_user_by_username(db, username=user_in.username):
        raise ValueError("Username already taken")

    # The plaintext password is excluded from the model data; only its hash
    # is handed to the User constructor.
    user_data = user_in.model_dump(exclude={"password"})
    db_user = User(**user_data, hashed_password=get_password_hash(user_in.password))

    try:
        db.add(db_user)
        # Flush instead of commit: this sends the INSERT so db_user.id is
        # populated, while keeping the transaction open for the preference row.
        db.flush()
        db.add(UserPreference(user_id=db_user.id))
        db.commit()
    except Exception:
        # Undo the partial work and leave the session usable, then let the
        # caller see the original error.
        db.rollback()
        raise

    db.refresh(db_user)
    return db_user
|
|
|
|
|
|
def update_user(db: Session, db_user: User, user_in: UserUpdate) -> User:
    """Apply the fields explicitly set on ``user_in`` to ``db_user`` and persist them.

    Args:
        db: Session used to persist the change.
        db_user: The user row to modify.
        user_in: Update payload; only fields that were explicitly set are
            applied (``exclude_unset=True``).

    Returns:
        ``db_user`` after commit, refreshed from the database.
    """
    update_data = user_in.model_dump(exclude_unset=True)

    if "password" in update_data:
        new_password = update_data.pop("password")
        # exclude_unset keeps a field that was explicitly set to null. Treat
        # that as "leave the password unchanged" rather than passing None to
        # get_password_hash, which is called with a plaintext string elsewhere.
        # NOTE(review): confirm UserUpdate.password is declared optional.
        if new_password is not None:
            update_data["hashed_password"] = get_password_hash(new_password)

    for field, value in update_data.items():
        setattr(db_user, field, value)

    db.add(db_user)
    db.commit()
    db.refresh(db_user)
    return db_user
|
|
|
|
|
|
def authenticate_user(db: Session, username: str, password: str) -> Optional[User]:
    """
    Check a login identifier and password against the stored users.

    Args:
        db: The database session.
        username: The login identifier; tried as a username first, then as
            an email address.
        password: The plaintext password to verify.

    Returns:
        The matching user when the password verifies, None otherwise.
    """
    # The username lookup takes precedence; the email lookup only runs when
    # the username lookup finds nothing.
    candidate = get_user_by_username(db, username=username) or get_user_by_email(
        db, email=username
    )

    if candidate and verify_password(password, candidate.hashed_password):
        return candidate
    return None
|
|
|
|
|
|
def is_active(user: User) -> bool:
    """Return the value of the user's ``is_active`` attribute."""
    active_flag = user.is_active
    return active_flag
|
|
|
|
|
|
def is_superuser(user: User) -> bool:
    """Return the value of the user's ``is_superuser`` attribute."""
    superuser_flag = user.is_superuser
    return superuser_flag
|
|
|
|
|
|
def get_user_preference(db: Session, user_id: int) -> Optional[UserPreference]:
    """Look up the preference row belonging to a user.

    Args:
        db: Session used to run the query.
        user_id: Value compared against ``UserPreference.user_id``.

    Returns:
        The first matching ``UserPreference``, or ``None`` when the query
        yields no row.
    """
    owner_matches = UserPreference.user_id == user_id
    lookup = db.query(UserPreference).filter(owner_matches)
    return lookup.first()
|
|
|
|
|
|
def update_user_preference(
    db: Session, user_id: int, keywords: Optional[str] = None,
    sources: Optional[str] = None, categories: Optional[str] = None,
    countries: Optional[str] = None, languages: Optional[str] = None
) -> UserPreference:
    """Create or update the preference row for a user.

    Args:
        db: Session used for the lookup and the write.
        user_id: Owner of the preference row.
        keywords: New ``keywords`` value, or ``None`` to leave it unchanged
            on an existing row.
        sources: New ``sources`` value, same ``None`` handling.
        categories: New ``categories`` value, same ``None`` handling.
        countries: New ``countries`` value, same ``None`` handling.
        languages: New ``languages`` value, same ``None`` handling.

    Returns:
        The created or updated ``UserPreference``, refreshed after commit.
    """
    requested = {
        "keywords": keywords,
        "sources": sources,
        "categories": categories,
        "countries": countries,
        "languages": languages,
    }

    preference = get_user_preference(db, user_id)

    if preference:
        # Existing row: only overwrite the fields the caller supplied.
        for column, value in requested.items():
            if value is not None:
                setattr(preference, column, value)
    else:
        # No row yet: create one with every value passed through as given,
        # including any that are None.
        preference = UserPreference(user_id=user_id, **requested)
        db.add(preference)

    db.commit()
    db.refresh(preference)
    return preference