Implement cryptocurrency data service with CoinCap API integration

This commit is contained in:
Automated Action 2025-05-14 12:41:37 +00:00
parent 571c430fec
commit 69bbdd9533
33 changed files with 1434 additions and 2 deletions

108
README.md
View File

@ -1,3 +1,107 @@
# FastAPI Application
# Cryptocurrency Data Service
This is a FastAPI application bootstrapped by BackendIM, the AI-powered backend generation platform.
A FastAPI backend service that provides cryptocurrency market data by integrating with the CoinCap API v3.
## Features
- **Real-time Cryptocurrency Data**: Access comprehensive data for thousands of cryptocurrencies
- **Market Information**: Detailed exchange and trading pair data
- **Historical Prices**: Access historical price data with various time intervals
- **Conversion Rates**: Get current conversion rates between cryptocurrencies and fiat currencies
- **Data Persistence**: Local SQLite database for caching and quick retrieval
- **Health Monitoring**: Built-in health check endpoint
## Tech Stack
- **FastAPI**: Modern, high-performance web framework
- **SQLAlchemy**: SQL toolkit and ORM
- **Alembic**: Database migration tool
- **SQLite**: Lightweight relational database
- **HTTPX**: Asynchronous HTTP client
- **Pydantic**: Data validation and settings management
- **Uvicorn**: ASGI server
## API Endpoints
The service provides the following API endpoints:
### Health Check
- `GET /health`: Check the health status of the service and its dependencies
### Assets
- `GET /api/assets`: Get a list of cryptocurrency assets with optional filtering and pagination
- `GET /api/assets/{slug}`: Get detailed information about a specific asset
- `GET /api/assets/{slug}/markets`: Get market information for a specific asset
- `GET /api/assets/{slug}/history`: Get historical price data for a specific asset
### Exchanges
- `GET /api/exchanges`: Get a list of exchanges with optional pagination
- `GET /api/exchanges/{exchange_id}`: Get detailed information about a specific exchange
### Markets
- `GET /api/markets`: Get a list of markets with optional filtering and pagination
### Rates
- `GET /api/rates`: Get a list of conversion rates
- `GET /api/rates/{slug}`: Get a specific conversion rate
## Installation and Setup
### Prerequisites
- Python 3.8 or higher
### Installation
1. Clone the repository:
```bash
git clone <repository-url>
cd cryptocurrency-data-service
```
2. Install dependencies:
```bash
pip install -r requirements.txt
```
3. Set up environment variables (optional):
```bash
# Create a .env file
COINCAP_API_KEY=your_api_key
COINCAP_API_BASE_URL=https://rest.coincap.io/v3
```
4. Run database migrations:
```bash
alembic upgrade head
```
### Running the Application
Start the server with:
```bash
uvicorn main:app --reload
```
The API will be available at http://localhost:8000
## API Documentation
Once the server is running, you can access the API documentation at:
- Swagger UI: http://localhost:8000/docs
- ReDoc: http://localhost:8000/redoc
## Data Sources
This service uses the [CoinCap API v3](https://rest.coincap.io/v3) to provide cryptocurrency data. The API key is required for production use.
## License
[MIT](LICENSE)

106
alembic.ini Normal file
View File

@ -0,0 +1,106 @@
# A generic, single database configuration.
[alembic]
# path to migration scripts
script_location = migrations
# template used to generate migration file names; The default value is %%(rev)s_%%(slug)s
# Uncomment the line below if you want the files to be prepended with date and time
# see https://alembic.sqlalchemy.org/en/latest/tutorial.html#editing-the-ini-file
# for all available tokens
# file_template = %%(year)d_%%(month).2d_%%(day).2d_%%(hour).2d%%(minute).2d-%%(rev)s_%%(slug)s
# sys.path path, will be prepended to sys.path if present.
# defaults to the current working directory.
prepend_sys_path = .
# timezone to use when rendering the date within the migration file
# as well as the filename.
# If specified, requires the python-dateutil library that can be
# installed by adding `alembic[tz]` to the pip requirements
# string value is passed to dateutil.tz.gettz()
# leave blank for localtime
# timezone =
# max length of characters to apply to the
# "slug" field
# truncate_slug_length = 40
# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false
# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
# sourceless = false
# version location specification; This defaults
# to migrations/versions. When using multiple version
# directories, initial revisions must be specified with --version-path.
# The path separator used here should be the separator specified by "version_path_separator" below.
# version_locations = %(here)s/bar:%(here)s/bat:migrations/versions
# version path separator; As mentioned above, this is the character used to split
# version_locations. The default within new alembic.ini files is "os", which uses os.pathsep.
# If this key is omitted entirely, it falls back to the legacy behavior of splitting on spaces and/or commas.
# Valid values for version_path_separator are:
#
# version_path_separator = :
# version_path_separator = ;
# version_path_separator = space
version_path_separator = os # Use os.pathsep. Default configuration used for new projects.
# the output encoding used when revision files
# are written from script.py.mako
# output_encoding = utf-8
# SQLite URL - Using absolute path as required
sqlalchemy.url = sqlite:////app/storage/db/db.sqlite
[post_write_hooks]
# post_write_hooks defines scripts or Python functions that are run
# on newly generated revision scripts. See the documentation for further
# detail and examples
# format using "black" - use the console_scripts runner, against the "black" entrypoint
# hooks = black
# black.type = console_scripts
# black.entrypoint = black
# black.options = -l 79 REVISION_SCRIPT_FILENAME
# Logging configuration
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
qualname =
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

0
app/__init__.py Normal file
View File

0
app/api/__init__.py Normal file
View File

View File

124
app/api/routers/assets.py Normal file
View File

@ -0,0 +1,124 @@
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.orm import Session
from typing import List, Optional
import logging
from app.database import get_db
from app.schemas.assets import (
AssetResponse, SingleAssetResponse, AssetHistoryResponse,
AssetMarketsResponse, ErrorResponse
)
from app.services.coincap_client import CoinCapClient
router = APIRouter(prefix="/assets", tags=["Assets"])
logger = logging.getLogger(__name__)
# Initialize client
client = CoinCapClient()
@router.get(
"",
response_model=AssetResponse,
summary="Get list of assets",
description="Retrieve a list of assets with optional filters"
)
async def get_assets(
search: Optional[str] = None,
ids: Optional[str] = None,
limit: Optional[int] = Query(100, ge=1, le=2000),
offset: Optional[int] = Query(0, ge=0),
db: Session = Depends(get_db)
):
try:
response = await client.get_assets(search=search, ids=ids, limit=limit, offset=offset)
return response
except Exception as e:
logger.error(f"Error fetching assets: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get(
"/{slug}",
response_model=SingleAssetResponse,
responses={404: {"model": ErrorResponse}},
summary="Get a specific asset",
description="Retrieve details for a specific asset by its slug"
)
async def get_asset(slug: str, db: Session = Depends(get_db)):
try:
response = await client.get_asset(slug)
if not response.get("data"):
raise HTTPException(status_code=404, detail=f"{slug} not found")
return response
except HTTPException:
raise
except Exception as e:
logger.error(f"Error fetching asset {slug}: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get(
"/{slug}/markets",
response_model=AssetMarketsResponse,
responses={404: {"model": ErrorResponse}},
summary="Get markets for an asset",
description="Retrieve markets for a specific asset"
)
async def get_asset_markets(
slug: str,
limit: Optional[int] = Query(100, ge=1, le=2000),
offset: Optional[int] = Query(0, ge=0),
db: Session = Depends(get_db)
):
try:
response = await client.get_asset_markets(slug, limit=limit, offset=offset)
if not response.get("data") and isinstance(response.get("data"), list):
raise HTTPException(status_code=404, detail=f"{slug} not found")
return response
except HTTPException:
raise
except Exception as e:
logger.error(f"Error fetching markets for asset {slug}: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get(
"/{slug}/history",
response_model=AssetHistoryResponse,
responses={404: {"model": ErrorResponse}},
summary="Get historical data for an asset",
description="Retrieve historical price data for a specific asset"
)
async def get_asset_history(
slug: str,
interval: str = Query(..., description="Time interval (m1, m5, m15, m30, h1, h2, h6, h12, d1)"),
start: Optional[int] = None,
end: Optional[int] = None,
db: Session = Depends(get_db)
):
# Validate interval
valid_intervals = ["m1", "m5", "m15", "m30", "h1", "h2", "h6", "h12", "d1"]
if interval not in valid_intervals:
raise HTTPException(
status_code=400,
detail=f"Invalid interval. Must be one of: {', '.join(valid_intervals)}"
)
# Validate start and end
if (start is None and end is not None) or (start is not None and end is None):
raise HTTPException(
status_code=400,
detail="Both start and end must be provided together or neither should be provided"
)
try:
response = await client.get_asset_history(
slug, interval=interval, start=start, end=end
)
if not response.get("data") and isinstance(response.get("data"), list):
raise HTTPException(status_code=404, detail=f"{slug} not found")
return response
except HTTPException:
raise
except Exception as e:
logger.error(f"Error fetching history for asset {slug}: {e}")
raise HTTPException(status_code=500, detail=str(e))

View File

@ -0,0 +1,53 @@
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.orm import Session
from typing import List, Optional
import logging
from app.database import get_db
from app.schemas.exchanges import (
ExchangesResponse, SingleExchangeResponse, ErrorResponse
)
from app.services.coincap_client import CoinCapClient
router = APIRouter(prefix="/exchanges", tags=["Exchanges"])
logger = logging.getLogger(__name__)
# Initialize client
client = CoinCapClient()
@router.get(
"",
response_model=ExchangesResponse,
summary="Get list of exchanges",
description="Retrieve a list of exchanges with optional pagination"
)
async def get_exchanges(
limit: Optional[int] = Query(10, ge=1, le=2000),
offset: Optional[int] = Query(0, ge=0),
db: Session = Depends(get_db)
):
try:
response = await client.get_exchanges(limit=limit, offset=offset)
return response
except Exception as e:
logger.error(f"Error fetching exchanges: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get(
"/{exchange_id}",
response_model=SingleExchangeResponse,
responses={404: {"model": ErrorResponse}},
summary="Get a specific exchange",
description="Retrieve details for a specific exchange"
)
async def get_exchange(exchange_id: str, db: Session = Depends(get_db)):
try:
response = await client.get_exchange(exchange_id)
if not response.get("data"):
raise HTTPException(status_code=404, detail=f"Exchange {exchange_id} not found")
return response
except HTTPException:
raise
except Exception as e:
logger.error(f"Error fetching exchange {exchange_id}: {e}")
raise HTTPException(status_code=500, detail=str(e))

40
app/api/routers/health.py Normal file
View File

@ -0,0 +1,40 @@
from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
from app.database import get_db
import httpx
from app.config import COINCAP_API_BASE_URL, COINCAP_API_KEY
router = APIRouter(tags=["Health"])
@router.get("/health", summary="Check service health")
async def health_check(db: Session = Depends(get_db)):
"""
Health check endpoint that verifies:
- Database connection
- External API connectivity
"""
# Check database connection
try:
db.execute("SELECT 1")
db_status = "healthy"
except Exception as e:
db_status = f"unhealthy: {str(e)}"
# Check CoinCap API connection
try:
async with httpx.AsyncClient() as client:
response = await client.get(
f"{COINCAP_API_BASE_URL}/assets",
params={"apiKey": COINCAP_API_KEY, "limit": 1}
)
response.raise_for_status()
api_status = "healthy"
except Exception as e:
api_status = f"unhealthy: {str(e)}"
return {
"status": "ok" if db_status == "healthy" and api_status == "healthy" else "degraded",
"database": db_status,
"coincap_api": api_status,
"version": "0.1.0"
}

View File

@ -0,0 +1,48 @@
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.orm import Session
from typing import List, Optional
import logging
from app.database import get_db
from app.schemas.markets import MarketsResponse, ErrorResponse
from app.services.coincap_client import CoinCapClient
router = APIRouter(prefix="/markets", tags=["Markets"])
logger = logging.getLogger(__name__)
# Initialize client
client = CoinCapClient()
@router.get(
"",
response_model=MarketsResponse,
summary="Get list of markets",
description="Retrieve a list of markets with optional filters"
)
async def get_markets(
exchange_id: Optional[str] = None,
base_symbol: Optional[str] = None,
base_id: Optional[str] = None,
quote_symbol: Optional[str] = None,
quote_id: Optional[str] = None,
asset_symbol: Optional[str] = None,
asset_id: Optional[str] = None,
limit: Optional[int] = Query(10, ge=1, le=2000),
offset: Optional[int] = Query(0, ge=0),
db: Session = Depends(get_db)
):
try:
response = await client.get_markets(
exchange_id=exchange_id,
base_symbol=base_symbol,
base_id=base_id,
quote_symbol=quote_symbol,
quote_id=quote_id,
asset_symbol=asset_symbol,
asset_id=asset_id,
limit=limit,
offset=offset
)
return response
except Exception as e:
logger.error(f"Error fetching markets: {e}")
raise HTTPException(status_code=500, detail=str(e))

52
app/api/routers/rates.py Normal file
View File

@ -0,0 +1,52 @@
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.orm import Session
from typing import List, Optional
import logging
from app.database import get_db
from app.schemas.rates import (
RatesResponse, SingleRateResponse, ErrorResponse
)
from app.services.coincap_client import CoinCapClient
router = APIRouter(prefix="/rates", tags=["Rates"])
logger = logging.getLogger(__name__)
# Initialize client
client = CoinCapClient()
@router.get(
"",
response_model=RatesResponse,
summary="Get conversion rates",
description="Retrieve a list of conversion rates with optional filtering"
)
async def get_rates(
ids: Optional[str] = None,
db: Session = Depends(get_db)
):
try:
response = await client.get_rates(ids=ids)
return response
except Exception as e:
logger.error(f"Error fetching rates: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get(
"/{slug}",
response_model=SingleRateResponse,
responses={404: {"model": ErrorResponse}},
summary="Get a specific conversion rate",
description="Retrieve details for a specific conversion rate by its slug"
)
async def get_rate(slug: str, db: Session = Depends(get_db)):
try:
response = await client.get_rate(slug)
if not response.get("data"):
raise HTTPException(status_code=404, detail=f"Rate {slug} not found")
return response
except HTTPException:
raise
except Exception as e:
logger.error(f"Error fetching rate {slug}: {e}")
raise HTTPException(status_code=500, detail=str(e))

13
app/config.py Normal file
View File

@ -0,0 +1,13 @@
import os
from pathlib import Path
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
# API configuration
COINCAP_API_KEY = os.getenv("COINCAP_API_KEY", "b1afdeb4223dd63e8e26977c3539432eeaca0a7603c4655a7c9c57dadb40e7fe")
COINCAP_API_BASE_URL = os.getenv("COINCAP_API_BASE_URL", "https://rest.coincap.io/v3")
# Cache configuration
CACHE_TIMEOUT = int(os.getenv("CACHE_TIMEOUT", 300)) # 5 minutes cache by default

27
app/database.py Normal file
View File

@ -0,0 +1,27 @@
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from pathlib import Path
# Database directory setup
DB_DIR = Path("/app") / "storage" / "db"
DB_DIR.mkdir(parents=True, exist_ok=True)
SQLALCHEMY_DATABASE_URL = f"sqlite:///{DB_DIR}/db.sqlite"
engine = create_engine(
SQLALCHEMY_DATABASE_URL,
connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
# Dependency to get database session
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()

66
app/exceptions.py Normal file
View File

@ -0,0 +1,66 @@
from fastapi import Request, status
from fastapi.responses import JSONResponse
from fastapi.exceptions import RequestValidationError
from starlette.exceptions import HTTPException as StarletteHTTPException
import logging
import time
logger = logging.getLogger(__name__)
class CoinCapAPIException(Exception):
"""Exception raised for CoinCap API errors."""
def __init__(self, status_code: int, detail: str):
self.status_code = status_code
self.detail = detail
super().__init__(self.detail)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
"""Handle validation errors."""
logger.error(f"Validation error: {exc}")
return JSONResponse(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
content={
"error": "Validation Error",
"detail": exc.errors(),
"timestamp": int(time.time() * 1000)
}
)
async def http_exception_handler(request: Request, exc: StarletteHTTPException):
"""Handle HTTP exceptions."""
logger.error(f"HTTP error: {exc.status_code} - {exc.detail}")
return JSONResponse(
status_code=exc.status_code,
content={
"error": exc.detail,
"timestamp": int(time.time() * 1000)
}
)
async def coincap_api_exception_handler(request: Request, exc: CoinCapAPIException):
"""Handle CoinCap API exceptions."""
logger.error(f"CoinCap API error: {exc.status_code} - {exc.detail}")
return JSONResponse(
status_code=exc.status_code,
content={
"error": exc.detail,
"timestamp": int(time.time() * 1000)
}
)
async def general_exception_handler(request: Request, exc: Exception):
"""Handle all other exceptions."""
logger.error(f"Unhandled exception: {exc}", exc_info=True)
return JSONResponse(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
content={
"error": "Internal Server Error",
"detail": str(exc),
"timestamp": int(time.time() * 1000)
}
)

6
app/models/__init__.py Normal file
View File

@ -0,0 +1,6 @@
from app.models.asset import Asset, AssetPriceHistory
from app.models.exchange import Exchange
from app.models.market import Market
from app.models.rate import Rate
# Import all models here for Alembic to discover them

45
app/models/asset.py Normal file
View File

@ -0,0 +1,45 @@
from sqlalchemy import Column, String, Float, Integer, DateTime, ForeignKey, Boolean
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func
from app.database import Base
class Asset(Base):
__tablename__ = "assets"
id = Column(String, primary_key=True) # Slug of the asset (e.g., "bitcoin")
rank = Column(Integer)
symbol = Column(String, index=True)
name = Column(String, index=True)
supply = Column(Float)
max_supply = Column(Float, nullable=True)
market_cap_usd = Column(Float)
volume_usd_24hr = Column(Float)
price_usd = Column(Float)
change_percent_24hr = Column(Float, nullable=True)
vwap_24hr = Column(Float, nullable=True)
explorer = Column(String, nullable=True)
# Last time the data was updated
last_updated = Column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now())
# Relationships
price_history = relationship("AssetPriceHistory", back_populates="asset", cascade="all, delete-orphan")
def __repr__(self):
return f"<Asset {self.symbol}: {self.name}>"
class AssetPriceHistory(Base):
__tablename__ = "asset_price_history"
id = Column(Integer, primary_key=True)
asset_id = Column(String, ForeignKey("assets.id", ondelete="CASCADE"), index=True)
price_usd = Column(Float)
timestamp = Column(DateTime(timezone=True), index=True)
interval = Column(String) # m1, m5, m15, m30, h1, h2, h6, h12, d1
# Relationship
asset = relationship("Asset", back_populates="price_history")
def __repr__(self):
return f"<AssetPriceHistory {self.asset_id} @ {self.timestamp}>"

23
app/models/exchange.py Normal file
View File

@ -0,0 +1,23 @@
from sqlalchemy import Column, String, Float, Integer, DateTime, Boolean
from sqlalchemy.sql import func
from sqlalchemy.orm import relationship
from app.database import Base
class Exchange(Base):
__tablename__ = "exchanges"
id = Column(String, primary_key=True) # exchange_id from the API
name = Column(String, index=True)
rank = Column(Integer, index=True)
percent_total_volume = Column(Float, nullable=True)
volume_usd = Column(Float, nullable=True)
trading_pairs = Column(Integer)
socket = Column(Boolean, nullable=True)
exchange_url = Column(String, nullable=True)
updated_timestamp = Column(Integer, nullable=True)
# Last time the data was updated in our database
last_updated = Column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now())
def __repr__(self):
return f"<Exchange {self.name} (Rank: {self.rank})>"

