feat: implement full backend + frontend server detail, settings, and create server pages

Backend:
- Complete FastAPI backend with 42+ REST endpoints (auth, servers, config,
  players, bans, missions, mods, games, system)
- Game adapter architecture with Arma 3 as first-class adapter
- WebSocket real-time events for status, metrics, logs, players
- Background thread system (process monitor, metrics, log tail, RCon poller)
- Fernet encryption for sensitive config fields at rest
- JWT auth with admin/viewer roles, bcrypt password hashing
- SQLite with WAL mode, parameterized queries, migration system
- APScheduler cleanup jobs for logs, metrics, events

Frontend:
- Server Detail page with 7 tabs (overview, config, players, bans,
  missions, mods, logs)
- Settings page with password change and admin user management
- Create Server wizard (4-step; known bug: silent validation failure)
- New hooks: useServerDetail, useAuth, useGames
- New components: ServerHeader, ConfigEditor, PlayerTable, BanTable,
  MissionList, ModList, LogViewer, PasswordChange, UserManager
- WebSocket onEvent callback for real-time log accumulation
- 120 unit tests passing (Vitest + React Testing Library)

Docs:
- Added .gitignore, CLAUDE.md, README.md
- Updated FRONTEND.md, ARCHITECTURE.md with current implementation state
- Added .env.example for backend configuration

Known issues:
- Create Server form: "Next" buttons don't validate before advancing,
  causing silent submit failure when fields are invalid
- Config sub-tabs need UX redesign for non-technical users
This commit is contained in:
Tran G. (Revernomad) Khoa
2026-04-17 11:58:34 +07:00
parent 620429c9b8
commit 6511353b55
119 changed files with 13752 additions and 5000 deletions

View File

@@ -0,0 +1,39 @@
"""
Auto-register all built-in adapters.
Also scans importlib entry_points for third-party adapters.
"""
import logging
logger = logging.getLogger(__name__)
def load_builtin_adapters():
"""Import built-in adapter packages — they self-register on import."""
from adapters.arma3 import ARMA3_ADAPTER # noqa: F401
def load_third_party_adapters():
"""
Scan 'languard.adapters' entry_point group for third-party adapters.
Third-party packages add this to their pyproject.toml:
[project.entry-points."languard.adapters"]
mygame = "mygame_adapter:MYGAME_ADAPTER"
"""
try:
from importlib.metadata import entry_points
eps = entry_points(group="languard.adapters")
for ep in eps:
try:
adapter = ep.load()
from adapters.registry import GameAdapterRegistry
GameAdapterRegistry.register(adapter)
logger.info("Loaded third-party adapter via entry_point: %s", ep.name)
except Exception as e:
logger.error("Failed to load third-party adapter '%s': %s", ep.name, e)
except Exception as e:
logger.warning("Entry point scanning failed: %s", e)
def initialize_adapters():
load_builtin_adapters()
load_third_party_adapters()

View File

@@ -0,0 +1,7 @@
"""Auto-register Arma 3 adapter on import."""
from adapters.arma3.adapter import ARMA3_ADAPTER
from adapters.registry import GameAdapterRegistry
GameAdapterRegistry.register(ARMA3_ADAPTER)
__all__ = ["ARMA3_ADAPTER"]

View File

@@ -0,0 +1,59 @@
"""Arma 3 adapter — composes all Arma 3 capability implementations."""
from adapters.arma3.config_generator import Arma3ConfigGenerator
from adapters.arma3.process_config import Arma3ProcessConfig
# Capabilities enabled so far (add more as phases complete)
_CAPABILITIES = {
"config_generator",
"process_config",
"log_parser",
"remote_admin",
"ban_manager",
"mission_manager",
"mod_manager",
}
class Arma3Adapter:
game_type = "arma3"
display_name = "Arma 3"
version = "1.0.0"
def get_config_generator(self):
return Arma3ConfigGenerator()
def get_process_config(self):
return Arma3ProcessConfig()
def get_log_parser(self):
from adapters.arma3.log_parser import RPTParser
return RPTParser()
def get_remote_admin(self):
"""Return the RemoteAdmin factory for Arma3 BattlEye RCon."""
from adapters.arma3.remote_admin import Arma3RemoteAdminFactory
return Arma3RemoteAdminFactory()
def get_mission_manager(self, server_id: int | None = None):
from adapters.arma3.mission_manager import Arma3MissionManager
return Arma3MissionManager(server_id=server_id)
def get_mod_manager(self, server_id: int | None = None):
from adapters.arma3.mod_manager import Arma3ModManager
return Arma3ModManager(server_id=server_id)
def get_ban_manager(self, server_id: int | None = None):
from adapters.arma3.ban_manager import Arma3BanManager
return Arma3BanManager(server_id=server_id)
def has_capability(self, name: str) -> bool:
return name in _CAPABILITIES
def get_additional_routers(self) -> list:
return []
def get_custom_thread_factories(self) -> list:
return []
ARMA3_ADAPTER = Arma3Adapter()

View File

@@ -0,0 +1,200 @@
"""Arma 3 ban manager — bidirectional sync between DB bans and BattlEye ban file."""
from __future__ import annotations
import logging
import os
from pathlib import Path
from pydantic import BaseModel
from core.utils.file_utils import get_server_dir
logger = logging.getLogger(__name__)
_BANS_FILE = "battleye/bans.txt"
class Arma3BanData(BaseModel):
"""Ban data schema for Arma 3."""
guid: str = ""
ip: str = ""
class Arma3BanManager:
"""
Implements BanManager protocol for Arma3 BattlEye.
Also provides richer file-based operations for the ban endpoints.
"""
def __init__(self, server_id: int | None = None) -> None:
self._server_id = server_id
def _bans_path(self) -> Path:
if self._server_id is None:
raise ValueError("server_id required for file-based ban operations")
server_dir = get_server_dir(self._server_id)
return server_dir / _BANS_FILE
# ── BanManager protocol methods ──
def get_ban_file_path(self, server_dir: Path) -> Path:
return server_dir / _BANS_FILE
def sync_bans_to_file(self, bans: list[dict], ban_file: Path) -> None:
"""Write bans from DB to BattlEye ban file format."""
lines = []
for ban in bans:
identifier = ban.get("player_uid") or ban.get("guid") or ban.get("ip", "")
ban_type = ban.get("ban_type", "GUID")
reason = ban.get("reason", "")
duration = ban.get("duration_minutes", 0)
reason_clean = reason.replace("\n", " ").replace("\r", "").strip()
if identifier:
lines.append(f"{ban_type} {identifier} {duration} {reason_clean}".strip())
ban_file.parent.mkdir(parents=True, exist_ok=True)
tmp_path = str(ban_file) + ".tmp"
try:
with open(tmp_path, "w", encoding="utf-8") as f:
f.write("\n".join(lines) + "\n" if lines else "")
os.replace(tmp_path, str(ban_file))
except OSError as exc:
self._safe_delete(tmp_path)
raise
def read_bans_from_file(self, ban_file: Path) -> list[dict]:
"""Read bans from BattlEye ban file into standard format."""
if not ban_file.exists():
return []
bans = []
for line_num, line in enumerate(ban_file.read_text(encoding="utf-8", errors="replace").splitlines(), 1):
line = line.strip()
if not line or line.startswith("//") or line.startswith("#"):
continue
parsed = self._parse_ban_line(line, line_num)
if parsed:
bans.append(parsed)
return bans
def get_ban_data_schema(self) -> type[BaseModel] | None:
return Arma3BanData
# ── Richer file-based operations (used by ban endpoints) ──
def get_bans(self) -> list[dict]:
"""Read all bans from bans.txt. Returns list of dicts."""
bans_path = self._bans_path()
if not bans_path.exists():
return []
bans = []
try:
with open(bans_path, "r", encoding="utf-8", errors="replace") as f:
for line_num, line in enumerate(f, 1):
line = line.strip()
if not line or line.startswith("#"):
continue
parsed = self._parse_ban_line(line, line_num)
if parsed:
bans.append(parsed)
except OSError as exc:
logger.error("Cannot read bans.txt: %s", exc)
return bans
def add_ban(self, identifier: str, ban_type: str, reason: str, duration_minutes: int) -> None:
"""Append a ban entry to bans.txt."""
reason_clean = reason.replace("\n", " ").replace("\r", "").strip()
line = f"{ban_type} {identifier} {duration_minutes} {reason_clean}\n"
bans_path = self._bans_path()
bans_path.parent.mkdir(parents=True, exist_ok=True)
try:
with open(bans_path, "a", encoding="utf-8") as f:
f.write(line)
except OSError as exc:
logger.error("Cannot write bans.txt: %s", exc)
def remove_ban(self, identifier: str) -> bool:
"""Remove all ban entries matching the given identifier. Returns True if removed."""
bans_path = self._bans_path()
if not bans_path.exists():
return False
try:
with open(bans_path, "r", encoding="utf-8", errors="replace") as f:
lines = f.readlines()
except OSError as exc:
logger.error("Cannot read bans.txt: %s", exc)
return False
new_lines = []
removed = 0
for line in lines:
stripped = line.strip()
if stripped and not stripped.startswith("#"):
parts = stripped.split(None, 3)
if len(parts) >= 2 and parts[1] == identifier:
removed += 1
continue
new_lines.append(line)
if removed == 0:
return False
tmp_path = str(bans_path) + ".tmp"
try:
with open(tmp_path, "w", encoding="utf-8") as f:
f.writelines(new_lines)
os.replace(tmp_path, str(bans_path))
except OSError as exc:
self._safe_delete(tmp_path)
logger.error("Cannot update bans.txt: %s", exc)
return False
return True
# ── Internal ──
def _parse_ban_line(self, line: str, line_num: int) -> dict | None:
"""Parse one ban line: TYPE IDENTIFIER DURATION REASON"""
parts = line.split(None, 3)
if len(parts) < 2:
return None
ban_type = parts[0].upper()
if ban_type not in ("GUID", "IP"):
return None
identifier = parts[1]
duration = 0
reason = ""
if len(parts) >= 3:
try:
duration = int(parts[2])
except ValueError:
duration = 0
if len(parts) >= 4:
reason = parts[3]
return {
"type": ban_type,
"identifier": identifier,
"duration_minutes": duration,
"reason": reason,
"is_permanent": duration == 0,
}
@staticmethod
def _safe_delete(path: str) -> None:
try:
os.unlink(path)
except OSError as exc:
logger.debug("Arma3BanManager: could not delete %s: %s", path, exc)

