Create a FastAPI router with a WebSocket endpoint for agent events.
Source code in src/openjarvis/server/ws_bridge.py
def create_ws_router(event_bus: EventBus) -> Any:
    """Create a FastAPI router with a WebSocket endpoint for agent events.

    Every event type in ``_AGENT_EVENTS`` is subscribed on ``event_bus``.
    Each delivered event is fanned out to one bounded queue per connected
    client and streamed as JSON over ``/v1/agents/events``.  A client may
    pass ``?agent_id=<id>`` to receive only events whose
    ``data["agent_id"]`` equals that id.

    Args:
        event_bus: Bus on which the agent event types are subscribed.

    Returns:
        The ``APIRouter`` carrying the WebSocket route.
    """
    router = APIRouter()

    # Each connected client gets a queue + loop ref for thread-safe event
    # delivery.
    clients: dict[
        WebSocket, tuple[asyncio.Queue, asyncio.AbstractEventLoop]
    ] = {}

    def _enqueue(queue: asyncio.Queue, payload: dict[str, Any]) -> None:
        """Put ``payload`` on ``queue``, dropping it if the queue is full.

        This runs *inside* the client's event loop.  ``QueueFull`` has to be
        handled here: ``call_soon_threadsafe`` only schedules the call, so an
        ``except`` around the scheduling site never sees the exception and
        the loop would report an unhandled callback error for every drop.
        """
        try:
            queue.put_nowait(payload)
        except asyncio.QueueFull:
            pass  # Client is slow; drop the event rather than block.

    def _on_event(event: Event) -> None:
        """Forward event to all connected WebSocket client queues."""
        data = event.data or {}
        payload = {
            "type": event.event_type.value,
            "timestamp": event.timestamp,
            "data": data,
        }
        event_agent = data.get("agent_id")

        # Iterate over a snapshot: clients connect/disconnect concurrently.
        for ws, (queue, loop) in list(clients.items()):
            agent_filter = getattr(ws, "_agent_filter", None)
            if agent_filter and event_agent != agent_filter:
                continue
            try:
                loop.call_soon_threadsafe(_enqueue, queue, payload)
            except RuntimeError:
                pass  # Loop closed; the client is going away.

    # Subscribe to all agent events
    for event_type in _AGENT_EVENTS:
        event_bus.subscribe(event_type, _on_event)

    @router.websocket("/v1/agents/events")
    async def agent_events(websocket: WebSocket) -> None:
        """Stream queued agent events to one client until it disconnects."""
        await websocket.accept()

        # Parse agent_id filter from query string
        agent_id = websocket.query_params.get("agent_id")
        websocket._agent_filter = agent_id  # type: ignore[attr-defined]

        queue: asyncio.Queue = asyncio.Queue(maxsize=100)
        loop = asyncio.get_running_loop()
        clients[websocket] = (queue, loop)
        # NOTE(review): nothing here reads from the socket, so a disconnect
        # is presumably only noticed on the next send -- confirm whether idle
        # clients need an explicit receive/ping to be unregistered promptly.
        try:
            while True:
                payload = await queue.get()
                await websocket.send_json(payload)
        except WebSocketDisconnect:
            pass
        finally:
            # Always unregister, including when send_json fails some other way.
            clients.pop(websocket, None)

    return router