
This commit includes: - User registration and authentication API with JWT - Password reset functionality - Role-based access control system - Database models and migrations with SQLAlchemy and Alembic - API documentation in README generated with BackendIM... (backend.im)
113 lines
2.9 KiB
Python
113 lines
2.9 KiB
Python
from typing import List, Optional
|
|
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.models.role import Role
|
|
from app.models.user import User
|
|
from app.models.user_role import UserRole
|
|
from app.schemas.role import RoleCreate, RoleUpdate
|
|
|
|
|
|
def get_role_by_id(db: Session, role_id: int) -> Optional[Role]:
    """Fetch the role whose ``id`` equals ``role_id``.

    Args:
        db: Session used to run the query.
        role_id: Value compared against ``Role.id``.

    Returns:
        The first matching ``Role``, or ``None`` when no row matches.
    """
    id_matches = Role.id == role_id
    role_query = db.query(Role).filter(id_matches)
    return role_query.first()
|
|
|
|
|
|
def get_role_by_name(db: Session, name: str) -> Optional[Role]:
    """Fetch the role whose ``name`` equals ``name``.

    Args:
        db: Session used to run the query.
        name: Value compared against ``Role.name``.

    Returns:
        The first matching ``Role``, or ``None`` when no row matches.
    """
    name_matches = Role.name == name
    role_query = db.query(Role).filter(name_matches)
    return role_query.first()
|
|
|
|
|
|
def get_roles(db: Session, skip: int = 0, limit: int = 100) -> List[Role]:
    """Return one page of roles.

    Args:
        db: Session used to run the query.
        skip: Number of rows to skip (applied as OFFSET).
        limit: Maximum number of rows to return (applied as LIMIT).

    Returns:
        A list of ``Role`` objects, ordered by ``Role.id``; empty when the
        requested page holds no rows.
    """
    # OFFSET/LIMIT without an ORDER BY gives no guarantee that consecutive
    # pages are disjoint or complete, so pin the order to the id column.
    return (
        db.query(Role)
        .order_by(Role.id)
        .offset(skip)
        .limit(limit)
        .all()
    )
|
|
|
|
|
|
def create_role(db: Session, role_in: RoleCreate) -> Optional[Role]:
    """Create a new role unless one with the same name already exists.

    Args:
        db: Session used for the lookup and the insert.
        role_in: Payload supplying ``name`` and ``description``.

    Returns:
        The newly persisted ``Role`` (refreshed after commit), or ``None``
        when a role named ``role_in.name`` is already present.
    """
    # Duplicate names are reported to the caller as None rather than raised.
    # NOTE(review): this is a check-then-insert; whether concurrent callers
    # are also protected by a database unique constraint is not visible
    # here - confirm against the Role model.
    if get_role_by_name(db, name=role_in.name):
        return None

    db_role = Role(
        name=role_in.name,
        description=role_in.description,
    )
    db.add(db_role)
    db.commit()
    # Reload the row so database-populated attributes are available.
    db.refresh(db_role)
    return db_role
|
|
|
|
|
|
def update_role(db: Session, role_id: int, role_in: RoleUpdate) -> Optional[Role]:
    """Apply the explicitly provided fields of ``role_in`` to a role.

    Args:
        db: Session used for the lookup and the update.
        role_id: Identifier passed to ``get_role_by_id``.
        role_in: Update payload; only fields that were set on it are copied
            (``dict(exclude_unset=True)``).

    Returns:
        The updated ``Role`` (refreshed after commit), or ``None`` when no
        role with ``role_id`` is found.
    """
    target = get_role_by_id(db, role_id=role_id)
    if not target:
        return None

    # Copy only the fields the caller actually supplied onto the role.
    changes = role_in.dict(exclude_unset=True)
    for attr_name, new_value in changes.items():
        setattr(target, attr_name, new_value)

    db.add(target)
    db.commit()
    db.refresh(target)
    return target
|
|
|
|
|
|
def delete_role(db: Session, role_id: int) -> bool:
    """Delete the role identified by ``role_id``.

    Args:
        db: Session used for the lookup and the delete.
        role_id: Identifier passed to ``get_role_by_id``.

    Returns:
        ``True`` when a role was found, deleted and committed; ``False``
        when no role with ``role_id`` is found.
    """
    doomed = get_role_by_id(db, role_id=role_id)
    if doomed:
        db.delete(doomed)
        db.commit()
        return True
    return False
|
|
|
|
|
|
def assign_role_to_user(db: Session, user_id: int, role_id: int) -> Optional[UserRole]:
    """Link a role to a user, reusing the link if it already exists.

    Args:
        db: Session used for the lookups and the insert.
        user_id: Value compared against ``User.id`` and ``UserRole.user_id``.
        role_id: Value compared against ``Role.id`` and ``UserRole.role_id``.

    Returns:
        ``None`` when either the user or the role is not found; the existing
        ``UserRole`` when the pair is already linked; otherwise the newly
        committed and refreshed ``UserRole``.
    """
    # Both lookups run before either result is inspected.
    found_user = db.query(User).filter(User.id == user_id).first()
    found_role = db.query(Role).filter(Role.id == role_id).first()
    if not found_user or not found_role:
        return None

    # Return the current link unchanged instead of inserting a duplicate.
    link_query = db.query(UserRole).filter(
        UserRole.user_id == user_id,
        UserRole.role_id == role_id,
    )
    current_link = link_query.first()
    if current_link:
        return current_link

    new_link = UserRole(user_id=user_id, role_id=role_id)
    db.add(new_link)
    db.commit()
    db.refresh(new_link)
    return new_link
|
|
|
|
|
|
def remove_role_from_user(db: Session, user_id: int, role_id: int) -> bool:
    """Remove the link between a user and a role.

    Args:
        db: Session used for the lookup and the delete.
        user_id: Value compared against ``UserRole.user_id``.
        role_id: Value compared against ``UserRole.role_id``.

    Returns:
        ``True`` when a matching ``UserRole`` was found, deleted and
        committed; ``False`` when the user does not have the role.
    """
    link_query = db.query(UserRole).filter(
        UserRole.user_id == user_id,
        UserRole.role_id == role_id,
    )
    current_link = link_query.first()
    if current_link:
        db.delete(current_link)
        db.commit()
        return True
    return False