View File

@@ -0,0 +1,400 @@
"""
Arma 3 config generator.
Merged protocol: Pydantic models (schema) + file generation + launch args.
"""
from __future__ import annotations
import os
from pathlib import Path
from typing import Any
from pydantic import BaseModel, Field
# ─── Pydantic Models (config schema) ─────────────────────────────────────────
class ServerConfig(BaseModel):
hostname: str = "My Arma 3 Server"
password: str | None = None
password_admin: str = ""
server_command_password: str | None = None
max_players: int = Field(default=40, gt=0, le=1000)
kick_duplicate: int = Field(default=1, ge=0, le=1)
persistent: int = Field(default=1, ge=0, le=1)
vote_threshold: float = Field(default=0.33, ge=0.0, le=1.0)
vote_mission_players: int = Field(default=1, ge=0)
vote_timeout: int = Field(default=60, ge=0)
role_timeout: int = Field(default=90, ge=0)
briefing_timeout: int = Field(default=60, ge=0)
debriefing_timeout: int = Field(default=45, ge=0)
lobby_idle_timeout: int = Field(default=300, ge=0)
disable_von: int = Field(default=0, ge=0, le=1)
von_codec: int = Field(default=1, ge=0, le=1)
von_codec_quality: int = Field(default=20, ge=0, le=30)
max_ping: int = Field(default=250, gt=0)
max_packet_loss: int = Field(default=50, ge=0, le=100)
max_desync: int = Field(default=200, ge=0)
disconnect_timeout: int = Field(default=15, ge=0)
kick_on_ping: int = Field(default=1, ge=0, le=1)
kick_on_packet_loss: int = Field(default=1, ge=0, le=1)
kick_on_desync: int = Field(default=1, ge=0, le=1)
kick_on_timeout: int = Field(default=1, ge=0, le=1)
battleye: int = Field(default=1, ge=0, le=1)
verify_signatures: int = Field(default=2, ge=0, le=2)
allowed_file_patching: int = Field(default=0, ge=0, le=2)
forced_difficulty: str = "Regular"
timestamp_format: str = "short"
auto_select_mission: int = Field(default=0, ge=0, le=1)
random_mission_order: int = Field(default=0, ge=0, le=1)
log_file: str = "server_console.log"
skip_lobby: int = Field(default=0, ge=0, le=1)
drawing_in_map: int = Field(default=1, ge=0, le=1)
upnp: int = Field(default=0, ge=0, le=1)
loopback: int = Field(default=0, ge=0, le=1)
statistics_enabled: int = Field(default=1, ge=0, le=1)
motd_lines: list[str] = Field(default_factory=lambda: ["Welcome!", "Have fun"])
motd_interval: float = Field(default=5.0, gt=0)
headless_clients: list[str] = Field(default_factory=list)
local_clients: list[str] = Field(default_factory=list)
admin_uids: list[str] = Field(default_factory=list)
class BasicConfig(BaseModel):
min_bandwidth: int = Field(default=800000, gt=0)
max_bandwidth: int = Field(default=25000000, gt=0)
max_msg_send: int = Field(default=384, gt=0)
max_size_guaranteed: int = Field(default=512, gt=0)
max_size_non_guaranteed: int = Field(default=256, gt=0)
min_error_to_send: float = Field(default=0.003, gt=0)
max_custom_file_size: int = Field(default=100000, ge=0)
class ProfileConfig(BaseModel):
reduced_damage: int = Field(default=0, ge=0, le=1)
group_indicators: int = Field(default=0, ge=0, le=3)
friendly_tags: int = Field(default=0, ge=0, le=3)
enemy_tags: int = Field(default=0, ge=0, le=3)
detected_mines: int = Field(default=0, ge=0, le=3)
commands: int = Field(default=1, ge=0, le=3)
waypoints: int = Field(default=1, ge=0, le=3)
tactical_ping: int = Field(default=0, ge=0, le=1)
weapon_info: int = Field(default=2, ge=0, le=3)
stance_indicator: int = Field(default=2, ge=0, le=3)
stamina_bar: int = Field(default=0, ge=0, le=1)
weapon_crosshair: int = Field(default=0, ge=0, le=1)
vision_aid: int = Field(default=0, ge=0, le=1)
third_person_view: int = Field(default=0, ge=0, le=1)
camera_shake: int = Field(default=1, ge=0, le=1)
score_table: int = Field(default=1, ge=0, le=1)
death_messages: int = Field(default=1, ge=0, le=1)
von_id: int = Field(default=1, ge=0, le=1)
map_content_friendly: int = Field(default=0, ge=0, le=3)
map_content_enemy: int = Field(default=0, ge=0, le=3)
map_content_mines: int = Field(default=0, ge=0, le=3)
auto_report: int = Field(default=0, ge=0, le=1)
multiple_saves: int = Field(default=0, ge=0, le=1)
ai_level_preset: int = Field(default=3, ge=0, le=4)
skill_ai: float = Field(default=0.5, ge=0.0, le=1.0)
precision_ai: float = Field(default=0.5, ge=0.0, le=1.0)
class LaunchConfig(BaseModel):
world: str = "empty"
extra_params: str = ""
limit_fps: int = Field(default=50, gt=0, le=1000)
auto_init: int = Field(default=0, ge=0, le=1)
load_mission_to_memory: int = Field(default=0, ge=0, le=1)
enable_ht: int = Field(default=0, ge=0, le=1)
huge_pages: int = Field(default=0, ge=0, le=1)
cpu_count: int | None = None
ex_threads: int = Field(default=7, ge=0)
max_mem: int | None = None
no_logs: int = Field(default=0, ge=0, le=1)
netlog: int = Field(default=0, ge=0, le=1)
class RConConfig(BaseModel):
rcon_password: str = ""
max_ping: int = Field(default=200, gt=0)
enabled: int = Field(default=1, ge=0, le=1)
# ─── Config Generator ─────────────────────────────────────────────────────────
class Arma3ConfigGenerator:
game_type = "arma3"
SECTIONS: dict[str, type[BaseModel]] = {
"server": ServerConfig,
"basic": BasicConfig,
"profile": ProfileConfig,
"launch": LaunchConfig,
"rcon": RConConfig,
}
SENSITIVE_FIELDS: dict[str, list[str]] = {
"server": ["password", "password_admin", "server_command_password"],
"rcon": ["rcon_password"],
}
def get_sections(self) -> dict[str, type[BaseModel]]:
return self.SECTIONS
def get_defaults(self, section: str) -> dict[str, Any]:
model_cls = self.SECTIONS.get(section)
if model_cls is None:
return {}
return model_cls().model_dump()
def get_sensitive_fields(self, section: str) -> list[str]:
return self.SENSITIVE_FIELDS.get(section, [])
def get_config_version(self) -> str:
return "1.0.0"
def migrate_config(self, old_version: str, config_json: dict) -> dict:
"""
For version 1.0.0 there is nothing to migrate.
Future versions: add migration logic here.
"""
from adapters.exceptions import ConfigMigrationError
raise ConfigMigrationError(
old_version, f"No migration path from {old_version} to {self.get_config_version()}"
)
# ── Config file writers ───────────────────────────────────────────────────
@staticmethod
def _escape(value: str) -> str:
"""
Escape a string for use inside Arma 3 double-quoted config values.
Order matters: escape backslashes FIRST.
"""
value = value.replace("\\", "\\\\")
value = value.replace('"', '\\"')
value = value.replace('\n', '\\n')
return value
@staticmethod
def _atomic_write(path: Path, content: str) -> None:
"""Write content to path atomically via tmp file + os.replace()."""
from adapters.exceptions import ConfigWriteError
tmp_path = path.with_suffix(path.suffix + ".tmp")
try:
path.parent.mkdir(parents=True, exist_ok=True)
tmp_path.write_text(content, encoding="utf-8")
os.replace(str(tmp_path), str(path))
except OSError as e:
# Clean up tmp file if it exists
try:
tmp_path.unlink(missing_ok=True)
except OSError as exc:
logger.debug("Could not clean up temp file %s: %s", tmp_path, exc)
raise ConfigWriteError(str(path), str(e)) from e
def _render_server_cfg(self, cfg: ServerConfig) -> str:
"""Render server.cfg content string."""
motd_items = ", ".join(f'"{self._escape(l)}"' for l in cfg.motd_lines)
headless = ", ".join(f'"{h}"' for h in cfg.headless_clients)
local = ", ".join(f'"{l}"' for l in cfg.local_clients)
admin_uids = ", ".join(f'"{u}"' for u in cfg.admin_uids)
lines = [
f'hostname = "{self._escape(cfg.hostname)}";',
]
if cfg.password:
lines.append(f'password = "{self._escape(cfg.password)}";')
if cfg.password_admin:
lines.append(f'passwordAdmin = "{self._escape(cfg.password_admin)}";')
if cfg.server_command_password:
lines.append(f'serverCommandPassword = "{self._escape(cfg.server_command_password)}";')
lines += [
f"maxPlayers = {cfg.max_players};",
f"kickDuplicate = {cfg.kick_duplicate};",
f"persistent = {cfg.persistent};",
f"voteThreshold = {cfg.vote_threshold};",
f"voteMissionPlayers = {cfg.vote_mission_players};",
f"voteTimeout = {cfg.vote_timeout};",
f"roleTimeout = {cfg.role_timeout};",
f"briefingTimeOut = {cfg.briefing_timeout};",
f"debriefingTimeOut = {cfg.debriefing_timeout};",
f"lobbyIdleTimeout = {cfg.lobby_idle_timeout};",
f"disableVoN = {cfg.disable_von};",
f"vonCodec = {cfg.von_codec};",
f"vonCodecQuality = {cfg.von_codec_quality};",
f"maxPing = {cfg.max_ping};",
f"maxPacketLoss = {cfg.max_packet_loss};",
f"maxDesync = {cfg.max_desync};",
f"disconnectTimeout = {cfg.disconnect_timeout};",
f"kickOnPing = {cfg.kick_on_ping};",
f"kickOnPacketLoss = {cfg.kick_on_packet_loss};",
f"kickOnDesync = {cfg.kick_on_desync};",
f"kickOnTimeout = {cfg.kick_on_timeout};",
f"BattlEye = {cfg.battleye};",
f"verifySignatures = {cfg.verify_signatures};",
f"allowedFilePatching = {cfg.allowed_file_patching};",
f'forcedDifficulty = "{cfg.forced_difficulty}";',
f'timeStampFormat = "{cfg.timestamp_format}";',
f"autoSelectMission = {cfg.auto_select_mission};",
f"randomMissionOrder = {cfg.random_mission_order};",
f'logFile = "{cfg.log_file}";',
f"skipLobby = {cfg.skip_lobby};",
f"drawingInMap = {cfg.drawing_in_map};",
f"upnp = {cfg.upnp};",
f"loopback = {cfg.loopback};",
f"statisticsEnabled = {cfg.statistics_enabled};",
f"motd[] = {{{motd_items}}};",
f"motdInterval = {cfg.motd_interval};",
]
if cfg.headless_clients:
lines.append(f"headlessClients[] = {{{headless}}};")
if cfg.local_clients:
lines.append(f"localClient[] = {{{local}}};")
if cfg.admin_uids:
lines.append(f"admins[] = {{{admin_uids}}};")
return "\n".join(lines) + "\n"
def _render_basic_cfg(self, cfg: BasicConfig) -> str:
return (
f"MinBandwidth = {cfg.min_bandwidth};\n"
f"MaxBandwidth = {cfg.max_bandwidth};\n"
f"MaxMsgSend = {cfg.max_msg_send};\n"
f"MaxSizeGuaranteed = {cfg.max_size_guaranteed};\n"
f"MaxSizeNonguaranteed = {cfg.max_size_non_guaranteed};\n"
f"MinErrorToSend = {cfg.min_error_to_send};\n"
f"MaxCustomFileSize = {cfg.max_custom_file_size};\n"
)
def _render_arma3profile(self, cfg: ProfileConfig) -> str:
return (
"class DifficultyPresets {\n"
" class CustomDifficulty {\n"
" class Options {\n"
f" reducedDamage = {cfg.reduced_damage};\n"
f" groupIndicators = {cfg.group_indicators};\n"
f" friendlyTags = {cfg.friendly_tags};\n"
f" enemyTags = {cfg.enemy_tags};\n"
f" detectedMines = {cfg.detected_mines};\n"
f" commands = {cfg.commands};\n"
f" waypoints = {cfg.waypoints};\n"
f" tacticalPing = {cfg.tactical_ping};\n"
f" weaponInfo = {cfg.weapon_info};\n"
f" stanceIndicator = {cfg.stance_indicator};\n"
f" staminaBar = {cfg.stamina_bar};\n"
f" weaponCrosshair = {cfg.weapon_crosshair};\n"
f" visionAid = {cfg.vision_aid};\n"
f" thirdPersonView = {cfg.third_person_view};\n"
f" cameraShake = {cfg.camera_shake};\n"
f" scoreTable = {cfg.score_table};\n"
f" deathMessages = {cfg.death_messages};\n"
f" vonID = {cfg.von_id};\n"
f" mapContentFriendly = {cfg.map_content_friendly};\n"
f" mapContentEnemy = {cfg.map_content_enemy};\n"
f" mapContentMines = {cfg.map_content_mines};\n"
f" autoReport = {cfg.auto_report};\n"
f" multipleSaves = {cfg.multiple_saves};\n"
" };\n"
f" aiLevelPreset = {cfg.ai_level_preset};\n"
" };\n"
" class CustomAILevel {\n"
f" skillAI = {cfg.skill_ai};\n"
f" precisionAI = {cfg.precision_ai};\n"
" };\n"
"};\n"
)
def _render_beserver_cfg(self, cfg: RConConfig) -> str:
return (
f"RConPassword {cfg.rcon_password}\n"
f"MaxPing {cfg.max_ping}\n"
)
# ── Public interface ──────────────────────────────────────────────────────
def write_configs(
self,
server_id: int,
server_dir: Path,
config_sections: dict[str, dict],
) -> list[Path]:
server_cfg = ServerConfig(**config_sections.get("server", {}))
basic_cfg = BasicConfig(**config_sections.get("basic", {}))
profile_cfg = ProfileConfig(**config_sections.get("profile", {}))
rcon_cfg = RConConfig(**config_sections.get("rcon", {}))
written = []
pairs = [
(server_dir / "server.cfg", self._render_server_cfg(server_cfg)),
(server_dir / "basic.cfg", self._render_basic_cfg(basic_cfg)),
(server_dir / "server" / "server.Arma3Profile", self._render_arma3profile(profile_cfg)),
(server_dir / "battleye" / "beserver.cfg", self._render_beserver_cfg(rcon_cfg)),
]
for path, content in pairs:
self._atomic_write(path, content)
written.append(path)
# Restrict permissions on files containing passwords (Unix only)
if os.name != "nt":
for path in [server_dir / "server.cfg", server_dir / "battleye" / "beserver.cfg"]:
if path.exists():
os.chmod(path, 0o600)
return written
def build_launch_args(
self,
config_sections: dict[str, dict],
mod_args: list[str] | None = None,
) -> list[str]:
from adapters.exceptions import LaunchArgsError
launch = LaunchConfig(**config_sections.get("launch", {}))
server = ServerConfig(**config_sections.get("server", {}))
args = [
f"-port={config_sections.get('_port', 2302)}",
"-config=server.cfg",
"-cfg=basic.cfg",
"-profiles=./server",
"-name=server",
f"-world={launch.world}",
f"-limitFPS={launch.limit_fps}",
"-bepath=./battleye",
]
if launch.auto_init:
args.append("-autoInit")
if launch.enable_ht:
args.append("-enableHT")
if launch.huge_pages:
args.append("-hugePages")
if launch.cpu_count is not None:
args.append(f"-cpuCount={launch.cpu_count}")
if launch.max_mem is not None:
args.append(f"-maxMem={launch.max_mem}")
if launch.no_logs:
args.append("-noLogs")
if launch.netlog:
args.append("-netlog")
if launch.extra_params:
args.extend(launch.extra_params.split())
if mod_args:
args.extend(mod_args)
return args
def preview_config(
self,
server_id: int,
server_dir: Path,
config_sections: dict[str, dict],
) -> dict[str, str]:
server_cfg = ServerConfig(**config_sections.get("server", {}))
basic_cfg = BasicConfig(**config_sections.get("basic", {}))
profile_cfg = ProfileConfig(**config_sections.get("profile", {}))
rcon_cfg = RConConfig(**config_sections.get("rcon", {}))
return {
"server.cfg": self._render_server_cfg(server_cfg),
"basic.cfg": self._render_basic_cfg(basic_cfg),
"server/server.Arma3Profile": self._render_arma3profile(profile_cfg),
"battleye/beserver.cfg": self._render_beserver_cfg(rcon_cfg),
}

