Languard Server Manager — Threading & Concurrency Design

Overview

The system uses a hybrid concurrency model:

  • FastAPI (asyncio) handles HTTP requests and WebSocket connections
  • Python threads (threading.Thread) handle long-running background work per server
  • Queue bridges the thread world → asyncio world for WebSocket broadcasting
  • SQLAlchemy sync sessions are used in threads (thread-local connections)

Thread Map

Main Process (FastAPI / asyncio event loop)
│
├── [uvicorn] HTTP/WS event loop                     (asyncio)
│     ├── REST request handlers                       (async def)
│     └── WebSocket handlers                          (async def)
│
├── BroadcastThread                                   (daemon thread, 1 global)
│     └── Reads from broadcast_queue (thread-safe)
│         Calls asyncio.run_coroutine_threadsafe()
│         → ConnectionManager.broadcast()
│
└── Per-running-server thread group (started when server starts, stopped when server stops):
      ├── ProcessMonitorThread    (1 per server, 1s interval)
      ├── LogTailThread           (1 per server, 100ms interval)
      ├── MetricsCollectorThread  (1 per server, 5s interval)
      └── RConPollerThread        (1 per server, 10s interval, 30s startup delay)

For N running servers, there are:

  • 4*N background threads + 1 BroadcastThread = 4N+1 background threads total

Thread Safety Rules

| Resource | Access Pattern | Protection |
|----------|----------------|------------|
| ProcessManager._processes | read/write from multiple threads | threading.Lock |
| ThreadRegistry._threads | read/write from main + shutdown | threading.Lock |
| broadcast_queue | multi-writer, single reader | queue.Queue (thread-safe built-in) |
| ConnectionManager._connections | async, single event loop | asyncio.Lock |
| SQLite connections | one connection per thread | Thread-local via threading.local() |
| Config files on disk | write on start, read-only during run | No lock needed (regenerated before start) |
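
As an illustration of the first two rows, a minimal sketch of the lock discipline. The singleton accessor and method names are inferred from calls elsewhere in this document (e.g. ProcessManager.get().get_process(...)), not a confirmed API:

# process_manager.py — minimal sketch; names inferred from usage in this document
import subprocess
import threading

class ProcessManager:
    _instance = None

    def __init__(self):
        self._processes: dict[int, subprocess.Popen] = {}
        self._lock = threading.Lock()  # guards _processes across all threads

    @classmethod
    def get(cls) -> "ProcessManager":
        # Created once at app startup, so no lock is needed around construction
        if cls._instance is None:
            cls._instance = ProcessManager()
        return cls._instance

    def register(self, server_id: int, proc: subprocess.Popen) -> None:
        with self._lock:
            self._processes[server_id] = proc

    def get_process(self, server_id: int) -> subprocess.Popen | None:
        with self._lock:
            return self._processes.get(server_id)

    def remove(self, server_id: int) -> None:
        with self._lock:
            self._processes.pop(server_id, None)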

SQLite Thread Safety

# Each background thread creates its own SQLAlchemy connection
# from the same engine (WAL mode allows concurrent reads)
# PRAGMA busy_timeout=5000 prevents "database is locked" errors

class BaseServerThread(threading.Thread):
    def run(self):
        # Create thread-local DB connection — single connection per thread
        self._db = None
        try:
            self._db = get_engine().connect()
            self.setup()
            while not self._stop_event.is_set():
                try:
                    self.tick()
                except Exception as e:
                    self.on_error(e)
                self._stop_event.wait(self.interval)
        except Exception as e:
            logger.error(f"{self.name} setup error: {e}")
        finally:
            self.teardown()           # always release resources (even on setup failure)
            if self._db is not None:
                self._db.close()      # close the connection only if it was opened
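
The engine-level PRAGMAs and the thread-local connection rule might be wired up as follows. This is a minimal sketch: the database path and the get_thread_db caching behavior are assumptions (DATABASE.md defines the authoritative settings):

# database.py — minimal sketch, assuming a local SQLite file
import threading

from sqlalchemy import create_engine, event

_engine = None
_tls = threading.local()

def get_engine():
    global _engine
    if _engine is None:
        _engine = create_engine("sqlite:///languard.db")  # path assumed

        @event.listens_for(_engine, "connect")
        def _set_pragmas(dbapi_conn, _connection_record):
            # Applied to every new DBAPI connection, so every thread gets
            # WAL mode and the 5s busy timeout described above.
            cursor = dbapi_conn.cursor()
            cursor.execute("PRAGMA journal_mode=WAL")
            cursor.execute("PRAGMA busy_timeout=5000")
            cursor.close()
    return _engine

def get_thread_db():
    # One cached connection per thread (threading.local), matching the
    # thread-safety table; used by code that runs outside BaseServerThread.
    if getattr(_tls, "conn", None) is None:
        _tls.conn = get_engine().connect()
    return _tls.conn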

BroadcastThread — Asyncio Bridge

This is the critical bridge between background threads and the asyncio WebSocket layer.

Background Thread                         Asyncio Event Loop
─────────────────                         ──────────────────
BroadcastThread.enqueue(                  uvicorn runs here
  server_id=1,
  msg_type='log',
  data={...}
)
  │
  ▼
broadcast_queue.put({                     loop = asyncio.get_running_loop()
  'server_id': 1,                         (captured at app startup)
  'type': 'log',
  'data': {...}
})
  │
  ▼
BroadcastThread.run() ──────────────────► asyncio.run_coroutine_threadsafe(
  while True:                                 connection_manager.broadcast(
    msg = queue.get()                             server_id=1,
    fut = run_coroutine_threadsafe(               message={type, data}
              broadcast_coro,                 ),
              self._loop                      loop=self._loop
          )                               )
    fut.result(timeout=5)

Implementation Sketch

# broadcaster.py
import asyncio
import concurrent.futures
import logging
import queue
import threading

logger = logging.getLogger(__name__)

_broadcast_queue: queue.Queue = queue.Queue(maxsize=10000)

class BroadcastThread(threading.Thread):
    daemon = True

    def __init__(self, loop: asyncio.AbstractEventLoop, manager):
        super().__init__(name="BroadcastThread")
        self._loop = loop
        self._manager = manager
        self._running = True

    def run(self):
        while self._running:
            try:
                msg = _broadcast_queue.get(timeout=1.0)
                server_id = msg['server_id']
                # Build the outgoing WebSocket message envelope.
                # Include server_id so clients subscribed to 'all' can identify the source.
                # API contract: {type, server_id, data}
                outgoing = {
                    'type': msg['type'],
                    'server_id': server_id,
                    'data': msg['data'],
                }
                future = asyncio.run_coroutine_threadsafe(
                    self._manager.broadcast(str(server_id), outgoing, channel=msg['type']),
                    self._loop
                )
                try:
                    future.result(timeout=5.0)
                except concurrent.futures.TimeoutError:
                    # (alias of the builtin TimeoutError on Python 3.11+)
                    # Don't block the queue — log and continue
                    logger.warning(f"Broadcast timeout for server {server_id} msg type {msg['type']}")
            except queue.Empty:
                continue
            except Exception as e:
                logger.error(f"BroadcastThread error: {e}")

    def stop(self):
        self._running = False

    @staticmethod
    def enqueue(server_id: int, msg_type: str, data: dict):
        """Thread-safe. Called from any background thread."""
        try:
            _broadcast_queue.put_nowait({
                'server_id': server_id,
                'type': msg_type,
                'data': data,
            })
        except queue.Full:
            logger.warning(f"Broadcast queue full, dropping {msg_type} for server {server_id}")
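
The loop handed to BroadcastThread must be captured from inside the running event loop, e.g. in a startup hook. A hypothetical wiring (the handler name and the app/connection_manager instances are assumptions):

# main.py — hypothetical startup wiring
@app.on_event("startup")
async def _start_broadcaster():
    loop = asyncio.get_running_loop()          # uvicorn's event loop
    thread = BroadcastThread(loop, connection_manager)
    thread.start()
    app.state.broadcast_thread = thread        # kept for shutdown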

ProcessMonitorThread — Crash Detection & Auto-Restart

class ProcessMonitorThread(BaseServerThread):
    interval = 1.0

    def tick(self):
        proc = ProcessManager.get().get_process(self.server_id)
        if proc is None:
            self.stop()
            return

        exit_code = proc.poll()
        if exit_code is not None:
            # Process has exited
            self._handle_process_exit(exit_code)
            self.stop()

    def _handle_process_exit(self, exit_code: int):
        is_crash = (exit_code != 0)
        status = 'crashed' if is_crash else 'stopped'

        server = ServerRepository(self._db).get_by_id(self.server_id)
        ServerRepository(self._db).update_status(
            self.server_id, status, pid=None,
            stopped_at=datetime.utcnow().isoformat()
        )
        PlayerRepository(self._db).clear(self.server_id)
        ServerEventRepository(self._db).insert(
            self.server_id, status,
            actor='system',
            detail={'exit_code': exit_code}
        )

        BroadcastThread.enqueue(self.server_id, 'status', {'status': status})
        BroadcastThread.enqueue(self.server_id, 'event', {
            'event_type': status,
            'detail': {'exit_code': exit_code}
        })

        # Stop other threads for this server. Must NOT be called synchronously
        # from within this thread's own run() if stop_server_threads() joins threads,
        # as a thread cannot join itself. Use a daemon thread to do the cleanup
        # after this thread's run() returns naturally.
        # IMPORTANT: The auto-restart Timer must be started AFTER thread cleanup
        # completes. The cleanup daemon thread starts the restart timer when done.

        def _cleanup_and_maybe_restart():
            try:
                ThreadRegistry.get().stop_server_threads(self.server_id)
                # Only schedule restart after threads are fully cleaned up
                if is_crash and server.get('auto_restart'):
                    self._schedule_auto_restart(server)
            except Exception as e:
                logger.error(f"Cleanup/restart failed for server {self.server_id}: {e}")
                BroadcastThread.enqueue(self.server_id, 'event', {
                    'event_type': 'auto_restart_failed',
                    'detail': {'error': str(e)}
                })

        threading.Thread(
            target=_cleanup_and_maybe_restart,
            daemon=True,
            name=f"StopCleanup-{self.server_id}"
        ).start()

    def _schedule_auto_restart(self, server: dict):
        # IMPORTANT: This method runs in the daemon cleanup thread, NOT the
        # ProcessMonitorThread. Must create its own DB connection — do NOT
        # use self._db (it belongs to the ProcessMonitorThread's thread context
        # and may be closed by teardown() already).
        from database import get_thread_db
        db = get_thread_db()

        restart_count = server['restart_count']
        max_restarts = server['max_restarts']
        window = server['restart_window_seconds']
        last_restart = server.get('last_restart_at')

        # Reset restart_count if last restart was outside the window
        if last_restart:
            last_dt = datetime.fromisoformat(last_restart)
            elapsed = (datetime.utcnow() - last_dt).total_seconds()
            if elapsed > window:
                ServerRepository(db).reset_restart_count(self.server_id)
                restart_count = 0

        if restart_count < max_restarts:
            delay = min(10 * (restart_count + 1), 60)  # linear backoff, capped at 60s
            logger.info(f"Auto-restarting server {self.server_id} in {delay}s (attempt {restart_count+1}/{max_restarts})")
            threading.Timer(delay, self._auto_restart).start()
        else:
            logger.warning(f"Server {self.server_id} exceeded max auto-restarts ({max_restarts})")
            BroadcastThread.enqueue(self.server_id, 'event', {
                'event_type': 'max_restarts_exceeded',
                'detail': {'restart_count': restart_count}
            })

    def _auto_restart(self):
        from servers.service import ServerService
        try:
            ServerService().start(self.server_id)
        except Exception as e:
            logger.error(f"Auto-restart failed for server {self.server_id}: {e}")

LogTailThread — RPT File Tailing

The Arma 3 RPT file grows while the server runs. This thread tails it like tail -f.

class LogTailThread(BaseServerThread):
    interval = 0.1  # 100ms

    def setup(self):
        self._file = None
        self._current_path: Path | None = None
        self._last_size: int = 0
        self._open_latest_rpt()

    def _open_latest_rpt(self):
        """
        Arma 3 writes timestamped RPT files in the profile subdirectory:
        servers/{id}/server/arma3server_YYYY-MM-DD_HH-MM-SS.rpt

        Use rglob('*.rpt') to search recursively within the server dir.
        The profile subdirectory is determined by -profiles + -name flags.

        NOTE: Do NOT use os.stat().st_ino for rotation detection — on Windows/NTFS
        st_ino is always 0, making inode comparison completely non-functional.
        Instead, track the filename and file size. If a newer .rpt appears or the
        current file shrinks (truncated/replaced), reopen.
        """
        rpt_files = list(Path(get_server_dir(self.server_id)).rglob("*.rpt"))
        if not rpt_files:
            return  # Server hasn't created RPT yet; retry in next tick

        latest = max(rpt_files, key=lambda p: p.stat().st_mtime)
        try:
            self._file = open(latest, 'r', encoding='utf-8', errors='replace')
            self._file.seek(0, 2)  # seek to end — tail, don't replay old output
            self._current_path = latest
            self._last_size = self._file.tell()
        except OSError:
            self._file = None

    def tick(self):
        if self._file is None:
            self._open_latest_rpt()
            return

        # Rotation detection: only re-glob every 5 seconds (not every 100ms tick)
        # to avoid excessive filesystem I/O with large mpmissions directories.
        now = time.monotonic()
        if now - getattr(self, '_last_glob_time', 0) > 5.0:
            self._last_glob_time = now
            rpt_files = list(Path(get_server_dir(self.server_id)).rglob("*.rpt"))
            if rpt_files:
                latest = max(rpt_files, key=lambda p: p.stat().st_mtime)
                if latest != self._current_path:
                    # A new RPT file was created — switch to it
                    self._file.close()
                    self._open_latest_rpt()
                    return

        try:
            current_size = self._current_path.stat().st_size
        except OSError:
            return

        if current_size < self._last_size:
            # File shrank — truncated or replaced; reopen
            self._file.close()
            self._open_latest_rpt()
            return

        # Read new lines
        while True:
            line = self._file.readline()
            if not line:
                break
            self._last_size = self._file.tell()
            line = line.rstrip('\n')
            if not line:
                continue

            entry = RPTParser.parse_line(line)
            if entry:
                LogRepository(self._db).insert(self.server_id, entry)
                BroadcastThread.enqueue(self.server_id, 'log', entry)

    def teardown(self):
        """Close the open RPT file handle when the thread stops."""
        if self._file is not None:
            try:
                self._file.close()
            except OSError:
                pass
            self._file = None
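
The RPTParser referenced above is defined elsewhere. Purely for illustration, a hedged sketch, assuming RPT lines begin with an H:MM:SS wall-clock prefix (typical of Arma 3 RPT output, but not guaranteed for every line):

# rpt_parser.py — illustrative sketch only; the real RPT grammar is messier
import re

_LINE_RE = re.compile(r'^(?P<time>\d{1,2}:\d{2}:\d{2})\s+(?P<message>.*)$')

class RPTParser:
    @staticmethod
    def parse_line(line: str) -> dict | None:
        m = _LINE_RE.match(line)
        if not m:
            return None  # continuation lines are skipped in this sketch
        message = m.group('message')
        lower = message.lower()
        if lower.startswith('error'):
            level = 'error'
        elif lower.startswith('warning'):
            level = 'warning'
        else:
            level = 'info'
        return {'time': m.group('time'), 'level': level, 'message': message}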

RConPollerThread — Player List Synchronization

class RConPollerThread(BaseServerThread):
    interval = 10.0
    STARTUP_DELAY = 30.0  # wait for server to fully initialize
    _rcon_ready = False   # flag: True only after successful setup

    def setup(self):
        # Wait for server to start up before attempting RCon
        if self._stop_event.wait(self.STARTUP_DELAY):
            self._rcon_ready = False
            return  # stop was requested during wait
        self._rcon = RConService(self.server_id)
        self._connected = self._rcon.connect()
        self._rcon_ready = True

    def tick(self):
        if not self._rcon_ready:
            return  # setup() failed or was interrupted
        if not self._connected:
            self._reconnect_attempts = getattr(self, '_reconnect_attempts', 0) + 1
            delay = min(10 * 2 ** self._reconnect_attempts, 120)  # exponential backoff
            if self._reconnect_attempts > 1:
                logger.info(f"RCon reconnect attempt {self._reconnect_attempts} for server {self.server_id} (waiting {delay}s)")
                if self._stop_event.wait(delay):
                    return  # stop requested during backoff
            self._connected = self._rcon.connect()
            if not self._connected:
                return
            self._reconnect_attempts = 0  # reset on successful connection

        try:
            players = self._rcon.get_players()
            PlayerService(self._db).update_from_rcon(self.server_id, players)
            BroadcastThread.enqueue(self.server_id, 'players', {
                'players': [p.dict() for p in players],
                'count': len(players)
            })
        except ConnectionError:
            self._connected = False
            logger.warning(f"RCon connection lost for server {self.server_id}")

Thread Lifecycle

Start Server Flow

POST /servers/{id}/start
  │
  ├── ServerService.start()
  │     ├── ConfigGenerator.write_all()
  │     ├── ProcessManager.start()     ← creates subprocess.Popen
  │     └── ThreadRegistry.start_server_threads(id)
  │           ├── ProcessMonitorThread(id).start()
  │           ├── LogTailThread(id).start()
  │           ├── MetricsCollectorThread(id).start()
  │           └── RConPollerThread(id).start()
  │
  └── BroadcastThread.enqueue(id, 'status', {status: 'starting'})

Stop Server Flow

POST /servers/{id}/stop
  │
  ├── RConService.shutdown()        ← sends #shutdown via RCon
  ├── Wait up to 30s for process exit (ProcessManager.stop(timeout=30))
  ├── If still running: ProcessManager.kill()
  ├── ThreadRegistry.stop_server_threads(id)
  │     ├── ProcessMonitorThread.stop()  (sets _stop_event)
  │     ├── LogTailThread.stop()
  │     ├── MetricsCollectorThread.stop()
  │     ├── RConPollerThread.stop()
  │     └── Thread.join(timeout=5) for each
  │
  └── BroadcastThread.enqueue(id, 'status', {status: 'stopped'})
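
The ThreadRegistry used in both flows tracks one thread group per server. A minimal sketch consistent with the thread-safety table (the internals are assumptions; only the method names appear elsewhere in this document):

# thread_registry.py — sketch; set all stop events first, then join, so the
# 5s join timeouts overlap instead of accumulating
import threading

class ThreadRegistry:
    _instance = None

    def __init__(self):
        self._threads: dict[int, list[BaseServerThread]] = {}
        self._lock = threading.Lock()  # guards _threads (see table above)

    @classmethod
    def get(cls) -> "ThreadRegistry":
        if cls._instance is None:
            cls._instance = ThreadRegistry()
        return cls._instance

    def start_server_threads(self, server_id: int):
        threads = [
            ProcessMonitorThread(server_id),
            LogTailThread(server_id),
            MetricsCollectorThread(server_id),
            RConPollerThread(server_id),
        ]
        with self._lock:
            self._threads[server_id] = threads
        for t in threads:
            t.start()

    def stop_server_threads(self, server_id: int):
        with self._lock:
            threads = self._threads.pop(server_id, [])
        for t in threads:
            t.stop()               # signal all stop events first...
        for t in threads:
            t.join(timeout=5)      # ...then join each

    def stop_all(self):
        with self._lock:
            server_ids = list(self._threads)
        for sid in server_ids:
            self.stop_server_threads(sid)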

App Shutdown Flow

FastAPI shutdown event
  │
  ├── ThreadRegistry.stop_all()    ← stop all threads for all servers
  ├── BroadcastThread.stop()
  ├── ConnectionManager.close_all()
  └── database engine dispose
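
A hypothetical shutdown hook mirroring this sequence (the handler name and close_all signature are assumptions):

# main.py — hypothetical shutdown wiring
@app.on_event("shutdown")
async def _shutdown():
    ThreadRegistry.get().stop_all()            # 1. stop per-server threads
    app.state.broadcast_thread.stop()          # 2. stop the asyncio bridge
    await connection_manager.close_all()       # 3. close WebSocket clients
    get_engine().dispose()                     # 4. release DB connections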

Stop Event Pattern

All background threads use a threading.Event for graceful shutdown:

class BaseServerThread(threading.Thread):
    def __init__(self, server_id: int, interval: float | None = None):
        super().__init__(name=f"{self.__class__.__name__}-{server_id}", daemon=True)
        self.server_id = server_id
        if interval is not None:
            self.interval = interval  # otherwise the subclass's class-level default applies
        self._stop_event = threading.Event()

    def stop(self):
        self._stop_event.set()

    def is_stopped(self) -> bool:
        return self._stop_event.is_set()

    def teardown(self):
        """Override to release resources (close files, sockets) after the loop ends."""
        pass

    def run(self):
        # (The thread-local DB connection from the SQLite Thread Safety section
        # is omitted here to focus on the stop-event mechanics.)
        try:
            self.setup()
        except Exception as e:
            logger.error(f"{self.name} setup error: {e}")
            return  # setup failed completely — no partial resources to clean

        try:
            while not self._stop_event.is_set():
                try:
                    self.tick()
                except Exception as e:
                    self.on_error(e)
                # Use wait() instead of sleep() — responds immediately to stop()
                self._stop_event.wait(self.interval)
        finally:
            self.teardown()  # always runs; subclasses close files/sockets here
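
MetricsCollectorThread is the one per-server thread not sketched anywhere else in this document. A plausible shape on top of BaseServerThread (MetricsRepository and the stored pid lookup are assumptions):

# metrics.py — hedged sketch of the 5s metrics loop
import psutil

class MetricsCollectorThread(BaseServerThread):
    interval = 5.0

    def setup(self):
        server = ServerRepository(self._db).get_by_id(self.server_id)
        self._proc = psutil.Process(server['pid'])  # assumes pid is stored on start

    def tick(self):
        try:
            cpu = self._proc.cpu_percent(interval=0.5)  # blocks 500ms (see notes below)
            mem = self._proc.memory_info().rss
        except psutil.NoSuchProcess:
            self.stop()  # ProcessMonitorThread handles the exit itself
            return
        metrics = {'cpu_percent': cpu, 'memory_bytes': mem}
        MetricsRepository(self._db).insert(self.server_id, metrics)
        BroadcastThread.enqueue(self.server_id, 'metrics', metrics)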

WebSocket Connection Manager (asyncio)

# websocket/manager.py
import asyncio
from collections import defaultdict

from fastapi import WebSocket

class ConnectionManager:
    def __init__(self):
        # server_id → set[WebSocket]
        # Use set (not list) so .add()/.discard() work correctly.
        self._connections: dict[str, set[WebSocket]] = defaultdict(set)
        # Per-connection channel subscriptions: ws → set[str]
        self._channel_subs: dict[WebSocket, set[str]] = defaultdict(set)
        self._lock = asyncio.Lock()

    async def connect(self, ws: WebSocket, server_id: str):
        await ws.accept()
        async with self._lock:
            # server_id is either a numeric id string or the literal 'all';
            # indexing by server_id covers both buckets directly.
            self._connections[server_id].add(ws)
            self._channel_subs[ws].add('status')  # default channel

    async def disconnect(self, ws: WebSocket, server_id: str):
        async with self._lock:
            self._connections[server_id].discard(ws)
            self._connections['all'].discard(ws)
            self._channel_subs.pop(ws, None)

    async def subscribe(self, ws: WebSocket, channels: list[str]):
        async with self._lock:
            self._channel_subs[ws].update(channels)

    async def unsubscribe(self, ws: WebSocket, channels: list[str]):
        async with self._lock:
            self._channel_subs[ws].difference_update(channels)

    async def broadcast(self, server_id: str, message: dict, channel: str | None = None):
        """Send to all clients subscribed to server_id AND the message's channel."""
        targets: set[WebSocket] = set()
        async with self._lock:
            # Collect clients for this server_id + 'all' subscribers
            server_clients = self._connections.get(server_id, set())
            all_clients = self._connections.get('all', set())
            candidates = server_clients | all_clients

            # Filter by channel subscription if specified
            if channel:
                targets = {ws for ws in candidates
                           if channel in self._channel_subs.get(ws, set())}
            else:
                targets = candidates

        dead = []
        for ws in targets:
            try:
                await ws.send_json(message)
            except Exception:
                dead.append(ws)

        # Clean up dead connections
        if dead:
            async with self._lock:
                for ws in dead:
                    for bucket in self._connections.values():
                        bucket.discard(ws)
                    self._channel_subs.pop(ws, None)
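
An illustrative endpoint tying the manager to FastAPI (the route path and the subscribe message shape are assumptions; API.md defines the real contract):

# websocket/routes.py — hypothetical usage of ConnectionManager
from fastapi import WebSocket, WebSocketDisconnect

@app.websocket("/ws/{server_id}")
async def ws_endpoint(ws: WebSocket, server_id: str):
    await manager.connect(ws, server_id)
    try:
        while True:
            msg = await ws.receive_json()
            if msg.get("action") == "subscribe":
                await manager.subscribe(ws, msg.get("channels", []))
            elif msg.get("action") == "unsubscribe":
                await manager.unsubscribe(ws, msg.get("channels", []))
    except WebSocketDisconnect:
        await manager.disconnect(ws, server_id)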

Memory & Performance Considerations

| Thread | Memory Impact | CPU Impact |
|--------|---------------|------------|
| ProcessMonitorThread | Minimal (one poll() check) | Negligible |
| LogTailThread | Buffer for unread log lines | Low (file I/O) |
| MetricsCollectorThread | psutil subprocess scan | Low–Medium |
| RConPollerThread | UDP socket + response buffer | Low |
| BroadcastThread | Queue buffer (max 10000 entries) | Low |

Recommendations

  • Set all threads as daemon=True — they die automatically if main process exits
  • broadcast_queue.maxsize=10000 — backpressure; drop on Full (log warning)
  • LogTailThread buffers max ~100 lines per tick before writing to DB in batch (see the sketch after this list)
  • MetricsCollectorThread uses psutil.Process.cpu_percent(interval=0.5) — blocks 500ms, acceptable at 5s interval
  • For N=10 servers: 41 background threads — well within Python's thread limits
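
The batched-write recommendation differs from the line-by-line insert shown in the LogTailThread section. A hedged sketch of the batched read loop (LogRepository.insert_many is an assumed bulk API):

# Replacement for LogTailThread's per-line read loop — flush once per tick
def _read_new_lines(self):
    batch = []
    while len(batch) < 100:                      # cap lines per tick
        line = self._file.readline()
        if not line:
            break
        self._last_size = self._file.tell()
        entry = RPTParser.parse_line(line.rstrip('\n'))
        if entry:
            batch.append(entry)
    if batch:
        LogRepository(self._db).insert_many(self.server_id, batch)  # one DB write
        for entry in batch:
            BroadcastThread.enqueue(self.server_id, 'log', entry)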