
- Create User model and schema - Implement password hashing with bcrypt - Add JWT token-based authentication - Create user and auth endpoints - Update todo endpoints with user authentication - Add alembic migration for user model - Update README with new features
101 lines
2.4 KiB
Python
101 lines
2.4 KiB
Python
from typing import List, Optional
|
|
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.core.security import get_password_hash, verify_password
|
|
from app.models.user import User
|
|
from app.schemas.user import UserCreate, UserUpdate
|
|
|
|
|
|
def get_user(db: Session, user_id: int) -> Optional[User]:
|
|
"""
|
|
Get a user by ID
|
|
"""
|
|
return db.query(User).filter(User.id == user_id).first()
|
|
|
|
|
|
def get_user_by_email(db: Session, email: str) -> Optional[User]:
|
|
"""
|
|
Get a user by email
|
|
"""
|
|
return db.query(User).filter(User.email == email).first()
|
|
|
|
|
|
def get_user_by_username(db: Session, username: str) -> Optional[User]:
|
|
"""
|
|
Get a user by username
|
|
"""
|
|
return db.query(User).filter(User.username == username).first()
|
|
|
|
|
|
def get_users(db: Session, skip: int = 0, limit: int = 100) -> List[User]:
|
|
"""
|
|
Get a list of users with pagination
|
|
"""
|
|
return db.query(User).offset(skip).limit(limit).all()
|
|
|
|
|
|
def create_user(db: Session, user: UserCreate) -> User:
|
|
"""
|
|
Create a new user
|
|
"""
|
|
hashed_password = get_password_hash(user.password)
|
|
db_user = User(
|
|
email=user.email,
|
|
username=user.username,
|
|
hashed_password=hashed_password,
|
|
is_active=user.is_active,
|
|
)
|
|
db.add(db_user)
|
|
db.commit()
|
|
db.refresh(db_user)
|
|
return db_user
|
|
|
|
|
|
def update_user(
|
|
db: Session, user_id: int, user: UserUpdate
|
|
) -> Optional[User]:
|
|
"""
|
|
Update user information
|
|
"""
|
|
db_user = get_user(db, user_id)
|
|
if not db_user:
|
|
return None
|
|
|
|
update_data = user.model_dump(exclude_unset=True)
|
|
|
|
# Hash the password if it's being updated
|
|
if "password" in update_data:
|
|
update_data["hashed_password"] = get_password_hash(update_data.pop("password"))
|
|
|
|
for key, value in update_data.items():
|
|
setattr(db_user, key, value)
|
|
|
|
db.commit()
|
|
db.refresh(db_user)
|
|
return db_user
|
|
|
|
|
|
def delete_user(db: Session, user_id: int) -> bool:
|
|
"""
|
|
Delete a user
|
|
"""
|
|
db_user = get_user(db, user_id)
|
|
if not db_user:
|
|
return False
|
|
|
|
db.delete(db_user)
|
|
db.commit()
|
|
return True
|
|
|
|
|
|
def authenticate_user(db: Session, username: str, password: str) -> Optional[User]:
|
|
"""
|
|
Authenticate a user by username and password
|
|
"""
|
|
user = get_user_by_username(db, username)
|
|
if not user:
|
|
return None
|
|
if not verify_password(password, user.hashed_password):
|
|
return None
|
|
return user |