"""FastAPI routes for creating, listing, fetching, and updating transactions."""

import json
from typing import List, Optional

from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.orm import Session
from sqlalchemy import desc

from app.db.session import get_db
from app.models.transaction import Transaction
from app.core.schemas import (
    TransactionCreate,
    TransactionUpdate,
    Transaction as TransactionSchema,
    PaginatedResponse,
)

router = APIRouter()

# NOTE(review): every handler below is ``async def`` but performs synchronous
# ``Session`` calls. FastAPI runs ``async def`` path operations directly on the
# event loop, so these calls block it while they run. Consider switching the
# handlers to plain ``def`` (run in the threadpool) or to an async session.


def _find_by_transaction_id(db: Session, transaction_id: str) -> Optional[Transaction]:
    """Return the stored row with this ``transaction_id``, or ``None``."""
    return (
        db.query(Transaction)
        .filter(Transaction.transaction_id == transaction_id)
        .first()
    )


def _new_db_transaction(transaction: TransactionCreate) -> Transaction:
    """Build an unsaved ORM row from an incoming payload."""
    # Metadata is stored as a JSON string; falsy metadata (``None`` or an
    # empty mapping) is stored as NULL.
    metadata_json = json.dumps(transaction.metadata) if transaction.metadata else None
    return Transaction(
        transaction_id=transaction.transaction_id,
        user_id=transaction.user_id,
        account_id=transaction.account_id,
        amount=transaction.amount,
        currency=transaction.currency,
        transaction_type=transaction.transaction_type,
        merchant_id=transaction.merchant_id,
        merchant_category=transaction.merchant_category,
        channel=transaction.channel,
        location=transaction.location,
        ip_address=transaction.ip_address,
        device_id=transaction.device_id,
        status=transaction.status,
        metadata=metadata_json,
    )


def _to_schema(db_transaction: Transaction) -> TransactionSchema:
    """Convert an ORM row to the response schema, decoding stored metadata."""
    result = TransactionSchema.from_orm(db_transaction)
    if db_transaction.metadata:
        # The column holds a JSON string; the response carries the decoded value.
        result.metadata = json.loads(db_transaction.metadata)
    return result


@router.post("/", response_model=TransactionSchema)
async def create_transaction(
    transaction: TransactionCreate,
    db: Session = Depends(get_db),
):
    """
    Ingest a new transaction into the system.

    This endpoint accepts transaction data and stores it for fraud screening.
    The transaction will be available for real-time screening immediately.

    Raises:
        HTTPException: 400 if a transaction with the same ID already exists.
    """
    if _find_by_transaction_id(db, transaction.transaction_id) is not None:
        raise HTTPException(
            status_code=400,
            detail="Transaction with this ID already exists",
        )

    db_transaction = _new_db_transaction(transaction)
    db.add(db_transaction)
    db.commit()
    db.refresh(db_transaction)

    return _to_schema(db_transaction)


@router.post("/bulk", response_model=List[TransactionSchema])
async def create_transactions_bulk(
    transactions: List[TransactionCreate],
    db: Session = Depends(get_db),
):
    """
    Ingest multiple transactions in bulk.

    This endpoint is optimized for high-volume transaction ingestion.
    Duplicate transaction IDs will be skipped, whether the ID is already
    stored or is repeated within the same request (the first occurrence wins).

    Returns:
        The transactions that were actually created, in request order.
    """
    created_transactions = []
    # IDs already handled in this request. Without this, a repeated ID in the
    # same batch is only caught if the session autoflushes pending rows before
    # the existence query runs.
    seen_ids = set()

    for transaction in transactions:
        if transaction.transaction_id in seen_ids:
            continue
        seen_ids.add(transaction.transaction_id)

        if _find_by_transaction_id(db, transaction.transaction_id) is not None:
            continue

        db_transaction = _new_db_transaction(transaction)
        db.add(db_transaction)
        created_transactions.append(db_transaction)

    # One commit for the whole batch.
    db.commit()

    results = []
    for db_transaction in created_transactions:
        # Reload each row after the commit before building the response.
        db.refresh(db_transaction)
        results.append(_to_schema(db_transaction))

    return results


