def sync(
    self, *, since: Optional[datetime] = None, cursor: Optional[str] = None
) -> Iterator[Document]:
    """Fetch weather for the configured location and yield it as Documents.

    Two documents are produced, in this order:

    1. A ``"current"`` document built from the ``/weather`` endpoint,
       carrying temperature, conditions, humidity and wind speed both in
       its text content and in its metadata.
    2. A ``"forecast"`` document built from the ``/forecast`` endpoint,
       with one text line per entry of the response's ``"list"``.

    Both requests use ``units=imperial``. The location comes from the
    loaded config and falls back to ``"San Francisco,CA"``.

    Once the caller resumes the generator after the second document,
    ``self._status`` is marked ``"idle"`` and ``last_sync`` is stamped.
    A caller that stops iterating earlier never triggers that update.

    Args:
        since: Not used by this implementation.
        cursor: Not used by this implementation.

    Yields:
        The current-conditions Document, then the forecast Document.

    Raises:
        KeyError: If the loaded config has no ``"api_key"`` entry.
    """

    def _describe(payload):
        # Comma-join every "description" under the payload's "weather" list.
        return ", ".join(
            item.get("description", "") for item in payload.get("weather", [])
        )

    def _forecast_line(slot):
        # One "<timestamp>: <temp>°F, <conditions>" line per forecast entry.
        when = slot.get("dt_txt", "")
        slot_temp = slot.get("main", {}).get("temp")
        slot_desc = _describe(slot)
        return f"{when}: {slot_temp}°F, {slot_desc}"

    settings = self._load_config()
    api_key = settings["api_key"]
    location = settings.get("location", "San Francisco,CA")

    # Query parameters common to both endpoints; each call gets its own dict.
    base_params = {"q": location, "appid": api_key, "units": "imperial"}

    # --- Current conditions -------------------------------------------
    observed = _weather_api_get(
        "https://api.openweathermap.org/data/2.5/weather",
        params=dict(base_params),
    )
    readings = observed.get("main", {})
    conditions = _describe(observed)
    temperature = readings.get("temp")
    humidity = readings.get("humidity")
    wind_speed = observed.get("wind", {}).get("speed")

    summary = (
        f"Temperature: {temperature}°F, "
        f"Conditions: {conditions}, "
        f"Humidity: {humidity}%, "
        f"Wind: {wind_speed} mph"
    )
    yield Document(
        doc_id=f"weather-current-{location}",
        source="weather",
        doc_type="current",
        content=summary,
        title=f"Current Weather — {location}",
        timestamp=datetime.now(),
        metadata={
            "location": location,
            "temp": temperature,
            "conditions": conditions,
            "humidity": humidity,
            "wind_speed": wind_speed,
        },
    )

    # --- Forecast (next ~12 hours, 4 x 3-hour intervals) --------------
    upcoming = _weather_api_get(
        "https://api.openweathermap.org/data/2.5/forecast",
        params={**base_params, "cnt": "4"},
    )
    forecast_lines = [_forecast_line(slot) for slot in upcoming.get("list", [])]
    yield Document(
        doc_id=f"weather-forecast-{location}",
        source="weather",
        doc_type="forecast",
        content="Forecast:\n" + "\n".join(forecast_lines),
        title=f"Weather Forecast — {location}",
        timestamp=datetime.now(),
        metadata={"location": location},
    )

    # Reached only after the consumer asks for more past the second document.
    self._status.state = "idle"
    self._status.last_sync = datetime.now()