Files
languard-servers-manager/backend/core/threads/thread_registry.py
Tran G. (Revernomad) Khoa 6511353b55 feat: implement full backend + frontend server detail, settings, and create server pages
Backend:
- Complete FastAPI backend with 42+ REST endpoints (auth, servers, config,
  players, bans, missions, mods, games, system)
- Game adapter architecture with Arma 3 as first-class adapter
- WebSocket real-time events for status, metrics, logs, players
- Background thread system (process monitor, metrics, log tail, RCon poller)
- Fernet encryption for sensitive config fields at rest
- JWT auth with admin/viewer roles, bcrypt password hashing
- SQLite with WAL mode, parameterized queries, migration system
- APScheduler cleanup jobs for logs, metrics, events

Frontend:
- Server Detail page with 7 tabs (overview, config, players, bans,
  missions, mods, logs)
- Settings page with password change and admin user management
- Create Server wizard (4-step; known bug: silent validation failure)
- New hooks: useServerDetail, useAuth, useGames
- New components: ServerHeader, ConfigEditor, PlayerTable, BanTable,
  MissionList, ModList, LogViewer, PasswordChange, UserManager
- WebSocket onEvent callback for real-time log accumulation
- 120 unit tests passing (Vitest + React Testing Library)

Docs:
- Added .gitignore, CLAUDE.md, README.md
- Updated FRONTEND.md, ARCHITECTURE.md with current implementation state
- Added .env.example for backend configuration

Known issues:
- Create Server form: "Next" buttons don't validate before advancing,
  causing silent submit failure when fields are invalid
- Config sub-tabs need UX redesign for non-technical users
2026-04-17 11:58:34 +07:00

257 lines
10 KiB
Python

