Automated Action 69f6a404bd Add user authentication to Todo application
- Create User model and schema
- Implement password hashing with bcrypt
- Add JWT token-based authentication
- Create user and auth endpoints
- Update todo endpoints with user authentication
- Add alembic migration for user model
- Update README with new features
2025-05-16 02:07:51 +00:00

101 lines
2.4 KiB
Python

from typing import List, Optional
from sqlalchemy.orm import Session
from app.core.security import get_password_hash, verify_password
from app.models.user import User
from app.schemas.user import UserCreate, UserUpdate
def get_user(db: Session, user_id: int) -> Optional[User]:
"""
Get a user by ID
"""
return db.query(User).filter(User.id == user_id).first()
def get_user_by_email(db: Session, email: str) -> Optional[User]:
"""
Get a user by email
"""
return db.query(User).filter(User.email == email).first()
def get_user_by_username(db: Session, username: str) -> Optional[User]:
"""
Get a user by username
"""
return db.query(User).filter(User.username == username).first()
def get_users(db: Session, skip: int = 0, limit: int = 100) -> List[User]:
"""
Get a list of users with pagination
"""
return db.query(User).offset(skip).limit(limit).all()
def create_user(db: Session, user: UserCreate) -> User:
"""
Create a new user
"""
hashed_password = get_password_hash(user.password)
db_user = User(
email=user.email,
username=user.username,
hashed_password=hashed_password,
is_active=user.is_active,
)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
def update_user(
db: Session, user_id: int, user: UserUpdate
) -> Optional[User]:
"""
Update user information
"""
db_user = get_user(db, user_id)
if not db_user:
return None
update_data = user.model_dump(exclude_unset=True)
# Hash the password if it's being updated
if "password" in update_data:
update_data["hashed_password"] = get_password_hash(update_data.pop("password"))
for key, value in update_data.items():
setattr(db_user, key, value)
db.commit()
db.refresh(db_user)
return db_user
def delete_user(db: Session, user_id: int) -> bool:
"""
Delete a user
"""
db_user = get_user(db, user_id)
if not db_user:
return False
db.delete(db_user)
db.commit()
return True
def authenticate_user(db: Session, username: str, password: str) -> Optional[User]:
"""
Authenticate a user by username and password
"""
user = get_user_by_username(db, username)
if not user:
return None
if not verify_password(password, user.hashed_password):
return None
return user