# Threading & Concurrency Model

## Overview

Languard uses a hybrid concurrency model:

- **FastAPI (asyncio)** handles HTTP requests and WebSocket connections on the main event loop
- **Python `threading.Thread`** handles long-running background work per server
- **`queue.Queue`** bridges the thread world to the asyncio world for WebSocket broadcasting
- **SQLAlchemy sync sessions** with thread-local connections provide thread-safe database access

## Thread Architecture

For N running servers, the system runs up to 4N+1 background threads:

| Thread Type | Count | Purpose |
|---|---|---|
| `BroadcastThread` | 1 (global) | Bridges `queue.Queue` to asyncio WebSocket broadcasts |
| `LogTailThread` | 1 per server | Tails .rpt log files, parses lines, persists to DB, broadcasts events |
| `ProcessMonitorThread` | 1 per server | Monitors the server process, detects crashes, triggers auto-restart |
| `MetricsCollectorThread` | 1 per server | Collects CPU/RAM metrics via psutil every 10 seconds |
| `RemoteAdminPollerThread` | 1 per server | Polls the player list via RCon, syncs join/leave events |

All server-specific threads are managed by `ThreadRegistry`, which creates and destroys thread bundles as servers start and stop.

## BaseServerThread

All background threads extend `BaseServerThread`, which provides:

- **Stop event**: a `threading.Event` for graceful shutdown
- **Thread-local DB**: a fresh SQLAlchemy connection per thread via `get_thread_db()`
- **Exception backoff**: on unhandled exceptions, sleeps with exponential backoff (5s up to a 30s cap), then retries; if the stop event is set, it exits cleanly
- **Abstract `run_loop()` method**: subclasses implement the main loop, which is called repeatedly until the stop event is set

```python
class BaseServerThread(threading.Thread):
    def __init__(self, server_id: int, ...):
        super().__init__(daemon=True)
        self.server_id = server_id
        self._stop_event = threading.Event()

    def stop(self):
        self._stop_event.set()

    def run(self):
        backoff = 5  # seconds; doubles on repeated failures, capped at 30
        while not self._stop_event.is_set():
            try:
                self.run_loop()
                backoff = 5  # reset after a successful iteration
            except Exception:
                self._stop_event.wait(backoff)
                backoff = min(backoff * 2, 30)
```

## ThreadRegistry

`ThreadRegistry` manages thread lifecycle per server:

- **`start_server_threads(server_id, db)`** — Creates and starts all 4 thread types for a server
- **`stop_server_threads(server_id)`** — Sets stop events and joins all threads for a server
- **`reattach_server_threads(server_id, db)`** — Recovers monitoring threads for a server whose process survived an application restart
- **`stop_all()`** — Stops all threads for all servers (called on shutdown)

Thread bundles are stored in a dict mapping `server_id → ThreadBundle`, where `ThreadBundle` is a dataclass holding all thread references.
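
The bundle-per-server pattern can be sketched as follows. This is a minimal illustration, not the real implementation: the field names on `ThreadBundle` and the `StoppableThread` stand-in for `BaseServerThread` are assumptions.

```python
import threading
from dataclasses import dataclass, fields


class StoppableThread(threading.Thread):
    # Illustrative stand-in for BaseServerThread
    def __init__(self):
        super().__init__(daemon=True)
        self._stop_event = threading.Event()

    def stop(self):
        self._stop_event.set()

    def run(self):
        self._stop_event.wait()  # a real thread runs its run_loop() here


@dataclass
class ThreadBundle:
    # One field per per-server thread type (names are illustrative)
    log_tail: StoppableThread
    process_monitor: StoppableThread
    metrics_collector: StoppableThread
    remote_admin_poller: StoppableThread

    def all_threads(self):
        return [getattr(self, f.name) for f in fields(self)]


class ThreadRegistry:
    def __init__(self):
        self._bundles: dict = {}  # server_id -> ThreadBundle

    def start_server_threads(self, server_id: int, bundle: ThreadBundle):
        self._bundles[server_id] = bundle
        for t in bundle.all_threads():
            t.start()

    def stop_server_threads(self, server_id: int):
        # Pop first so a second concurrent stop becomes a no-op
        bundle = self._bundles.pop(server_id, None)
        if bundle is None:
            return
        for t in bundle.all_threads():
            t.stop()
        for t in bundle.all_threads():
            t.join(timeout=5)
```

Stopping in two passes (signal all stop events, then join) lets the threads shut down in parallel instead of serially.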

## BroadcastThread

The `BroadcastThread` is the single global thread that bridges synchronous background threads to asynchronous WebSocket clients:

1. Background threads push events into a `queue.Queue(maxsize=1000)`
2. `BroadcastThread` runs a loop reading from the queue
3. For each event, it calls `asyncio.run_coroutine_threadsafe()` to schedule a WebSocket broadcast on the main event loop
4. If the queue is full, events are dropped (non-blocking put)

Events are broadcast to WebSocket clients subscribed to the relevant `server_id` (or `None` for all servers).
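
The queue-to-asyncio bridge described above can be sketched like this (a simplified model: `publish` and `broadcast_loop` are hypothetical names, and `on_event` stands in for the WebSocket broadcast coroutine):

```python
import asyncio
import queue
import threading

# Global event queue shared by all background threads (maxsize from the doc)
events: queue.Queue = queue.Queue(maxsize=1000)


def publish(event: dict) -> None:
    # Called from any background thread; drop the event if the queue is full
    try:
        events.put_nowait(event)
    except queue.Full:
        pass


def broadcast_loop(loop: asyncio.AbstractEventLoop, on_event, stop: threading.Event) -> None:
    # Runs inside BroadcastThread: drain the queue and hop onto the event loop
    while not stop.is_set():
        try:
            event = events.get(timeout=0.5)
        except queue.Empty:
            continue  # wake up periodically to check the stop event
        asyncio.run_coroutine_threadsafe(on_event(event), loop)
```

`run_coroutine_threadsafe` is the only safe way to schedule a coroutine on the loop from another thread; calling loop methods directly from a worker thread is not thread-safe.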

## ProcessManager

`ProcessManager` is a singleton that manages server processes via `subprocess.Popen`:

- **`start_process(server_id, cmd, cwd, env)`** — Starts a new subprocess and stores the PID
- **`stop_process(server_id, timeout)`** — Sends a terminate signal, waits for exit, force-kills after the timeout
- **`kill_process(server_id)`** — Force-kills the process immediately
- **`recover_on_startup(db)`** — On startup, checks all stored PIDs against running processes via `psutil.pid_exists()`; if a process is still alive, marks the server as running, otherwise marks it as stopped
- Thread-safe with per-server `threading.Lock`
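
The terminate-then-kill pattern with per-server locking can be sketched as follows (a minimal stdlib-only model; the `_lock_for` helper and the internal dict names are assumptions, and DB/PID persistence is omitted):

```python
import subprocess
import threading


class ProcessManager:
    def __init__(self):
        self._procs = {}          # server_id -> subprocess.Popen
        self._locks = {}          # server_id -> threading.Lock
        self._registry_lock = threading.Lock()

    def _lock_for(self, server_id: int) -> threading.Lock:
        # Lazily create one lock per server (hypothetical helper)
        with self._registry_lock:
            return self._locks.setdefault(server_id, threading.Lock())

    def start_process(self, server_id: int, cmd, cwd=None, env=None) -> int:
        with self._lock_for(server_id):
            proc = subprocess.Popen(cmd, cwd=cwd, env=env)
            self._procs[server_id] = proc
            return proc.pid

    def stop_process(self, server_id: int, timeout: float = 10) -> None:
        with self._lock_for(server_id):
            proc = self._procs.pop(server_id, None)
            if proc is None:
                return
            proc.terminate()  # graceful shutdown request
            try:
                proc.wait(timeout=timeout)
            except subprocess.TimeoutExpired:
                proc.kill()   # force-kill after the grace period
                proc.wait()
```

Holding the per-server lock for the whole start/stop operation prevents a concurrent stop from racing a start for the same server, while different servers proceed in parallel.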

## LogTailThread

Tails the Arma 3 .rpt log file for each server:

- Resolves the latest log file path using `Path(server["exe_path"]).parent / "server"` — Arma 3 writes .rpt files next to its executable, not in the languard server data directory
- Reads new lines from the end of the file, detecting log rotation (Windows/NTFS safe)
- Parses each line with `RPTParser.parse_line()` to extract timestamp, level, and message
- Persists parsed entries to the `logs` table via `LogRepository`
- Broadcasts `log` events via the global queue
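
The rotation-aware tailing step can be sketched as below. This is a simplified model under stated assumptions: `tail_new_lines` is a hypothetical helper, it detects rotation only by the file shrinking, and it ignores partially written last lines.

```python
from pathlib import Path


def tail_new_lines(path: Path, state: dict) -> list:
    # Read lines appended since the last call; `state` carries {"offset": int}
    # between calls. If the file shrank, assume rotation and start over.
    size = path.stat().st_size
    if size < state.get("offset", 0):
        state["offset"] = 0  # rotated or truncated
    with path.open("r", encoding="utf-8", errors="replace") as f:
        f.seek(state.get("offset", 0))
        lines = f.read().splitlines()
        state["offset"] = f.tell()
    return lines
```

Each batch of returned lines would then be fed through the parser, persisted, and broadcast.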

## ProcessMonitorThread

Monitors each server process for crashes:

- Checks every 5 seconds whether the process is still alive
- If the process has exited unexpectedly:
  1. Updates server status to `crashed`
  2. Logs the crash event
  3. If `auto_restart` is enabled and the restart count hasn't exceeded `max_restarts` within `restart_window_seconds`:
     - Triggers a restart via `ServerService.start_server()`
     - Increments `restart_count`
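
The restart-budget check over a sliding window can be sketched as follows (`restart_allowed` is a hypothetical helper; the parameter names mirror the settings above):

```python
import time


def restart_allowed(restart_times: list, max_restarts: int,
                    window_seconds: float, now: float = None) -> bool:
    # Prune restarts that fell out of the sliding window, then compare
    # what remains against the budget. Mutates restart_times in place.
    now = time.monotonic() if now is None else now
    restart_times[:] = [t for t in restart_times if now - t < window_seconds]
    return len(restart_times) < max_restarts
```

On each successful restart the monitor would append the current timestamp to `restart_times`, so a crash loop exhausts the budget while occasional crashes spread over time do not.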

## MetricsCollectorThread

Collects CPU and RAM metrics for each running server:

- Uses `psutil.Process(pid)` to read CPU and memory usage
- Collects every 10 seconds
- Stores metrics in the `metrics` table via `MetricsRepository`
- Broadcasts `metrics` events via the global queue

## RemoteAdminPollerThread

Polls the BattlEye RCon interface for player list updates:

- Connects via `Arma3RemoteAdmin` using `BERConClient`
- Polls the player list every 10 seconds
- Compares current players with the previous state to detect joins and leaves
- On player join: upserts to the `players` table, inserts to `player_history`, broadcasts a `players` event
- On player leave: removes from `players`, updates `left_at` in `player_history`, broadcasts a `players` event
- On RCon connection failure: reconnects with exponential backoff
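
The join/leave diff and the reconnect backoff can be sketched as two small helpers (both hypothetical; the base delay and cap for reconnects are assumptions, not documented values):

```python
import itertools


def diff_players(previous: set, current: set):
    # Joined = in current but not previous; left = in previous but not current
    return current - previous, previous - current


def reconnect_delays(base: float = 1.0, cap: float = 60.0):
    # Exponential backoff for RCon reconnects: base, 2*base, 4*base, ... capped
    delay = base
    while True:
        yield delay
        delay = min(delay * 2, cap)
```

Each poll would call `diff_players` with the previous and current player-identifier sets, then apply the DB updates and broadcasts listed above to the two result sets.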

## WebSocketManager

Runs on the main asyncio event loop:

- Clients connect to `/ws?token=JWT&server_id=N`
- The JWT is validated on connection; invalid tokens close the socket with code 4001
- Clients subscribe to specific `server_id`s or `None` (all servers)
- `broadcast(server_id, message)` sends JSON-encoded messages to matching subscribers
- `disconnect(websocket)` removes the client from the registry
- Thread-safe via `asyncio.Lock`
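
The subscriber registry can be sketched as below (a minimal model: `send_text` is assumed to be the WebSocket send method, JWT validation is omitted, and the internal dict name is an assumption):

```python
import asyncio
import json


class WebSocketManager:
    def __init__(self):
        self._clients = {}  # websocket -> subscribed server_id (None = all)
        self._lock = asyncio.Lock()

    async def connect(self, ws, server_id):
        async with self._lock:
            self._clients[ws] = server_id

    async def disconnect(self, ws):
        async with self._lock:
            self._clients.pop(ws, None)

    async def broadcast(self, server_id, message: dict):
        payload = json.dumps(message)
        # Snapshot matching subscribers under the lock, send outside it
        async with self._lock:
            targets = [ws for ws, sid in self._clients.items()
                       if sid is None or sid == server_id]
        for ws in targets:
            await ws.send_text(payload)
```

Snapshotting the target list under the lock and sending outside it keeps slow clients from blocking registry changes.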

## Thread Safety Rules

1. **Database access**: Each thread uses its own connection via `get_thread_db()`; no DB connections are shared across threads.
2. **WebSocket broadcasting**: Threads write to a thread-safe `queue.Queue`; only `BroadcastThread` reads from it.
3. **Process management**: `ProcessManager` uses per-server locks for thread-safe start/stop operations.
4. **SQLite WAL mode**: Enables concurrent reads from multiple threads while a single writer operates.
5. **Asyncio locks**: `WebSocketManager` guards connection registry modifications with an `asyncio.Lock`.
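
Rules 1 and 4 together can be illustrated with the stdlib `sqlite3` module (a sketch only: the real code goes through SQLAlchemy, and `get_thread_connection` is a hypothetical stand-in for `get_thread_db()`):

```python
import sqlite3


def get_thread_connection(db_path: str) -> sqlite3.Connection:
    # One connection per thread; WAL allows concurrent readers
    # alongside a single writer.
    conn = sqlite3.connect(db_path)
    conn.execute("PRAGMA journal_mode=WAL")
    return conn
```

Each background thread would call this once on startup and keep the connection for its lifetime, since SQLite connections must not be shared across threads by default.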

## Scheduled Jobs

APScheduler's `BackgroundScheduler` runs 3 cleanup cron jobs:

| Job | Schedule | Cleanup |
|---|---|---|
| Clean up old log entries | Daily at 03:00 | `DELETE FROM logs WHERE created_at < datetime('now', '-7 days')` |
| Clean up old metrics | Every 6 hours | `DELETE FROM metrics WHERE timestamp < datetime('now', '-1 day')` |
| Clean up old events | Weekly (Sunday 04:00) | `DELETE FROM server_events WHERE created_at < datetime('now', '-30 days')` |

## Startup Sequence

1. Init the DB engine and run pending migrations
2. Register built-in adapters (Arma 3) and scan for third-party plugins
3. Create `WebSocketManager` (asyncio-only)
4. Create the global `BroadcastThread` (queue → asyncio bridge)
5. Create `ThreadRegistry` with `ProcessManager` and the adapter registry
6. Recover processes that survived a restart (PID validation via psutil)
7. Re-attach monitoring threads for running servers
8. Seed the default admin user if no users exist
9. Register and start the APScheduler cleanup jobs

## Shutdown Sequence

1. Stop all server threads via `ThreadRegistry.stop_all()`
2. Stop `BroadcastThread` and join with a 5s timeout
3. Stop APScheduler
3. Stop APScheduler |