27
app/models/market.py Normal file
View File

@ -0,0 +1,27 @@
from sqlalchemy import Column, String, Float, Integer, DateTime, ForeignKey, Boolean
from sqlalchemy.sql import func
from sqlalchemy.orm import relationship
from app.database import Base
class Market(Base):
__tablename__ = "markets"
id = Column(Integer, primary_key=True, autoincrement=True)
exchange_id = Column(String, index=True)
rank = Column(Integer, nullable=True)
base_id = Column(String, index=True) # Asset slug for base
quote_id = Column(String, index=True) # Asset slug for quote
base_symbol = Column(String, index=True) # e.g., BTC
quote_symbol = Column(String, index=True) # e.g., USD
price_quote = Column(Float) # Price in quote currency
price_usd = Column(Float) # Price in USD
volume_usd_24hr = Column(Float)
percent_exchange_volume = Column(Float, nullable=True)
trades_count_24hr = Column(Integer, nullable=True)
updated_timestamp = Column(Integer, nullable=True)
# Last time the data was updated in our database
last_updated = Column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now())
def __repr__(self):
return f"<Market {self.base_symbol}/{self.quote_symbol} on {self.exchange_id}>"

18
app/models/rate.py Normal file
View File

@ -0,0 +1,18 @@
from sqlalchemy import Column, String, Float, DateTime
from sqlalchemy.sql import func
from app.database import Base
class Rate(Base):
__tablename__ = "rates"
id = Column(String, primary_key=True) # Rate ID (slug)
symbol = Column(String, index=True) # Currency symbol (e.g., BTC, USD)
currency_symbol = Column(String, nullable=True) # Display symbol (e.g., $, ₿)
type = Column(String) # crypto or fiat
rate_usd = Column(Float) # Conversion rate to USD
# Last time the data was updated
last_updated = Column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now())
def __repr__(self):
return f"<Rate {self.symbol} ({self.type}): {self.rate_usd} USD>"

