Automated Action 2adbcd0535 Complete multi-tenant SaaS platform with external integrations
- Implemented comprehensive multi-tenant data isolation using database-level security
- Built JWT authentication system with role-based access control (Super Admin, Org Admin, User, Viewer)
- Created RESTful API endpoints for user and organization operations
- Added complete audit logging for all data modifications with IP tracking
- Implemented API rate limiting and input validation with security middleware
- Built webhook processing engine with async event handling and retry logic
- Created external API call handlers with circuit breaker pattern and error handling
- Implemented data synchronization between external services and internal data
- Added integration health monitoring and status tracking
- Created three mock external services (User Management, Payment, Communication)
- Implemented idempotency for webhook processing to handle duplicates gracefully
- Added comprehensive security headers and XSS/CSRF protection
- Set up Alembic database migrations with proper SQLite configuration
- Included extensive documentation and API examples

Architecture features:
- Multi-tenant isolation at database level
- Circuit breaker pattern for external API resilience
- Async background task processing
- Complete audit trail with user context
- Role-based permission system
- Webhook signature verification
- Request validation and sanitization
- Health monitoring endpoints

Co-Authored-By: Claude <noreply@anthropic.com>
2025-06-27 21:14:30 +00:00

124 lines
4.5 KiB
Python

from sqlalchemy.orm import Session
from sqlalchemy.exc import IntegrityError
from fastapi import HTTPException, status
from app.models.user import User, UserRole
from app.models.tenant import Organization
from app.schemas.auth import RegisterRequest, LoginRequest
from app.core.security import verify_password, get_password_hash, create_access_token
from app.services.audit import AuditService
from typing import Optional
class AuthService:
def __init__(self, db: Session):
self.db = db
self.audit_service = AuditService(db)
def authenticate_user(self, email: str, password: str) -> Optional[User]:
user = self.db.query(User).filter(User.email == email).first()
if not user:
return None
if not verify_password(password, user.hashed_password):
return None
return user
def login(self, login_data: LoginRequest, ip_address: str, user_agent: str):
user = self.authenticate_user(login_data.email, login_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect email or password",
headers={"WWW-Authenticate": "Bearer"},
)
if not user.is_active:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Inactive user"
)
access_token = create_access_token(data={"sub": str(user.id)})
# Log login activity
self.audit_service.log_user_activity(
user=user,
action="login",
resource_type="authentication",
ip_address=ip_address,
user_agent=user_agent
)
# Update last login
from datetime import datetime
user.last_login = datetime.utcnow()
self.db.commit()
return {"access_token": access_token, "token_type": "bearer"}
def register(self, register_data: RegisterRequest, ip_address: str, user_agent: str):
try:
# Create organization first
organization = Organization(
name=register_data.organization_name,
domain=register_data.organization_domain,
subdomain=register_data.organization_subdomain
)
self.db.add(organization)
self.db.flush() # Get the ID without committing
# Create user as org admin
hashed_password = get_password_hash(register_data.password)
user = User(
email=register_data.email,
username=register_data.username,
hashed_password=hashed_password,
first_name=register_data.first_name,
last_name=register_data.last_name,
role=UserRole.ORG_ADMIN,
organization_id=organization.id,
is_verified=True # Auto-verify for demo
)
self.db.add(user)
self.db.commit()
# Log registration
self.audit_service.log_action(
organization_id=organization.id,
user_id=user.id,
action="create",
resource_type="user_registration",
resource_id=str(user.id),
details={"role": user.role.value},
ip_address=ip_address,
user_agent=user_agent
)
return user
except IntegrityError as e:
self.db.rollback()
if "email" in str(e.orig):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Email already registered"
)
elif "username" in str(e.orig):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Username already taken"
)
elif "domain" in str(e.orig):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Domain already registered"
)
elif "subdomain" in str(e.orig):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Subdomain already taken"
)
else:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Registration failed"
)