@router.get("/", response_model=PaginatedResponse)
async def get_transactions(
    page: int = Query(1, ge=1),
    page_size: int = Query(100, ge=1, le=1000),
    user_id: Optional[str] = None,
    account_id: Optional[str] = None,
    transaction_type: Optional[str] = None,
    channel: Optional[str] = None,
    status: Optional[str] = None,
    min_amount: Optional[float] = None,
    max_amount: Optional[float] = None,
    db: Session = Depends(get_db),
):
    """
    Retrieve transactions with filtering and pagination.

    Supports filtering by various transaction attributes and returns
    paginated results for efficient data retrieval. Results are ordered
    newest first by ``created_at``, with ``transaction_id`` as a tie-breaker
    so that page contents are stable between requests.
    """
    query = db.query(Transaction)

    # Equality filters: an omitted or empty value means "do not filter".
    if user_id:
        query = query.filter(Transaction.user_id == user_id)
    if account_id:
        query = query.filter(Transaction.account_id == account_id)
    if transaction_type:
        query = query.filter(Transaction.transaction_type == transaction_type)
    if channel:
        query = query.filter(Transaction.channel == channel)
    if status:
        query = query.filter(Transaction.status == status)

    # Range filters compare against None so that a bound of 0 is honoured.
    if min_amount is not None:
        query = query.filter(Transaction.amount >= min_amount)
    if max_amount is not None:
        query = query.filter(Transaction.amount <= max_amount)

    # Count before pagination so ``total`` reflects the full filtered set.
    total = query.count()

    offset = (page - 1) * page_size
    transactions = (
        query.order_by(
            desc(Transaction.created_at),
            # Tie-breaker: rows sharing a timestamp would otherwise have no
            # guaranteed order and could repeat or vanish across pages.
            desc(Transaction.transaction_id),
        )
        .offset(offset)
        .limit(page_size)
        .all()
    )

    items = [_to_schema(transaction) for transaction in transactions]

    return PaginatedResponse(
        items=items,
        total=total,
        page=page,
        page_size=page_size,
        # Ceiling division; yields 0 pages when there are no matches.
        total_pages=(total + page_size - 1) // page_size,
    )


@router.get("/{transaction_id}", response_model=TransactionSchema)
async def get_transaction(
    transaction_id: str,
    db: Session = Depends(get_db),
):
    """
    Retrieve a specific transaction by ID.

    Raises:
        HTTPException: 404 if no transaction has this ID.
    """
    transaction = _find_by_transaction_id(db, transaction_id)
    if not transaction:
        raise HTTPException(status_code=404, detail="Transaction not found")

    return _to_schema(transaction)


@router.patch("/{transaction_id}", response_model=TransactionSchema)
async def update_transaction(
    transaction_id: str,
    transaction_update: TransactionUpdate,
    db: Session = Depends(get_db),
):
    """
    Update a transaction's status or metadata.

    This endpoint allows updating transaction status (e.g., from pending to
    completed) and adding additional metadata without modifying core
    transaction data. Supplied metadata keys are merged into the stored
    metadata, overwriting keys that already exist.

    Raises:
        HTTPException: 404 if no transaction has this ID.
    """
    transaction = _find_by_transaction_id(db, transaction_id)
    if not transaction:
        raise HTTPException(status_code=404, detail="Transaction not found")

    if transaction_update.status is not None:
        transaction.status = transaction_update.status

    if transaction_update.metadata is not None:
        # Merge into the stored metadata rather than replacing it.
        existing_metadata = json.loads(transaction.metadata) if transaction.metadata else {}
        existing_metadata.update(transaction_update.metadata)
        transaction.metadata = json.dumps(existing_metadata)

    db.commit()
    db.refresh(transaction)

    return _to_schema(transaction)