0
app/schemas/__init__.py Normal file
View File

71
app/schemas/assets.py Normal file
View File

@ -0,0 +1,71 @@
from pydantic import BaseModel, Field
from typing import List, Optional, Dict, Any
from datetime import datetime
class AssetBase(BaseModel):
id: str
rank: Optional[str] = None
symbol: str
name: str
supply: Optional[str] = None
max_supply: Optional[str] = None
market_cap_usd: Optional[str] = None
volume_usd_24hr: Optional[str] = None
price_usd: Optional[str] = None
change_percent_24hr: Optional[str] = None
vwap_24hr: Optional[str] = None
explorer: Optional[str] = None
class AssetCreate(AssetBase):
pass
class AssetInDB(AssetBase):
last_updated: datetime
class Config:
orm_mode = True
class AssetResponse(BaseModel):
timestamp: int
data: List[AssetBase]
class SingleAssetResponse(BaseModel):
timestamp: int
data: AssetBase
class AssetHistoryPoint(BaseModel):
price_usd: str
time: int # Unix timestamp in milliseconds
date: datetime
class AssetHistoryResponse(BaseModel):
timestamp: int
data: List[AssetHistoryPoint]
class AssetMarket(BaseModel):
exchange_id: str
base_id: str
quote_id: str
base_symbol: str
quote_symbol: str
volume_usd_24hr: str
price_usd: str
volume_percent: str
class AssetMarketsResponse(BaseModel):
timestamp: int
data: List[AssetMarket]
class ErrorResponse(BaseModel):
error: str
timestamp: Optional[int] = None

