# Languard Servers Manager — Python Module Breakdown

## Project Structure

```
backend/
├── main.py
├── config.py
├── database.py
├── dependencies.py
│
├── core/                              # Game-agnostic core
│   ├── __init__.py
│   │
│   ├── auth/
│   │   ├── __init__.py
│   │   ├── router.py
│   │   ├── service.py
│   │   ├── schemas.py
│   │   └── utils.py
│   │
│   ├── servers/
│   │   ├── __init__.py
│   │   ├── router.py                  # Generic routes, delegate to adapter
│   │   ├── service.py                 # ServerService delegates game work to adapter
│   │   ├── process_manager.py         # ProcessManager singleton (game-agnostic)
│   │   └── schemas.py                 # Generic server schemas
│   │
│   ├── players/
│   │   ├── __init__.py
│   │   ├── router.py
│   │   ├── service.py
│   │   └── schemas.py
│   │
│   ├── logs/
│   │   ├── __init__.py
│   │   ├── router.py
│   │   └── service.py
│   │
│   ├── metrics/
│   │   ├── __init__.py
│   │   ├── router.py
│   │   └── service.py
│   │
│   ├── bans/
│   │   ├── __init__.py
│   │   ├── router.py
│   │   └── service.py
│   │
│   ├── events/
│   │   ├── __init__.py
│   │   ├── router.py
│   │   └── service.py
│   │
│   ├── websocket/
│   │   ├── __init__.py
│   │   ├── router.py
│   │   ├── manager.py
│   │   └── broadcaster.py
│   │
│   ├── threads/
│   │   ├── __init__.py
│   │   ├── base_thread.py
│   │   ├── process_monitor.py         # Core (game-agnostic)
│   │   ├── log_tail.py                # Core, takes adapter LogParser
│   │   ├── metrics_collector.py       # Core (game-agnostic)
│   │   ├── remote_admin_poller.py     # Core, takes adapter RemoteAdmin
│   │   └── thread_registry.py         # Builds threads from adapter capabilities
│   │
│   ├── games/
│   │   ├── __init__.py
│   │   └── router.py                  # /api/games — type discovery, schemas
│   │
│   ├── system/
│   │   ├── __init__.py
│   │   └── router.py
│   │
│   ├── dal/
│   │   ├── __init__.py
│   │   ├── base_repository.py
│   │   ├── server_repository.py
│   │   ├── config_repository.py       # game_configs table
│   │   ├── player_repository.py
│   │   ├── log_repository.py
│   │   ├── metrics_repository.py
│   │   ├── mission_repository.py
│   │   ├── mod_repository.py
│   │   ├── ban_repository.py
│   │   └── event_repository.py
│   │
│   ├── migrations/
│   │   ├── runner.py
│   │   └── 001_initial_schema.sql
│   │
│   └── utils/
│       ├── __init__.py
│       ├── crypto.py
│       ├── file_utils.py
│       └── port_checker.py
│
└── adapters/                          # Game-specific adapters
    ├── __init__.py                    # Auto-registers built-in adapters + entry_points
    ├── registry.py                    # GameAdapterRegistry singleton
    ├── protocols.py                   # All capability Protocol definitions
    ├── exceptions.py                  # Typed adapter exceptions (ConfigWriteError, etc.)
    │
    └── arma3/                         # Arma 3 adapter (built-in, first-class)
        ├── __init__.py                # Exports ARMA3_ADAPTER, registers on import
        ├── adapter.py                 # Arma3Adapter class
        ├── config_generator.py        # Pydantic models + server.cfg, basic.cfg, Arma3Profile, beserver.cfg (merged schema + generation)
        ├── rcon_client.py             # BERConClient (BattlEye UDP protocol)
        ├── rcon_service.py            # Wraps BERConClient for RemoteAdmin protocol
        ├── log_parser.py              # RPTParser
        ├── mission_manager.py         # PBO upload, mission rotation config
        ├── mod_manager.py             # @mod_folder convention, -mod=/-serverMod=
        ├── process_config.py          # Exe allowlist, port conventions, profile dir
        ├── ban_manager.py             # battleye/ban.txt bidirectional sync
        ├── schemas.py                 # Arma 3 specific request/response models
        └── migrations/
            └── 001_arma3_metadata.sql # Arma 3 specific tables (if any)
```

---

## Module Details

### `main.py`

