
- Created FastAPI application with SQLite database integration - Implemented OpenWeatherMap client with caching - Added endpoints for current weather, forecasts, and history - Included comprehensive error handling and validation - Set up Alembic migrations - Created detailed README with usage examples generated with BackendIM... (backend.im)
193 lines
7.2 KiB
Python
193 lines
7.2 KiB
Python
from fastapi import APIRouter, Depends, HTTPException, Query
|
|
from sqlalchemy.orm import Session
|
|
from typing import List, Optional
|
|
from datetime import datetime, timedelta
|
|
|
|
from app.database.session import get_db
|
|
from app.schemas.weather import (
|
|
City, CityCreate, WeatherRecord, WeatherRecordCreate,
|
|
CurrentWeatherResponse, ForecastResponse, ForecastItem,
|
|
WeatherSearchParams
|
|
)
|
|
from app.crud.weather import city as city_crud
|
|
from app.crud.weather import weather_record as weather_crud
|
|
from app.services.openweather import openweather_client
|
|
|
|
router = APIRouter()
|
|
|
|
@router.get("/current", response_model=CurrentWeatherResponse)
|
|
async def get_current_weather(
|
|
city: Optional[str] = Query(None, description="City name"),
|
|
country: Optional[str] = Query(None, description="Country code (ISO 3166)"),
|
|
lat: Optional[float] = Query(None, description="Latitude"),
|
|
lon: Optional[float] = Query(None, description="Longitude"),
|
|
db: Session = Depends(get_db)
|
|
):
|
|
"""
|
|
Get current weather data for a location by city name or coordinates.
|
|
"""
|
|
# Input validation
|
|
if (city is None and (lat is None or lon is None)):
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail="Either city or lat,lon parameters must be provided"
|
|
)
|
|
|
|
weather_data = None
|
|
db_city = None
|
|
|
|
# Fetch weather data based on provided parameters
|
|
if city:
|
|
# Check if we have this city in our database
|
|
if country:
|
|
db_city = city_crud.get_by_name_and_country(db, name=city, country=country)
|
|
|
|
# Fetch from OpenWeatherMap API
|
|
api_response = await openweather_client.get_current_weather_by_city(city, country)
|
|
city_data, weather = openweather_client.parse_weather_data(api_response)
|
|
|
|
# Create or update city in database
|
|
if not db_city:
|
|
city_create = CityCreate(**city_data)
|
|
db_city = city_crud.create(db, obj_in=city_create)
|
|
|
|
# Create weather record in database
|
|
weather.city_id = db_city.id
|
|
db_weather = weather_crud.create(db, obj_in=weather)
|
|
|
|
else: # Using coordinates
|
|
# Check if we have this location in our database
|
|
db_city = city_crud.get_by_coordinates(db, latitude=lat, longitude=lon)
|
|
|
|
# Fetch from OpenWeatherMap API
|
|
api_response = await openweather_client.get_current_weather_by_coordinates(lat, lon)
|
|
city_data, weather = openweather_client.parse_weather_data(api_response)
|
|
|
|
# Create or update city in database
|
|
if not db_city:
|
|
city_create = CityCreate(**city_data)
|
|
db_city = city_crud.create(db, obj_in=city_create)
|
|
|
|
# Create weather record in database
|
|
weather.city_id = db_city.id
|
|
db_weather = weather_crud.create(db, obj_in=weather)
|
|
|
|
return CurrentWeatherResponse(city=db_city, weather=db_weather)
|
|
|
|
@router.get("/forecast", response_model=ForecastResponse)
|
|
async def get_weather_forecast(
|
|
city: Optional[str] = Query(None, description="City name"),
|
|
country: Optional[str] = Query(None, description="Country code (ISO 3166)"),
|
|
lat: Optional[float] = Query(None, description="Latitude"),
|
|
lon: Optional[float] = Query(None, description="Longitude"),
|
|
db: Session = Depends(get_db)
|
|
):
|
|
"""
|
|
Get 5-day weather forecast for a location by city name or coordinates.
|
|
"""
|
|
# Input validation
|
|
if (city is None and (lat is None or lon is None)):
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail="Either city or lat,lon parameters must be provided"
|
|
)
|
|
|
|
db_city = None
|
|
forecast_items = []
|
|
|
|
# Fetch forecast data based on provided parameters
|
|
if city:
|
|
# Check if we have this city in our database
|
|
if country:
|
|
db_city = city_crud.get_by_name_and_country(db, name=city, country=country)
|
|
|
|
# Fetch from OpenWeatherMap API
|
|
api_response = await openweather_client.get_forecast_by_city(city, country)
|
|
city_data, forecast_data = openweather_client.parse_forecast_data(api_response)
|
|
|
|
# Create or update city in database
|
|
if not db_city:
|
|
city_create = CityCreate(**city_data)
|
|
db_city = city_crud.create(db, obj_in=city_create)
|
|
|
|
# Process forecast items
|
|
for item in forecast_data:
|
|
item.city_id = db_city.id
|
|
db_item = weather_crud.create(db, obj_in=item)
|
|
forecast_items.append(ForecastItem(**item.dict()))
|
|
|
|
else: # Using coordinates
|
|
# Check if we have this location in our database
|
|
db_city = city_crud.get_by_coordinates(db, latitude=lat, longitude=lon)
|
|
|
|
# Fetch from OpenWeatherMap API
|
|
api_response = await openweather_client.get_forecast_by_coordinates(lat, lon)
|
|
city_data, forecast_data = openweather_client.parse_forecast_data(api_response)
|
|
|
|
# Create or update city in database
|
|
if not db_city:
|
|
city_create = CityCreate(**city_data)
|
|
db_city = city_crud.create(db, obj_in=city_create)
|
|
|
|
# Process forecast items
|
|
for item in forecast_data:
|
|
item.city_id = db_city.id
|
|
db_item = weather_crud.create(db, obj_in=item)
|
|
forecast_items.append(ForecastItem(**item.dict()))
|
|
|
|
return ForecastResponse(city=db_city, forecast=forecast_items)
|
|
|
|
@router.get("/cities", response_model=List[City])
|
|
def get_cities(
|
|
name: Optional[str] = Query(None, description="Filter by city name"),
|
|
country: Optional[str] = Query(None, description="Filter by country code"),
|
|
skip: int = 0,
|
|
limit: int = 100,
|
|
db: Session = Depends(get_db)
|
|
):
|
|
"""
|
|
Retrieve cities from the database with optional filtering.
|
|
"""
|
|
if name and country:
|
|
city = city_crud.get_by_name_and_country(db, name=name, country=country)
|
|
return [city] if city else []
|
|
|
|
# Implement simple filtering logic - in a real app, we'd use more sophisticated query building
|
|
cities = db.query(city_crud.model)
|
|
|
|
if name:
|
|
cities = cities.filter(city_crud.model.name.ilike(f"%{name}%"))
|
|
|
|
if country:
|
|
cities = cities.filter(city_crud.model.country == country)
|
|
|
|
return cities.offset(skip).limit(limit).all()
|
|
|
|
@router.get("/history/{city_id}", response_model=List[WeatherRecord])
|
|
def get_weather_history(
|
|
city_id: int,
|
|
start_date: Optional[datetime] = Query(None, description="Start date for historical data"),
|
|
end_date: Optional[datetime] = Query(None, description="End date for historical data"),
|
|
db: Session = Depends(get_db)
|
|
):
|
|
"""
|
|
Get historical weather data for a city.
|
|
"""
|
|
# Default to past 24 hours if no date range provided
|
|
if not start_date:
|
|
start_date = datetime.utcnow() - timedelta(days=1)
|
|
|
|
if not end_date:
|
|
end_date = datetime.utcnow()
|
|
|
|
# Check if city exists
|
|
db_city = city_crud.get(db, id=city_id)
|
|
if not db_city:
|
|
raise HTTPException(status_code=404, detail="City not found")
|
|
|
|
# Get historical data
|
|
history = weather_crud.get_history_by_city_id(
|
|
db, city_id=city_id, start_date=start_date, end_date=end_date
|
|
)
|
|
|
|
return history |