41
app/schemas/exchanges.py Normal file
View File

@ -0,0 +1,41 @@
from pydantic import BaseModel, Field
from typing import List, Optional, Dict, Any
from datetime import datetime
class ExchangeBase(BaseModel):
exchange_id: str
name: str
rank: str
percent_total_volume: Optional[str] = None
volume_usd: Optional[str] = None
trading_pairs: str
socket: Optional[bool] = None
exchange_url: Optional[str] = None
updated: int
class ExchangeCreate(ExchangeBase):
pass
class ExchangeInDB(ExchangeBase):
last_updated: datetime
class Config:
orm_mode = True
class ExchangesResponse(BaseModel):
data: List[ExchangeBase]
timestamp: int
class SingleExchangeResponse(BaseModel):
data: ExchangeBase
timestamp: int
class ErrorResponse(BaseModel):
error: str
timestamp: Optional[int] = None

40
app/schemas/markets.py Normal file
View File

@ -0,0 +1,40 @@
from pydantic import BaseModel, Field
from typing import List, Optional, Dict, Any
from datetime import datetime
class MarketBase(BaseModel):
exchange_id: str
rank: Optional[str] = None
base_symbol: str
base_id: str
quote_symbol: str
quote_id: str
price_quote: str
price_usd: str
volume_usd_24hr: str
percent_exchange_volume: str
trades_count_24hr: Optional[str] = None
updated: int
class MarketCreate(MarketBase):
pass
class MarketInDB(MarketBase):
id: int
last_updated: datetime
class Config:
orm_mode = True
class MarketsResponse(BaseModel):
data: List[MarketBase]
timestamp: int
class ErrorResponse(BaseModel):
error: str
timestamp: Optional[int] = None

