from typing import List, Optional

from sqlalchemy.orm import Session

from app.models.role import Role
from app.models.user import User
from app.models.user_role import UserRole
from app.schemas.role import RoleCreate, RoleUpdate


def get_role_by_id(db: Session, role_id: int) -> Optional[Role]:
    """Return the role whose ``id`` equals *role_id*, or ``None`` if absent."""
    return db.query(Role).filter(Role.id == role_id).first()


def get_role_by_name(db: Session, name: str) -> Optional[Role]:
    """Return the first role whose ``name`` equals *name*, or ``None``."""
    return db.query(Role).filter(Role.name == name).first()


def get_roles(db: Session, skip: int = 0, limit: int = 100) -> List[Role]:
    """Return one page of roles.

    Args:
        db: Active database session.
        skip: Number of rows to skip before the page starts.
        limit: Maximum number of rows to return.

    Returns:
        Up to *limit* roles, ordered by ``Role.id``.
    """
    # OFFSET/LIMIT without ORDER BY yields an unspecified row order, so
    # consecutive pages could overlap or miss rows; order by the key.
    return db.query(Role).order_by(Role.id).offset(skip).limit(limit).all()


def create_role(db: Session, role_in: RoleCreate) -> Optional[Role]:
    """Create and persist a new role.

    Args:
        db: Active database session.
        role_in: Payload providing ``name`` and ``description``.

    Returns:
        The newly created role, refreshed from the database, or ``None``
        when a role with the same name already exists.
    """
    # NOTE(review): this check-then-insert is not atomic. If Role.name has a
    # unique constraint, a concurrent insert would surface as an error from
    # commit() -- confirm whether callers need that handled.
    if get_role_by_name(db, name=role_in.name):
        return None

    db_role = Role(
        name=role_in.name,
        description=role_in.description,
    )
    db.add(db_role)
    db.commit()
    db.refresh(db_role)
    return db_role


def update_role(db: Session, role_id: int, role_in: RoleUpdate) -> Optional[Role]:
    """Apply the explicitly-set fields of *role_in* to an existing role.

    Args:
        db: Active database session.
        role_id: Primary key of the role to update.
        role_in: Payload; only fields that were set on it are applied.

    Returns:
        The updated role, refreshed from the database, or ``None`` when no
        role with *role_id* exists.
    """
    role = get_role_by_id(db, role_id=role_id)
    if not role:
        return None

    # exclude_unset=True keeps omitted fields from overwriting stored values.
    update_data = role_in.dict(exclude_unset=True)
    for field, value in update_data.items():
        setattr(role, field, value)

    db.add(role)
    db.commit()
    db.refresh(role)
    return role


def delete_role(db: Session, role_id: int) -> bool:
    """Delete the role with *role_id*.

    Returns:
        ``True`` if the role existed and was deleted, ``False`` otherwise.
    """
    # NOTE(review): UserRole rows referencing this role are not removed here;
    # presumably handled by cascade / foreign-key rules -- confirm.
    role = get_role_by_id(db, role_id=role_id)
    if not role:
        return False

    db.delete(role)
    db.commit()
    return True


def assign_role_to_user(db: Session, user_id: int, role_id: int) -> Optional[UserRole]:
    """Link a role to a user, reusing the existing link if there is one.

    Args:
        db: Active database session.
        user_id: Primary key of the user.
        role_id: Primary key of the role.

    Returns:
        The existing or newly created ``UserRole`` row, or ``None`` when
        either the user or the role does not exist.
    """
    user = db.query(User).filter(User.id == user_id).first()
    role = get_role_by_id(db, role_id=role_id)
    if not user or not role:
        return None

    # Return the existing assignment instead of inserting a duplicate row.
    user_role = db.query(UserRole).filter(
        UserRole.user_id == user_id,
        UserRole.role_id == role_id,
    ).first()
    if user_role:
        return user_role

    db_user_role = UserRole(user_id=user_id, role_id=role_id)
    db.add(db_user_role)
    db.commit()
    db.refresh(db_user_role)
    return db_user_role


def remove_role_from_user(db: Session, user_id: int, role_id: int) -> bool:
    """Remove the link between a user and a role.

    Returns:
        ``True`` if an assignment existed and was deleted, ``False`` when the
        user did not have the role.
    """
    user_role = db.query(UserRole).filter(
        UserRole.user_id == user_id,
        UserRole.role_id == role_id,
    ).first()
    if not user_role:
        return False

    db.delete(user_role)
    db.commit()
    return True