View File

@@ -0,0 +1,81 @@
"""Arma 3 RPT log parser."""
from __future__ import annotations
import re
from datetime import datetime
from pathlib import Path
from typing import Callable
class RPTParser:
"""Parses Arma 3 .rpt log files."""
# Pattern: "HH:MM:SS ..." or "[HH:MM:SS] ..." with optional date prefix
_timestamp_re = re.compile(
r"^\s*(?:(\d{2}/\d{2}/\d{4})\s+)?"
r"(?:\[)?(\d{2}:\d{2}:\d{2})(?:\])?\s*"
r"(?:\[?(\w+)\]?\s*)?(.*)$"
)
def parse_line(self, line: str) -> dict | None:
"""Parse one RPT log line."""
if not line or not line.strip():
return None
match = self._timestamp_re.match(line)
if not match:
# Non-timestamped line — treat as info
stripped = line.strip()
if not stripped:
return None
return {
"timestamp": datetime.utcnow().isoformat(),
"level": "info",
"message": stripped,
}
date_str, time_str, level_str, message = match.groups()
# Map Arma 3 log levels
level = "info"
if level_str:
level_lower = level_str.lower()
if level_lower in ("error", "fault"):
level = "error"
elif level_lower in ("warning", "warn"):
level = "warning"
# Build ISO timestamp
try:
if date_str:
dt = datetime.strptime(f"{date_str} {time_str}", "%m/%d/%Y %H:%M:%S")
else:
dt = datetime.strptime(time_str, "%H:%M:%S")
dt = dt.replace(year=datetime.utcnow().year, month=datetime.utcnow().month, day=datetime.utcnow().day)
timestamp = dt.isoformat()
except ValueError:
timestamp = datetime.utcnow().isoformat()
return {
"timestamp": timestamp,
"level": level,
"message": (message or "").strip(),
}
def get_log_file_resolver(self, server_id: int) -> Callable[[Path], Path | None]:
"""Return a callable that finds the current RPT log file."""
def resolver(server_dir: Path) -> Path | None:
# Arma 3 stores logs in server_dir/server/*.rpt
profile_dir = server_dir / "server"
if not profile_dir.exists():
return None
rpt_files = sorted(profile_dir.glob("*.rpt"), key=lambda p: p.stat().st_mtime, reverse=True)
if rpt_files:
return rpt_files[0]
# Fallback: check for arma3server_x64_*.rpt pattern
rpt_files = sorted(profile_dir.glob("arma3server*.rpt"), key=lambda p: p.stat().st_mtime, reverse=True)
return rpt_files[0] if rpt_files else None
return resolver

