""" ThreadRegistry — manages the lifecycle of all per-server background threads. One instance is created at app startup and stored in app.state.thread_registry. Also provides class-level methods for convenience (called from ServerService). Thread set per server: - LogTailThread (started if adapter has "log_parser" capability and log_path is known) - MetricsCollectorThread (always started) - ProcessMonitorThread (always started) - RemoteAdminPollerThread (started only if adapter has "remote_admin" capability) Key methods: start_server_threads(server_id, db) — start all threads for a server stop_server_threads(server_id) — stop all threads for a server reattach_server_threads(server_id, db) — re-attach threads without restarting process stop_all() — called at app shutdown """ from __future__ import annotations import logging import queue from adapters.registry import GameAdapterRegistry from core.dal.config_repository import ConfigRepository from core.dal.server_repository import ServerRepository from core.threads.log_tail import LogTailThread from core.threads.metrics_collector import MetricsCollectorThread from core.threads.process_monitor import ProcessMonitorThread from core.threads.remote_admin_poller import RemoteAdminPollerThread logger = logging.getLogger(__name__) # Module-level singleton for convenience (used by ServerService) _instance: ThreadRegistry | None = None class ThreadRegistry: """ Manages all background threads for all running servers. """ def __init__( self, process_manager, adapter_registry: GameAdapterRegistry | None = None, global_broadcast_queue: queue.Queue | None = None, ) -> None: self._process_manager = process_manager self._adapter_registry = adapter_registry or GameAdapterRegistry self._broadcast_queue = global_broadcast_queue or queue.Queue(maxsize=1000) self._bundles: dict[int, dict] = {} # server_id -> thread bundle # ── Class-level convenience API ── @classmethod def _get_instance(cls) -> "ThreadRegistry | None": return _instance @classmethod def set_instance(cls, registry: "ThreadRegistry") -> None: global _instance _instance = registry @classmethod def start_server_threads(cls, server_id: int, db) -> None: """Class-level convenience — starts threads for a server using the singleton.""" registry = cls._get_instance() if registry is not None: registry._start_server_threads(server_id, db) @classmethod def stop_server_threads(cls, server_id: int) -> None: """Class-level convenience — stops threads for a server using the singleton.""" registry = cls._get_instance() if registry is not None: registry._stop_server_threads(server_id) @classmethod def reattach_server_threads(cls, server_id: int, db) -> None: """Class-level convenience — re-attaches threads for a recovered server.""" registry = cls._get_instance() if registry is not None: registry._reattach_server_threads(server_id, db) @classmethod def stop_all(cls) -> None: """Class-level convenience — stops all threads.""" registry = cls._get_instance() if registry is not None: registry._stop_all() @classmethod def get_rcon_client(cls, server_id: int): """Return the live Arma3RemoteAdmin client for a running server, or None.""" registry = cls._get_instance() if registry is None: return None bundle = registry._bundles.get(server_id) if bundle is None: return None poller = bundle.get("rcon_poller") if poller is None or not poller.is_alive(): return None return getattr(poller, "_client", None) # ── Instance methods ── def _start_server_threads(self, server_id: int, db) -> None: if server_id in self._bundles: logger.warning( "ThreadRegistry: threads already exist for server %d — stopping first", server_id, ) self._stop_server_threads(server_id) bundle = self._build_bundle(server_id, db) self._bundles[server_id] = bundle self._start_bundle(server_id, bundle) def _stop_server_threads(self, server_id: int) -> None: bundle = self._bundles.pop(server_id, None) if bundle is None: return self._stop_bundle(server_id, bundle) def _reattach_server_threads(self, server_id: int, db) -> None: logger.info("ThreadRegistry: reattaching threads for server %d", server_id) self._start_server_threads(server_id, db) def _stop_all(self) -> None: server_ids = list(self._bundles.keys()) for server_id in server_ids: self._stop_server_threads(server_id) logger.info("ThreadRegistry: all threads stopped") def get_thread_count(self, server_id: int) -> int: """Returns the number of running threads for a server.""" bundle = self._bundles.get(server_id) if bundle is None: return 0 return sum( 1 for key in ("log_tail", "metrics", "monitor", "rcon_poller") if bundle.get(key) is not None and bundle[key].is_alive() ) # ── Bundle construction ── def _build_bundle(self, server_id: int, db) -> dict: """Reads server + config data from DB and constructs (but does not start) the thread bundle.""" server_repo = ServerRepository(db) config_repo = ConfigRepository(db) server = server_repo.get_by_id(server_id) if server is None: raise ValueError(f"Server {server_id} not found in database") game_type = server["game_type"] adapter = self._adapter_registry.get(game_type) # Log path: RPT files live next to the server exe, not in the languard data dir log_path = None if adapter.has_capability("log_parser"): log_parser = adapter.get_log_parser() from pathlib import Path exe_dir = Path(server["exe_path"]).parent resolver = log_parser.get_log_file_resolver(server_id) resolved = resolver(exe_dir) if resolved is not None: log_path = str(resolved) bundle: dict = { "log_tail": None, "metrics": None, "monitor": None, "rcon_poller": None, } # Always: ProcessMonitorThread bundle["monitor"] = ProcessMonitorThread( server_id=server_id, process_manager=self._process_manager, broadcast_queue=self._broadcast_queue, ) # Always: MetricsCollectorThread bundle["metrics"] = MetricsCollectorThread( server_id=server_id, process_manager=self._process_manager, broadcast_queue=self._broadcast_queue, ) # Conditional: LogTailThread if log_path and adapter.has_capability("log_parser"): log_parser = adapter.get_log_parser() bundle["log_tail"] = LogTailThread( server_id=server_id, log_path=log_path, log_parser=log_parser, broadcast_queue=self._broadcast_queue, ) # Conditional: RemoteAdminPollerThread if adapter.has_capability("remote_admin"): remote_admin = adapter.get_remote_admin() if remote_admin is not None: # Get RCon password from config rcon_password = self._get_remote_admin_password(server_id, config_repo) if rcon_password: try: rcon_port = server.get("rcon_port") or server.get("game_port", 0) + 1 client = remote_admin.create_client( host="127.0.0.1", port=rcon_port, password=rcon_password, ) bundle["rcon_poller"] = RemoteAdminPollerThread( server_id=server_id, remote_admin_client=client, broadcast_queue=self._broadcast_queue, ) except Exception as exc: logger.warning( "ThreadRegistry: could not create RCon client for server %d: %s", server_id, exc, ) return bundle def _start_bundle(self, server_id: int, bundle: dict) -> None: started = [] for key in ("monitor", "metrics", "log_tail", "rcon_poller"): thread = bundle.get(key) if thread is not None: thread.start() started.append(key) logger.info("ThreadRegistry: started threads for server %d: %s", server_id, started) def _stop_bundle(self, server_id: int, bundle: dict) -> None: for key in ("rcon_poller", "log_tail", "metrics", "monitor"): thread = bundle.get(key) if thread is not None and thread.is_alive(): thread.stop_and_join(timeout=5.0) logger.info("ThreadRegistry: stopped all threads for server %d", server_id) # ── Helpers ── def _get_remote_admin_password( self, server_id: int, config_repo: ConfigRepository ) -> str | None: """Read the RCon password from the rcon config section.""" # Need to decrypt sensitive fields from adapters.registry import GameAdapterRegistry try: server = ServerRepository(config_repo._db).get_by_id(server_id) if server is None: return None adapter = self._adapter_registry.get(server["game_type"]) config_gen = adapter.get_config_generator() sensitive = config_gen.get_sensitive_fields("rcon") if "rcon" in config_gen.get_sections() else [] except Exception as exc: logger.debug("Could not determine sensitive fields for RCon config: %s", exc) sensitive = [] rcon_section = config_repo.get_section(server_id, "rcon", sensitive) if rcon_section is None: return None return rcon_section.get("password") or None