Backend: - Complete FastAPI backend with 42+ REST endpoints (auth, servers, config, players, bans, missions, mods, games, system) - Game adapter architecture with Arma 3 as first-class adapter - WebSocket real-time events for status, metrics, logs, players - Background thread system (process monitor, metrics, log tail, RCon poller) - Fernet encryption for sensitive config fields at rest - JWT auth with admin/viewer roles, bcrypt password hashing - SQLite with WAL mode, parameterized queries, migration system - APScheduler cleanup jobs for logs, metrics, events Frontend: - Server Detail page with 7 tabs (overview, config, players, bans, missions, mods, logs) - Settings page with password change and admin user management - Create Server wizard (4-step; known bug: silent validation failure) - New hooks: useServerDetail, useAuth, useGames - New components: ServerHeader, ConfigEditor, PlayerTable, BanTable, MissionList, ModList, LogViewer, PasswordChange, UserManager - WebSocket onEvent callback for real-time log accumulation - 120 unit tests passing (Vitest + React Testing Library) Docs: - Added .gitignore, CLAUDE.md, README.md - Updated FRONTEND.md, ARCHITECTURE.md with current implementation state - Added .env.example for backend configuration Known issues: - Create Server form: "Next" buttons don't validate before advancing, causing silent submit failure when fields are invalid - Config sub-tabs need UX redesign for non-technical users
116 lines
3.8 KiB
Python
116 lines
3.8 KiB
Python
"""
|
|
BroadcastThread — the single bridge between OS threads and asyncio WebSocket world.
|
|
|
|
Reads events from a queue.Queue (written by background server threads) and
|
|
forwards them to the WebSocketManager running in the asyncio event loop.
|
|
|
|
Design:
|
|
- Runs as a daemon thread — no cleanup needed on shutdown.
|
|
- queue.Queue is thread-safe — multiple producer threads, single consumer.
|
|
- asyncio.run_coroutine_threadsafe() schedules the WebSocketManager.broadcast()
|
|
coroutine on the event loop from this non-asyncio thread.
|
|
- If the event loop is closed or the broadcast fails, the event is dropped silently.
|
|
|
|
Queue item format (dict):
|
|
{
|
|
"type": str, # "log", "metrics", "players", "server_status", etc.
|
|
"server_id": int, # Which server this event belongs to
|
|
"data": dict | list, # Payload — varies by type
|
|
}
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import logging
|
|
import queue
|
|
import threading
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
_QUEUE_GET_TIMEOUT = 1.0
|
|
_DROP_LOG_THRESHOLD = 100
|
|
|
|
|
|
class BroadcastThread(threading.Thread):
|
|
"""
|
|
Bridge from thread-world to asyncio-world.
|
|
|
|
Args:
|
|
event_queue: The shared queue.Queue that all background threads write to.
|
|
ws_manager: The WebSocketManager instance (asyncio-side).
|
|
loop: The asyncio event loop running in the main thread.
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
event_queue: queue.Queue,
|
|
ws_manager, # WebSocketManager — type annotation omitted to avoid circular import
|
|
loop: asyncio.AbstractEventLoop,
|
|
) -> None:
|
|
super().__init__(name="BroadcastThread", daemon=True)
|
|
self._queue = event_queue
|
|
self._ws_manager = ws_manager
|
|
self._loop = loop
|
|
self._stop_event = threading.Event()
|
|
self._dropped = 0
|
|
|
|
def stop(self) -> None:
|
|
self._stop_event.set()
|
|
|
|
def run(self) -> None:
|
|
logger.info("BroadcastThread: started")
|
|
while not self._stop_event.is_set():
|
|
try:
|
|
item = self._queue.get(timeout=_QUEUE_GET_TIMEOUT)
|
|
except queue.Empty:
|
|
continue
|
|
|
|
self._forward(item)
|
|
|
|
# Drain remaining items on shutdown
|
|
while not self._queue.empty():
|
|
try:
|
|
item = self._queue.get_nowait()
|
|
self._forward(item)
|
|
except queue.Empty:
|
|
break
|
|
|
|
logger.info("BroadcastThread: stopped")
|
|
|
|
def _forward(self, item: dict) -> None:
|
|
"""Schedule a broadcast on the asyncio event loop."""
|
|
if self._loop.is_closed():
|
|
self._dropped += 1
|
|
if self._dropped % _DROP_LOG_THRESHOLD == 0:
|
|
logger.warning(
|
|
"BroadcastThread: event loop closed, dropped %d messages",
|
|
self._dropped,
|
|
)
|
|
return
|
|
|
|
server_id = item.get("server_id")
|
|
event_type = item.get("type", "unknown")
|
|
data = item.get("data", {})
|
|
|
|
message = {
|
|
"type": event_type,
|
|
"server_id": server_id,
|
|
"data": data,
|
|
}
|
|
|
|
try:
|
|
future = asyncio.run_coroutine_threadsafe(
|
|
self._ws_manager.broadcast(server_id, message),
|
|
self._loop,
|
|
)
|
|
# Fire and forget — suppress unhandled exception warnings
|
|
future.add_done_callback(self._on_broadcast_done)
|
|
except RuntimeError as exc:
|
|
logger.debug("BroadcastThread: could not schedule broadcast: %s", exc)
|
|
|
|
def _on_broadcast_done(self, future) -> None:
|
|
"""Called when the broadcast coroutine completes. Log exceptions only."""
|
|
try:
|
|
future.result()
|
|
except Exception as exc:
|
|
logger.debug("BroadcastThread: broadcast error: %s", exc) |