View File

@@ -0,0 +1,191 @@
"""Arma 3 mission manager — handles .pbo mission files, upload, delete, rotation."""
from __future__ import annotations
import logging
import os
import re
from pathlib import Path
from pydantic import BaseModel
from adapters.exceptions import AdapterError
from core.utils.file_utils import get_server_dir, sanitize_filename, safe_delete_file
logger = logging.getLogger(__name__)
_MISSIONS_DIR = "mpmissions"
_ALLOWED_EXTENSION = ".pbo"
_MAX_MISSION_SIZE_MB = 500
class Arma3MissionData(BaseModel):
"""Mission data schema for Arma 3."""
terrain: str = ""
difficulty: str = "Regular"
class Arma3MissionManager:
file_extension = ".pbo"
def __init__(self, server_id: int | None = None) -> None:
self._server_id = server_id
def _missions_dir(self) -> Path:
return get_server_dir(self._server_id) / _MISSIONS_DIR
# ── File operations ──
def list_missions(self) -> list[dict]:
"""
Scan the mpmissions directory and return all .pbo files.
Returns list of dicts:
name: str — filename without extension
filename: str — full filename
size_bytes: int — file size
"""
missions_dir = self._missions_dir()
if not missions_dir.exists():
return []
missions = []
try:
for entry in missions_dir.iterdir():
if entry.is_file() and entry.suffix.lower() == _ALLOWED_EXTENSION:
missions.append({
"name": entry.stem,
"filename": entry.name,
"size_bytes": entry.stat().st_size,
})
except OSError as exc:
raise AdapterError(f"Cannot list missions: {exc}") from exc
missions.sort(key=lambda m: m["filename"].lower())
return missions
def upload_mission(self, filename: str, content: bytes) -> dict:
"""
Save a mission file to the mpmissions directory.
Args:
filename: Original filename from the upload (will be sanitized).
content: Raw file bytes.
Returns the saved mission dict.
"""
safe_name = sanitize_filename(filename)
if not safe_name.lower().endswith(_ALLOWED_EXTENSION):
raise AdapterError(
f"Invalid mission file extension. Only {_ALLOWED_EXTENSION} files are allowed."
)
size_mb = len(content) / (1024 * 1024)
if size_mb > _MAX_MISSION_SIZE_MB:
raise AdapterError(
f"Mission file too large ({size_mb:.1f} MB). Max is {_MAX_MISSION_SIZE_MB} MB."
)
missions_dir = self._missions_dir()
missions_dir.mkdir(parents=True, exist_ok=True)
dest_path = missions_dir / safe_name
# Atomic write: write to .tmp first, then replace
tmp_path = str(dest_path) + ".tmp"
try:
with open(tmp_path, "wb") as f:
f.write(content)
os.replace(tmp_path, str(dest_path))
except OSError as exc:
safe_delete_file(Path(tmp_path))
raise AdapterError(f"Cannot save mission file: {exc}") from exc
logger.info(
"Mission uploaded for server %d: %s (%d bytes)",
self._server_id, safe_name, len(content),
)
return {
"name": dest_path.stem,
"filename": safe_name,
"size_bytes": len(content),
}
def delete_mission(self, filename: str) -> bool:
"""
Delete a mission file.
Returns True if deleted, False if not found.
"""
safe_name = sanitize_filename(filename)
if not safe_name.lower().endswith(_ALLOWED_EXTENSION):
raise AdapterError("Invalid mission filename")
dest_path = self._missions_dir() / safe_name
# Verify resolved path is inside missions directory (path traversal guard)
try:
dest_path.resolve().relative_to(self._missions_dir().resolve())
except ValueError:
raise AdapterError("Path traversal detected in filename")
if not dest_path.exists():
return False
try:
dest_path.unlink()
logger.info("Mission deleted for server %d: %s", self._server_id, safe_name)
return True
except OSError as exc:
raise AdapterError(f"Cannot delete mission: {exc}") from exc
# ── Mission rotation config ──
def parse_mission_filename(self, filename: str) -> dict:
"""
Parse Arma 3 mission filename.
Format: MissionName.Terrain.pbo
"""
name = filename
if name.endswith(self.file_extension):
name = name[: -len(self.file_extension)]
parts = name.rsplit(".", 1)
if len(parts) == 2:
return {
"mission_name": parts[0],
"terrain": parts[1],
"filename": filename,
}
return {
"mission_name": name,
"terrain": "",
"filename": filename,
}
def get_rotation_config(self, rotation_entries: list[dict]) -> str:
"""
Generate Arma 3 mission rotation config block.
rotation_entries: list of {mission_name, terrain, difficulty, params_json}
"""
if not rotation_entries:
return ""
lines = ['class Missions {']
for i, entry in enumerate(rotation_entries):
mission = entry.get("mission_name", "")
terrain = entry.get("terrain", "")
difficulty = entry.get("difficulty", "Regular")
params = entry.get("params_json", "{}")
lines.append(f' class Mission_{i} {{')
lines.append(f' template = "{mission}.{terrain}";')
lines.append(f' difficulty = "{difficulty}";')
if params and params != "{}":
lines.append(f' params = {params};')
lines.append(' };')
lines.append('};')
return "\n".join(lines)
def get_missions_dir(self, server_dir: Path) -> Path:
return server_dir / _MISSIONS_DIR
def get_mission_data_schema(self) -> type[BaseModel] | None:
return Arma3MissionData