Entry point. Creates and configures the FastAPI application.
```python
# Responsibilities:
# - Create FastAPI app instance
# - Register all core routers with prefix /api
# - Register adapter-provided additional routers
# - Configure CORS middleware
# - Add JWT auth middleware
# - Register startup/shutdown event handlers:
#     startup:  run DB migrations, init ProcessManager, auto-register adapters,
#               restore running servers
#     shutdown: gracefully stop all threads (including BroadcastThread), close DB
# - Auto-register built-in adapters (arma3) on import

# Key functions:
#   create_app() -> FastAPI
#   on_startup()    # DB migrations, register adapters, recover state
#   on_shutdown()   # Clean up threads, close connections
```

---

### `config.py`

Loads and validates environment variables using Pydantic `BaseSettings`.

```python
from pydantic_settings import BaseSettings  # pydantic-settings 2.x package


class Settings(BaseSettings):
    secret_key: str
    encryption_key: str                       # Fernet base64 key (NOT hex)
    db_path: str = "./languard.db"
    servers_dir: str = "./servers"
    host: str = "0.0.0.0"
    port: int = 8000
    cors_origins: list[str] = ["http://localhost:5173"]
    log_retention_days: int = 7
    metrics_retention_days: int = 30
    player_history_retention_days: int = 90
    jwt_expire_hours: int = 24
    login_rate_limit: str = "5/minute"        # per IP

    # Game-specific defaults (used by adapters, not core)
    arma3_default_exe: str = "C:/Arma3Server/arma3server_x64.exe"


settings = Settings()  # singleton
```

**Key change:** Removed `arma_exe` as a core setting. Game-specific paths are now namespaced per adapter.

---

### `database.py`

Database engine setup and session management.
**No game-specific logic.**

```python
# Responsibilities:
# - Create SQLAlchemy engine with WAL + FK + busy_timeout pragmas
# - Provide get_db() dependency for FastAPI routes (sync session)
# - Provide get_thread_db() for background threads (thread-local sessions)
# - run_migrations(): apply pending .sql migration files at startup

# Key functions:
#   get_engine() -> Engine
#   get_db() -> Generator[Connection, None, None]   # FastAPI dependency
#   get_thread_db() -> Connection                   # for threads
#   run_migrations(engine: Engine)
```

---

### `dependencies.py`

Reusable FastAPI dependencies. **No game-specific logic.**

```python
# Key functions:
#   get_current_user(credentials: HTTPAuthorizationCredentials) -> User
#   require_admin(user: User = Depends(get_current_user)) -> User
#   get_server_or_404(server_id: int, db: Connection) -> dict
#   get_adapter_for_server(server_id: int, db: Connection) -> GameAdapter
#     # Convenience: loads server, resolves adapter from game_type
```

---

### Core: `servers/`

**`router.py`** — Game-agnostic server endpoints. Delegates to adapter for game-specific operations.

- `GET /servers` (supports `?game_type=` filter)
- `POST /servers` (requires `game_type` in body)
- `GET /servers/{id}`
- `PUT /servers/{id}`
- `DELETE /servers/{id}`
- `POST /servers/{id}/start`
- `POST /servers/{id}/stop`
- `POST /servers/{id}/restart`
- `POST /servers/{id}/kill`
- `GET /servers/{id}/config`
- `GET /servers/{id}/config/{section}`
- `PUT /servers/{id}/config/{section}`
- `GET /servers/{id}/config/preview`
- `GET /servers/{id}/config/download/{filename}`

**`service.py`** — `ServerService`

