Automated Action 6705b7a5e2 Create Weather Data API with OpenWeatherMap integration
- Implemented FastAPI application structure
- Added OpenWeatherMap API integration
- Created SQLite database with SQLAlchemy
- Setup Alembic for database migrations
- Added health check endpoint
- Created comprehensive README

generated with BackendIM... (backend.im)
2025-05-12 13:51:42 +00:00

152 lines
4.9 KiB
Python

import httpx
from datetime import datetime
from sqlalchemy.orm import Session
from fastapi import HTTPException
from app.core.config import OPENWEATHERMAP_API_KEY, OPENWEATHERMAP_BASE_URL
from app.models.weather import City, WeatherData
from app.schemas.weather import CityCreate, WeatherDataCreate
async def get_weather_by_city_name(city_name: str, country_code: str = None):
"""
Get current weather data from OpenWeatherMap API by city name
"""
params = {
"q": f"{city_name},{country_code}" if country_code else city_name,
"appid": OPENWEATHERMAP_API_KEY,
"units": "metric"
}
async with httpx.AsyncClient() as client:
response = await client.get(f"{OPENWEATHERMAP_BASE_URL}/weather", params=params)
if response.status_code != 200:
raise HTTPException(status_code=response.status_code,
detail=f"Error from OpenWeatherMap: {response.json().get('message', 'Unknown error')}")
return response.json()
async def get_weather_by_coordinates(lat: float, lon: float):
"""
Get current weather data from OpenWeatherMap API by coordinates
"""
params = {
"lat": lat,
"lon": lon,
"appid": OPENWEATHERMAP_API_KEY,
"units": "metric"
}
async with httpx.AsyncClient() as client:
response = await client.get(f"{OPENWEATHERMAP_BASE_URL}/weather", params=params)
if response.status_code != 200:
raise HTTPException(status_code=response.status_code,
detail=f"Error from OpenWeatherMap: {response.json().get('message', 'Unknown error')}")
return response.json()
def create_or_update_city(db: Session, owm_data: dict):
"""
Create or update city in the database
"""
# Check if city already exists
city = db.query(City).filter(
City.name == owm_data["name"],
City.latitude == owm_data["coord"]["lat"],
City.longitude == owm_data["coord"]["lon"]
).first()
if not city:
# Create new city
city_data = CityCreate(
name=owm_data["name"],
country=owm_data["sys"]["country"],
latitude=owm_data["coord"]["lat"],
longitude=owm_data["coord"]["lon"]
)
city = City(
name=city_data.name,
country=city_data.country,
latitude=city_data.latitude,
longitude=city_data.longitude
)
db.add(city)
db.commit()
db.refresh(city)
return city
def save_weather_data(db: Session, city_id: int, owm_data: dict):
"""
Save weather data to database
"""
weather_data = WeatherDataCreate(
city_id=city_id,
temperature=owm_data["main"]["temp"],
feels_like=owm_data["main"]["feels_like"],
humidity=owm_data["main"]["humidity"],
pressure=owm_data["main"]["pressure"],
wind_speed=owm_data["wind"]["speed"],
wind_direction=owm_data["wind"]["deg"],
description=owm_data["weather"][0]["description"],
weather_main=owm_data["weather"][0]["main"],
weather_icon=owm_data["weather"][0]["icon"]
)
db_weather_data = WeatherData(
city_id=weather_data.city_id,
temperature=weather_data.temperature,
feels_like=weather_data.feels_like,
humidity=weather_data.humidity,
pressure=weather_data.pressure,
wind_speed=weather_data.wind_speed,
wind_direction=weather_data.wind_direction,
description=weather_data.description,
weather_main=weather_data.weather_main,
weather_icon=weather_data.weather_icon,
timestamp=datetime.utcnow()
)
db.add(db_weather_data)
db.commit()
db.refresh(db_weather_data)
return db_weather_data
async def get_and_save_weather(db: Session, city_name: str, country_code: str = None):
"""
Get weather data from OpenWeatherMap API and save to database
"""
# Get weather data from OpenWeatherMap
owm_data = await get_weather_by_city_name(city_name, country_code)
# Create or update city
city = create_or_update_city(db, owm_data)
# Save weather data
weather_data = save_weather_data(db, city.id, owm_data)
return {"city": city, "current_weather": weather_data}
def get_city_by_id(db: Session, city_id: int):
"""
Get city by ID
"""
return db.query(City).filter(City.id == city_id).first()
def get_cities(db: Session, skip: int = 0, limit: int = 100):
"""
Get all cities
"""
return db.query(City).offset(skip).limit(limit).all()
def get_weather_history_by_city(db: Session, city_id: int, skip: int = 0, limit: int = 100):
"""
Get weather history for a specific city
"""
return db.query(WeatherData).filter(
WeatherData.city_id == city_id
).order_by(WeatherData.timestamp.desc()).offset(skip).limit(limit).all()