130 lines
3.3 KiB
Python
130 lines
3.3 KiB
Python
from typing import Optional, Dict, Union
|
|
import re
|
|
from passlib.context import CryptContext
|
|
from sqlalchemy.orm import Session
|
|
from models.user import User
|
|
from schemas.user import UserCreate
|
|
from email_validator import validate_email, EmailNotValidError
|
|
|
|
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
|
|
|
|
def validate_username(username: str) -> bool:
|
|
"""
|
|
Validate username format and length.
|
|
|
|
Args:
|
|
username: Username to validate
|
|
|
|
Returns:
|
|
bool: True if valid, False otherwise
|
|
"""
|
|
if not 3 <= len(username) <= 32:
|
|
return False
|
|
pattern = r'^[a-zA-Z0-9_-]+$'
|
|
return bool(re.match(pattern, username))
|
|
|
|
def validate_user_email(email: str) -> bool:
|
|
"""
|
|
Validate email format using email-validator.
|
|
|
|
Args:
|
|
email: Email to validate
|
|
|
|
Returns:
|
|
bool: True if valid, False otherwise
|
|
"""
|
|
try:
|
|
validate_email(email)
|
|
return True
|
|
except EmailNotValidError:
|
|
return False
|
|
|
|
def hash_password(password: str) -> str:
|
|
"""
|
|
Hash password using bcrypt.
|
|
|
|
Args:
|
|
password: Plain text password
|
|
|
|
Returns:
|
|
str: Hashed password
|
|
"""
|
|
return pwd_context.hash(password)
|
|
|
|
def check_user_exists(db: Session, username: str, email: str) -> Optional[User]:
|
|
"""
|
|
Check if user already exists by username or email.
|
|
|
|
Args:
|
|
db: Database session
|
|
username: Username to check
|
|
email: Email to check
|
|
|
|
Returns:
|
|
Optional[User]: Existing user if found
|
|
"""
|
|
return db.query(User).filter(
|
|
(User.username == username) | (User.email == email)
|
|
).first()
|
|
|
|
def create_new_user(db: Session, user_data: UserCreate) -> Union[User, Dict[str, str]]:
|
|
"""
|
|
Create new user with validation and error handling.
|
|
|
|
Args:
|
|
db: Database session
|
|
user_data: User creation data
|
|
|
|
Returns:
|
|
Union[User, Dict[str, str]]: Created user or error dict
|
|
"""
|
|
# Validate username
|
|
if not validate_username(user_data.username):
|
|
return {"error": "Invalid username format"}
|
|
|
|
# Validate email
|
|
if not validate_user_email(user_data.email):
|
|
return {"error": "Invalid email format"}
|
|
|
|
# Check existing user
|
|
existing_user = check_user_exists(db, user_data.username, user_data.email)
|
|
if existing_user:
|
|
return {"error": "Username or email already registered"}
|
|
|
|
# Create user
|
|
hashed_password = hash_password(user_data.password)
|
|
db_user = User(
|
|
username=user_data.username,
|
|
email=user_data.email,
|
|
password=hashed_password,
|
|
first_name=user_data.first_name,
|
|
last_name=user_data.last_name,
|
|
is_active=True,
|
|
is_verified=False
|
|
)
|
|
|
|
db.add(db_user)
|
|
db.commit()
|
|
db.refresh(db_user)
|
|
|
|
return db_user
|
|
|
|
def format_user_response(user: User) -> Dict:
|
|
"""
|
|
Format user object for API response.
|
|
|
|
Args:
|
|
user: User object
|
|
|
|
Returns:
|
|
Dict: Formatted user data
|
|
"""
|
|
return {
|
|
"id": user.id,
|
|
"username": user.username,
|
|
"email": user.email,
|
|
"first_name": user.first_name,
|
|
"last_name": user.last_name,
|
|
"is_active": user.is_active,
|
|
"is_verified": user.is_verified
|
|
} |