View File

@@ -0,0 +1,165 @@
"""Arma 3 mod manager — handles mod folder conventions, CLI args, and enable/disable."""
from __future__ import annotations
import logging
import re
from pathlib import Path
from pydantic import BaseModel
from adapters.exceptions import AdapterError
from core.utils.file_utils import get_server_dir
logger = logging.getLogger(__name__)
_MOD_DIR_PATTERN = re.compile(r"^@.+", re.IGNORECASE)
class Arma3ModData(BaseModel):
"""Mod data schema for Arma 3."""
workshop_id: str = ""
is_server_mod: bool = False
class Arma3ModManager:
def __init__(self, server_id: int | None = None) -> None:
self._server_id = server_id
def _server_dir(self) -> Path:
return get_server_dir(self._server_id)
# ── File / DB operations ──
def list_available_mods(self) -> list[dict]:
"""
Scan the server directory for mod folders (directories starting with '@').
Returns list of dicts:
name: str — directory name (e.g. "@CBA_A3")
path: str — absolute directory path
size_bytes: int — total directory size (approximate, non-recursive)
"""
server_dir = self._server_dir()
if not server_dir.exists():
return []
mods = []
try:
for entry in server_dir.iterdir():
if entry.is_dir() and _MOD_DIR_PATTERN.match(entry.name):
try:
size = sum(
f.stat().st_size
for f in entry.iterdir()
if f.is_file()
)
except OSError:
size = 0
mods.append({
"name": entry.name,
"path": str(entry.resolve()),
"size_bytes": size,
})
except OSError as exc:
raise AdapterError(f"Cannot scan mod directory: {exc}") from exc
mods.sort(key=lambda m: m["name"].lower())
return mods
def get_enabled_mods(self, config_repo) -> list[str]:
"""
Get the list of enabled mod names from the database config.
Args:
config_repo: ConfigRepository instance.
Returns list of mod directory names (e.g. ["@CBA_A3", "@ace"]).
"""
mods_section = config_repo.get_section(self._server_id, "mods")
if mods_section is None:
return []
enabled = mods_section.get("enabled_mods", [])
if isinstance(enabled, str):
enabled = [m.strip() for m in enabled.split(",") if m.strip()]
return enabled
def set_enabled_mods(self, mod_names: list[str], config_repo) -> None:
"""
Update the enabled mods list in the database config.
Args:
mod_names: List of mod directory names to enable.
config_repo: ConfigRepository instance.
Raises AdapterError if any mod name doesn't exist on disk.
"""
available = {m["name"] for m in self.list_available_mods()}
for name in mod_names:
if not _MOD_DIR_PATTERN.match(name):
raise AdapterError(f"Invalid mod name '{name}': must start with '@'")
if name not in available:
raise AdapterError(
f"Mod '{name}' not found in server directory. "
f"Available: {sorted(available)}"
)
mods_section = config_repo.get_section(self._server_id, "mods") or {}
current_version = mods_section.get("config_version", 0)
config_repo.upsert_section(
server_id=self._server_id,
section="mods",
data={"enabled_mods": mod_names},
expected_version=current_version,
)
logger.info(
"Updated enabled mods for server %d: %s",
self._server_id, mod_names,
)
# ── CLI argument building ──
def get_mod_folder_pattern(self) -> str:
"""Arma 3 mods use @ prefix for local, or numeric workshop IDs."""
return "@*"
def build_mod_args(self, server_mods: list[dict]) -> list[str]:
"""
Build Arma 3 mod CLI arguments.
Returns -mod and -serverMod argument lists.
"""
client_mods = []
server_only_mods = []
for mod in server_mods:
path = mod.get("folder_path", "")
game_data = mod.get("game_data", {})
if isinstance(game_data, str):
import json
try:
game_data = json.loads(game_data)
except (json.JSONDecodeError, TypeError):
game_data = {}
is_server = game_data.get("is_server_mod", False) if isinstance(game_data, dict) else False
if is_server:
server_only_mods.append(path)
else:
client_mods.append(path)
args = []
if client_mods:
args.append('-mod="' + ";".join(client_mods) + '"')
if server_only_mods:
args.append('-serverMod="' + ";".join(server_only_mods) + '"')
return args
def validate_mod_folder(self, path: Path) -> bool:
"""Validate that a path looks like a valid Arma 3 mod folder."""
if not path.exists() or not path.is_dir():
return False
return (path / "addons").exists() or (path / "$PREFIX$").exists()
def get_mod_data_schema(self) -> type[BaseModel] | None:
return Arma3ModData

View File

@@ -0,0 +1,30 @@
"""Arma 3 process configuration: executables, ports, directory layout."""
class Arma3ProcessConfig:
def get_allowed_executables(self) -> list[str]:
return ["arma3server_x64.exe", "arma3server.exe"]
def get_port_conventions(self, game_port: int) -> dict[str, int]:
"""
Arma 3 derives 3 additional ports from the game port.
All 4 must be free when starting a server.
rcon_port is separate (user-configurable, not auto-derived here).
"""
return {
"game": game_port,
"steam_query": game_port + 1,
"von": game_port + 2,
"steam_auth": game_port + 3,
}
def get_default_game_port(self) -> int:
return 2302
def get_default_rcon_port(self, game_port: int) -> int | None:
return game_port + 4 # e.g. 2306 for default game port
def get_server_dir_layout(self) -> list[str]:
"""Subdirectories to create inside servers/{id}/."""
return ["server", "battleye", "mpmissions"]

View File

