132 lines
3.6 KiB
Python

from datetime import datetime, timedelta
from typing import List, Optional
from sqlalchemy.orm import Session
from app.core.config import settings
from app.models.role import Role
from app.models.user import User
from app.models.user_role import UserRole
from app.schemas.user import UserCreate, UserUpdate
from app.utils.security import get_password_hash, verify_password
def get_user_by_id(db: Session, user_id: int) -> Optional[User]:
    """Fetch the user whose ``id`` column equals ``user_id``.

    Returns the first matching ``User``, or ``None`` when the query
    yields no row.
    """
    matching = db.query(User).filter(User.id == user_id)
    return matching.first()
def get_user_by_email(db: Session, email: str) -> Optional[User]:
    """Fetch the user whose ``email`` column equals ``email``.

    Returns the first matching ``User``, or ``None`` when the query
    yields no row.
    """
    matching = db.query(User).filter(User.email == email)
    return matching.first()
def get_users(db: Session, skip: int = 0, limit: int = 100) -> List[User]:
    """Return one page of users.

    The query skips the first ``skip`` rows and returns at most ``limit``
    rows. No explicit ordering is applied to the query.
    """
    page = db.query(User).offset(skip).limit(limit)
    return page.all()
def create_user(db: Session, user_in: UserCreate) -> Optional[User]:
    """Create a new user and attach the default ``"user"`` role.

    The user row, the default role (created on demand if it does not exist
    yet) and the user/role link are written in a single transaction, so a
    failure while assigning the role does not leave a role-less user behind.

    Args:
        db: SQLAlchemy session used for all reads and writes.
        user_in: Creation payload; ``email``, ``password``, ``first_name``
            and ``last_name`` are read from it.

    Returns:
        The newly persisted ``User``, or ``None`` when a user with the same
        email already exists.
    """
    # Refuse duplicates: the email is used as the lookup key for login.
    if get_user_by_email(db, email=user_in.email):
        return None

    db_user = User(
        email=user_in.email,
        hashed_password=get_password_hash(user_in.password),
        first_name=user_in.first_name,
        last_name=user_in.last_name,
        is_active=True,
        is_verified=False,  # Default to not verified
    )
    db.add(db_user)
    # Flush (rather than commit) so db_user.id is populated while the whole
    # operation stays inside one transaction.
    db.flush()

    # Look up the default role, creating it the first time it is needed.
    default_role = db.query(Role).filter(Role.name == "user").first()
    if default_role is None:
        default_role = Role(name="user", description="Regular user with limited access")
        db.add(default_role)
        db.flush()  # populate default_role.id for the link row below

    db.add(UserRole(user_id=db_user.id, role_id=default_role.id))
    db.commit()
    db.refresh(db_user)
    return db_user
def update_user(db: Session, user_id: int, user_in: UserUpdate) -> Optional[User]:
    """Apply a partial update to an existing user.

    Only fields explicitly set on ``user_in`` are applied. A ``password``
    entry is never written to the model directly: it is hashed and stored in
    ``hashed_password``. An explicit ``password=None`` is treated as "leave
    the password unchanged" instead of being passed to the hasher.

    Args:
        db: SQLAlchemy session used for the lookup and the write.
        user_id: ``id`` of the user to update.
        user_in: Update payload; unset fields are ignored.

    Returns:
        The refreshed ``User`` after commit, or ``None`` when no user with
        ``user_id`` exists.
    """
    user = get_user_by_id(db, user_id=user_id)
    if not user:
        return None

    update_data = user_in.dict(exclude_unset=True)

    # Always remove the plain-text password from the generic field loop;
    # only re-hash when a real value was supplied.
    new_password = update_data.pop("password", None)
    if new_password is not None:
        user.hashed_password = get_password_hash(new_password)

    for field, value in update_data.items():
        setattr(user, field, value)

    user.updated_at = datetime.utcnow()  # naive UTC timestamp
    db.add(user)
    db.commit()
    db.refresh(user)
    return user
def delete_user(db: Session, user_id: int) -> bool:
    """Remove the user identified by ``user_id``.

    Returns ``True`` after the delete has been committed, or ``False`` when
    no such user was found (in which case nothing is written).
    """
    target = get_user_by_id(db, user_id=user_id)
    if target:
        db.delete(target)
        db.commit()
        return True
    return False
def verify_user(db: Session, user_id: int) -> Optional[User]:
    """Flag the user identified by ``user_id`` as verified.

    Sets ``is_verified`` to ``True``, stamps ``updated_at`` with the current
    naive UTC time, commits, and returns the refreshed ``User``. Returns
    ``None`` when no such user exists.
    """
    account = get_user_by_id(db, user_id=user_id)
    if account:
        account.is_verified = True
        account.updated_at = datetime.utcnow()
        db.add(account)
        db.commit()
        db.refresh(account)
        return account
    return None
def authenticate_user(db: Session, email: str, password: str) -> Optional[User]:
    """Check an email/password pair against the stored credentials.

    Returns the ``User`` only when an account with that email exists, is
    active, and ``verify_password`` accepts the password against its stored
    hash; returns ``None`` in every other case.
    """
    candidate = get_user_by_email(db, email=email)
    # Short-circuit order matters: existence, then active flag, then the
    # password check, so later checks never run on a missing/inactive user.
    rejected = (
        not candidate
        or not candidate.is_active
        or not verify_password(password, candidate.hashed_password)
    )
    return None if rejected else candidate