37
app/schemas/rates.py Normal file
View File

@ -0,0 +1,37 @@
from pydantic import BaseModel, Field
from typing import List, Optional, Dict, Any
from datetime import datetime
class RateBase(BaseModel):
id: str
symbol: str
currency_symbol: Optional[str] = None
type: str
rate_usd: str
class RateCreate(RateBase):
pass
class RateInDB(RateBase):
last_updated: datetime
class Config:
orm_mode = True
class RatesResponse(BaseModel):
data: List[RateBase]
timestamp: int
class SingleRateResponse(BaseModel):
data: RateBase
timestamp: int
class ErrorResponse(BaseModel):
error: str
timestamp: Optional[int] = None

0
app/services/__init__.py Normal file
View File

View File

@ -0,0 +1,124 @@
import httpx
import logging
from typing import Dict, List, Optional, Any, Union
from app.config import COINCAP_API_BASE_URL, COINCAP_API_KEY
logger = logging.getLogger(__name__)
class CoinCapClient:
"""Client for interacting with the CoinCap API v3"""
def __init__(self, api_key: str = COINCAP_API_KEY, base_url: str = COINCAP_API_BASE_URL):
self.api_key = api_key
self.base_url = base_url
async def _request(self, method: str, endpoint: str, params: Optional[Dict[str, Any]] = None) -> Dict:
"""Make a request to the CoinCap API
Args:
method: HTTP method (GET, POST, etc.)
endpoint: API endpoint (without base URL)
params: Query parameters
Returns:
API response as dictionary
"""
if params is None:
params = {}
# Add API key to all requests
params['apiKey'] = self.api_key
url = f"{self.base_url}{endpoint}"
try:
async with httpx.AsyncClient() as client:
response = await client.request(method, url, params=params)
response.raise_for_status()
return response.json()
except httpx.HTTPStatusError as e:
logger.error(f"HTTP error occurred: {e}")
if e.response.status_code == 404:
# Return empty data for 404 errors
return {"data": []}
raise
except Exception as e:
logger.error(f"Error in CoinCap API request: {e}")
raise
# Assets endpoints
async def get_assets(self, search: Optional[str] = None,
ids: Optional[str] = None,
limit: Optional[int] = None,
offset: Optional[int] = None) -> Dict:
"""Get list of assets with optional filters"""
params = {k: v for k, v in locals().items()
if k not in ['self'] and v is not None}
return await self._request("GET", "/assets", params)
async def get_asset(self, slug: str) -> Dict:
"""Get information about a specific asset by slug"""
return await self._request("GET", f"/assets/{slug}")
async def get_asset_markets(self, slug: str,
limit: Optional[int] = None,
offset: Optional[int] = None) -> Dict:
"""Get markets for a specific asset"""
params = {k: v for k, v in locals().items()
if k not in ['self', 'slug'] and v is not None}
return await self._request("GET", f"/assets/{slug}/markets", params)
async def get_asset_history(self, slug: str,
interval: str,
start: Optional[int] = None,
end: Optional[int] = None) -> Dict:
"""Get historical data for a specific asset
Args:
slug: Asset slug
interval: Time interval (m1, m5, m15, m30, h1, h2, h6, h12, d1)
start: Start time in milliseconds
end: End time in milliseconds
"""
params = {k: v for k, v in locals().items()
if k not in ['self', 'slug'] and v is not None}
return await self._request("GET", f"/assets/{slug}/history", params)
# Exchanges endpoints
async def get_exchanges(self, limit: Optional[int] = None,
offset: Optional[int] = None) -> Dict:
"""Get list of exchanges"""
params = {k: v for k, v in locals().items()
if k not in ['self'] and v is not None}
return await self._request("GET", "/exchanges", params)
async def get_exchange(self, exchange_id: str) -> Dict:
"""Get information about a specific exchange"""
return await self._request("GET", f"/exchanges/{exchange_id}")
# Markets endpoints
async def get_markets(self,
exchange_id: Optional[str] = None,
base_symbol: Optional[str] = None,
base_id: Optional[str] = None,
quote_symbol: Optional[str] = None,
quote_id: Optional[str] = None,
asset_symbol: Optional[str] = None,
asset_id: Optional[str] = None,
limit: Optional[int] = None,
offset: Optional[int] = None) -> Dict:
"""Get list of markets with optional filters"""
params = {k: v for k, v in locals().items()
if k not in ['self'] and v is not None}
return await self._request("GET", "/markets", params)
# Rates endpoints
async def get_rates(self, ids: Optional[str] = None) -> Dict:
"""Get conversion rates, optionally filtered by comma-separated slugs"""
params = {k: v for k, v in locals().items()
if k not in ['self'] and v is not None}
return await self._request("GET", "/rates", params)
async def get_rate(self, slug: str) -> Dict:
"""Get a specific conversion rate by slug"""
return await self._request("GET", f"/rates/{slug}")