@@ -0,0 +1,278 @@
"""
BERConClient — BattlEye RCon UDP client for Arma3.
Implements the BattlEye RCon protocol version 2.
Reference: https://www.battleye.com/downloads/BERConProtocol.txt
Thread safety: This client is NOT thread-safe by itself.
The RemoteAdminPollerThread serializes all calls through a single thread.
For the send_command() called from HTTP request handlers, use a threading.Lock.
"""
from __future__ import annotations
import logging
import socket
import struct
import threading
import time
import zlib
logger = logging.getLogger(__name__)
_SOCKET_TIMEOUT = 5.0
_LOGIN_TIMEOUT = 5.0
_RESPONSE_TIMEOUT = 5.0
_MAX_RESPONSE_PARTS = 10
_KEEPALIVE_INTERVAL = 30.0
class BERConClient:
"""
BattlEye RCon UDP client.
Usage:
client = BERConClient(host="127.0.0.1", port=2302, password="secret")
client.connect() # raises ConnectionError on failure
players = client.get_players()
client.send_command("say -1 Hello")
client.disconnect()
"""
def __init__(self, host: str, port: int, password: str) -> None:
self._host = host
self._port = port
self._password = password
self._sock: socket.socket | None = None
self._seq = 0
self._connected = False
self._lock = threading.Lock()
self._last_activity = 0.0
# ── Public API ──
def connect(self) -> None:
"""Open UDP socket and perform BattlEye login handshake."""
with self._lock:
if self._connected:
return
self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self._sock.settimeout(_SOCKET_TIMEOUT)
self._sock.connect((self._host, self._port))
login_payload = self._password.encode("ascii", errors="replace")
packet = self._build_packet(0x00, login_payload)
self._sock.send(packet)
self._last_activity = time.monotonic()
deadline = time.monotonic() + _LOGIN_TIMEOUT
while time.monotonic() < deadline:
try:
data = self._sock.recv(4096)
except socket.timeout:
break
if not self._verify_checksum(data):
continue
if len(data) >= 9 and data[7] == 0x00:
if data[8] == 0x01:
self._connected = True
self._seq = 0
logger.info("BERConClient: logged in to %s:%d", self._host, self._port)
return
else:
self._sock.close()
self._sock = None
raise ConnectionError(
f"BattlEye login rejected at {self._host}:{self._port}"
)
self._sock.close()
self._sock = None
raise ConnectionError(
f"BattlEye login timed out at {self._host}:{self._port}"
)
def disconnect(self) -> None:
with self._lock:
self._connected = False
if self._sock is not None:
try:
self._sock.close()
except OSError as exc:
logger.debug("BERConClient: error closing socket during disconnect: %s", exc)
self._sock = None
@property
def is_connected(self) -> bool:
return self._connected
def send_command(self, command: str) -> str:
"""Send a BattlEye command and return the response string."""
with self._lock:
if not self._connected or self._sock is None:
raise ConnectionError("BERConClient: not connected")
return self._send_command_locked(command)
def get_players(self) -> list[dict]:
"""Send 'players' command and parse the response."""
response = self.send_command("players")
return self._parse_players(response)
def keepalive(self) -> None:
"""Send a keepalive packet if the connection has been idle."""
if not self._connected:
return
elapsed = time.monotonic() - self._last_activity
if elapsed >= _KEEPALIVE_INTERVAL:
try:
self.send_command("")
except Exception as exc:
logger.debug("BERConClient: keepalive failed: %s", exc)
# ── Packet building ──
def _build_packet(self, pkt_type: int, payload: bytes) -> bytes:
"""Build a BattlEye packet: 'B' 'E' <crc32 LE> 0xFF <type> <payload>"""
body = bytes([0xFF, pkt_type]) + payload
crc = zlib.crc32(body) & 0xFFFFFFFF
crc_bytes = struct.pack("<I", crc)
return b"BE" + crc_bytes + body
def _build_command_packet(self, seq: int, command: str) -> bytes:
payload = bytes([seq]) + command.encode("ascii", errors="replace")
return self._build_packet(0x01, payload)
def _build_ack_packet(self, seq: int) -> bytes:
return self._build_packet(0x02, bytes([seq]))
def _verify_checksum(self, data: bytes) -> bool:
"""Verify the CRC32 checksum in the received packet."""
if len(data) < 8:
return False
if data[0:2] != b"BE":
return False
stored_crc = struct.unpack("<I", data[2:6])[0]
body = data[6:]
computed_crc = zlib.crc32(body) & 0xFFFFFFFF
return stored_crc == computed_crc
# ── Command send (must be called with self._lock held) ──
def _send_command_locked(self, command: str) -> str:
seq = self._seq
self._seq = (self._seq + 1) % 256
packet = self._build_command_packet(seq, command)
self._sock.send(packet)
self._last_activity = time.monotonic()
parts: dict[int, str] = {}
total_parts: int | None = None
deadline = time.monotonic() + _RESPONSE_TIMEOUT
while time.monotonic() < deadline:
try:
data = self._sock.recv(65535)
except socket.timeout:
break
if not self._verify_checksum(data):
continue
if len(data) < 9:
continue
pkt_type = data[7]
# Server message — acknowledge and ignore
if pkt_type == 0x02:
srv_seq = data[8]
ack = self._build_ack_packet(srv_seq)
try:
self._sock.send(ack)
except OSError as exc:
logger.debug("BERConClient: failed to send ack for server message %d: %s", srv_seq, exc)
continue
# Command response
if pkt_type == 0x01:
resp_seq = data[8]
if resp_seq != seq:
continue
payload = data[9:]
# Check if multi-part
if len(payload) >= 3 and payload[0] == 0x00:
total_parts = payload[1]
part_index = payload[2]
part_text = payload[3:].decode("utf-8", errors="replace")
parts[part_index] = part_text
if len(parts) == total_parts:
break
else:
# Single-part response
return payload.decode("utf-8", errors="replace")
if total_parts is not None and parts:
return "".join(parts[i] for i in sorted(parts.keys()))
return ""
# ── Player parsing ──
def _parse_players(self, response: str) -> list[dict]:
"""Parse the 'players' command response."""
players = []
lines = response.split("\n")
for line in lines:
line = line.strip()
if not line:
continue
if line.startswith("Players on") or line.startswith("-") or line.startswith("("):
continue
parts = line.split(None, 4)
if len(parts) < 4:
continue
try:
number = int(parts[0])
except ValueError:
continue
ip_port = parts[1]
ping_str = parts[2]
guid_part = parts[3]
name = parts[4].strip() if len(parts) > 4 else ""
ip = ip_port
port = 0
if ":" in ip_port:
ip, port_str = ip_port.rsplit(":", 1)
try:
port = int(port_str)
except ValueError:
port = 0
try:
ping = int(ping_str)
except ValueError:
ping = 0
uid = guid_part.split("(")[0]
is_admin = "(Admin)" in name
name = name.replace("(Admin)", "").strip()
players.append({
"number": number,
"uid": uid,
"name": name,
"ip": ip,
"port": port,
"ping": ping,
"is_admin": is_admin,
"slot_id": number,
})
return players

View File

