139 lines
3.6 KiB
Python
139 lines
3.6 KiB
Python
import re
from datetime import datetime, timezone
from typing import Any, Optional, Dict, Union, List

from passlib.context import CryptContext
from pydantic import EmailStr
from sqlalchemy.orm import Session

from models.user import User
from schemas.user import UserCreate, UserUpdate
|
|
|
|
# Shared passlib context used by hash_password() and verify_password() below.
# bcrypt is the only configured scheme; deprecated="auto" is passlib's setting
# for treating every scheme other than the default one as deprecated.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
|
|
|
|
def validate_user_data(username: str, email: str, password: str) -> Dict[str, str]:
    """
    Validate user registration data.

    Args:
        username: Username to validate (must be 3 to 50 characters long)
        email: Email to validate against a basic address pattern
        password: Password to validate (must be at least 8 characters long)

    Returns:
        Dict mapping each failing field name ("username", "email",
        "password") to its error message, empty dict if validation passes
    """
    errors: Dict[str, str] = {}

    if not 3 <= len(username) <= 50:
        errors["username"] = "Username must be between 3 and 50 characters"

    # Use fullmatch rather than match with a "$" anchor: "$" also matches
    # just before a trailing newline, which let an address followed by a
    # line break pass validation.
    email_pattern = r'[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+'
    if not re.fullmatch(email_pattern, email):
        errors["email"] = "Invalid email format"

    if len(password) < 8:
        errors["password"] = "Password must be at least 8 characters long"

    return errors
|
|
|
|
def hash_password(password: str) -> str:
    """
    Produce the hash of a plain text password.

    The work is delegated to the module-level ``pwd_context``, which is
    configured with the bcrypt scheme.

    Args:
        password: Plain text password

    Returns:
        Hashed password
    """
    hashed = pwd_context.hash(password)
    return hashed
|
|
|
|
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """
    Check a plain text password against a stored hash.

    The comparison is delegated to the module-level ``pwd_context``.

    Args:
        plain_password: Password to verify
        hashed_password: Stored hash to check against

    Returns:
        True if password matches, False otherwise
    """
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match
|
|
|
|
def update_last_login(db: Session, user: User) -> User:
    """
    Update user's last login timestamp.

    Sets ``user.last_login`` to the current UTC time as a naive datetime,
    commits the session, then refreshes the user from the session.

    Args:
        db: Database session
        user: User object to update

    Returns:
        Updated user object (the same instance that was passed in)
    """
    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
    # "now" and drop tzinfo so the stored value stays a naive UTC datetime,
    # exactly as before.
    user.last_login = datetime.now(timezone.utc).replace(tzinfo=None)
    db.commit()
    db.refresh(user)
    return user
|
|
|
|
def format_user_response(user: User) -> Dict[str, Any]:
    """
    Build the API response dictionary for a user.

    Args:
        user: User object to format

    Returns:
        Dictionary with the user's id, username, email, first_name,
        last_name, is_active and is_verified values copied as-is, plus
        last_login as an ISO 8601 string (None when last_login is unset)
    """
    # Attributes copied straight through, in response key order.
    plain_fields = (
        "id",
        "username",
        "email",
        "first_name",
        "last_name",
        "is_active",
        "is_verified",
    )
    payload = {field: getattr(user, field) for field in plain_fields}

    # last_login is the only value that needs converting before it is returned.
    last_login = user.last_login
    if last_login:
        payload["last_login"] = last_login.isoformat()
    else:
        payload["last_login"] = None
    return payload
|
|
|
|
def search_users(
    db: Session,
    search_term: str,
    skip: int = 0,
    limit: int = 10
) -> List[User]:
    """
    Search users by username, first name, or last name.

    A user matches when the search term appears anywhere in one of those
    three columns, compared with a case-insensitive LIKE. The term is
    matched literally: LIKE wildcards inside it are escaped.

    Args:
        db: Database session
        search_term: Term to search for
        skip: Number of records to skip
        limit: Maximum number of records to return

    Returns:
        List of matching user objects
    """
    # Escape the LIKE metacharacters so that "%" and "_" typed by the caller
    # are searched for literally instead of acting as wildcards. The escape
    # character itself must be escaped first.
    escaped_term = (
        search_term.replace("\\", "\\\\")
        .replace("%", "\\%")
        .replace("_", "\\_")
    )
    pattern = f"%{escaped_term}%"

    # NOTE(review): no ORDER BY is applied, so the rows selected by
    # skip/limit depend on the database's default ordering.
    return db.query(User).filter(
        (User.username.ilike(pattern, escape="\\")) |
        (User.first_name.ilike(pattern, escape="\\")) |
        (User.last_name.ilike(pattern, escape="\\"))
    ).offset(skip).limit(limit).all()
|
|
|
|
def toggle_user_status(db: Session, user: User, activate: bool) -> User:
    """
    Set a user's active flag and persist the change.

    Args:
        db: Database session
        user: User object to update
        activate: True to activate, False to deactivate

    Returns:
        Updated user object (the same instance that was passed in)
    """
    # Store the requested state on the user object.
    user.is_active = activate

    # Commit the session, then refresh the user from it before returning.
    db.commit()
    db.refresh(user)

    return user