languard-servers-manager/MODULES.md
Tran G. (Revernomad) Khoa b17d199301 fix: address design review ACT NOW items (6 risk gaps)
- Add migrate_config() to ConfigGenerator protocol for schema version upgrades
- Add per-server operation lock to ProcessManager to prevent start/stop races
- Add busy_timeout retry/backoff strategy (exponential: 1s, 2s, 4s) for DB lock exhaustion
- Add ConfigForm testing strategy and error boundary for malformed schemas
- Add schema cache invalidation on adapter version change
- Add ConfigMigrationError to typed adapter exceptions
2026-04-16 17:29:19 +07:00

Languard Servers Manager — Python Module Breakdown

Project Structure

backend/
├── main.py
├── config.py
├── database.py
├── dependencies.py
│
├── core/                          # Game-agnostic core
│   ├── __init__.py
│   │
│   ├── auth/
│   │   ├── __init__.py
│   │   ├── router.py
│   │   ├── service.py
│   │   ├── schemas.py
│   │   └── utils.py
│   │
│   ├── servers/
│   │   ├── __init__.py
│   │   ├── router.py              # Generic routes, delegate to adapter
│   │   ├── service.py             # ServerService delegates game work to adapter
│   │   ├── process_manager.py     # ProcessManager singleton (game-agnostic)
│   │   └── schemas.py             # Generic server schemas
│   │
│   ├── players/
│   │   ├── __init__.py
│   │   ├── router.py
│   │   ├── service.py
│   │   └── schemas.py
│   │
│   ├── logs/
│   │   ├── __init__.py
│   │   ├── router.py
│   │   └── service.py
│   │
│   ├── metrics/
│   │   ├── __init__.py
│   │   ├── router.py
│   │   └── service.py
│   │
│   ├── bans/
│   │   ├── __init__.py
│   │   ├── router.py
│   │   └── service.py
│   │
│   ├── events/
│   │   ├── __init__.py
│   │   ├── router.py
│   │   └── service.py
│   │
│   ├── websocket/
│   │   ├── __init__.py
│   │   ├── router.py
│   │   ├── manager.py
│   │   └── broadcaster.py
│   │
│   ├── threads/
│   │   ├── __init__.py
│   │   ├── base_thread.py
│   │   ├── process_monitor.py     # Core (game-agnostic)
│   │   ├── log_tail.py            # Core, takes adapter LogParser
│   │   ├── metrics_collector.py   # Core (game-agnostic)
│   │   ├── remote_admin_poller.py # Core, takes adapter RemoteAdmin
│   │   └── thread_registry.py     # Builds threads from adapter capabilities
│   │
│   ├── games/
│   │   ├── __init__.py
│   │   └── router.py              # /api/games — type discovery, schemas
│   │
│   ├── system/
│   │   ├── __init__.py
│   │   └── router.py
│   │
│   ├── dal/
│   │   ├── __init__.py
│   │   ├── base_repository.py
│   │   ├── server_repository.py
│   │   ├── config_repository.py    # game_configs table
│   │   ├── player_repository.py
│   │   ├── log_repository.py
│   │   ├── metrics_repository.py
│   │   ├── mission_repository.py
│   │   ├── mod_repository.py
│   │   ├── ban_repository.py
│   │   └── event_repository.py
│   │
│   ├── migrations/
│   │   ├── runner.py
│   │   └── 001_initial_schema.sql
│   │
│   └── utils/
│       ├── __init__.py
│       ├── crypto.py
│       ├── file_utils.py
│       └── port_checker.py
│
└── adapters/                       # Game-specific adapters
    ├── __init__.py                 # Auto-registers built-in adapters + entry_points
    ├── registry.py                # GameAdapterRegistry singleton
    ├── protocols.py               # All capability Protocol definitions
    ├── exceptions.py              # Typed adapter exceptions (ConfigWriteError, etc.)
    │
    └── arma3/                     # Arma 3 adapter (built-in, first-class)
        ├── __init__.py             # Exports ARMA3_ADAPTER, registers on import
        ├── adapter.py             # Arma3Adapter class
        ├── config_generator.py    # Pydantic models + server.cfg, basic.cfg, Arma3Profile, beserver.cfg (merged schema + generation)
        ├── rcon_client.py         # BERConClient (BattlEye UDP protocol)
        ├── rcon_service.py        # Wraps BERConClient for RemoteAdmin protocol
        ├── log_parser.py          # RPTParser
        ├── mission_manager.py     # PBO upload, mission rotation config
        ├── mod_manager.py         # @mod_folder convention, -mod=/-serverMod=
        ├── process_config.py      # Exe allowlist, port conventions, profile dir
        ├── ban_manager.py         # battleye/ban.txt bidirectional sync
        ├── schemas.py             # Arma 3 specific request/response models
        └── migrations/
            └── 001_arma3_metadata.sql  # Arma 3 specific tables (if any)

Module Details

main.py

Entry point. Creates and configures the FastAPI application.

# Responsibilities:
# - Create FastAPI app instance
# - Register all core routers with prefix /api
# - Register adapter-provided additional routers
# - Configure CORS middleware
# - Add JWT auth middleware
# - Register startup/shutdown event handlers:
#     startup: run DB migrations, init ProcessManager, auto-register adapters,
#              restore running servers
#     shutdown: gracefully stop all background threads (including BroadcastThread), close DB
# - Auto-register built-in adapters (arma3) on import

Key functions:
  create_app() -> FastAPI
  on_startup()           # DB migrations, register adapters, recover state
  on_shutdown()          # Clean up threads, close connections

config.py

Loads and validates environment variables using BaseSettings from pydantic-settings (in Pydantic v2, BaseSettings lives in that separate package).

class Settings(BaseSettings):
    secret_key: str
    encryption_key: str          # Fernet base64 key (NOT hex)
    db_path: str = "./languard.db"
    servers_dir: str = "./servers"
    host: str = "0.0.0.0"
    port: int = 8000
    cors_origins: list[str] = ["http://localhost:5173"]
    log_retention_days: int = 7
    metrics_retention_days: int = 30
    player_history_retention_days: int = 90
    jwt_expire_hours: int = 24
    login_rate_limit: str = "5/minute"  # per IP

    # Game-specific defaults (used by adapters, not core)
    arma3_default_exe: str = "C:/Arma3Server/arma3server_x64.exe"

settings = Settings()  # singleton

Key change: Removed arma_exe as a core setting. Game-specific paths are now namespaced per adapter.


database.py

Database engine setup and session management. No game-specific logic.

# Responsibilities:
# - Create SQLAlchemy engine with WAL + FK + busy_timeout pragmas
# - Provide get_db() dependency for FastAPI routes (sync session)
# - Provide get_thread_db() for background threads (thread-local sessions)
# - run_migrations(): apply pending .sql migration files at startup

Key functions:
  get_engine() -> Engine
  get_db() -> Generator[Connection, None, None]   # FastAPI dependency
  get_thread_db() -> Connection                   # for threads
  run_migrations(engine: Engine)
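
The three pragmas named above can be illustrated with a minimal sketch. It uses the standard-library sqlite3 module directly instead of SQLAlchemy, so it is not the project's get_engine(); the helper name open_connection and the 5000 ms timeout are assumptions made for the example. With SQLAlchemy, the same statements would typically be issued once per new connection.

```python
import os
import sqlite3
import tempfile


def open_connection(db_path: str, busy_timeout_ms: int = 5000) -> sqlite3.Connection:
    """Open a SQLite connection with WAL, foreign keys and busy_timeout enabled.

    The helper name and the default timeout are illustrative assumptions.
    """
    conn = sqlite3.connect(db_path)
    conn.execute("PRAGMA journal_mode=WAL")    # readers do not block the writer
    conn.execute("PRAGMA foreign_keys=ON")     # off by default in SQLite
    conn.execute(f"PRAGMA busy_timeout={int(busy_timeout_ms)}")  # wait on locks
    return conn


# WAL needs a real file; an in-memory database would report "memory" instead.
db_file = os.path.join(tempfile.mkdtemp(), "languard-demo.db")
conn = open_connection(db_file)
pragmas = {
    name: conn.execute(f"PRAGMA {name}").fetchone()[0]
    for name in ("journal_mode", "foreign_keys", "busy_timeout")
}
conn.close()
```

foreign_keys and busy_timeout are per-connection settings, which is why they have to be applied every time a connection is opened, including the thread-local ones.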

dependencies.py

Reusable FastAPI dependencies. No game-specific logic.

Key functions:
  get_current_user(credentials: HTTPAuthorizationCredentials) -> User
  require_admin(user: User = Depends(get_current_user)) -> User
  get_server_or_404(server_id: int, db: Connection) -> dict
  get_adapter_for_server(server_id: int, db: Connection) -> GameAdapter
    # Convenience: loads server, resolves adapter from game_type

Core: servers/

router.py — Game-agnostic server endpoints. Delegates to adapter for game-specific operations.

  • GET /servers (supports ?game_type= filter)
  • POST /servers (requires game_type in body)
  • GET /servers/{id}
  • PUT /servers/{id}
  • DELETE /servers/{id}
  • POST /servers/{id}/start
  • POST /servers/{id}/stop
  • POST /servers/{id}/restart
  • POST /servers/{id}/kill
  • GET /servers/{id}/config
  • GET /servers/{id}/config/{section}
  • PUT /servers/{id}/config/{section}
  • GET /servers/{id}/config/preview
  • GET /servers/{id}/config/download/{filename}

service.py: ServerService

class ServerService:
    def list_servers(game_type: str | None = None) -> list[ServerSummary]
    def get_server(server_id) -> ServerDetail
    def create_server(data: CreateServerRequest) -> Server
      # 1. Validate game_type is registered
      # 2. Get adapter defaults for config sections
      # 3. Create server row + game_configs rows
      # 4. Create server directory (layout from adapter.get_process_config().get_server_dir_layout())
    def update_server(server_id, data) -> Server
    def delete_server(server_id) -> bool

    def start(server_id) -> StatusResponse
      # 1. Load server from DB (includes game_type)
      # 2. adapter = GameAdapterRegistry.get(server.game_type)
      # 3. Validate exe against adapter.get_process_config().get_allowed_executables()
      # 4. Check ports via adapter.get_process_config().get_port_conventions(game_port)
      # 5. Read config sections from game_configs table
      # 6. adapter.get_config_generator().write_configs(server_id, dir, config)
      # 7. launch_args = adapter.get_config_generator().build_launch_args(config, mods)
      # 8. ProcessManager.start(server_id, exe, args, cwd=dir)
      # 9. DB: status = 'starting'
      # 10. ThreadRegistry.start_server_threads(server_id, db)
      # 11. Broadcast status update

    def stop(server_id, force=False) -> StatusResponse
      # 1. If not force and the adapter provides remote admin:
      #      send shutdown() through the server's RemoteAdminClient
      # 2. Wait up to 30s for process exit
      # 3. If still running: ProcessManager.kill(server_id)
      # 4. DB: status = 'stopped', pid = NULL
      # 5. ThreadRegistry.stop_server_threads(server_id)
      # 6. PlayerRepository.clear(server_id)
      # 7. Broadcast status update

    def restart(server_id) -> StatusResponse

    def get_config(server_id) -> dict  # all sections
    def get_config_section(server_id, section) -> dict
    def update_config_section(server_id, section, data) -> dict
      # Validates data against adapter.get_config_generator().get_sections()[section]
      # Encrypts sensitive fields
      # Stores in game_configs table
    def get_config_preview(server_id) -> dict[str, str]
      # Returns {filename: rendered_content} from adapter.get_config_generator().preview_config()

process_manager.py: ProcessManager singleton (game-agnostic)

class ProcessManager:
    _instance = None
    _processes: dict[int, subprocess.Popen] = {}
    _lock: threading.Lock
    _operation_locks: dict[int, threading.Lock]  # per-server operation lock

    @classmethod
    def get() -> ProcessManager

    def get_operation_lock(server_id) -> threading.Lock
      # Returns a per-server lock that serializes start/stop/restart
      # Prevents concurrent start+stop races for the same server
    def start(server_id, exe_path, args: list[str], cwd: str) -> int  # returns PID
    def stop(server_id, timeout=30) -> bool
    def kill(server_id) -> bool
    def is_running(server_id) -> bool
    def get_pid(server_id) -> int | None
    def get_process(server_id) -> subprocess.Popen | None
    def list_running() -> list[int]
    def recover_on_startup(db)
      # Checks PID still alive AND process name matches adapter allowlist
      # (prevents PID reuse by unrelated processes)
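
The per-server operation lock described above can be sketched as follows. This is a simplified stand-in for ProcessManager, assuming one lock is created lazily per server_id; the class name OperationLocks and the operation() helper are hypothetical.

```python
import threading


class OperationLocks:
    """Hands out one lock per server_id; lock creation is guarded by a registry lock."""

    def __init__(self) -> None:
        self._guard = threading.Lock()
        self._locks: dict[int, threading.Lock] = {}

    def get(self, server_id: int) -> threading.Lock:
        # setdefault under the guard ensures two threads never create two
        # different locks for the same server.
        with self._guard:
            return self._locks.setdefault(server_id, threading.Lock())


locks = OperationLocks()
events: list[str] = []


def operation(server_id: int, name: str) -> None:
    """Stand-in for start/stop: the whole operation runs under the server's lock."""
    with locks.get(server_id):
        events.append(f"{name}:begin")
        events.append(f"{name}:end")


workers = [threading.Thread(target=operation, args=(1, n)) for n in ("start", "stop")]
for worker in workers:
    worker.start()
for worker in workers:
    worker.join()
```

Operations on different servers take different locks and can still run in parallel; only start/stop/restart for the same server are serialized.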

Core: players/

router.py — Generic player endpoints.

  • GET /servers/{id}/players
  • POST /servers/{id}/players/{slot_id}/kick (requires remote_admin)
  • POST /servers/{id}/players/{slot_id}/ban (requires remote_admin)
  • GET /servers/{id}/players/history

service.py: PlayerService

class PlayerService:
    def get_current_players(server_id) -> list[Player]
    def kick(server_id, slot_id, reason) -> bool
      # Resolves adapter; if adapter.get_remote_admin() is not None:
      #   calls kick_player(identifier, reason) on the server's RemoteAdminClient
    def ban(server_id, slot_id, duration_minutes, reason) -> bool
    def get_history(server_id, limit, offset, search) -> PaginatedResult
    def update_from_remote_admin(server_id, players: list) -> None
      # Upserts players table; detects disconnections; inserts player_history rows

Core: logs/

router.py: GET /servers/{id}/logs, DELETE /servers/{id}/logs

service.py: LogService

class LogService:
    def query(server_id, limit, offset, level, since, search) -> PaginatedLogs
    def clear(server_id) -> int
    def get_log_path(server_id) -> Path | None
      # Resolves via adapter.get_log_parser().get_log_file_resolver(server_id)
    def cleanup_old_logs()  # called by APScheduler

Core: metrics/, bans/, events/

Same pattern as above — game-agnostic services using core tables.


Core: threads/

base_thread.py: BaseServerThread (game-agnostic)

class BaseServerThread(threading.Thread):
    def __init__(server_id: int, interval: float)
    def stop()            # sets _stop_event
    def is_stopped() -> bool
    def run()             # creates thread-local DB connection (self._db), calls setup(),
                          # then loops: tick() + wait(interval) until stopped;
                          # always calls teardown() in a finally block
    def setup()           # override for init work (uses self._db)
    def tick()            # override for per-interval work (uses self._db)
    def teardown()        # override for cleanup (close files, sockets)
    def on_error(e: Exception)  # default: log, continue
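
The lifecycle above (setup, tick loop, teardown in a finally block) can be sketched as a runnable class. This is a minimal illustration under assumptions: the thread-local DB connection is left out, on_error() simply prints, and CountingThread is a hypothetical subclass used only for the demo.

```python
import threading


class BaseServerThread(threading.Thread):
    def __init__(self, server_id: int, interval: float) -> None:
        super().__init__(daemon=True)
        self.server_id = server_id
        self.interval = interval
        self._stop_event = threading.Event()

    def stop(self) -> None:
        self._stop_event.set()

    def is_stopped(self) -> bool:
        return self._stop_event.is_set()

    def run(self) -> None:
        try:
            self.setup()
            while not self._stop_event.is_set():
                try:
                    self.tick()
                except Exception as exc:  # one bad tick must not end the thread
                    self.on_error(exc)
                # Event.wait() returns early when stop() is called, unlike sleep().
                self._stop_event.wait(self.interval)
        finally:
            self.teardown()

    def setup(self) -> None: ...
    def tick(self) -> None: ...
    def teardown(self) -> None: ...

    def on_error(self, exc: Exception) -> None:
        print(f"[server {self.server_id}] {type(exc).__name__}: {exc}")


class CountingThread(BaseServerThread):
    """Hypothetical subclass: ticks three times, then stops itself."""

    def __init__(self) -> None:
        super().__init__(server_id=1, interval=0.01)
        self.ticks = 0
        self.closed = False

    def tick(self) -> None:
        self.ticks += 1
        if self.ticks == 3:
            self.stop()

    def teardown(self) -> None:
        self.closed = True


demo = CountingThread()
demo.start()
demo.join(timeout=2)
```

Waiting on the stop event rather than sleeping is what lets stop() interrupt a long interval immediately.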

process_monitor.py: ProcessMonitorThread (game-agnostic)

class ProcessMonitorThread(BaseServerThread):
    interval = 1.0  # seconds

    def tick():
        # 1. Check if process is still alive (Popen.poll() via ProcessManager;
        #    not os.kill(pid, 0), which terminates the process on Windows)
        # 2. If dead:
        #    a. Get exit code
        #    b. DB: status = 'crashed', stopped_at = now
        #    c. Clear players from DB
        #    d. Broadcast: {type: 'status', status: 'crashed'}
        #    e. Insert server_events: {event_type: 'crashed', exit_code}
        #    f. If auto_restart: schedule restart with exponential backoff
        #    g. self.stop()
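
Step 2f mentions exponential backoff without giving numbers. A minimal sketch of the delay calculation, assuming a 1 second base that doubles per attempt and a 60 second cap (both values are assumptions, not taken from this document):

```python
def restart_delay(attempt: int, base: float = 1.0, cap: float = 60.0) -> float:
    """Seconds to wait before restart attempt `attempt` (0-based).

    The delay doubles with every attempt and is clamped to `cap`.
    `base` and `cap` are illustrative defaults.
    """
    if attempt < 0:
        raise ValueError("attempt must be >= 0")
    # Clamp the exponent so very large attempt counts cannot overflow a float.
    return min(cap, base * (2 ** min(attempt, 32)))


delays = [restart_delay(n) for n in range(8)]
```

A cap keeps a server that crashes repeatedly from drifting to hour-long waits, while the doubling avoids a tight crash/restart loop.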

log_tail.py: LogTailThread (core thread + adapter parser)

class LogTailThread(BaseServerThread):
    interval = 0.1  # 100ms

    def __init__(self, server_id: int, log_parser: "LogParser",
                 log_file_resolver: Callable):
        super().__init__(server_id, self.interval)
        self._parser = log_parser
        self._log_file_resolver = log_file_resolver

    def setup():
        # Find log file using adapter's resolver
        # Open file, seek to end (tail behavior)

    def tick():
        # 1. Read all new lines from log file
        # 2. For each line:
        #    a. self._parser.parse_line(line) -> LogEntry
        #    b. LogRepository.insert(server_id, entry)
        #    c. BroadcastThread.enqueue(server_id, 'log', entry)

    def teardown():
        # Close the open log file handle

metrics_collector.py: MetricsCollectorThread (game-agnostic)

class MetricsCollectorThread(BaseServerThread):
    interval = 5.0  # seconds

    def tick():
        # 1. Get PID from ProcessManager
        # 2. psutil.Process(pid).cpu_percent(interval=0.5)
        # 3. psutil.Process(pid).memory_info().rss / (1024*1024)  # MB
        # 4. PlayerRepository.count(server_id) -> player_count
        # 5. MetricsRepository.insert(server_id, cpu, ram, player_count)
        # 6. BroadcastThread.enqueue(server_id, 'metrics', {cpu, ram, player_count})

remote_admin_poller.py: RemoteAdminPollerThread (core thread + adapter client)

class RemoteAdminPollerThread(BaseServerThread):
    interval = 10.0
    STARTUP_DELAY = 30.0

    def __init__(self, server_id: int,
                 remote_admin_factory: Callable[[], "RemoteAdminClient"]):
        super().__init__(server_id, self.interval)
        self._client_factory = remote_admin_factory
        self._client: RemoteAdminClient | None = None

    def setup():
        # Wait for server startup (using _stop_event.wait instead of sleep)
        # Create client via factory

    def tick():
        try:
            if not self._client:
                # Reconnect after an earlier failure
                self._client = self._client_factory()
            players = self._client.get_players()  # list[dict] per RemoteAdminClient
            PlayerService(self._db).update_from_remote_admin(self.server_id, players)
            BroadcastThread.enqueue(self.server_id, "players", {
                "players": players,
                "count": len(players),
            })
        except (ConnectionError, RemoteAdminError):
            self._client = None  # Recreated via the factory on the next tick
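
The protocol notes later in this document state that RemoteAdminClient instances are shared between this poller thread and API request handlers, and that core wraps every client call with a per-server threading.Lock. One way that wrapping could look is a small proxy; the LockedClient and FakeClient names are hypothetical and the sketch is not the project's implementation.

```python
import threading
import time
from typing import Any


class LockedClient:
    """Proxy that serializes every method call on the wrapped client with one lock."""

    def __init__(self, client: Any) -> None:
        self._client = client
        self._lock = threading.Lock()

    def __getattr__(self, name: str) -> Any:
        # Only reached for names not found on the proxy itself.
        attr = getattr(self._client, name)
        if not callable(attr):
            return attr

        def locked_call(*args: Any, **kwargs: Any) -> Any:
            with self._lock:
                return attr(*args, **kwargs)

        return locked_call


class FakeClient:
    """Stand-in client that records how many calls overlap."""

    def __init__(self) -> None:
        self.active = 0
        self.max_active = 0

    def get_players(self) -> list:
        self.active += 1
        self.max_active = max(self.max_active, self.active)
        time.sleep(0.01)  # simulate a network round trip
        self.active -= 1
        return []


raw = FakeClient()
shared = LockedClient(raw)
callers = [threading.Thread(target=shared.get_players) for _ in range(4)]
for caller in callers:
    caller.start()
for caller in callers:
    caller.join()
```

With the proxy in place the adapter's client can stay single-threaded internally, which matches the stated goal that adapters do not need thread-safe clients.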

thread_registry.py: ThreadRegistry (adapter-aware)

class ThreadRegistry:
    _threads: dict[int, dict[str, BaseServerThread]] = {}
    _lock = threading.Lock()

    @classmethod
    def start_server_threads(cls, server_id: int, db: Connection) -> None:
        server = ServerRepository(db).get_by_id(server_id)
        adapter = GameAdapterRegistry.get(server["game_type"])

        threads: dict[str, BaseServerThread] = {}

        # Core threads (always present)
        threads["process_monitor"] = ProcessMonitorThread(server_id)
        threads["metrics_collector"] = MetricsCollectorThread(server_id)

        # Adapter-provided log parser → generic LogTailThread
        log_parser = adapter.get_log_parser()
        threads["log_tail"] = LogTailThread(
            server_id,
            log_parser=log_parser,  # keyword must match LogTailThread.__init__
            log_file_resolver=log_parser.get_log_file_resolver(server_id),
        )

        # Adapter-provided remote admin → generic RemoteAdminPollerThread
        remote_admin = adapter.get_remote_admin()
        if remote_admin is not None:
            # Resolve the password now: `db` belongs to the calling thread and
            # should not be used from the poller thread that runs the factory.
            password = _get_remote_admin_password(server_id, db)
            threads["remote_admin_poller"] = RemoteAdminPollerThread(
                server_id,
                remote_admin_factory=lambda: remote_admin.create_client(
                    host="127.0.0.1",
                    port=server["rcon_port"],
                    password=password,
                ),
            )

        # Adapter-declared custom threads
        for thread_factory in adapter.get_custom_thread_factories():
            thread = thread_factory(server_id, db)
            threads[thread.name_key] = thread

        with cls._lock:
            cls._threads[server_id] = threads

        for thread in threads.values():
            thread.start()

Core: games/

router.py — Game type discovery endpoints.

# GET /games           → list all registered game types + capabilities
# GET /games/{type}    → details for a specific game type
# GET /games/{type}/config-schema  → JSON Schema for each config section
# GET /games/{type}/defaults        → default config values

Core: dal/

config_repository.py — Manages the game_configs table.

class ConfigRepository(BaseRepository):
    def get_section(server_id: int, section: str) -> dict | None
    def get_all_sections(server_id: int) -> dict[str, dict]
    def upsert_section(server_id: int, game_type: str, section: str,
                       config_json: str) -> None
    def delete_sections(server_id: int) -> None  # cascade on server delete

All other repositories remain game-agnostic, using core tables.


Adapters: exceptions.py

Typed adapter exceptions that core catches specifically:

class AdapterError(Exception):
    """Base for all adapter errors."""
    pass

class ConfigWriteError(AdapterError):
    """File write failed (disk full, permissions)."""
    def __init__(self, path: str, detail: str): ...

class ConfigValidationError(AdapterError):
    """Config values violate adapter constraints."""
    def __init__(self, section: str, errors: list[dict]): ...

class LaunchArgsError(AdapterError):
    """build_launch_args() failed (missing mod, bad path)."""
    def __init__(self, detail: str): ...

class RemoteAdminError(AdapterError):
    """Remote admin connection/command failed."""
    def __init__(self, detail: str, recoverable: bool = True): ...

class ExeNotAllowedError(AdapterError):
    """Executable not in adapter's allowlist."""
    def __init__(self, exe: str, allowed: list[str]): ...
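
The ConfigGenerator.migrate_config() docstring in protocols.py refers to a ConfigMigrationError, which the listing above does not show. A minimal sketch of how it might fit the same hierarchy; the constructor fields (section, from_version, to_version, detail) are assumptions chosen for illustration.

```python
class AdapterError(Exception):
    """Base for all adapter errors (mirrors the listing above)."""


class ConfigMigrationError(AdapterError):
    """migrate_config() could not upgrade stored config to the current schema.

    The constructor fields are hypothetical; the document only names the class.
    """

    def __init__(self, section: str, from_version: str, to_version: str, detail: str):
        super().__init__(f"{section}: {from_version} -> {to_version}: {detail}")
        self.section = section
        self.from_version = from_version
        self.to_version = to_version
        self.detail = detail


error = ConfigMigrationError("server", "0.9.0", "1.0.0", "unknown key 'maxPlayers'")
```

Deriving from AdapterError lets core catch it either specifically or together with the other typed adapter errors.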

Adapters: protocols.py

Defines all capability protocols. See ARCHITECTURE.md for the full protocol definitions.

@runtime_checkable
class ConfigGenerator(Protocol):
    """Merged protocol: config schema definition + file generation + launch args.
    ConfigSchema and ConfigGenerator were merged because they always co-occur —
    no game defines config schema without also generating config files."""
    game_type: str
    def get_sections(self) -> dict[str, type[BaseModel]]: ...
    def get_defaults(self, section: str) -> dict[str, Any]: ...
    def get_sensitive_fields(self, section: str) -> list[str]:
        """Return JSON keys that need Fernet encryption for this section.
        Core's ConfigRepository handles encrypt/decrypt transparently."""
        ...
    def get_config_version(self) -> str:
        """Current adapter schema version. Stored in game_configs.schema_version."""
        ...
    def migrate_config(self, old_version: str, config_json: dict[str, dict]) -> dict[str, dict]:
        """Transform config JSON from an older schema version to the current one.
        Called by ConfigRepository when a section's stored schema_version
        differs from get_config_version(). Returns the migrated config dict.
        Raises ConfigMigrationError on failure (core rolls back; original stays)."""
        ...
    def write_configs(self, server_id: int, server_dir: Path,
                      config_sections: dict[str, dict]) -> list[Path]: ...
    def build_launch_args(self, config_sections: dict[str, dict],
                          mod_args: list[str] | None = None) -> list[str]: ...
    def preview_config(self, server_id: int, server_dir: Path,
                       config_sections: dict[str, dict]) -> dict[str, str]: ...

@runtime_checkable
class RemoteAdminClient(Protocol):
    def send_command(self, command: str, timeout: float = 5.0) -> str | None: ...
    def get_players(self) -> list[dict]: ...
    def kick_player(self, identifier: str, reason: str = "") -> bool: ...
    def ban_player(self, identifier: str, duration_minutes: int, reason: str) -> bool: ...
    def say_all(self, message: str) -> bool: ...
    def shutdown(self) -> bool: ...
    def keepalive(self) -> None: ...
    def disconnect(self) -> None: ...

@runtime_checkable
class RemoteAdmin(Protocol):
    def create_client(self, host: str, port: int, password: str) -> RemoteAdminClient: ...
    def get_startup_delay(self) -> float: ...
    def get_poll_interval(self) -> float: ...
    def get_player_data_schema(self) -> type[BaseModel] | None:
        """Pydantic model for players.game_data JSON. Return None for no validation."""
        ...

# NOTE on thread safety: RemoteAdminClient instances are shared between
# RemoteAdminPollerThread and API request handlers. Core wraps all
# RemoteAdminClient method calls with a per-server threading.Lock to
# ensure thread safety. Adapters do NOT need to implement thread-safe clients.

@runtime_checkable
class LogParser(Protocol):
    def parse_line(self, line: str) -> dict | None: ...
    def get_log_file_resolver(self, server_id: int) -> "LogFileResolver": ...

@runtime_checkable
class MissionManager(Protocol):
    file_extension: str
    def parse_mission_filename(self, filename: str) -> dict: ...
    def get_rotation_config(self, rotation_entries: list[dict]) -> str: ...
    def get_missions_dir(self, server_dir: Path) -> Path: ...
    def get_mission_data_schema(self) -> type[BaseModel] | None:
        """Pydantic model for missions.game_data JSON. Return None for no validation."""
        ...

@runtime_checkable
class ModManager(Protocol):
    def get_mod_folder_pattern(self) -> str: ...
    def build_mod_args(self, server_mods: list[dict]) -> list[str]: ...
    def validate_mod_folder(self, path: Path) -> bool: ...
    def get_mod_data_schema(self) -> type[BaseModel] | None:
        """Pydantic model for mods.game_data JSON. Return None for no validation."""
        ...

@runtime_checkable
class ProcessConfig(Protocol):
    def get_allowed_executables(self) -> list[str]: ...
    def get_port_conventions(self, game_port: int) -> dict[str, int]: ...
    def get_default_game_port(self) -> int: ...
    def get_default_rcon_port(self, game_port: int) -> int | None: ...
    def get_server_dir_layout(self) -> list[str]: ...

@runtime_checkable
class BanManager(Protocol):
    def get_ban_file_path(self, server_dir: Path) -> Path: ...
    def sync_bans_to_file(self, bans: list[dict], ban_file: Path) -> None: ...
    def read_bans_from_file(self, ban_file: Path) -> list[dict]: ...
    def get_ban_data_schema(self) -> type[BaseModel] | None:
        """Pydantic model for bans.game_data JSON. Return None for no validation."""
        ...

@runtime_checkable
class GameAdapter(Protocol):
    game_type: str
    display_name: str
    version: str
    def get_config_generator(self) -> ConfigGenerator: ...
    def get_process_config(self) -> ProcessConfig: ...
    def get_log_parser(self) -> LogParser: ...
    def get_remote_admin(self) -> RemoteAdmin | None: ...
    def get_mission_manager(self) -> MissionManager | None: ...
    def get_mod_manager(self) -> ModManager | None: ...
    def get_ban_manager(self) -> BanManager | None: ...
    def get_additional_routers(self) -> list[APIRouter]: ...
    def get_custom_thread_factories(self) -> list[Callable]: ...
    def has_capability(self, name: str) -> bool: ...
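
The migrate_config() docstring above describes a version check followed by a migration that leaves the original untouched on failure. A sketch of that flow under assumptions: DemoConfigGenerator, its 0.9.0 to 1.0.0 key rename, and the load_sections() helper are all hypothetical, and the local ConfigMigrationError stands in for the adapter exception.

```python
class ConfigMigrationError(Exception):
    """Stand-in for the adapter exception of the same name."""


class DemoConfigGenerator:
    """Hypothetical generator whose 1.0.0 schema renamed maxPlayers to max_players."""

    def get_config_version(self) -> str:
        return "1.0.0"

    def migrate_config(self, old_version: str, config_json: dict) -> dict:
        if old_version != "0.9.0":
            raise ConfigMigrationError(f"no migration path from {old_version}")
        # Copy each section so the caller's stored data is never mutated.
        migrated = {name: dict(values) for name, values in config_json.items()}
        server = migrated.get("server", {})
        if "maxPlayers" in server:
            server["max_players"] = server.pop("maxPlayers")
        return migrated


def load_sections(stored_version: str, stored: dict, generator) -> tuple:
    """Return (version, sections); migrate only when the stored version differs.

    On ConfigMigrationError the stored version and sections are returned unchanged.
    """
    current = generator.get_config_version()
    if stored_version == current:
        return current, stored
    try:
        return current, generator.migrate_config(stored_version, stored)
    except ConfigMigrationError:
        return stored_version, stored


generator = DemoConfigGenerator()
old = {"server": {"maxPlayers": 40}}
upgraded = load_sections("0.9.0", old, generator)
unsupported = load_sections("0.1.0", old, generator)
```

Working on a copy is what makes the "original stays" guarantee cheap: a failed migration has nothing to roll back in memory.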

Adapters: registry.py

class GameAdapterRegistry:
    _adapters: dict[str, GameAdapter] = {}

    @classmethod
    def register(cls, adapter: GameAdapter) -> None: ...

    @classmethod
    def get(cls, game_type: str) -> GameAdapter: ...

    @classmethod
    def all(cls) -> list[GameAdapter]: ...

    @classmethod
    def list_game_types(cls) -> list[dict]: ...
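
The method bodies above are elided. A minimal sketch of how the registry could be filled in, using the runtime_checkable behaviour of the protocols to reject objects that do not look like adapters. The GameAdapter protocol is trimmed to three members for brevity, and the duplicate-registration error, the shape of list_game_types(), and DemoAdapter are assumptions.

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class GameAdapter(Protocol):
    """Trimmed to three members for the sketch; the full protocol has more."""
    game_type: str
    display_name: str
    def has_capability(self, name: str) -> bool: ...


class GameAdapterRegistry:
    _adapters: dict[str, GameAdapter] = {}

    @classmethod
    def register(cls, adapter: GameAdapter) -> None:
        # isinstance() on a runtime_checkable Protocol only checks that the
        # members exist, not their signatures or types.
        if not isinstance(adapter, GameAdapter):
            raise TypeError(f"{type(adapter).__name__} does not satisfy GameAdapter")
        if adapter.game_type in cls._adapters:
            raise ValueError(f"game type already registered: {adapter.game_type!r}")
        cls._adapters[adapter.game_type] = adapter

    @classmethod
    def get(cls, game_type: str) -> GameAdapter:
        try:
            return cls._adapters[game_type]
        except KeyError:
            raise KeyError(f"unknown game type: {game_type!r}") from None

    @classmethod
    def all(cls) -> list[GameAdapter]:
        return list(cls._adapters.values())

    @classmethod
    def list_game_types(cls) -> list[dict]:
        return [
            {"game_type": a.game_type, "display_name": a.display_name}
            for a in cls._adapters.values()
        ]


class DemoAdapter:
    """Hypothetical adapter used only to exercise the registry."""
    game_type = "demo"
    display_name = "Demo Game"

    def has_capability(self, name: str) -> bool:
        return name == "config_generator"


demo_adapter = DemoAdapter()
GameAdapterRegistry.register(demo_adapter)
```

Because the structural check is shallow, it catches a wrong object being registered but not a method with the wrong signature; that still needs tests or a type checker.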

Adapters: arma3/

adapter.py: Arma3Adapter

class Arma3Adapter:
    game_type = "arma3"
    display_name = "Arma 3"
    version = "1.0.0"

    def get_config_generator(self) -> ConfigGenerator:
        return Arma3ConfigGenerator()  # includes schema + generation

    def get_process_config(self) -> ProcessConfig:
        return Arma3ProcessConfig()

    def get_log_parser(self) -> LogParser:
        return RPTParser()

    def get_remote_admin(self) -> RemoteAdmin | None:
        return Arma3RConService()

    def get_mission_manager(self) -> MissionManager | None:
        return Arma3MissionManager()

    def get_mod_manager(self) -> ModManager | None:
        return Arma3ModManager()

    def get_ban_manager(self) -> BanManager | None:
        return Arma3BanManager()

    def has_capability(self, name: str) -> bool:
        """Explicit capability probe — core uses this instead of scattered None checks."""
        return name in (
            "config_generator", "process_config",
            "log_parser", "remote_admin", "mission_manager",
            "mod_manager", "ban_manager",
        )

    def get_additional_routers(self):
        return []  # Arma 3 has no extra routes beyond generic set

    def get_custom_thread_factories(self):
        return []

ARMA3_ADAPTER = Arma3Adapter()

config_generator.py: Arma3ConfigGenerator (merged schema + generation)

# --- Pydantic models (formerly in config_schema.py) ---
class ServerConfig(BaseModel):
    hostname: str = "My Arma 3 Server"
    password: str | None = None
    password_admin: str  # must be set on creation
    server_command_password: str | None = None
    max_players: int = Field(default=40, gt=0)
    # ... all server.cfg parameters ...

class BasicConfig(BaseModel):
    min_bandwidth: int = Field(default=800000, gt=0)
    max_bandwidth: int = Field(default=25000000, gt=0)
    # ... all basic.cfg parameters ...

class ProfileConfig(BaseModel):
    reduced_damage: int = Field(default=0, ge=0, le=1)
    third_person_view: int = Field(default=0, ge=0, le=1)
    # ... all Arma3Profile parameters ...

class LaunchConfig(BaseModel):
    world: str = "empty"
    limit_fps: int = Field(default=50, gt=0)
    # ... all launch parameters ...

class RConConfig(BaseModel):
    rcon_password: str
    max_ping: int = Field(default=200, gt=0)
    enabled: int = Field(default=1, ge=0, le=1)

# --- ConfigGenerator implementation (schema + generation in one class) ---
class Arma3ConfigGenerator:
    game_type = "arma3"

    # Schema methods (formerly Arma3ConfigSchema)
    def get_sections(self) -> dict[str, type[BaseModel]]:
        return {
            "server": ServerConfig,
            "basic": BasicConfig,
            "profile": ProfileConfig,
            "launch": LaunchConfig,
            "rcon": RConConfig,
        }
    def get_defaults(self, section: str) -> dict: ...
    def get_sensitive_fields(self, section: str) -> list[str]:
        return {
            "server": ["password", "password_admin", "server_command_password"],
            "rcon": ["rcon_password"],
        }.get(section, [])
    def get_config_version(self) -> str:
        return "1.0.0"

    # Generation methods
    def write_configs(self, server_id, server_dir, config_sections) -> list[Path]:
        # Writes server.cfg, basic.cfg, server.Arma3Profile, beserver.cfg
        # Creates directories if they don't exist
        # Sets restrictive file permissions on files containing passwords
        # Uses structured builder — NOT f-strings — prevents config injection
        # ATOMIC: writes to .tmp files first, then os.replace()

    def write_server_cfg(server_id, config, path): ...
    def write_basic_cfg(server_id, config, path): ...
    def write_arma3profile(server_id, profile, path): ...
        # Writes to servers/{id}/server/server.Arma3Profile
    def write_beserver_cfg(server_id, rcon_config, path): ...
        # Generates servers/{id}/battleye/beserver.cfg

    def build_launch_args(self, config_sections, mod_args=None) -> list[str]:
        # Returns CLI args for arma3server_x64.exe
        # e.g. ['-port=2302', '-config=server.cfg', '-cfg=basic.cfg',
        #        '-profiles=./', '-name=server', '-world=empty',
        #        '-mod=@CBA;@ACE', '-serverMod=@ACE_server',
        #        '-bepath=./battleye', '-limitFPS=50', ...]

    def preview_config(self, server_id, server_dir, config_sections) -> dict[str, str]:
        # Returns {filename: rendered_content} without writing to disk

    def _escape_config_string(value: str) -> str:
        # Escapes backslashes FIRST, then double quotes and newlines
        # Order matters: \\ → \\\\, then " → \\", then \n → \\n
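
Two of the rules above, the escape order and the atomic write via a .tmp file and os.replace(), can be sketched together. The function names without the leading underscore, render_string_setting(), and the demo hostname are assumptions; the escaping follows the order stated in the comment above, not any independent specification of the config format.

```python
import os
import tempfile
from pathlib import Path


def escape_config_string(value: str) -> str:
    """Escape backslashes first, then double quotes, then newlines.

    Doing backslashes last would double the backslashes that the quote and
    newline escapes had just introduced.
    """
    return (
        value.replace("\\", "\\\\")
             .replace('"', '\\"')
             .replace("\n", "\\n")
    )


def render_string_setting(key: str, value: str) -> str:
    """Tiny builder for one quoted setting; the value always passes through the escaper."""
    return key + ' = "' + escape_config_string(value) + '";\n'


def write_atomic(path: Path, content: str) -> None:
    """Write to a sibling .tmp file, then swap it into place with os.replace()."""
    path.parent.mkdir(parents=True, exist_ok=True)
    tmp = path.with_name(path.name + ".tmp")
    tmp.write_text(content, encoding="utf-8")
    os.replace(tmp, path)  # replaces the destination in a single step


with tempfile.TemporaryDirectory() as root:
    target = Path(root) / "server" / "server.cfg"
    write_atomic(target, render_string_setting("hostname", 'My "Best" Server'))
    written = target.read_text(encoding="utf-8")
    leftovers = sorted(p.name for p in target.parent.iterdir())
```

Keeping the temporary file in the same directory matters: os.replace() can only swap atomically when source and destination are on the same filesystem.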

rcon_client.py: BERConClient

class BERConClient:
    """Implements BattlEye RCon protocol over UDP."""
    def __init__(host, port, password): ...
    _pending_requests: dict[int, threading.Event] = {}
    _responses: dict[int, str] = {}

    def connect() -> bool
    def disconnect()
    def login() -> bool
    def send_command(command, timeout=5.0) -> str | None
    def keepalive()
    def is_connected() -> bool

    def _receiver_loop()  # background thread
    def parse_players_response(response) -> list[dict]

rcon_service.py: Arma3RConService (implements RemoteAdmin protocol)

class Arma3RConService:
    def create_client(self, host, port, password) -> RemoteAdminClient:
        client = BERConClient(host, port, password)
        client.connect()
        return client

    def get_startup_delay(self) -> float: return 30.0
    def get_poll_interval(self) -> float: return 10.0

log_parser.py: RPTParser

class RPTParser:
    def parse_line(self, line: str) -> dict | None:
        # Returns: {timestamp, level, message}
        # level detection: 'error' if 'Error', 'warning' if 'Warning', else 'info'

    def parse_timestamp(raw: str) -> datetime

    def get_log_file_resolver(self, server_id: int) -> LogFileResolver:
        # Returns a callable that finds the latest .rpt file
        # Arma 3 writes: servers/{id}/server/arma3server_YYYY-MM-DD_HH-MM-SS.rpt
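
The level detection rule above can be sketched as a standalone function. It assumes each line may start with an "H:MM:SS" time prefix and keeps the timestamp as a string instead of converting it to a datetime; the actual RPT timestamp format depends on server settings and is not specified in this document.

```python
import re
from typing import Optional

# Assumed prefix: optional leading spaces, then H:MM:SS or HH:MM:SS.
_TIME_PREFIX = re.compile(r"^\s*(\d{1,2}:\d{2}:\d{2})\s+(.*)$")


def parse_line(line: str) -> Optional[dict]:
    """Return {timestamp, level, message}, or None for blank lines."""
    text = line.rstrip("\r\n")
    if not text.strip():
        return None

    match = _TIME_PREFIX.match(text)
    if match:
        timestamp, message = match.group(1), match.group(2)
    else:
        timestamp, message = None, text.strip()

    # Rule from the comment above: 'Error' wins over 'Warning', else 'info'.
    if "Error" in message:
        level = "error"
    elif "Warning" in message:
        level = "warning"
    else:
        level = "info"

    return {"timestamp": timestamp, "level": level, "message": message}


sample = parse_line("12:00:01 Error in expression <x>")
```

Substring matching is deliberately simple; a message that merely mentions the word "Error" will also be classified as an error.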

process_config.py: Arma3ProcessConfig

class Arma3ProcessConfig:
    def get_allowed_executables(self) -> list[str]:
        return ["arma3server_x64.exe", "arma3server.exe"]

    def get_port_conventions(self, game_port: int) -> dict[str, int]:
        return {
            "game": game_port,
            "steam_query": game_port + 1,
            "von": game_port + 2,
            "steam_auth": game_port + 3,
        }
        # rcon_port is separate (user-configurable, defaults to game_port+4)

    def get_default_game_port(self) -> int: return 2302

    def get_default_rcon_port(self, game_port: int) -> int | None:
        return game_port + 4

    def get_server_dir_layout(self) -> list[str]:
        return ["server", "battleye", "mpmissions"]

mission_manager.py: Arma3MissionManager

class Arma3MissionManager:
    file_extension = ".pbo"

    def parse_mission_filename(self, filename: str) -> dict:
        # Extract mission_name and terrain from PBO filename
        # e.g. "MyMission.Altis.pbo" → {mission_name: "MyMission.Altis", terrain: "Altis"}

    def get_rotation_config(self, rotation_entries) -> str:
        # Renders class Missions {} block for server.cfg
        # class Missions { class Mission1 { template="..."; difficulty="..."; }; };

    def get_missions_dir(self, server_dir: Path) -> Path:
        return server_dir / "mpmissions"
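
The filename convention in the parse_mission_filename() comment can be sketched as below. The error handling for names that do not match "name.terrain.pbo" is an assumption; the document only shows the successful case.

```python
def parse_mission_filename(filename: str) -> dict:
    """Split '<name>.<terrain>.pbo' into mission_name and terrain.

    mission_name keeps the terrain suffix, as in the example above.
    """
    if not filename.lower().endswith(".pbo"):
        raise ValueError(f"not a PBO file: {filename!r}")
    mission_name = filename[: -len(".pbo")]
    # The terrain is whatever follows the last dot of the remaining name.
    name, dot, terrain = mission_name.rpartition(".")
    if not dot or not name or not terrain:
        raise ValueError(f"expected <name>.<terrain>.pbo, got {filename!r}")
    return {"mission_name": mission_name, "terrain": terrain}


parsed = parse_mission_filename("MyMission.Altis.pbo")
```

Using rpartition() means a mission name that itself contains dots (for example "co10.escape.Altis.pbo") still resolves to the last segment as the terrain.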

mod_manager.py: Arma3ModManager

class Arma3ModManager:
    def get_mod_folder_pattern(self) -> str: return "@*"

    def build_mod_args(self, server_mods: list[dict]) -> list[str]:
        # Build -mod= and -serverMod= CLI args
        # e.g. ["-mod=@CBA;@ACE", "-serverMod=@ACE_server"]

    def validate_mod_folder(self, path: Path) -> bool:
        return path.name.startswith("@")
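
A sketch of build_mod_args() that produces the argument shape shown in the comment above. The dict keys "folder" and "server_only" are hypothetical; the document does not define the structure of the server_mods entries.

```python
def build_mod_args(server_mods: list[dict]) -> list[str]:
    """Build -mod= and -serverMod= arguments, each a ';'-joined folder list.

    Each entry is assumed to look like {"folder": "@CBA", "server_only": False}.
    """
    client_mods = [m["folder"] for m in server_mods if not m.get("server_only", False)]
    server_only = [m["folder"] for m in server_mods if m.get("server_only", False)]

    args: list[str] = []
    if client_mods:
        args.append("-mod=" + ";".join(client_mods))
    if server_only:
        args.append("-serverMod=" + ";".join(server_only))
    return args


mod_args = build_mod_args([
    {"folder": "@CBA", "server_only": False},
    {"folder": "@ACE", "server_only": False},
    {"folder": "@ACE_server", "server_only": True},
])
```

Returning separate list items (instead of one shell string) keeps the values safe to pass to subprocess.Popen without shell quoting.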

ban_manager.py: Arma3BanManager

class Arma3BanManager:
    def get_ban_file_path(self, server_dir: Path) -> Path:
        return server_dir / "battleye" / "ban.txt"

    def sync_bans_to_file(self, bans: list[dict], ban_file: Path) -> None:
        # Write active bans to battleye/ban.txt
        # Format: GUID|IP timestamp reason

    def read_bans_from_file(self, ban_file: Path) -> list[dict]:
        # Read battleye/ban.txt and parse entries

Core: utils/

file_utils.py — Game-agnostic file operations.

def ensure_server_dirs(server_id: int, layout: list[str] | None = None) -> None
  # Creates servers/{id}/ plus subdirectories from adapter.get_process_config().get_server_dir_layout()

def get_server_dir(server_id: int) -> Path
def safe_delete_file(path: Path) -> bool
def sanitize_filename(filename: str) -> str

port_checker.py — Game-agnostic port checking.

def is_port_in_use(port: int, host: str = "0.0.0.0") -> bool

def check_server_ports_available(game_port: int, rcon_port: int | None = None,
                                  host: str = "0.0.0.0",
                                  port_conventions: dict[str, int] | None = None) -> list[int]
  # If port_conventions provided (from adapter), checks all derived ports
  # Returns list of ports that are in use (empty = all available)

def find_available_port(start: int = 2302, step: int = 100) -> int
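
A possible shape for the two checking functions, as a sketch. It assumes the ports of interest are UDP and tests availability by trying to bind a UDP socket; whether the real implementation checks UDP, TCP, or both is not stated in this document.

```python
import socket
from typing import Optional


def is_port_in_use(port: int, host: str = "0.0.0.0") -> bool:
    """True if a UDP socket cannot be bound to (host, port). UDP is an assumption."""
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        try:
            sock.bind((host, port))
        except OSError:
            return True
    return False


def check_server_ports_available(
    game_port: int,
    rcon_port: Optional[int] = None,
    host: str = "0.0.0.0",
    port_conventions: Optional[dict] = None,
) -> list:
    """Return the sorted list of ports that are in use (empty means all are free)."""
    ports = {game_port}
    if port_conventions:
        ports.update(port_conventions.values())  # derived ports from the adapter
    if rcon_port is not None:
        ports.add(rcon_port)
    return sorted(p for p in ports if is_port_in_use(p, host))


# Demo: occupy an ephemeral port on loopback, then probe it.
holder = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
holder.bind(("127.0.0.1", 0))
busy_port = holder.getsockname()[1]
busy_report = check_server_ports_available(busy_port, host="127.0.0.1")
holder.close()
```

A bind probe is only a snapshot: another process can take the port between the check and the server launch, so the launch path still has to handle a bind failure.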

crypto.py — Game-agnostic encryption.

def encrypt(plaintext: str) -> str
def decrypt(ciphertext: str) -> str
def get_fernet() -> Fernet

Key Dependencies (requirements.txt)

fastapi==0.111.0
uvicorn[standard]==0.29.0
pydantic==2.7.0
pydantic-settings==2.2.1
sqlalchemy==2.0.30
python-jose[cryptography]==3.3.0   # JWT
passlib[bcrypt]==1.7.4  # password hashing
cryptography==42.0.5    # field-level encryption (Fernet)
psutil==5.9.8           # process metrics
apscheduler==3.10.4     # scheduled jobs
python-multipart==0.0.9 # file upload support
slowapi==0.1.9          # rate limiting middleware