@@ -0,0 +1,142 @@
"""Arma 3 RCon service — remote admin via BattleEye RCon protocol."""
from __future__ import annotations
import socket
import logging
import struct
from typing import Any
from pydantic import BaseModel
logger = logging.getLogger(__name__)
class Arma3PlayerData(BaseModel):
"""Player data schema for Arma 3."""
name: str
ping: int = 0
guid: str = ""
class Arma3RConClient:
"""BattleEye RCon client for a single connection."""
def __init__(self, host: str, port: int, password: str):
self._host = host
self._port = port
self._password = password
self._sock: socket.socket | None = None
def _connect(self) -> None:
if self._sock is not None:
return
self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self._sock.settimeout(5.0)
self._sock.connect((self._host, self._port))
# Login sequence
self._login()
def _login(self) -> None:
if self._sock is None:
raise ConnectionError("Not connected")
# BE RCon login: send password with checksum
password_bytes = self._password.encode("utf-8")
checksum = self._compute_checksum(password_bytes)
packet = b"\xff" + bytes([0, len(password_bytes) & 0xff]) + checksum + password_bytes
self._sock.send(packet)
response = self._sock.recv(4096)
if not response or response[0] != 0xff:
raise ConnectionError("RCon login failed")
@staticmethod
def _compute_checksum(data: bytes) -> bytes:
"""Compute BE RCon checksum (sum of bytes) & 0xFF."""
return bytes([sum(data) & 0xFF])
def send_command(self, command: str, timeout: float = 5.0) -> str | None:
try:
self._connect()
if self._sock is None:
return None
self._sock.settimeout(timeout)
cmd_bytes = command.encode("utf-8")
checksum = self._compute_checksum(cmd_bytes)
packet = b"\xff\x01" + bytes([len(cmd_bytes) & 0xff]) + checksum + cmd_bytes
self._sock.send(packet)
response = self._sock.recv(4096)
if response and len(response) > 2:
return response[2:].decode("utf-8", errors="replace")
return None
except Exception as e:
logger.error("RCon command error: %s", e)
return None
def get_players(self) -> list[dict]:
result = self.send_command("players")
if result is None:
return []
# Parse player list from BE RCon response
players = []
for line in result.split("\n"):
line = line.strip()
if not line or line.startswith("(") or line.startswith("total"):
continue
parts = line.split(maxsplit=4)
if len(parts) >= 5:
players.append({
"slot_id": parts[0],
"name": parts[3] if len(parts) > 3 else "",
"guid": parts[2] if len(parts) > 2 else "",
"ping": int(parts[1]) if parts[1].isdigit() else 0,
})
return players
def kick_player(self, identifier: str, reason: str = "") -> bool:
cmd = f"kick {identifier}"
if reason:
cmd += f" {reason}"
result = self.send_command(cmd)
return result is not None
def ban_player(self, identifier: str, duration_minutes: int, reason: str) -> bool:
cmd = f"ban {identifier} {duration_minutes} {reason}"
result = self.send_command(cmd)
return result is not None
def say_all(self, message: str) -> bool:
result = self.send_command(f"say {message}")
return result is not None
def shutdown(self) -> bool:
result = self.send_command("#shutdown")
return result is not None
def keepalive(self) -> None:
try:
self.send_command("")
except Exception as exc:
logger.debug("Arma3RConClient: keepalive failed: %s", exc)
def disconnect(self) -> None:
if self._sock:
try:
self._sock.close()
except Exception as exc:
logger.debug("Arma3RConClient: error closing socket: %s", exc)
self._sock = None
class Arma3RConService:
"""Factory for Arma 3 RCon clients."""
def create_client(self, host: str, port: int, password: str) -> Arma3RConClient:
return Arma3RConClient(host, port, password)
def get_startup_delay(self) -> float:
return 30.0
def get_poll_interval(self) -> float:
return 10.0
def get_player_data_schema(self) -> type[BaseModel] | None:
return Arma3PlayerData

View File

@@ -0,0 +1,135 @@
"""
Arma3RemoteAdmin — implements the RemoteAdmin protocol using BERConClient.
"""
from __future__ import annotations
import logging
from adapters.arma3.rcon_client import BERConClient
from adapters.exceptions import RemoteAdminError
logger = logging.getLogger(__name__)
class Arma3RemoteAdmin:
"""
RemoteAdmin protocol implementation for Arma3 BattlEye RCon.
Args:
server_id: Database server ID.
host: RCon host (usually 127.0.0.1).
port: RCon port (usually game_port + 3).
password: RCon password.
"""
def __init__(
self,
server_id: int,
host: str,
port: int,
password: str,
) -> None:
self._server_id = server_id
self._client = BERConClient(host=host, port=port, password=password)
# ── RemoteAdmin protocol ──
def connect(self) -> None:
"""Connect to RCon. Raises RemoteAdminError on failure."""
try:
self._client.connect()
except ConnectionError as exc:
raise RemoteAdminError(str(exc)) from exc
def disconnect(self) -> None:
self._client.disconnect()
def is_connected(self) -> bool:
return self._client.is_connected
def get_players(self) -> list[dict]:
"""Fetch current player list."""
try:
return self._client.get_players()
except Exception as exc:
raise RemoteAdminError(f"get_players failed: {exc}") from exc
def send_command(self, command: str, timeout: float = 5.0) -> str | None:
"""Send an arbitrary RCon command."""
try:
return self._client.send_command(command)
except Exception as exc:
raise RemoteAdminError(f"send_command failed: {exc}") from exc
def kick_player(self, player_number: int, reason: str = "") -> bool:
"""Kick a player by their in-game slot number."""
command = f"kick {player_number}"
if reason:
command += f" {reason}"
try:
self._client.send_command(command)
return True
except Exception as exc:
logger.warning("[%s] kick_player failed for player %d: %s", self._server_id, player_number, exc)
return False
def ban_player(self, player_uid: str, duration_minutes: int = 0, reason: str = "") -> bool:
"""Add a GUID ban. duration_minutes=0 means permanent."""
duration = duration_minutes if duration_minutes > 0 else 0
command = f"addBan {player_uid} {duration} {reason}"
try:
self._client.send_command(command)
return True
except Exception as exc:
logger.warning("[%s] ban_player failed: %s", self._server_id, exc)
return False
def say_all(self, message: str) -> bool:
"""Broadcast a message to all players."""
try:
self._client.send_command(f"say -1 {message}")
return True
except Exception as exc:
logger.warning("[%s] say_all failed: %s", self._server_id, exc)
return False
def shutdown(self) -> bool:
"""Shutdown the game server via RCon."""
try:
self._client.send_command("#shutdown")
return True
except Exception as exc:
logger.warning("[%s] shutdown failed: %s", self._server_id, exc)
return False
def keepalive(self) -> None:
"""Send keepalive if idle."""
self._client.keepalive()
class Arma3RemoteAdminFactory:
"""
RemoteAdmin factory for Arma3.
Implements the RemoteAdmin protocol (create_client, get_startup_delay, etc.).
"""
def create_client(self, host: str, port: int, password: str) -> Arma3RemoteAdmin:
"""Create a new Arma3RemoteAdmin client instance."""
return Arma3RemoteAdmin(
server_id=0, # Will be set by caller
host=host,
port=port,
password=password,
)
def get_startup_delay(self) -> float:
"""Seconds to wait after server start before connecting."""
return 30.0
def get_poll_interval(self) -> float:
"""Seconds between player list polls."""
return 10.0
def get_player_data_schema(self):
"""Pydantic model for players.game_data JSON."""
return None

View File

@@ -0,0 +1,53 @@
"""Typed adapter exceptions. Core catches these specifically."""
class AdapterError(Exception):
"""Base for all adapter errors."""
pass
class ConfigWriteError(AdapterError):
"""Atomic file write failed. Temp files are already cleaned up."""
def __init__(self, path: str, detail: str):
self.path = path
self.detail = detail
super().__init__(f"Config write failed at {path}: {detail}")
class ConfigValidationError(AdapterError):
"""Adapter Pydantic model rejected the config values."""
def __init__(self, section: str, errors: list[dict]):
self.section = section
self.errors = errors
super().__init__(f"Config validation failed for section '{section}': {errors}")
class ConfigMigrationError(AdapterError):
"""migrate_config() could not transform old schema. Core keeps original."""
def __init__(self, from_version: str, detail: str):
self.from_version = from_version
self.detail = detail
super().__init__(f"Config migration from {from_version} failed: {detail}")
class LaunchArgsError(AdapterError):
"""build_launch_args() failed (missing mod path, bad config value)."""
def __init__(self, detail: str):
self.detail = detail
super().__init__(f"Launch args error: {detail}")
class RemoteAdminError(AdapterError):
"""Remote admin connection or command failed."""
def __init__(self, detail: str, recoverable: bool = True):
self.detail = detail
self.recoverable = recoverable
super().__init__(f"Remote admin error: {detail}")
class ExeNotAllowedError(AdapterError):
"""Executable not in adapter allowlist."""
def __init__(self, exe: str, allowed: list[str]):
self.exe = exe
self.allowed = allowed
super().__init__(f"Executable '{exe}' not allowed. Allowed: {allowed}")

View File

