Mods tab bug fixes:
- mod_manager: fix wrong kwargs in set_enabled_mods, fix scan dir to use
mods/ subdir instead of server root, migrate old string-list format to
dict format on read
- service: replace dead server_mods SQL JOIN with get_enabled_mods()
call through the mod_manager capability; pass is_server_mod to
build_mod_args
- mods_router: accept list[EnabledModEntry] objects (name + is_server_mod)
instead of bare strings
Client/server mod split:
- Mods now stored as list[{"name": str, "is_server_mod": bool}]; old
string-list format auto-migrated on read
- is_server_mod=true routes to -serverMod= arg; false to -mod= arg
- ModList UI: amber Client/Server badge in selected pane; toggle button
in split-pane selector
Directory scaffold:
- process_config: adds "mods" to dir layout; provides get_dir_readme()
with per-directory README.txt content
- file_utils: ensure_server_dirs() gains readme_provider kwarg; writes
README.txt idempotently if absent
- service.create_server: passes readme_provider via hasattr probe
- main.py startup: backfills all existing servers with correct subdirs
and README files (idempotent)
Docs: API.md and FRONTEND.md updated for new mod schema and types
Test __init__.py files added for pytest discovery
193 lines
6.6 KiB
Python
193 lines
6.6 KiB
Python
"""Arma 3 mod manager — handles mod folder conventions, CLI args, and enable/disable."""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import re
|
|
from pathlib import Path
|
|
|
|
from pydantic import BaseModel
|
|
|
|
from adapters.exceptions import AdapterError
|
|
from core.utils.file_utils import get_server_dir
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
_MOD_DIR_PATTERN = re.compile(r"^@.+", re.IGNORECASE)
|
|
|
|
|
|
def _parse_mod_cpp(mod_dir: Path) -> str | None:
|
|
mod_cpp = mod_dir / "mod.cpp"
|
|
if not mod_cpp.exists():
|
|
return None
|
|
text = mod_cpp.read_text(errors="ignore")
|
|
m = re.search(r'name\s*=\s*"([^"]+)"', text, re.IGNORECASE)
|
|
return m.group(1) if m else None
|
|
|
|
|
|
def _parse_meta_cpp(mod_dir: Path) -> str | None:
|
|
meta_cpp = mod_dir / "meta.cpp"
|
|
if not meta_cpp.exists():
|
|
return None
|
|
text = meta_cpp.read_text(errors="ignore")
|
|
m = re.search(r'publishedid\s*=\s*(\d+)', text, re.IGNORECASE)
|
|
return m.group(1) if m else None
|
|
|
|
|
|
class Arma3ModData(BaseModel):
|
|
"""Mod data schema for Arma 3."""
|
|
workshop_id: str = ""
|
|
is_server_mod: bool = False
|
|
|
|
|
|
class Arma3ModManager:
|
|
|
|
def __init__(self, server_id: int | None = None) -> None:
|
|
self._server_id = server_id
|
|
|
|
def _server_dir(self) -> Path:
|
|
return get_server_dir(self._server_id)
|
|
|
|
def _mods_dir(self) -> Path:
|
|
return get_server_dir(self._server_id) / "mods"
|
|
|
|
# ── File / DB operations ──
|
|
|
|
def list_available_mods(self) -> list[dict]:
|
|
"""
|
|
Scan the server's mods/ subdirectory for mod folders (directories starting with '@').
|
|
|
|
Returns list of dicts:
|
|
name: str — directory name (e.g. "@CBA_A3")
|
|
path: str — absolute directory path
|
|
size_bytes: int — total directory size (approximate, non-recursive)
|
|
"""
|
|
mods_dir = self._mods_dir()
|
|
if not mods_dir.exists():
|
|
return []
|
|
|
|
mods = []
|
|
try:
|
|
for entry in mods_dir.iterdir():
|
|
if entry.is_dir() and _MOD_DIR_PATTERN.match(entry.name):
|
|
try:
|
|
size = sum(
|
|
f.stat().st_size
|
|
for f in entry.iterdir()
|
|
if f.is_file()
|
|
)
|
|
except OSError:
|
|
size = 0
|
|
mods.append({
|
|
"name": entry.name,
|
|
"path": str(entry.resolve()),
|
|
"size_bytes": size,
|
|
"display_name": _parse_mod_cpp(entry),
|
|
"workshop_id": _parse_meta_cpp(entry),
|
|
})
|
|
except OSError as exc:
|
|
raise AdapterError(f"Cannot scan mod directory: {exc}") from exc
|
|
|
|
mods.sort(key=lambda m: m["name"].lower())
|
|
return mods
|
|
|
|
def get_enabled_mods(self, config_repo) -> list[dict]:
|
|
"""
|
|
Get the list of enabled mods from the database config.
|
|
|
|
Returns list of dicts: [{"name": "@CBA_A3", "is_server_mod": False}, ...]
|
|
Handles migration from old string-list format automatically.
|
|
"""
|
|
mods_section = config_repo.get_section(self._server_id, "mods")
|
|
if mods_section is None:
|
|
return []
|
|
raw = mods_section.get("enabled_mods", [])
|
|
result = []
|
|
for item in raw:
|
|
if isinstance(item, str):
|
|
result.append({"name": item, "is_server_mod": False})
|
|
elif isinstance(item, dict):
|
|
result.append({"name": item.get("name", ""), "is_server_mod": bool(item.get("is_server_mod", False))})
|
|
return result
|
|
|
|
def set_enabled_mods(self, mod_entries: list[dict], config_repo) -> None:
|
|
"""
|
|
Update the enabled mods list in the database config.
|
|
|
|
Args:
|
|
mod_entries: List of dicts with "name" (str) and "is_server_mod" (bool).
|
|
config_repo: ConfigRepository instance.
|
|
|
|
Raises AdapterError if any mod name is invalid or not found on disk.
|
|
"""
|
|
available = {m["name"] for m in self.list_available_mods()}
|
|
for entry in mod_entries:
|
|
name = entry.get("name", "")
|
|
if not _MOD_DIR_PATTERN.match(name):
|
|
raise AdapterError(f"Invalid mod name '{name}': must start with '@'")
|
|
if name not in available:
|
|
raise AdapterError(
|
|
f"Mod '{name}' not found in mods directory. "
|
|
f"Available: {sorted(available)}"
|
|
)
|
|
|
|
mods_section = config_repo.get_section(self._server_id, "mods") or {}
|
|
current_version = mods_section.get("_meta", {}).get("config_version")
|
|
config_repo.upsert_section(
|
|
server_id=self._server_id,
|
|
game_type="arma3",
|
|
section="mods",
|
|
config_data={"enabled_mods": mod_entries},
|
|
schema_version="1.0.0",
|
|
expected_config_version=current_version,
|
|
)
|
|
logger.info(
|
|
"Updated enabled mods for server %d: %s",
|
|
self._server_id, [e["name"] for e in mod_entries],
|
|
)
|
|
|
|
# ── CLI argument building ──
|
|
|
|
def get_mod_folder_pattern(self) -> str:
|
|
"""Arma 3 mods use @ prefix for local, or numeric workshop IDs."""
|
|
return "@*"
|
|
|
|
def build_mod_args(self, server_mods: list[dict]) -> list[str]:
|
|
"""
|
|
Build Arma 3 mod CLI arguments.
|
|
Returns -mod and -serverMod argument lists.
|
|
"""
|
|
client_mods = []
|
|
server_only_mods = []
|
|
|
|
for mod in server_mods:
|
|
path = mod.get("folder_path", "")
|
|
game_data = mod.get("game_data", {})
|
|
if isinstance(game_data, str):
|
|
import json
|
|
try:
|
|
game_data = json.loads(game_data)
|
|
except (json.JSONDecodeError, TypeError):
|
|
game_data = {}
|
|
|
|
is_server = game_data.get("is_server_mod", False) if isinstance(game_data, dict) else False
|
|
|
|
if is_server:
|
|
server_only_mods.append(path)
|
|
else:
|
|
client_mods.append(path)
|
|
|
|
args = []
|
|
if client_mods:
|
|
args.append('-mod="' + ";".join(client_mods) + '"')
|
|
if server_only_mods:
|
|
args.append('-serverMod="' + ";".join(server_only_mods) + '"')
|
|
return args
|
|
|
|
def validate_mod_folder(self, path: Path) -> bool:
|
|
"""Validate that a path looks like a valid Arma 3 mod folder."""
|
|
if not path.exists() or not path.is_dir():
|
|
return False
|
|
return (path / "addons").exists() or (path / "$PREFIX$").exists()
|
|
|
|
def get_mod_data_schema(self) -> type[BaseModel] | None:
|
|
return Arma3ModData |