Files
languard-servers-manager/backend/adapters/arma3/mod_manager.py
Tran G. (Revernomad) Khoa 5a62d21def feat: implement phases 3-5 of Arma 3 UX enhancement plan
Phase 3 - Mod display names + split-pane selector:
- Parse mod.cpp/meta.cpp for display_name and workshop_id
- Rewrite ModList as two-pane available/selected interface

Phase 4 - Player kick/ban from Players tab:
- Add get_by_slot() to PlayerRepository
- Add get_rcon_client() class method to ThreadRegistry
- Add /players/{slot_id}/kick and /ban endpoints
- Rewrite PlayerTable with kick/ban modals and ban presets

Phase 5 - Historical log file browser:
- Add list_log_files() and get_log_file_path() to RPTParser
- Add logfiles_router with GET/download/DELETE endpoints
- Update LogViewer with collapsible log files section (download + delete)
2026-04-17 20:47:37 +07:00

185 lines
6.2 KiB
Python

"""Arma 3 mod manager — handles mod folder conventions, CLI args, and enable/disable."""
from __future__ import annotations
import logging
import re
from pathlib import Path
from pydantic import BaseModel
from adapters.exceptions import AdapterError
from core.utils.file_utils import get_server_dir
logger = logging.getLogger(__name__)
_MOD_DIR_PATTERN = re.compile(r"^@.+", re.IGNORECASE)
def _parse_mod_cpp(mod_dir: Path) -> str | None:
mod_cpp = mod_dir / "mod.cpp"
if not mod_cpp.exists():
return None
text = mod_cpp.read_text(errors="ignore")
m = re.search(r'name\s*=\s*"([^"]+)"', text, re.IGNORECASE)
return m.group(1) if m else None
def _parse_meta_cpp(mod_dir: Path) -> str | None:
meta_cpp = mod_dir / "meta.cpp"
if not meta_cpp.exists():
return None
text = meta_cpp.read_text(errors="ignore")
m = re.search(r'publishedid\s*=\s*(\d+)', text, re.IGNORECASE)
return m.group(1) if m else None
class Arma3ModData(BaseModel):
"""Mod data schema for Arma 3."""
workshop_id: str = ""
is_server_mod: bool = False
class Arma3ModManager:
def __init__(self, server_id: int | None = None) -> None:
self._server_id = server_id
def _server_dir(self) -> Path:
return get_server_dir(self._server_id)
# ── File / DB operations ──
def list_available_mods(self) -> list[dict]:
"""
Scan the server directory for mod folders (directories starting with '@').
Returns list of dicts:
name: str — directory name (e.g. "@CBA_A3")
path: str — absolute directory path
size_bytes: int — total directory size (approximate, non-recursive)
"""
server_dir = self._server_dir()
if not server_dir.exists():
return []
mods = []
try:
for entry in server_dir.iterdir():
if entry.is_dir() and _MOD_DIR_PATTERN.match(entry.name):
try:
size = sum(
f.stat().st_size
for f in entry.iterdir()
if f.is_file()
)
except OSError:
size = 0
mods.append({
"name": entry.name,
"path": str(entry.resolve()),
"size_bytes": size,
"display_name": _parse_mod_cpp(entry),
"workshop_id": _parse_meta_cpp(entry),
})
except OSError as exc:
raise AdapterError(f"Cannot scan mod directory: {exc}") from exc
mods.sort(key=lambda m: m["name"].lower())
return mods
def get_enabled_mods(self, config_repo) -> list[str]:
"""
Get the list of enabled mod names from the database config.
Args:
config_repo: ConfigRepository instance.
Returns list of mod directory names (e.g. ["@CBA_A3", "@ace"]).
"""
mods_section = config_repo.get_section(self._server_id, "mods")
if mods_section is None:
return []
enabled = mods_section.get("enabled_mods", [])
if isinstance(enabled, str):
enabled = [m.strip() for m in enabled.split(",") if m.strip()]
return enabled
def set_enabled_mods(self, mod_names: list[str], config_repo) -> None:
"""
Update the enabled mods list in the database config.
Args:
mod_names: List of mod directory names to enable.
config_repo: ConfigRepository instance.
Raises AdapterError if any mod name doesn't exist on disk.
"""
available = {m["name"] for m in self.list_available_mods()}
for name in mod_names:
if not _MOD_DIR_PATTERN.match(name):
raise AdapterError(f"Invalid mod name '{name}': must start with '@'")
if name not in available:
raise AdapterError(
f"Mod '{name}' not found in server directory. "
f"Available: {sorted(available)}"
)
mods_section = config_repo.get_section(self._server_id, "mods") or {}
current_version = mods_section.get("config_version", 0)
config_repo.upsert_section(
server_id=self._server_id,
section="mods",
data={"enabled_mods": mod_names},
expected_version=current_version,
)
logger.info(
"Updated enabled mods for server %d: %s",
self._server_id, mod_names,
)
# ── CLI argument building ──
def get_mod_folder_pattern(self) -> str:
"""Arma 3 mods use @ prefix for local, or numeric workshop IDs."""
return "@*"
def build_mod_args(self, server_mods: list[dict]) -> list[str]:
"""
Build Arma 3 mod CLI arguments.
Returns -mod and -serverMod argument lists.
"""
client_mods = []
server_only_mods = []
for mod in server_mods:
path = mod.get("folder_path", "")
game_data = mod.get("game_data", {})
if isinstance(game_data, str):
import json
try:
game_data = json.loads(game_data)
except (json.JSONDecodeError, TypeError):
game_data = {}
is_server = game_data.get("is_server_mod", False) if isinstance(game_data, dict) else False
if is_server:
server_only_mods.append(path)
else:
client_mods.append(path)
args = []
if client_mods:
args.append('-mod="' + ";".join(client_mods) + '"')
if server_only_mods:
args.append('-serverMod="' + ";".join(server_only_mods) + '"')
return args
def validate_mod_folder(self, path: Path) -> bool:
"""Validate that a path looks like a valid Arma 3 mod folder."""
if not path.exists() or not path.is_dir():
return False
return (path / "addons").exists() or (path / "$PREFIX$").exists()
def get_mod_data_schema(self) -> type[BaseModel] | None:
return Arma3ModData