```python
class ServerService:
    def list_servers(game_type: str | None = None) -> list[ServerSummary]
    def get_server(server_id) -> ServerDetail
    def create_server(data: CreateServerRequest) -> Server
        # 1. Validate game_type is registered
        # 2. Get adapter defaults for config sections
        # 3. Create server row + game_configs rows
        # 4. Create server directory
        #    (layout from adapter.get_process_config().get_server_dir_layout())
    def update_server(server_id, data) -> Server
    def delete_server(server_id) -> bool

    def start(server_id) -> StatusResponse
        # 1. Load server from DB (includes game_type)
        # 2. adapter = GameAdapterRegistry.get(server.game_type)
        # 3. Validate exe against adapter.get_process_config().get_allowed_executables()
        # 4. Check ports via adapter.get_process_config().get_port_conventions()
        # 5. Read config sections from game_configs table
        # 6. adapter.get_config_generator().write_configs(server_id, dir, config)
        # 7. launch_args = adapter.get_config_generator().build_launch_args(config, mods)
        # 8. ProcessManager.start(server_id, exe, args, cwd=dir)
        # 9. DB: status = 'starting'
        # 10. ThreadRegistry.start_server_threads(server_id, db)
        # 11. Broadcast status update

    def stop(server_id, force=False) -> StatusResponse
        # 1. If not force and adapter.get_remote_admin() is not None:
        #      client.shutdown()   # RemoteAdminClient for this server
        # 2. Wait up to 30s for process exit
        # 3. If still running: ProcessManager.kill(server_id)
        # 4. DB: status = 'stopped', pid = NULL
        # 5. ThreadRegistry.stop_server_threads(server_id)
        # 6. PlayerRepository.clear(server_id)
        # 7. Broadcast status update

    def restart(server_id) -> StatusResponse

    def get_config(server_id) -> dict   # all sections
    def get_config_section(server_id, section) -> dict
    def update_config_section(server_id, section, data) -> dict
        # Validates data against adapter.get_config_generator().get_sections()[section]
        # Encrypts sensitive fields
        # Stores in game_configs table
    def get_config_preview(server_id) -> dict[str, str]
        # Returns {filename: rendered_content} from
        # adapter.get_config_generator().preview_config()
```

**`process_manager.py`** — `ProcessManager` singleton **(game-agnostic)**

```python
class ProcessManager:
    _instance = None
    _processes: dict[int, subprocess.Popen] = {}
    _lock: threading.Lock

    @classmethod
    def get() -> ProcessManager

    def start(server_id, exe_path, args: list[str], cwd: str) -> int   # returns PID
    def stop(server_id, timeout=30) -> bool
    def kill(server_id) -> bool
    def is_running(server_id) -> bool
    def get_pid(server_id) -> int | None
    def get_process(server_id) -> subprocess.Popen | None
    def list_running() -> list[int]
    def recover_on_startup(db)
        # Checks PID still alive AND process name matches adapter allowlist
        # (prevents PID reuse by unrelated processes)
```

---

### Core: `players/`

**`router.py`** — Generic player endpoints.
- `GET /servers/{id}/players`
- `POST /servers/{id}/players/{slot_id}/kick` (requires `remote_admin`)
- `POST /servers/{id}/players/{slot_id}/ban` (requires `remote_admin`)
- `GET /servers/{id}/players/history`

**`service.py`** — `PlayerService`

```python
class PlayerService:
    def get_current_players(server_id) -> list[Player]
    def kick(server_id, slot_id, reason) -> bool
        # Resolves adapter; if adapter.get_remote_admin() is not None:
        #   client.kick_player(identifier, reason)   # RemoteAdminClient for this server
    def ban(server_id, slot_id, duration_minutes, reason) -> bool
    def get_history(server_id, limit, offset, search) -> PaginatedResult
    def update_from_remote_admin(server_id, players: list) -> None
        # Upserts players table; detects disconnections; inserts player_history rows
```

---

### Core: `logs/`

**`router.py`** — `GET /servers/{id}/logs`, `DELETE /servers/{id}/logs`

**`service.py`** — `LogService`

```python
class LogService:
    def query(server_id, limit, offset, level, since, search) -> PaginatedLogs
    def clear(server_id) -> int
    def get_log_path(server_id) -> Path | None
        # Resolves via adapter.get_log_parser().get_log_file_resolver(server_id)
    def cleanup_old_logs()   # called by APScheduler
```

---

### Core: `metrics/`, `bans/`, `events/`

Same pattern as above — game-agnostic services using core tables.

---

### Core: `threads/`

**`base_thread.py`** — `BaseServerThread` **(game-agnostic)**

```python
class BaseServerThread(threading.Thread):
    def __init__(server_id: int, interval: float)
    def stop()                    # sets _stop_event
    def is_stopped() -> bool
    def run()                     # creates thread-local DB connection, calls setup(),
                                  # then loops: tick() + wait(interval);
                                  # calls teardown() in a finally block
    def setup()                   # override for init work (uses self._db)
    def tick()                    # override for per-interval work (uses self._db)
    def teardown()                # override for cleanup (close files, sockets)
    def on_error(e: Exception)    # default: log, continue
```

**`process_monitor.py`** — `ProcessMonitorThread` **(game-agnostic)**

