# Languard Server Manager — Python Module Breakdown ## Project Structure ``` backend/ ├── main.py ├── config.py ├── database.py ├── dependencies.py │ ├── auth/ │ ├── __init__.py │ ├── router.py │ ├── service.py │ ├── schemas.py │ └── utils.py │ ├── servers/ │ ├── __init__.py │ ├── router.py │ ├── service.py │ ├── schemas.py │ ├── process_manager.py │ └── config_generator.py │ ├── rcon/ │ ├── __init__.py │ ├── client.py │ └── service.py │ ├── missions/ │ ├── __init__.py │ ├── router.py │ ├── service.py │ └── schemas.py │ ├── mods/ │ ├── __init__.py │ ├── router.py │ ├── service.py │ └── schemas.py │ ├── players/ │ ├── __init__.py │ ├── router.py │ ├── service.py │ └── schemas.py │ ├── logs/ │ ├── __init__.py │ ├── router.py │ ├── service.py │ └── parser.py │ ├── metrics/ │ ├── __init__.py │ ├── router.py │ └── service.py │ ├── websocket/ │ ├── __init__.py │ ├── router.py │ ├── manager.py │ └── broadcaster.py │ ├── threads/ │ ├── __init__.py │ ├── base_thread.py │ ├── process_monitor.py │ ├── log_tail.py │ ├── metrics_collector.py │ ├── rcon_poller.py │ └── thread_registry.py │ ├── system/ │ ├── __init__.py │ └── router.py │ ├── dal/ │ ├── __init__.py │ ├── base_repository.py │ ├── server_repository.py │ ├── config_repository.py │ ├── player_repository.py │ ├── log_repository.py │ ├── metrics_repository.py │ ├── mission_repository.py │ ├── mod_repository.py │ ├── ban_repository.py │ └── event_repository.py │ ├── migrations/ │ ├── runner.py │ └── 001_initial_schema.sql │ └── utils/ ├── __init__.py ├── crypto.py ├── file_utils.py └── port_checker.py ``` --- ## Module Details ### `main.py` Entry point. Creates and configures the FastAPI application. ```python # Responsibilities: # - Create FastAPI app instance # - Register all routers with prefix /api # - Configure CORS middleware # - Add JWT auth middleware # - Register startup/shutdown event handlers: # startup: run DB migrations, init ProcessManager, restore running servers # shutdown: gracefully stop all BroadcastThread, close DB # - Mount static files if serving frontend # - NOTE: Route handlers that perform blocking I/O (subprocess, file writes, # socket checks) MUST be declared as plain `def` (not `async def`). # FastAPI automatically runs plain-def handlers in a thread pool, # preventing event loop blocking. Only truly async operations # (WebSocket send, async library calls) should use `async def`. Key functions: create_app() -> FastAPI on_startup() # DB migrations, recover state on_shutdown() # Clean up threads, close connections ``` --- ### `config.py` Loads and validates environment variables using Pydantic `BaseSettings`. ```python class Settings(BaseSettings): secret_key: str encryption_key: str # Fernet base64 key (NOT hex) db_path: str = "./languard.db" servers_dir: str = "./servers" arma_exe: str = "C:/Arma3Server/arma3server_x64.exe" host: str = "0.0.0.0" port: int = 8000 cors_origins: list[str] = ["http://localhost:5173"] log_retention_days: int = 7 metrics_retention_days: int = 30 player_history_retention_days: int = 90 jwt_expire_hours: int = 24 login_rate_limit: str = "5/minute" # per IP settings = Settings() # singleton ``` --- ### `database.py` Database engine setup and session management. ```python # Responsibilities: # - Create SQLAlchemy engine with WAL + FK + busy_timeout pragmas # - Provide get_db() dependency for FastAPI routes (sync session) # - Provide get_thread_db() for background threads (thread-local sessions) # - run_migrations(): apply pending .sql migration files at startup # - Migration rollback: if a migration fails, the schema_migrations table # is NOT updated; re-running applies only unapplied migrations (idempotent) # Pragma setup: # PRAGMA journal_mode=WAL # PRAGMA foreign_keys=ON # PRAGMA busy_timeout=5000 # 5s wait before "database is locked" error Key functions: get_engine() -> Engine get_db() -> Generator[Connection, None, None] # FastAPI dependency get_thread_db() -> Connection # for threads run_migrations(engine: Engine) ``` --- ### `dependencies.py` Reusable FastAPI dependencies. ```python # Responsibilities: # - get_current_user(token) -> User (JWT validation) # - require_admin(user) -> User (role check) # - get_server_or_404(server_id, db) -> ServerRow Key functions: get_current_user(credentials: HTTPAuthorizationCredentials) -> User require_admin(user: User = Depends(get_current_user)) -> User get_server_or_404(server_id: int, db: Connection) -> dict ``` --- ### `auth/` **`router.py`** — FastAPI router for auth endpoints. - `POST /auth/login` - `POST /auth/logout` - `GET /auth/me` - `PUT /auth/password` - `GET /auth/users` (admin) - `POST /auth/users` (admin) - `DELETE /auth/users/{user_id}` (admin) **`service.py`** — `AuthService` ```python class AuthService: def login(username, password) -> TokenResponse def create_user(username, password, role) -> User def change_password(user_id, current_pw, new_pw) -> bool def list_users() -> list[User] def delete_user(user_id) -> bool ``` **`utils.py`** ```python def hash_password(password: str) -> str # bcrypt def verify_password(plain, hashed) -> bool def create_access_token(data: dict) -> str # JWT sign def decode_access_token(token: str) -> dict # JWT verify ``` --- ### `servers/` **`router.py`** — All server CRUD + lifecycle endpoints. - `GET /servers` - `POST /servers` - `GET /servers/{id}` - `PUT /servers/{id}` - `DELETE /servers/{id}` - `POST /servers/{id}/start` - `POST /servers/{id}/stop` - `POST /servers/{id}/restart` - `POST /servers/{id}/kill` - `GET /servers/{id}/config` - `PUT /servers/{id}/config/server` - `PUT /servers/{id}/config/basic` - `PUT /servers/{id}/config/profile` - `PUT /servers/{id}/config/launch` - `PUT /servers/{id}/config/rcon` - `GET /servers/{id}/config/preview` - `GET /servers/{id}/config/download/{filename}` **`service.py`** — `ServerService` ```python class ServerService: def list_servers() -> list[ServerSummary] def get_server(server_id) -> ServerDetail def create_server(data: CreateServerRequest) -> Server def update_server(server_id, data) -> Server def delete_server(server_id) -> bool def start(server_id) -> StatusResponse # 1. Load config from DB # 2. Validate exe_path exists and basename matches allowlist # (arma3server_x64.exe, arma3server.exe) — prevents executing arbitrary binaries # 3. Check ALL derived ports not in use (game_port through game_port+3 + rcon_port) # 4. ConfigGenerator.write_all(server_id) # — if write fails: DB status='error', return error (no process launch) # 5. Build launch args # 6. ProcessManager.start(server_id, exe, args, cwd=servers/{id}/) # 7. DB: status = 'starting' # 8. ThreadRegistry.start_server_threads(server_id) # 9. Broadcast status update def stop(server_id, force=False) -> StatusResponse # 1. If not force: RConService.send_command('#shutdown') # 2. Wait up to 30s for process exit # 3. If still running: ProcessManager.kill(server_id) # 4. DB: status = 'stopped', pid = NULL # 5. ThreadRegistry.stop_server_threads(server_id) # 6. PlayerRepository.clear(server_id) # 7. Broadcast status update def restart(server_id) -> StatusResponse def update_config_server(server_id, data) -> ServerConfig def update_config_basic(server_id, data) -> BasicConfig def update_config_profile(server_id, data) -> ServerProfile def update_config_launch(server_id, data) -> LaunchParams def update_config_rcon(server_id, data) -> RConConfig # Updates rcon_configs row (rcon_password, max_ping, enabled) via ConfigRepository. # If data includes rcon_port, also updates servers.rcon_port via ServerRepository — # rcon_port lives in the servers table, not rcon_configs. # Regenerates battleye/beserver.cfg immediately after saving. def get_config_preview(server_id) -> str ``` **`process_manager.py`** — `ProcessManager` singleton ```python class ProcessManager: _instance = None _processes: dict[int, subprocess.Popen] = {} _lock: threading.Lock @classmethod def get() -> ProcessManager def start(server_id, exe_path, args: list[str], cwd: str) -> int # returns PID # subprocess.Popen([exe_path, *args], cwd=cwd, stdout=PIPE, stderr=STDOUT) # cwd = servers/{server_id}/ so relative config paths resolve correctly def stop(server_id, timeout=30) -> bool # On Windows: subprocess.terminate() = TerminateProcess (hard kill, no SIGTERM) # Graceful shutdown is handled by ServerService via RCon #shutdown first. # This method is the forceful fallback: terminate() → wait(timeout) def kill(server_id) -> bool # terminate() immediately (hard kill on Windows) def is_running(server_id) -> bool def get_pid(server_id) -> int | None def get_process(server_id) -> subprocess.Popen | None def list_running() -> list[int] # list of server_ids def recover_on_startup(db) # At app startup: query DB for servers with status='running' # Check if pid still alive AND verify process name matches arma3server # (prevents PID reuse by unrelated processes) # If alive: re-attach monitoring threads (skip process start) # If dead or wrong process: mark crashed, clear players ``` **`config_generator.py`** — `ConfigGenerator` ```python class ConfigGenerator: def write_all(server_id: int, db: Connection) -> None # Writes server.cfg, basic.cfg, server.Arma3Profile, battleye/beserver.cfg # Creates directories if they don't exist # Sets restrictive file permissions on files containing passwords: # Unix: chmod 0600 # Windows: use icacls to grant only the service account read/write access # Raises IOError if write fails — caller must handle (set DB status='error') def write_server_cfg(server_id, config: dict, path: Path) -> None # Uses structured builder — NOT f-strings or string.Template # Escapes double quotes in all string values (replace " with \"") # Validates no newline injection in string fields # Renders mission rotation as class Missions { class Mission1 { ... }; }; def write_basic_cfg(server_id, config: dict, path: Path) -> None def write_arma3profile(server_id, profile: dict, path: Path) -> None # Writes to servers/{id}/server/server.Arma3Profile (profile subdirectory) def write_beserver_cfg(server_id, rcon_config: dict, path: Path) -> None # Generates servers/{id}/battleye/beserver.cfg # Content: "RConPassword \nRConPort \n" # Without this file BattlEye will not open an RCon port. def build_launch_args(server_id, config: dict, launch: dict, mod_string: str) -> list[str] # Returns list of command-line arguments for arma3server_x64.exe # e.g. ['-port=2302', '-config=server.cfg', '-cfg=basic.cfg', # '-profiles=./', '-name=server', '-world=empty', # '-mod=@CBA;@ACE', '-serverMod=@ACE_server', # '-bepath=./battleye', # '-limitFPS=50', '-autoInit', '-loadMissionToMemory', ...] # NOTE: -profiles is relative to cwd (which is set to servers/{id}/) # -bepath is required for BattlEye to find beserver.cfg def _escape_config_string(value: str) -> str # Escapes backslashes FIRST, then double quotes and newlines for safe Arma 3 config interpolation. # Order matters: backslash → \\, then " → \", then newline → \\n # If backslashes are not escaped first, input "test\\" produces "test\\" # which Arma 3 reads as an escaped backslash + unescaped closing quote = injection. value = value.replace('\\', '\\\\') # backslash FIRST value = value.replace('"', '\\"') # then double-quote value = value.replace('\n', '\\n') # then newline value = value.replace('\r', '') # strip carriage returns value = value.replace('\t', ' ') # tabs → spaces return value def _render_mission_class(rotation: list[dict]) -> str # Renders the class Missions {} block for server.cfg # class Missions { class Mission1 { template="..."; difficulty="..."; }; ... }; ``` --- ### `rcon/` **`client.py`** — `BERConClient` ```python class BERConClient: """ Implements BattlEye RCon protocol over UDP. Packet type bytes: Client → Server: 0xFF 0x00 [password] → login Client → Server: 0xFF 0x01 [seq] [command] → send command Client → Server: 0xFF 0x02 → keepalive (empty payload) Server → Client: 0xFF 0x00 [0x00|0x01] → login response (0x01=ok) Server → Client: 0xFF 0x01 [seq] [response] → command response Server → Client: 0xFF 0x02 [seq] [message] → unsolicited server message (chat/kill events) Note: 0x01 is the type byte for BOTH outgoing commands AND incoming command responses. """ def __init__(host: str, port: int, password: str) # Request multiplexer: prevents response misrouting when # RConPollerThread and API-request RConService share the same socket. _pending_requests: dict[int, threading.Event] = {} # seq → Event _responses: dict[int, str] = {} # seq → response _seq_counter: int = 0 _lock: threading.Lock def connect() -> bool def disconnect() def login() -> bool def send_command(command: str, timeout: float = 5.0) -> str | None # Sends command with sequence number, creates Event, waits for response # Routes response to correct caller by matching sequence byte def keepalive() # send empty packet every 30s def is_connected() -> bool # Background receiver thread: def _receiver_loop() # Reads all incoming UDP packets # For type 0x01 (command response): sets Event + stores response for matching seq # For type 0x02 (server message): enqueues for processing (player events, chat) def parse_players_response(response: str) -> list[PlayerInfo] # Parse output of 'players' command # Format: "Players on server:\n[#] [IP:Port] [Ping] [GUID] [Name]\n..." ``` **`service.py`** — `RConService` ```python class RConService: def __init__(server_id: int) def send_command(command: str) -> str | None # Gets or creates BERConClient, sends command, returns response def kick_player(player_num: int, reason: str = "") -> bool def ban_player(player_num: int, duration_minutes: int, reason: str) -> bool def unban(guid: str) -> bool def say_all(message: str) -> bool def get_players() -> list[PlayerInfo] def send_mission_command(mission_name: str) -> bool def shutdown() -> bool def restart() -> bool def lock() -> bool def unlock() -> bool ``` --- ### `missions/` **`service.py`** — `MissionService` ```python class MissionService: def list_missions(server_id) -> list[Mission] def upload_mission(server_id, filename: str, file_data: bytes) -> Mission # 1. Validate .pbo extension # 2. Parse mission_name and terrain from filename # 3. Write to servers/{server_id}/mpmissions/{filename} # 4. Insert into missions table # 5. Return Mission object def delete_mission(server_id, mission_id) -> bool # 1. Check not in active rotation # 2. Delete file from disk # 3. Delete from DB def get_rotation(server_id) -> list[RotationEntry] def update_rotation(server_id, rotation: list[RotationEntry]) -> bool # 1. Delete existing rotation rows # 2. Insert new ordered list # 3. Trigger config regeneration ``` --- ### `mods/` **`service.py`** — `ModService` ```python class ModService: def list_all() -> list[Mod] def register_mod(name, folder_path, workshop_id, description) -> Mod # Validates folder exists def delete_mod(mod_id) -> bool # Check not in use by any server def get_server_mods(server_id) -> list[ServerMod] def update_server_mods(server_id, mods: list) -> bool # Replaces server_mods rows, regenerates mod string def build_mod_string(server_id) -> tuple[str, str] # Returns (-mod=..., -serverMod=...) strings ``` --- ### `players/` **`service.py`** — `PlayerService` ```python class PlayerService: def get_current_players(server_id) -> list[Player] def kick(server_id, player_num, reason) -> bool # RConService.kick_player() + log event def ban(server_id, player_num, duration_minutes, reason) -> bool # RConService.ban_player() + insert into bans table def get_history(server_id, limit, offset, search) -> PaginatedResult def update_from_rcon(server_id, rcon_players: list) -> None # Upsert players table; detect disconnections; insert player_history rows ``` --- ### `logs/` **`parser.py`** — `RPTParser` ```python class RPTParser: # Parses Arma 3 RPT log format # Example line: "10:05:23 BattlEye Server: Initialized (v1.240)" # With timestamp format "short": "10:05:23" # With timestamp format "full": "2026/04/16, 10:05:23" def parse_line(line: str) -> LogEntry | None # Returns: {timestamp, level, message} # level detection: 'error' if 'Error' in msg, 'warning' if 'Warning', else 'info' def parse_timestamp(raw: str) -> datetime ``` **`service.py`** — `LogService` ```python class LogService: def query(server_id, limit, offset, level, since, search) -> PaginatedLogs def clear(server_id) -> int # returns deleted count def get_rpt_path(server_id) -> Path | None # Delegates to file_utils.get_rpt_path() — globs for latest timestamped .rpt def cleanup_old_logs() # called by APScheduler ``` --- ### `metrics/` **`service.py`** — `MetricsService` ```python class MetricsService: def query(server_id, from_dt, to_dt, resolution) -> list[MetricPoint] # Aggregates by resolution ('1m', '5m', '1h') def insert(server_id, cpu, ram, player_count) -> None def cleanup_old_metrics() # called by APScheduler def get_latest(server_id) -> MetricPoint | None ``` --- ### `websocket/` **`manager.py`** — `ConnectionManager` ```python class ConnectionManager: """ Manages active WebSocket connections grouped by server_id. 'all' is a special server_id that receives events for all servers. """ _connections: dict[str, set[WebSocket]] _lock: asyncio.Lock async def connect(ws: WebSocket, server_id: str, channels: list[str]) async def disconnect(ws: WebSocket, server_id: str) async def broadcast(server_id: str, message: dict) # Sends to all connections subscribed to server_id + 'all' async def send_personal(ws: WebSocket, message: dict) ``` **`broadcaster.py`** — `BroadcastThread` ```python class BroadcastThread(threading.Thread): """ Runs in background thread. Reads from a queue (put by background threads). Posts messages to asyncio event loop via run_coroutine_threadsafe(). """ _queue: queue.Queue _loop: asyncio.AbstractEventLoop _manager: ConnectionManager _running: bool def run() # main loop: get from queue, schedule broadcast coroutine @staticmethod def enqueue(server_id: int, msg_type: str, data: dict) # Thread-safe: called from any background thread ``` **`router.py`** — WebSocket endpoint ```python @router.websocket("/ws/{server_id}") async def websocket_endpoint(ws: WebSocket, server_id: str, token: str = Query(...)): # 1. Validate JWT token from query param # 2. Accept WebSocket connection # 3. Register with ConnectionManager # 4. Loop: receive messages (ping/subscribe/unsubscribe) # 5. On disconnect: deregister from ConnectionManager ``` --- ### `threads/` **`base_thread.py`** — `BaseServerThread` ```python class BaseServerThread(threading.Thread): def __init__(server_id: int, interval: float) def stop() # sets _stop_event def is_stopped() -> bool def run() # creates thread-local DB connection, calls setup(), # then loops: tick(self._db) + wait(interval) # finally calls teardown() in finally block def setup() # override for init work (receives self._db) def tick() # override for per-interval work (uses self._db) def teardown() # override for cleanup (close files, sockets) def on_error(e: Exception) # default: log, continue # if same error repeats 5x in a row: escalate + self.stop() ``` **`process_monitor.py`** — `ProcessMonitorThread` ```python class ProcessMonitorThread(BaseServerThread): interval = 1.0 # seconds def tick(): # 1. Check if process is still alive (os.kill(pid, 0)) # 2. If dead: # a. Get exit code # b. DB: status = 'crashed', stopped_at = now # c. Clear players from DB # d. Broadcast: {type: 'status', status: 'crashed'} # e. Insert server_events: {event_type: 'crashed', exit_code} # f. If auto_restart enabled and restart_count < max_restarts: # DB: increment restart_count # Schedule restart after 10s (threading.Timer) # g. self.stop() ``` **`log_tail.py`** — `LogTailThread` ```python class LogTailThread(BaseServerThread): interval = 0.1 # 100ms def setup(): # Find .rpt file path # Open file, seek to end (tail behavior) self._file = open(rpt_path, 'r', encoding='utf-8', errors='replace') self._file.seek(0, 2) # seek to end def tick(): # 1. Read all new lines from self._file # 2. For each line: # a. RPTParser.parse_line(line) -> LogEntry # b. LogRepository.insert(server_id, entry) # c. BroadcastThread.enqueue(server_id, 'log', entry) def on_rpt_rotate(): # Close and reopen if file was rotated (new server start) ``` **`metrics_collector.py`** — `MetricsCollectorThread` ```python class MetricsCollectorThread(BaseServerThread): interval = 5.0 # seconds def tick(): # 1. Get PID from ProcessManager # 2. psutil.Process(pid).cpu_percent(interval=0.5) # 3. psutil.Process(pid).memory_info().rss / (1024*1024) # MB # 4. PlayerRepository.count(server_id) -> player_count # 5. MetricsRepository.insert(server_id, cpu, ram, player_count) # 6. BroadcastThread.enqueue(server_id, 'metrics', {cpu, ram, player_count}) ``` **`rcon_poller.py`** — `RConPollerThread` ```python class RConPollerThread(BaseServerThread): interval = 10.0 # seconds startup_delay = 30.0 # wait 30s after server start before first poll _rcon_ready = False # flag: set True only after successful setup def setup(): # Use _stop_event.wait() instead of time.sleep() so the thread # can be interrupted immediately during shutdown if self._stop_event.wait(self.startup_delay): self._rcon_ready = False return # stop was requested during startup delay self._rcon = RConService(self.server_id) self._rcon_ready = True def tick(): if not self._rcon_ready: return # setup() failed or was interrupted — skip tick # 1. RConService.get_players() -> list[PlayerInfo] # 2. PlayerService.update_from_rcon(server_id, players) # 3. BroadcastThread.enqueue(server_id, 'players', {players, count}) # 4. RConClient.keepalive() if needed ``` **`thread_registry.py`** — `ThreadRegistry` ```python class ThreadRegistry: """ Singleton. Manages all background threads per server. """ _threads: dict[int, dict[str, BaseServerThread]] _lock: threading.Lock @classmethod def get() -> ThreadRegistry def start_server_threads(server_id: int) -> None # Instantiates and starts: # ProcessMonitorThread, LogTailThread, # MetricsCollectorThread, RConPollerThread def stop_server_threads(server_id: int) -> None # Calls stop() on each thread; joins with timeout def get_thread(server_id, thread_type: str) -> BaseServerThread | None def list_active(server_id) -> list[str] # thread names def stop_all() -> None # on app shutdown ``` --- ### `dal/` **`base_repository.py`** ```python class BaseRepository: def __init__(db: Connection) def execute(sql: str, params: tuple = ()) -> CursorResult def fetchone(sql: str, params: tuple = ()) -> dict | None def fetchall(sql: str, params: tuple = ()) -> list[dict] def insert(table: str, data: dict) -> int # returns last_insert_rowid def update(table: str, data: dict, where: str, params: tuple) -> int def delete(table: str, where: str, params: tuple) -> int def row_to_dict(row) -> dict ``` **`server_repository.py`** ```python class ServerRepository(BaseRepository): def get_all() -> list[dict] def get_by_id(server_id) -> dict | None def create(data: dict) -> int def update_status(server_id, status, pid=None, started_at=None) -> None def update(server_id, data: dict) -> None def delete(server_id) -> None def get_running() -> list[dict] # for startup recovery def increment_restart_count(server_id) -> None def reset_restart_count(server_id) -> None ``` **`event_repository.py`** ```python class ServerEventRepository(BaseRepository): def insert(server_id: int, event_type: str, actor: str, detail: dict) -> int def get_events(server_id: int, limit: int, offset: int, event_type: str | None) -> list[dict] def get_recent(server_id: int, limit: int = 20) -> list[dict] ``` **`config_repository.py`** ```python class ConfigRepository(BaseRepository): def get_server_config(server_id) -> dict | None def upsert_server_config(server_id, data: dict) -> None def get_basic_config(server_id) -> dict | None def upsert_basic_config(server_id, data: dict) -> None def get_profile(server_id) -> dict | None def upsert_profile(server_id, data: dict) -> None def get_launch_params(server_id) -> dict | None def upsert_launch_params(server_id, data: dict) -> None def get_rcon_config(server_id) -> dict | None def upsert_rcon_config(server_id, data: dict) -> None def get_full_config(server_id) -> dict # all sections combined ``` --- ### `system/` **`router.py`** — System-level endpoints (no auth required for health check). ```python # GET /system/status → running_servers, total_servers, uptime, version # GET /system/health → 200 OK if app is alive (for load balancer / Docker healthcheck) @router.get("/system/status") async def system_status() -> APIResponse: # Returns: {version, running_servers, total_servers, uptime_seconds} @router.get("/system/health") async def health_check() -> dict: # Returns: {"status": "ok"} ``` --- ### `utils/` **`crypto.py`** ```python # AES-256 field encryption for sensitive values (passwords, RCon pw) # Uses cryptography.fernet.Fernet def encrypt(plaintext: str) -> str def decrypt(ciphertext: str) -> str def get_fernet() -> Fernet # from settings.encryption_key ``` **`file_utils.py`** ```python def ensure_server_dirs(server_id: int) -> None # Creates servers/{id}/, servers/{id}/server/ (profile dir), # servers/{id}/mpmissions/, servers/{id}/battleye/ def get_server_dir(server_id: int) -> Path def get_profile_dir(server_id: int) -> Path # Returns servers/{id}/server/ — Arma 3 profile dir (matches -name=server) def get_missions_dir(server_id: int) -> Path def get_rpt_path(server_id: int) -> Path | None # Arma 3 creates timestamped RPT files in the profile dir: # servers/{id}/server/arma3server_YYYY-MM-DD_HH-MM-SS.rpt # Uses rglob('*.rpt') to search recursively within profile dir. # Returns the most-recently-modified one. # Returns None if no .rpt file exists yet (server still starting up). def safe_delete_file(path: Path) -> bool def sanitize_filename(filename: str) -> str # Returns Path(filename).name — prevents path traversal on both Unix and Windows # os.path.basename() on Windows does NOT strip forward slashes; # Path.name handles both separators correctly. ``` **`port_checker.py`** ```python def is_port_in_use(port: int, host: str = "0.0.0.0") -> bool # socket.connect check def check_server_ports_available(game_port: int, rcon_port: int | None = None, host: str = "0.0.0.0") -> list[int] # Checks ALL ports: game_port, game_port+1 (Steam query), # game_port+2 (VON), game_port+3 (Steam auth), # plus the actual rcon_port (user-configurable, defaults to game_port+4) # If rcon_port is None, defaults to game_port+4 # If rcon_port is None, defaults to game_port+4 # Returns list of ports that are in use (empty = all available) def find_available_port(start: int = 2302, step: int = 100) -> int # Find next available game port (checking all 5 derived ports per candidate) ``` --- ## Key Dependencies (requirements.txt) ``` fastapi==0.111.0 uvicorn[standard]==0.29.0 pydantic==2.7.0 pydantic-settings==2.2.1 sqlalchemy==2.0.30 python-jose[cryptography]==3.3.0 # JWT passlib[bcrypt]==1.7.4 # password hashing cryptography==42.0.5 # field-level encryption (Fernet) psutil==5.9.8 # process metrics apscheduler==3.10.4 # scheduled jobs (log/metrics/player_history cleanup) python-multipart==0.0.9 # file upload support slowapi==0.1.9 # rate limiting middleware uvloop==0.19.0; sys_platform != "win32" # faster event loop (Linux/macOS only — skip on Windows) ```