# Languard Servers Manager — Threading & Concurrency Design

## Overview

The system uses a hybrid concurrency model:

- **FastAPI (asyncio)** handles HTTP requests and WebSocket connections
- **Python threads** (`threading.Thread`) handle long-running background work per server
- **Queue** bridges the thread world → asyncio world for WebSocket broadcasting
- **SQLAlchemy sync sessions** are used in threads (thread-local connections)

The key change for multi-game support: **core threads are game-agnostic** and receive game-specific behavior (log parsers, remote admin clients) via dependency injection from the adapter.

---

## Thread Map

```
Main Process (FastAPI / asyncio event loop)
│
├── [uvicorn] HTTP/WS event loop (asyncio)
│   ├── REST request handlers (async def / plain def)
│   └── WebSocket handlers (async def)
│
├── BroadcastThread (daemon thread, 1 global)
│   └── Reads from broadcast_queue (thread-safe)
│       Calls asyncio.run_coroutine_threadsafe()
│       → ConnectionManager.broadcast()
│
└── Per-running-server thread group (started when server starts, stopped when server stops):
    ├── ProcessMonitorThread     (1 per server, 1s interval)    — CORE
    ├── LogTailThread            (1 per server, 100ms interval) — CORE + adapter LogParser
    ├── MetricsCollectorThread   (1 per server, 5s interval)    — CORE
    └── RemoteAdminPollerThread  (1 per server, 10s interval)   — CORE + adapter RemoteAdmin
```

For **N running servers**, there are:

- `4*N` background threads + 1 BroadcastThread = `4N+1` background threads total
- (If the adapter has no `remote_admin`, RemoteAdminPollerThread is skipped → `3N+1`)

---

## Adapter Injection into Threads

The `ThreadRegistry` resolves the adapter at thread creation time and injects game-specific components into the generic core threads:

```python
class ThreadRegistry:
    @classmethod
    def start_server_threads(cls, server_id: int, db: Connection) -> None:
        server = ServerRepository(db).get_by_id(server_id)
        adapter = GameAdapterRegistry.get(server["game_type"])

        threads: dict[str, BaseServerThread] = {}

        # Core threads — always present
        threads["process_monitor"] = ProcessMonitorThread(server_id)
        threads["metrics_collector"] = MetricsCollectorThread(server_id)

        # Core thread with the adapter's log parser injected
        log_parser = adapter.get_log_parser()
        threads["log_tail"] = LogTailThread(
            server_id,
            parser=log_parser,
            log_file_resolver=log_parser.get_log_file_resolver(server_id),
        )

        # Core thread with the adapter's remote admin injected (if supported)
        remote_admin = adapter.get_remote_admin()
        if remote_admin is not None:
            threads["remote_admin_poller"] = RemoteAdminPollerThread(
                server_id,
                remote_admin_factory=lambda: remote_admin.create_client(
                    host="127.0.0.1",
                    port=server["rcon_port"],
                    password=_get_remote_admin_password(server_id, db),
                ),
            )

        # Adapter-declared custom threads (for game-specific background work)
        for thread_factory in adapter.get_custom_thread_factories():
            thread = thread_factory(server_id, db)
            threads[thread.name_key] = thread

        with cls._lock:
            cls._threads[server_id] = threads

        for thread in threads.values():
            thread.start()
```

---

## Thread Safety Rules

| Resource | Access Pattern | Protection |
|----------|----------------|------------|
| `ProcessManager._processes` | read/write from multiple threads | `threading.Lock` |
| `ThreadRegistry._threads` | read/write from main + shutdown | `threading.Lock` |
| `broadcast_queue` | multi-writer, single reader | `queue.Queue` (thread-safe built-in) |
| `ConnectionManager._connections` | async, single event loop | `asyncio.Lock` |
| SQLite connections | one connection per thread | Thread-local via `threading.local()` |
| Config files on disk | written on start, read-only while running | No lock needed (regenerated before start) |
| Adapter objects | read-only after registration | No lock needed (registered once at startup) |
| RemoteAdminClient calls | called from RemoteAdminPollerThread only | **Core wraps calls with a per-server `threading.Lock`** (see below) |

### RemoteAdminClient Thread Safety

Adapters do NOT need to make their `RemoteAdminClient` implementations thread-safe. The core wraps every RemoteAdminClient call with a **per-server `threading.Lock`**, so only one call executes at a time against a given server's admin client.

```python
# In RemoteAdminPollerThread
class RemoteAdminPollerThread(BaseServerThread):
    def __init__(self, server_id: int, remote_admin_factory: Callable[[], "RemoteAdminClient"]):
        super().__init__(server_id, self.interval)
        self._client_factory = remote_admin_factory
        self._client: RemoteAdminClient | None = None
        self._connected = False
        self._call_lock = threading.Lock()  # per-server lock

    def _call(self, method, *args, **kwargs):
        """All RemoteAdminClient calls go through this to serialize access."""
        with self._call_lock:
            return method(*args, **kwargs)

# In tick(), replace direct self._client.get_players() with:
#     players = self._call(self._client.get_players)
```

This means:

- Adapter authors write simple, non-thread-safe clients
- The core guarantees no concurrent calls to the same client
- Different servers' clients can be called concurrently (different locks)

### SQLite Thread Safety

```python
# Each background thread creates its own SQLAlchemy connection
# from the same engine (WAL mode allows concurrent reads).
# PRAGMA busy_timeout=5000 prevents most "database is locked" errors.
#
# If busy_timeout is exhausted (5s), the write fails with
# OperationalError. Background threads retry with exponential
# backoff: 1s, 2s, 4s — then log and skip the tick.
# API request handlers retry up to 2 times with 1s backoff,
# then return 503 "database temporarily unavailable".

class BaseServerThread(threading.Thread):
    _db_retry_delays = [1.0, 2.0, 4.0]  # seconds, exponential backoff

    def run(self):
        engine = get_engine()
        self._db = engine.connect()
        try:
            self.setup()
            while not self._stop_event.is_set():
                try:
                    self.tick()
                except OperationalError as e:
                    if "database is locked" in str(e):
                        retried = self._retry_db_write(self.tick)
                        if not retried:
                            logger.warning(f"{self.name}: DB locked after all retries, skipping tick")
                    else:
                        self.on_error(e)
                except Exception as e:
                    self.on_error(e)
                self._stop_event.wait(self.interval)
        except Exception as e:
            logger.error(f"{self.name} setup error: {e}")
        finally:
            self.teardown()
            self._db.close()

    def _retry_db_write(self, fn, max_retries=3):
        for delay in self._db_retry_delays[:max_retries]:
            self._stop_event.wait(delay)
            if self._stop_event.is_set():
                return False
            try:
                fn()
                return True
            except OperationalError:
                continue
        return False
```

---

## BroadcastThread — Asyncio Bridge

This is the critical bridge between background threads and the asyncio WebSocket layer.
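Before the full implementation below, the bridging pattern can be shown in miniature: a plain thread hands each message to the running event loop via `asyncio.run_coroutine_threadsafe()` and blocks on the returned future. This is a minimal, self-contained sketch — the names (`deliver`, `worker`) are illustrative only and stand in for `ConnectionManager.broadcast()` and `BroadcastThread.run()`.

```python
import asyncio
import queue
import threading

q: queue.Queue = queue.Queue()
received: list[str] = []

async def deliver(msg: str) -> None:
    # Stands in for ConnectionManager.broadcast(); runs on the event loop
    received.append(msg)

def worker(loop: asyncio.AbstractEventLoop) -> None:
    # Runs in a plain thread; hands each message to the event loop
    while True:
        msg = q.get()
        if msg is None:  # sentinel to stop
            break
        fut = asyncio.run_coroutine_threadsafe(deliver(msg), loop)
        fut.result(timeout=5.0)  # block until the loop has processed it

async def main() -> None:
    loop = asyncio.get_running_loop()
    t = threading.Thread(target=worker, args=(loop,), daemon=True)
    t.start()
    q.put("hello")
    q.put(None)
    await asyncio.sleep(0.1)  # let the bridge drain while the loop is free

asyncio.run(main())
print(received)  # ['hello']
```

The key property: the worker never touches asyncio objects directly; it only schedules a coroutine onto a loop owned by another thread, which is exactly what makes the pattern thread-safe.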
**Game-agnostic.**

```
Background Thread                        Asyncio Event Loop
─────────────────                        ──────────────────
Any background thread                    uvicorn runs here
        │
        ▼
BroadcastThread.enqueue(                 loop = asyncio.get_running_loop()
    server_id=1,                         (stored at app startup)
    msg_type='log',
    data={...}
)
        │
        ▼
broadcast_queue.put({                    asyncio.run_coroutine_threadsafe(
    'server_id': 1,                          connection_manager.broadcast(
    'type': 'log',                               server_id=1,
    'data': {...}                                message={type, data}
})                                           ),
        │                                    loop=self._loop
        ▼                                )
BroadcastThread.run()  ─────────────────►
    while True:
        msg = queue.get()
        fut = run_coroutine_threadsafe(
            broadcast_coro, self._loop
        )
        fut.result(timeout=5)
```

### Implementation Sketch

```python
# core/websocket/broadcaster.py
import asyncio
import concurrent.futures
import logging
import queue
import threading

logger = logging.getLogger(__name__)

_broadcast_queue: queue.Queue = queue.Queue(maxsize=10000)
_event_loop: asyncio.AbstractEventLoop | None = None  # stored at app startup

class BroadcastThread(threading.Thread):
    daemon = True

    def __init__(self, loop: asyncio.AbstractEventLoop, manager):
        super().__init__(name="BroadcastThread")
        self._loop = loop
        self._manager = manager
        self._running = True

    def run(self):
        while self._running:
            try:
                msg = _broadcast_queue.get(timeout=1.0)
                server_id = msg['server_id']
                outgoing = {
                    'type': msg['type'],
                    'server_id': server_id,
                    'data': msg['data'],
                }
                future = asyncio.run_coroutine_threadsafe(
                    self._manager.broadcast(str(server_id), outgoing, channel=msg['type']),
                    self._loop
                )
                try:
                    # future.result() raises concurrent.futures.TimeoutError
                    # (an alias of the builtin TimeoutError on Python 3.11+)
                    future.result(timeout=5.0)
                except concurrent.futures.TimeoutError:
                    logger.warning(f"Broadcast timeout for server {server_id} msg type {msg['type']}")
            except queue.Empty:
                continue
            except Exception as e:
                logger.error(f"BroadcastThread error: {e}")

    def stop(self):
        self._running = False

    @staticmethod
    def enqueue(server_id: int, msg_type: str, data: dict):
        """Thread-safe. Called from any background thread."""
        try:
            _broadcast_queue.put_nowait({
                'server_id': server_id,
                'type': msg_type,
                'data': data,
            })
        except queue.Full:
            logger.warning(f"Broadcast queue full, dropping {msg_type} for server {server_id}")
```

---

## ProcessMonitorThread — Crash Detection & Auto-Restart

**Game-agnostic.** This thread only checks OS-level process status and updates the core `servers` table.

```python
class ProcessMonitorThread(BaseServerThread):
    interval = 1.0

    def tick(self):
        proc = ProcessManager.get().get_process(self.server_id)
        if proc is None:
            self.stop()
            return
        exit_code = proc.poll()
        if exit_code is not None:
            self._handle_process_exit(exit_code)
            self.stop()

    def _handle_process_exit(self, exit_code: int):
        is_crash = (exit_code != 0)
        status = 'crashed' if is_crash else 'stopped'
        server = ServerRepository(self._db).get_by_id(self.server_id)
        ServerRepository(self._db).update_status(
            self.server_id, status, pid=None,
            stopped_at=datetime.utcnow().isoformat()
        )
        PlayerRepository(self._db).clear(self.server_id)
        ServerEventRepository(self._db).insert(
            self.server_id, status, actor='system',
            detail={'exit_code': exit_code}
        )
        BroadcastThread.enqueue(self.server_id, 'status', {'status': status})
        BroadcastThread.enqueue(self.server_id, 'event', {
            'event_type': status,
            'detail': {'exit_code': exit_code}
        })

        # Stop the other threads for this server via a daemon cleanup thread
        # (avoids a thread joining itself)
        def _cleanup_and_maybe_restart():
            try:
                ThreadRegistry.get().stop_server_threads(self.server_id)
                if is_crash and server.get('auto_restart'):
                    self._schedule_auto_restart(server)
            except Exception as e:
                logger.error(f"Cleanup/restart failed for server {self.server_id}: {e}")
                BroadcastThread.enqueue(self.server_id, 'event', {
                    'event_type': 'auto_restart_failed',
                    'detail': {'error': str(e)}
                })

        threading.Thread(
            target=_cleanup_and_maybe_restart,
            daemon=True,
            name=f"StopCleanup-{self.server_id}"
        ).start()

    def _schedule_auto_restart(self, server: dict):
        # IMPORTANT: Runs in the daemon cleanup thread, NOT ProcessMonitorThread.
        # Must create its own DB connection.
        from database import get_thread_db
        db = get_thread_db()

        restart_count = server['restart_count']
        max_restarts = server['max_restarts']
        window = server['restart_window_seconds']

        last_restart = server.get('last_restart_at')
        if last_restart:
            last_dt = datetime.fromisoformat(last_restart)
            elapsed = (datetime.utcnow() - last_dt).total_seconds()
            if elapsed > window:
                ServerRepository(db).reset_restart_count(self.server_id)
                restart_count = 0

        if restart_count < max_restarts:
            delay = min(10 * (restart_count + 1), 60)  # linear backoff, capped at 60s
            logger.info(f"Auto-restarting server {self.server_id} in {delay}s "
                        f"(attempt {restart_count + 1}/{max_restarts})")
            threading.Timer(delay, self._auto_restart).start()
        else:
            logger.warning(f"Server {self.server_id} exceeded max auto-restarts ({max_restarts})")
            BroadcastThread.enqueue(self.server_id, 'event', {
                'event_type': 'max_restarts_exceeded',
                'detail': {'restart_count': restart_count}
            })

    def _auto_restart(self):
        from core.servers.service import ServerService
        try:
            ServerService().start(self.server_id)
        except Exception as e:
            logger.error(f"Auto-restart failed for server {self.server_id}: {e}")
```

---

## LogTailThread — Generic File Tailing with Adapter Parser

**Core thread** that takes an adapter-provided `LogParser` for game-specific log line parsing and file discovery.

```python
class LogTailThread(BaseServerThread):
    interval = 0.1  # 100ms

    def __init__(self, server_id: int, parser: "LogParser",
                 log_file_resolver: Callable[[Path], Path | None]):
        super().__init__(server_id, self.interval)
        self._parser = parser
        self._log_file_resolver = log_file_resolver
        self._file: TextIO | None = None
        self._current_path: Path | None = None
        self._last_size: int = 0
        self._last_glob_time: float = 0.0

    def setup(self):
        self._open_latest_log()

    def _open_latest_log(self):
        """
        Uses the adapter-provided log_file_resolver to find the current
        log file. Opens it and seeks to the end (tail behavior).

        NOTE: Do NOT rely on os.stat().st_ino for rotation detection —
        it is not dependable across Windows filesystems. Instead, track
        the filename and file size.
        """
        server_dir = get_server_dir(self.server_id)
        log_path = self._log_file_resolver(server_dir)
        if log_path is None:
            return  # Server hasn't created a log yet; retry on the next tick
        try:
            self._file = open(log_path, 'r', encoding='utf-8', errors='replace')
            self._file.seek(0, 2)  # seek to end
            self._current_path = log_path
            self._last_size = self._file.tell()
        except OSError:
            self._file = None

    def tick(self):
        if self._file is None:
            self._open_latest_log()
            return

        # Rotation detection: only re-resolve the log path every 5 seconds
        now = time.monotonic()
        if now - self._last_glob_time > 5.0:
            self._last_glob_time = now
            server_dir = get_server_dir(self.server_id)
            log_path = self._log_file_resolver(server_dir)
            if log_path is not None and log_path != self._current_path:
                self._file.close()
                self._open_latest_log()
                return

        try:
            current_size = self._current_path.stat().st_size
        except OSError:
            return
        if current_size < self._last_size:  # file truncated or rotated in place
            self._file.close()
            self._open_latest_log()
            return

        # Read new lines and parse them with the adapter's parser
        while True:
            line = self._file.readline()
            if not line:
                break
            self._last_size = self._file.tell()
            line = line.rstrip('\n')
            if not line:
                continue
            # The adapter parses the line — game-specific format
            entry = self._parser.parse_line(line)
            if entry:
                LogRepository(self._db).insert(self.server_id, entry)
                BroadcastThread.enqueue(self.server_id, 'log', entry)

    def teardown(self):
        if self._file is not None:
            try:
                self._file.close()
            except OSError:
                pass
            self._file = None
```

---

## MetricsCollectorThread — Game-Agnostic Resource Monitoring

**Fully game-agnostic.** Uses psutil to monitor any process.
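The psutil primitives the collector relies on can be exercised in isolation. A minimal sketch, pointed at the current process purely for illustration (a real collector targets the game server's PID); `cpu_percent(interval=0.5)` blocks for the sampling window, which is why the thread's 5s interval comfortably absorbs it:

```python
import os
import psutil

# Self-monitor for demonstration; MetricsCollectorThread would use the
# PID returned by ProcessManager instead of os.getpid().
proc = psutil.Process(os.getpid())

cpu = proc.cpu_percent(interval=0.5)        # blocks 500ms while sampling
ram_mb = proc.memory_info().rss / (1024 * 1024)

print(f"cpu={cpu:.1f}% ram={ram_mb:.1f}MB")
```

Calling `cpu_percent` with an explicit interval avoids the "0.0 on first call" behavior of the non-blocking form, at the cost of the 500ms block noted in the recommendations below.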
```python
class MetricsCollectorThread(BaseServerThread):
    interval = 5.0

    def tick(self):
        pid = ProcessManager.get().get_pid(self.server_id)
        if pid is None:
            return
        try:
            proc = psutil.Process(pid)
            cpu = proc.cpu_percent(interval=0.5)
            ram = proc.memory_info().rss / (1024 * 1024)  # MB
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            return

        player_count = PlayerRepository(self._db).count(self.server_id)
        MetricsRepository(self._db).insert(self.server_id, cpu, ram, player_count)
        BroadcastThread.enqueue(self.server_id, 'metrics', {
            'cpu_percent': cpu,
            'ram_mb': ram,
            'player_count': player_count,
        })
```

---

## RemoteAdminPollerThread — Generic Polling with Adapter Client

**Core thread** that takes an adapter-provided `RemoteAdmin` factory for game-specific admin protocol communication. Skipped entirely if the adapter has no `remote_admin` capability.

```python
class RemoteAdminPollerThread(BaseServerThread):
    interval = 10.0
    STARTUP_DELAY = 30.0

    def __init__(self, server_id: int, remote_admin_factory: Callable[[], "RemoteAdminClient"]):
        super().__init__(server_id, self.interval)
        self._client_factory = remote_admin_factory
        self._client: RemoteAdminClient | None = None
        self._connected = False
        self._reconnect_attempts = 0
        # _call() / _call_lock are shown in the RemoteAdminClient
        # thread-safety section above

    def setup(self):
        # Wait for the server to start up before attempting a connection.
        # Uses _stop_event.wait() instead of time.sleep() so shutdown is immediate.
        startup_delay = self._get_startup_delay()
        if self._stop_event.wait(startup_delay):
            return  # stop was requested during the wait
        self._connect()

    def _get_startup_delay(self) -> float:
        # Default delay; the adapter may override via RemoteAdmin.get_startup_delay()
        return self.STARTUP_DELAY

    def _connect(self):
        try:
            self._client = self._client_factory()
            self._connected = True
        except Exception as e:
            logger.warning(f"Remote admin connection failed for server {self.server_id}: {e}")
            self._connected = False

    def tick(self):
        if not self._connected:
            self._reconnect_attempts += 1
            delay = min(10 * 2 ** self._reconnect_attempts, 120)  # exponential backoff
            if self._reconnect_attempts > 1:
                logger.info(f"Remote admin reconnect attempt {self._reconnect_attempts} "
                            f"for server {self.server_id}")
            if self._stop_event.wait(delay):
                return
            self._connect()
            if not self._connected:
                return

        self._reconnect_attempts = 0
        try:
            players = self._call(self._client.get_players)
            PlayerService(self._db).update_from_remote_admin(self.server_id, players)
            BroadcastThread.enqueue(self.server_id, 'players', {
                'players': list(players),
                'count': len(players),
            })
        except ConnectionError:
            self._connected = False
            logger.warning(f"Remote admin connection lost for server {self.server_id}")
        except RemoteAdminError as e:
            logger.error(f"Remote admin adapter error for server {self.server_id}: {e}")
            self._connected = False

    def teardown(self):
        if self._client is not None:
            try:
                self._client.disconnect()
            except Exception:
                pass
            self._client = None
```

---

## Thread Lifecycle

### Start Server Flow

```
POST /servers/{id}/start
│
├── ServerService.start()
│   ├── adapter = GameAdapterRegistry.get(server.game_type)
│   ├── check_server_ports_available(server_id)
│   │   └── For ALL running servers, resolve each adapter,
│   │       get port conventions, check full derived port set
│   │       (cross-game: Arma 3 game+steam query + other games' ports)
│   ├── adapter.config_generator.write_configs()
│   │   └── Atomic write: write to .tmp files first, then os.replace()
│   │       On failure: .tmp files cleaned up, originals untouched
│   ├── launch_args = adapter.config_generator.build_launch_args()
│   ├── ProcessManager.start()           ← creates subprocess.Popen
│   └── ThreadRegistry.start_server_threads(id, db)
│       ├── ProcessMonitorThread(id)                  ← core, always
│       ├── LogTailThread(id, adapter.log_parser)     ← core + adapter
│       ├── MetricsCollectorThread(id)                ← core, always
│       └── RemoteAdminPollerThread(id, adapter.remote_admin)
│                                                     ← core + adapter (if available)
│
└── BroadcastThread.enqueue(id, 'status', {status: 'starting'})

Error paths on start:
├── ConfigWriteError      → rollback .tmp files, return 500 to client
├── ConfigValidationError → return 422 with validation details
├── LaunchArgsError       → return 400 with invalid arg info
├── ExeNotAllowedError    → return 403 with executable name
└── PortInUseError        → return 409 with conflicting port info
```

### Stop Server Flow

```
POST /servers/{id}/stop
│
├── adapter.remote_admin.shutdown()      ← if adapter has remote_admin
├── Wait up to 30s for process exit (ProcessManager.stop(timeout=30))
├── If still running: ProcessManager.kill()
├── ThreadRegistry.stop_server_threads(id)
│   ├── ProcessMonitorThread.stop()
│   ├── LogTailThread.stop()
│   ├── MetricsCollectorThread.stop()
│   └── RemoteAdminPollerThread.stop()   ← if present
│       └── Thread.join(timeout=5) for each
│
└── BroadcastThread.enqueue(id, 'status', {status: 'stopped'})
```

### App Shutdown Flow

```
FastAPI shutdown event
│
├── ThreadRegistry.stop_all()            ← stop all threads for all servers
├── BroadcastThread.stop()
├── ConnectionManager.close_all()
└── database engine dispose
```

---

## Stop Event Pattern

All background threads use a `threading.Event` for graceful shutdown:

```python
class BaseServerThread(threading.Thread):
    def __init__(self, server_id: int, interval: float):
        super().__init__(name=f"{self.__class__.__name__}-{server_id}", daemon=True)
        self.server_id = server_id
        self.interval = interval
        self._stop_event = threading.Event()

    def stop(self):
        self._stop_event.set()

    def is_stopped(self) -> bool:
        return self._stop_event.is_set()

    def teardown(self):
        """Override to release resources (close files, sockets) after the loop ends."""
        pass

    def run(self):
        try:
            self.setup()
        except Exception as e:
            logger.error(f"{self.name} setup error: {e}")
            return  # setup failed completely
        try:
            while not self._stop_event.is_set():
                try:
                    self.tick()
                except Exception as e:
                    self.on_error(e)
                self._stop_event.wait(self.interval)
        finally:
            self.teardown()

    def on_error(self, error: Exception):
        """Default error handler. Adapter exceptions are typed for specific handling."""
        if isinstance(error, RemoteAdminError):
            logger.error(f"{self.name} remote admin error: {error}")
            # RemoteAdminPollerThread overrides this to set _connected = False
        elif isinstance(error, ConfigWriteError):
            logger.critical(f"{self.name} config write error (atomic write failed): {error}")
        elif isinstance(error, ConfigValidationError):
            logger.error(f"{self.name} config validation error: {error}")
        else:
            logger.error(f"{self.name} unhandled error: {error}")
```

---

## WebSocket Connection Manager (asyncio)

**Game-agnostic.** No changes from the single-game design.

```python
# core/websocket/manager.py
class ConnectionManager:
    def __init__(self):
        self._connections: dict[str, set[WebSocket]] = defaultdict(set)
        self._channel_subs: dict[WebSocket, set[str]] = defaultdict(set)
        self._lock = asyncio.Lock()

    async def connect(self, ws: WebSocket, server_id: str):
        await ws.accept()
        async with self._lock:
            self._connections[server_id].add(ws)
            self._channel_subs[ws].add('status')

    async def disconnect(self, ws: WebSocket, server_id: str):
        async with self._lock:
            self._connections[server_id].discard(ws)
            self._connections['all'].discard(ws)
            self._channel_subs.pop(ws, None)

    async def subscribe(self, ws: WebSocket, channels: list[str]):
        async with self._lock:
            self._channel_subs[ws].update(channels)

    async def unsubscribe(self, ws: WebSocket, channels: list[str]):
        async with self._lock:
            self._channel_subs[ws].difference_update(channels)

    async def broadcast(self, server_id: str, message: dict, channel: str | None = None):
        targets: set[WebSocket] = set()
        async with self._lock:
            server_clients = self._connections.get(server_id, set())
            all_clients = self._connections.get('all', set())
            candidates = server_clients | all_clients
            if channel:
                targets = {ws for ws in candidates
                           if channel in self._channel_subs.get(ws, set())}
            else:
                targets = candidates

        # Send outside the lock so one slow client can't stall other broadcasts
        dead = []
        for ws in targets:
            try:
                await ws.send_json(message)
            except Exception:
                dead.append(ws)

        if dead:
            async with self._lock:
                for ws in dead:
                    for bucket in self._connections.values():
                        bucket.discard(ws)
                    self._channel_subs.pop(ws, None)
```

---

## Memory & Performance Considerations

| Thread | Memory Impact | CPU Impact |
|--------|---------------|------------|
| ProcessMonitorThread | Minimal (one `poll()` check) | Negligible |
| LogTailThread | Buffer for unread log lines | Low (file I/O + adapter parsing) |
| MetricsCollectorThread | psutil process handle | Low-Medium |
| RemoteAdminPollerThread | Adapter client socket + buffer | Low (varies by adapter protocol) |
| BroadcastThread | Queue buffer (max 10000 entries) | Low |

### Recommendations

- Set all threads as `daemon=True` — they die automatically if the main process exits
- `broadcast_queue.maxsize=10000` — backpressure; drop on `queue.Full` (log a warning)
- `LogTailThread` buffers at most ~100 lines per tick before writing to the DB in a batch
- `MetricsCollectorThread` uses `psutil.Process.cpu_percent(interval=0.5)` — blocks 500ms, acceptable at a 5s interval
- For N=10 servers: 31-41 background threads — well within Python's thread limits
- Games without remote admin skip the RemoteAdminPollerThread entirely
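The drop-on-full backpressure policy from the recommendations can be sketched in isolation. A minimal stand-in for `BroadcastThread.enqueue()` (the `maxsize` is shrunk to 3 here just to make the overflow path easy to trigger):

```python
import queue

q: queue.Queue = queue.Queue(maxsize=3)
dropped = 0

def enqueue(msg: dict) -> bool:
    """Non-blocking put; drop (and count) when the queue is full."""
    global dropped
    try:
        q.put_nowait(msg)
        return True
    except queue.Full:
        dropped += 1
        return False

for i in range(5):
    enqueue({"server_id": 1, "type": "log", "seq": i})

print(q.qsize(), dropped)  # 3 2
```

Dropping on overflow trades completeness for liveness: a stalled WebSocket consumer can never block a background thread, at the cost of losing the oldest unqueued messages under sustained burst — acceptable for telemetry-style data like logs and metrics.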