Source code for uniqc.gateway.ws

"""WebSocket event broadcaster for the gateway dashboard."""

from __future__ import annotations

import asyncio
import json
from typing import Any

from fastapi import WebSocket

# ---------------------------------------------------------------------------
# Event types
# ---------------------------------------------------------------------------

EventType = str

PAYLOAD = dict[str, Any]


[docs] class Event: """One broadcast event.""" __slots__ = ("type", "payload") def __init__(self, type: EventType, payload: PAYLOAD) -> None: self.type = type self.payload = payload
[docs] def to_json(self) -> str: return json.dumps({"type": self.type, "payload": self.payload})
# --------------------------------------------------------------------------- # Broadcaster # --------------------------------------------------------------------------- PAYLOAD_BROADCAST = tuple[WebSocket, Event]
[docs] class EventBroadcaster: """ASGI-safe singleton that fans out events to all connected WebSocket clients. Usage:: broadcaster = EventBroadcaster() # In a route handler: await broadcaster.broadcast(Event("task:updated", {"task_id": "abc"})) """ def __init__(self) -> None: self._connections: list[WebSocket] = [] self._queue: asyncio.Queue[PAYLOAD_BROADCAST] = asyncio.Queue() self._runner_task: asyncio.Task[None] | None = None # -- connection management ------------------------------------------------
[docs] async def connect(self, websocket: WebSocket) -> None: await websocket.accept() self._connections.append(websocket) # Start the background fan-out task on first connection if self._runner_task is None or self._runner_task.done(): self._runner_task = asyncio.create_task(self._run())
[docs] def disconnect(self, websocket: WebSocket) -> None: if websocket in self._connections: self._connections.remove(websocket)
# -- fan-out -------------------------------------------------------------
[docs] async def broadcast(self, event: Event) -> None: """Enqueue an event for fan-out to all connected clients.""" for conn in self._connections: await self._queue.put((conn, event))
async def _run(self) -> None: """Background task: drain the queue, sending each event to every client.""" while True: dead: list[WebSocket] = [] # Collect all pending events pending: list[tuple[WebSocket, Event]] = [] try: while True: item = await asyncio.wait_for(self._queue.get(), timeout=0.05) pending.append(item) except asyncio.TimeoutError: pass for ws, evt in pending: try: await ws.send_text(evt.to_json()) except Exception: dead.append(ws) for ws in dead: self.disconnect(ws) if not self._connections and self._queue.empty(): # No clients: stop the runner break # -- convenience helpers --------------------------------------------------
[docs] async def emit_task_updated(self, task_id: str, status: str) -> None: await self.broadcast(Event("task:updated", {"task_id": task_id, "status": status}))
[docs] async def emit_backend_status(self, backend_id: str, status: str) -> None: await self.broadcast(Event("backend:status", {"backend_id": backend_id, "status": status}))
# Global singleton broadcaster = EventBroadcaster()