diff --git a/README.md b/README.md index e8acfba..3398992 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,194 @@ -# FastAPI Application +# Solana Arbitrage Analytics Backend -This is a FastAPI application bootstrapped by BackendIM, the AI-powered backend generation platform. +A FastAPI-based backend for tracking, analyzing, and visualizing arbitrage transactions on the Solana blockchain. This application provides detailed insights into arbitrage opportunities, performance, and patterns across various DEXes and liquidity pools. + +## Features + +- **Blockchain Data Fetching**: Connects to Solana RPC endpoints to fetch and store blocks and transactions +- **Arbitrage Detection**: Automatically identifies and analyzes arbitrage transactions +- **DEX & Pool Tracking**: Maintains data on DEXes and liquidity pools involved in arbitrages +- **Comprehensive Analytics**: Detailed metrics on arbitrage profitability, volume, and patterns +- **REST API**: Well-documented API endpoints for integrating with frontends and other services +- **Background Workers**: Continuously fetches new blockchain data and analyzes it for arbitrages +- **SQLite Database**: Lightweight database for storing all analytics data + +## System Architecture + +The system consists of several key components: + +1. **Blockchain Fetcher**: Retrieves block and transaction data from Solana RPC nodes +2. **Arbitrage Detector**: Analyzes transactions to identify and record arbitrage activities +3. **Database Models**: SQLAlchemy ORM models for storing blockchain and arbitrage data +4. **API Endpoints**: FastAPI routes that provide access to the analytics data +5. **Background Workers**: Services that run continuous fetching and analysis operations + +## API Endpoints + +The API is organized into the following categories: + +### Blocks +- `GET /api/v1/blocks`: List blocks with pagination +- `GET /api/v1/blocks/latest`: Get the latest blocks +- `GET /api/v1/blocks/{block_id}`: Get block by ID +- `GET /api/v1/blocks/height/{height}`: Get block by height +- `GET /api/v1/blocks/hash/{block_hash}`: Get block by hash +- `GET /api/v1/blocks/{block_id}/transactions`: Get transactions in a block + +### Transactions +- `GET /api/v1/transactions`: List transactions with pagination +- `GET /api/v1/transactions/{transaction_id}`: Get transaction by ID +- `GET /api/v1/transactions/hash/{tx_hash}`: Get transaction by hash +- `GET /api/v1/transactions/signature/{signature}`: Get transaction by signature +- `GET /api/v1/transactions/program/{program_id}`: Get transactions involving a program +- `GET /api/v1/transactions/account/{account}`: Get transactions involving an account + +### Arbitrages +- `GET /api/v1/arbitrages`: List arbitrages with pagination +- `GET /api/v1/arbitrages/stats`: Get overall arbitrage statistics +- `GET /api/v1/arbitrages/{arbitrage_id}`: Get detailed arbitrage information +- `GET /api/v1/arbitrages/initiator/{initiator_address}`: Get arbitrages by initiator +- `GET /api/v1/arbitrages/token/{token_address}`: Get arbitrages involving a token +- `GET /api/v1/arbitrages/profit-range`: Get arbitrages within a profit range +- `GET /api/v1/arbitrages/time-range`: Get arbitrages within a time range +- `GET /api/v1/arbitrages/dex/{dex_id}`: Get arbitrages involving a DEX + +### DEXes +- `GET /api/v1/dexes`: List DEXes with pagination +- `GET /api/v1/dexes/{dex_id}`: Get DEX by ID +- `GET /api/v1/dexes/address/{address}`: Get DEX by address +- `GET /api/v1/dexes/name/{name}`: Get DEX by name +- `GET /api/v1/dexes/search`: Search DEXes 
by name
+- `GET /api/v1/dexes/{dex_id}/pools`: Get pools for a DEX
+
+### Pools
+- `GET /api/v1/pools`: List pools with pagination
+- `GET /api/v1/pools/active`: Get most active pools by volume
+- `GET /api/v1/pools/{pool_id}`: Get pool by ID
+- `GET /api/v1/pools/address/{address}`: Get pool by address
+- `GET /api/v1/pools/token/{token_address}`: Get pools containing a token
+- `GET /api/v1/pools/pair`: Get pools for a token pair
+- `GET /api/v1/pools/tvl-range`: Get pools within a TVL range
+
+### Stats
+- `GET /api/v1/stats`: Get overall system statistics
+
+## Setup & Installation
+
+### Prerequisites
+- Python 3.8+
+- Access to a Solana RPC endpoint (public or private)
+
+### Installation Steps
+
+1. Clone the repository:
+   ```bash
+   git clone <repository-url>
+   cd solana-arbitrage-analytics-backend
+   ```
+
+2. Create a virtual environment:
+   ```bash
+   python -m venv venv
+   source venv/bin/activate  # On Windows: venv\Scripts\activate
+   ```
+
+3. Install dependencies:
+   ```bash
+   pip install -r requirements.txt
+   ```
+
+4. Set up environment variables (optional):
+   Create a `.env` file in the project root:
+   ```
+   SOLANA_RPC_URL=https://your-solana-rpc-endpoint.com
+   ```
+
+5. Apply database migrations:
+   ```bash
+   alembic upgrade head
+   ```
+
+6. Start the application:
+   ```bash
+   uvicorn main:app --reload
+   ```
+
+7. Access the API documentation at:
+   ```
+   http://localhost:8000/docs
+   ```
+
+## Usage
+
+### Starting the Background Workers
+
+The background workers must be started before the system begins fetching and analyzing blockchain data:
+
+1. Start the blockchain fetcher:
+   ```
+   POST /api/v1/stats/worker/start-fetcher
+   ```
+
+2. Start the arbitrage detector:
+   ```
+   POST /api/v1/stats/worker/start-detector
+   ```
+
+3. To stop the workers:
+   ```
+   POST /api/v1/stats/worker/stop
+   ```
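+
+For example, with the server running you can drive the workers and check progress with `curl` (a sketch assuming the default `localhost:8000` and the `/api/v1` prefix shown above):
+
+```bash
+# Start both background workers
+curl -X POST http://localhost:8000/api/v1/stats/worker/start-fetcher
+curl -X POST http://localhost:8000/api/v1/stats/worker/start-detector
+
+# Check overall statistics as data accumulates
+curl http://localhost:8000/api/v1/stats/
+
+# Stop the workers when done
+curl -X POST http://localhost:8000/api/v1/stats/worker/stop
+```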
+
+### Accessing Analytics
+
+Once data has been collected, you can access various analytics through the API endpoints. The Swagger documentation provides detailed information on each endpoint and its parameters.
+
+## Development
+
+### Project Structure
+
+```
+├── alembic.ini            # Alembic configuration
+├── app/                   # Application package
+│   ├── api/               # API endpoints
+│   │   └── v1/            # API version 1
+│   ├── core/              # Core configuration
+│   ├── crud/              # Database CRUD operations
+│   ├── db/                # Database setup
+│   ├── models/            # SQLAlchemy models
+│   ├── schemas/           # Pydantic schemas
+│   ├── services/          # Business logic services
+│   └── utils/             # Utility functions
+├── main.py                # Application entry point
+├── migrations/            # Database migrations
+│   └── versions/          # Migration versions
+└── requirements.txt       # Python dependencies
+```
+
+### Adding New Features
+
+To add new features or models:
+
+1. Create or update SQLAlchemy models in `app/models/`
+2. Create or update Pydantic schemas in `app/schemas/`
+3. Create or update CRUD operations in `app/crud/`
+4. Create or update API endpoints in `app/api/v1/`
+5. Generate a new migration:
+   ```bash
+   alembic revision --autogenerate -m "Description of changes"
+   ```
+6. Apply the migration:
+   ```bash
+   alembic upgrade head
+   ```
+
+## License
+
+MIT License
+
+## Acknowledgements
+
+- Solana blockchain and its ecosystem
+- FastAPI for the web framework
+- SQLAlchemy for ORM
+- Alembic for database migrations
\ No newline at end of file
diff --git a/alembic.ini b/alembic.ini
new file mode 100644
index 0000000..e6f34ff
--- /dev/null
+++ b/alembic.ini
@@ -0,0 +1,103 @@
+# A generic, single database configuration.
+
+[alembic]
+# path to migration scripts
+script_location = migrations
+
+# template used to generate migration files
+# file_template = %%(year)d%%(month).2d%%(day).2d_%%(hour).2d%%(minute).2d%%(second).2d_%%(slug)s
+
+# sys.path path, will be prepended to sys.path if present.
+# defaults to the current working directory.
+prepend_sys_path = .
+
+# timezone to use when rendering the date within the migration file
+# as well as the filename.
+# If specified, requires the python-dateutil library that can be
+# installed by adding `alembic[tz]` to the pip requirements
+# string value is passed to dateutil.tz.gettz()
+# leave blank for localtime
+# timezone =
+
+# max length of characters to apply to the
+# "slug" field
+# truncate_slug_length = 40
+
+# set to 'true' to run the environment during
+# the 'revision' command, regardless of autogenerate
+# revision_environment = false
+
+# set to 'true' to allow .pyc and .pyo files without
+# a source .py file to be detected as revisions in the
+# versions/ directory
+# sourceless = false
+
+# version location specification; This defaults
+# to migrations/versions. When using multiple version
+# directories, initial revisions must be specified with --version-path.
+# The path separator used here should be the separator specified by "version_path_separator" below.
+# version_locations = %(here)s/bar:%(here)s/bat:migrations/versions
+
+# version path separator; As mentioned above, this is the character used to split
+# version_locations. The default within new alembic.ini files is "os", which uses os.pathsep.
+# If this key is omitted entirely, it falls back to the legacy behavior of splitting on spaces and/or commas.
+# Valid values for version_path_separator are:
+#
+# version_path_separator = :
+# version_path_separator = ;
+# version_path_separator = space
+version_path_separator = os  # Use os.pathsep. Default configuration used for new projects.
+
+# the output encoding used when revision files
+# are written from script.py.mako
+# output_encoding = utf-8
+
+# SQLite database URL
+sqlalchemy.url = sqlite:///storage/db/db.sqlite
+
+
+[post_write_hooks]
+# post_write_hooks defines scripts or Python functions that are run
+# on newly generated revision scripts.
See the documentation for further +# detail and examples + +# format using "black" - use the console_scripts runner, against the "black" entrypoint +# hooks = black +# black.type = console_scripts +# black.entrypoint = black +# black.options = -l 79 REVISION_SCRIPT_FILENAME + +# Logging configuration +[loggers] +keys = root,sqlalchemy,alembic + +[handlers] +keys = console + +[formatters] +keys = generic + +[logger_root] +level = WARN +handlers = console +qualname = + +[logger_sqlalchemy] +level = WARN +handlers = +qualname = sqlalchemy.engine + +[logger_alembic] +level = INFO +handlers = +qualname = alembic + +[handler_console] +class = StreamHandler +args = (sys.stderr,) +level = NOTSET +formatter = generic + +[formatter_generic] +format = %(levelname)-5.5s [%(name)s] %(message)s +datefmt = %H:%M:%S \ No newline at end of file diff --git a/app/__init__.py b/app/__init__.py new file mode 100644 index 0000000..8d5142f --- /dev/null +++ b/app/__init__.py @@ -0,0 +1 @@ +# Solana Arbitrage Analytics Backend \ No newline at end of file diff --git a/app/api/__init__.py b/app/api/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/api/deps.py b/app/api/deps.py new file mode 100644 index 0000000..b0f03fb --- /dev/null +++ b/app/api/deps.py @@ -0,0 +1,7 @@ +from typing import Generator + +from fastapi import Depends, HTTPException, status +from sqlalchemy.orm import Session + +from app.db.session import get_db +from app.services.worker import worker \ No newline at end of file diff --git a/app/api/routes.py b/app/api/routes.py new file mode 100644 index 0000000..52ead3d --- /dev/null +++ b/app/api/routes.py @@ -0,0 +1,11 @@ +from fastapi import APIRouter + +from app.api.v1 import blocks, transactions, arbitrages, dexes, pools, stats + +api_router = APIRouter() +api_router.include_router(blocks.router, prefix="/blocks", tags=["blocks"]) +api_router.include_router(transactions.router, prefix="/transactions", tags=["transactions"]) +api_router.include_router(arbitrages.router, prefix="/arbitrages", tags=["arbitrages"]) +api_router.include_router(dexes.router, prefix="/dexes", tags=["dexes"]) +api_router.include_router(pools.router, prefix="/pools", tags=["pools"]) +api_router.include_router(stats.router, prefix="/stats", tags=["stats"]) \ No newline at end of file diff --git a/app/api/v1/__init__.py b/app/api/v1/__init__.py new file mode 100644 index 0000000..471ebed --- /dev/null +++ b/app/api/v1/__init__.py @@ -0,0 +1 @@ +# API v1 routes \ No newline at end of file diff --git a/app/api/v1/arbitrages.py b/app/api/v1/arbitrages.py new file mode 100644 index 0000000..cec886f --- /dev/null +++ b/app/api/v1/arbitrages.py @@ -0,0 +1,161 @@ +from typing import Any, List, Optional +from datetime import datetime, timedelta + +from fastapi import APIRouter, Depends, HTTPException, Query +from sqlalchemy.orm import Session + +from app import crud +from app.api import deps +from app.schemas.arbitrage import ( + Arbitrage, ArbitrageWithLegs, ArbitrageList, ArbitrageStats +) + +router = APIRouter() + + +@router.get("/", response_model=ArbitrageList) +def read_arbitrages( + db: Session = Depends(deps.get_db), + skip: int = 0, + limit: int = 100, + successful: Optional[bool] = None +) -> Any: + """ + Retrieve arbitrages with pagination. 
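+
+    The optional `successful` flag restricts results to successful or
+    failed arbitrages; filtered totals are approximate (page length only).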
+ """ + if successful is not None: + if successful: + arbitrages = crud.arbitrage.get_successful_arbitrages( + db, skip=skip, limit=limit + ) + else: + arbitrages = crud.arbitrage.get_failed_arbitrages( + db, skip=skip, limit=limit + ) + # This is a simplified count for demo purposes + total = len(arbitrages) + else: + arbitrages = crud.arbitrage.get_multi(db, skip=skip, limit=limit) + total = crud.arbitrage.get_count(db) + + return {"arbitrages": arbitrages, "total": total} + + +@router.get("/stats", response_model=ArbitrageStats) +def read_arbitrage_stats( + db: Session = Depends(deps.get_db), +) -> Any: + """ + Get overall arbitrage statistics. + """ + stats = crud.arbitrage.get_arbitrage_stats(db) + return stats + + +@router.get("/{arbitrage_id}", response_model=ArbitrageWithLegs) +def read_arbitrage( + arbitrage_id: int, + db: Session = Depends(deps.get_db), +) -> Any: + """ + Get a specific arbitrage with details of all legs. + """ + arbitrage = crud.arbitrage.get_with_legs(db, arbitrage_id=arbitrage_id) + if not arbitrage: + raise HTTPException(status_code=404, detail="Arbitrage not found") + return arbitrage + + +@router.get("/initiator/{initiator_address}", response_model=ArbitrageList) +def read_arbitrages_by_initiator( + initiator_address: str, + db: Session = Depends(deps.get_db), + skip: int = 0, + limit: int = 100 +) -> Any: + """ + Get arbitrages initiated by a specific address. + """ + arbitrages = crud.arbitrage.get_by_initiator( + db, initiator=initiator_address, skip=skip, limit=limit + ) + # This is a simplified count for demo purposes + total = len(arbitrages) + return {"arbitrages": arbitrages, "total": total} + + +@router.get("/token/{token_address}", response_model=ArbitrageList) +def read_arbitrages_by_token( + token_address: str, + db: Session = Depends(deps.get_db), + skip: int = 0, + limit: int = 100 +) -> Any: + """ + Get arbitrages involving a specific token. + """ + arbitrages = crud.arbitrage.get_by_token( + db, token=token_address, skip=skip, limit=limit + ) + # This is a simplified count for demo purposes + total = len(arbitrages) + return {"arbitrages": arbitrages, "total": total} + + +@router.get("/profit-range", response_model=ArbitrageList) +def read_arbitrages_by_profit( + min_profit: float = Query(..., ge=0), + max_profit: Optional[float] = Query(None, ge=0), + db: Session = Depends(deps.get_db), + skip: int = 0, + limit: int = 100 +) -> Any: + """ + Get arbitrages within a profit percentage range. + """ + arbitrages = crud.arbitrage.get_arbitrages_by_profit_range( + db, min_profit=min_profit, max_profit=max_profit, skip=skip, limit=limit + ) + # This is a simplified count for demo purposes + total = len(arbitrages) + return {"arbitrages": arbitrages, "total": total} + + +@router.get("/time-range", response_model=ArbitrageList) +def read_arbitrages_by_time( + start_time: datetime = Query(...), + end_time: Optional[datetime] = None, + db: Session = Depends(deps.get_db), + skip: int = 0, + limit: int = 100 +) -> Any: + """ + Get arbitrages within a time range. 
+ """ + if end_time is None: + end_time = datetime.utcnow() + + arbitrages = crud.arbitrage.get_arbitrages_by_time_range( + db, start_time=start_time, end_time=end_time, skip=skip, limit=limit + ) + # This is a simplified count for demo purposes + total = len(arbitrages) + return {"arbitrages": arbitrages, "total": total} + + +@router.get("/dex/{dex_id}", response_model=ArbitrageList) +def read_arbitrages_by_dex( + dex_id: int, + db: Session = Depends(deps.get_db), + skip: int = 0, + limit: int = 100 +) -> Any: + """ + Get arbitrages involving a specific DEX. + """ + arbitrages = crud.arbitrage.get_arbitrages_by_dex( + db, dex_id=dex_id, skip=skip, limit=limit + ) + # This is a simplified count for demo purposes + total = len(arbitrages) + return {"arbitrages": arbitrages, "total": total} \ No newline at end of file diff --git a/app/api/v1/blocks.py b/app/api/v1/blocks.py new file mode 100644 index 0000000..1482b8f --- /dev/null +++ b/app/api/v1/blocks.py @@ -0,0 +1,99 @@ +from typing import Any, List, Optional + +from fastapi import APIRouter, Depends, HTTPException, Query +from sqlalchemy.orm import Session + +from app import crud +from app.api import deps +from app.schemas.block import Block, BlockList +from app.schemas.transaction import Transaction, TransactionList + +router = APIRouter() + + +@router.get("/", response_model=BlockList) +def read_blocks( + db: Session = Depends(deps.get_db), + skip: int = 0, + limit: int = 100 +) -> Any: + """ + Retrieve blocks with pagination. + """ + blocks = crud.block.get_multi(db, skip=skip, limit=limit) + total = crud.block.get_count(db) + return {"blocks": blocks, "total": total} + + +@router.get("/latest", response_model=List[Block]) +def read_latest_blocks( + db: Session = Depends(deps.get_db), + limit: int = Query(20, ge=1, le=100) +) -> Any: + """ + Retrieve the latest blocks. + """ + blocks = crud.block.get_latest_blocks(db, limit=limit) + return blocks + + +@router.get("/{block_id}", response_model=Block) +def read_block( + block_id: int, + db: Session = Depends(deps.get_db), +) -> Any: + """ + Get a specific block by id. + """ + block = crud.block.get(db, id=block_id) + if not block: + raise HTTPException(status_code=404, detail="Block not found") + return block + + +@router.get("/height/{height}", response_model=Block) +def read_block_by_height( + height: int, + db: Session = Depends(deps.get_db), +) -> Any: + """ + Get a specific block by height. + """ + block = crud.block.get_by_height(db, height=height) + if not block: + raise HTTPException(status_code=404, detail="Block not found") + return block + + +@router.get("/hash/{block_hash}", response_model=Block) +def read_block_by_hash( + block_hash: str, + db: Session = Depends(deps.get_db), +) -> Any: + """ + Get a specific block by hash. + """ + block = crud.block.get_by_hash(db, block_hash=block_hash) + if not block: + raise HTTPException(status_code=404, detail="Block not found") + return block + + +@router.get("/{block_id}/transactions", response_model=TransactionList) +def read_block_transactions( + block_id: int, + db: Session = Depends(deps.get_db), + skip: int = 0, + limit: int = 100 +) -> Any: + """ + Get transactions for a specific block. 
+ """ + block = crud.block.get(db, id=block_id) + if not block: + raise HTTPException(status_code=404, detail="Block not found") + + transactions = crud.transaction.get_by_block(db, block_id=block_id, skip=skip, limit=limit) + total = crud.transaction.get_by_block_count(db, block_id=block_id) + + return {"transactions": transactions, "total": total} \ No newline at end of file diff --git a/app/api/v1/dexes.py b/app/api/v1/dexes.py new file mode 100644 index 0000000..83f08fd --- /dev/null +++ b/app/api/v1/dexes.py @@ -0,0 +1,144 @@ +from typing import Any, List, Optional + +from fastapi import APIRouter, Depends, HTTPException, Query +from sqlalchemy.orm import Session + +from app import crud +from app.api import deps +from app.schemas.dex import Dex, DexWithPoolCount, DexList +from app.schemas.pool import Pool, PoolList + +router = APIRouter() + + +@router.get("/", response_model=DexList) +def read_dexes( + db: Session = Depends(deps.get_db), + skip: int = 0, + limit: int = 100, + sort_by_volume: bool = False +) -> Any: + """ + Retrieve DEXes with pagination. + """ + if sort_by_volume: + # Get DEXes sorted by volume + dexes_with_count = [] + dexes = crud.dex.get_with_volumes(db, skip=skip, limit=limit) + for d in dexes: + pool_count = len(crud.pool.get_by_dex(db, dex_id=d.id, limit=1000)) + dexes_with_count.append(DexWithPoolCount( + **d.__dict__, + pool_count=pool_count + )) + else: + # Get DEXes with pool count + dexes_with_count = [] + result = crud.dex.get_with_pool_count(db, skip=skip, limit=limit) + for dex, pool_count in result: + dexes_with_count.append(DexWithPoolCount( + **dex.__dict__, + pool_count=pool_count + )) + + total = crud.dex.get_count(db) + return {"dexes": dexes_with_count, "total": total} + + +@router.get("/{dex_id}", response_model=Dex) +def read_dex( + dex_id: int, + db: Session = Depends(deps.get_db), +) -> Any: + """ + Get a specific DEX by id. + """ + dex = crud.dex.get(db, id=dex_id) + if not dex: + raise HTTPException(status_code=404, detail="DEX not found") + return dex + + +@router.get("/address/{address}", response_model=Dex) +def read_dex_by_address( + address: str, + db: Session = Depends(deps.get_db), +) -> Any: + """ + Get a specific DEX by address. + """ + dex = crud.dex.get_by_address(db, address=address) + if not dex: + raise HTTPException(status_code=404, detail="DEX not found") + return dex + + +@router.get("/name/{name}", response_model=Dex) +def read_dex_by_name( + name: str, + db: Session = Depends(deps.get_db), +) -> Any: + """ + Get a specific DEX by name. + """ + dex = crud.dex.get_by_name(db, name=name) + if not dex: + raise HTTPException(status_code=404, detail="DEX not found") + return dex + + +@router.get("/search", response_model=DexList) +def search_dexes( + query: str = Query(..., min_length=2), + db: Session = Depends(deps.get_db), + skip: int = 0, + limit: int = 100 +) -> Any: + """ + Search DEXes by name. 
+ """ + dexes = crud.dex.search_by_name(db, query=query, skip=skip, limit=limit) + + # Add pool counts + dexes_with_count = [] + for d in dexes: + pool_count = len(crud.pool.get_by_dex(db, dex_id=d.id, limit=1000)) + dexes_with_count.append(DexWithPoolCount( + **d.__dict__, + pool_count=pool_count + )) + + # This is a simplified count for demo purposes + total = len(dexes) + return {"dexes": dexes_with_count, "total": total} + + +@router.get("/{dex_id}/pools", response_model=PoolList) +def read_dex_pools( + dex_id: int, + db: Session = Depends(deps.get_db), + skip: int = 0, + limit: int = 100 +) -> Any: + """ + Get pools for a specific DEX. + """ + dex = crud.dex.get(db, id=dex_id) + if not dex: + raise HTTPException(status_code=404, detail="DEX not found") + + pools = crud.pool.get_by_dex(db, dex_id=dex_id, skip=skip, limit=limit) + + # Add DEX information to each pool + pools_with_dex = [] + for p in pools: + pools_with_dex.append({ + **p.__dict__, + "dex_name": dex.name, + "dex_address": dex.address + }) + + # Count total pools for this DEX + total = len(crud.pool.get_by_dex(db, dex_id=dex_id, limit=1000)) + + return {"pools": pools_with_dex, "total": total} \ No newline at end of file diff --git a/app/api/v1/pools.py b/app/api/v1/pools.py new file mode 100644 index 0000000..f26d643 --- /dev/null +++ b/app/api/v1/pools.py @@ -0,0 +1,188 @@ +from typing import Any, List, Optional, Dict + +from fastapi import APIRouter, Depends, HTTPException, Query +from sqlalchemy.orm import Session + +from app import crud +from app.api import deps +from app.schemas.pool import Pool, PoolWithDex, PoolList + +router = APIRouter() + + +@router.get("/", response_model=PoolList) +def read_pools( + db: Session = Depends(deps.get_db), + skip: int = 0, + limit: int = 100, + sort_by_tvl: bool = True +) -> Any: + """ + Retrieve pools with pagination. + """ + result = crud.pool.get_pools_with_dex(db, skip=skip, limit=limit) + + pools_with_dex = [] + for pool, dex in result: + pools_with_dex.append(PoolWithDex( + **pool.__dict__, + dex_name=dex.name, + dex_address=dex.address + )) + + total = crud.pool.get_count(db) + return {"pools": pools_with_dex, "total": total} + + +@router.get("/active", response_model=PoolList) +def read_most_active_pools( + db: Session = Depends(deps.get_db), + limit: int = Query(20, ge=1, le=100) +) -> Any: + """ + Get the most active pools by volume. + """ + pools = crud.pool.get_most_active_pools(db, limit=limit) + + # Add DEX information + pools_with_dex = [] + for p in pools: + dex = crud.dex.get(db, id=p.dex_id) + if dex: + pools_with_dex.append(PoolWithDex( + **p.__dict__, + dex_name=dex.name, + dex_address=dex.address + )) + + return {"pools": pools_with_dex, "total": len(pools_with_dex)} + + +@router.get("/{pool_id}", response_model=PoolWithDex) +def read_pool( + pool_id: int, + db: Session = Depends(deps.get_db), +) -> Any: + """ + Get a specific pool by id. + """ + pool = crud.pool.get(db, id=pool_id) + if not pool: + raise HTTPException(status_code=404, detail="Pool not found") + + dex = crud.dex.get(db, id=pool.dex_id) + if not dex: + raise HTTPException(status_code=404, detail="DEX not found") + + return PoolWithDex( + **pool.__dict__, + dex_name=dex.name, + dex_address=dex.address + ) + + +@router.get("/address/{address}", response_model=PoolWithDex) +def read_pool_by_address( + address: str, + db: Session = Depends(deps.get_db), +) -> Any: + """ + Get a specific pool by address. 
+ """ + pool = crud.pool.get_by_address(db, address=address) + if not pool: + raise HTTPException(status_code=404, detail="Pool not found") + + dex = crud.dex.get(db, id=pool.dex_id) + if not dex: + raise HTTPException(status_code=404, detail="DEX not found") + + return PoolWithDex( + **pool.__dict__, + dex_name=dex.name, + dex_address=dex.address + ) + + +@router.get("/token/{token_address}", response_model=PoolList) +def read_pools_by_token( + token_address: str, + db: Session = Depends(deps.get_db), + skip: int = 0, + limit: int = 100 +) -> Any: + """ + Get pools that contain a specific token. + """ + pools = crud.pool.get_by_token(db, token_address=token_address, skip=skip, limit=limit) + + # Add DEX information + pools_with_dex = [] + for p in pools: + dex = crud.dex.get(db, id=p.dex_id) + if dex: + pools_with_dex.append(PoolWithDex( + **p.__dict__, + dex_name=dex.name, + dex_address=dex.address + )) + + return {"pools": pools_with_dex, "total": len(pools_with_dex)} + + +@router.get("/pair", response_model=PoolList) +def read_pools_by_token_pair( + token_a: str = Query(...), + token_b: str = Query(...), + db: Session = Depends(deps.get_db), + skip: int = 0, + limit: int = 100 +) -> Any: + """ + Get pools for a specific token pair. + """ + pools = crud.pool.get_by_token_pair( + db, token_a=token_a, token_b=token_b, skip=skip, limit=limit + ) + + # Add DEX information + pools_with_dex = [] + for p in pools: + dex = crud.dex.get(db, id=p.dex_id) + if dex: + pools_with_dex.append(PoolWithDex( + **p.__dict__, + dex_name=dex.name, + dex_address=dex.address + )) + + return {"pools": pools_with_dex, "total": len(pools_with_dex)} + + +@router.get("/tvl-range", response_model=PoolList) +def read_pools_by_tvl( + min_tvl: float = Query(..., ge=0), + max_tvl: Optional[float] = Query(None, ge=0), + db: Session = Depends(deps.get_db), + skip: int = 0, + limit: int = 100 +) -> Any: + """ + Get pools within a TVL range. + """ + pools = crud.pool.get_by_tvl_range( + db, min_tvl=min_tvl, max_tvl=max_tvl, skip=skip, limit=limit + ) + + # Add DEX information + pools_with_dex = [] + for p in pools: + dex = crud.dex.get(db, id=p.dex_id) + if dex: + pools_with_dex.append(PoolWithDex( + **p.__dict__, + dex_name=dex.name, + dex_address=dex.address + )) + + return {"pools": pools_with_dex, "total": len(pools_with_dex)} \ No newline at end of file diff --git a/app/api/v1/stats.py b/app/api/v1/stats.py new file mode 100644 index 0000000..85aae67 --- /dev/null +++ b/app/api/v1/stats.py @@ -0,0 +1,97 @@ +from typing import Any, Dict, List, Optional +from datetime import datetime, timedelta + +from fastapi import APIRouter, Depends, HTTPException, Query +from sqlalchemy.orm import Session + +from app import crud +from app.api import deps +from app.services.worker import worker + +router = APIRouter() + + +@router.get("/", response_model=Dict[str, Any]) +def read_stats( + db: Session = Depends(deps.get_db), +) -> Any: + """ + Get overall analytics statistics. 
+ """ + # Blocks stats + block_count = crud.block.get_count(db) + latest_blocks = crud.block.get_latest_blocks(db, limit=1) + latest_block_height = latest_blocks[0].block_height if latest_blocks else 0 + + # Last 24 hours block count + now = datetime.utcnow() + yesterday = now - timedelta(days=1) + blocks_24h = crud.block.get_block_count_by_time_range( + db, start_time=yesterday, end_time=now + ) + + # Transactions stats + tx_count = crud.transaction.get_count(db) + + # DEX stats + dex_count = crud.dex.get_count(db) + + # Pool stats + pool_count = crud.pool.get_count(db) + + # Get arbitrage stats + arbitrage_stats = crud.arbitrage.get_arbitrage_stats(db) + + return { + "block_stats": { + "total_blocks": block_count, + "latest_block_height": latest_block_height, + "blocks_last_24h": blocks_24h + }, + "transaction_stats": { + "total_transactions": tx_count + }, + "dex_stats": { + "total_dexes": dex_count + }, + "pool_stats": { + "total_pools": pool_count + }, + "arbitrage_stats": arbitrage_stats + } + + +@router.post("/worker/start-fetcher", status_code=200) +def start_blockchain_fetcher() -> Dict[str, Any]: + """ + Start the blockchain fetcher background worker. + """ + success = worker.start_fetcher() + return { + "success": success, + "message": "Blockchain fetcher started" if success else "Blockchain fetcher already running" + } + + +@router.post("/worker/start-detector", status_code=200) +def start_arbitrage_detector() -> Dict[str, Any]: + """ + Start the arbitrage detector background worker. + """ + success = worker.start_detector() + return { + "success": success, + "message": "Arbitrage detector started" if success else "Arbitrage detector already running" + } + + +@router.post("/worker/stop", status_code=200) +def stop_workers() -> Dict[str, Any]: + """ + Stop all background workers. + """ + success = worker.stop() + return { + "success": success, + "message": "All workers stopped" if success else "No workers were running" + } \ No newline at end of file diff --git a/app/api/v1/transactions.py b/app/api/v1/transactions.py new file mode 100644 index 0000000..40e6ccb --- /dev/null +++ b/app/api/v1/transactions.py @@ -0,0 +1,102 @@ +from typing import Any, List, Optional + +from fastapi import APIRouter, Depends, HTTPException, Query +from sqlalchemy.orm import Session + +from app import crud +from app.api import deps +from app.schemas.transaction import Transaction, TransactionDetail, TransactionList + +router = APIRouter() + + +@router.get("/", response_model=TransactionList) +def read_transactions( + db: Session = Depends(deps.get_db), + skip: int = 0, + limit: int = 100 +) -> Any: + """ + Retrieve transactions with pagination. + """ + transactions = crud.transaction.get_multi(db, skip=skip, limit=limit) + total = crud.transaction.get_count(db) + return {"transactions": transactions, "total": total} + + +@router.get("/{transaction_id}", response_model=TransactionDetail) +def read_transaction( + transaction_id: int, + db: Session = Depends(deps.get_db), +) -> Any: + """ + Get a specific transaction by id. + """ + transaction = crud.transaction.get(db, id=transaction_id) + if not transaction: + raise HTTPException(status_code=404, detail="Transaction not found") + return transaction + + +@router.get("/hash/{tx_hash}", response_model=TransactionDetail) +def read_transaction_by_hash( + tx_hash: str, + db: Session = Depends(deps.get_db), +) -> Any: + """ + Get a specific transaction by hash. 
+ """ + transaction = crud.transaction.get_by_hash(db, tx_hash=tx_hash) + if not transaction: + raise HTTPException(status_code=404, detail="Transaction not found") + return transaction + + +@router.get("/signature/{signature}", response_model=TransactionDetail) +def read_transaction_by_signature( + signature: str, + db: Session = Depends(deps.get_db), +) -> Any: + """ + Get a specific transaction by signature. + """ + transaction = crud.transaction.get_by_signature(db, signature=signature) + if not transaction: + raise HTTPException(status_code=404, detail="Transaction not found") + return transaction + + +@router.get("/program/{program_id}", response_model=TransactionList) +def read_transactions_by_program( + program_id: str, + db: Session = Depends(deps.get_db), + skip: int = 0, + limit: int = 100 +) -> Any: + """ + Get transactions involving a specific program. + """ + transactions = crud.transaction.get_by_program_id( + db, program_id=program_id, skip=skip, limit=limit + ) + # This is not an accurate count but serves as a simplified implementation + total = len(transactions) + return {"transactions": transactions, "total": total} + + +@router.get("/account/{account}", response_model=TransactionList) +def read_transactions_by_account( + account: str, + db: Session = Depends(deps.get_db), + skip: int = 0, + limit: int = 100 +) -> Any: + """ + Get transactions involving a specific account. + """ + transactions = crud.transaction.search_by_account( + db, account=account, skip=skip, limit=limit + ) + # This is not an accurate count but serves as a simplified implementation + total = len(transactions) + return {"transactions": transactions, "total": total} \ No newline at end of file diff --git a/app/core/__init__.py b/app/core/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/core/config.py b/app/core/config.py new file mode 100644 index 0000000..b882980 --- /dev/null +++ b/app/core/config.py @@ -0,0 +1,41 @@ +import os +from pathlib import Path +from typing import List, Optional, Union + +from pydantic import field_validator +from pydantic_settings import BaseSettings + +class Settings(BaseSettings): + API_V1_STR: str = "/api/v1" + PROJECT_NAME: str = "Solana Arbitrage Analytics" + + # CORS + BACKEND_CORS_ORIGINS: List[str] = ["*"] + + # Solana RPC endpoint + SOLANA_RPC_URL: str = os.getenv("SOLANA_RPC_URL", "https://api.mainnet-beta.solana.com") + + # Database settings + DB_DIR: Path = Path("/app") / "storage" / "db" + SQLALCHEMY_DATABASE_URL: str = f"sqlite:///{DB_DIR}/db.sqlite" + + # Solana settings + BLOCKS_TO_FETCH: int = 100 # Number of blocks to fetch in a batch + POLLING_INTERVAL: int = 60 # Seconds between polling new blocks + + @field_validator("BACKEND_CORS_ORIGINS") + @classmethod + def assemble_cors_origins(cls, v: Union[str, List[str]]) -> Union[List[str], str]: + if isinstance(v, str) and not v.startswith("["): + return [i.strip() for i in v.split(",")] + elif isinstance(v, (list, str)): + return v + raise ValueError(v) + + class Config: + case_sensitive = True + env_file = ".env" + +# Ensure DB directory exists +settings = Settings() +settings.DB_DIR.mkdir(parents=True, exist_ok=True) \ No newline at end of file diff --git a/app/crud/__init__.py b/app/crud/__init__.py new file mode 100644 index 0000000..0c884ae --- /dev/null +++ b/app/crud/__init__.py @@ -0,0 +1,6 @@ +# Import all the CRUD operations +from app.crud.block import block +from app.crud.transaction import transaction +from app.crud.dex import dex +from app.crud.pool import pool +from 
app.crud.arbitrage import arbitrage \ No newline at end of file diff --git a/app/crud/arbitrage.py b/app/crud/arbitrage.py new file mode 100644 index 0000000..d4c5adc --- /dev/null +++ b/app/crud/arbitrage.py @@ -0,0 +1,200 @@ +from typing import List, Optional, Dict, Any, Union, Tuple + +from sqlalchemy import func, desc, asc, and_, or_ +from sqlalchemy.orm import Session, joinedload + +from app.crud.base import CRUDBase +from app.models.arbitrage import Arbitrage, ArbitrageLeg +from app.models.pool import Pool +from app.models.dex import Dex +from app.models.transaction import Transaction +from app.models.block import Block +from app.schemas.arbitrage import ArbitrageCreate, ArbitrageUpdate, ArbitrageLegCreate + + +class CRUDArbitrage(CRUDBase[Arbitrage, ArbitrageCreate, ArbitrageUpdate]): + """CRUD operations for arbitrages.""" + + def get_with_legs(self, db: Session, *, arbitrage_id: int) -> Optional[Arbitrage]: + """Get an arbitrage with all its legs.""" + return db.query(Arbitrage).options( + joinedload(Arbitrage.legs).joinedload(ArbitrageLeg.pool).joinedload(Pool.dex) + ).filter(Arbitrage.id == arbitrage_id).first() + + def get_by_transaction(self, db: Session, *, tx_id: int) -> List[Arbitrage]: + """Get arbitrages for a specific transaction.""" + return db.query(Arbitrage).filter(Arbitrage.transaction_id == tx_id).all() + + def get_by_initiator( + self, db: Session, *, initiator: str, skip: int = 0, limit: int = 100 + ) -> List[Arbitrage]: + """Get arbitrages initiated by a specific address.""" + return db.query(Arbitrage).filter( + Arbitrage.initiator_address == initiator + ).order_by(desc(Arbitrage.created_at)).offset(skip).limit(limit).all() + + def get_by_token( + self, db: Session, *, token: str, skip: int = 0, limit: int = 100 + ) -> List[Arbitrage]: + """Get arbitrages for a specific token.""" + return db.query(Arbitrage).filter( + Arbitrage.start_token_address == token + ).order_by(desc(Arbitrage.created_at)).offset(skip).limit(limit).all() + + def get_successful_arbitrages( + self, db: Session, *, skip: int = 0, limit: int = 100 + ) -> List[Arbitrage]: + """Get successful arbitrages.""" + return db.query(Arbitrage).filter( + Arbitrage.success == True + ).order_by(desc(Arbitrage.profit_percentage)).offset(skip).limit(limit).all() + + def get_failed_arbitrages( + self, db: Session, *, skip: int = 0, limit: int = 100 + ) -> List[Arbitrage]: + """Get failed arbitrages.""" + return db.query(Arbitrage).filter( + Arbitrage.success == False + ).order_by(desc(Arbitrage.created_at)).offset(skip).limit(limit).all() + + def get_arbitrages_by_profit_range( + self, db: Session, *, min_profit: float, max_profit: Optional[float] = None, + skip: int = 0, limit: int = 100 + ) -> List[Arbitrage]: + """Get arbitrages within a profit percentage range.""" + query = db.query(Arbitrage).filter( + Arbitrage.success == True, + Arbitrage.profit_percentage >= min_profit + ) + if max_profit is not None: + query = query.filter(Arbitrage.profit_percentage <= max_profit) + return query.order_by( + desc(Arbitrage.profit_percentage) + ).offset(skip).limit(limit).all() + + def get_arbitrages_by_time_range( + self, db: Session, *, start_time: Any, end_time: Any, + skip: int = 0, limit: int = 100 + ) -> List[Arbitrage]: + """Get arbitrages within a time range.""" + return db.query(Arbitrage).join( + Transaction, Arbitrage.transaction_id == Transaction.id + ).join( + Block, Transaction.block_id == Block.id + ).filter( + Block.block_time >= start_time, + Block.block_time <= end_time + ).order_by( + 
desc(Block.block_time) + ).offset(skip).limit(limit).all() + + def get_arbitrages_by_dex( + self, db: Session, *, dex_id: int, skip: int = 0, limit: int = 100 + ) -> List[Arbitrage]: + """Get arbitrages involving a specific DEX.""" + # We'll need to join with ArbitrageLeg and Pool to filter by DEX + return db.query(Arbitrage).join( + ArbitrageLeg, Arbitrage.id == ArbitrageLeg.arbitrage_id + ).join( + Pool, ArbitrageLeg.pool_id == Pool.id + ).filter( + Pool.dex_id == dex_id + ).distinct().order_by( + desc(Arbitrage.created_at) + ).offset(skip).limit(limit).all() + + def get_arbitrage_stats(self, db: Session) -> Dict[str, Any]: + """Get overall arbitrage statistics.""" + total = db.query(func.count(Arbitrage.id)).scalar() or 0 + successful = db.query(func.count(Arbitrage.id)).filter( + Arbitrage.success == True + ).scalar() or 0 + failed = total - successful + + total_profit = db.query(func.sum(Arbitrage.profit_amount)).filter( + Arbitrage.success == True + ).scalar() or 0 + + avg_profit_pct = db.query(func.avg(Arbitrage.profit_percentage)).filter( + Arbitrage.success == True + ).scalar() or 0 + + max_profit_pct = db.query(func.max(Arbitrage.profit_percentage)).filter( + Arbitrage.success == True + ).scalar() or 0 + + # Most used DEXes + dex_query = db.query( + Dex.name, func.count(Dex.id).label('count') + ).join( + Pool, Dex.id == Pool.dex_id + ).join( + ArbitrageLeg, Pool.id == ArbitrageLeg.pool_id + ).join( + Arbitrage, ArbitrageLeg.arbitrage_id == Arbitrage.id + ).filter( + Arbitrage.success == True + ).group_by( + Dex.name + ).order_by( + desc('count') + ).limit(5).all() + + most_used_dexes = [ + {"name": name, "count": count} for name, count in dex_query + ] + + # Most arbitraged tokens + token_query = db.query( + Arbitrage.start_token_symbol, func.count(Arbitrage.id).label('count') + ).filter( + Arbitrage.success == True, + Arbitrage.start_token_symbol.isnot(None) + ).group_by( + Arbitrage.start_token_symbol + ).order_by( + desc('count') + ).limit(5).all() + + most_arbitraged_tokens = [ + {"symbol": symbol or "Unknown", "count": count} for symbol, count in token_query + ] + + return { + "total_arbitrages": total, + "successful_arbitrages": successful, + "failed_arbitrages": failed, + "total_profit": total_profit, + "avg_profit_percentage": avg_profit_pct, + "max_profit_percentage": max_profit_pct, + "most_used_dexes": most_used_dexes, + "most_arbitraged_tokens": most_arbitraged_tokens + } + + def create_with_legs( + self, db: Session, *, obj_in: ArbitrageCreate, legs: List[ArbitrageLegCreate] + ) -> Arbitrage: + """Create an arbitrage with its legs.""" + # Create the arbitrage + obj_data = obj_in.model_dump() + db_obj = Arbitrage(**obj_data) + db.add(db_obj) + db.flush() # Get the ID without committing + + # Create the legs + for leg_in in legs: + leg_data = leg_in.model_dump() + leg_data["arbitrage_id"] = db_obj.id + db_leg = ArbitrageLeg(**leg_data) + db.add(db_leg) + + # Update leg count + db_obj.legs_count = len(legs) + + db.commit() + db.refresh(db_obj) + return db_obj + + +# Create a single instance for use in dependency injection +arbitrage = CRUDArbitrage(Arbitrage) \ No newline at end of file diff --git a/app/crud/base.py b/app/crud/base.py new file mode 100644 index 0000000..f340e0c --- /dev/null +++ b/app/crud/base.py @@ -0,0 +1,79 @@ +from typing import Any, Dict, Generic, List, Optional, Type, TypeVar, Union + +from fastapi.encoders import jsonable_encoder +from pydantic import BaseModel +from sqlalchemy.orm import Session + +from app.db.base_class import Base + 
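+
+# Type variables that bind the generic CRUD base to a concrete SQLAlchemy
+# model and its Pydantic create/update schemas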
+ModelType = TypeVar("ModelType", bound=Base) +CreateSchemaType = TypeVar("CreateSchemaType", bound=BaseModel) +UpdateSchemaType = TypeVar("UpdateSchemaType", bound=BaseModel) + + +class CRUDBase(Generic[ModelType, CreateSchemaType, UpdateSchemaType]): + """ + Base class for CRUD operations on a SQLAlchemy model + """ + + def __init__(self, model: Type[ModelType]): + """ + CRUD object with default methods to Create, Read, Update, Delete (CRUD). + + **Parameters** + + * `model`: A SQLAlchemy model class + * `schema`: A Pydantic model (schema) class + """ + self.model = model + + def get(self, db: Session, id: Any) -> Optional[ModelType]: + """Get a single record by ID.""" + return db.query(self.model).filter(self.model.id == id).first() + + def get_multi( + self, db: Session, *, skip: int = 0, limit: int = 100 + ) -> List[ModelType]: + """Get multiple records with pagination.""" + return db.query(self.model).offset(skip).limit(limit).all() + + def get_count(self, db: Session) -> int: + """Get total count of records.""" + return db.query(self.model).count() + + def create(self, db: Session, *, obj_in: CreateSchemaType) -> ModelType: + """Create a new record.""" + obj_in_data = jsonable_encoder(obj_in) + db_obj = self.model(**obj_in_data) + db.add(db_obj) + db.commit() + db.refresh(db_obj) + return db_obj + + def update( + self, + db: Session, + *, + db_obj: ModelType, + obj_in: Union[UpdateSchemaType, Dict[str, Any]] + ) -> ModelType: + """Update a record.""" + obj_data = jsonable_encoder(db_obj) + if isinstance(obj_in, dict): + update_data = obj_in + else: + update_data = obj_in.model_dump(exclude_unset=True) + for field in obj_data: + if field in update_data: + setattr(db_obj, field, update_data[field]) + db.add(db_obj) + db.commit() + db.refresh(db_obj) + return db_obj + + def remove(self, db: Session, *, id: int) -> ModelType: + """Delete a record.""" + obj = db.query(self.model).get(id) + db.delete(obj) + db.commit() + return obj \ No newline at end of file diff --git a/app/crud/block.py b/app/crud/block.py new file mode 100644 index 0000000..540b142 --- /dev/null +++ b/app/crud/block.py @@ -0,0 +1,63 @@ +from typing import List, Optional, Dict, Any, Union + +from sqlalchemy import desc +from sqlalchemy.orm import Session + +from app.crud.base import CRUDBase +from app.models.block import Block +from app.schemas.block import BlockCreate, BlockUpdate + + +class CRUDBlock(CRUDBase[Block, BlockCreate, BlockUpdate]): + """CRUD operations for blocks.""" + + def get_by_height(self, db: Session, *, height: int) -> Optional[Block]: + """Get a block by its height.""" + return db.query(Block).filter(Block.block_height == height).first() + + def get_by_hash(self, db: Session, *, block_hash: str) -> Optional[Block]: + """Get a block by its hash.""" + return db.query(Block).filter(Block.block_hash == block_hash).first() + + def get_latest_blocks( + self, db: Session, *, limit: int = 20 + ) -> List[Block]: + """Get the latest blocks ordered by height.""" + return db.query(Block).order_by(desc(Block.block_height)).limit(limit).all() + + def get_unprocessed_blocks( + self, db: Session, *, limit: int = 100 + ) -> List[Block]: + """Get blocks that haven't been processed for arbitrage detection.""" + return db.query(Block).filter(Block.processed == 0).limit(limit).all() + + def mark_as_processed( + self, db: Session, *, block_id: int + ) -> Optional[Block]: + """Mark a block as processed.""" + db_obj = db.query(Block).filter(Block.id == block_id).first() + if db_obj: + db_obj.processed = 1 + 
db.add(db_obj) + db.commit() + db.refresh(db_obj) + return db_obj + + def get_block_with_transactions( + self, db: Session, *, block_id: int + ) -> Optional[Block]: + """Get a block with all its transactions.""" + return db.query(Block).filter(Block.id == block_id).first() + + def get_block_count_by_time_range( + self, db: Session, *, start_time: Any, end_time: Any + ) -> int: + """Get count of blocks in a time range.""" + return db.query(Block).filter( + Block.block_time >= start_time, + Block.block_time <= end_time + ).count() + + +# Create a single instance for use in dependency injection +block = CRUDBlock(Block) \ No newline at end of file diff --git a/app/crud/dex.py b/app/crud/dex.py new file mode 100644 index 0000000..27670d0 --- /dev/null +++ b/app/crud/dex.py @@ -0,0 +1,69 @@ +from typing import List, Optional, Dict, Any, Union, Tuple + +from sqlalchemy import func, desc +from sqlalchemy.orm import Session + +from app.crud.base import CRUDBase +from app.models.dex import Dex +from app.models.pool import Pool +from app.schemas.dex import DexCreate, DexUpdate + + +class CRUDDex(CRUDBase[Dex, DexCreate, DexUpdate]): + """CRUD operations for DEXes.""" + + def get_by_address(self, db: Session, *, address: str) -> Optional[Dex]: + """Get a DEX by its address.""" + return db.query(Dex).filter(Dex.address == address).first() + + def get_by_name(self, db: Session, *, name: str) -> Optional[Dex]: + """Get a DEX by its name.""" + return db.query(Dex).filter(Dex.name == name).first() + + def get_with_pool_count( + self, db: Session, *, skip: int = 0, limit: int = 100 + ) -> List[Tuple[Dex, int]]: + """Get DEXes with count of their pools.""" + return db.query( + Dex, func.count(Pool.id).label('pool_count') + ).outerjoin( + Pool, Dex.id == Pool.dex_id + ).group_by( + Dex.id + ).order_by( + desc('pool_count') + ).offset(skip).limit(limit).all() + + def get_with_volumes( + self, db: Session, *, skip: int = 0, limit: int = 100 + ) -> List[Dex]: + """Get DEXes ordered by volume.""" + return db.query(Dex).order_by( + desc(Dex.volume_24h) + ).offset(skip).limit(limit).all() + + def search_by_name( + self, db: Session, *, query: str, skip: int = 0, limit: int = 100 + ) -> List[Dex]: + """Search DEXes by name.""" + return db.query(Dex).filter( + Dex.name.ilike(f'%{query}%') + ).offset(skip).limit(limit).all() + + def update_volume( + self, db: Session, *, dex_id: int, volume_24h: float, volume_7d: Optional[float] = None + ) -> Optional[Dex]: + """Update the volume statistics for a DEX.""" + db_obj = db.query(Dex).filter(Dex.id == dex_id).first() + if db_obj: + db_obj.volume_24h = volume_24h + if volume_7d is not None: + db_obj.volume_7d = volume_7d + db.add(db_obj) + db.commit() + db.refresh(db_obj) + return db_obj + + +# Create a single instance for use in dependency injection +dex = CRUDDex(Dex) \ No newline at end of file diff --git a/app/crud/pool.py b/app/crud/pool.py new file mode 100644 index 0000000..40e7cb7 --- /dev/null +++ b/app/crud/pool.py @@ -0,0 +1,99 @@ +from typing import List, Optional, Dict, Any, Union, Tuple + +from sqlalchemy import func, desc, or_, and_ +from sqlalchemy.orm import Session, joinedload + +from app.crud.base import CRUDBase +from app.models.pool import Pool +from app.models.dex import Dex +from app.schemas.pool import PoolCreate, PoolUpdate + + +class CRUDPool(CRUDBase[Pool, PoolCreate, PoolUpdate]): + """CRUD operations for liquidity pools.""" + + def get_by_address(self, db: Session, *, address: str) -> Optional[Pool]: + """Get a pool by its address.""" + return 
db.query(Pool).filter(Pool.address == address).first()
+
+    def get_by_dex(
+        self, db: Session, *, dex_id: int, skip: int = 0, limit: int = 100
+    ) -> List[Pool]:
+        """Get pools for a specific DEX."""
+        return db.query(Pool).filter(
+            Pool.dex_id == dex_id
+        ).offset(skip).limit(limit).all()
+
+    def get_by_token(
+        self, db: Session, *, token_address: str, skip: int = 0, limit: int = 100
+    ) -> List[Pool]:
+        """Get pools that contain a specific token."""
+        return db.query(Pool).filter(
+            or_(
+                Pool.token_a_address == token_address,
+                Pool.token_b_address == token_address
+            )
+        ).offset(skip).limit(limit).all()
+
+    def get_by_token_pair(
+        self, db: Session, *, token_a: str, token_b: str, skip: int = 0, limit: int = 100
+    ) -> List[Pool]:
+        """Get pools for a specific token pair (matched in either order)."""
+        return db.query(Pool).filter(
+            or_(
+                and_(Pool.token_a_address == token_a, Pool.token_b_address == token_b),
+                and_(Pool.token_a_address == token_b, Pool.token_b_address == token_a)
+            )
+        ).offset(skip).limit(limit).all()
+
+    def get_by_tvl_range(
+        self, db: Session, *, min_tvl: float, max_tvl: Optional[float] = None,
+        skip: int = 0, limit: int = 100
+    ) -> List[Pool]:
+        """Get pools within a TVL range."""
+        query = db.query(Pool).filter(Pool.tvl >= min_tvl)
+        if max_tvl is not None:
+            query = query.filter(Pool.tvl <= max_tvl)
+        return query.order_by(desc(Pool.tvl)).offset(skip).limit(limit).all()
+
+    def get_pools_with_dex(
+        self, db: Session, *, skip: int = 0, limit: int = 100
+    ) -> List[Tuple[Pool, Dex]]:
+        """Get pools with their DEX information."""
+        return db.query(
+            Pool, Dex
+        ).join(
+            Dex, Pool.dex_id == Dex.id
+        ).order_by(
+            desc(Pool.tvl)
+        ).offset(skip).limit(limit).all()
+
+    def update_reserves(
+        self, db: Session, *, pool_id: int,
+        token_a_reserve: float, token_b_reserve: float,
+        slot: int, tvl: Optional[float] = None
+    ) -> Optional[Pool]:
+        """Update token reserves for a pool."""
+        db_obj = db.query(Pool).filter(Pool.id == pool_id).first()
+        # Skip stale updates: only apply data from a slot at least as new as
+        # the last recorded one (last_updated_slot is None until first update).
+        if db_obj and (db_obj.last_updated_slot is None or slot >= db_obj.last_updated_slot):
+            db_obj.token_a_reserve = token_a_reserve
+            db_obj.token_b_reserve = token_b_reserve
+            db_obj.last_updated_slot = slot
+            if tvl is not None:
+                db_obj.tvl = tvl
+            db.add(db_obj)
+            db.commit()
+            db.refresh(db_obj)
+        return db_obj
+
+    def get_most_active_pools(
+        self, db: Session, *, limit: int = 20
+    ) -> List[Pool]:
+        """Get the most active pools by volume."""
+        return db.query(Pool).order_by(
+            desc(Pool.volume_24h)
+        ).limit(limit).all()
+
+
+# Create a single instance for use in dependency injection
+pool = CRUDPool(Pool)
\ No newline at end of file
diff --git a/app/crud/transaction.py b/app/crud/transaction.py
new file mode 100644
index 0000000..34d01f6
--- /dev/null
+++ b/app/crud/transaction.py
@@ -0,0 +1,65 @@
+from typing import List, Optional, Dict, Any, Union
+
+from sqlalchemy import desc
+from sqlalchemy.orm import Session
+
+from app.crud.base import CRUDBase
+from app.models.transaction import Transaction
+from app.schemas.transaction import TransactionCreate, TransactionUpdate
+
+
+class CRUDTransaction(CRUDBase[Transaction, TransactionCreate, TransactionUpdate]):
+    """CRUD operations for transactions."""
+
+    def get_by_hash(self, db: Session, *, tx_hash: str) -> Optional[Transaction]:
+        """Get a transaction by its hash."""
+        return db.query(Transaction).filter(Transaction.transaction_hash == tx_hash).first()
+
+    def get_by_signature(self, db: Session, *, signature: str) -> Optional[Transaction]:
+        """Get a transaction by its signature."""
+        return 
db.query(Transaction).filter(Transaction.signature == signature).first() + + def get_by_block( + self, db: Session, *, block_id: int, skip: int = 0, limit: int = 100 + ) -> List[Transaction]: + """Get transactions for a specific block with pagination.""" + return db.query(Transaction).filter( + Transaction.block_id == block_id + ).offset(skip).limit(limit).all() + + def get_by_block_count(self, db: Session, *, block_id: int) -> int: + """Get count of transactions for a specific block.""" + return db.query(Transaction).filter(Transaction.block_id == block_id).count() + + def get_successful_by_block( + self, db: Session, *, block_id: int, skip: int = 0, limit: int = 100 + ) -> List[Transaction]: + """Get successful transactions for a specific block.""" + return db.query(Transaction).filter( + Transaction.block_id == block_id, + Transaction.success == True + ).offset(skip).limit(limit).all() + + def get_by_program_id( + self, db: Session, *, program_id: str, skip: int = 0, limit: int = 100 + ) -> List[Transaction]: + """Get transactions involving a specific program.""" + # Note: This is a simplified version. In a production app, you would + # want to use a proper JSON query based on your database + return db.query(Transaction).filter( + Transaction.program_ids.like(f'%{program_id}%') + ).offset(skip).limit(limit).all() + + def search_by_account( + self, db: Session, *, account: str, skip: int = 0, limit: int = 100 + ) -> List[Transaction]: + """Get transactions involving a specific account.""" + # Note: This is a simplified version. In a production app, you would + # want to use a proper JSON query based on your database + return db.query(Transaction).filter( + Transaction.accounts.like(f'%{account}%') + ).offset(skip).limit(limit).all() + + +# Create a single instance for use in dependency injection +transaction = CRUDTransaction(Transaction) \ No newline at end of file diff --git a/app/db/__init__.py b/app/db/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/db/base.py b/app/db/base.py new file mode 100644 index 0000000..fa86684 --- /dev/null +++ b/app/db/base.py @@ -0,0 +1,8 @@ +# Import all the models, so that Base has them before being +# imported by Alembic +from app.db.base_class import Base # noqa +from app.models.block import Block # noqa +from app.models.transaction import Transaction # noqa +from app.models.arbitrage import Arbitrage # noqa +from app.models.pool import Pool # noqa +from app.models.dex import Dex # noqa \ No newline at end of file diff --git a/app/db/base_class.py b/app/db/base_class.py new file mode 100644 index 0000000..d88c48b --- /dev/null +++ b/app/db/base_class.py @@ -0,0 +1,14 @@ +from typing import Any + +from sqlalchemy.ext.declarative import as_declarative, declared_attr + + +@as_declarative() +class Base: + id: Any + __name__: str + + # Generate __tablename__ automatically based on class name + @declared_attr + def __tablename__(cls) -> str: + return cls.__name__.lower() \ No newline at end of file diff --git a/app/db/session.py b/app/db/session.py new file mode 100644 index 0000000..f45e16f --- /dev/null +++ b/app/db/session.py @@ -0,0 +1,21 @@ +from sqlalchemy import create_engine +from sqlalchemy.orm import sessionmaker + +from app.core.config import settings + +# Create SQLite engine with check_same_thread=False for SQLite +engine = create_engine( + settings.SQLALCHEMY_DATABASE_URL, + connect_args={"check_same_thread": False} +) + +# Create session factory +SessionLocal = sessionmaker(autocommit=False, autoflush=False, 
bind=engine)
+
+# Dependency to get DB session
+def get_db():
+    db = SessionLocal()
+    try:
+        yield db
+    finally:
+        db.close()
\ No newline at end of file
diff --git a/app/models/__init__.py b/app/models/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/app/models/arbitrage.py b/app/models/arbitrage.py
new file mode 100644
index 0000000..ec45ed6
--- /dev/null
+++ b/app/models/arbitrage.py
@@ -0,0 +1,64 @@
+from datetime import datetime
+from sqlalchemy import (
+    Column, Integer, String, DateTime, Float,
+    BigInteger, ForeignKey, JSON, Boolean, Text
+)
+from sqlalchemy.orm import relationship
+
+from app.db.base_class import Base
+
+
+class Arbitrage(Base):
+    """Arbitrage transaction model."""
+
+    id = Column(Integer, primary_key=True, index=True)
+    transaction_id = Column(Integer, ForeignKey("transaction.id"), nullable=False)
+    initiator_address = Column(String, index=True, nullable=False)
+    start_token_address = Column(String, index=True, nullable=False)
+    start_token_symbol = Column(String, nullable=True)
+    start_amount = Column(Float, nullable=False)
+    end_amount = Column(Float, nullable=False)
+    profit_amount = Column(Float, nullable=False)  # end_amount - start_amount
+    profit_percentage = Column(Float, nullable=False)  # (end_amount - start_amount) / start_amount * 100
+    success = Column(Boolean, nullable=False, default=False)
+    failure_reason = Column(Text, nullable=True)
+    gas_cost = Column(Float, nullable=True)  # Estimated gas cost in SOL
+    net_profit = Column(Float, nullable=True)  # profit_amount - gas_cost
+    legs_count = Column(Integer, nullable=False, default=0)
+    route_description = Column(Text, nullable=True)  # Human-readable route
+    included_dexes = Column(JSON, nullable=True)  # List of DEX names involved
+    created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
+
+    # Relationships
+    transaction = relationship("Transaction", back_populates="arbitrages")
+    legs = relationship("ArbitrageLeg", back_populates="arbitrage", cascade="all, delete-orphan")
+
+    def __repr__(self):
+        token = self.start_token_symbol or self.start_token_address[:8]
+        return f"<Arbitrage(id={self.id}, token={token}, profit={self.profit_percentage}%)>"
+
+
+class ArbitrageLeg(Base):
+    """Individual hop/leg in an arbitrage path."""
+
+    id = Column(Integer, primary_key=True, index=True)
+    arbitrage_id = Column(Integer, ForeignKey("arbitrage.id"), nullable=False)
+    leg_index = Column(Integer, nullable=False)  # Order in the arbitrage path
+    pool_id = Column(Integer, ForeignKey("pool.id"), nullable=False)
+    token_in_address = Column(String, index=True, nullable=False)
+    token_in_symbol = Column(String, nullable=True)
+    token_in_amount = Column(Float, nullable=False)
+    token_out_address = Column(String, index=True, nullable=False)
+    token_out_symbol = Column(String, nullable=True)
+    token_out_amount = Column(Float, nullable=False)
+    price_impact = Column(Float, nullable=True)  # Price impact percentage
+    created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
+
+    # Relationships
+    arbitrage = relationship("Arbitrage", back_populates="legs")
+    pool = relationship("Pool", back_populates="arbitrage_legs")
+
+    def __repr__(self):
+        in_token = self.token_in_symbol or self.token_in_address[:8]
+        out_token = self.token_out_symbol or self.token_out_address[:8]
+        return f"<ArbitrageLeg(id={self.id}, {in_token}->{out_token})>"
\ No newline at end of file
diff --git a/app/models/block.py b/app/models/block.py
new file mode 100644
index 0000000..4c92015
--- /dev/null
+++ b/app/models/block.py
@@ -0,0 +1,26 @@
+from datetime import datetime
+from sqlalchemy import Column, Integer, String, DateTime, BigInteger, Float
\ No newline at end of file
diff --git a/app/models/__init__.py b/app/models/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/app/models/arbitrage.py b/app/models/arbitrage.py
new file mode 100644
index 0000000..ec45ed6
--- /dev/null
+++ b/app/models/arbitrage.py
@@ -0,0 +1,64 @@
+from datetime import datetime
+from sqlalchemy import (
+    Column, Integer, String, DateTime, Float,
+    BigInteger, ForeignKey, JSON, Boolean, Text
+)
+from sqlalchemy.orm import relationship
+
+from app.db.base_class import Base
+
+
+class Arbitrage(Base):
+    """Arbitrage transaction model."""
+
+    id = Column(Integer, primary_key=True, index=True)
+    transaction_id = Column(Integer, ForeignKey("transaction.id"), nullable=False)
+    initiator_address = Column(String, index=True, nullable=False)
+    start_token_address = Column(String, index=True, nullable=False)
+    start_token_symbol = Column(String, nullable=True)
+    start_amount = Column(Float, nullable=False)
+    end_amount = Column(Float, nullable=False)
+    profit_amount = Column(Float, nullable=False)  # end_amount - start_amount
+    profit_percentage = Column(Float, nullable=False)  # (end_amount - start_amount) / start_amount * 100
+    success = Column(Boolean, nullable=False, default=False)
+    failure_reason = Column(Text, nullable=True)
+    gas_cost = Column(Float, nullable=True)  # Estimated gas cost in SOL
+    net_profit = Column(Float, nullable=True)  # profit_amount - gas_cost
+    legs_count = Column(Integer, nullable=False, default=0)
+    route_description = Column(Text, nullable=True)  # Human-readable route
+    included_dexes = Column(JSON, nullable=True)  # List of DEX names involved
+    created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
+
+    # Relationships
+    transaction = relationship("Transaction", back_populates="arbitrages")
+    legs = relationship("ArbitrageLeg", back_populates="arbitrage", cascade="all, delete-orphan")
+
+    def __repr__(self):
+        token = self.start_token_symbol or self.start_token_address[:8]
+        return f"<Arbitrage(id={self.id}, token={token}, profit={self.profit_percentage}%)>"
+
+
+class ArbitrageLeg(Base):
+    """Individual hop/leg in an arbitrage path."""
+
+    id = Column(Integer, primary_key=True, index=True)
+    arbitrage_id = Column(Integer, ForeignKey("arbitrage.id"), nullable=False)
+    leg_index = Column(Integer, nullable=False)  # Order in the arbitrage path
+    pool_id = Column(Integer, ForeignKey("pool.id"), nullable=False)
+    token_in_address = Column(String, index=True, nullable=False)
+    token_in_symbol = Column(String, nullable=True)
+    token_in_amount = Column(Float, nullable=False)
+    token_out_address = Column(String, index=True, nullable=False)
+    token_out_symbol = Column(String, nullable=True)
+    token_out_amount = Column(Float, nullable=False)
+    price_impact = Column(Float, nullable=True)  # Price impact percentage
+    created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
+
+    # Relationships
+    arbitrage = relationship("Arbitrage", back_populates="legs")
+    pool = relationship("Pool", back_populates="arbitrage_legs")
+
+    def __repr__(self):
+        in_token = self.token_in_symbol or self.token_in_address[:8]
+        out_token = self.token_out_symbol or self.token_out_address[:8]
+        return f"<ArbitrageLeg(id={self.id}, {in_token}->{out_token})>"
\ No newline at end of file
diff --git a/app/models/block.py b/app/models/block.py
new file mode 100644
index 0000000..4c92015
--- /dev/null
+++ b/app/models/block.py
@@ -0,0 +1,26 @@
+from datetime import datetime
+from sqlalchemy import Column, Integer, String, DateTime, BigInteger, Float
+from sqlalchemy.orm import relationship
+
+from app.db.base_class import Base
+
+
+class Block(Base):
+    """Solana blockchain block model."""
+
+    id = Column(Integer, primary_key=True, index=True)
+    block_height = Column(BigInteger, unique=True, index=True, nullable=False)
+    block_hash = Column(String, unique=True, index=True, nullable=False)
+    parent_block_hash = Column(String, index=True, nullable=True)
+    slot = Column(BigInteger, index=True, nullable=False)
+    block_time = Column(DateTime, index=True, nullable=True)
+    transactions_count = Column(Integer, nullable=False, default=0)
+    successful_transactions_count = Column(Integer, nullable=False, default=0)
+    processed = Column(Integer, default=0, nullable=False)  # 0=not processed, 1=processed
+    created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
+
+    # Relationships
+    transactions = relationship("Transaction", back_populates="block", cascade="all, delete-orphan")
+
+    def __repr__(self):
+        return f"<Block(id={self.id}, height={self.block_height}, slot={self.slot})>"
\ No newline at end of file
diff --git a/app/models/dex.py b/app/models/dex.py
new file mode 100644
index 0000000..7047eff
--- /dev/null
+++ b/app/models/dex.py
@@ -0,0 +1,27 @@
+from datetime import datetime
+from sqlalchemy import Column, Integer, String, DateTime, Text, Float
+from sqlalchemy.orm import relationship
+
+from app.db.base_class import Base
+
+
+class Dex(Base):
+    """Decentralized Exchange (DEX) model."""
+
+    id = Column(Integer, primary_key=True, index=True)
+    name = Column(String, index=True, nullable=False)
+    address = Column(String, unique=True, index=True, nullable=False)
+    program_id = Column(String, index=True, nullable=False)
+    version = Column(String, nullable=True)
+    description = Column(Text, nullable=True)
+    website = Column(String, nullable=True)
+    volume_24h = Column(Float, nullable=True)
+    volume_7d = Column(Float, nullable=True)
+    created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
+    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False)
+
+    # Relationships
+    pools = relationship("Pool", back_populates="dex")
+
+    def __repr__(self):
+        return f"<Dex(id={self.id}, name={self.name})>"
\ No newline at end of file
diff --git a/app/models/pool.py b/app/models/pool.py
new file mode 100644
index 0000000..a05f1da
--- /dev/null
+++ b/app/models/pool.py
@@ -0,0 +1,45 @@
+from datetime import datetime
+from sqlalchemy import (
+    Column, Integer, String, DateTime, Float,
+    BigInteger, ForeignKey, JSON, Boolean
+)
+from sqlalchemy.orm import relationship
+
+from app.db.base_class import Base
+
+
+class Pool(Base):
+    """Liquidity pool model."""
+
+    id = Column(Integer, primary_key=True, index=True)
+    dex_id = Column(Integer, ForeignKey("dex.id"), nullable=False)
+    address = Column(String, unique=True, index=True, nullable=False)
+    token_a_address = Column(String, index=True, nullable=False)
+    token_a_symbol = Column(String, nullable=True)
+    token_a_name = Column(String, nullable=True)
+    token_a_decimals = Column(Integer, nullable=True)
+    token_b_address = Column(String, index=True, nullable=False)
+    token_b_symbol = Column(String, nullable=True)
+    token_b_name = Column(String, nullable=True)
+    token_b_decimals = Column(Integer, nullable=True)
+    token_a_reserve = Column(Float, nullable=True)
+    token_b_reserve = Column(Float, nullable=True)
+    last_updated_slot = Column(BigInteger, nullable=True)
+    volume_24h = Column(Float, nullable=True)
+    fees_24h = Column(Float, nullable=True)
+    tvl = Column(Float, nullable=True)  # Total Value Locked in USD
+    fee_rate = Column(Float, nullable=True)  # Pool fee percentage
+    pool_type = Column(String, nullable=True)  # Constant product, stable, etc.
+    is_active = Column(Boolean, default=True, nullable=False)
+    # "metadata" is a reserved attribute name on SQLAlchemy declarative models,
+    # so the Python attribute is pool_metadata while the database column keeps
+    # the name "metadata"
+    pool_metadata = Column("metadata", JSON, nullable=True)  # Additional pool-specific data
+    created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
+    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False)
+
+    # Relationships
+    dex = relationship("Dex", back_populates="pools")
+    arbitrage_legs = relationship("ArbitrageLeg", back_populates="pool")
+
+    def __repr__(self):
+        token_a = self.token_a_symbol or self.token_a_address[:8]
+        token_b = self.token_b_symbol or self.token_b_address[:8]
+        return f"<Pool(id={self.id}, {token_a}/{token_b})>"
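+
+
+# Illustrative helper (a sketch, not part of the original patch): for a
+# constant-product pool, a rough spot price of token A quoted in token B can
+# be derived from the reserves, shifted by the decimal difference when the
+# reserves are stored in raw base units. Whether reserves here are raw or UI
+# amounts is an assumption; treat this as a starting point only.
+def approximate_spot_price(pool):
+    """Return token B per token A from reserves, or None if reserves are unset."""
+    if not pool.token_a_reserve or not pool.token_b_reserve:
+        return None
+    decimal_shift = 10 ** ((pool.token_a_decimals or 0) - (pool.token_b_decimals or 0))
+    return (pool.token_b_reserve / pool.token_a_reserve) * decimal_shift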
\ No newline at end of file
diff --git a/app/models/transaction.py b/app/models/transaction.py
new file mode 100644
index 0000000..d9256b4
--- /dev/null
+++ b/app/models/transaction.py
@@ -0,0 +1,32 @@
+from datetime import datetime
+from sqlalchemy import (
+    Column, Integer, String, DateTime, BigInteger,
+    Float, Boolean, ForeignKey, Text, JSON
+)
+from sqlalchemy.orm import relationship
+
+from app.db.base_class import Base
+
+
+class Transaction(Base):
+    """Solana blockchain transaction model."""
+
+    id = Column(Integer, primary_key=True, index=True)
+    block_id = Column(Integer, ForeignKey("block.id"), nullable=False)
+    transaction_hash = Column(String, unique=True, index=True, nullable=False)
+    slot = Column(BigInteger, index=True, nullable=False)
+    signature = Column(String, unique=True, index=True, nullable=False)
+    success = Column(Boolean, nullable=False, default=False)
+    fee = Column(BigInteger, nullable=True)
+    fee_payer = Column(String, index=True, nullable=True)
+    program_ids = Column(JSON, nullable=True)  # Array of program IDs involved
+    accounts = Column(JSON, nullable=True)  # Array of account addresses involved
+    raw_data = Column(Text, nullable=True)  # Raw transaction data for detailed analysis
+    created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
+
+    # Relationships
+    block = relationship("Block", back_populates="transactions")
+    arbitrages = relationship("Arbitrage", back_populates="transaction", cascade="all, delete-orphan")
+
+    def __repr__(self):
+        return f"<Transaction(id={self.id}, signature={self.signature[:8]}, success={self.success})>"
\ No newline at end of file
diff --git a/app/schemas/__init__.py b/app/schemas/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/app/schemas/arbitrage.py b/app/schemas/arbitrage.py
new file mode 100644
index 0000000..9b654b7
--- /dev/null
+++ b/app/schemas/arbitrage.py
@@ -0,0 +1,115 @@
+from datetime import datetime
+from typing import List, Optional, Dict, Any, Union
+from pydantic import BaseModel
+
+
+class ArbitrageBase(BaseModel):
+    transaction_id: int
+    initiator_address: str
+    start_token_address: str
+    start_token_symbol: Optional[str] = None
+    start_amount: float
+    end_amount: float
+    profit_amount: float
+    profit_percentage: float
+    success: bool = False
+    failure_reason: Optional[str] = None
+    gas_cost: Optional[float] = None
+    net_profit: Optional[float] = None
+    legs_count: int = 0
+    route_description: Optional[str] = None
+
+
+class ArbitrageCreate(ArbitrageBase):
+    included_dexes: Optional[List[str]] = None
+
+
+class ArbitrageUpdate(BaseModel):
+    start_token_symbol: Optional[str] = None
+    end_amount: Optional[float] = None
+    profit_amount: Optional[float] = None
+    profit_percentage: Optional[float] = None
+    success: Optional[bool] = None
+    failure_reason: Optional[str] = None
+    gas_cost: Optional[float] = None
+    net_profit: Optional[float]
= None + legs_count: Optional[int] = None + route_description: Optional[str] = None + included_dexes: Optional[List[str]] = None + + +class ArbitrageInDBBase(ArbitrageBase): + id: int + included_dexes: Optional[List[str]] = None + created_at: datetime + + class Config: + from_attributes = True + + +class Arbitrage(ArbitrageInDBBase): + pass + + +class ArbitrageLegBase(BaseModel): + arbitrage_id: int + leg_index: int + pool_id: int + token_in_address: str + token_in_symbol: Optional[str] = None + token_in_amount: float + token_out_address: str + token_out_symbol: Optional[str] = None + token_out_amount: float + price_impact: Optional[float] = None + + +class ArbitrageLegCreate(ArbitrageLegBase): + pass + + +class ArbitrageLegUpdate(BaseModel): + token_in_symbol: Optional[str] = None + token_in_amount: Optional[float] = None + token_out_symbol: Optional[str] = None + token_out_amount: Optional[float] = None + price_impact: Optional[float] = None + + +class ArbitrageLegInDBBase(ArbitrageLegBase): + id: int + created_at: datetime + + class Config: + from_attributes = True + + +class ArbitrageLeg(ArbitrageLegInDBBase): + pass + + +class ArbitrageLegWithPool(ArbitrageLeg): + pool_address: str + dex_name: str + + +class ArbitrageWithLegs(Arbitrage): + legs: List[ArbitrageLegWithPool] + block_height: int + block_time: Optional[datetime] = None + + +class ArbitrageList(BaseModel): + arbitrages: List[Arbitrage] + total: int + + +class ArbitrageStats(BaseModel): + total_arbitrages: int + successful_arbitrages: int + failed_arbitrages: int + total_profit: float + avg_profit_percentage: float + max_profit_percentage: float + most_used_dexes: List[Dict[str, Union[str, int]]] + most_arbitraged_tokens: List[Dict[str, Union[str, int]]] \ No newline at end of file diff --git a/app/schemas/block.py b/app/schemas/block.py new file mode 100644 index 0000000..fc7efec --- /dev/null +++ b/app/schemas/block.py @@ -0,0 +1,41 @@ +from datetime import datetime +from typing import List, Optional +from pydantic import BaseModel + + +class BlockBase(BaseModel): + block_height: int + block_hash: str + parent_block_hash: Optional[str] = None + slot: int + block_time: Optional[datetime] = None + transactions_count: int = 0 + successful_transactions_count: int = 0 + + +class BlockCreate(BlockBase): + pass + + +class BlockUpdate(BaseModel): + transactions_count: Optional[int] = None + successful_transactions_count: Optional[int] = None + processed: Optional[int] = None + + +class BlockInDBBase(BlockBase): + id: int + created_at: datetime + processed: int + + class Config: + from_attributes = True + + +class Block(BlockInDBBase): + pass + + +class BlockList(BaseModel): + blocks: List[Block] + total: int \ No newline at end of file diff --git a/app/schemas/dex.py b/app/schemas/dex.py new file mode 100644 index 0000000..9d3806d --- /dev/null +++ b/app/schemas/dex.py @@ -0,0 +1,50 @@ +from datetime import datetime +from typing import List, Optional +from pydantic import BaseModel + + +class DexBase(BaseModel): + name: str + address: str + program_id: str + version: Optional[str] = None + description: Optional[str] = None + website: Optional[str] = None + + +class DexCreate(DexBase): + pass + + +class DexUpdate(BaseModel): + name: Optional[str] = None + program_id: Optional[str] = None + version: Optional[str] = None + description: Optional[str] = None + website: Optional[str] = None + volume_24h: Optional[float] = None + volume_7d: Optional[float] = None + + +class DexInDBBase(DexBase): + id: int + volume_24h: Optional[float] = 
None
+    volume_7d: Optional[float] = None
+    created_at: datetime
+    updated_at: datetime
+
+    class Config:
+        from_attributes = True
+
+
+class Dex(DexInDBBase):
+    pass
+
+
+class DexWithPoolCount(Dex):
+    pool_count: int
+
+
+class DexList(BaseModel):
+    dexes: List[DexWithPoolCount]
+    total: int
\ No newline at end of file
diff --git a/app/schemas/pool.py b/app/schemas/pool.py
new file mode 100644
index 0000000..50dffa6
--- /dev/null
+++ b/app/schemas/pool.py
@@ -0,0 +1,72 @@
+from datetime import datetime
+from typing import List, Optional, Dict, Any
+from pydantic import BaseModel
+
+
+class PoolBase(BaseModel):
+    dex_id: int
+    address: str
+    token_a_address: str
+    token_a_symbol: Optional[str] = None
+    token_a_name: Optional[str] = None
+    token_a_decimals: Optional[int] = None
+    token_b_address: str
+    token_b_symbol: Optional[str] = None
+    token_b_name: Optional[str] = None
+    token_b_decimals: Optional[int] = None
+    token_a_reserve: Optional[float] = None
+    token_b_reserve: Optional[float] = None
+    last_updated_slot: Optional[int] = None
+    fee_rate: Optional[float] = None
+    pool_type: Optional[str] = None
+    is_active: bool = True
+
+
+class PoolCreate(PoolBase):
+    # Named to match the Pool.pool_metadata ORM attribute, which maps the
+    # reserved "metadata" database column; keeps from_attributes working
+    pool_metadata: Optional[Dict[str, Any]] = None
+
+
+class PoolUpdate(BaseModel):
+    token_a_symbol: Optional[str] = None
+    token_a_name: Optional[str] = None
+    token_a_decimals: Optional[int] = None
+    token_b_symbol: Optional[str] = None
+    token_b_name: Optional[str] = None
+    token_b_decimals: Optional[int] = None
+    token_a_reserve: Optional[float] = None
+    token_b_reserve: Optional[float] = None
+    last_updated_slot: Optional[int] = None
+    volume_24h: Optional[float] = None
+    fees_24h: Optional[float] = None
+    tvl: Optional[float] = None
+    fee_rate: Optional[float] = None
+    pool_type: Optional[str] = None
+    is_active: Optional[bool] = None
+    pool_metadata: Optional[Dict[str, Any]] = None
+
+
+class PoolInDBBase(PoolBase):
+    id: int
+    volume_24h: Optional[float] = None
+    fees_24h: Optional[float] = None
+    tvl: Optional[float] = None
+    pool_metadata: Optional[Dict[str, Any]] = None
+    created_at: datetime
+    updated_at: datetime
+
+    class Config:
+        from_attributes = True
+
+
+class Pool(PoolInDBBase):
+    pass
+
+
+class PoolWithDex(Pool):
+    dex_name: str
+    dex_address: str
+
+
+class PoolList(BaseModel):
+    pools: List[PoolWithDex]
+    total: int
\ No newline at end of file
diff --git a/app/schemas/transaction.py b/app/schemas/transaction.py
new file mode 100644
index 0000000..5bd96d0
--- /dev/null
+++ b/app/schemas/transaction.py
@@ -0,0 +1,50 @@
+from datetime import datetime
+from typing import List, Optional, Any, Dict
+from pydantic import BaseModel
+
+
+class TransactionBase(BaseModel):
+    transaction_hash: str
+    slot: int
+    signature: str
+    success: bool = False
+    fee: Optional[int] = None
+    fee_payer: Optional[str] = None
+    program_ids: Optional[List[str]] = None
+    accounts: Optional[List[str]] = None
+
+
+class TransactionCreate(TransactionBase):
+    block_id: int
+    raw_data: Optional[str] = None
+
+
+class TransactionUpdate(BaseModel):
+    success: Optional[bool] = None
+    fee: Optional[int] = None
+    fee_payer: Optional[str] = None
+    program_ids: Optional[List[str]] = None
+    accounts: Optional[List[str]] = None
+    raw_data: Optional[str] = None
+
+
+class TransactionInDBBase(TransactionBase):
+    id: int
+    block_id: int
+    created_at: datetime
+
+    class Config:
+        from_attributes = True
+
+
+class Transaction(TransactionInDBBase):
+    pass
+
+
+class TransactionDetail(Transaction):
+    raw_data: Optional[str] = None
+
+
+class TransactionList(BaseModel):
+    transactions: List[Transaction]
+    total: int
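+
+# Usage sketch (illustrative): with from_attributes enabled, ORM instances
+# convert directly to these schemas via Pydantic v2's model_validate, e.g.
+#
+#     orm_tx = db.query(TransactionModel).first()
+#     api_tx = TransactionDetail.model_validate(orm_tx)
+#
+# where TransactionModel is the SQLAlchemy model from app.models.transaction,
+# imported under an alias to avoid clashing with the Transaction schema above.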
\ No newline at end of file
diff --git a/app/services/__init__.py b/app/services/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/app/services/arbitrage_detector.py b/app/services/arbitrage_detector.py
new file mode 100644
index 0000000..20b3a32
--- /dev/null
+++ b/app/services/arbitrage_detector.py
@@ -0,0 +1,179 @@
+from typing import List, Optional
+
+from loguru import logger
+from sqlalchemy.orm import Session
+
+from app.crud import block, transaction, arbitrage
+from app.models.transaction import Transaction
+from app.schemas.arbitrage import ArbitrageCreate
+
+
+# Common DEX program IDs on Solana
+DEX_PROGRAM_IDS = {
+    "jup-ag": "JUP4Fb2cqiRUcaTHdrPC8h2gNsA8fvKXYbXUJJqRUrZP",  # Jupiter Aggregator
+    "orca": "whirLbMiicVdio4qvUfM5KAg6Ct8VwpYzGff3uctyCc",  # Orca Whirlpools
+    "raydium": "675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8",  # Raydium
+    "serum": "9xQeWvG816bUx9EPjHmaT23yvVM2ZWbrrpZb9PusVFin",  # Serum v3
+    "aldrin": "CURVGoZn8zycx6FXwwevgBTB2gVvdbGTEpvMJDbgs2t4",  # Aldrin
+    "saber": "SSwpkEEcbUqx4vtoEByFjSkhKdCT862DNVb52nZg1UZ",  # Saber
+    "mercurial": "MERLuDFBMmsHnsBPZw2sDQZHvXFMwp8EdjudcU2HKky",  # Mercurial
+    "lifinity": "EewxydAPCCVuNEyrVN68PuSYdQ7wKn27V9Gjeoi8dy3S",  # Lifinity
+    "crema": "6MLxLqiXaaSUpkgMnLbse4ConZQZnxLi3VPYSNznE9Ky",  # Crema Finance
+    "meteora": "M2mx93ekt1fmXSVkTrUL9xVFHkmME8HTUi5Cyc5aF7K",  # Meteora
+    "cykura": "cysPXAjehMpVKUapzbMCCnpFxUFFryEWEaLgnb9NrR8",  # Cykura
+}
+
+
+class ArbitrageDetector:
+    """Service for detecting arbitrage opportunities in Solana transactions."""
+
+    def __init__(self, db: Session):
+        """Initialize arbitrage detector."""
+        self.db = db
+
+    def detect_arbitrages(self, num_blocks: Optional[int] = None) -> int:
+        """
+        Analyze unprocessed blocks to detect arbitrages.
+
+        Returns the number of blocks processed.
+        """
+        # Get unprocessed blocks
+        unprocessed_blocks = block.get_unprocessed_blocks(
+            self.db, limit=num_blocks or 100
+        )
+
+        if not unprocessed_blocks:
+            logger.info("No unprocessed blocks found")
+            return 0
+
+        logger.info(f"Analyzing {len(unprocessed_blocks)} blocks for arbitrages")
+
+        processed_count = 0
+        for blk in unprocessed_blocks:
+            try:
+                # Get the transactions for this block
+                txs = transaction.get_by_block(self.db, block_id=blk.id)
+
+                # Process each transaction
+                for tx in txs:
+                    self._analyze_transaction(tx)
+
+                # Mark block as processed
+                block.mark_as_processed(self.db, block_id=blk.id)
+                processed_count += 1
+
+            except Exception as e:
+                logger.error(f"Error processing block {blk.block_height}: {str(e)}")
+                continue
+
+        logger.info(f"Processed {processed_count} blocks in this arbitrage-detection pass")
+        return processed_count
+    def _analyze_transaction(self, tx: Transaction) -> None:
+        """
+        Analyze a transaction for potential arbitrage patterns.
+
+        This method looks for common arbitrage patterns:
+        1. Same token in/out (circular trading)
+        2. Jupiter/DEX aggregator interactions
+        3. Multiple DEX interactions in the same transaction
+        """
+        # Skip failed transactions
+        if not tx.success:
+            return
+
+        # Skip transactions with no program IDs
+        if not tx.program_ids:
+            return
+
+        # Check if there are any DEX program IDs in this transaction
+        has_dex_interaction = False
+        dex_names = []
+
+        for program_id in tx.program_ids:
+            for dex_name, dex_program in DEX_PROGRAM_IDS.items():
+                if program_id == dex_program:
+                    has_dex_interaction = True
+                    dex_names.append(dex_name)
+                    break
+
+        # Jupiter aggregator is a strong signal for arbitrage
+        is_jupiter = any(p == DEX_PROGRAM_IDS["jup-ag"] for p in tx.program_ids)
+
+        # Multiple different DEXes is another strong signal
+        multiple_dexes = len(set(dex_names)) > 1
+
+        if not (has_dex_interaction and (is_jupiter or multiple_dexes)):
+            return
+
+        # For simplicity in this demonstration, we create a simplified arbitrage
+        # record. In a real system, you would:
+        # 1. Decode the transaction instructions to identify token swaps
+        # 2. Track token transfers to identify the full arbitrage path
+        #    (a sketch of recovering balance changes follows this class)
+        # 3. Calculate exact profit amounts using token price and amount data
+
+        # Treat Jupiter transactions that touch multiple DEXes as arbitrages
+        if is_jupiter and multiple_dexes:
+            self._create_simplified_arbitrage(tx, dex_names)
+
+    def _create_simplified_arbitrage(self, tx: Transaction, dex_names: List[str]) -> None:
+        """
+        Create a simplified arbitrage record for demonstration purposes.
+
+        In a real implementation, this would be replaced with actual analysis of
+        the transaction instructions to determine:
+        - The exact tokens involved
+        - The exact amount in/out
+        - The specific pools and swap paths used
+        """
+        # For demonstration, we create a simulated arbitrage with fixed
+        # placeholder values
+
+        # Default values for demonstration
+        initiator = tx.fee_payer
+        success = True  # We only detect successful arbitrages in this demo
+        start_token = "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v"  # USDC token address
+        start_token_symbol = "USDC"
+
+        # Simulate profit values
+        start_amount = 1000.0  # $1000 USDC
+        profit_percentage = 0.5  # 0.5% profit
+        end_amount = start_amount * (1 + profit_percentage / 100)
+        profit_amount = end_amount - start_amount
+        # NOTE: gas_cost is denominated in SOL while profit is in USDC, so the
+        # subtraction below mixes units; this is part of the demo simplification
+        gas_cost = 0.01  # Solana gas cost in SOL
+        net_profit = profit_amount - gas_cost
+
+        # Create a description of the arbitrage path
+        route_description = f"USDC → {' → '.join(dex_names)} → USDC"
+
+        # Create the arbitrage
+        arbitrage_in = ArbitrageCreate(
+            transaction_id=tx.id,
+            initiator_address=initiator,
+            start_token_address=start_token,
+            start_token_symbol=start_token_symbol,
+            start_amount=start_amount,
+            end_amount=end_amount,
+            profit_amount=profit_amount,
+            profit_percentage=profit_percentage,
+            success=success,
+            gas_cost=gas_cost,
+            net_profit=net_profit,
+            legs_count=len(dex_names),
+            route_description=route_description,
+            included_dexes=dex_names
+        )
+
+        # In a real implementation, we would create actual legs with real pool
+        # data; we skip leg creation here since we don't have decoded swaps
+        legs = []
+
+        # Create the arbitrage record
+        arbitrage.create_with_legs(self.db, obj_in=arbitrage_in, legs=legs)
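+
+
+# Illustrative sketch (not wired into the detector above): a real implementation
+# would recover per-owner token deltas from the metadata the RPC node returns
+# with each transaction (preTokenBalances/postTokenBalances in the standard
+# getTransaction JSON response), then flag wallets whose balance of a single
+# mint increased. This is a starting point, not production logic.
+def token_balance_changes(meta: dict) -> dict:
+    """Map (owner, mint) -> net uiAmount change across a transaction."""
+    changes: dict = {}
+    # Subtract pre-transaction balances...
+    for entry in meta.get("preTokenBalances", []):
+        key = (entry.get("owner"), entry.get("mint"))
+        amount = (entry.get("uiTokenAmount") or {}).get("uiAmount") or 0.0
+        changes[key] = changes.get(key, 0.0) - amount
+    # ...then add post-transaction balances to get the net delta
+    for entry in meta.get("postTokenBalances", []):
+        key = (entry.get("owner"), entry.get("mint"))
+        amount = (entry.get("uiTokenAmount") or {}).get("uiAmount") or 0.0
+        changes[key] = changes.get(key, 0.0) + amount
+    # Drop entries that net out to (approximately) zero
+    return {k: v for k, v in changes.items() if abs(v) > 1e-9}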
\ No newline at end of file
diff --git a/app/services/blockchain_fetcher.py b/app/services/blockchain_fetcher.py
new file mode 100644
index 0000000..134d3bd
--- /dev/null
+++ b/app/services/blockchain_fetcher.py
@@ -0,0 +1,204 @@
+import time
+from datetime import datetime
+from typing import Dict, List, Optional, Any, Tuple, Set
+
+from loguru import logger
+from sqlalchemy.orm import Session
+
+from app.core.config import settings
+from app.services.solana_client import SolanaRpcClient
+from app.models.block import Block
+from app.models.transaction import Transaction
+from app.schemas.block import BlockCreate
+from app.schemas.transaction import TransactionCreate
+
+
+class BlockchainFetcher:
+    """Service for fetching and processing blockchain data."""
+
+    def __init__(self, db: Session):
+        """Initialize blockchain fetcher service."""
+        self.db = db
+        self.solana_client = SolanaRpcClient()
+
+    def fetch_latest_blocks(self, num_blocks: Optional[int] = None) -> List[Block]:
+        """Fetch the latest blocks from the Solana blockchain."""
+        num_blocks = num_blocks or settings.BLOCKS_TO_FETCH
+
+        try:
+            # NOTE: this is the block height, which lags the current slot on
+            # Solana; using it as a slot below is a simplification
+            latest_height = self.solana_client.get_latest_block_height()
+            logger.info(f"Latest block height: {latest_height}")
+
+            # Get the last processed block from the database
+            last_processed_block = self.db.query(Block).order_by(Block.block_height.desc()).first()
+
+            if last_processed_block:
+                start_slot = last_processed_block.slot + 1
+            else:
+                # If no blocks in DB, start from a recent block (latest - num_blocks)
+                start_slot = max(0, latest_height - num_blocks)
+
+            end_slot = start_slot + num_blocks
+            logger.info(f"Fetching blocks from slot {start_slot} to {end_slot}")
+
+            # Get confirmed slots in the range
+            slots = self.solana_client.get_blocks_in_range(start_slot, end_slot)
+            logger.info(f"Found {len(slots)} confirmed blocks in range")
+
+            # Fetch and process each block
+            fetched_blocks = []
+            for slot in slots:
+                block_data = self.solana_client.get_block(slot)
+                if block_data:
+                    block = self._process_block(block_data)
+                    if block:
+                        fetched_blocks.append(block)
+
+            logger.info(f"Successfully processed {len(fetched_blocks)} blocks")
+            return fetched_blocks
+
+        except Exception as e:
+            logger.error(f"Error fetching latest blocks: {str(e)}")
+            return []
+
+    def _process_block(self, block_data: Dict[str, Any]) -> Optional[Block]:
+        """Process raw block data and store in database."""
+        try:
+            # Extract block information
+            block_height = block_data.get("blockHeight")
+            block_hash = block_data.get("blockhash")
+            parent_hash = block_data.get("previousBlockhash")
+            # Approximation: assumes this block occupies the slot right after
+            # its parent; Solana slots can be skipped, so this can drift
+            slot = block_data.get("parentSlot", 0) + 1
+
+            # Convert block timestamp to a naive UTC datetime if available,
+            # consistent with the datetime.utcnow defaults in the models
+            block_time = None
+            if "blockTime" in block_data:
+                block_time = datetime.utcfromtimestamp(block_data["blockTime"])
+
+            # Get transactions
+            transactions = block_data.get("transactions", [])
+            successful_txs = sum(1 for tx in transactions if tx.get("meta", {}).get("err") is None)
+
+            # Create block record
+            block_in = BlockCreate(
+                block_height=block_height,
+                block_hash=block_hash,
+                parent_block_hash=parent_hash,
+                slot=slot,
+                block_time=block_time,
+                transactions_count=len(transactions),
+                successful_transactions_count=successful_txs
+            )
+
+            # Check if block already exists
+            existing_block = self.db.query(Block).filter(Block.block_hash == block_hash).first()
+            if existing_block:
+                logger.debug(f"Block {block_hash} already exists in database")
+                return existing_block
+
+            # Create new block
+            db_block = Block(**block_in.model_dump())
+            self.db.add(db_block)
+            self.db.commit()
+            self.db.refresh(db_block)
+
+            # Process transactions
+            self._process_transactions(db_block.id, transactions)
+
+            return db_block
+
+        except Exception as e:
+            logger.error(f"Error
processing block: {str(e)}") + self.db.rollback() + return None + + def _process_transactions(self, block_id: int, transactions: List[Dict[str, Any]]) -> None: + """Process and store transactions from a block.""" + for tx_data in transactions: + try: + # Extract transaction data + tx_meta = tx_data.get("meta", {}) + tx = tx_data.get("transaction", {}) + + if not tx: + continue + + # Get transaction hash + tx_hash = tx.get("signatures", [""])[0] + + # Check if transaction already exists + existing_tx = self.db.query(Transaction).filter(Transaction.transaction_hash == tx_hash).first() + if existing_tx: + logger.debug(f"Transaction {tx_hash} already exists in database") + continue + + # Extract accounts involved + accounts = [] + if "message" in tx and "accountKeys" in tx["message"]: + accounts = tx["message"]["accountKeys"] + + # Extract program IDs + program_ids = set() + if "message" in tx and "instructions" in tx["message"]: + for ix in tx["message"]["instructions"]: + if "programIdIndex" in ix: + program_idx = ix["programIdIndex"] + if program_idx < len(accounts): + program_ids.add(accounts[program_idx]) + + # Create transaction record + tx_in = TransactionCreate( + block_id=block_id, + transaction_hash=tx_hash, + signature=tx_hash, + slot=tx_data.get("slot", 0), + success=tx_meta.get("err") is None, + fee=tx_meta.get("fee", 0), + fee_payer=accounts[0] if accounts else None, + program_ids=list(program_ids) if program_ids else None, + accounts=accounts, + raw_data=None # We don't store full raw data to save space + ) + + db_tx = Transaction(**tx_in.model_dump()) + self.db.add(db_tx) + + except Exception as e: + logger.error(f"Error processing transaction: {str(e)}") + continue + + self.db.commit() + + def run_continuous_fetcher(self, polling_interval: Optional[int] = None): + """Run a continuous fetching process in a blocking loop.""" + polling_interval = polling_interval or settings.POLLING_INTERVAL + + logger.info(f"Starting continuous blockchain fetcher with {polling_interval}s interval") + + try: + while True: + start_time = time.time() + + # Fetch latest blocks + blocks = self.fetch_latest_blocks() + logger.info(f"Fetched {len(blocks)} new blocks") + + # Calculate time to sleep + elapsed = time.time() - start_time + sleep_time = max(0, polling_interval - elapsed) + + if sleep_time > 0: + logger.debug(f"Sleeping for {sleep_time:.2f} seconds") + time.sleep(sleep_time) + + except KeyboardInterrupt: + logger.info("Blockchain fetcher stopped by user") + + except Exception as e: + logger.error(f"Error in continuous blockchain fetcher: {str(e)}") + raise + + finally: + if hasattr(self, 'solana_client'): + self.solana_client.close() + logger.info("Solana client connection closed") \ No newline at end of file diff --git a/app/services/solana_client.py b/app/services/solana_client.py new file mode 100644 index 0000000..9652bae --- /dev/null +++ b/app/services/solana_client.py @@ -0,0 +1,178 @@ +import json +from datetime import datetime +from typing import Dict, List, Optional, Any, Tuple, Union + +import httpx +from loguru import logger +from solana.rpc.api import Client as SolanaClient + +from app.core.config import settings + + +class SolanaRpcClient: + """Client for interacting with Solana blockchain via RPC.""" + + def __init__(self, rpc_url: Optional[str] = None): + """Initialize Solana RPC client.""" + self.rpc_url = rpc_url or settings.SOLANA_RPC_URL + self.client = SolanaClient(self.rpc_url) + self.http_client = httpx.Client(timeout=30.0) # For custom RPC calls + + def 
get_latest_block_height(self) -> int:
+        """Get the current block height of the Solana blockchain."""
+        # NOTE: the dict-style resp.get("result") handling throughout this
+        # client assumes the legacy JSON responses of older solana-py
+        # releases; the solana==0.30.2 pinned in requirements.txt returns
+        # typed response objects read via resp.value instead, so one side
+        # needs adjusting before this runs as-is.
+        try:
+            resp = self.client.get_block_height()
+            if resp.get("result") is not None:
+                return resp["result"]
+            else:
+                logger.error(f"Failed to get latest block height: {resp}")
+                raise Exception(f"Failed to get latest block height: {resp}")
+        except Exception as e:
+            logger.error(f"Error getting latest block height: {str(e)}")
+            raise
+
+    def get_block(self, slot_or_block: Union[int, str]) -> Optional[Dict[str, Any]]:
+        """Get block data by slot number or block hash."""
+        try:
+            resp = self.client.get_block(
+                slot_or_block,
+                encoding="json",
+                max_supported_transaction_version=0,
+                transaction_details="full",
+                rewards=False
+            )
+            if resp.get("result") is not None:
+                return resp["result"]
+            else:
+                logger.warning(f"Block not found or error for slot/hash {slot_or_block}: {resp}")
+                return None
+        except Exception as e:
+            logger.error(f"Error fetching block {slot_or_block}: {str(e)}")
+            return None
+
+    def get_blocks_in_range(self, start_slot: int, end_slot: int) -> List[int]:
+        """Get a list of confirmed blocks in the given slot range."""
+        try:
+            resp = self.client.get_blocks(start_slot, end_slot)
+            if resp.get("result") is not None:
+                return resp["result"]
+            else:
+                logger.error(f"Failed to get blocks in range {start_slot}-{end_slot}: {resp}")
+                return []
+        except Exception as e:
+            logger.error(f"Error getting blocks in range {start_slot}-{end_slot}: {str(e)}")
+            return []
+
+    def get_transaction(self, signature: str) -> Optional[Dict[str, Any]]:
+        """Get transaction details by signature."""
+        try:
+            resp = self.client.get_transaction(
+                signature,
+                encoding="json",
+                max_supported_transaction_version=0
+            )
+            if resp.get("result") is not None:
+                return resp["result"]
+            else:
+                logger.warning(f"Transaction not found or error for signature {signature}: {resp}")
+                return None
+        except Exception as e:
+            logger.error(f"Error fetching transaction {signature}: {str(e)}")
+            return None
+
+    def get_token_accounts_by_owner(self, owner_address: str) -> List[Dict[str, Any]]:
+        """Get all token accounts owned by the given address."""
+        try:
+            resp = self.client.get_token_accounts_by_owner(
+                owner_address,
+                {"programId": "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"},  # SPL Token program
+                encoding="jsonParsed"
+            )
+            if resp.get("result") and resp["result"].get("value"):
+                return resp["result"]["value"]
+            else:
+                logger.warning(f"No token accounts found for owner {owner_address}")
+                return []
+        except Exception as e:
+            logger.error(f"Error fetching token accounts for {owner_address}: {str(e)}")
+            return []
+
+    def get_token_supply(self, token_mint: str) -> Optional[Dict[str, Any]]:
+        """Get the total supply of a token."""
+        try:
+            resp = self.client.get_token_supply(token_mint)
+            if resp.get("result") is not None:
+                return resp["result"]
+            else:
+                logger.warning(f"Error getting token supply for {token_mint}: {resp}")
+                return None
+        except Exception as e:
+            logger.error(f"Error fetching token supply for {token_mint}: {str(e)}")
+            return None
+
+    def get_multiple_accounts(self, pubkeys: List[str]) -> List[Optional[Dict[str, Any]]]:
+        """Get data for multiple accounts at once."""
+        try:
+            resp = self.client.get_multiple_accounts(
+                pubkeys,
+                encoding="jsonParsed"
+            )
+            if resp.get("result") and resp["result"].get("value"):
+                return resp["result"]["value"]
+            else:
+                logger.warning(f"Error getting multiple accounts: {resp}")
+                return [None] * len(pubkeys)
+        except Exception as e:
+            logger.error(f"Error
fetching multiple accounts: {str(e)}") + return [None] * len(pubkeys) + + def get_program_accounts( + self, + program_id: str, + filters: Optional[List[Dict[str, Any]]] = None + ) -> List[Dict[str, Any]]: + """Get all accounts owned by the given program.""" + try: + resp = self.client.get_program_accounts( + program_id, + encoding="jsonParsed", + filters=filters or [] + ) + if resp.get("result") is not None: + return resp["result"] + else: + logger.warning(f"Error getting program accounts for {program_id}: {resp}") + return [] + except Exception as e: + logger.error(f"Error fetching program accounts for {program_id}: {str(e)}") + return [] + + def custom_rpc_call(self, method: str, params: List[Any]) -> Optional[Dict[str, Any]]: + """Make a custom RPC call to the Solana node.""" + try: + payload = { + "jsonrpc": "2.0", + "id": 1, + "method": method, + "params": params + } + response = self.http_client.post( + self.rpc_url, + json=payload, + headers={"Content-Type": "application/json"} + ) + response.raise_for_status() + result = response.json() + if "result" in result: + return result["result"] + else: + logger.warning(f"Error in custom RPC call {method}: {result}") + return None + except Exception as e: + logger.error(f"Error in custom RPC call {method}: {str(e)}") + return None + + def close(self): + """Close the HTTP client connection.""" + if hasattr(self, 'http_client'): + self.http_client.close() \ No newline at end of file diff --git a/app/services/worker.py b/app/services/worker.py new file mode 100644 index 0000000..53d5e80 --- /dev/null +++ b/app/services/worker.py @@ -0,0 +1,120 @@ +import threading +import time +from typing import Optional + +from loguru import logger +from sqlalchemy.orm import Session + +from app.db.session import SessionLocal +from app.services.blockchain_fetcher import BlockchainFetcher +from app.services.arbitrage_detector import ArbitrageDetector +from app.core.config import settings + + +class BackgroundWorker: + """Background worker for fetching blockchain data and analyzing arbitrages.""" + + def __init__(self): + """Initialize the background worker.""" + self._fetcher_thread = None + self._detector_thread = None + self._stop_event = threading.Event() + + def start_fetcher(self): + """Start the blockchain fetcher in a background thread.""" + if self._fetcher_thread and self._fetcher_thread.is_alive(): + logger.warning("Blockchain fetcher already running") + return False + + self._stop_event.clear() + self._fetcher_thread = threading.Thread(target=self._run_fetcher) + self._fetcher_thread.daemon = True + self._fetcher_thread.start() + + logger.info("Blockchain fetcher started") + return True + + def start_detector(self): + """Start the arbitrage detector in a background thread.""" + if self._detector_thread and self._detector_thread.is_alive(): + logger.warning("Arbitrage detector already running") + return False + + self._stop_event.clear() + self._detector_thread = threading.Thread(target=self._run_detector) + self._detector_thread.daemon = True + self._detector_thread.start() + + logger.info("Arbitrage detector started") + return True + + def stop(self): + """Stop all background threads.""" + if not (self._fetcher_thread or self._detector_thread): + logger.warning("No background threads are running") + return False + + self._stop_event.set() + + if self._fetcher_thread: + self._fetcher_thread.join(timeout=5.0) + self._fetcher_thread = None + + if self._detector_thread: + self._detector_thread.join(timeout=5.0) + self._detector_thread = None + + 
logger.info("All background threads stopped") + return True + + def _run_fetcher(self): + """Run the blockchain fetcher continuously.""" + db = SessionLocal() + try: + fetcher = BlockchainFetcher(db) + + while not self._stop_event.is_set(): + try: + # Fetch latest blocks + blocks = fetcher.fetch_latest_blocks() + logger.info(f"Fetched {len(blocks)} new blocks") + + # Sleep for the configured interval + self._stop_event.wait(settings.POLLING_INTERVAL) + + except Exception as e: + logger.error(f"Error in blockchain fetcher: {str(e)}") + # Sleep a bit before retrying + self._stop_event.wait(10) + + finally: + db.close() + logger.info("Blockchain fetcher thread stopped") + + def _run_detector(self): + """Run the arbitrage detector continuously.""" + db = SessionLocal() + try: + detector = ArbitrageDetector(db) + + while not self._stop_event.is_set(): + try: + # Process blocks that haven't been analyzed yet + unprocessed_count = detector.detect_arbitrages() + logger.info(f"Analyzed {unprocessed_count} blocks for arbitrages") + + # Sleep a bit to avoid hammering the database + self._stop_event.wait(10) + + except Exception as e: + logger.error(f"Error in arbitrage detector: {str(e)}") + # Sleep a bit before retrying + self._stop_event.wait(10) + + finally: + db.close() + logger.info("Arbitrage detector thread stopped") + + +# Singleton instance for the application +worker = BackgroundWorker() \ No newline at end of file diff --git a/app/utils/__init__.py b/app/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/main.py b/main.py new file mode 100644 index 0000000..ea4c9d0 --- /dev/null +++ b/main.py @@ -0,0 +1,34 @@ +import uvicorn +from fastapi import FastAPI +from fastapi.middleware.cors import CORSMiddleware + +from app.api.routes import api_router +from app.core.config import settings + +app = FastAPI( + title=settings.PROJECT_NAME, + description="Solana Arbitrage Analytics Backend API", + version="0.1.0", + docs_url="/docs", + redoc_url="/redoc", +) + +# Set up CORS +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) + +# Include API router +app.include_router(api_router, prefix=settings.API_V1_STR) + +# Health check endpoint +@app.get("/health", tags=["health"]) +async def health_check(): + return {"status": "healthy"} + +if __name__ == "__main__": + uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True) \ No newline at end of file diff --git a/migrations/README b/migrations/README new file mode 100644 index 0000000..fc1a8d5 --- /dev/null +++ b/migrations/README @@ -0,0 +1,13 @@ +Generic single-database configuration with Alembic. + +This directory contains database migrations for the Solana Arbitrage Analytics Backend. + +To apply migrations: +``` +alembic upgrade head +``` + +To create a new migration: +``` +alembic revision -m "description of changes" +``` \ No newline at end of file diff --git a/migrations/env.py b/migrations/env.py new file mode 100644 index 0000000..c74adbe --- /dev/null +++ b/migrations/env.py @@ -0,0 +1,81 @@ +import os +import sys +from logging.config import fileConfig + +from sqlalchemy import engine_from_config +from sqlalchemy import pool + +from alembic import context + +# this is the Alembic Config object, which provides +# access to the values within the .ini file in use. +config = context.config + +# Interpret the config file for Python logging. +# This line sets up loggers basically. 
+fileConfig(config.config_file_name) + +# add your model's MetaData object here +# for 'autogenerate' support +# from myapp import mymodel +# target_metadata = mymodel.Base.metadata +sys.path.insert(0, os.path.dirname(os.path.dirname(__file__))) +from app.db.base import Base +target_metadata = Base.metadata + +# other values from the config, defined by the needs of env.py, +# can be acquired: +# my_important_option = config.get_main_option("my_important_option") +# ... etc. + + +def run_migrations_offline(): + """Run migrations in 'offline' mode. + + This configures the context with just a URL + and not an Engine, though an Engine is acceptable + here as well. By skipping the Engine creation + we don't even need a DBAPI to be available. + + Calls to context.execute() here emit the given string to the + script output. + + """ + url = config.get_main_option("sqlalchemy.url") + context.configure( + url=url, + target_metadata=target_metadata, + literal_binds=True, + dialect_opts={"paramstyle": "named"}, + ) + + with context.begin_transaction(): + context.run_migrations() + + +def run_migrations_online(): + """Run migrations in 'online' mode. + + In this scenario we need to create an Engine + and associate a connection with the context. + + """ + connectable = engine_from_config( + config.get_section(config.config_ini_section), + prefix="sqlalchemy.", + poolclass=pool.NullPool, + ) + + with connectable.connect() as connection: + context.configure( + connection=connection, target_metadata=target_metadata + ) + + with context.begin_transaction(): + context.run_migrations() + + +if context.is_offline_mode(): + run_migrations_offline() +else: + run_migrations_online() \ No newline at end of file diff --git a/migrations/script.py.mako b/migrations/script.py.mako new file mode 100644 index 0000000..1e4564e --- /dev/null +++ b/migrations/script.py.mako @@ -0,0 +1,24 @@ +"""${message} + +Revision ID: ${up_revision} +Revises: ${down_revision | comma,n} +Create Date: ${create_date} + +""" +from alembic import op +import sqlalchemy as sa +${imports if imports else ""} + +# revision identifiers, used by Alembic. +revision = ${repr(up_revision)} +down_revision = ${repr(down_revision)} +branch_labels = ${repr(branch_labels)} +depends_on = ${repr(depends_on)} + + +def upgrade(): + ${upgrades if upgrades else "pass"} + + +def downgrade(): + ${downgrades if downgrades else "pass"} \ No newline at end of file diff --git a/migrations/versions/01_initial_tables.py b/migrations/versions/01_initial_tables.py new file mode 100644 index 0000000..48f64c1 --- /dev/null +++ b/migrations/versions/01_initial_tables.py @@ -0,0 +1,178 @@ +"""initial tables + +Revision ID: 01_initial_tables +Revises: +Create Date: 2023-12-07 14:00:00.000000 + +""" +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects import sqlite + +# revision identifiers, used by Alembic. 
+revision = '01_initial_tables' +down_revision = None +branch_labels = None +depends_on = None + + +def upgrade(): + # Create block table + op.create_table( + 'block', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('block_height', sa.BigInteger(), nullable=False), + sa.Column('block_hash', sa.String(), nullable=False), + sa.Column('parent_block_hash', sa.String(), nullable=True), + sa.Column('slot', sa.BigInteger(), nullable=False), + sa.Column('block_time', sa.DateTime(), nullable=True), + sa.Column('transactions_count', sa.Integer(), nullable=False, default=0), + sa.Column('successful_transactions_count', sa.Integer(), nullable=False, default=0), + sa.Column('processed', sa.Integer(), nullable=False, default=0), + sa.Column('created_at', sa.DateTime(), nullable=False), + sa.PrimaryKeyConstraint('id') + ) + op.create_index(op.f('ix_block_block_hash'), 'block', ['block_hash'], unique=True) + op.create_index(op.f('ix_block_block_height'), 'block', ['block_height'], unique=True) + op.create_index(op.f('ix_block_id'), 'block', ['id'], unique=False) + op.create_index(op.f('ix_block_parent_block_hash'), 'block', ['parent_block_hash'], unique=False) + op.create_index(op.f('ix_block_slot'), 'block', ['slot'], unique=False) + op.create_index(op.f('ix_block_block_time'), 'block', ['block_time'], unique=False) + + # Create transaction table + op.create_table( + 'transaction', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('block_id', sa.Integer(), nullable=False), + sa.Column('transaction_hash', sa.String(), nullable=False), + sa.Column('slot', sa.BigInteger(), nullable=False), + sa.Column('signature', sa.String(), nullable=False), + sa.Column('success', sa.Boolean(), nullable=False, default=False), + sa.Column('fee', sa.BigInteger(), nullable=True), + sa.Column('fee_payer', sa.String(), nullable=True), + sa.Column('program_ids', sa.JSON(), nullable=True), + sa.Column('accounts', sa.JSON(), nullable=True), + sa.Column('raw_data', sa.Text(), nullable=True), + sa.Column('created_at', sa.DateTime(), nullable=False), + sa.ForeignKeyConstraint(['block_id'], ['block.id'], ), + sa.PrimaryKeyConstraint('id') + ) + op.create_index(op.f('ix_transaction_id'), 'transaction', ['id'], unique=False) + op.create_index(op.f('ix_transaction_transaction_hash'), 'transaction', ['transaction_hash'], unique=True) + op.create_index(op.f('ix_transaction_signature'), 'transaction', ['signature'], unique=True) + op.create_index(op.f('ix_transaction_slot'), 'transaction', ['slot'], unique=False) + op.create_index(op.f('ix_transaction_fee_payer'), 'transaction', ['fee_payer'], unique=False) + + # Create dex table + op.create_table( + 'dex', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('name', sa.String(), nullable=False), + sa.Column('address', sa.String(), nullable=False), + sa.Column('program_id', sa.String(), nullable=False), + sa.Column('version', sa.String(), nullable=True), + sa.Column('description', sa.Text(), nullable=True), + sa.Column('website', sa.String(), nullable=True), + sa.Column('volume_24h', sa.Float(), nullable=True), + sa.Column('volume_7d', sa.Float(), nullable=True), + sa.Column('created_at', sa.DateTime(), nullable=False), + sa.Column('updated_at', sa.DateTime(), nullable=False), + sa.PrimaryKeyConstraint('id') + ) + op.create_index(op.f('ix_dex_id'), 'dex', ['id'], unique=False) + op.create_index(op.f('ix_dex_name'), 'dex', ['name'], unique=False) + op.create_index(op.f('ix_dex_address'), 'dex', ['address'], unique=True) + 
op.create_index(op.f('ix_dex_program_id'), 'dex', ['program_id'], unique=False) + + # Create pool table + op.create_table( + 'pool', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('dex_id', sa.Integer(), nullable=False), + sa.Column('address', sa.String(), nullable=False), + sa.Column('token_a_address', sa.String(), nullable=False), + sa.Column('token_a_symbol', sa.String(), nullable=True), + sa.Column('token_a_name', sa.String(), nullable=True), + sa.Column('token_a_decimals', sa.Integer(), nullable=True), + sa.Column('token_b_address', sa.String(), nullable=False), + sa.Column('token_b_symbol', sa.String(), nullable=True), + sa.Column('token_b_name', sa.String(), nullable=True), + sa.Column('token_b_decimals', sa.Integer(), nullable=True), + sa.Column('token_a_reserve', sa.Float(), nullable=True), + sa.Column('token_b_reserve', sa.Float(), nullable=True), + sa.Column('last_updated_slot', sa.BigInteger(), nullable=True), + sa.Column('volume_24h', sa.Float(), nullable=True), + sa.Column('fees_24h', sa.Float(), nullable=True), + sa.Column('tvl', sa.Float(), nullable=True), + sa.Column('fee_rate', sa.Float(), nullable=True), + sa.Column('pool_type', sa.String(), nullable=True), + sa.Column('is_active', sa.Boolean(), nullable=False, default=True), + sa.Column('metadata', sa.JSON(), nullable=True), + sa.Column('created_at', sa.DateTime(), nullable=False), + sa.Column('updated_at', sa.DateTime(), nullable=False), + sa.ForeignKeyConstraint(['dex_id'], ['dex.id'], ), + sa.PrimaryKeyConstraint('id') + ) + op.create_index(op.f('ix_pool_id'), 'pool', ['id'], unique=False) + op.create_index(op.f('ix_pool_address'), 'pool', ['address'], unique=True) + op.create_index(op.f('ix_pool_token_a_address'), 'pool', ['token_a_address'], unique=False) + op.create_index(op.f('ix_pool_token_b_address'), 'pool', ['token_b_address'], unique=False) + + # Create arbitrage table + op.create_table( + 'arbitrage', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('transaction_id', sa.Integer(), nullable=False), + sa.Column('initiator_address', sa.String(), nullable=False), + sa.Column('start_token_address', sa.String(), nullable=False), + sa.Column('start_token_symbol', sa.String(), nullable=True), + sa.Column('start_amount', sa.Float(), nullable=False), + sa.Column('end_amount', sa.Float(), nullable=False), + sa.Column('profit_amount', sa.Float(), nullable=False), + sa.Column('profit_percentage', sa.Float(), nullable=False), + sa.Column('success', sa.Boolean(), nullable=False, default=False), + sa.Column('failure_reason', sa.Text(), nullable=True), + sa.Column('gas_cost', sa.Float(), nullable=True), + sa.Column('net_profit', sa.Float(), nullable=True), + sa.Column('legs_count', sa.Integer(), nullable=False, default=0), + sa.Column('route_description', sa.Text(), nullable=True), + sa.Column('included_dexes', sa.JSON(), nullable=True), + sa.Column('created_at', sa.DateTime(), nullable=False), + sa.ForeignKeyConstraint(['transaction_id'], ['transaction.id'], ), + sa.PrimaryKeyConstraint('id') + ) + op.create_index(op.f('ix_arbitrage_id'), 'arbitrage', ['id'], unique=False) + op.create_index(op.f('ix_arbitrage_initiator_address'), 'arbitrage', ['initiator_address'], unique=False) + op.create_index(op.f('ix_arbitrage_start_token_address'), 'arbitrage', ['start_token_address'], unique=False) + + # Create arbitrage_leg table + op.create_table( + 'arbitrageleg', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('arbitrage_id', sa.Integer(), nullable=False), + sa.Column('leg_index', 
sa.Integer(), nullable=False), + sa.Column('pool_id', sa.Integer(), nullable=False), + sa.Column('token_in_address', sa.String(), nullable=False), + sa.Column('token_in_symbol', sa.String(), nullable=True), + sa.Column('token_in_amount', sa.Float(), nullable=False), + sa.Column('token_out_address', sa.String(), nullable=False), + sa.Column('token_out_symbol', sa.String(), nullable=True), + sa.Column('token_out_amount', sa.Float(), nullable=False), + sa.Column('price_impact', sa.Float(), nullable=True), + sa.Column('created_at', sa.DateTime(), nullable=False), + sa.ForeignKeyConstraint(['arbitrage_id'], ['arbitrage.id'], ), + sa.ForeignKeyConstraint(['pool_id'], ['pool.id'], ), + sa.PrimaryKeyConstraint('id') + ) + op.create_index(op.f('ix_arbitrageleg_id'), 'arbitrageleg', ['id'], unique=False) + op.create_index(op.f('ix_arbitrageleg_token_in_address'), 'arbitrageleg', ['token_in_address'], unique=False) + op.create_index(op.f('ix_arbitrageleg_token_out_address'), 'arbitrageleg', ['token_out_address'], unique=False) + + +def downgrade(): + op.drop_table('arbitrageleg') + op.drop_table('arbitrage') + op.drop_table('pool') + op.drop_table('dex') + op.drop_table('transaction') + op.drop_table('block') \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..eb9c1b8 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,12 @@ +fastapi==0.103.1 +uvicorn==0.23.2 +sqlalchemy==2.0.20 +pydantic==2.3.0 +alembic==1.12.0 +solana==0.30.2 +loguru==0.7.2 +httpx==0.25.0 +numpy==1.25.2 +pandas==2.1.0 +python-multipart==0.0.6 +python-dotenv==1.0.0 \ No newline at end of file
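
Quick start (an illustrative sketch; it assumes `app.core.config` resolves a local SQLite `SQLALCHEMY_DATABASE_URL` and a reachable `SOLANA_RPC_URL`):

```
pip install -r requirements.txt
alembic upgrade head
python main.py
```

`alembic upgrade head` applies the initial migration above, and `python main.py` starts uvicorn on port 8000 with reload enabled, as wired in `main.py`.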