from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Tuple
from urllib.parse import urlencode

import httpx
from fastapi import Depends, HTTPException
from sqlalchemy.orm import Session

from app.core.config import settings
from app.core.database import get_db
from app.models.weather import Location, WeatherData
from app.schemas.weather import LocationCreate, WeatherDataCreate


class WeatherService:
    """Fetch current weather from the OpenWeatherMap API and persist it.

    Each successful lookup stores (or reuses) a ``Location`` row and adds a
    new ``WeatherData`` row through the SQLAlchemy session given at
    construction time.
    """

    def __init__(self, db: Session):
        """Bind the service to a database session and read API settings.

        Raises:
            ValueError: If ``settings.OPENWEATHERMAP_API_KEY`` is empty.
        """
        self.db = db
        self.api_key = settings.OPENWEATHERMAP_API_KEY
        self.api_url = settings.OPENWEATHERMAP_API_URL

        if not self.api_key:
            raise ValueError("OpenWeatherMap API key is not set")

    async def get_weather_by_city(
        self, city: str, country: Optional[str] = None
    ) -> Tuple[Location, WeatherData]:
        """Get current weather data for a city and save it to the database.

        Args:
            city: City name to look up.
            country: Optional country qualifier, sent as ``"<city>,<country>"``.

        Returns:
            The ``(location, weather_data)`` pair that was stored.

        Raises:
            HTTPException: If the API answers with a non-200 status.
        """
        place = f"{city},{country}" if country else city
        # Encode the value: a raw "&" or "#" in the caller-supplied name would
        # otherwise add or override query parameters (units, appid, ...).
        return await self._fetch_and_save_weather(urlencode({"q": place}))

    async def get_weather_by_coordinates(
        self, lat: float, lon: float
    ) -> Tuple[Location, WeatherData]:
        """Get current weather data for coordinates and save it to the database.

        Args:
            lat: Latitude sent as the ``lat`` query parameter.
            lon: Longitude sent as the ``lon`` query parameter.

        Returns:
            The ``(location, weather_data)`` pair that was stored.

        Raises:
            HTTPException: If the API answers with a non-200 status.
        """
        return await self._fetch_and_save_weather(urlencode({"lat": lat, "lon": lon}))

    async def _fetch_and_save_weather(self, query: str) -> Tuple[Location, WeatherData]:
        """Fetch weather data from the OpenWeatherMap API and save it.

        Args:
            query: Already URL-encoded query-string fragment selecting the
                place (for example ``"q=London"`` or ``"lat=1.0&lon=2.0"``).

        Returns:
            The ``(location, weather_data)`` pair that was stored.

        Raises:
            HTTPException: With the upstream status code when the API does
                not answer 200.
        """
        fixed_params = urlencode({"appid": self.api_key, "units": "metric"})
        url = f"{self.api_url}/weather?{query}&{fixed_params}"

        async with httpx.AsyncClient() as client:
            response = await client.get(url)

        if response.status_code != 200:
            error = self._error_message(response)
            raise HTTPException(
                status_code=response.status_code,
                detail=f"OpenWeatherMap API error: {error}",
            )

        data = response.json()

        # Get or create location
        location = self._get_or_create_location(data)

        # Create weather data
        weather_data = self._create_weather_data(data, location.id)

        return location, weather_data

    @staticmethod
    def _error_message(response: httpx.Response) -> str:
        """Extract the ``message`` field of an error response, if it has one.

        Falls back to ``"Unknown error"`` when the body is not JSON or is not
        a JSON object, so that building the error detail never raises.
        """
        try:
            payload = response.json()
        except ValueError:  # body is not valid JSON (e.g. an HTML error page)
            return "Unknown error"
        if isinstance(payload, dict):
            return payload.get("message", "Unknown error")
        return "Unknown error"

    def _get_or_create_location(self, data: Dict[Any, Any]) -> Location:
        """Get location from database or create it if it doesn't exist.

        Locations are matched on exact latitude/longitude equality with
        ``data["coord"]``.

        Args:
            data: Decoded API payload; reads ``coord.lat``, ``coord.lon``,
                ``name`` and ``sys.country``.

        Returns:
            The existing or newly committed ``Location``.
        """
        coord = data["coord"]

        # Check if location already exists
        existing_location = (
            self.db.query(Location)
            .filter(
                Location.latitude == coord["lat"],
                Location.longitude == coord["lon"],
            )
            .first()
        )
        if existing_location:
            return existing_location

        # Create new location
        location_data = LocationCreate(
            name=data["name"],
            country=data["sys"]["country"],
            latitude=coord["lat"],
            longitude=coord["lon"],
        )

        location = Location(**location_data.dict())
        self.db.add(location)
        self.db.commit()
        self.db.refresh(location)

        return location

    def _create_weather_data(self, data: Dict[Any, Any], location_id: int) -> WeatherData:
        """Create weather data record in the database.

        Args:
            data: Decoded API payload; reads ``weather[0]``, ``main``,
                ``wind``, ``clouds`` and ``dt``.
            location_id: Primary key of the ``Location`` the record belongs to.

        Returns:
            The committed and refreshed ``WeatherData`` row.
        """
        weather = data["weather"][0]
        main = data["main"]
        wind = data["wind"]
        clouds = data["clouds"]

        # Convert the epoch value in UTC rather than the server's local zone,
        # so stored timestamps do not depend on where the service runs. The
        # tzinfo is dropped to keep producing a naive datetime, as before.
        # NOTE(review): rows written earlier on a non-UTC host hold local time.
        observed_at = datetime.fromtimestamp(data["dt"], tz=timezone.utc).replace(tzinfo=None)

        weather_data_create = WeatherDataCreate(
            location_id=location_id,
            temperature=main["temp"],
            feels_like=main["feels_like"],
            humidity=main["humidity"],
            pressure=main["pressure"],
            wind_speed=wind["speed"],
            # Fall back to 0 when the payload has no wind direction.
            wind_direction=wind.get("deg", 0),
            weather_condition=weather["main"],
            weather_description=weather["description"],
            clouds=clouds["all"],
            timestamp=observed_at,
        )

        weather_data = WeatherData(**weather_data_create.dict())
        self.db.add(weather_data)
        self.db.commit()
        self.db.refresh(weather_data)

        return weather_data

    def get_weather_history(
        self, location_id: int, limit: int = 10
    ) -> Tuple[Location, List[WeatherData]]:
        """Get weather history for a location.

        Args:
            location_id: Primary key of the location to look up.
            limit: Maximum number of records to return (newest first).

        Returns:
            ``(location, history)`` where ``history`` is ordered by
            ``WeatherData.timestamp`` descending.

        Raises:
            HTTPException: 404 if no location has the given id.
        """
        location = self.db.query(Location).filter(Location.id == location_id).first()
        if not location:
            raise HTTPException(status_code=404, detail="Location not found")

        history = (
            self.db.query(WeatherData)
            .filter(WeatherData.location_id == location_id)
            .order_by(WeatherData.timestamp.desc())
            .limit(limit)
            .all()
        )

        return location, history


def get_weather_service(db: Session = Depends(get_db)) -> WeatherService:
    """FastAPI dependency that builds a ``WeatherService`` on the request's session."""
    return WeatherService(db)