from typing import Optional from fastapi import APIRouter, Depends, HTTPException, Query from sqlalchemy.orm import Session from sqlalchemy import desc import json from app.db.session import get_db from app.models.rule import Rule, RuleVersion from app.core.schemas import RuleCreate, RuleUpdate, Rule as RuleSchema, PaginatedResponse router = APIRouter() @router.post("/", response_model=RuleSchema) async def create_rule( rule: RuleCreate, db: Session = Depends(get_db) ): """ Create a new fraud detection rule. Rules define conditions and actions for transaction screening. Each rule can include aggregation conditions for velocity checks. Example rule for velocity check: { "name": "High Velocity Transaction Check", "description": "Flag if user has more than 10 transactions > ₦100,000 in 24 hours", "rule_type": "velocity", "conditions": [ { "field": "amount", "operator": "gt", "value": 100000, "aggregate_function": "count", "time_window": "24h", "group_by": ["user_id"] } ], "actions": [ { "action_type": "flag", "parameters": {"risk_score": 80, "reason": "High velocity detected"} } ], "priority": 1 } """ # Check if rule name already exists existing = db.query(Rule).filter(Rule.name == rule.name).first() if existing: raise HTTPException(status_code=400, detail="Rule with this name already exists") db_rule = Rule( name=rule.name, description=rule.description, rule_type=rule.rule_type, conditions=json.dumps([condition.dict() for condition in rule.conditions]), actions=json.dumps([action.dict() for action in rule.actions]), priority=rule.priority, is_active=rule.is_active, version=1, created_by=rule.created_by ) db.add(db_rule) db.commit() db.refresh(db_rule) # Also save to rule versions for audit trail rule_version = RuleVersion( rule_id=db_rule.id, version=1, name=db_rule.name, description=db_rule.description, rule_type=db_rule.rule_type, conditions=db_rule.conditions, actions=db_rule.actions, priority=db_rule.priority, created_by=db_rule.created_by ) db.add(rule_version) db.commit() # Convert to response format result = RuleSchema.from_orm(db_rule) result.conditions = json.loads(db_rule.conditions) result.actions = json.loads(db_rule.actions) return result @router.get("/", response_model=PaginatedResponse) async def get_rules( page: int = Query(1, ge=1), page_size: int = Query(100, ge=1, le=1000), rule_type: Optional[str] = None, is_active: Optional[bool] = None, db: Session = Depends(get_db) ): """ Retrieve all fraud detection rules with filtering and pagination. """ query = db.query(Rule) # Apply filters if rule_type: query = query.filter(Rule.rule_type == rule_type) if is_active is not None: query = query.filter(Rule.is_active == is_active) # Get total count total = query.count() # Apply pagination offset = (page - 1) * page_size rules = query.order_by(desc(Rule.priority), desc(Rule.created_at)).offset(offset).limit(page_size).all() # Convert to response format items = [] for rule in rules: result = RuleSchema.from_orm(rule) result.conditions = json.loads(rule.conditions) result.actions = json.loads(rule.actions) items.append(result) return PaginatedResponse( items=items, total=total, page=page, page_size=page_size, total_pages=(total + page_size - 1) // page_size ) @router.get("/{rule_id}", response_model=RuleSchema) async def get_rule( rule_id: int, db: Session = Depends(get_db) ): """ Retrieve a specific rule by ID. """ rule = db.query(Rule).filter(Rule.id == rule_id).first() if not rule: raise HTTPException(status_code=404, detail="Rule not found") result = RuleSchema.from_orm(rule) result.conditions = json.loads(rule.conditions) result.actions = json.loads(rule.actions) return result @router.put("/{rule_id}", response_model=RuleSchema) async def update_rule( rule_id: int, rule_update: RuleUpdate, db: Session = Depends(get_db) ): """ Update an existing rule and create a new version. This endpoint creates a new version of the rule for audit purposes while updating the main rule record. """ rule = db.query(Rule).filter(Rule.id == rule_id).first() if not rule: raise HTTPException(status_code=404, detail="Rule not found") # Save current version to rule_versions before updating current_version = RuleVersion( rule_id=rule.id, version=rule.version, name=rule.name, description=rule.description, rule_type=rule.rule_type, conditions=rule.conditions, actions=rule.actions, priority=rule.priority, created_by=rule.created_by ) db.add(current_version) # Update rule with new values if rule_update.name is not None: # Check if new name conflicts with existing rules existing = db.query(Rule).filter(Rule.name == rule_update.name, Rule.id != rule_id).first() if existing: raise HTTPException(status_code=400, detail="Rule with this name already exists") rule.name = rule_update.name if rule_update.description is not None: rule.description = rule_update.description if rule_update.conditions is not None: rule.conditions = json.dumps([condition.dict() for condition in rule_update.conditions]) if rule_update.actions is not None: rule.actions = json.dumps([action.dict() for action in rule_update.actions]) if rule_update.priority is not None: rule.priority = rule_update.priority if rule_update.is_active is not None: rule.is_active = rule_update.is_active # Increment version rule.version += 1 db.commit() db.refresh(rule) # Convert to response format result = RuleSchema.from_orm(rule) result.conditions = json.loads(rule.conditions) result.actions = json.loads(rule.actions) return result @router.delete("/{rule_id}") async def delete_rule( rule_id: int, db: Session = Depends(get_db) ): """ Soft delete a rule by marking it as inactive. Rules are not permanently deleted to maintain audit trail. """ rule = db.query(Rule).filter(Rule.id == rule_id).first() if not rule: raise HTTPException(status_code=404, detail="Rule not found") rule.is_active = False db.commit() return {"message": "Rule deactivated successfully"} @router.get("/{rule_id}/versions") async def get_rule_versions( rule_id: int, db: Session = Depends(get_db) ): """ Retrieve version history for a specific rule. """ rule = db.query(Rule).filter(Rule.id == rule_id).first() if not rule: raise HTTPException(status_code=404, detail="Rule not found") versions = db.query(RuleVersion).filter(RuleVersion.rule_id == rule_id).order_by(desc(RuleVersion.version)).all() # Convert to response format result_versions = [] for version in versions: version_dict = { "id": version.id, "rule_id": version.rule_id, "version": version.version, "name": version.name, "description": version.description, "rule_type": version.rule_type, "conditions": json.loads(version.conditions), "actions": json.loads(version.actions), "priority": version.priority, "created_by": version.created_by, "created_at": version.created_at } result_versions.append(version_dict) return result_versions @router.post("/{rule_id}/rollback/{version}") async def rollback_rule( rule_id: int, version: int, db: Session = Depends(get_db) ): """ Rollback a rule to a previous version. """ rule = db.query(Rule).filter(Rule.id == rule_id).first() if not rule: raise HTTPException(status_code=404, detail="Rule not found") target_version = db.query(RuleVersion).filter( RuleVersion.rule_id == rule_id, RuleVersion.version == version ).first() if not target_version: raise HTTPException(status_code=404, detail="Rule version not found") # Save current version before rollback current_version = RuleVersion( rule_id=rule.id, version=rule.version, name=rule.name, description=rule.description, rule_type=rule.rule_type, conditions=rule.conditions, actions=rule.actions, priority=rule.priority, created_by=rule.created_by ) db.add(current_version) # Rollback to target version rule.name = target_version.name rule.description = target_version.description rule.rule_type = target_version.rule_type rule.conditions = target_version.conditions rule.actions = target_version.actions rule.priority = target_version.priority rule.version += 1 # Increment version even for rollback db.commit() db.refresh(rule) # Convert to response format result = RuleSchema.from_orm(rule) result.conditions = json.loads(rule.conditions) result.actions = json.loads(rule.actions) return result