"""Arma 3 mod manager — handles mod folder conventions, CLI args, and enable/disable.""" from __future__ import annotations import logging import re from pathlib import Path from pydantic import BaseModel from adapters.exceptions import AdapterError from core.utils.file_utils import get_server_dir logger = logging.getLogger(__name__) _MOD_DIR_PATTERN = re.compile(r"^@.+", re.IGNORECASE) def _parse_mod_cpp(mod_dir: Path) -> str | None: mod_cpp = mod_dir / "mod.cpp" if not mod_cpp.exists(): return None text = mod_cpp.read_text(errors="ignore") m = re.search(r'name\s*=\s*"([^"]+)"', text, re.IGNORECASE) return m.group(1) if m else None def _parse_meta_cpp(mod_dir: Path) -> str | None: meta_cpp = mod_dir / "meta.cpp" if not meta_cpp.exists(): return None text = meta_cpp.read_text(errors="ignore") m = re.search(r'publishedid\s*=\s*(\d+)', text, re.IGNORECASE) return m.group(1) if m else None class Arma3ModData(BaseModel): """Mod data schema for Arma 3.""" workshop_id: str = "" is_server_mod: bool = False class Arma3ModManager: def __init__(self, server_id: int | None = None) -> None: self._server_id = server_id def _server_dir(self) -> Path: return get_server_dir(self._server_id) # ── File / DB operations ── def list_available_mods(self) -> list[dict]: """ Scan the server directory for mod folders (directories starting with '@'). Returns list of dicts: name: str — directory name (e.g. "@CBA_A3") path: str — absolute directory path size_bytes: int — total directory size (approximate, non-recursive) """ server_dir = self._server_dir() if not server_dir.exists(): return [] mods = [] try: for entry in server_dir.iterdir(): if entry.is_dir() and _MOD_DIR_PATTERN.match(entry.name): try: size = sum( f.stat().st_size for f in entry.iterdir() if f.is_file() ) except OSError: size = 0 mods.append({ "name": entry.name, "path": str(entry.resolve()), "size_bytes": size, "display_name": _parse_mod_cpp(entry), "workshop_id": _parse_meta_cpp(entry), }) except OSError as exc: raise AdapterError(f"Cannot scan mod directory: {exc}") from exc mods.sort(key=lambda m: m["name"].lower()) return mods def get_enabled_mods(self, config_repo) -> list[str]: """ Get the list of enabled mod names from the database config. Args: config_repo: ConfigRepository instance. Returns list of mod directory names (e.g. ["@CBA_A3", "@ace"]). """ mods_section = config_repo.get_section(self._server_id, "mods") if mods_section is None: return [] enabled = mods_section.get("enabled_mods", []) if isinstance(enabled, str): enabled = [m.strip() for m in enabled.split(",") if m.strip()] return enabled def set_enabled_mods(self, mod_names: list[str], config_repo) -> None: """ Update the enabled mods list in the database config. Args: mod_names: List of mod directory names to enable. config_repo: ConfigRepository instance. Raises AdapterError if any mod name doesn't exist on disk. """ available = {m["name"] for m in self.list_available_mods()} for name in mod_names: if not _MOD_DIR_PATTERN.match(name): raise AdapterError(f"Invalid mod name '{name}': must start with '@'") if name not in available: raise AdapterError( f"Mod '{name}' not found in server directory. " f"Available: {sorted(available)}" ) mods_section = config_repo.get_section(self._server_id, "mods") or {} current_version = mods_section.get("config_version", 0) config_repo.upsert_section( server_id=self._server_id, section="mods", data={"enabled_mods": mod_names}, expected_version=current_version, ) logger.info( "Updated enabled mods for server %d: %s", self._server_id, mod_names, ) # ── CLI argument building ── def get_mod_folder_pattern(self) -> str: """Arma 3 mods use @ prefix for local, or numeric workshop IDs.""" return "@*" def build_mod_args(self, server_mods: list[dict]) -> list[str]: """ Build Arma 3 mod CLI arguments. Returns -mod and -serverMod argument lists. """ client_mods = [] server_only_mods = [] for mod in server_mods: path = mod.get("folder_path", "") game_data = mod.get("game_data", {}) if isinstance(game_data, str): import json try: game_data = json.loads(game_data) except (json.JSONDecodeError, TypeError): game_data = {} is_server = game_data.get("is_server_mod", False) if isinstance(game_data, dict) else False if is_server: server_only_mods.append(path) else: client_mods.append(path) args = [] if client_mods: args.append('-mod="' + ";".join(client_mods) + '"') if server_only_mods: args.append('-serverMod="' + ";".join(server_only_mods) + '"') return args def validate_mod_folder(self, path: Path) -> bool: """Validate that a path looks like a valid Arma 3 mod folder.""" if not path.exists() or not path.is_dir(): return False return (path / "addons").exists() or (path / "$PREFIX$").exists() def get_mod_data_schema(self) -> type[BaseModel] | None: return Arma3ModData