```python
class ProcessMonitorThread(BaseServerThread):
    interval = 1.0  # seconds

    def tick(self):
        # 1. Check if process is still alive via ProcessManager.is_running(server_id)
        #    (not os.kill(pid, 0): on Windows that call terminates the process)
        # 2. If dead:
        #    a. Get exit code
        #    b. DB: status = 'crashed', stopped_at = now
        #    c. Clear players from DB
        #    d. Broadcast: {type: 'status', status: 'crashed'}
        #    e. Insert server_events: {event_type: 'crashed', exit_code}
        #    f. If auto_restart: schedule restart with exponential backoff
        #    g. self.stop()
```

**`log_tail.py`** — `LogTailThread` **(core thread + adapter parser)**

```python
class LogTailThread(BaseServerThread):
    interval = 0.1  # 100ms

    def __init__(self, server_id: int, log_parser: "LogParser",
                 log_file_resolver: Callable):
        super().__init__(server_id, self.interval)
        self._parser = log_parser
        self._log_file_resolver = log_file_resolver

    def setup(self):
        # Find log file using adapter's resolver
        # Open file, seek to end (tail behavior)

    def tick(self):
        # 1. Read all new lines from log file
        # 2. For each line:
        #    a. self._parser.parse_line(line) -> dict | None (skip None)
        #    b. LogRepository.insert(server_id, entry)
        #    c. BroadcastThread.enqueue(server_id, 'log', entry)

    def teardown(self):
        # Close the open log file handle
```

**`metrics_collector.py`** — `MetricsCollectorThread` **(game-agnostic)**

```python
class MetricsCollectorThread(BaseServerThread):
    interval = 5.0  # seconds

    def tick(self):
        # 1. Get PID from ProcessManager
        # 2. psutil.Process(pid).cpu_percent(interval=0.5)
        # 3. psutil.Process(pid).memory_info().rss / (1024*1024)  # MB
        # 4. PlayerRepository.count(server_id) -> player_count
        # 5. MetricsRepository.insert(server_id, cpu, ram, player_count)
        # 6. BroadcastThread.enqueue(server_id, 'metrics', {cpu, ram, player_count})
```

**`remote_admin_poller.py`** — `RemoteAdminPollerThread` **(core thread + adapter client)**

```python
class RemoteAdminPollerThread(BaseServerThread):
    interval = 10.0
    STARTUP_DELAY = 30.0

    def __init__(self, server_id: int,
                 remote_admin_factory: Callable[[], "RemoteAdminClient"]):
        super().__init__(server_id, self.interval)
        self._client_factory = remote_admin_factory
        self._client: RemoteAdminClient | None = None

    def setup(self):
        # Wait for server startup (using _stop_event.wait instead of sleep)
        # Create client via factory

    def tick(self):
        try:
            if self._client is None:
                # Reconnect after an earlier ConnectionError
                self._client = self._client_factory()
            players = self._client.get_players()  # list[dict] per RemoteAdminClient
            PlayerService(self._db).update_from_remote_admin(self.server_id, players)
            BroadcastThread.enqueue(self.server_id, "players", {
                "players": players,
                "count": len(players),
            })
        except ConnectionError:
            self._client = None  # Will reconnect on next tick
```

**`thread_registry.py`** — `ThreadRegistry` **(adapter-aware)**

```python
class ThreadRegistry:
    _threads: dict[int, dict[str, BaseServerThread]] = {}
    _lock = threading.Lock()

    @classmethod
    def start_server_threads(cls, server_id: int, db: Connection) -> None:
        server = ServerRepository(db).get_by_id(server_id)
        adapter = GameAdapterRegistry.get(server["game_type"])

        threads: dict[str, BaseServerThread] = {}

        # Core threads (always present)
        threads["process_monitor"] = ProcessMonitorThread(server_id)
        threads["metrics_collector"] = MetricsCollectorThread(server_id)

        # Adapter-provided log parser → generic LogTailThread
        log_parser = adapter.get_log_parser()
        threads["log_tail"] = LogTailThread(
            server_id,
            log_parser=log_parser,  # keyword matches LogTailThread.__init__
            log_file_resolver=log_parser.get_log_file_resolver(server_id),
        )

        # Adapter-provided remote admin → generic RemoteAdminPollerThread
        remote_admin = adapter.get_remote_admin()
        if remote_admin is not None:
            threads["remote_admin_poller"] = RemoteAdminPollerThread(
                server_id,
                remote_admin_factory=lambda: remote_admin.create_client(
                    host="127.0.0.1",
                    port=server["rcon_port"],
                    password=_get_remote_admin_password(server_id, db),
                ),
            )

        # Adapter-declared custom threads
        for thread_factory in adapter.get_custom_thread_factories():
            thread = thread_factory(server_id, db)
            threads[thread.name_key] = thread

        with cls._lock:
            cls._threads[server_id] = threads

        for thread in threads.values():
            thread.start()
```