@@ -0,0 +1,238 @@
"""
All adapter capability Protocol definitions.
Core code only imports from here — never from adapter internals.
"""
from __future__ import annotations
from pathlib import Path
from typing import Any, Callable, Protocol, runtime_checkable
from pydantic import BaseModel
@runtime_checkable
class ConfigGenerator(Protocol):
"""
Merged protocol: config schema definition + file generation + launch args.
Always implement all methods. Return empty dict/list where not applicable.
"""
game_type: str
def get_sections(self) -> dict[str, type[BaseModel]]:
"""Return {section_name: PydanticModelClass} for all config sections."""
...
def get_defaults(self, section: str) -> dict[str, Any]:
"""Return default values dict for the given section."""
...
def get_sensitive_fields(self, section: str) -> list[str]:
"""
Return JSON keys in this section that need Fernet encryption.
Core's ConfigRepository encrypts/decrypts these transparently.
Example: ["password", "password_admin"] for section "server".
"""
...
def get_config_version(self) -> str:
"""
Current adapter schema version string (e.g. "1.0.0").
Stored in game_configs.schema_version.
When this changes, core calls migrate_config() automatically.
"""
...
def migrate_config(
self, old_version: str, config_json: dict[str, dict]
) -> dict[str, dict]:
"""
Transform config JSON from old_version to current version.
Called by ConfigRepository when stored schema_version differs.
Returns migrated config dict.
Raises ConfigMigrationError on failure — core keeps original.
"""
...
def write_configs(
self,
server_id: int,
server_dir: Path,
config_sections: dict[str, dict],
) -> list[Path]:
"""
Write all config files to disk using atomic write pattern:
1. Write to .tmp files
2. os.replace() each .tmp to final path
3. On any failure: clean up .tmp files, raise ConfigWriteError
Returns list of written file paths.
"""
...
def build_launch_args(
self,
config_sections: dict[str, dict],
mod_args: list[str] | None = None,
) -> list[str]:
"""
Return full CLI argument list for the game executable.
Raises LaunchArgsError if required values are missing/invalid.
"""
...
def preview_config(
self,
server_id: int,
server_dir: Path,
config_sections: dict[str, dict],
) -> dict[str, str]:
"""
Render config files as strings WITHOUT writing to disk.
Returns {label: content}.
Label = filename for file-based games, var name for env-var games.
"""
...
@runtime_checkable
class RemoteAdminClient(Protocol):
"""A connected client instance. Not required to be thread-safe — core wraps calls."""
def send_command(self, command: str, timeout: float = 5.0) -> str | None: ...
def get_players(self) -> list[dict]: ...
def kick_player(self, identifier: str, reason: str = "") -> bool: ...
def ban_player(self, identifier: str, duration_minutes: int, reason: str) -> bool: ...
def say_all(self, message: str) -> bool: ...
def shutdown(self) -> bool: ...
def keepalive(self) -> None: ...
def disconnect(self) -> None: ...
@runtime_checkable
class RemoteAdmin(Protocol):
"""Factory for remote admin clients. One per adapter, creates clients on demand."""
def create_client(self, host: str, port: int, password: str) -> RemoteAdminClient: ...
def get_startup_delay(self) -> float:
"""Seconds to wait after server start before connecting. Default: 30."""
...
def get_poll_interval(self) -> float:
"""Seconds between player list polls. Default: 10."""
...
def get_player_data_schema(self) -> type[BaseModel] | None:
"""Pydantic model for players.game_data JSON. None = no validation."""
...
@runtime_checkable
class LogParser(Protocol):
"""Parses game-specific log lines into standard format."""
def parse_line(self, line: str) -> dict | None:
"""
Parse one log line.
Returns: {"timestamp": ISO str, "level": "info"|"warning"|"error", "message": str}
Returns None to skip the line (e.g. blank lines, binary garbage).
"""
...
def get_log_file_resolver(self, server_id: int) -> Callable[[Path], Path | None]:
"""
Return a callable(server_dir: Path) -> Path | None.
Called by LogTailThread to find the current log file.
Return None if log file not yet created.
"""
...
@runtime_checkable
class MissionManager(Protocol):
"""Handles mission/scenario file format and rotation."""
file_extension: str # e.g. ".pbo"
def parse_mission_filename(self, filename: str) -> dict: ...
def get_rotation_config(self, rotation_entries: list[dict]) -> str: ...
def get_missions_dir(self, server_dir: Path) -> Path: ...
def get_mission_data_schema(self) -> type[BaseModel] | None:
"""Pydantic model for missions.game_data. None = no validation."""
...
@runtime_checkable
class ModManager(Protocol):
"""Handles mod folder conventions and CLI argument building."""
def get_mod_folder_pattern(self) -> str: ...
def build_mod_args(self, server_mods: list[dict]) -> list[str]: ...
def validate_mod_folder(self, path: Path) -> bool: ...
def get_mod_data_schema(self) -> type[BaseModel] | None:
"""Pydantic model for mods.game_data. None = no validation."""
...
@runtime_checkable
class ProcessConfig(Protocol):
"""Game-specific process and directory conventions."""
def get_allowed_executables(self) -> list[str]: ...
def get_port_conventions(self, game_port: int) -> dict[str, int]: ...
def get_default_game_port(self) -> int: ...
def get_default_rcon_port(self, game_port: int) -> int | None: ...
def get_server_dir_layout(self) -> list[str]: ...
@runtime_checkable
class BanManager(Protocol):
"""Bidirectional sync between DB bans and game ban file."""
def get_ban_file_path(self, server_dir: Path) -> Path: ...
def sync_bans_to_file(self, bans: list[dict], ban_file: Path) -> None: ...
def read_bans_from_file(self, ban_file: Path) -> list[dict]: ...
def get_ban_data_schema(self) -> type[BaseModel] | None:
"""Pydantic model for bans.game_data. None = no validation."""
...
@runtime_checkable
class GameAdapter(Protocol):
"""
Composite adapter. Every game must implement this.
Optional capabilities return None — core degrades gracefully.
Use has_capability(name) instead of None checks throughout.
"""
game_type: str # e.g. "arma3"
display_name: str # e.g. "Arma 3"
version: str # e.g. "1.0.0"
def get_config_generator(self) -> ConfigGenerator: ...
def get_process_config(self) -> ProcessConfig: ...
def get_log_parser(self) -> LogParser: ...
def get_remote_admin(self) -> RemoteAdmin | None: ...
def get_mission_manager(self) -> MissionManager | None: ...
def get_mod_manager(self) -> ModManager | None: ...
def get_ban_manager(self) -> BanManager | None: ...
def has_capability(self, name: str) -> bool:
"""
Explicit capability probe. Use this instead of:
if adapter.get_remote_admin() is not None:
Use this instead:
if adapter.has_capability("remote_admin"):
Valid names: "config_generator", "process_config", "log_parser",
"remote_admin", "mission_manager", "mod_manager", "ban_manager"
"""
...
def get_additional_routers(self) -> list:
"""List of FastAPI APIRouter instances for game-specific routes."""
...
def get_custom_thread_factories(self) -> list[Callable]:
"""List of callables(server_id, db) -> BaseServerThread for extra threads."""
...

View File

@@ -0,0 +1,66 @@
"""
GameAdapterRegistry — singleton that holds all registered game adapters.
Adapters register themselves at import time.
"""
from __future__ import annotations
import logging
logger = logging.getLogger(__name__)
class GameAdapterRegistry:
_adapters: dict[str, object] = {} # game_type -> GameAdapter
@classmethod
def register(cls, adapter) -> None:
"""Register a game adapter. Called at import time by each adapter package."""
if adapter.game_type in cls._adapters:
logger.warning(
"Adapter for '%s' already registered. Overwriting.", adapter.game_type
)
cls._adapters[adapter.game_type] = adapter
logger.info("Registered game adapter: %s (%s)", adapter.game_type, adapter.display_name)
@classmethod
def get(cls, game_type: str):
"""
Get adapter by game_type. Raises KeyError if not registered.
Core code calls this whenever game-specific behavior is needed.
"""
adapter = cls._adapters.get(game_type)
if adapter is None:
raise KeyError(
f"No adapter registered for game type '{game_type}'. "
f"Available: {list(cls._adapters.keys())}"
)
return adapter
@classmethod
def all(cls) -> list:
"""Return all registered adapters."""
return list(cls._adapters.values())
@classmethod
def list_game_types(cls) -> list[dict]:
"""Return metadata list for API /games endpoint."""
result = []
for adapter in cls._adapters.values():
caps = []
for cap in [
"config_generator", "process_config", "log_parser",
"remote_admin", "mission_manager", "mod_manager", "ban_manager",
]:
if adapter.has_capability(cap):
caps.append(cap)
result.append({
"game_type": adapter.game_type,
"display_name": adapter.display_name,
"version": adapter.version,
"capabilities": caps,
})
return result
@classmethod
def is_registered(cls, game_type: str) -> bool:
return game_type in cls._adapters