
- Set up FastAPI project structure - Create database models for locations and weather data - Implement OpenWeatherMap API integration - Create API endpoints for current weather and history - Add health endpoint - Set up database migrations with Alembic - Update README with documentation generated with BackendIM... (backend.im)
136 lines
4.8 KiB
Python
136 lines
4.8 KiB
Python
import httpx
|
|
from fastapi import HTTPException, Depends
|
|
from sqlalchemy.orm import Session
|
|
from typing import Optional, Dict, Any, Tuple
|
|
from datetime import datetime
|
|
|
|
from app.core.config import settings
|
|
from app.core.database import get_db
|
|
from app.models.weather import Location, WeatherData
|
|
from app.schemas.weather import LocationCreate, WeatherDataCreate
|
|
|
|
|
|
class WeatherService:
|
|
def __init__(self, db: Session):
|
|
self.db = db
|
|
self.api_key = settings.OPENWEATHERMAP_API_KEY
|
|
self.api_url = settings.OPENWEATHERMAP_API_URL
|
|
|
|
if not self.api_key:
|
|
raise ValueError("OpenWeatherMap API key is not set")
|
|
|
|
async def get_weather_by_city(self, city: str, country: Optional[str] = None) -> Tuple[Location, WeatherData]:
|
|
"""
|
|
Get current weather data for a city and save it to the database
|
|
"""
|
|
query = f"q={city}"
|
|
if country:
|
|
query += f",{country}"
|
|
|
|
return await self._fetch_and_save_weather(query)
|
|
|
|
async def get_weather_by_coordinates(self, lat: float, lon: float) -> Tuple[Location, WeatherData]:
|
|
"""
|
|
Get current weather data for coordinates and save it to the database
|
|
"""
|
|
query = f"lat={lat}&lon={lon}"
|
|
return await self._fetch_and_save_weather(query)
|
|
|
|
async def _fetch_and_save_weather(self, query: str) -> Tuple[Location, WeatherData]:
|
|
"""
|
|
Fetch weather data from OpenWeatherMap API and save it to the database
|
|
"""
|
|
url = f"{self.api_url}/weather?{query}&appid={self.api_key}&units=metric"
|
|
|
|
async with httpx.AsyncClient() as client:
|
|
response = await client.get(url)
|
|
|
|
if response.status_code != 200:
|
|
error = response.json().get("message", "Unknown error")
|
|
raise HTTPException(status_code=response.status_code, detail=f"OpenWeatherMap API error: {error}")
|
|
|
|
data = response.json()
|
|
|
|
# Get or create location
|
|
location = self._get_or_create_location(data)
|
|
|
|
# Create weather data
|
|
weather_data = self._create_weather_data(data, location.id)
|
|
|
|
return location, weather_data
|
|
|
|
def _get_or_create_location(self, data: Dict[Any, Any]) -> Location:
|
|
"""
|
|
Get location from database or create it if it doesn't exist
|
|
"""
|
|
# Check if location already exists
|
|
existing_location = self.db.query(Location).filter(
|
|
Location.latitude == data["coord"]["lat"],
|
|
Location.longitude == data["coord"]["lon"]
|
|
).first()
|
|
|
|
if existing_location:
|
|
return existing_location
|
|
|
|
# Create new location
|
|
location_data = LocationCreate(
|
|
name=data["name"],
|
|
country=data["sys"]["country"],
|
|
latitude=data["coord"]["lat"],
|
|
longitude=data["coord"]["lon"]
|
|
)
|
|
|
|
location = Location(**location_data.dict())
|
|
self.db.add(location)
|
|
self.db.commit()
|
|
self.db.refresh(location)
|
|
|
|
return location
|
|
|
|
def _create_weather_data(self, data: Dict[Any, Any], location_id: int) -> WeatherData:
|
|
"""
|
|
Create weather data record in the database
|
|
"""
|
|
weather = data["weather"][0]
|
|
main = data["main"]
|
|
wind = data["wind"]
|
|
clouds = data["clouds"]
|
|
|
|
weather_data_create = WeatherDataCreate(
|
|
location_id=location_id,
|
|
temperature=main["temp"],
|
|
feels_like=main["feels_like"],
|
|
humidity=main["humidity"],
|
|
pressure=main["pressure"],
|
|
wind_speed=wind["speed"],
|
|
wind_direction=wind.get("deg", 0),
|
|
weather_condition=weather["main"],
|
|
weather_description=weather["description"],
|
|
clouds=clouds["all"],
|
|
timestamp=datetime.fromtimestamp(data["dt"])
|
|
)
|
|
|
|
weather_data = WeatherData(**weather_data_create.dict())
|
|
self.db.add(weather_data)
|
|
self.db.commit()
|
|
self.db.refresh(weather_data)
|
|
|
|
return weather_data
|
|
|
|
def get_weather_history(self, location_id: int, limit: int = 10):
|
|
"""
|
|
Get weather history for a location
|
|
"""
|
|
location = self.db.query(Location).filter(Location.id == location_id).first()
|
|
if not location:
|
|
raise HTTPException(status_code=404, detail="Location not found")
|
|
|
|
history = self.db.query(WeatherData).filter(
|
|
WeatherData.location_id == location_id
|
|
).order_by(WeatherData.timestamp.desc()).limit(limit).all()
|
|
|
|
return location, history
|
|
|
|
|
|
def get_weather_service(db: Session = Depends(get_db)) -> WeatherService:
|
|
return WeatherService(db) |