---

### Core: `games/`

**`router.py`** — Game type discovery endpoints.

```python
# GET /games                       → list all registered game types + capabilities
# GET /games/{type}                → details for a specific game type
# GET /games/{type}/config-schema  → JSON Schema for each config section
# GET /games/{type}/defaults       → default config values
```

---

### Core: `dal/`

**`config_repository.py`** — Manages the `game_configs` table.

```python
class ConfigRepository(BaseRepository):
    def get_section(server_id: int, section: str) -> dict | None
    def get_all_sections(server_id: int) -> dict[str, dict]
    def upsert_section(server_id: int, game_type: str, section: str, config_json: str) -> None
    def delete_sections(server_id: int) -> None   # cascade on server delete
```

All other repositories remain game-agnostic, using core tables.

---

### Adapters: `exceptions.py`

Typed adapter exceptions that core catches specifically:

```python
class AdapterError(Exception):
    """Base for all adapter errors."""
    pass

class ConfigWriteError(AdapterError):
    """File write failed (disk full, permissions)."""
    def __init__(self, path: str, detail: str): ...

class ConfigValidationError(AdapterError):
    """Config values violate adapter constraints."""
    def __init__(self, section: str, errors: list[dict]): ...

class LaunchArgsError(AdapterError):
    """build_launch_args() failed (missing mod, bad path)."""
    def __init__(self, detail: str): ...

class RemoteAdminError(AdapterError):
    """Remote admin connection/command failed."""
    def __init__(self, detail: str, recoverable: bool = True): ...

class ExeNotAllowedError(AdapterError):
    """Executable not in adapter's allowlist."""
    def __init__(self, exe: str, allowed: list[str]): ...
```

---

### Adapters: `protocols.py`

Defines all capability protocols. See ARCHITECTURE.md for the full protocol definitions.

