# Languard Servers Manager — Threading & Concurrency Design

## Overview

The system uses a hybrid concurrency model:

- **FastAPI (asyncio)** handles HTTP requests and WebSocket connections
- **Python threads** (`threading.Thread`) handle long-running background work per server
- **Queue** bridges the thread world → asyncio world for WebSocket broadcasting
- **SQLAlchemy sync sessions** are used in threads (thread-local connections)

---

## Thread Map

```
Main Process (FastAPI / asyncio event loop)
│
├── [uvicorn] HTTP/WS event loop (asyncio)
│   ├── REST request handlers (async def)
│   └── WebSocket handlers (async def)
│
├── BroadcastThread (daemon thread, 1 global)
│   └── Reads from broadcast_queue (thread-safe)
│       Calls asyncio.run_coroutine_threadsafe()
│       → ConnectionManager.broadcast()
│
└── Per-running-server thread group (started when server starts, stopped when server stops):
    ├── ProcessMonitorThread   (1 per server, 1s interval)
    ├── LogTailThread          (1 per server, 100ms interval)
    ├── MetricsCollectorThread (1 per server, 5s interval)
    └── RConPollerThread       (1 per server, 10s interval, 30s startup delay)
```

For **N running servers** there are `4N` per-server threads plus the single BroadcastThread, i.e. `4N + 1` background threads in total.

---

## Thread Safety Rules

| Resource | Access Pattern | Protection |
|----------|----------------|------------|
| `ProcessManager._processes` | read/write from multiple threads | `threading.Lock` |
| `ThreadRegistry._threads` | read/write from main + shutdown | `threading.Lock` |
| `broadcast_queue` | multi-writer, single reader | `queue.Queue` (thread-safe built-in) |
| `ConnectionManager._connections` | async, single event loop | `asyncio.Lock` |
| SQLite connections | one connection per thread | Thread-local via `threading.local()` |
| Config files on disk | written on start, read-only while running | No lock needed (regenerated before each start) |

### SQLite Thread Safety

```python
# Each background thread creates its own SQLAlchemy connection from the
# same engine (WAL mode allows concurrent reads).
# PRAGMA busy_timeout=5000 prevents "database is locked" errors.

class BaseServerThread(threading.Thread):
    def run(self):
        # Create thread-local DB connection — single connection per thread
        engine = get_engine()
        self._db = engine.connect()
        try:
            self.setup()
            while not self._stop_event.is_set():
                try:
                    self.tick()
                except Exception as e:
                    self.on_error(e)
                self._stop_event.wait(self.interval)
        except Exception as e:
            logger.error(f"{self.name} setup error: {e}")
        finally:
            self.teardown()   # always release resources (even on setup failure)
            self._db.close()  # always close connection
```
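Neither WAL mode nor the busy timeout is a SQLite default, so `get_engine()` has to apply both on every new connection. Below is a minimal sketch of how that could look with SQLAlchemy's `connect` event; `DB_PATH`, the `NullPool` choice, and the module layout are assumptions (only `get_engine()` and `get_thread_db()`, which the auto-restart code below relies on, come from this document):

```python
# database.py (sketch; DB_PATH, NullPool, and layout are assumptions)
import threading

from sqlalchemy import create_engine, event
from sqlalchemy.engine import Connection, Engine
from sqlalchemy.pool import NullPool

DB_PATH = "languard.db"  # hypothetical location

_engine: Engine | None = None
_local = threading.local()

def get_engine() -> Engine:
    global _engine
    if _engine is None:
        # NullPool: each thread holds one long-lived connection for its
        # whole lifetime, so connection pooling adds nothing here.
        # check_same_thread=False only disables sqlite3's ownership check;
        # every thread still uses its own connection.
        _engine = create_engine(
            f"sqlite:///{DB_PATH}",
            poolclass=NullPool,
            connect_args={"check_same_thread": False},
        )

        @event.listens_for(_engine, "connect")
        def _set_pragmas(dbapi_conn, _connection_record):
            cur = dbapi_conn.cursor()
            cur.execute("PRAGMA journal_mode=WAL")   # concurrent readers
            cur.execute("PRAGMA busy_timeout=5000")  # wait instead of raising "database is locked"
            cur.close()
    return _engine

def get_thread_db() -> Connection:
    """Return this thread's connection, creating it on first use."""
    if getattr(_local, "db", None) is None:
        _local.db = get_engine().connect()
    return _local.db
```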
---

## BroadcastThread — Asyncio Bridge

This is the critical bridge between background threads and the asyncio WebSocket layer.

```
Background Thread                          Asyncio Event Loop
─────────────────                          ──────────────────
BroadcastThread.enqueue(                   uvicorn runs here
    server_id=1,
    msg_type='log',
    data={...}
)
  │
  ▼
broadcast_queue.put({                      loop = asyncio.get_event_loop()
    'server_id': 1,                        (stored at app startup)
    'type': 'log',
    'data': {...}
})
  │
  ▼
BroadcastThread.run()  ──────────────────► asyncio.run_coroutine_threadsafe(
    while True:                                connection_manager.broadcast(
        msg = queue.get()                          server_id=1,
        fut = run_coroutine_threadsafe(            message={type, data}
            broadcast_coro,                    ),
            self._loop                         loop=self._loop
        )                                  )
        fut.result(timeout=5)
```

### Implementation Sketch

```python
# broadcaster.py
import asyncio
import concurrent.futures
import logging
import queue
import threading

logger = logging.getLogger(__name__)

_broadcast_queue: queue.Queue = queue.Queue(maxsize=10000)

class BroadcastThread(threading.Thread):

    def __init__(self, loop: asyncio.AbstractEventLoop, manager):
        super().__init__(name="BroadcastThread", daemon=True)
        self._loop = loop
        self._manager = manager
        self._running = True

    def run(self):
        while self._running:
            try:
                msg = _broadcast_queue.get(timeout=1.0)
                server_id = msg['server_id']
                # Build the outgoing WebSocket message envelope.
                # Include server_id so clients subscribed to 'all' can
                # identify the source. API contract: {type, server_id, data}
                outgoing = {
                    'type': msg['type'],
                    'server_id': server_id,
                    'data': msg['data'],
                }
                future = asyncio.run_coroutine_threadsafe(
                    self._manager.broadcast(str(server_id), outgoing, channel=msg['type']),
                    self._loop
                )
                try:
                    # run_coroutine_threadsafe returns a concurrent.futures.Future,
                    # whose result() raises concurrent.futures.TimeoutError
                    # (an alias of the builtin TimeoutError on Python 3.11+).
                    future.result(timeout=5.0)
                except concurrent.futures.TimeoutError:
                    # Don't block the queue — log and continue
                    logger.warning(f"Broadcast timeout for server {server_id} msg type {msg['type']}")
            except queue.Empty:
                continue
            except Exception as e:
                logger.error(f"BroadcastThread error: {e}")

    def stop(self):
        self._running = False

    @staticmethod
    def enqueue(server_id: int, msg_type: str, data: dict):
        """Thread-safe. Called from any background thread."""
        try:
            _broadcast_queue.put_nowait({
                'server_id': server_id,
                'type': msg_type,
                'data': data,
            })
        except queue.Full:
            logger.warning(f"Broadcast queue full, dropping {msg_type} for server {server_id}")
```
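The `loop` passed to `BroadcastThread` must be the running uvicorn event loop, captured at application startup as the diagram notes. A possible wiring sketch, assuming a FastAPI startup hook; the module layout and the `connection_manager` name are illustrative:

```python
# main.py (wiring sketch; names are assumptions)
import asyncio

from fastapi import FastAPI

from broadcaster import BroadcastThread
from websocket.manager import ConnectionManager

app = FastAPI()
connection_manager = ConnectionManager()
broadcast_thread: BroadcastThread | None = None

@app.on_event("startup")
async def start_broadcast_thread():
    global broadcast_thread
    # get_running_loop() must be called from inside the event loop; a startup
    # handler runs there, so this captures exactly the loop that
    # run_coroutine_threadsafe() will target.
    loop = asyncio.get_running_loop()
    broadcast_thread = BroadcastThread(loop, connection_manager)
    broadcast_thread.start()
```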
---

## ProcessMonitorThread — Crash Detection & Auto-Restart

```python
class ProcessMonitorThread(BaseServerThread):
    interval = 1.0

    def tick(self):
        proc = ProcessManager.get().get_process(self.server_id)
        if proc is None:
            self.stop()
            return
        exit_code = proc.poll()
        if exit_code is not None:
            # Process has exited
            self._handle_process_exit(exit_code)
            self.stop()

    def _handle_process_exit(self, exit_code: int):
        is_crash = (exit_code != 0)
        status = 'crashed' if is_crash else 'stopped'

        server = ServerRepository(self._db).get_by_id(self.server_id)
        ServerRepository(self._db).update_status(
            self.server_id, status, pid=None,
            stopped_at=datetime.utcnow().isoformat()
        )
        PlayerRepository(self._db).clear(self.server_id)
        ServerEventRepository(self._db).insert(
            self.server_id, status, actor='system',
            detail={'exit_code': exit_code}
        )

        BroadcastThread.enqueue(self.server_id, 'status', {'status': status})
        BroadcastThread.enqueue(self.server_id, 'event', {
            'event_type': status,
            'detail': {'exit_code': exit_code}
        })

        # Stop other threads for this server. Must NOT be called synchronously
        # from within this thread's own run() if stop_server_threads() joins
        # threads, as a thread cannot join itself. Use a daemon thread to do
        # the cleanup after this thread's run() returns naturally.
        # IMPORTANT: The auto-restart Timer must be started AFTER thread
        # cleanup completes. The cleanup daemon thread starts the restart
        # timer when done.
        def _cleanup_and_maybe_restart():
            try:
                ThreadRegistry.get().stop_server_threads(self.server_id)
                # Only schedule restart after threads are fully cleaned up
                if is_crash and server.get('auto_restart'):
                    self._schedule_auto_restart(server)
            except Exception as e:
                logger.error(f"Cleanup/restart failed for server {self.server_id}: {e}")
                BroadcastThread.enqueue(self.server_id, 'event', {
                    'event_type': 'auto_restart_failed',
                    'detail': {'error': str(e)}
                })

        threading.Thread(
            target=_cleanup_and_maybe_restart,
            daemon=True,
            name=f"StopCleanup-{self.server_id}"
        ).start()

    def _schedule_auto_restart(self, server: dict):
        # IMPORTANT: This method runs in the daemon cleanup thread, NOT the
        # ProcessMonitorThread. It must create its own DB connection — do NOT
        # use self._db (it belongs to the ProcessMonitorThread's thread
        # context and may already have been closed by teardown()).
        from database import get_thread_db
        db = get_thread_db()

        restart_count = server['restart_count']
        max_restarts = server['max_restarts']
        window = server['restart_window_seconds']
        last_restart = server.get('last_restart_at')

        # Reset restart_count if the last restart was outside the window
        if last_restart:
            last_dt = datetime.fromisoformat(last_restart)
            elapsed = (datetime.utcnow() - last_dt).total_seconds()
            if elapsed > window:
                ServerRepository(db).reset_restart_count(self.server_id)
                restart_count = 0

        if restart_count < max_restarts:
            delay = min(10 * (restart_count + 1), 60)  # linear backoff, capped at 60s
            logger.info(
                f"Auto-restarting server {self.server_id} in {delay}s "
                f"(attempt {restart_count + 1}/{max_restarts})"
            )
            threading.Timer(delay, self._auto_restart).start()
        else:
            logger.warning(f"Server {self.server_id} exceeded max auto-restarts ({max_restarts})")
            BroadcastThread.enqueue(self.server_id, 'event', {
                'event_type': 'max_restarts_exceeded',
                'detail': {'restart_count': restart_count}
            })

    def _auto_restart(self):
        from servers.service import ServerService
        try:
            ServerService().start(self.server_id)
        except Exception as e:
            logger.error(f"Auto-restart failed for server {self.server_id}: {e}")
```
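`ThreadRegistry.stop_server_threads()` is the reason the cleanup above runs in a separate daemon thread: it joins each per-server thread, and a thread cannot join itself. A minimal registry sketch consistent with that constraint and with the 5s join timeout in the Stop Server Flow below; the internal structure is an assumption:

```python
# thread_registry.py (sketch; the internal layout is assumed)
import logging
import threading

logger = logging.getLogger(__name__)

class ThreadRegistry:
    _instance = None

    def __init__(self):
        self._threads: dict[int, list] = {}  # server_id → list of BaseServerThread
        self._lock = threading.Lock()

    @classmethod
    def get(cls) -> "ThreadRegistry":
        if cls._instance is None:
            cls._instance = cls()  # created once at startup, so no race here
        return cls._instance

    def register(self, server_id: int, thread: threading.Thread):
        with self._lock:
            self._threads.setdefault(server_id, []).append(thread)

    def stop_server_threads(self, server_id: int):
        with self._lock:
            threads = self._threads.pop(server_id, [])
        for t in threads:
            t.stop()  # sets the thread's _stop_event; never blocks
        me = threading.current_thread()
        for t in threads:
            if t is me:
                continue  # a thread cannot join itself
            t.join(timeout=5)
            if t.is_alive():
                logger.warning(f"{t.name} did not stop within 5s")

    def stop_all(self):
        with self._lock:
            server_ids = list(self._threads)
        for sid in server_ids:
            self.stop_server_threads(sid)
```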
""" rpt_files = list(Path(get_server_dir(self.server_id)).rglob("*.rpt")) if not rpt_files: return # Server hasn't created RPT yet; retry in next tick latest = max(rpt_files, key=lambda p: p.stat().st_mtime) try: self._file = open(latest, 'r', encoding='utf-8', errors='replace') self._file.seek(0, 2) # seek to end — tail, don't replay old output self._current_path = latest self._last_size = self._file.tell() except OSError: self._file = None def tick(self): if self._file is None: self._open_latest_rpt() return # Rotation detection: only re-glob every 5 seconds (not every 100ms tick) # to avoid excessive filesystem I/O with large mpmissions directories. now = time.monotonic() if now - getattr(self, '_last_glob_time', 0) > 5.0: self._last_glob_time = now rpt_files = list(Path(get_server_dir(self.server_id)).rglob("*.rpt")) if rpt_files: latest = max(rpt_files, key=lambda p: p.stat().st_mtime) if latest != self._current_path: # A new RPT file was created — switch to it self._file.close() self._open_latest_rpt() return try: current_size = self._current_path.stat().st_size except OSError: return if current_size < self._last_size: # File shrank — truncated or replaced; reopen self._file.close() self._open_latest_rpt() return # Read new lines while True: line = self._file.readline() if not line: break self._last_size = self._file.tell() line = line.rstrip('\n') if not line: continue entry = RPTParser.parse_line(line) if entry: LogRepository(self._db).insert(self.server_id, entry) BroadcastThread.enqueue(self.server_id, 'log', entry) def teardown(self): """Close the open RPT file handle when the thread stops.""" if self._file is not None: try: self._file.close() except OSError: pass self._file = None ``` --- ## RConPollerThread — Player List Synchronization ```python class RConPollerThread(BaseServerThread): interval = 10.0 STARTUP_DELAY = 30.0 # wait for server to fully initialize _rcon_ready = False # flag: True only after successful setup def setup(self): # Wait for server to start up before attempting RCon if self._stop_event.wait(self.STARTUP_DELAY): self._rcon_ready = False return # stop was requested during wait self._rcon = RConService(self.server_id) self._connected = self._rcon.connect() self._rcon_ready = True def tick(self): if not self._rcon_ready: return # setup() failed or was interrupted if not self._connected: self._reconnect_attempts = getattr(self, '_reconnect_attempts', 0) + 1 delay = min(10 * 2 ** self._reconnect_attempts, 120) # exponential backoff if self._reconnect_attempts > 1: logger.info(f"RCon reconnect attempt {self._reconnect_attempts} for server {self.server_id} (next in {delay}s)") if self._stop_event.wait(delay): return self._connected = self._rcon.connect() if not self._connected: return self._reconnect_attempts = 0 # reset on successful connection try: players = self._rcon.get_players() PlayerService(self._db).update_from_rcon(self.server_id, players) BroadcastThread.enqueue(self.server_id, 'players', { 'players': [p.dict() for p in players], 'count': len(players) }) except ConnectionError: self._connected = False logger.warning(f"RCon connection lost for server {self.server_id}") ``` --- ## Thread Lifecycle ### Start Server Flow ``` POST /servers/{id}/start │ ├── ServerService.start() │ ├── ConfigGenerator.write_all() │ ├── ProcessManager.start() ← creates subprocess.Popen │ └── ThreadRegistry.start_server_threads(id) │ ├── ProcessMonitorThread(id).start() │ ├── LogTailThread(id).start() │ ├── MetricsCollectorThread(id).start() │ └── RConPollerThread(id).start() 
---

## Thread Lifecycle

### Start Server Flow

```
POST /servers/{id}/start
│
├── ServerService.start()
│   ├── ConfigGenerator.write_all()
│   ├── ProcessManager.start()          ← creates subprocess.Popen
│   └── ThreadRegistry.start_server_threads(id)
│       ├── ProcessMonitorThread(id).start()
│       ├── LogTailThread(id).start()
│       ├── MetricsCollectorThread(id).start()
│       └── RConPollerThread(id).start()
│
└── BroadcastThread.enqueue(id, 'status', {status: 'starting'})
```

### Stop Server Flow

```
POST /servers/{id}/stop
│
├── RConService.shutdown()               ← sends #shutdown via RCon
├── Wait up to 30s for process exit (ProcessManager.stop(timeout=30))
├── If still running: ProcessManager.kill()
├── ThreadRegistry.stop_server_threads(id)
│   ├── ProcessMonitorThread.stop()      (sets _stop_event)
│   ├── LogTailThread.stop()
│   ├── MetricsCollectorThread.stop()
│   ├── RConPollerThread.stop()
│   └── Thread.join(timeout=5) for each
│
└── BroadcastThread.enqueue(id, 'status', {status: 'stopped'})
```

### App Shutdown Flow

```
FastAPI shutdown event
│
├── ThreadRegistry.stop_all()            ← stop all threads for all servers
├── BroadcastThread.stop()
├── ConnectionManager.close_all()
└── database engine dispose
```

---

## Stop Event Pattern

All background threads use a `threading.Event` for graceful shutdown:

```python
class BaseServerThread(threading.Thread):
    def __init__(self, server_id: int, interval: float):
        super().__init__(name=f"{self.__class__.__name__}-{server_id}", daemon=True)
        self.server_id = server_id
        self.interval = interval
        self._stop_event = threading.Event()

    def stop(self):
        self._stop_event.set()

    def is_stopped(self) -> bool:
        return self._stop_event.is_set()

    def teardown(self):
        """Override to release resources (close files, sockets) after the loop ends."""
        pass

    def run(self):
        # (DB connection setup/close omitted here; see the SQLite section above)
        try:
            self.setup()
            while not self._stop_event.is_set():
                try:
                    self.tick()
                except Exception as e:
                    self.on_error(e)
                # Use wait() instead of sleep() — responds immediately to stop()
                self._stop_event.wait(self.interval)
        except Exception as e:
            logger.error(f"{self.name} setup error: {e}")
        finally:
            self.teardown()  # always runs; subclasses close files/sockets here
```
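The App Shutdown Flow above maps directly onto a FastAPI shutdown handler. A sketch that reuses the assumed names from the startup wiring earlier; `close_all()` is referenced by the flow but not defined in this document, so it is assumed to be an async method:

```python
# main.py (shutdown wiring sketch; continues the startup sketch above)
@app.on_event("shutdown")
async def stop_background_work():
    # Order matters: stop the producers first, then the queue-draining
    # bridge, then close consumers, and only then release shared resources.
    ThreadRegistry.get().stop_all()       # all per-server threads
    if broadcast_thread is not None:
        broadcast_thread.stop()           # run() exits within its 1s get() timeout
        broadcast_thread.join(timeout=2)  # illustrative timeout
    await connection_manager.close_all()  # assumed async; closes remaining WebSockets
    get_engine().dispose()                # database engine dispose
```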
---

## WebSocket Connection Manager (asyncio)

```python
# websocket/manager.py
import asyncio
from collections import defaultdict

from fastapi import WebSocket

class ConnectionManager:
    def __init__(self):
        # server_id → set[WebSocket]
        # Use a set (not a list) so .add()/.discard() work correctly.
        self._connections: dict[str, set[WebSocket]] = defaultdict(set)
        # Per-connection channel subscriptions: ws → set[str]
        self._channel_subs: dict[WebSocket, set[str]] = defaultdict(set)
        self._lock = asyncio.Lock()

    async def connect(self, ws: WebSocket, server_id: str):
        await ws.accept()
        async with self._lock:
            # A client that connects with server_id='all' lands in the 'all'
            # bucket directly and receives broadcasts for every server.
            self._connections[server_id].add(ws)
            self._channel_subs[ws].add('status')  # default channel

    async def disconnect(self, ws: WebSocket, server_id: str):
        async with self._lock:
            self._connections[server_id].discard(ws)
            self._connections['all'].discard(ws)
            self._channel_subs.pop(ws, None)

    async def subscribe(self, ws: WebSocket, channels: list[str]):
        async with self._lock:
            self._channel_subs[ws].update(channels)

    async def unsubscribe(self, ws: WebSocket, channels: list[str]):
        async with self._lock:
            self._channel_subs[ws].difference_update(channels)

    async def broadcast(self, server_id: str, message: dict, channel: str | None = None):
        """Send to all clients subscribed to server_id AND the message's channel."""
        async with self._lock:
            # Collect clients for this server_id plus the 'all' subscribers
            server_clients = self._connections.get(server_id, set())
            all_clients = self._connections.get('all', set())
            candidates = server_clients | all_clients
            # Filter by channel subscription if one is specified
            if channel:
                targets = {ws for ws in candidates
                           if channel in self._channel_subs.get(ws, set())}
            else:
                targets = candidates

        # Send outside the lock so one slow client can't block the others
        dead = []
        for ws in targets:
            try:
                await ws.send_json(message)
            except Exception:
                dead.append(ws)

        # Clean up dead connections
        if dead:
            async with self._lock:
                for ws in dead:
                    for bucket in self._connections.values():
                        bucket.discard(ws)
                    self._channel_subs.pop(ws, None)
```

---

## Memory & Performance Considerations

| Thread | Memory Impact | CPU Impact |
|--------|---------------|------------|
| ProcessMonitorThread | Minimal | Negligible (one `poll()` check per second) |
| LogTailThread | Buffer for unread log lines | Low (file I/O) |
| MetricsCollectorThread | psutil subprocess scan | Low-Medium |
| RConPollerThread | UDP socket + response buffer | Low |
| BroadcastThread | Queue buffer (max 10000 entries) | Low |

### Recommendations

- Set all threads as `daemon=True` — they die automatically if the main process exits
- `broadcast_queue.maxsize=10000` provides backpressure; on `queue.Full` the message is dropped with a warning
- `LogTailThread` should batch DB writes (up to ~100 lines per tick) instead of inserting line-by-line
- `MetricsCollectorThread` uses `psutil.Process.cpu_percent(interval=0.5)` — blocks its thread for 500ms, acceptable at a 5s interval
- For N=10 servers: 41 background threads — well within Python's thread limits
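For completeness, a hypothetical endpoint showing how `ConnectionManager` could be driven from a FastAPI WebSocket route. The route path and the inbound `subscribe`/`unsubscribe` message shape are assumptions; this document does not specify the client-to-server protocol:

```python
# websocket/routes.py (sketch; the inbound message protocol is assumed)
from fastapi import APIRouter, WebSocket, WebSocketDisconnect

from main import connection_manager  # the instance from the startup wiring sketch

router = APIRouter()

@router.websocket("/ws/{server_id}")
async def server_ws(ws: WebSocket, server_id: str):
    await connection_manager.connect(ws, server_id)
    try:
        while True:
            msg = await ws.receive_json()
            # Hypothetical shape: {"action": "subscribe", "channels": ["log", "players"]}
            if msg.get("action") == "subscribe":
                await connection_manager.subscribe(ws, msg.get("channels", []))
            elif msg.get("action") == "unsubscribe":
                await connection_manager.unsubscribe(ws, msg.get("channels", []))
    except WebSocketDisconnect:
        await connection_manager.disconnect(ws, server_id)
```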