"""
ThreadRegistry — manages the lifecycle of all per-server background threads.
One instance is created at app startup and stored in app.state.thread_registry.
Also provides class-level methods for convenience (called from ServerService).
Thread set per server:
- LogTailThread (started if adapter has "log_parser" capability and log_path is known)
- MetricsCollectorThread (always started)
- ProcessMonitorThread (always started)
- RemoteAdminPollerThread (started only if adapter has "remote_admin" capability)
Key methods:
start_server_threads(server_id, db) — start all threads for a server
stop_server_threads(server_id) — stop all threads for a server
reattach_server_threads(server_id, db) — re-attach threads without restarting process
stop_all() — called at app shutdown
"""
from __future__ import annotations
import logging
import queue
from adapters.registry import GameAdapterRegistry
from core.dal.config_repository import ConfigRepository
from core.dal.server_repository import ServerRepository
from core.threads.log_tail import LogTailThread
from core.threads.metrics_collector import MetricsCollectorThread
from core.threads.process_monitor import ProcessMonitorThread
from core.threads.remote_admin_poller import RemoteAdminPollerThread
logger = logging.getLogger(__name__)
# Module-level singleton for convenience (used by ServerService)
_instance: ThreadRegistry | None = None
class ThreadRegistry:
"""
Manages all background threads for all running servers.
"""
def __init__(
self,
process_manager,
adapter_registry: GameAdapterRegistry | None = None,
global_broadcast_queue: queue.Queue | None = None,
) -> None:
self._process_manager = process_manager
self._adapter_registry = adapter_registry or GameAdapterRegistry
self._broadcast_queue = global_broadcast_queue or queue.Queue(maxsize=1000)
self._bundles: dict[int, dict] = {} # server_id -> thread bundle
# ── Class-level convenience API ──
@classmethod
def _get_instance(cls) -> "ThreadRegistry | None":
return _instance
@classmethod
def set_instance(cls, registry: "ThreadRegistry") -> None:
global _instance
_instance = registry
@classmethod
def start_server_threads(cls, server_id: int, db) -> None:
"""Class-level convenience — starts threads for a server using the singleton."""
registry = cls._get_instance()
if registry is not None:
registry._start_server_threads(server_id, db)
@classmethod
def stop_server_threads(cls, server_id: int) -> None:
"""Class-level convenience — stops threads for a server using the singleton."""
registry = cls._get_instance()
if registry is not None:
registry._stop_server_threads(server_id)
@classmethod
def reattach_server_threads(cls, server_id: int, db) -> None:
"""Class-level convenience — re-attaches threads for a recovered server."""
registry = cls._get_instance()
if registry is not None:
registry._reattach_server_threads(server_id, db)
@classmethod
def stop_all(cls) -> None:
"""Class-level convenience — stops all threads."""
registry = cls._get_instance()
if registry is not None:
registry._stop_all()
# ── Instance methods ──
def _start_server_threads(self, server_id: int, db) -> None:
if server_id in self._bundles:
logger.warning(
"ThreadRegistry: threads already exist for server %d — stopping first",
server_id,
)
self._stop_server_threads(server_id)
bundle = self._build_bundle(server_id, db)
self._bundles[server_id] = bundle
self._start_bundle(server_id, bundle)
def _stop_server_threads(self, server_id: int) -> None:
bundle = self._bundles.pop(server_id, None)
if bundle is None:
return
self._stop_bundle(server_id, bundle)
def _reattach_server_threads(self, server_id: int, db) -> None:
logger.info("ThreadRegistry: reattaching threads for server %d", server_id)
self._start_server_threads(server_id, db)
def _stop_all(self) -> None:
server_ids = list(self._bundles.keys())
for server_id in server_ids:
self._stop_server_threads(server_id)
logger.info("ThreadRegistry: all threads stopped")
def get_thread_count(self, server_id: int) -> int:
"""Returns the number of running threads for a server."""
bundle = self._bundles.get(server_id)
if bundle is None:
return 0
return sum(
1
for key in ("log_tail", "metrics", "monitor", "rcon_poller")
if bundle.get(key) is not None and bundle[key].is_alive()
)
# ── Bundle construction ──
def _build_bundle(self, server_id: int, db) -> dict:
"""Reads server + config data from DB and constructs (but does not start) the thread bundle."""
server_repo = ServerRepository(db)
config_repo = ConfigRepository(db)
server = server_repo.get_by_id(server_id)
if server is None:
raise ValueError(f"Server {server_id} not found in database")
game_type = server["game_type"]
adapter = self._adapter_registry.get(game_type)
# Log path: read from config if present, else use adapter default
log_path = None
if adapter.has_capability("log_parser"):
log_parser = adapter.get_log_parser()
# Try to resolve log path via the adapter's log file resolver
from core.utils.file_utils import get_server_dir
server_dir = get_server_dir(server_id)
if server_dir.exists():
resolver = log_parser.get_log_file_resolver(server_id)
resolved = resolver(server_dir)
if resolved is not None:
log_path = str(resolved)
bundle: dict = {
"log_tail": None,
"metrics": None,
"monitor": None,
"rcon_poller": None,
}
# Always: ProcessMonitorThread
bundle["monitor"] = ProcessMonitorThread(
server_id=server_id,
process_manager=self._process_manager,
broadcast_queue=self._broadcast_queue,
)
# Always: MetricsCollectorThread
bundle["metrics"] = MetricsCollectorThread(
server_id=server_id,
process_manager=self._process_manager,
broadcast_queue=self._broadcast_queue,
)
# Conditional: LogTailThread
if log_path and adapter.has_capability("log_parser"):
log_parser = adapter.get_log_parser()
bundle["log_tail"] = LogTailThread(
server_id=server_id,
log_path=log_path,
log_parser=log_parser,
broadcast_queue=self._broadcast_queue,
)
# Conditional: RemoteAdminPollerThread
if adapter.has_capability("remote_admin"):
remote_admin = adapter.get_remote_admin()
if remote_admin is not None:
# Get RCon password from config
rcon_password = self._get_remote_admin_password(server_id, config_repo)
if rcon_password:
try:
rcon_port = server.get("rcon_port") or server.get("game_port", 0) + 1
client = remote_admin.create_client(
host="127.0.0.1",
port=rcon_port,
password=rcon_password,
)
bundle["rcon_poller"] = RemoteAdminPollerThread(
server_id=server_id,
remote_admin_client=client,
broadcast_queue=self._broadcast_queue,
)
except Exception as exc:
logger.warning(
"ThreadRegistry: could not create RCon client for server %d: %s",
server_id, exc,
)
return bundle
def _start_bundle(self, server_id: int, bundle: dict) -> None:
started = []
for key in ("monitor", "metrics", "log_tail", "rcon_poller"):
thread = bundle.get(key)
if thread is not None:
thread.start()
started.append(key)
logger.info("ThreadRegistry: started threads for server %d: %s", server_id, started)
def _stop_bundle(self, server_id: int, bundle: dict) -> None:
for key in ("rcon_poller", "log_tail", "metrics", "monitor"):
thread = bundle.get(key)
if thread is not None and thread.is_alive():
thread.stop_and_join(timeout=5.0)
logger.info("ThreadRegistry: stopped all threads for server %d", server_id)
# ── Helpers ──
def _get_remote_admin_password(
self, server_id: int, config_repo: ConfigRepository
) -> str | None:
"""Read the RCon password from the rcon config section."""
# Need to decrypt sensitive fields
from adapters.registry import GameAdapterRegistry
try:
server = ServerRepository(config_repo._db).get_by_id(server_id)
if server is None:
return None
adapter = self._adapter_registry.get(server["game_type"])
config_gen = adapter.get_config_generator()
sensitive = config_gen.get_sensitive_fields("rcon") if "rcon" in config_gen.get_sections() else []
except Exception as exc:
logger.debug("Could not determine sensitive fields for RCon config: %s", exc)
sensitive = []
rcon_section = config_repo.get_section(server_id, "rcon", sensitive)
if rcon_section is None:
return None
return rcon_section.get("password") or None