```python
@runtime_checkable
class ConfigGenerator(Protocol):
    """Merged protocol: config schema definition + file generation + launch args.
    ConfigSchema and ConfigGenerator were merged because they always co-occur —
    no game defines config schema without also generating config files."""
    game_type: str
    def get_sections(self) -> dict[str, type[BaseModel]]: ...
    def get_defaults(self, section: str) -> dict[str, Any]: ...
    def get_sensitive_fields(self, section: str) -> list[str]:
        """Return JSON keys that need Fernet encryption for this section.
        Core's ConfigRepository handles encrypt/decrypt transparently."""
        ...
    def get_config_version(self) -> str:
        """Current adapter schema version. Stored in game_configs.schema_version."""
        ...
    def write_configs(self, server_id: int, server_dir: Path,
                      config_sections: dict[str, dict]) -> list[Path]: ...
    def build_launch_args(self, config_sections: dict[str, dict],
                          mod_args: list[str] | None = None) -> list[str]: ...
    def preview_config(self, server_id: int, server_dir: Path,
                       config_sections: dict[str, dict]) -> dict[str, str]: ...

@runtime_checkable
class RemoteAdminClient(Protocol):
    def send_command(self, command: str, timeout: float = 5.0) -> str | None: ...
    def get_players(self) -> list[dict]: ...
    def kick_player(self, identifier: str, reason: str = "") -> bool: ...
    def ban_player(self, identifier: str, duration_minutes: int, reason: str) -> bool: ...
    def say_all(self, message: str) -> bool: ...
    def shutdown(self) -> bool: ...
    def keepalive(self) -> None: ...
    def disconnect(self) -> None: ...

@runtime_checkable
class RemoteAdmin(Protocol):
    def create_client(self, host: str, port: int, password: str) -> RemoteAdminClient: ...
    def get_startup_delay(self) -> float: ...
    def get_poll_interval(self) -> float: ...
    def get_player_data_schema(self) -> type[BaseModel] | None:
        """Pydantic model for players.game_data JSON. Return None for no validation."""
        ...

# NOTE on thread safety: RemoteAdminClient instances are shared between
# RemoteAdminPollerThread and API request handlers. Core wraps all
# RemoteAdminClient method calls with a per-server threading.Lock to
# ensure thread safety. Adapters do NOT need to implement thread-safe clients.

@runtime_checkable
class LogParser(Protocol):
    def parse_line(self, line: str) -> dict | None: ...
    def get_log_file_resolver(self, server_id: int) -> "LogFileResolver": ...

@runtime_checkable
class MissionManager(Protocol):
    file_extension: str
    def parse_mission_filename(self, filename: str) -> dict: ...
    def get_rotation_config(self, rotation_entries: list[dict]) -> str: ...
    def get_missions_dir(self, server_dir: Path) -> Path: ...
    def get_mission_data_schema(self) -> type[BaseModel] | None:
        """Pydantic model for missions.game_data JSON. Return None for no validation."""
        ...

@runtime_checkable
class ModManager(Protocol):
    def get_mod_folder_pattern(self) -> str: ...
    def build_mod_args(self, server_mods: list[dict]) -> list[str]: ...
    def validate_mod_folder(self, path: Path) -> bool: ...
    def get_mod_data_schema(self) -> type[BaseModel] | None:
        """Pydantic model for mods.game_data JSON. Return None for no validation."""
        ...

@runtime_checkable
class ProcessConfig(Protocol):
    def get_allowed_executables(self) -> list[str]: ...
    def get_port_conventions(self, game_port: int) -> dict[str, int]: ...
    def get_default_game_port(self) -> int: ...
    def get_default_rcon_port(self, game_port: int) -> int | None: ...
    def get_server_dir_layout(self) -> list[str]: ...

@runtime_checkable
class BanManager(Protocol):
    def get_ban_file_path(self, server_dir: Path) -> Path: ...
    def sync_bans_to_file(self, bans: list[dict], ban_file: Path) -> None: ...
    def read_bans_from_file(self, ban_file: Path) -> list[dict]: ...
    def get_ban_data_schema(self) -> type[BaseModel] | None:
        """Pydantic model for bans.game_data JSON. Return None for no validation."""
        ...

@runtime_checkable
class GameAdapter(Protocol):
    game_type: str
    display_name: str
    version: str
    def get_config_generator(self) -> ConfigGenerator: ...
    def get_process_config(self) -> ProcessConfig: ...
    def get_log_parser(self) -> LogParser: ...
    def get_remote_admin(self) -> RemoteAdmin | None: ...
    def get_mission_manager(self) -> MissionManager | None: ...
    def get_mod_manager(self) -> ModManager | None: ...
    def get_ban_manager(self) -> BanManager | None: ...
    def get_additional_routers(self) -> list[APIRouter]: ...
    def get_custom_thread_factories(self) -> list[Callable]: ...
    def has_capability(self, name: str) -> bool: ...
```

---

### Adapters: `registry.py`

```python
class GameAdapterRegistry:
    _adapters: dict[str, GameAdapter] = {}

    @classmethod
    def register(cls, adapter: GameAdapter) -> None: ...
    @classmethod
    def get(cls, game_type: str) -> GameAdapter: ...
    @classmethod
    def all(cls) -> list[GameAdapter]: ...
    @classmethod
    def list_game_types(cls) -> list[dict]: ...
```

---

### Adapters: `arma3/`

**`adapter.py`** — `Arma3Adapter`

```python
class Arma3Adapter:
    game_type = "arma3"
    display_name = "Arma 3"
    version = "1.0.0"

    def get_config_generator(self) -> ConfigGenerator:
        return Arma3ConfigGenerator()  # includes schema + generation
    def get_process_config(self) -> ProcessConfig:
        return Arma3ProcessConfig()
    def get_log_parser(self) -> LogParser:
        return RPTParser()
    def get_remote_admin(self) -> RemoteAdmin | None:
        return Arma3RConService()
    def get_mission_manager(self) -> MissionManager | None:
        return Arma3MissionManager()
    def get_mod_manager(self) -> ModManager | None:
        return Arma3ModManager()
    def get_ban_manager(self) -> BanManager | None:
        return Arma3BanManager()
    def has_capability(self, name: str) -> bool:
        """Explicit capability probe — core uses this instead of scattered None checks."""
        return name in (
            "config_generator", "process_config", "log_parser",
            "remote_admin", "mission_manager", "mod_manager", "ban_manager",
        )
    def get_additional_routers(self):
        return []  # Arma 3 has no extra routes beyond generic set
    def get_custom_thread_factories(self):
        return []

ARMA3_ADAPTER = Arma3Adapter()
```

**`config_generator.py`** — `Arma3ConfigGenerator` (merged schema + generation)

