Automated Action b7f18dcaec Add CRUD endpoints for opportunities, trades, and system events
- Add GET endpoint for individual arbitrage opportunities
- Add POST endpoint for creating arbitrage opportunities
- Add GET endpoint for individual trades
- Add POST endpoint for creating trades
- Add endpoints for system events (GET list, GET by ID, POST)
- Update API router to include the new events endpoints
- Fix linting issues
2025-06-05 19:43:15 +00:00

122 lines
3.9 KiB
Python

from datetime import datetime, timedelta
from typing import Any, Optional
from fastapi import APIRouter, Depends, Query, HTTPException, status
from sqlalchemy.orm import Session
from sqlalchemy import desc, func
from app.db.session import get_db
from app.models.arbitrage import Trade, ArbitrageOpportunity
from app.schemas.arbitrage import TradesList, TradeCreate, Trade as TradeSchema
router = APIRouter()
@router.get("", response_model=TradesList)
async def get_trades(
token_address: Optional[str] = Query(None, description="Filter by specific token address"),
status: Optional[str] = Query(None, description="Filter by trade status (success, failed, pending)"),
time_period: Optional[int] = Query(24, description="Time period in hours to look back"),
limit: int = Query(20, ge=1, le=100, description="Number of trades to return"),
offset: int = Query(0, ge=0, description="Pagination offset"),
db: Session = Depends(get_db)
) -> Any:
"""
Retrieve trade history with optional filtering.
"""
query = db.query(Trade)
# Apply filters
if token_address:
query = query.filter(Trade.token_address == token_address)
if status:
query = query.filter(Trade.tx_status == status)
if time_period is not None:
start_time = datetime.utcnow() - timedelta(hours=time_period)
query = query.filter(Trade.created_at >= start_time)
# Get total count
total_count = query.count()
# Get paginated results
trades = query.order_by(desc(Trade.created_at)).offset(offset).limit(limit).all()
# Calculate total profit for the filtered trades
total_profit = db.query(func.sum(Trade.profit_amount_usd)).filter(
Trade.tx_status == "success",
Trade.id.in_([trade.id for trade in trades])
).scalar() or 0.0
return {
"trades": trades,
"count": total_count,
"timestamp": datetime.utcnow(),
"total_profit_usd": total_profit
}
@router.get("/{trade_id}", response_model=TradeSchema)
async def get_trade(
trade_id: int,
db: Session = Depends(get_db)
) -> Any:
"""
Get a specific trade by ID.
"""
trade = db.query(Trade).filter(Trade.id == trade_id).first()
if not trade:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Trade with ID {trade_id} not found"
)
return trade
@router.post("", response_model=TradeSchema, status_code=status.HTTP_201_CREATED)
async def create_trade(
trade_in: TradeCreate,
db: Session = Depends(get_db)
) -> Any:
"""
Create a new trade record.
"""
# Check if the opportunity exists
opportunity = db.query(ArbitrageOpportunity).filter(
ArbitrageOpportunity.id == trade_in.opportunity_id
).first()
if not opportunity:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Arbitrage opportunity with ID {trade_in.opportunity_id} not found"
)
# Create the trade
db_trade = Trade(
opportunity_id=trade_in.opportunity_id,
token_address=trade_in.token_address,
token_symbol=trade_in.token_symbol,
source_dex=trade_in.source_dex,
target_dex=trade_in.target_dex,
input_amount=trade_in.input_amount,
input_amount_usd=trade_in.input_amount_usd,
output_amount=trade_in.output_amount,
output_amount_usd=trade_in.output_amount_usd,
profit_amount=trade_in.profit_amount,
profit_amount_usd=trade_in.profit_amount_usd,
profit_percent=trade_in.profit_percent,
tx_signature=trade_in.tx_signature,
tx_status=trade_in.tx_status,
tx_error=trade_in.tx_error
)
db.add(db_trade)
# Mark the opportunity as executed
opportunity.was_executed = True
db.commit()
db.refresh(db_trade)
return db_trade