""" arma_modlist_tools.fetcher ~~~~~~~~~~~~~~~~~~~~~~~~~~ Download Arma 3 mods from a Caddy file server using a comparison dict as input. The server is expected to host mods as ``@ModName/`` folders under a base URL, with a ``meta.cpp`` file inside each folder containing the Steam Workshop ID:: publishedid = 463939057; Typical usage:: from arma_modlist_tools.fetcher import ( make_session, build_server_index, find_mod_folder, list_mod_files, download_file, download_mod_folder, ) session = make_session(("user", "password")) index = build_server_index("https://example.com/arma3mods/", ("user", "pass")) url = find_mod_folder({"steam_id": "463939057", "name": "ace"}, index) files = list_mod_files(url, session) download_mod_folder(url, Path("downloads/shared/@ace"), session) """ from __future__ import annotations import re from collections.abc import Callable from pathlib import Path import requests _CHUNK_SIZE = 64 * 1024 # 64 KB per read _META_CPP_RE = re.compile(r"publishedid\s*=\s*(\d+)", re.IGNORECASE) _NON_ALNUM_RE = re.compile(r"[^a-z0-9]") # --------------------------------------------------------------------------- # Internal helpers # --------------------------------------------------------------------------- def _list_dir(url: str, session: requests.Session) -> list[dict]: """ Fetch a Caddy browse directory listing as JSON. Caddy returns a list of ``{name, size, url, is_dir, ...}`` dicts when the ``Accept: application/json`` header is sent. """ resp = session.get(url, headers={"Accept": "application/json"}, timeout=30) resp.raise_for_status() data = resp.json() # Caddy v2 returns a plain list; guard against wrapped responses if isinstance(data, list): return data return data.get("items", []) def _parse_meta_cpp(text: str) -> str | None: """Extract ``publishedid`` from a ``meta.cpp`` file, or return ``None``.""" m = _META_CPP_RE.search(text) return m.group(1) if m else None def _normalize_name(name: str) -> str: """Strip leading ``@``, lowercase, remove all non-alphanumeric characters.""" return _NON_ALNUM_RE.sub("", name.lower().lstrip("@")) def _folder_url(base: str, name: str) -> str: """Build a canonical trailing-slash folder URL.""" return base.rstrip("/") + "/" + name.strip("/") + "/" # --------------------------------------------------------------------------- # Public API # --------------------------------------------------------------------------- def make_session(auth: tuple[str, str]) -> requests.Session: """Return a ``requests.Session`` pre-configured with basic auth credentials.""" s = requests.Session() s.auth = auth return s def build_server_index(base_url: str, auth: tuple[str, str]) -> dict: """ Scan the root of the file server and build mod lookup maps. For every ``@...`` folder found at *base_url*, the function attempts to fetch ``meta.cpp`` to extract the Steam Workshop ID. :param base_url: Root URL of the Caddy file server (trailing slash optional). :param auth: ``(username, password)`` tuple for HTTP Basic Auth. :returns: Dict with keys: - ``by_steam_id`` — ``{steam_id: folder_url}`` - ``by_name`` — ``{normalized_name: folder_url}`` - ``folders`` — raw list of item dicts from the root listing """ session = make_session(auth) root = base_url.rstrip("/") + "/" items = _list_dir(root, session) folders = [it for it in items if it.get("is_dir")] by_steam_id: dict[str, str] = {} by_name: dict[str, str] = {} for folder in folders: name = folder["name"].strip("/") url = _folder_url(root, name) by_name[_normalize_name(name)] = url try: resp = session.get(url + "meta.cpp", timeout=10) if resp.ok: sid = _parse_meta_cpp(resp.text) if sid: by_steam_id[sid] = url except requests.RequestException: pass # meta.cpp missing or unreachable — name-based fallback still works return {"by_steam_id": by_steam_id, "by_name": by_name, "folders": folders} def find_mod_folder(mod: dict, index: dict) -> str | None: """ Return the server folder URL for a mod entry, or ``None`` if not found. Lookup order: 1. ``steam_id`` → ``index["by_steam_id"]`` (exact, reliable) 2. Normalized ``name`` → ``index["by_name"]`` (fuzzy fallback for local mods) :param mod: Mod entry dict with at least ``"steam_id"`` and ``"name"`` keys. :param index: Index dict returned by :func:`build_server_index`. """ if mod.get("steam_id"): url = index["by_steam_id"].get(mod["steam_id"]) if url: return url return index["by_name"].get(_normalize_name(mod.get("name", ""))) def list_mod_files( folder_url: str, session: requests.Session, ) -> list[tuple[str, str, int]]: """ Recursively list all files under a mod folder on the server. :returns: List of ``(relative_path, absolute_url, size_bytes)`` tuples, where *relative_path* is relative to *folder_url*. """ return _walk(folder_url.rstrip("/") + "/", session, "") def list_mod_updates( folder_url: str, dest_path: Path, session: requests.Session, ) -> list[tuple[str, str, int]]: """ Return only the files that are missing locally or whose local size differs from the server size. Files that exist and match the server size are considered up-to-date and omitted. Use this to detect which files need to be re-downloaded after the server has been updated without changing the modlist structure. :param folder_url: Server folder URL for the mod (e.g. ``https://…/@ace/``). :param dest_path: Local destination directory for this mod. :param session: Authenticated ``requests.Session``. :returns: Subset of :func:`list_mod_files` results — ``(rel_path, url, size)``. """ stale = [] for rel, url, server_size in list_mod_files(folder_url, session): local = dest_path / rel if not local.exists(): stale.append((rel, url, server_size)) elif server_size and local.stat().st_size != server_size: stale.append((rel, url, server_size)) return stale def _walk(url: str, session: requests.Session, prefix: str) -> list[tuple[str, str, int]]: items = _list_dir(url, session) result = [] for item in items: name = item["name"].strip("/") rel = (prefix + "/" + name).lstrip("/") item_url = url.rstrip("/") + "/" + name if item.get("is_dir"): result.extend(_walk(item_url + "/", session, rel)) else: result.append((rel, item_url, item.get("size", 0))) return result def download_file( url: str, dest: Path, session: requests.Session, on_chunk: Callable[[int], None] | None = None, ) -> int: """ Stream-download a single file to *dest*. :param on_chunk: Optional callback ``(bytes_written)`` called after each chunk is flushed to disk. :returns: Total bytes written. """ dest.parent.mkdir(parents=True, exist_ok=True) resp = session.get(url, stream=True, timeout=120) resp.raise_for_status() written = 0 with open(dest, "wb") as fh: for chunk in resp.iter_content(chunk_size=_CHUNK_SIZE): if chunk: fh.write(chunk) written += len(chunk) if on_chunk: on_chunk(len(chunk)) return written def download_mod_folder( folder_url: str, dest_path: Path, session: requests.Session, overwrite: bool = False, on_file: Callable[[str, int, bool], None] | None = None, on_chunk: Callable[[int], None] | None = None, ) -> dict: """ Recursively download all files in a mod folder. :param folder_url: Server folder URL (must be browsable by Caddy). :param dest_path: Local destination directory (created if necessary). :param session: Authenticated ``requests.Session``. :param overwrite: If ``False``, existing files are skipped. :param on_file: ``(rel_path, size_bytes, is_skipped)`` — called before each file, whether it will be downloaded or skipped. :param on_chunk: ``(bytes)`` — called per chunk **only** for files that are actually downloaded (not skipped). :returns: ``{"files_downloaded": n, "files_skipped": n, "bytes_downloaded": n}`` """ files = list_mod_files(folder_url, session) downloaded = skipped = total_bytes = 0 for rel, url, size in files: dest_file = dest_path / rel is_skipped = dest_file.exists() and not overwrite if on_file: on_file(rel, size, is_skipped) if is_skipped: skipped += 1 continue n = download_file(url, dest_file, session, on_chunk=on_chunk) total_bytes += n downloaded += 1 return { "files_downloaded": downloaded, "files_skipped": skipped, "bytes_downloaded": total_bytes, }