from enum import Enum
from typing import Any, Dict, List, Optional

from fastapi import HTTPException, status
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session

from app.core.security import get_password_hash
from app.models.user import User, UserRole
from app.schemas.user import UserCreate, UserUpdate
from app.services.audit import AuditService


class UserService:
    """Organization-scoped CRUD operations on ``User`` rows with audit logging.

    Every lookup is filtered by ``organization_id`` so callers can only see or
    modify users inside one organization. Mutating operations record an entry
    through ``AuditService.log_user_activity`` once the change is committed.
    """

    # Field names whose values must never be copied into audit details.
    # UserUpdate's fields are not visible from this module, so this is a
    # defensive list rather than a statement about the schema.
    _REDACTED_AUDIT_FIELDS = frozenset({"password", "hashed_password"})

    def __init__(self, db: Session):
        """Bind the service (and its audit helper) to the given session."""
        self.db = db
        self.audit_service = AuditService(db)

    @classmethod
    def _audit_safe(cls, values: Dict[str, Any]) -> Dict[str, Any]:
        """Return a copy of *values* suitable for storing in audit details.

        Enum members are replaced by their ``.value`` (matching how the
        create/delete audit entries record ``role.value``) and password-like
        fields are masked.
        """
        safe: Dict[str, Any] = {}
        for field, value in values.items():
            if field in cls._REDACTED_AUDIT_FIELDS:
                safe[field] = "***"
            elif isinstance(value, Enum):
                safe[field] = value.value
            else:
                safe[field] = value
        return safe

    def get_users(
        self, organization_id: int, skip: int = 0, limit: int = 100
    ) -> List[User]:
        """Return one page of users belonging to ``organization_id``.

        Rows are ordered by primary key: OFFSET/LIMIT without an ORDER BY
        gives no guarantee that successive pages are disjoint or stable.
        """
        return (
            self.db.query(User)
            .filter(User.organization_id == organization_id)
            .order_by(User.id)
            .offset(skip)
            .limit(limit)
            .all()
        )

    def get_user(self, user_id: int, organization_id: int) -> Optional[User]:
        """Return the user with ``user_id`` inside ``organization_id``, or None."""
        return (
            self.db.query(User)
            .filter(
                User.id == user_id,
                User.organization_id == organization_id,
            )
            .first()
        )

    def create_user(
        self,
        user_data: UserCreate,
        current_user: User,
        ip_address: str,
        user_agent: str,
    ) -> User:
        """Create a user in the caller's organization and audit the creation.

        Raises:
            HTTPException: 403 if ``user_data.organization_id`` differs from
                the caller's organization; 400 if the commit violates an
                integrity constraint (reported as a duplicate email/username).
        """
        # Ensure user is created within the same organization
        if user_data.organization_id != current_user.organization_id:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Cannot create user in different organization",
            )

        # NOTE(review): nothing here restricts which role the caller may
        # assign; confirm that the route layer enforces this.
        db_user = User(
            email=user_data.email,
            username=user_data.username,
            hashed_password=get_password_hash(user_data.password),
            first_name=user_data.first_name,
            last_name=user_data.last_name,
            role=user_data.role,
            is_active=user_data.is_active,
            organization_id=user_data.organization_id,
        )

        # Only the insert is guarded: an IntegrityError raised later by the
        # audit write must not be reported as a duplicate email/username.
        try:
            self.db.add(db_user)
            self.db.commit()
        except IntegrityError as exc:
            self.db.rollback()
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Email or username already exists",
            ) from exc

        self.db.refresh(db_user)

        # Log user creation
        self.audit_service.log_user_activity(
            user=current_user,
            action="create",
            resource_type="user",
            resource_id=str(db_user.id),
            details={
                "created_user_email": db_user.email,
                "created_user_role": db_user.role.value,
            },
            ip_address=ip_address,
            user_agent=user_agent,
        )
        return db_user

    def update_user(
        self,
        user_id: int,
        user_update: UserUpdate,
        current_user: User,
        ip_address: str,
        user_agent: str,
    ) -> User:
        """Apply the explicitly-set fields of ``user_update`` and audit them.

        Raises:
            HTTPException: 404 if the user is not in the caller's
                organization; 403 if a role change is requested by someone
                who is neither org admin nor super admin; 400 if the commit
                violates an integrity constraint.
        """
        db_user = self.get_user(user_id, current_user.organization_id)
        if db_user is None:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="User not found",
            )

        # Only allow role updates by org admins or super admins.
        # `is not None` rather than truthiness so that a falsy enum member
        # cannot slip past the permission check.
        # NOTE(review): an org admin can currently grant SUPER_ADMIN; confirm
        # whether that is intended.
        if user_update.role is not None and current_user.role not in (
            UserRole.ORG_ADMIN,
            UserRole.SUPER_ADMIN,
        ):
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Not enough permissions to change user role",
            )

        update_data = user_update.dict(exclude_unset=True)
        # Snapshot the previous values before they are overwritten.
        old_values = {field: getattr(db_user, field) for field in update_data}

        for field, value in update_data.items():
            setattr(db_user, field, value)

        # Only the commit is guarded, for the same reason as in create_user.
        try:
            self.db.commit()
        except IntegrityError as exc:
            self.db.rollback()
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Email or username already exists",
            ) from exc

        self.db.refresh(db_user)

        # Log user update
        self.audit_service.log_user_activity(
            user=current_user,
            action="update",
            resource_type="user",
            resource_id=str(db_user.id),
            details={
                "updated_fields": list(update_data),
                "old_values": self._audit_safe(old_values),
                "new_values": self._audit_safe(update_data),
            },
            ip_address=ip_address,
            user_agent=user_agent,
        )
        return db_user

    def delete_user(
        self,
        user_id: int,
        current_user: User,
        ip_address: str,
        user_agent: str,
    ) -> bool:
        """Delete a user from the caller's organization and audit the deletion.

        Returns:
            True once the deletion has been committed and logged.

        Raises:
            HTTPException: 404 if the user is not in the caller's
                organization; 400 if the caller targets their own account.
            IntegrityError: re-raised after rollback if the database rejects
                the delete.
        """
        db_user = self.get_user(user_id, current_user.organization_id)
        if db_user is None:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="User not found",
            )

        # Prevent self-deletion
        if db_user.id == current_user.id:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Cannot delete your own account",
            )

        # Capture what the audit entry needs while the row still exists; the
        # instance cannot be read reliably after the delete is committed.
        deleted_id = str(db_user.id)
        deleted_email = db_user.email
        deleted_role = db_user.role.value

        try:
            self.db.delete(db_user)
            self.db.commit()
        except IntegrityError:
            # Leave the session usable and let the original error surface.
            self.db.rollback()
            raise

        # Log only after the delete has succeeded, so the audit trail never
        # records a deletion that did not happen (same order as create/update).
        self.audit_service.log_user_activity(
            user=current_user,
            action="delete",
            resource_type="user",
            resource_id=deleted_id,
            details={
                "deleted_user_email": deleted_email,
                "deleted_user_role": deleted_role,
            },
            ip_address=ip_address,
            user_agent=user_agent,
        )
        return True