Initial release: full Arma 3 mod management toolchain
Pipeline: parse HTML presets, compare modlists, download from Caddy file server, create junctions/symlinks to Arma 3 Server directory. Includes update/sync flows, missing-mod reporting, OS compat layer, shared config, dep checker, comprehensive test suite (71 tests). Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
32
arma_modlist_tools/__init__.py
Normal file
32
arma_modlist_tools/__init__.py
Normal file
@@ -0,0 +1,32 @@
|
||||
from .parser import parse_mod_entry, parse_modlist_html, parse_modlist_dir
|
||||
from .compare import compare_presets
|
||||
from .fetcher import (
|
||||
make_session, build_server_index, find_mod_folder,
|
||||
list_mod_files, list_mod_updates, download_file, download_mod_folder,
|
||||
)
|
||||
from .linker import (
|
||||
get_mod_folders, get_link_status, create_junction,
|
||||
remove_junction, link_group, unlink_group,
|
||||
)
|
||||
from .config import load_config, Config
|
||||
from .compat import is_windows, is_linux, get_os_label, fix_console_encoding
|
||||
from .reporter import build_missing_report, save_missing_report
|
||||
|
||||
__all__ = [
|
||||
# parser
|
||||
"parse_mod_entry", "parse_modlist_html", "parse_modlist_dir",
|
||||
# compare
|
||||
"compare_presets",
|
||||
# fetcher
|
||||
"make_session", "build_server_index", "find_mod_folder",
|
||||
"list_mod_files", "list_mod_updates", "download_file", "download_mod_folder",
|
||||
# linker
|
||||
"get_mod_folders", "get_link_status", "create_junction",
|
||||
"remove_junction", "link_group", "unlink_group",
|
||||
# config
|
||||
"load_config", "Config",
|
||||
# compat
|
||||
"is_windows", "is_linux", "get_os_label", "fix_console_encoding",
|
||||
# reporter
|
||||
"build_missing_report", "save_missing_report",
|
||||
]
|
||||
79
arma_modlist_tools/compare.py
Normal file
79
arma_modlist_tools/compare.py
Normal file
@@ -0,0 +1,79 @@
|
||||
"""
|
||||
arma_modlist_tools.compare
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
Compare two or more Arma 3 mod presets (parsed by :mod:`arma_modlist_tools.parser`)
|
||||
and produce a breakdown of shared and preset-unique mods.
|
||||
|
||||
Typical usage::
|
||||
|
||||
from arma_modlist_tools.parser import parse_modlist_dir
|
||||
from arma_modlist_tools.compare import compare_presets
|
||||
|
||||
presets = parse_modlist_dir("modlist_html")
|
||||
result = compare_presets(*presets)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
|
||||
def _mod_key(mod: dict) -> str:
|
||||
"""Return the identity key for a mod.
|
||||
|
||||
Uses ``steam_id`` when available (canonical Workshop identifier),
|
||||
falls back to ``name`` for local mods that have no Workshop ID.
|
||||
"""
|
||||
return mod["steam_id"] or mod["name"]
|
||||
|
||||
|
||||
def compare_presets(*presets: dict) -> dict:
|
||||
"""
|
||||
Compare two or more preset dicts and return a comparison dict.
|
||||
|
||||
:param presets: Two or more preset dicts as returned by
|
||||
:func:`~arma_modlist_tools.parser.parse_modlist_html`.
|
||||
:returns: Dict with keys:
|
||||
|
||||
- ``compared_presets`` — list of preset names that were compared
|
||||
- ``shared`` — mods present in **every** preset
|
||||
- ``mod_count`` — number of shared mods
|
||||
- ``mods`` — list of mod entry dicts
|
||||
- ``unique`` — per-preset mods not present in any other preset
|
||||
- keyed by ``preset_name``
|
||||
- each value has ``mod_count`` and ``mods``
|
||||
|
||||
:raises ValueError: If fewer than two presets are provided.
|
||||
"""
|
||||
if len(presets) < 2:
|
||||
raise ValueError("compare_presets requires at least two presets")
|
||||
|
||||
# Build per-preset {identity_key -> mod_entry} mappings
|
||||
preset_maps: list[dict[str, dict]] = [
|
||||
{_mod_key(mod): mod for mod in preset["mods"]}
|
||||
for preset in presets
|
||||
]
|
||||
|
||||
# Shared keys = intersection across ALL presets
|
||||
shared_keys: set[str] = set(preset_maps[0].keys())
|
||||
for pm in preset_maps[1:]:
|
||||
shared_keys &= pm.keys()
|
||||
|
||||
# Shared mods: take entries from the first preset (identical across all)
|
||||
shared_mods = [preset_maps[0][k] for k in preset_maps[0] if k in shared_keys]
|
||||
|
||||
# Unique mods per preset: entries whose key is not in the shared set
|
||||
unique: dict[str, dict] = {}
|
||||
for preset, pm in zip(presets, preset_maps):
|
||||
unique_mods = [mod for k, mod in pm.items() if k not in shared_keys]
|
||||
unique[preset["preset_name"]] = {
|
||||
"mod_count": len(unique_mods),
|
||||
"mods": unique_mods,
|
||||
}
|
||||
|
||||
return {
|
||||
"compared_presets": [p["preset_name"] for p in presets],
|
||||
"shared": {
|
||||
"mod_count": len(shared_mods),
|
||||
"mods": shared_mods,
|
||||
},
|
||||
"unique": unique,
|
||||
}
|
||||
108
arma_modlist_tools/compat.py
Normal file
108
arma_modlist_tools/compat.py
Normal file
@@ -0,0 +1,108 @@
|
||||
"""
|
||||
arma_modlist_tools.compat
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
OS detection and cross-platform utilities shared by all CLI scripts.
|
||||
|
||||
Supported platforms:
|
||||
- Windows / Windows Server (sys.platform == "win32")
|
||||
- Ubuntu / Ubuntu Server (sys.platform == "linux")
|
||||
|
||||
Typical usage::
|
||||
|
||||
from arma_modlist_tools.compat import is_windows, get_os_label, fix_console_encoding
|
||||
|
||||
fix_console_encoding() # call once at script start on Windows
|
||||
print(get_os_label()) # "Windows Server", "Ubuntu", etc.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import io
|
||||
import platform
|
||||
import sys
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Platform detection
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def is_windows() -> bool:
|
||||
"""Return ``True`` on Windows and Windows Server."""
|
||||
return sys.platform == "win32"
|
||||
|
||||
|
||||
def is_linux() -> bool:
|
||||
"""Return ``True`` on Linux (Ubuntu, Ubuntu Server, and other distros)."""
|
||||
return sys.platform == "linux"
|
||||
|
||||
|
||||
def get_os_label() -> str:
|
||||
"""
|
||||
Return a human-readable OS label.
|
||||
|
||||
Possible values: ``"Windows"``, ``"Windows Server"``, ``"Ubuntu"``,
|
||||
``"Ubuntu Server"``, ``"Linux"``, ``"Unknown"``.
|
||||
"""
|
||||
if is_windows():
|
||||
ver = platform.version()
|
||||
# Windows Server versions contain "Server" in the version string
|
||||
# e.g. "10.0.17763 ... Windows Server 2019 ..."
|
||||
if "Server" in platform.version() or "Server" in platform.uname().version:
|
||||
return "Windows Server"
|
||||
return "Windows"
|
||||
|
||||
if is_linux():
|
||||
# Read /etc/os-release for distro name
|
||||
os_release = _read_os_release()
|
||||
name = os_release.get("NAME", "").lower()
|
||||
|
||||
if "ubuntu" in name:
|
||||
# Distinguish desktop vs server: server images have no display server
|
||||
if _is_headless():
|
||||
return "Ubuntu Server"
|
||||
return "Ubuntu"
|
||||
|
||||
return "Linux"
|
||||
|
||||
return "Unknown"
|
||||
|
||||
|
||||
def _read_os_release() -> dict[str, str]:
|
||||
"""Parse /etc/os-release into a dict (Linux only)."""
|
||||
result: dict[str, str] = {}
|
||||
try:
|
||||
with open("/etc/os-release", encoding="utf-8") as f:
|
||||
for line in f:
|
||||
line = line.strip()
|
||||
if "=" in line and not line.startswith("#"):
|
||||
k, _, v = line.partition("=")
|
||||
result[k] = v.strip('"')
|
||||
except OSError:
|
||||
pass
|
||||
return result
|
||||
|
||||
|
||||
def _is_headless() -> bool:
|
||||
"""Return True if no graphical display server is detected (headless/server)."""
|
||||
import os
|
||||
# Check for common display environment variables
|
||||
return not (os.environ.get("DISPLAY") or os.environ.get("WAYLAND_DISPLAY"))
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Console encoding
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def fix_console_encoding() -> None:
|
||||
"""
|
||||
Force UTF-8 output on Windows terminals that default to cp1252.
|
||||
|
||||
Call once at the top of any CLI script that uses Unicode characters
|
||||
(checkmarks, arrows, etc.). No-op on Linux.
|
||||
"""
|
||||
if not is_windows():
|
||||
return
|
||||
if sys.stdout.encoding and sys.stdout.encoding.lower() == "utf-8":
|
||||
return
|
||||
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8", errors="replace")
|
||||
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding="utf-8", errors="replace")
|
||||
109
arma_modlist_tools/config.py
Normal file
109
arma_modlist_tools/config.py
Normal file
@@ -0,0 +1,109 @@
|
||||
"""
|
||||
arma_modlist_tools.config
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
Load and expose project configuration from ``config.json``.
|
||||
|
||||
Search order for the config file:
|
||||
1. Explicit path passed to :func:`load_config`
|
||||
2. ``config.json`` in the current working directory
|
||||
3. ``config.json`` two levels above this module (project root)
|
||||
|
||||
Typical usage::
|
||||
|
||||
from arma_modlist_tools.config import load_config
|
||||
|
||||
cfg = load_config()
|
||||
print(cfg.server_url)
|
||||
print(cfg.arma_dir)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
class Config:
|
||||
"""Typed wrapper around the parsed ``config.json`` dict."""
|
||||
|
||||
def __init__(self, data: dict) -> None:
|
||||
# Validate required keys immediately so callers get a clear error at
|
||||
# load time rather than a confusing AttributeError deep in the pipeline.
|
||||
_ = data["server"]["base_url"]
|
||||
_ = data["server"]["username"]
|
||||
_ = data["server"]["password"]
|
||||
_ = data["paths"]["arma_dir"]
|
||||
_ = data["paths"]["downloads"]
|
||||
_ = data["paths"]["modlist_html"]
|
||||
_ = data["paths"]["modlist_json"]
|
||||
self._data = data
|
||||
|
||||
# ---- server ----
|
||||
|
||||
@property
|
||||
def server_url(self) -> str:
|
||||
return self._data["server"]["base_url"]
|
||||
|
||||
@property
|
||||
def server_auth(self) -> tuple[str, str]:
|
||||
return (self._data["server"]["username"], self._data["server"]["password"])
|
||||
|
||||
# ---- paths ----
|
||||
|
||||
@property
|
||||
def arma_dir(self) -> Path:
|
||||
return Path(self._data["paths"]["arma_dir"])
|
||||
|
||||
@property
|
||||
def downloads(self) -> Path:
|
||||
return Path(self._data["paths"]["downloads"])
|
||||
|
||||
@property
|
||||
def modlist_html(self) -> Path:
|
||||
return Path(self._data["paths"]["modlist_html"])
|
||||
|
||||
@property
|
||||
def modlist_json(self) -> Path:
|
||||
return Path(self._data["paths"]["modlist_json"])
|
||||
|
||||
# ---- derived paths ----
|
||||
|
||||
@property
|
||||
def comparison(self) -> Path:
|
||||
return self.modlist_json / "comparison.json"
|
||||
|
||||
@property
|
||||
def missing_report(self) -> Path:
|
||||
return self.modlist_json / "missing_report.json"
|
||||
|
||||
|
||||
def load_config(path: Path | str | None = None) -> Config:
|
||||
"""
|
||||
Load ``config.json`` and return a :class:`Config` instance.
|
||||
|
||||
:param path: Explicit path to the config file. If ``None``, the function
|
||||
searches the current working directory then the project root.
|
||||
:raises FileNotFoundError: If no config file can be located.
|
||||
:raises KeyError: If required keys are absent from the config file.
|
||||
"""
|
||||
if path is not None:
|
||||
config_path = Path(path)
|
||||
else:
|
||||
# Try CWD first, then project root (two levels above this file)
|
||||
cwd_path = Path.cwd() / "config.json"
|
||||
root_path = Path(__file__).parent.parent / "config.json"
|
||||
if cwd_path.exists():
|
||||
config_path = cwd_path
|
||||
elif root_path.exists():
|
||||
config_path = root_path
|
||||
else:
|
||||
raise FileNotFoundError(
|
||||
"config.json not found. "
|
||||
f"Looked in:\n {cwd_path}\n {root_path}\n"
|
||||
"Create config.json in the project root (copy from the template)."
|
||||
)
|
||||
|
||||
with open(config_path, encoding="utf-8") as f:
|
||||
data = json.load(f)
|
||||
|
||||
return Config(data)
|
||||
268
arma_modlist_tools/fetcher.py
Normal file
268
arma_modlist_tools/fetcher.py
Normal file
@@ -0,0 +1,268 @@
|
||||
"""
|
||||
arma_modlist_tools.fetcher
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
Download Arma 3 mods from a Caddy file server using a comparison dict as input.
|
||||
|
||||
The server is expected to host mods as ``@ModName/`` folders under a base URL,
|
||||
with a ``meta.cpp`` file inside each folder containing the Steam Workshop ID::
|
||||
|
||||
publishedid = 463939057;
|
||||
|
||||
Typical usage::
|
||||
|
||||
from arma_modlist_tools.fetcher import (
|
||||
make_session, build_server_index, find_mod_folder,
|
||||
list_mod_files, download_file, download_mod_folder,
|
||||
)
|
||||
|
||||
session = make_session(("user", "password"))
|
||||
index = build_server_index("https://example.com/arma3mods/", ("user", "pass"))
|
||||
url = find_mod_folder({"steam_id": "463939057", "name": "ace"}, index)
|
||||
files = list_mod_files(url, session)
|
||||
download_mod_folder(url, Path("downloads/shared/@ace"), session)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
from collections.abc import Callable
|
||||
from pathlib import Path
|
||||
|
||||
import requests
|
||||
|
||||
_CHUNK_SIZE = 64 * 1024 # 64 KB per read
|
||||
_META_CPP_RE = re.compile(r"publishedid\s*=\s*(\d+)", re.IGNORECASE)
|
||||
_NON_ALNUM_RE = re.compile(r"[^a-z0-9]")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Internal helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _list_dir(url: str, session: requests.Session) -> list[dict]:
|
||||
"""
|
||||
Fetch a Caddy browse directory listing as JSON.
|
||||
Caddy returns a list of ``{name, size, url, is_dir, ...}`` dicts when the
|
||||
``Accept: application/json`` header is sent.
|
||||
"""
|
||||
resp = session.get(url, headers={"Accept": "application/json"}, timeout=30)
|
||||
resp.raise_for_status()
|
||||
data = resp.json()
|
||||
# Caddy v2 returns a plain list; guard against wrapped responses
|
||||
if isinstance(data, list):
|
||||
return data
|
||||
return data.get("items", [])
|
||||
|
||||
|
||||
def _parse_meta_cpp(text: str) -> str | None:
|
||||
"""Extract ``publishedid`` from a ``meta.cpp`` file, or return ``None``."""
|
||||
m = _META_CPP_RE.search(text)
|
||||
return m.group(1) if m else None
|
||||
|
||||
|
||||
def _normalize_name(name: str) -> str:
|
||||
"""Strip leading ``@``, lowercase, remove all non-alphanumeric characters."""
|
||||
return _NON_ALNUM_RE.sub("", name.lower().lstrip("@"))
|
||||
|
||||
|
||||
def _folder_url(base: str, name: str) -> str:
|
||||
"""Build a canonical trailing-slash folder URL."""
|
||||
return base.rstrip("/") + "/" + name.strip("/") + "/"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Public API
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def make_session(auth: tuple[str, str]) -> requests.Session:
|
||||
"""Return a ``requests.Session`` pre-configured with basic auth credentials."""
|
||||
s = requests.Session()
|
||||
s.auth = auth
|
||||
return s
|
||||
|
||||
|
||||
def build_server_index(base_url: str, auth: tuple[str, str]) -> dict:
|
||||
"""
|
||||
Scan the root of the file server and build mod lookup maps.
|
||||
|
||||
For every ``@...`` folder found at *base_url*, the function attempts to
|
||||
fetch ``meta.cpp`` to extract the Steam Workshop ID.
|
||||
|
||||
:param base_url: Root URL of the Caddy file server (trailing slash optional).
|
||||
:param auth: ``(username, password)`` tuple for HTTP Basic Auth.
|
||||
:returns: Dict with keys:
|
||||
|
||||
- ``by_steam_id`` — ``{steam_id: folder_url}``
|
||||
- ``by_name`` — ``{normalized_name: folder_url}``
|
||||
- ``folders`` — raw list of item dicts from the root listing
|
||||
"""
|
||||
session = make_session(auth)
|
||||
root = base_url.rstrip("/") + "/"
|
||||
items = _list_dir(root, session)
|
||||
folders = [it for it in items if it.get("is_dir")]
|
||||
|
||||
by_steam_id: dict[str, str] = {}
|
||||
by_name: dict[str, str] = {}
|
||||
|
||||
for folder in folders:
|
||||
name = folder["name"].strip("/")
|
||||
url = _folder_url(root, name)
|
||||
by_name[_normalize_name(name)] = url
|
||||
|
||||
try:
|
||||
resp = session.get(url + "meta.cpp", timeout=10)
|
||||
if resp.ok:
|
||||
sid = _parse_meta_cpp(resp.text)
|
||||
if sid:
|
||||
by_steam_id[sid] = url
|
||||
except requests.RequestException:
|
||||
pass # meta.cpp missing or unreachable — name-based fallback still works
|
||||
|
||||
return {"by_steam_id": by_steam_id, "by_name": by_name, "folders": folders}
|
||||
|
||||
|
||||
def find_mod_folder(mod: dict, index: dict) -> str | None:
|
||||
"""
|
||||
Return the server folder URL for a mod entry, or ``None`` if not found.
|
||||
|
||||
Lookup order:
|
||||
|
||||
1. ``steam_id`` → ``index["by_steam_id"]`` (exact, reliable)
|
||||
2. Normalized ``name`` → ``index["by_name"]`` (fuzzy fallback for local mods)
|
||||
|
||||
:param mod: Mod entry dict with at least ``"steam_id"`` and ``"name"`` keys.
|
||||
:param index: Index dict returned by :func:`build_server_index`.
|
||||
"""
|
||||
if mod.get("steam_id"):
|
||||
url = index["by_steam_id"].get(mod["steam_id"])
|
||||
if url:
|
||||
return url
|
||||
return index["by_name"].get(_normalize_name(mod.get("name", "")))
|
||||
|
||||
|
||||
def list_mod_files(
|
||||
folder_url: str,
|
||||
session: requests.Session,
|
||||
) -> list[tuple[str, str, int]]:
|
||||
"""
|
||||
Recursively list all files under a mod folder on the server.
|
||||
|
||||
:returns: List of ``(relative_path, absolute_url, size_bytes)`` tuples,
|
||||
where *relative_path* is relative to *folder_url*.
|
||||
"""
|
||||
return _walk(folder_url.rstrip("/") + "/", session, "")
|
||||
|
||||
|
||||
def list_mod_updates(
|
||||
folder_url: str,
|
||||
dest_path: Path,
|
||||
session: requests.Session,
|
||||
) -> list[tuple[str, str, int]]:
|
||||
"""
|
||||
Return only the files that are missing locally or whose local size differs
|
||||
from the server size. Files that exist and match the server size are
|
||||
considered up-to-date and omitted.
|
||||
|
||||
Use this to detect which files need to be re-downloaded after the server
|
||||
has been updated without changing the modlist structure.
|
||||
|
||||
:param folder_url: Server folder URL for the mod (e.g. ``https://…/@ace/``).
|
||||
:param dest_path: Local destination directory for this mod.
|
||||
:param session: Authenticated ``requests.Session``.
|
||||
:returns: Subset of :func:`list_mod_files` results — ``(rel_path, url, size)``.
|
||||
"""
|
||||
stale = []
|
||||
for rel, url, server_size in list_mod_files(folder_url, session):
|
||||
local = dest_path / rel
|
||||
if not local.exists():
|
||||
stale.append((rel, url, server_size))
|
||||
elif server_size and local.stat().st_size != server_size:
|
||||
stale.append((rel, url, server_size))
|
||||
return stale
|
||||
|
||||
|
||||
def _walk(url: str, session: requests.Session, prefix: str) -> list[tuple[str, str, int]]:
|
||||
items = _list_dir(url, session)
|
||||
result = []
|
||||
for item in items:
|
||||
name = item["name"].strip("/")
|
||||
rel = (prefix + "/" + name).lstrip("/")
|
||||
item_url = url.rstrip("/") + "/" + name
|
||||
if item.get("is_dir"):
|
||||
result.extend(_walk(item_url + "/", session, rel))
|
||||
else:
|
||||
result.append((rel, item_url, item.get("size", 0)))
|
||||
return result
|
||||
|
||||
|
||||
def download_file(
|
||||
url: str,
|
||||
dest: Path,
|
||||
session: requests.Session,
|
||||
on_chunk: Callable[[int], None] | None = None,
|
||||
) -> int:
|
||||
"""
|
||||
Stream-download a single file to *dest*.
|
||||
|
||||
:param on_chunk: Optional callback ``(bytes_written)`` called after each
|
||||
chunk is flushed to disk.
|
||||
:returns: Total bytes written.
|
||||
"""
|
||||
dest.parent.mkdir(parents=True, exist_ok=True)
|
||||
resp = session.get(url, stream=True, timeout=120)
|
||||
resp.raise_for_status()
|
||||
written = 0
|
||||
with open(dest, "wb") as fh:
|
||||
for chunk in resp.iter_content(chunk_size=_CHUNK_SIZE):
|
||||
if chunk:
|
||||
fh.write(chunk)
|
||||
written += len(chunk)
|
||||
if on_chunk:
|
||||
on_chunk(len(chunk))
|
||||
return written
|
||||
|
||||
|
||||
def download_mod_folder(
|
||||
folder_url: str,
|
||||
dest_path: Path,
|
||||
session: requests.Session,
|
||||
overwrite: bool = False,
|
||||
on_file: Callable[[str, int, bool], None] | None = None,
|
||||
on_chunk: Callable[[int], None] | None = None,
|
||||
) -> dict:
|
||||
"""
|
||||
Recursively download all files in a mod folder.
|
||||
|
||||
:param folder_url: Server folder URL (must be browsable by Caddy).
|
||||
:param dest_path: Local destination directory (created if necessary).
|
||||
:param session: Authenticated ``requests.Session``.
|
||||
:param overwrite: If ``False``, existing files are skipped.
|
||||
:param on_file: ``(rel_path, size_bytes, is_skipped)`` — called before
|
||||
each file, whether it will be downloaded or skipped.
|
||||
:param on_chunk: ``(bytes)`` — called per chunk **only** for files that
|
||||
are actually downloaded (not skipped).
|
||||
:returns: ``{"files_downloaded": n, "files_skipped": n, "bytes_downloaded": n}``
|
||||
"""
|
||||
files = list_mod_files(folder_url, session)
|
||||
downloaded = skipped = total_bytes = 0
|
||||
|
||||
for rel, url, size in files:
|
||||
dest_file = dest_path / rel
|
||||
is_skipped = dest_file.exists() and not overwrite
|
||||
|
||||
if on_file:
|
||||
on_file(rel, size, is_skipped)
|
||||
|
||||
if is_skipped:
|
||||
skipped += 1
|
||||
continue
|
||||
|
||||
n = download_file(url, dest_file, session, on_chunk=on_chunk)
|
||||
total_bytes += n
|
||||
downloaded += 1
|
||||
|
||||
return {
|
||||
"files_downloaded": downloaded,
|
||||
"files_skipped": skipped,
|
||||
"bytes_downloaded": total_bytes,
|
||||
}
|
||||
188
arma_modlist_tools/linker.py
Normal file
188
arma_modlist_tools/linker.py
Normal file
@@ -0,0 +1,188 @@
|
||||
"""
|
||||
arma_modlist_tools.linker
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
Manage directory links between downloaded mod folders and the Arma 3 Server
|
||||
directory. Works on both Windows (junction links) and Linux (symlinks).
|
||||
|
||||
Platform behaviour:
|
||||
- **Windows**: junctions via ``cmd /c mklink /J`` — no admin rights required.
|
||||
- **Linux**: symlinks via ``os.symlink()`` — standard directory symlinks.
|
||||
|
||||
Typical usage::
|
||||
|
||||
from arma_modlist_tools.linker import get_link_status, link_group, unlink_group
|
||||
from pathlib import Path
|
||||
|
||||
arma = Path("/opt/arma3server")
|
||||
group = Path("downloads/shared")
|
||||
|
||||
status = get_link_status(group, arma)
|
||||
result = link_group(group, arma)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
|
||||
from .compat import is_windows
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Internal helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _is_junction(path: Path) -> bool:
|
||||
"""
|
||||
Return ``True`` if *path* is an active directory junction / symlink.
|
||||
|
||||
- **Windows**: checks ``FILE_ATTRIBUTE_REPARSE_POINT`` (0x400) in
|
||||
``os.lstat().st_file_attributes``. ``os.path.islink()`` is unreliable
|
||||
for junctions on Windows.
|
||||
- **Linux**: ``os.path.islink()`` correctly identifies symlinks.
|
||||
"""
|
||||
try:
|
||||
if is_windows():
|
||||
s = os.lstat(str(path))
|
||||
attrs = getattr(s, "st_file_attributes", 0)
|
||||
return bool(attrs & 0x400) # FILE_ATTRIBUTE_REPARSE_POINT
|
||||
else:
|
||||
return os.path.islink(str(path))
|
||||
except OSError:
|
||||
return False
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Public API
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def get_mod_folders(group_dir: Path) -> list[Path]:
|
||||
"""
|
||||
Return a sorted list of ``@*`` subdirectories inside *group_dir*.
|
||||
|
||||
:param group_dir: The mod category folder (e.g. ``downloads/shared``).
|
||||
"""
|
||||
if not group_dir.is_dir():
|
||||
return []
|
||||
return sorted(
|
||||
p for p in group_dir.iterdir()
|
||||
if p.is_dir() and p.name.startswith("@")
|
||||
)
|
||||
|
||||
|
||||
def get_link_status(group_dir: Path, arma_dir: Path) -> list[dict]:
|
||||
"""
|
||||
Return the link status for every ``@Mod`` folder in *group_dir*.
|
||||
|
||||
:returns: List of dicts with keys:
|
||||
|
||||
- ``name`` — folder name (e.g. ``@ace``)
|
||||
- ``source_path`` — absolute path of the mod folder
|
||||
- ``link_path`` — where the link would/does live in *arma_dir*
|
||||
- ``is_linked`` — ``True`` if a junction/symlink currently exists
|
||||
"""
|
||||
result = []
|
||||
for mod in get_mod_folders(group_dir):
|
||||
link_path = arma_dir / mod.name
|
||||
result.append({
|
||||
"name": mod.name,
|
||||
"source_path": mod.resolve(),
|
||||
"link_path": link_path,
|
||||
"is_linked": _is_junction(link_path),
|
||||
})
|
||||
return result
|
||||
|
||||
|
||||
def create_junction(link_path: Path, target: Path) -> bool:
|
||||
"""
|
||||
Create a directory junction (Windows) or symlink (Linux) at *link_path*
|
||||
pointing to *target*.
|
||||
|
||||
:returns: ``True`` on success, ``False`` on failure.
|
||||
"""
|
||||
if is_windows():
|
||||
proc = subprocess.run(
|
||||
["cmd", "/c", "mklink", "/J", str(link_path), str(target)],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
)
|
||||
return proc.returncode == 0
|
||||
else:
|
||||
try:
|
||||
os.symlink(str(target), str(link_path))
|
||||
return True
|
||||
except OSError:
|
||||
return False
|
||||
|
||||
|
||||
def remove_junction(link_path: Path) -> tuple[bool, str]:
|
||||
"""
|
||||
Remove the junction / symlink at *link_path*.
|
||||
|
||||
- **Windows**: ``os.rmdir()`` removes the junction pointer without touching
|
||||
the target directory's contents.
|
||||
- **Linux**: ``os.unlink()`` removes the symlink without touching the target.
|
||||
|
||||
:returns: ``(True, "")`` on success, ``(False, error_message)`` on failure.
|
||||
"""
|
||||
try:
|
||||
if is_windows():
|
||||
os.rmdir(str(link_path))
|
||||
else:
|
||||
os.unlink(str(link_path))
|
||||
return True, ""
|
||||
except OSError as exc:
|
||||
return False, str(exc)
|
||||
|
||||
|
||||
def link_group(group_dir: Path, arma_dir: Path) -> dict:
|
||||
"""
|
||||
Create links for every unlinked ``@Mod`` in *group_dir*.
|
||||
|
||||
:returns: ``{"linked": n, "already_linked": n, "failed": n, "errors": {name: msg}}``
|
||||
"""
|
||||
status = get_link_status(group_dir, arma_dir)
|
||||
linked = already_linked = failed = 0
|
||||
errors: dict[str, str] = {}
|
||||
|
||||
for s in status:
|
||||
if s["is_linked"]:
|
||||
already_linked += 1
|
||||
continue
|
||||
if s["link_path"].exists():
|
||||
failed += 1
|
||||
errors[s["name"]] = "path exists but is not a junction/symlink"
|
||||
continue
|
||||
ok = create_junction(s["link_path"], s["source_path"])
|
||||
if ok:
|
||||
linked += 1
|
||||
else:
|
||||
failed += 1
|
||||
errors[s["name"]] = "link creation failed"
|
||||
|
||||
return {"linked": linked, "already_linked": already_linked, "failed": failed, "errors": errors}
|
||||
|
||||
|
||||
def unlink_group(group_dir: Path, arma_dir: Path) -> dict:
|
||||
"""
|
||||
Remove links for every linked ``@Mod`` in *group_dir*.
|
||||
|
||||
:returns: ``{"unlinked": n, "not_linked": n, "failed": n, "errors": {name: msg}}``
|
||||
"""
|
||||
status = get_link_status(group_dir, arma_dir)
|
||||
unlinked = not_linked = failed = 0
|
||||
errors: dict[str, str] = {}
|
||||
|
||||
for s in status:
|
||||
if not s["is_linked"]:
|
||||
not_linked += 1
|
||||
continue
|
||||
ok, err = remove_junction(s["link_path"])
|
||||
if ok:
|
||||
unlinked += 1
|
||||
else:
|
||||
failed += 1
|
||||
errors[s["name"]] = err
|
||||
|
||||
return {"unlinked": unlinked, "not_linked": not_linked, "failed": failed, "errors": errors}
|
||||
146
arma_modlist_tools/parser.py
Normal file
146
arma_modlist_tools/parser.py
Normal file
@@ -0,0 +1,146 @@
|
||||
"""
|
||||
arma_modlist_tools.parser
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
Parse Arma 3 Launcher mod preset HTML files (.html exported from the launcher)
|
||||
into plain Python dicts / lists suitable for JSON serialisation.
|
||||
|
||||
Typical usage::
|
||||
|
||||
from arma_modlist_tools.parser import parse_modlist_html, parse_modlist_dir
|
||||
|
||||
# single file
|
||||
preset = parse_modlist_html("modlist_html/my_preset.html")
|
||||
|
||||
# whole folder
|
||||
presets = parse_modlist_dir("modlist_html")
|
||||
"""
|
||||
|
||||
import re
|
||||
import xml.etree.ElementTree as ET
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Public types (plain dicts — keep it dependency-free)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
# ModEntry:
|
||||
# name : str display name from the launcher
|
||||
# source : "steam" | "local" | "unknown"
|
||||
# url : str | None full workshop / local path URL
|
||||
# steam_id : str | None numeric workshop item ID extracted from the URL
|
||||
|
||||
# Preset:
|
||||
# preset_name : str stem of the source filename
|
||||
# source_file : str basename of the source filename
|
||||
# mod_count : int
|
||||
# mods : list[ModEntry]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Low-level helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
_STEAM_ID_RE = re.compile(r"[?&]id=(\d+)")
|
||||
|
||||
|
||||
def _extract_steam_id(url: str) -> str | None:
|
||||
"""Return the numeric workshop item ID from a Steam URL, or None."""
|
||||
m = _STEAM_ID_RE.search(url)
|
||||
return m.group(1) if m else None
|
||||
|
||||
|
||||
def _source_from_class(css_class: str) -> str:
|
||||
"""Map a span CSS class to a source label."""
|
||||
if "from-steam" in css_class:
|
||||
return "steam"
|
||||
if "from-local" in css_class:
|
||||
return "local"
|
||||
return "unknown"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Core parsing
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def parse_mod_entry(tr_element: ET.Element) -> dict | None:
|
||||
"""
|
||||
Parse a single ``<tr data-type="ModContainer">`` element into a mod dict.
|
||||
|
||||
Returns ``None`` if the element does not contain a display name (i.e. it
|
||||
is not a valid mod row).
|
||||
"""
|
||||
name: str | None = None
|
||||
source: str = "unknown"
|
||||
url: str | None = None
|
||||
steam_id: str | None = None
|
||||
|
||||
for td in tr_element:
|
||||
dtype = td.get("data-type")
|
||||
|
||||
if dtype == "DisplayName":
|
||||
name = (td.text or "").strip()
|
||||
continue
|
||||
|
||||
for span in td.iter("span"):
|
||||
css = span.get("class", "")
|
||||
if "from-" in css:
|
||||
source = _source_from_class(css)
|
||||
|
||||
for a in td.iter("a"):
|
||||
if a.get("data-type") == "Link":
|
||||
href = (a.get("href") or "").strip()
|
||||
if href:
|
||||
url = href
|
||||
steam_id = _extract_steam_id(href)
|
||||
|
||||
if name is None:
|
||||
return None
|
||||
|
||||
return {"name": name, "source": source, "url": url, "steam_id": steam_id}
|
||||
|
||||
|
||||
def parse_modlist_html(filepath: str | Path) -> dict:
|
||||
"""
|
||||
Parse an Arma 3 Launcher preset HTML file and return a preset dict.
|
||||
|
||||
:param filepath: Path to the ``.html`` preset file.
|
||||
:returns: Dict with keys ``preset_name``, ``source_file``, ``mod_count``,
|
||||
and ``mods`` (list of mod entry dicts).
|
||||
:raises FileNotFoundError: If *filepath* does not exist.
|
||||
:raises ET.ParseError: If the file is not valid XML/HTML.
|
||||
"""
|
||||
path = Path(filepath)
|
||||
tree = ET.parse(path)
|
||||
root = tree.getroot()
|
||||
|
||||
mods = []
|
||||
for tr in root.iter("tr"):
|
||||
if tr.get("data-type") != "ModContainer":
|
||||
continue
|
||||
entry = parse_mod_entry(tr)
|
||||
if entry is not None:
|
||||
mods.append(entry)
|
||||
|
||||
return {
|
||||
"preset_name": path.stem,
|
||||
"source_file": path.name,
|
||||
"mod_count": len(mods),
|
||||
"mods": mods,
|
||||
}
|
||||
|
||||
|
||||
def parse_modlist_dir(directory: str | Path) -> list[dict]:
|
||||
"""
|
||||
Parse all ``.html`` preset files in *directory* and return a list of
|
||||
preset dicts (one per file, sorted by filename).
|
||||
|
||||
:param directory: Folder containing ``.html`` preset files.
|
||||
:returns: List of preset dicts as returned by :func:`parse_modlist_html`.
|
||||
:raises NotADirectoryError: If *directory* does not exist or is not a dir.
|
||||
"""
|
||||
d = Path(directory)
|
||||
if not d.is_dir():
|
||||
raise NotADirectoryError(f"Not a directory: {d}")
|
||||
|
||||
return [parse_modlist_html(f) for f in sorted(d.glob("*.html"))]
|
||||
95
arma_modlist_tools/reporter.py
Normal file
95
arma_modlist_tools/reporter.py
Normal file
@@ -0,0 +1,95 @@
|
||||
"""
|
||||
arma_modlist_tools.reporter
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
Build and persist a report of mods that are required by the modlists but
|
||||
absent from the file server.
|
||||
|
||||
The report includes a ``group`` field per missing mod so downstream tools
|
||||
(``sync_missing.py``) know exactly where to place it when it becomes
|
||||
available on the server, without needing to re-read ``comparison.json``.
|
||||
|
||||
Typical usage::
|
||||
|
||||
from arma_modlist_tools.reporter import build_missing_report, save_missing_report
|
||||
|
||||
report = build_missing_report(comparison, server_index)
|
||||
save_missing_report(report, cfg.missing_report)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
def build_missing_report(comparison: dict, server_index: dict) -> dict:
|
||||
"""
|
||||
Cross-reference every mod in *comparison* against *server_index* and
|
||||
return a report of mods that are not on the server.
|
||||
|
||||
:param comparison: Dict as returned by :func:`~arma_modlist_tools.compare.compare_presets`.
|
||||
:param server_index: Dict as returned by :func:`~arma_modlist_tools.fetcher.build_server_index`.
|
||||
:returns: Report dict::
|
||||
|
||||
{
|
||||
"generated_at": "2026-04-07T12:00:00+00:00",
|
||||
"total_mods": 80,
|
||||
"on_server": 2,
|
||||
"missing": 78,
|
||||
"missing_mods": [
|
||||
{
|
||||
"name": "CBA_A3",
|
||||
"steam_id": "450814997",
|
||||
"url": "https://steamcommunity.com/...",
|
||||
"group": "shared"
|
||||
},
|
||||
...
|
||||
]
|
||||
}
|
||||
"""
|
||||
by_steam_id: dict = server_index.get("by_steam_id", {})
|
||||
by_name: dict = server_index.get("by_name", {})
|
||||
|
||||
from .fetcher import _normalize_name # reuse existing helper
|
||||
|
||||
def _on_server(mod: dict) -> bool:
|
||||
if mod.get("steam_id") and mod["steam_id"] in by_steam_id:
|
||||
return True
|
||||
return _normalize_name(mod.get("name", "")) in by_name
|
||||
|
||||
# Flatten all mods with their group label
|
||||
all_mods: list[tuple[dict, str]] = []
|
||||
for mod in comparison["shared"]["mods"]:
|
||||
all_mods.append((mod, "shared"))
|
||||
for preset_name, data in comparison["unique"].items():
|
||||
for mod in data["mods"]:
|
||||
all_mods.append((mod, preset_name))
|
||||
|
||||
missing_mods = []
|
||||
on_server_count = 0
|
||||
|
||||
for mod, group in all_mods:
|
||||
if _on_server(mod):
|
||||
on_server_count += 1
|
||||
else:
|
||||
missing_mods.append({
|
||||
"name": mod["name"],
|
||||
"steam_id": mod.get("steam_id"),
|
||||
"url": mod.get("url"),
|
||||
"group": group,
|
||||
})
|
||||
|
||||
return {
|
||||
"generated_at": datetime.now(timezone.utc).isoformat(),
|
||||
"total_mods": len(all_mods),
|
||||
"on_server": on_server_count,
|
||||
"missing": len(missing_mods),
|
||||
"missing_mods": missing_mods,
|
||||
}
|
||||
|
||||
|
||||
def save_missing_report(report: dict, path: Path) -> None:
|
||||
"""Write *report* as indented JSON to *path*, creating parent dirs as needed."""
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
path.write_text(json.dumps(report, indent=2, ensure_ascii=False), encoding="utf-8")
|
||||
Reference in New Issue
Block a user