```python
# --- Pydantic models (formerly in config_schema.py) ---

class ServerConfig(BaseModel):
    hostname: str = "My Arma 3 Server"
    password: str | None = None
    password_admin: str  # must be set on creation
    server_command_password: str | None = None
    max_players: int = Field(default=40, gt=0)
    # ... all server.cfg parameters ...

class BasicConfig(BaseModel):
    min_bandwidth: int = Field(default=800000, gt=0)
    max_bandwidth: int = Field(default=25000000, gt=0)
    # ... all basic.cfg parameters ...

class ProfileConfig(BaseModel):
    reduced_damage: int = Field(default=0, ge=0, le=1)
    third_person_view: int = Field(default=0, ge=0, le=1)
    # ... all Arma3Profile parameters ...

class LaunchConfig(BaseModel):
    world: str = "empty"
    limit_fps: int = Field(default=50, gt=0)
    # ... all launch parameters ...

class RConConfig(BaseModel):
    rcon_password: str
    max_ping: int = Field(default=200, gt=0)
    enabled: int = Field(default=1, ge=0, le=1)

# --- ConfigGenerator implementation (schema + generation in one class) ---

class Arma3ConfigGenerator:
    game_type = "arma3"

    # Schema methods (formerly Arma3ConfigSchema)
    def get_sections(self) -> dict[str, type[BaseModel]]:
        return {
            "server": ServerConfig,
            "basic": BasicConfig,
            "profile": ProfileConfig,
            "launch": LaunchConfig,
            "rcon": RConConfig,
        }

    def get_defaults(self, section: str) -> dict: ...

    def get_sensitive_fields(self, section: str) -> list[str]:
        return {
            "server": ["password", "password_admin", "server_command_password"],
            "rcon": ["rcon_password"],
        }.get(section, [])

    def get_config_version(self) -> str:
        return "1.0.0"

    # Generation methods
    def write_configs(self, server_id, server_dir, config_sections) -> list[Path]:
        # Writes server.cfg, basic.cfg, server.Arma3Profile, beserver.cfg
        # Creates directories if they don't exist
        # Sets restrictive file permissions on files containing passwords
        # Uses structured builder — NOT f-strings — prevents config injection
        # ATOMIC: writes to .tmp files first, then os.replace()

    def write_server_cfg(server_id, config, path): ...
    def write_basic_cfg(server_id, config, path): ...
    def write_arma3profile(server_id, profile, path): ...
        # Writes to servers/{id}/server/server.Arma3Profile
    def write_beserver_cfg(server_id, rcon_config, path): ...
        # Generates servers/{id}/battleye/beserver.cfg

    def build_launch_args(self, config_sections, mod_args=None) -> list[str]:
        # Returns CLI args for arma3server_x64.exe
        # e.g. ['-port=2302', '-config=server.cfg', '-cfg=basic.cfg',
        #       '-profiles=./', '-name=server', '-world=empty',
        #       '-mod=@CBA;@ACE', '-serverMod=@ACE_server',
        #       '-bepath=./battleye', '-limitFPS=50', ...]

    def preview_config(self, server_id, server_dir, config_sections) -> dict[str, str]:
        # Returns {filename: rendered_content} without writing to disk

    def _escape_config_string(value: str) -> str:
        # Escapes backslashes FIRST, then double quotes and newlines
        # Order matters: \\ → \\\\, then " → \\", then \n → \\n
```

**`rcon_client.py`** — `BERConClient`

```python
class BERConClient:
    """Implements BattlEye RCon protocol over UDP."""

    def __init__(host, port, password): ...
    _pending_requests: dict[int, threading.Event] = {}
    _responses: dict[int, str] = {}

    def connect() -> bool
    def disconnect()
    def login() -> bool
    def send_command(command, timeout=5.0) -> str | None
    def keepalive()
    def is_connected() -> bool
    def _receiver_loop()   # background thread
    def parse_players_response(response) -> list[dict]
```

**`rcon_service.py`** — `Arma3RConService` (implements `RemoteAdmin` protocol)

```python
class Arma3RConService:
    def create_client(self, host, port, password) -> RemoteAdminClient:
        client = BERConClient(host, port, password)
        client.connect()
        return client

    def get_startup_delay(self) -> float:
        return 30.0

    def get_poll_interval(self) -> float:
        return 10.0
```

**`log_parser.py`** — `RPTParser`

