""" ProcessManager singleton — owns all subprocess handles. Game-agnostic: delegates exe validation and config to adapters. """ from __future__ import annotations import logging import subprocess import threading from pathlib import Path import psutil logger = logging.getLogger(__name__) class ProcessManager: _instance: "ProcessManager | None" = None _init_lock = threading.Lock() def __init__(self): self._processes: dict[int, subprocess.Popen] = {} self._lock = threading.Lock() self._operation_locks: dict[int, threading.Lock] = {} self._ops_lock = threading.Lock() @classmethod def get(cls) -> "ProcessManager": if cls._instance is None: with cls._init_lock: if cls._instance is None: cls._instance = ProcessManager() return cls._instance def get_operation_lock(self, server_id: int) -> threading.Lock: """Per-server lock that serializes start/stop/restart for the same server.""" with self._ops_lock: if server_id not in self._operation_locks: self._operation_locks[server_id] = threading.Lock() return self._operation_locks[server_id] def start( self, server_id: int, exe_path: str, args: list[str], cwd: str | Path, ) -> int: """ Start a game server process. Returns the PID. cwd is set to servers/{server_id}/ so relative config paths work. """ with self._lock: if server_id in self._processes: proc = self._processes[server_id] if proc.poll() is None: raise RuntimeError(f"Server {server_id} is already running (PID {proc.pid})") del self._processes[server_id] full_cmd = [exe_path] + args logger.info("Starting server %d: %s", server_id, ' '.join(full_cmd)) proc = subprocess.Popen( full_cmd, cwd=str(cwd), stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, # On Windows, don't create a new console window creationflags=subprocess.CREATE_NO_WINDOW if hasattr(subprocess, "CREATE_NO_WINDOW") else 0, ) with self._lock: self._processes[server_id] = proc logger.info("Server %d started with PID %d", server_id, proc.pid) return proc.pid def stop(self, server_id: int, timeout: int = 30) -> bool: """ Send terminate signal and wait up to timeout seconds. On Windows, terminate() = hard kill (no SIGTERM). Returns True if process exited, False if still running. """ with self._lock: proc = self._processes.get(server_id) if proc is None: return True try: proc.terminate() except ProcessLookupError: return True try: proc.wait(timeout=timeout) with self._lock: self._processes.pop(server_id, None) return True except subprocess.TimeoutExpired: return False def kill(self, server_id: int) -> bool: """Force-kill the process immediately.""" with self._lock: proc = self._processes.get(server_id) if proc is None: return True try: proc.kill() proc.wait(timeout=5) except (ProcessLookupError, subprocess.TimeoutExpired): logger.debug("Process %d already exited or timed out during kill", server_id) with self._lock: self._processes.pop(server_id, None) return True def is_running(self, server_id: int) -> bool: with self._lock: proc = self._processes.get(server_id) if proc is None: return False return proc.poll() is None def get_pid(self, server_id: int) -> int | None: with self._lock: proc = self._processes.get(server_id) if proc is None or proc.poll() is not None: return None return proc.pid def get_process(self, server_id: int) -> subprocess.Popen | None: with self._lock: return self._processes.get(server_id) def list_running(self) -> list[int]: with self._lock: return [sid for sid, p in self._processes.items() if p.poll() is None] def recover_on_startup(self, db) -> None: """ On app restart: check DB for servers marked 'running'. If the PID is still alive AND the process name matches the adapter's allowed executables, re-attach monitoring threads. Otherwise mark server as 'crashed'. """ from adapters.registry import GameAdapterRegistry from core.dal.server_repository import ServerRepository from core.dal.event_repository import EventRepository from sqlalchemy import text running_servers = ServerRepository(db).get_running() for server in running_servers: pid = server.get("pid") if pid is None: self._mark_crashed(server, db, "No PID recorded") continue # Check if PID is alive if not psutil.pid_exists(pid): self._mark_crashed(server, db, f"PID {pid} no longer exists") continue # Check process name matches adapter allowlist try: proc = psutil.Process(pid) proc_name = proc.name() adapter = GameAdapterRegistry.get(server["game_type"]) allowed = adapter.get_process_config().get_allowed_executables() if not any(proc_name.lower() == exe.lower() for exe in allowed): self._mark_crashed( server, db, f"PID {pid} has name '{proc_name}', not in allowlist {allowed}" ) continue except (psutil.NoSuchProcess, psutil.AccessDenied, KeyError) as e: self._mark_crashed(server, db, str(e)) continue # PID is valid — re-attach the process and start monitoring threads logger.info( "Recovering server %d (PID %d, %s)", server['id'], pid, server['game_type'] ) proc_obj = self._get_popen_for_pid(pid) if proc_obj: with self._lock: self._processes[server["id"]] = proc_obj # Re-start monitoring threads without re-launching the process try: from core.threads.thread_registry import ThreadRegistry ThreadRegistry.reattach_server_threads(server["id"], db) except Exception as e: logger.warning("Could not re-attach threads for server %d: %s", server['id'], e) else: self._mark_crashed(server, db, f"Could not attach to PID {pid}") def _mark_crashed(self, server: dict, db, reason: str) -> None: from core.dal.server_repository import ServerRepository from core.dal.event_repository import EventRepository logger.warning("Server %d marked crashed on startup: %s", server['id'], reason) ServerRepository(db).update_status(server["id"], "crashed") EventRepository(db).insert( server["id"], "crashed", actor="system", detail={"reason": reason, "on_startup": True} ) @staticmethod def _get_popen_for_pid(pid: int) -> subprocess.Popen | None: """ Create a Popen-like wrapper that attaches to an existing PID. NOTE: This is a limited wrapper — we cannot use Popen() on existing PIDs. We use a sentinel object that wraps psutil.Process. """ try: return _PsutilProcessWrapper(pid) except (psutil.NoSuchProcess, psutil.AccessDenied): return None class _PsutilProcessWrapper: """ Minimal Popen-compatible wrapper around an existing process (by PID). Used for startup recovery only. """ def __init__(self, pid: int): self._psutil_proc = psutil.Process(pid) self.pid = pid def poll(self) -> int | None: """Return None if running, exit code if not (we use -1 for external termination).""" if self._psutil_proc.is_running(): return None return -1 def wait(self, timeout: int | None = None): self._psutil_proc.wait(timeout=timeout) def terminate(self): self._psutil_proc.terminate() def kill(self): self._psutil_proc.kill()