Automated Action 5b55eedd2b Implement User Authentication and Authorization Service
This commit includes:
- User registration and authentication API with JWT
- Password reset functionality
- Role-based access control system
- Database models and migrations with SQLAlchemy and Alembic
- API documentation in README

generated with BackendIM... (backend.im)
2025-05-15 19:46:38 +00:00

113 lines
2.9 KiB
Python

from typing import List, Optional
from sqlalchemy.orm import Session
from app.models.role import Role
from app.models.user import User
from app.models.user_role import UserRole
from app.schemas.role import RoleCreate, RoleUpdate
def get_role_by_id(db: Session, role_id: int) -> Optional[Role]:
"""Get a role by ID."""
return db.query(Role).filter(Role.id == role_id).first()
def get_role_by_name(db: Session, name: str) -> Optional[Role]:
"""Get a role by name."""
return db.query(Role).filter(Role.name == name).first()
def get_roles(db: Session, skip: int = 0, limit: int = 100) -> List[Role]:
"""Get all roles."""
return db.query(Role).offset(skip).limit(limit).all()
def create_role(db: Session, role_in: RoleCreate) -> Role:
"""Create a new role."""
# Check if role already exists
role = get_role_by_name(db, name=role_in.name)
if role:
return None
# Create role object
db_role = Role(
name=role_in.name,
description=role_in.description,
)
db.add(db_role)
db.commit()
db.refresh(db_role)
return db_role
def update_role(db: Session, role_id: int, role_in: RoleUpdate) -> Optional[Role]:
"""Update a role."""
role = get_role_by_id(db, role_id=role_id)
if not role:
return None
# Update role fields
update_data = role_in.dict(exclude_unset=True)
# Update fields
for field, value in update_data.items():
setattr(role, field, value)
db.add(role)
db.commit()
db.refresh(role)
return role
def delete_role(db: Session, role_id: int) -> bool:
"""Delete a role."""
role = get_role_by_id(db, role_id=role_id)
if not role:
return False
db.delete(role)
db.commit()
return True
def assign_role_to_user(db: Session, user_id: int, role_id: int) -> Optional[UserRole]:
"""Assign a role to a user."""
# Check if user and role exist
user = db.query(User).filter(User.id == user_id).first()
role = db.query(Role).filter(Role.id == role_id).first()
if not user or not role:
return None
# Check if user already has the role
user_role = db.query(UserRole).filter(
UserRole.user_id == user_id,
UserRole.role_id == role_id
).first()
if user_role:
return user_role
# Assign role to user
db_user_role = UserRole(user_id=user_id, role_id=role_id)
db.add(db_user_role)
db.commit()
db.refresh(db_user_role)
return db_user_role
def remove_role_from_user(db: Session, user_id: int, role_id: int) -> bool:
"""Remove a role from a user."""
# Check if user has the role
user_role = db.query(UserRole).filter(
UserRole.user_id == user_id,
UserRole.role_id == role_id
).first()
if not user_role:
return False
db.delete(user_role)
db.commit()
return True