0
app/utils/__init__.py Normal file
View File

43
app/utils/logging.py Normal file
View File

@ -0,0 +1,43 @@
import logging
import sys
from pathlib import Path
from typing import Optional
def setup_logging(log_level: str = "INFO", log_file: Optional[Path] = None):
"""Configure logging for the application.
Args:
log_level: Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
log_file: Optional path to log file
"""
# Set up logging format
log_format = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
# Configure logging level
level = getattr(logging, log_level.upper())
# Configure root logger
handlers = [logging.StreamHandler(sys.stdout)]
# Add file handler if specified
if log_file:
log_file.parent.mkdir(parents=True, exist_ok=True)
file_handler = logging.FileHandler(log_file)
file_handler.setFormatter(logging.Formatter(log_format))
handlers.append(file_handler)
# Configure logging
logging.basicConfig(
level=level,
format=log_format,
handlers=handlers
)
# Set httpx logging to WARNING to reduce noise
logging.getLogger("httpx").setLevel(logging.WARNING)
# Set uvicorn access logs to WARNING to reduce noise
logging.getLogger("uvicorn.access").setLevel(logging.WARNING)
# Log startup message
logging.info(f"Logging initialized at {log_level} level")

79
main.py Normal file
View File

@ -0,0 +1,79 @@
from fastapi import FastAPI, Depends, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.exceptions import RequestValidationError
import uvicorn
from pathlib import Path
from sqlalchemy.orm import Session
import logging
import time
from app.database import get_db, Base, engine
from app.api.routers import assets, exchanges, markets, rates, health
from app.exceptions import (
CoinCapAPIException,
validation_exception_handler,
http_exception_handler,
coincap_api_exception_handler,
general_exception_handler
)
from app.utils.logging import setup_logging
from starlette.exceptions import HTTPException as StarletteHTTPException
# Setup logging
setup_logging(log_level="INFO")
logger = logging.getLogger(__name__)
# Create the FastAPI app
app = FastAPI(
title="Cryptocurrency Data Service",
description="Backend service for cryptocurrency data from CoinCap API",
version="0.1.0",
)
# Register exception handlers
app.add_exception_handler(RequestValidationError, validation_exception_handler)
app.add_exception_handler(StarletteHTTPException, http_exception_handler)
app.add_exception_handler(CoinCapAPIException, coincap_api_exception_handler)
app.add_exception_handler(Exception, general_exception_handler)
# CORS middleware configuration
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Request timing middleware
@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
start_time = time.time()
response = await call_next(request)
process_time = time.time() - start_time
response.headers["X-Process-Time"] = str(process_time)
logger.debug(f"Request to {request.url.path} processed in {process_time:.4f} seconds")
return response
# Include routers
app.include_router(health.router)
app.include_router(assets.router, prefix="/api")
app.include_router(exchanges.router, prefix="/api")
app.include_router(markets.router, prefix="/api")
app.include_router(rates.router, prefix="/api")
# Startup event
@app.on_event("startup")
async def startup_event():
logger.info("Starting up Cryptocurrency Data Service")
# Create database tables at startup
# In production, you should use Alembic migrations instead
# Base.metadata.create_all(bind=engine)
# Shutdown event
@app.on_event("shutdown")
async def shutdown_event():
logger.info("Shutting down Cryptocurrency Data Service")
if __name__ == "__main__":
uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)