```python
class RPTParser:
    def parse_line(self, line: str) -> dict | None:
        # Returns: {timestamp, level, message}
        # level detection: 'error' if 'Error', 'warning' if 'Warning', else 'info'

    def parse_timestamp(raw: str) -> datetime

    def get_log_file_resolver(self, server_id: int) -> LogFileResolver:
        # Returns a callable that finds the latest .rpt file
        # Arma 3 writes: servers/{id}/server/arma3server_YYYY-MM-DD_HH-MM-SS.rpt
```

**`process_config.py`** — `Arma3ProcessConfig`

```python
class Arma3ProcessConfig:
    def get_allowed_executables(self) -> list[str]:
        return ["arma3server_x64.exe", "arma3server.exe"]

    def get_port_conventions(self, game_port: int) -> dict[str, int]:
        return {
            "game": game_port,
            "steam_query": game_port + 1,
            "von": game_port + 2,
            "steam_auth": game_port + 3,
        }
        # rcon_port is separate (user-configurable, defaults to game_port+4)

    def get_default_game_port(self) -> int:
        return 2302

    def get_default_rcon_port(self, game_port: int) -> int | None:
        return game_port + 4

    def get_server_dir_layout(self) -> list[str]:
        return ["server", "battleye", "mpmissions"]
```

**`mission_manager.py`** — `Arma3MissionManager`

```python
class Arma3MissionManager:
    file_extension = ".pbo"

    def parse_mission_filename(self, filename: str) -> dict:
        # Extract mission_name and terrain from PBO filename
        # e.g. "MyMission.Altis.pbo" → {mission_name: "MyMission.Altis", terrain: "Altis"}

    def get_rotation_config(self, rotation_entries) -> str:
        # Renders class Missions {} block for server.cfg
        # class Missions { class Mission1 { template="..."; difficulty="..."; }; };

    def get_missions_dir(self, server_dir: Path) -> Path:
        return server_dir / "mpmissions"
```

**`mod_manager.py`** — `Arma3ModManager`

```python
class Arma3ModManager:
    def get_mod_folder_pattern(self) -> str:
        return "@*"

    def build_mod_args(self, server_mods: list[dict]) -> list[str]:
        # Build -mod= and -serverMod= CLI args
        # e.g. ["-mod=@CBA;@ACE", "-serverMod=@ACE_server"]

    def validate_mod_folder(self, path: Path) -> bool:
        return path.name.startswith("@")
```

**`ban_manager.py`** — `Arma3BanManager`

```python
class Arma3BanManager:
    def get_ban_file_path(self, server_dir: Path) -> Path:
        return server_dir / "battleye" / "ban.txt"

    def sync_bans_to_file(self, bans: list[dict], ban_file: Path) -> None:
        # Write active bans to battleye/ban.txt
        # Format: GUID|IP timestamp reason

    def read_bans_from_file(self, ban_file: Path) -> list[dict]:
        # Read battleye/ban.txt and parse entries
```

---

### Core: `utils/`

**`file_utils.py`** — Game-agnostic file operations.
```python
def ensure_server_dirs(server_id: int, layout: list[str] | None = None) -> None
    # Creates servers/{id}/ plus subdirectories from
    # adapter.get_process_config().get_server_dir_layout()
def get_server_dir(server_id: int) -> Path
def safe_delete_file(path: Path) -> bool
def sanitize_filename(filename: str) -> str
```

**`port_checker.py`** — Game-agnostic port checking.

```python
def is_port_in_use(port: int, host: str = "0.0.0.0") -> bool
def check_server_ports_available(game_port: int, rcon_port: int | None = None,
                                 host: str = "0.0.0.0",
                                 port_conventions: dict[str, int] | None = None) -> list[int]
    # If port_conventions provided (from adapter), checks all derived ports
    # Returns list of ports that are in use (empty = all available)
def find_available_port(start: int = 2302, step: int = 100) -> int
```

**`crypto.py`** — Game-agnostic encryption.

```python
def encrypt(plaintext: str) -> str
def decrypt(ciphertext: str) -> str
def get_fernet() -> Fernet
```

---

## Key Dependencies (requirements.txt)

```
fastapi==0.111.0
uvicorn[standard]==0.29.0
pydantic==2.7.0
pydantic-settings==2.2.1
sqlalchemy==2.0.30
python-jose[cryptography]==3.3.0   # JWT
passlib[bcrypt]==1.7.4             # password hashing
cryptography==42.0.5               # field-level encryption (Fernet)
psutil==5.9.8                      # process metrics
apscheduler==3.10.4                # scheduled jobs
python-multipart==0.0.9            # file upload support
slowapi==0.1.9                     # rate limiting middleware
```