uniqc.gateway.ws module

WebSocket event broadcaster for the gateway dashboard.

class uniqc.gateway.ws.Event(type, payload)[source]

Bases: object

One broadcast event.

Parameters:
  • type (EventType)

  • payload (PAYLOAD)

payload
to_json()[source]
Return type:

str

type
class uniqc.gateway.ws.EventBroadcaster[source]

Bases: object

ASGI-safe singleton that fans out events to all connected WebSocket clients.

Usage:

broadcaster = EventBroadcaster()
# In a route handler:
await broadcaster.broadcast(Event("task:updated", {"task_id": "abc"}))
async broadcast(event)[source]

Enqueue an event for fan-out to all connected clients.

Parameters:

event (Event)

Return type:

None

async connect(websocket)[source]
Parameters:

websocket (WebSocket)

Return type:

None

disconnect(websocket)[source]
Parameters:

websocket (WebSocket)

Return type:

None

async emit_backend_status(backend_id, status)[source]
Parameters:
  • backend_id (str)

  • status (str)

Return type:

None

async emit_task_updated(task_id, status)[source]
Parameters:
Return type:

None