1
migrations/README Normal file
View File

@ -0,0 +1 @@
Generic single-database configuration with an async dbapi.

79
migrations/env.py Normal file
View File

@ -0,0 +1,79 @@
from logging.config import fileConfig
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from alembic import context
from app.database import Base
import app.models # Import all models to register them with the Base class
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
if config.config_file_name is not None:
fileConfig(config.config_file_name)
# add your model's MetaData object here
# for 'autogenerate' support
target_metadata = Base.metadata
# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
def run_migrations_offline() -> None:
"""Run migrations in 'offline' mode.
This configures the context with just a URL
and not an Engine, though an Engine is acceptable
here as well. By skipping the Engine creation
we don't even need a DBAPI to be available.
Calls to context.execute() here emit the given string to the
script output.
"""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online() -> None:
"""Run migrations in 'online' mode.
In this scenario we need to create an Engine
and associate a connection with the context.
"""
connectable = engine_from_config(
config.get_section(config.config_ini_section, {}),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
context.configure(
connection=connection, target_metadata=target_metadata
)
with context.begin_transaction():
context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()

24
migrations/script.py.mako Normal file
View File

@ -0,0 +1,24 @@
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
# revision identifiers, used by Alembic.
revision = ${repr(up_revision)}
down_revision = ${repr(down_revision)}
branch_labels = ${repr(branch_labels)}
depends_on = ${repr(depends_on)}
def upgrade() -> None:
${upgrades if upgrades else "pass"}
def downgrade() -> None:
${downgrades if downgrades else "pass"}

View File

@ -0,0 +1,133 @@
"""Initial schema
Revision ID: 001
Revises:
Create Date: 2025-05-14
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '001'
down_revision = None
branch_labels = None
depends_on = None
def upgrade() -> None:
# Create assets table
op.create_table(
'assets',
sa.Column('id', sa.String(), nullable=False),
sa.Column('rank', sa.Integer(), nullable=True),
sa.Column('symbol', sa.String(), nullable=True),
sa.Column('name', sa.String(), nullable=True),
sa.Column('supply', sa.Float(), nullable=True),
sa.Column('max_supply', sa.Float(), nullable=True),
sa.Column('market_cap_usd', sa.Float(), nullable=True),
sa.Column('volume_usd_24hr', sa.Float(), nullable=True),
sa.Column('price_usd', sa.Float(), nullable=True),
sa.Column('change_percent_24hr', sa.Float(), nullable=True),
sa.Column('vwap_24hr', sa.Float(), nullable=True),
sa.Column('explorer', sa.String(), nullable=True),
sa.Column('last_updated', sa.DateTime(timezone=True), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=True),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_assets_name'), 'assets', ['name'], unique=False)
op.create_index(op.f('ix_assets_symbol'), 'assets', ['symbol'], unique=False)
# Create asset_price_history table
op.create_table(
'asset_price_history',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('asset_id', sa.String(), nullable=True),
sa.Column('price_usd', sa.Float(), nullable=True),
sa.Column('timestamp', sa.DateTime(timezone=True), nullable=True),
sa.Column('interval', sa.String(), nullable=True),
sa.ForeignKeyConstraint(['asset_id'], ['assets.id'], ondelete='CASCADE'),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_asset_price_history_asset_id'), 'asset_price_history', ['asset_id'], unique=False)
op.create_index(op.f('ix_asset_price_history_timestamp'), 'asset_price_history', ['timestamp'], unique=False)
# Create exchanges table
op.create_table(
'exchanges',
sa.Column('id', sa.String(), nullable=False),
sa.Column('name', sa.String(), nullable=True),
sa.Column('rank', sa.Integer(), nullable=True),
sa.Column('percent_total_volume', sa.Float(), nullable=True),
sa.Column('volume_usd', sa.Float(), nullable=True),
sa.Column('trading_pairs', sa.Integer(), nullable=True),
sa.Column('socket', sa.Boolean(), nullable=True),
sa.Column('exchange_url', sa.String(), nullable=True),
sa.Column('updated_timestamp', sa.Integer(), nullable=True),
sa.Column('last_updated', sa.DateTime(timezone=True), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=True),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_exchanges_name'), 'exchanges', ['name'], unique=False)
op.create_index(op.f('ix_exchanges_rank'), 'exchanges', ['rank'], unique=False)
# Create markets table
op.create_table(
'markets',
sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
sa.Column('exchange_id', sa.String(), nullable=True),
sa.Column('rank', sa.Integer(), nullable=True),
sa.Column('base_id', sa.String(), nullable=True),
sa.Column('quote_id', sa.String(), nullable=True),
sa.Column('base_symbol', sa.String(), nullable=True),
sa.Column('quote_symbol', sa.String(), nullable=True),
sa.Column('price_quote', sa.Float(), nullable=True),
sa.Column('price_usd', sa.Float(), nullable=True),
sa.Column('volume_usd_24hr', sa.Float(), nullable=True),
sa.Column('percent_exchange_volume', sa.Float(), nullable=True),
sa.Column('trades_count_24hr', sa.Integer(), nullable=True),
sa.Column('updated_timestamp', sa.Integer(), nullable=True),
sa.Column('last_updated', sa.DateTime(timezone=True), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=True),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_markets_base_id'), 'markets', ['base_id'], unique=False)
op.create_index(op.f('ix_markets_base_symbol'), 'markets', ['base_symbol'], unique=False)
op.create_index(op.f('ix_markets_exchange_id'), 'markets', ['exchange_id'], unique=False)
op.create_index(op.f('ix_markets_quote_id'), 'markets', ['quote_id'], unique=False)
op.create_index(op.f('ix_markets_quote_symbol'), 'markets', ['quote_symbol'], unique=False)
# Create rates table
op.create_table(
'rates',
sa.Column('id', sa.String(), nullable=False),
sa.Column('symbol', sa.String(), nullable=True),
sa.Column('currency_symbol', sa.String(), nullable=True),
sa.Column('type', sa.String(), nullable=True),
sa.Column('rate_usd', sa.Float(), nullable=True),
sa.Column('last_updated', sa.DateTime(timezone=True), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=True),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_rates_symbol'), 'rates', ['symbol'], unique=False)
def downgrade() -> None:
op.drop_index(op.f('ix_rates_symbol'), table_name='rates')
op.drop_table('rates')
op.drop_index(op.f('ix_markets_quote_symbol'), table_name='markets')
op.drop_index(op.f('ix_markets_quote_id'), table_name='markets')
op.drop_index(op.f('ix_markets_exchange_id'), table_name='markets')
op.drop_index(op.f('ix_markets_base_symbol'), table_name='markets')
op.drop_index(op.f('ix_markets_base_id'), table_name='markets')
op.drop_table('markets')
op.drop_index(op.f('ix_exchanges_rank'), table_name='exchanges')
op.drop_index(op.f('ix_exchanges_name'), table_name='exchanges')
op.drop_table('exchanges')
op.drop_index(op.f('ix_asset_price_history_timestamp'), table_name='asset_price_history')
op.drop_index(op.f('ix_asset_price_history_asset_id'), table_name='asset_price_history')
op.drop_table('asset_price_history')
op.drop_index(op.f('ix_assets_symbol'), table_name='assets')
op.drop_index(op.f('ix_assets_name'), table_name='assets')
op.drop_table('assets')

8
requirements.txt Normal file
View File

@ -0,0 +1,8 @@
fastapi==0.104.0
uvicorn==0.23.2
sqlalchemy==2.0.22
pydantic==2.4.2
httpx==0.25.0
python-dotenv==1.0.0
alembic==1.12.1
ruff==0.1.3