""" BroadcastThread — the single bridge between OS threads and asyncio WebSocket world. Reads events from a queue.Queue (written by background server threads) and forwards them to the WebSocketManager running in the asyncio event loop. Design: - Runs as a daemon thread — no cleanup needed on shutdown. - queue.Queue is thread-safe — multiple producer threads, single consumer. - asyncio.run_coroutine_threadsafe() schedules the WebSocketManager.broadcast() coroutine on the event loop from this non-asyncio thread. - If the event loop is closed or the broadcast fails, the event is dropped silently. Queue item format (dict): { "type": str, # "log", "metrics", "players", "server_status", etc. "server_id": int, # Which server this event belongs to "data": dict | list, # Payload — varies by type } """ from __future__ import annotations import asyncio import logging import queue import threading logger = logging.getLogger(__name__) _QUEUE_GET_TIMEOUT = 1.0 _DROP_LOG_THRESHOLD = 100 class BroadcastThread(threading.Thread): """ Bridge from thread-world to asyncio-world. Args: event_queue: The shared queue.Queue that all background threads write to. ws_manager: The WebSocketManager instance (asyncio-side). loop: The asyncio event loop running in the main thread. """ def __init__( self, event_queue: queue.Queue, ws_manager, # WebSocketManager — type annotation omitted to avoid circular import loop: asyncio.AbstractEventLoop, ) -> None: super().__init__(name="BroadcastThread", daemon=True) self._queue = event_queue self._ws_manager = ws_manager self._loop = loop self._stop_event = threading.Event() self._dropped = 0 def stop(self) -> None: self._stop_event.set() def run(self) -> None: logger.info("BroadcastThread: started") while not self._stop_event.is_set(): try: item = self._queue.get(timeout=_QUEUE_GET_TIMEOUT) except queue.Empty: continue self._forward(item) # Drain remaining items on shutdown while not self._queue.empty(): try: item = self._queue.get_nowait() self._forward(item) except queue.Empty: break logger.info("BroadcastThread: stopped") def _forward(self, item: dict) -> None: """Schedule a broadcast on the asyncio event loop.""" if self._loop.is_closed(): self._dropped += 1 if self._dropped % _DROP_LOG_THRESHOLD == 0: logger.warning( "BroadcastThread: event loop closed, dropped %d messages", self._dropped, ) return server_id = item.get("server_id") event_type = item.get("type", "unknown") data = item.get("data", {}) message = { "type": event_type, "server_id": server_id, "data": data, } try: future = asyncio.run_coroutine_threadsafe( self._ws_manager.broadcast(server_id, message), self._loop, ) # Fire and forget — suppress unhandled exception warnings future.add_done_callback(self._on_broadcast_done) except RuntimeError as exc: logger.debug("BroadcastThread: could not schedule broadcast: %s", exc) def _on_broadcast_done(self, future) -> None: """Called when the broadcast coroutine completes. Log exceptions only.""" try: future.result() except Exception as exc: logger.debug("BroadcastThread: broadcast error: %s", exc)