diff --git a/arma_modlist_tools/__init__.py b/arma_modlist_tools/__init__.py index 94177ec..fcbebad 100644 --- a/arma_modlist_tools/__init__.py +++ b/arma_modlist_tools/__init__.py @@ -11,6 +11,7 @@ from .linker import ( from .config import load_config, Config from .compat import is_windows, is_linux, get_os_label, fix_console_encoding from .reporter import build_missing_report, save_missing_report +from .cleaner import find_orphan_folders, folder_size __all__ = [ # parser @@ -29,4 +30,6 @@ __all__ = [ "is_windows", "is_linux", "get_os_label", "fix_console_encoding", # reporter "build_missing_report", "save_missing_report", + # cleaner + "find_orphan_folders", "folder_size", ] diff --git a/arma_modlist_tools/cleaner.py b/arma_modlist_tools/cleaner.py new file mode 100644 index 0000000..9809abf --- /dev/null +++ b/arma_modlist_tools/cleaner.py @@ -0,0 +1,80 @@ +""" +arma_modlist_tools.cleaner +~~~~~~~~~~~~~~~~~~~~~~~~~~ +Identify orphaned mod folders in the downloads directory. + +An *orphan* is a downloaded ``@ModName`` folder that is no longer referenced +by any group in ``comparison.json``. This happens when the user swaps out a +modlist preset and re-runs the compare step — mods that were removed from the +preset remain on disk but are no longer tracked. 
+ +Typical usage:: + + from arma_modlist_tools.cleaner import find_orphan_folders + + comparison = json.loads(Path("modlist_json/comparison.json").read_text()) + orphans = find_orphan_folders(Path("downloads"), comparison) + for o in orphans: + print(o["group"], o["name"], o["size"]) +""" +from __future__ import annotations + +from pathlib import Path + +from .fetcher import _normalize_name as _normalize + + +def folder_size(path: Path) -> int: + """Return the total size in bytes of all files under *path* (recursive).""" + return sum(f.stat().st_size for f in path.rglob("*") if f.is_file()) + + +def find_orphan_folders( + downloads: Path, + comparison: dict, +) -> list[dict]: + """Return a list of orphan mod folder entries. + + A folder ``downloads/{group}/@ModName`` is considered an orphan when its + normalised name does not match any mod in *comparison* under the same + group. Groups in ``downloads/`` that do not exist in *comparison* at all + are treated as entirely orphaned. + + :param downloads: Path to the ``downloads/`` directory. + :param comparison: Parsed ``comparison.json`` dict (output of + :func:`~arma_modlist_tools.compare.compare_presets`). + :returns: List of dicts, each with: + + - ``path`` — absolute :class:`~pathlib.Path` of the folder + - ``group`` — group name (e.g. ``"shared"``) + - ``name`` — folder name as it appears on disk (e.g. 
``"@ace"``) + - ``size`` — total size in bytes (recursive) + """ + # Build group → set-of-normalised-mod-names from comparison data + known: dict[str, set[str]] = {} + for mod in comparison.get("shared", {}).get("mods", []): + known.setdefault("shared", set()).add(_normalize(mod["name"])) + for preset, pdata in comparison.get("unique", {}).items(): + for mod in pdata.get("mods", []): + known.setdefault(preset, set()).add(_normalize(mod["name"])) + + orphans: list[dict] = [] + if not downloads.is_dir(): + return orphans + + for group_dir in sorted(downloads.iterdir()): + if not group_dir.is_dir(): + continue + group_known = known.get(group_dir.name, set()) # empty → group removed + for mod_dir in sorted(group_dir.iterdir()): + if not mod_dir.is_dir() or not mod_dir.name.startswith("@"): + continue + if _normalize(mod_dir.name) not in group_known: + orphans.append({ + "path": mod_dir, + "group": group_dir.name, + "name": mod_dir.name, + "size": folder_size(mod_dir), + }) + + return orphans diff --git a/clean_orphans.py b/clean_orphans.py new file mode 100644 index 0000000..70f9cfb --- /dev/null +++ b/clean_orphans.py @@ -0,0 +1,127 @@ +#!/usr/bin/env python3 +""" +CLI entry point: find and remove orphaned mod folders from downloads/. + +An orphan is a downloads/{group}/@ModName folder that is no longer referenced +by any group in comparison.json. These accumulate when presets change and +the pipeline is re-run without cleaning up old folders. 
+ +Usage: + python clean_orphans.py # list orphans, ask for confirmation + python clean_orphans.py --dry-run # list orphans, do not delete + python clean_orphans.py --yes # list and delete without prompting +""" +from __future__ import annotations + +import argparse +import json +import os +import shutil +import sys + +from arma_modlist_tools.cleaner import find_orphan_folders +from arma_modlist_tools.compat import fix_console_encoding +from arma_modlist_tools.config import load_config +from arma_modlist_tools.linker import _is_junction, remove_junction + +fix_console_encoding() + +_UNITS = ("B", "KB", "MB", "GB", "TB") + + +def _fmt_size(n: int) -> str: + for unit in _UNITS: + if n < 1024: + return f"{n:.1f} {unit}" + n /= 1024 + return f"{n:.1f} PB" + + +def main() -> None: + parser = argparse.ArgumentParser( + description="Find and remove orphaned mod folders from downloads/." + ) + parser.add_argument( + "--dry-run", + action="store_true", + help="List orphans but do not delete anything.", + ) + parser.add_argument( + "--yes", "-y", + action="store_true", + help="Delete without prompting for confirmation.", + ) + args = parser.parse_args() + + cfg = load_config() + + if not cfg.comparison.exists(): + print(f"ERROR: {cfg.comparison} not found. Run compare_modlists.py first.") + sys.exit(1) + + comparison = json.loads(cfg.comparison.read_text(encoding="utf-8")) + + print(f"\nScanning {cfg.downloads} for orphaned mod folders...\n") + orphans = find_orphan_folders(cfg.downloads, comparison) + + if not orphans: + print(" No orphans found. 
Your downloads folder is clean.") + print() + return + + total_size = sum(o["size"] for o in orphans) + print(f" {'Group':<28} {'Folder':<32} Size") + print(f" {'-'*28} {'-'*32} {'-'*10}") + for o in orphans: + print(f" {o['group']:<28} {o['name']:<32} {_fmt_size(o['size'])}") + + print() + print(f" {len(orphans)} orphan(s) found — {_fmt_size(total_size)} total") + print() + + if args.dry_run: + print(" --dry-run: nothing deleted.") + print() + return + + if not args.yes: + answer = input(" Delete all orphans? [y/N] ").strip().lower() + if answer not in ("y", "yes"): + print(" Aborted.") + print() + return + + deleted = 0 + freed = 0 + errors = 0 + for o in orphans: + p = o["path"] + try: + if _is_junction(p): + # Safety: never rmtree a junction — use remove_junction() which + # calls os.rmdir() and removes only the pointer, not the target. + ok, err = remove_junction(p) + if not ok: + print(f" ERROR: could not remove junction {p.name}: {err}") + errors += 1 + continue + else: + shutil.rmtree(p) + deleted += 1 + freed += o["size"] + print(f" Deleted: {o['group']}/{o['name']}") + except Exception as e: + print(f" ERROR: {p.name}: {e}") + errors += 1 + + print() + print(f" Done: {deleted} deleted, freed {_fmt_size(freed)}" + + (f", {errors} error(s)" if errors else "")) + print() + + if errors: + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/gui/locales.py b/gui/locales.py index fb083ce..6c873d2 100644 --- a/gui/locales.py +++ b/gui/locales.py @@ -224,6 +224,36 @@ _EN: dict[str, str] = { "tools.rm_btn": "Generate Report", "tools.rm_last": "Last generated: {ts}", "tools.rm_none": "No report yet.", + + # ── Tools — Clean Orphans ──────────────────────────────────────────────── + "tools.oc_desc": ( + "Scan the downloads folder for mod folders that are no longer " + "referenced in comparison.json. These orphans accumulate when you " + "remove mods from your presets and re-run the pipeline. 
" + "Select the ones you want to remove to free up disk space." + ), + "tools.oc_warn": ( + "⚠ Deleting orphans permanently removes mod files from disk. " + "This cannot be undone." + ), + "tools.oc_scan_btn": "Scan for Orphans", + "tools.oc_scanning": "Scanning…", + "tools.oc_no_config": "No config found. Complete Setup first.", + "tools.oc_no_comparison": "No comparison.json found — run the pipeline first.", + "tools.oc_none_found": "No orphans found. Your downloads folder is clean.", + "tools.oc_found": "{count} orphan(s) found — {size} total", + "tools.oc_sel_all": "Select All", + "tools.oc_sel_none": "Deselect All", + "tools.oc_delete_btn": "Delete Selected", + "tools.oc_confirm_title": "Confirm Delete", + "tools.oc_confirm_body": ( + "Permanently delete {count} orphan folder(s) ({size})?\n\n" + "This cannot be undone." + ), + "tools.oc_done": "Deleted {count} folder(s), freed {size}.", + "tools.oc_error": "Error deleting {path}: {e}", + "tools.oc_error_title": "Delete errors", + "tools.oc_scan_error": "Scan error: {e}", } _VI: dict[str, str] = { @@ -431,6 +461,36 @@ _VI: dict[str, str] = { "tools.rm_btn": "Tạo báo cáo", "tools.rm_last": "Tạo lần cuối: {ts}", "tools.rm_none": "Chưa có báo cáo.", + + # ── Tools — Clean Orphans ──────────────────────────────────────────────── + "tools.oc_desc": ( + "Quét thư mục downloads để tìm các thư mục mod không còn được " + "tham chiếu trong comparison.json. Các mod mồ côi này tích tụ khi " + "bạn xóa mod khỏi preset và chạy lại pipeline. " + "Chọn các thư mục muốn xóa để giải phóng dung lượng." + ), + "tools.oc_warn": ( + "⚠ Xóa mod mồ côi sẽ xóa vĩnh viễn tệp mod khỏi ổ đĩa. " + "Thao tác này không thể hoàn tác." + ), + "tools.oc_scan_btn": "Quét mod mồ côi", + "tools.oc_scanning": "Đang quét…", + "tools.oc_no_config": "Chưa tìm thấy cấu hình. Vui lòng hoàn thành thiết lập.", + "tools.oc_no_comparison": "Chưa có comparison.json — hãy chạy pipeline trước.", + "tools.oc_none_found": "Không tìm thấy mod mồ côi. 
Thư mục downloads sạch.", + "tools.oc_found": "Tìm thấy {count} mod mồ côi — tổng {size}", + "tools.oc_sel_all": "Chọn tất cả", + "tools.oc_sel_none": "Bỏ chọn", + "tools.oc_delete_btn": "Xóa đã chọn", + "tools.oc_confirm_title": "Xác nhận xóa", + "tools.oc_confirm_body": ( + "Xóa vĩnh viễn {count} thư mục mồ côi ({size})?\n\n" + "Thao tác này không thể hoàn tác." + ), + "tools.oc_done": "Đã xóa {count} thư mục, giải phóng {size}.", + "tools.oc_error": "Lỗi khi xóa {path}: {e}", + "tools.oc_error_title": "Lỗi xóa", + "tools.oc_scan_error": "Lỗi quét: {e}", } # Guard: both dicts must have identical key sets diff --git a/gui/views/tools.py b/gui/views/tools.py index 9446751..57d0f73 100644 --- a/gui/views/tools.py +++ b/gui/views/tools.py @@ -1,11 +1,15 @@ from __future__ import annotations import json +import shutil +import threading from tkinter import messagebox -from typing import TYPE_CHECKING, Optional +from typing import TYPE_CHECKING import customtkinter as ctk +from arma_modlist_tools.cleaner import find_orphan_folders +from arma_modlist_tools.linker import _is_junction, remove_junction from gui._constants import COLOR_WARN, PROJECT_ROOT from gui.locales import t from gui.views.base import BaseView @@ -41,6 +45,7 @@ class ToolsView(BaseView): self._build_link_mods_tab() self._build_sync_missing_tab() self._build_report_missing_tab() + self._build_clean_orphans_tab() # ========================================================================= # Public @@ -378,6 +383,208 @@ class ToolsView(BaseView): pass self._rm_info.configure(text=t("tools.rm_none")) + # ------------------------------------------------------------------------- + + def _build_clean_orphans_tab(self) -> None: + self._tab_view.add("Clean Orphans") + tab = self._tab_view.tab("Clean Orphans") + tab.grid_columnconfigure(0, weight=1) + tab.grid_rowconfigure(3, weight=1) + + desc_lbl = _desc(tab, row=0, text=t("tools.oc_desc")) + self._translatable.append((desc_lbl, "tools.oc_desc")) + + oc_warn = 
ctk.CTkLabel(tab, text=t("tools.oc_warn"),
+                               text_color=COLOR_WARN, anchor="w")
+        oc_warn.grid(row=1, column=0, padx=24, pady=(0, 4), sticky="w")
+        self._translatable.append((oc_warn, "tools.oc_warn"))
+
+        self._oc_status = ctk.CTkLabel(tab, text="", text_color="gray", anchor="w")
+        self._oc_status.grid(row=2, column=0, padx=24, pady=(0, 2), sticky="w")
+
+        # Scrollable list for results
+        self._oc_scroll = ctk.CTkScrollableFrame(tab)
+        self._oc_scroll.grid(row=3, column=0, sticky="nsew", padx=16, pady=(0, 4))
+        self._oc_scroll.grid_columnconfigure(0, weight=1)
+
+        # Bottom action bar
+        bot = ctk.CTkFrame(tab, fg_color="transparent")
+        bot.grid(row=4, column=0, sticky="ew", padx=16, pady=(4, 12))
+
+        self._oc_sel_all_btn = ctk.CTkButton(
+            bot, text=t("tools.oc_sel_all"), width=110,
+            command=self._oc_select_all,
+        )
+        self._oc_sel_all_btn.pack(side="left", padx=(0, 4))
+        self._translatable.append((self._oc_sel_all_btn, "tools.oc_sel_all"))
+
+        self._oc_sel_none_btn = ctk.CTkButton(
+            bot, text=t("tools.oc_sel_none"), width=110,
+            command=self._oc_deselect_all,
+        )
+        self._oc_sel_none_btn.pack(side="left", padx=4)
+        self._translatable.append((self._oc_sel_none_btn, "tools.oc_sel_none"))
+
+        self._oc_scan_btn = ctk.CTkButton(
+            bot, text=t("tools.oc_scan_btn"), width=150,
+            command=self._oc_scan,
+        )
+        self._oc_scan_btn.pack(side="right", padx=(4, 0))
+        self._translatable.append((self._oc_scan_btn, "tools.oc_scan_btn"))
+
+        self._oc_delete_btn = ctk.CTkButton(
+            bot, text=t("tools.oc_delete_btn"), width=150,
+            fg_color="darkred", hover_color="#8b0000",
+            command=self._oc_delete_selected,
+            state="disabled",
+        )
+        self._oc_delete_btn.pack(side="right", padx=4)
+        self._translatable.append((self._oc_delete_btn, "tools.oc_delete_btn"))
+
+        # Internal scan state
+        self._oc_orphans: list[dict] = []
+        self._oc_check_vars: list[ctk.BooleanVar] = []
+        self._oc_pending_done_msg: str | None = None
+
+    def _oc_scan(self) -> None:
+        cfg = self.app.cfg
+        if not cfg:
+            
self._oc_status.configure(text=t("tools.oc_no_config"), text_color="gray")
+            return
+        if not cfg.comparison.exists():
+            self._oc_status.configure(text=t("tools.oc_no_comparison"), text_color="gray")
+            return
+
+        self._oc_scan_btn.configure(state="disabled", text=t("tools.oc_scanning"))
+        self._oc_delete_btn.configure(state="disabled")
+        self._oc_status.configure(text=t("tools.oc_scanning"), text_color="gray")
+
+        def _run() -> None:
+            try:
+                comparison = json.loads(cfg.comparison.read_text(encoding="utf-8"))
+                orphans = find_orphan_folders(cfg.downloads, comparison)
+            except Exception as e:
+                # Bind str(e) eagerly: the except-bound name `e` is unbound
+                # when the handler exits, so a late-running lambda that
+                # references it would raise NameError inside Tk's callback.
+                self.after(0, lambda msg=str(e): self._oc_scan_done(None, msg))
+                return
+            self.after(0, lambda: self._oc_scan_done(orphans, None))
+
+        threading.Thread(target=_run, daemon=True).start()
+
+    def _oc_scan_done(self, orphans: list[dict] | None, error: str | None) -> None:
+        self._oc_scan_btn.configure(state="normal", text=t("tools.oc_scan_btn"))
+
+        # Consume any pending success message from a previous delete operation
+        done_msg = self._oc_pending_done_msg
+        self._oc_pending_done_msg = None
+
+        # Clear previous results
+        for w in self._oc_scroll.winfo_children():
+            w.destroy()
+        self._oc_orphans = []
+        self._oc_check_vars = []
+
+        if error:
+            self._oc_status.configure(text=t("tools.oc_scan_error", e=error), text_color="red")
+            return
+
+        if not orphans:
+            msg = done_msg or t("tools.oc_none_found")
+            self._oc_status.configure(text=msg, text_color="gray")
+            return
+
+        total_size = sum(o["size"] for o in orphans)
+        self._oc_status.configure(
+            text=t("tools.oc_found", count=len(orphans), size=_fmt_size(total_size)),
+            text_color="gray",
+        )
+        self._oc_orphans = orphans
+        self._oc_delete_btn.configure(state="normal")
+
+        for i, orphan in enumerate(orphans):
+            var = ctk.BooleanVar(value=True)
+            self._oc_check_vars.append(var)
+            bg = ("gray90", "gray17") if i % 2 == 0 else ("gray86", "gray14")
+            row = ctk.CTkFrame(self._oc_scroll, fg_color=bg, corner_radius=4)
+            row.pack(fill="x", pady=1)
+            
row.columnconfigure(1, weight=1) + + ctk.CTkCheckBox(row, text="", variable=var, width=24).grid( + row=0, column=0, padx=(8, 4), pady=4, + ) + ctk.CTkLabel( + row, + text=f" {orphan['group']} / {orphan['name']}", + anchor="w", + ).grid(row=0, column=1, sticky="ew", padx=4) + ctk.CTkLabel( + row, + text=_fmt_size(orphan["size"]), + text_color="gray", + width=80, + anchor="e", + ).grid(row=0, column=2, padx=(4, 12)) + + def _oc_select_all(self) -> None: + for var in self._oc_check_vars: + var.set(True) + + def _oc_deselect_all(self) -> None: + for var in self._oc_check_vars: + var.set(False) + + def _oc_delete_selected(self) -> None: + selected = [ + self._oc_orphans[i] + for i, var in enumerate(self._oc_check_vars) + if var.get() + ] + if not selected: + return + total_size = sum(o["size"] for o in selected) + confirmed = messagebox.askyesno( + t("tools.oc_confirm_title"), + t("tools.oc_confirm_body", count=len(selected), size=_fmt_size(total_size)), + ) + if not confirmed: + return + + self._oc_delete_btn.configure(state="disabled") + self._oc_scan_btn.configure(state="disabled") + + def _run() -> None: + freed = 0 + errors = [] + for orphan in selected: + try: + p = orphan["path"] + if _is_junction(p): + # Safety: never rmtree a junction — it follows the + # reparse point and deletes the target's contents. + # Use remove_junction() which calls os.rmdir() instead. 
+ ok, err = remove_junction(p) + if not ok: + errors.append(t("tools.oc_error", path=p.name, e=err)) + continue + else: + shutil.rmtree(p) + freed += orphan["size"] + except Exception as e: + errors.append(t("tools.oc_error", path=orphan["path"].name, e=e)) + self.after(0, lambda: self._oc_delete_done(len(selected), freed, errors)) + + threading.Thread(target=_run, daemon=True).start() + + def _oc_delete_done(self, count: int, freed: int, errors: list[str]) -> None: + # Store success message so _oc_scan_done() can display it after the rescan + self._oc_pending_done_msg = ( + None if errors + else t("tools.oc_done", count=count, size=_fmt_size(freed)) + ) + self._oc_scan_btn.configure(state="normal") + self._oc_scan() + if errors: + messagebox.showerror(t("tools.oc_error_title"), "\n".join(errors)) + # ========================================================================= # Private — helpers # ========================================================================= @@ -400,6 +607,21 @@ class ToolsView(BaseView): self.app.run_tool(args) +# --------------------------------------------------------------------------- +# Size formatting helper +# --------------------------------------------------------------------------- + +def _fmt_size(n: int) -> str: + """Human-readable file size string.""" + if n < 1024: + return f"{n} B" + if n < 1024 ** 2: + return f"{n / 1024:.1f} KB" + if n < 1024 ** 3: + return f"{n / 1024 ** 2:.1f} MB" + return f"{n / 1024 ** 3:.2f} GB" + + # --------------------------------------------------------------------------- # Layout helpers # --------------------------------------------------------------------------- diff --git a/test_suite.py b/test_suite.py index d747e62..dedea7c 100644 --- a/test_suite.py +++ b/test_suite.py @@ -1044,6 +1044,7 @@ _EXPECTED_EXPORTS = [ "load_config", "Config", "is_windows", "is_linux", "get_os_label", "fix_console_encoding", "build_missing_report", "save_missing_report", + "find_orphan_folders", "folder_size", ] 
@@ -1326,20 +1327,33 @@ def _test_end_to_end_offline(): def _test_comparison_json_consistent_with_html(): - """The real comparison.json on disk must match a fresh parse+compare.""" + """The on-disk comparison.json must be internally consistent with the HTML files. + + The pipeline lets users compare a *subset* of available presets, so we only + verify that every preset listed in comparison.json has a matching HTML file — + not that all HTML files were included. + """ html_dir = Path(__file__).parent / "modlist_html" json_file = Path(__file__).parent / "modlist_json" / "comparison.json" if not json_file.exists(): raise _SkipTest("comparison.json not found (run pipeline first)") - presets = parse_modlist_dir(html_dir) - fresh = compare_presets(*presets) + available_presets = {p.stem for p in html_dir.glob("*.html")} on_disk = json.loads(json_file.read_text(encoding="utf-8")) - assert_eq( - sorted(fresh["compared_presets"]), - sorted(on_disk["compared_presets"]), - ) + # Every preset referenced in comparison.json must have a source HTML file + for pname in on_disk["compared_presets"]: + assert pname in available_presets, ( + f"comparison.json references '{pname}' but no matching HTML file found" + ) + + # Re-compare only the presets that were actually used on disk + selected = [p for p in parse_modlist_dir(html_dir) + if p["preset_name"] in on_disk["compared_presets"]] + if len(selected) < 2: + raise _SkipTest("fewer than 2 matching HTML presets available") + + fresh = compare_presets(*selected) assert_eq(fresh["shared"]["mod_count"], on_disk["shared"]["mod_count"]) for pname in fresh["compared_presets"]: assert_eq( @@ -1464,6 +1478,744 @@ def _test_qw_osc_st_terminator(): test("OSC ST-terminated sequence stripped", _test_qw_osc_st_terminator) +# --------------------------------------------------------------------------- +# 12. 
cleaner — find_orphan_folders +# --------------------------------------------------------------------------- + +group("cleaner — find_orphan_folders") + +from arma_modlist_tools.cleaner import find_orphan_folders, folder_size + + +def _mk_mod_dir(root: Path, group: str, name: str, files: list[str] | None = None) -> Path: + """Create a mock mod folder under root/group/@name with optional dummy files.""" + mod_dir = root / group / f"@{name}" + mod_dir.mkdir(parents=True, exist_ok=True) + for fname in (files or []): + f = mod_dir / fname + f.write_bytes(b"x" * 1024) + return mod_dir + + +_COMPARISON_BASE = { + "compared_presets": ["A", "B"], + "shared": {"mod_count": 1, "mods": [{"name": "CBA_A3", "steam_id": "1", "url": None, "source": "steam"}]}, + "unique": { + "A": {"mod_count": 1, "mods": [{"name": "ACE3", "steam_id": "2", "url": None, "source": "steam"}]}, + "B": {"mod_count": 0, "mods": []}, + }, +} + + +def _test_orphan_empty_downloads(): + with tempfile.TemporaryDirectory() as d: + result = find_orphan_folders(Path(d) / "nonexistent", _COMPARISON_BASE) + assert result == [] + + +def _test_orphan_none_when_all_match(): + with tempfile.TemporaryDirectory() as d: + d = Path(d) + _mk_mod_dir(d, "shared", "CBA_A3") + _mk_mod_dir(d, "A", "ACE3") + result = find_orphan_folders(d, _COMPARISON_BASE) + assert result == [], f"Expected no orphans, got {result}" + + +def _test_orphan_detects_removed_mod(): + with tempfile.TemporaryDirectory() as d: + d = Path(d) + _mk_mod_dir(d, "shared", "CBA_A3") + _mk_mod_dir(d, "shared", "OldMod") # not in comparison + result = find_orphan_folders(d, _COMPARISON_BASE) + assert len(result) == 1 + assert_eq(result[0]["name"], "@OldMod") + assert_eq(result[0]["group"], "shared") + + +def _test_orphan_detects_removed_group(): + with tempfile.TemporaryDirectory() as d: + d = Path(d) + _mk_mod_dir(d, "shared", "CBA_A3") + _mk_mod_dir(d, "OldPreset", "SomeMod") # group no longer in comparison + result = find_orphan_folders(d, 
_COMPARISON_BASE) + assert len(result) == 1 + assert_eq(result[0]["group"], "OldPreset") + + +def _test_orphan_normalised_name_matches(): + """A folder named @CBA A3 should match mod named CBA_A3 (normalised).""" + with tempfile.TemporaryDirectory() as d: + d = Path(d) + # Create folder with spaces — normalises to "cbaa3" same as "CBA_A3" + mod_dir = d / "shared" / "@CBA A3" + mod_dir.mkdir(parents=True) + result = find_orphan_folders(d, _COMPARISON_BASE) + assert result == [], f"Normalised name should match, got {result}" + + +def _test_orphan_size_reported(): + with tempfile.TemporaryDirectory() as d: + d = Path(d) + mod_dir = _mk_mod_dir(d, "shared", "OldMod", files=["a.pbo", "b.pbo"]) + result = find_orphan_folders(d, _COMPARISON_BASE) + assert len(result) == 1 + assert result[0]["size"] == 2048 # 2 × 1024 bytes + + +def _test_orphan_ignores_non_at_folders(): + """Only @-prefixed directories are considered mod folders.""" + with tempfile.TemporaryDirectory() as d: + d = Path(d) + non_mod = d / "shared" / "keys" + non_mod.mkdir(parents=True) + _mk_mod_dir(d, "shared", "CBA_A3") + result = find_orphan_folders(d, _COMPARISON_BASE) + assert result == [] + + +def _test_folder_size_recursive(): + with tempfile.TemporaryDirectory() as d: + d = Path(d) + (d / "a.pbo").write_bytes(b"x" * 512) + sub = d / "sub" + sub.mkdir() + (sub / "b.pbo").write_bytes(b"x" * 512) + assert_eq(folder_size(d), 1024) + + +test("find_orphan_folders: empty downloads dir returns []", _test_orphan_empty_downloads) +test("find_orphan_folders: no orphans when all mods match", _test_orphan_none_when_all_match) +test("find_orphan_folders: detects mod removed from comparison", _test_orphan_detects_removed_mod) +test("find_orphan_folders: entire removed group flagged as orphan", _test_orphan_detects_removed_group) +test("find_orphan_folders: normalised name matches (spaces vs underscores)", _test_orphan_normalised_name_matches) +test("find_orphan_folders: orphan size summed correctly", 
_test_orphan_size_reported) +test("find_orphan_folders: non-@ folders ignored", _test_orphan_ignores_non_at_folders) +test("folder_size: sums files recursively", _test_folder_size_recursive) + + +# --------------------------------------------------------------------------- +# 13. E2E — clean_orphans.py CLI (subprocess) +# --------------------------------------------------------------------------- + +group("e2e — clean_orphans.py CLI") + +import subprocess as _subprocess + + +def _make_e2e_root(base: Path, comparison: dict) -> Path: + """Create a self-contained project root with config.json + comparison.json + and a downloads/ directory. Returns the root path.""" + root = base / "project" + root.mkdir() + dl = root / "downloads" + dl.mkdir() + json_dir = root / "modlist_json" + json_dir.mkdir() + arma = root / "arma3server" + arma.mkdir() + + cfg = { + "server": {"base_url": "https://example.com/", "username": "u", "password": "p"}, + "paths": { + "arma_dir": str(arma), + "downloads": str(dl), + "modlist_html": str(root / "modlist_html"), + "modlist_json": str(json_dir), + }, + } + (root / "config.json").write_text(json.dumps(cfg), encoding="utf-8") + (json_dir / "comparison.json").write_text( + json.dumps(comparison), encoding="utf-8" + ) + return root + + +def _run_clean_orphans(root: Path, *extra_args: str) -> _subprocess.CompletedProcess: + """Run clean_orphans.py from the given project root.""" + script = str(Path(__file__).parent / "clean_orphans.py") + return _subprocess.run( + [sys.executable, script] + list(extra_args), + cwd=str(root), + capture_output=True, + text=True, + ) + + +_E2E_COMPARISON = { + "compared_presets": ["A", "B"], + "shared": {"mod_count": 1, "mods": [{"name": "CBA_A3", "steam_id": "1", "url": None, "source": "steam"}]}, + "unique": { + "A": {"mod_count": 1, "mods": [{"name": "ACE3", "steam_id": "2", "url": None, "source": "steam"}]}, + "B": {"mod_count": 0, "mods": []}, + }, +} + + +def _test_e2e_dry_run_lists_orphans(): + with 
tempfile.TemporaryDirectory() as d: + root = _make_e2e_root(Path(d), _E2E_COMPARISON) + dl = root / "downloads" + (dl / "shared" / "@CBA_A3").mkdir(parents=True) # known + orphan = dl / "shared" / "@OldMod" + orphan.mkdir(parents=True) + (orphan / "file.pbo").write_bytes(b"x" * 512) + + result = _run_clean_orphans(root, "--dry-run") + + assert result.returncode == 0, f"Expected 0, got {result.returncode}\n{result.stderr}" + assert "@OldMod" in result.stdout, "Orphan not listed in dry-run output" + assert orphan.exists(), "--dry-run must not delete files" + + +def _test_e2e_no_orphans_clean_exit(): + with tempfile.TemporaryDirectory() as d: + root = _make_e2e_root(Path(d), _E2E_COMPARISON) + dl = root / "downloads" + (dl / "shared" / "@CBA_A3").mkdir(parents=True) + (dl / "A" / "@ACE3").mkdir(parents=True) + + result = _run_clean_orphans(root, "--dry-run") + + assert result.returncode == 0 + assert "No orphans" in result.stdout + + +def _test_e2e_yes_deletes_orphans(): + with tempfile.TemporaryDirectory() as d: + root = _make_e2e_root(Path(d), _E2E_COMPARISON) + dl = root / "downloads" + (dl / "shared" / "@CBA_A3").mkdir(parents=True) + orphan = dl / "shared" / "@OldMod" + orphan.mkdir(parents=True) + (orphan / "file.pbo").write_bytes(b"data") + + result = _run_clean_orphans(root, "--yes") + + assert result.returncode == 0, f"Expected 0\n{result.stderr}" + assert not orphan.exists(), "Orphan should have been deleted" + assert "Deleted" in result.stdout + + +def _test_e2e_yes_preserves_known_mods(): + with tempfile.TemporaryDirectory() as d: + root = _make_e2e_root(Path(d), _E2E_COMPARISON) + dl = root / "downloads" + known = dl / "shared" / "@CBA_A3" + known.mkdir(parents=True) + (known / "cba.pbo").write_bytes(b"keep me") + orphan = dl / "shared" / "@GoneMod" + orphan.mkdir(parents=True) + + result = _run_clean_orphans(root, "--yes") + + assert result.returncode == 0 + assert known.exists(), "Known mod must NOT be deleted" + assert not orphan.exists(), "Orphan must 
be deleted" + + +def _test_e2e_missing_comparison_exits_1(): + with tempfile.TemporaryDirectory() as d: + root = _make_e2e_root(Path(d), _E2E_COMPARISON) + # Remove the comparison.json so the script can't find it + (root / "modlist_json" / "comparison.json").unlink() + + result = _run_clean_orphans(root, "--dry-run") + + assert result.returncode == 1, "Should exit 1 when comparison.json missing" + assert "ERROR" in result.stdout + + +def _test_e2e_removed_group_flagged(): + """A group folder that no longer exists in comparison.json is fully orphaned.""" + with tempfile.TemporaryDirectory() as d: + root = _make_e2e_root(Path(d), _E2E_COMPARISON) + dl = root / "downloads" + old_group = dl / "OldPreset" / "@SomeMod" + old_group.mkdir(parents=True) + + result = _run_clean_orphans(root, "--dry-run") + + assert result.returncode == 0 + assert "@SomeMod" in result.stdout + + +test("e2e dry-run: lists orphans, does not delete", _test_e2e_dry_run_lists_orphans) +test("e2e dry-run: exits 0 when downloads clean", _test_e2e_no_orphans_clean_exit) +test("e2e --yes: deletes orphans", _test_e2e_yes_deletes_orphans) +test("e2e --yes: preserves known mods", _test_e2e_yes_preserves_known_mods) +test("e2e missing comparison.json: exits 1", _test_e2e_missing_comparison_exits_1) +test("e2e removed group: all mods flagged as orphans", _test_e2e_removed_group_flagged) + + +# --------------------------------------------------------------------------- +# 14. 
Coverage gap tests +# --------------------------------------------------------------------------- + +group("coverage gaps — cleaner, config, parser, linker, compat") + +import os as _os +from unittest.mock import patch as _patch, MagicMock as _MagicMock +from arma_modlist_tools.config import load_config as _load_config +from arma_modlist_tools import compat as _compat_mod +from arma_modlist_tools import linker as _linker_mod +from arma_modlist_tools.parser import _source_from_class, parse_modlist_html + + +# ── cleaner.py ─────────────────────────────────────────────────────────────── + +def _test_cleaner_skips_file_in_downloads_root(): + """A plain file at downloads/{file} (not a dir) is silently skipped.""" + with tempfile.TemporaryDirectory() as d: + d = Path(d) + (d / "shared").mkdir() + (d / "shared" / "@CBA_A3").mkdir() + (d / "readme.txt").write_text("not a group dir") # triggers the skip branch + result = find_orphan_folders(d, _COMPARISON_BASE) + assert result == [] + + +test("cleaner: plain file in downloads root is skipped (not treated as group)", _test_cleaner_skips_file_in_downloads_root) + + +# ── config.py ──────────────────────────────────────────────────────────────── + +def _test_load_config_fallback_to_root_path(): + """load_config() falls back to root_path (project root) when CWD has no config.json.""" + old_cwd = _os.getcwd() + with tempfile.TemporaryDirectory() as d: + _os.chdir(d) # CWD has no config.json + try: + cfg = _load_config() # must find via root_path (project root has config.json) + finally: + _os.chdir(old_cwd) # restore BEFORE tempdir cleanup to avoid WinError 32 + assert cfg is not None + + +def _test_load_config_raises_when_not_found(): + """load_config() raises FileNotFoundError when neither search path has config.json.""" + old_cwd = _os.getcwd() + with tempfile.TemporaryDirectory() as d: + _os.chdir(d) # no config.json in CWD + try: + # Patch __file__ inside config.py so root_path also points somewhere without config.json + 
with _patch("arma_modlist_tools.config.__file__", + str(Path(d) / "fake" / "arma_modlist_tools" / "config.py")): + try: + _load_config() + assert False, "Should have raised FileNotFoundError" + except FileNotFoundError: + pass + finally: + _os.chdir(old_cwd) + + +test("config: load_config() falls back to root_path when CWD has no config.json", _test_load_config_fallback_to_root_path) +test("config: load_config() raises FileNotFoundError when config.json not found anywhere", _test_load_config_raises_when_not_found) + + +# ── parser.py ──────────────────────────────────────────────────────────────── + +def _test_source_from_class_unknown(): + """_source_from_class returns 'unknown' for unrecognised CSS class strings.""" + assert_eq(_source_from_class("from-workshop"), "unknown") + assert_eq(_source_from_class(""), "unknown") + assert_eq(_source_from_class("some-other-class"), "unknown") + + +def _test_parse_modlist_html_skips_rows_without_name(): + """Rows with no DisplayName td return None and are excluded from results. + Also exercises the `continue` branch for non-ModContainer elements. + """ + # Row 1: valid mod + # Row 2: no DisplayName td at all → parse_mod_entry returns None → skipped + # Row 3: regular header row (no data-type="ModContainer") → parser skips via continue + html = textwrap.dedent("""\ + + + + + + + + + + + + + + +
Header row (no ModContainer attr)
Real Mod + +
no display name here
+ + + """) + with tempfile.NamedTemporaryFile(mode="w", suffix=".html", delete=False, + encoding="utf-8") as f: + f.write(html) + fname = f.name + try: + result = parse_modlist_html(fname) + assert_eq(result["mod_count"], 1) + assert_eq(result["mods"][0]["name"], "Real Mod") + finally: + Path(fname).unlink(missing_ok=True) + + +test("parser: _source_from_class returns 'unknown' for unrecognised class", _test_source_from_class_unknown) +test("parser: rows with empty DisplayName are skipped", _test_parse_modlist_html_skips_rows_without_name) + + +# ── linker.py ──────────────────────────────────────────────────────────────── + +def _test_remove_junction_oserror(): + """remove_junction returns (False, message) when OSError occurs.""" + with tempfile.TemporaryDirectory() as d: + non_existent = Path(d) / "ghost_link" + ok, err = _linker_mod.remove_junction(non_existent) + assert not ok + assert err # error message is non-empty + + +def _test_link_group_records_failed_when_create_returns_false(): + """link_group counts a failure when create_junction returns False.""" + with tempfile.TemporaryDirectory() as d: + d = Path(d) + group_dir = d / "shared" + arma_dir = d / "arma" + group_dir.mkdir() + arma_dir.mkdir() + (group_dir / "@ace").mkdir() + + with _patch("arma_modlist_tools.linker.create_junction", return_value=False): + result = _linker_mod.link_group(group_dir, arma_dir) + + assert_eq(result["failed"], 1) + assert "ace" in " ".join(result["errors"].keys()).lower() + + +def _test_unlink_group_records_failed_when_remove_errors(): + """unlink_group counts a failure when remove_junction returns an error.""" + with tempfile.TemporaryDirectory() as d: + d = Path(d) + group_dir = d / "shared" + arma_dir = d / "arma" + group_dir.mkdir() + arma_dir.mkdir() + (group_dir / "@ace").mkdir() + + # Pretend it's already linked so unlink_group tries to remove it + with _patch("arma_modlist_tools.linker.get_link_status") as mock_status, \ + 
_patch("arma_modlist_tools.linker.remove_junction", return_value=(False, "perm denied")): + mock_status.return_value = [{ + "name": "@ace", + "source_path": group_dir / "@ace", + "link_path": arma_dir / "@ace", + "is_linked": True, + }] + result = _linker_mod.unlink_group(group_dir, arma_dir) + + assert_eq(result["failed"], 1) + assert result["errors"] + + +def _test_is_junction_linux_path(): + """_is_junction uses os.path.islink on Linux.""" + with _patch("arma_modlist_tools.linker.is_windows", return_value=False), \ + _patch("os.path.islink", return_value=True) as mock_islink: + result = _linker_mod._is_junction(Path("/fake/path")) + assert result is True + mock_islink.assert_called_once() + + +def _test_create_junction_linux_success(): + """create_junction calls os.symlink on Linux (mocked — Windows lacks symlink perms).""" + with tempfile.TemporaryDirectory() as d: + link_path = Path(d) / "link" + target = Path(d) / "target" + with _patch("arma_modlist_tools.linker.is_windows", return_value=False), \ + _patch("os.symlink") as mock_sym: + ok = _linker_mod.create_junction(link_path, target) + assert ok + mock_sym.assert_called_once_with(str(target), str(link_path)) + + +def _test_create_junction_linux_oserror(): + """create_junction returns False if os.symlink raises OSError on Linux.""" + with tempfile.TemporaryDirectory() as d: + link_path = Path(d) / "link" + with _patch("arma_modlist_tools.linker.is_windows", return_value=False), \ + _patch("os.symlink", side_effect=OSError("perm")): + ok = _linker_mod.create_junction(link_path, Path(d) / "target") + assert not ok + + +def _test_remove_junction_linux(): + """remove_junction calls os.unlink on Linux (mocked — Windows lacks symlink perms).""" + with tempfile.TemporaryDirectory() as d: + link = Path(d) / "link" + with _patch("arma_modlist_tools.linker.is_windows", return_value=False), \ + _patch("os.unlink") as mock_unlink: + ok, err = _linker_mod.remove_junction(link) + assert ok + assert err == "" + 
mock_unlink.assert_called_once_with(str(link)) + + +test("linker: remove_junction returns (False, msg) on OSError", _test_remove_junction_oserror) +test("linker: link_group records failure when create_junction -> False", _test_link_group_records_failed_when_create_returns_false) +test("linker: unlink_group records failure when remove_junction errors", _test_unlink_group_records_failed_when_remove_errors) +test("linker: _is_junction uses os.path.islink on Linux", _test_is_junction_linux_path) +test("linker: create_junction calls os.symlink on Linux (success)", _test_create_junction_linux_success) +test("linker: create_junction returns False if os.symlink raises", _test_create_junction_linux_oserror) +test("linker: remove_junction calls os.unlink on Linux", _test_remove_junction_linux) + + +# ── compat.py ──────────────────────────────────────────────────────────────── + +def _test_get_os_label_windows_server(): + """get_os_label returns 'Windows Server' when version string contains 'Server'.""" + with _patch("arma_modlist_tools.compat.is_windows", return_value=True), \ + _patch("platform.version", return_value="10.0.17763 Windows Server 2019"): + label = _compat_mod.get_os_label() + assert_eq(label, "Windows Server") + + +def _test_get_os_label_linux_ubuntu_desktop(): + with _patch("arma_modlist_tools.compat.is_windows", return_value=False), \ + _patch("arma_modlist_tools.compat.is_linux", return_value=True), \ + _patch("arma_modlist_tools.compat._read_os_release", + return_value={"NAME": "Ubuntu"}), \ + _patch("arma_modlist_tools.compat._is_headless", return_value=False): + label = _compat_mod.get_os_label() + assert_eq(label, "Ubuntu") + + +def _test_get_os_label_linux_ubuntu_server(): + with _patch("arma_modlist_tools.compat.is_windows", return_value=False), \ + _patch("arma_modlist_tools.compat.is_linux", return_value=True), \ + _patch("arma_modlist_tools.compat._read_os_release", + return_value={"NAME": "Ubuntu"}), \ + 
_patch("arma_modlist_tools.compat._is_headless", return_value=True): + label = _compat_mod.get_os_label() + assert_eq(label, "Ubuntu Server") + + +def _test_get_os_label_linux_other(): + with _patch("arma_modlist_tools.compat.is_windows", return_value=False), \ + _patch("arma_modlist_tools.compat.is_linux", return_value=True), \ + _patch("arma_modlist_tools.compat._read_os_release", + return_value={"NAME": "Debian GNU/Linux"}): + label = _compat_mod.get_os_label() + assert_eq(label, "Linux") + + +def _test_get_os_label_unknown_platform(): + with _patch("arma_modlist_tools.compat.is_windows", return_value=False), \ + _patch("arma_modlist_tools.compat.is_linux", return_value=False): + label = _compat_mod.get_os_label() + assert_eq(label, "Unknown") + + +def _test_read_os_release_parses_file(): + content = 'NAME="Ubuntu"\nVERSION_ID="22.04"\n# comment\nID=ubuntu\n' + import io as _io + with _patch("builtins.open", return_value=_io.StringIO(content)): + result = _compat_mod._read_os_release() + assert_eq(result.get("NAME"), "Ubuntu") + assert_eq(result.get("VERSION_ID"), "22.04") + + +def _test_read_os_release_handles_missing_file(): + with _patch("builtins.open", side_effect=OSError("no such file")): + result = _compat_mod._read_os_release() + assert_eq(result, {}) + + +def _test_is_headless_with_display(): + with _patch.dict("os.environ", {"DISPLAY": ":0"}, clear=False): + assert not _compat_mod._is_headless() + + +def _test_is_headless_without_display(): + with _patch.dict("os.environ", {}, clear=True): + assert _compat_mod._is_headless() + + +def _test_fix_console_encoding_non_windows_noop(): + """fix_console_encoding is a no-op on non-Windows.""" + import io as _io + original_stdout = sys.stdout + with _patch("arma_modlist_tools.compat.is_windows", return_value=False): + _compat_mod.fix_console_encoding() + assert sys.stdout is original_stdout + + +def _test_fix_console_encoding_already_utf8(): + """fix_console_encoding skips wrapping when stdout is already 
UTF-8.""" + # encoding is a readonly attribute on real TextIOWrapper, so use a fake stdout + fake_stdout = _MagicMock() + fake_stdout.encoding = "utf-8" + original_stdout = sys.stdout + try: + with _patch("arma_modlist_tools.compat.is_windows", return_value=True): + sys.stdout = fake_stdout + _compat_mod.fix_console_encoding() + assert sys.stdout is fake_stdout # still the same object (not wrapped) + finally: + sys.stdout = original_stdout + + +test("compat: get_os_label returns 'Windows Server' on Windows Server", _test_get_os_label_windows_server) +test("compat: get_os_label returns 'Ubuntu' on Ubuntu desktop", _test_get_os_label_linux_ubuntu_desktop) +test("compat: get_os_label returns 'Ubuntu Server' on headless Ubuntu", _test_get_os_label_linux_ubuntu_server) +test("compat: get_os_label returns 'Linux' on non-Ubuntu Linux", _test_get_os_label_linux_other) +test("compat: get_os_label returns 'Unknown' on unrecognised platform", _test_get_os_label_unknown_platform) +test("compat: _read_os_release parses key=value pairs", _test_read_os_release_parses_file) +test("compat: _read_os_release returns {} when file missing", _test_read_os_release_handles_missing_file) +test("compat: _is_headless returns False when DISPLAY is set", _test_is_headless_with_display) +test("compat: _is_headless returns True when no display env vars", _test_is_headless_without_display) +test("compat: fix_console_encoding is no-op on non-Windows", _test_fix_console_encoding_non_windows_noop) +test("compat: fix_console_encoding skips when stdout already UTF-8", _test_fix_console_encoding_already_utf8) + + +# --------------------------------------------------------------------------- +# 15. 
Live-server fetcher tests (skipped when server unreachable) +# --------------------------------------------------------------------------- + +group("fetcher — live server (skipped if unreachable)") + +import requests as _requests +from arma_modlist_tools.fetcher import ( + make_session as _make_session, + build_server_index as _build_server_index, + find_mod_folder as _find_mod_folder, + list_mod_files as _list_mod_files, +) + +# ── One-time setup: try to load config and probe the server ───────────────── + +_LIVE_INDEX: dict | None = None +_LIVE_SESSION: "_requests.Session | None" = None +_LIVE_BASE_URL: str = "" +_LIVE_SKIP_REASON: str = "" + +try: + _live_cfg = _load_config() + _LIVE_BASE_URL = _live_cfg.server_url + _live_auth = _live_cfg.server_auth + # Quick reachability probe — just GET the root, no JSON parsing + _probe = _requests.get(_LIVE_BASE_URL, auth=_live_auth, timeout=8) + _probe.raise_for_status() + # Reachable → build the index (one network round-trip per @ folder) + _LIVE_SESSION = _make_session(_live_auth) + _LIVE_INDEX = _build_server_index(_LIVE_BASE_URL, _live_auth) +except Exception as _live_exc: + _LIVE_SKIP_REASON = str(_live_exc) + + +def _require_live() -> None: + """Raise _SkipTest if the server is unreachable.""" + if _LIVE_INDEX is None: + raise _SkipTest(f"server unreachable: {_LIVE_SKIP_REASON}") + + +# ── Tests ──────────────────────────────────────────────────────────────────── + +def _test_live_index_structure(): + """build_server_index returns the expected three-key structure.""" + _require_live() + assert "by_steam_id" in _LIVE_INDEX + assert "by_name" in _LIVE_INDEX + assert "folders" in _LIVE_INDEX + + +def _test_live_index_has_folders(): + """The server must have at least one mod folder.""" + _require_live() + assert len(_LIVE_INDEX["folders"]) > 0, "Expected at least one folder on server" + + +def _test_live_index_has_steam_id_entries(): + """At least some folders must have parseable meta.cpp (steam_id index populated).""" 
+ _require_live() + assert len(_LIVE_INDEX["by_steam_id"]) > 0, "Expected at least one steam_id entry" + + +def _test_live_index_has_name_entries(): + """Every @ folder adds an entry to by_name (normalized).""" + _require_live() + assert len(_LIVE_INDEX["by_name"]) > 0, "Expected at least one by_name entry" + + +def _test_live_find_mod_by_steam_id(): + """find_mod_folder locates CBA_A3 by its known steam_id (450814997).""" + _require_live() + mod = {"name": "CBA_A3", "steam_id": "450814997"} + url = _find_mod_folder(mod, _LIVE_INDEX) + assert url is not None, "CBA_A3 not found by steam_id — is it on the server?" + assert "@" in url.lower() or "cba" in url.lower(), f"URL looks wrong: {url}" + + +def _test_live_find_mod_url_is_reachable(): + """The URL returned for CBA_A3 must respond with HTTP 200.""" + _require_live() + mod = {"name": "CBA_A3", "steam_id": "450814997"} + url = _find_mod_folder(mod, _LIVE_INDEX) + if url is None: + raise _SkipTest("CBA_A3 not in index — skipping reachability check") + r = _LIVE_SESSION.get(url, timeout=10) + assert r.status_code == 200, f"Expected 200, got {r.status_code} for {url}" + + +def _test_live_list_mod_files_returns_entries(): + """list_mod_files returns a non-empty list for CBA_A3.""" + _require_live() + mod = {"name": "CBA_A3", "steam_id": "450814997"} + url = _find_mod_folder(mod, _LIVE_INDEX) + if url is None: + raise _SkipTest("CBA_A3 not in index — skipping list_mod_files check") + files = _list_mod_files(url, _LIVE_SESSION) + assert len(files) > 0, "CBA_A3 has no files? Unexpected." 
+ + +def _test_live_list_mod_files_tuple_shape(): + """Each entry from list_mod_files is a (rel_path, url, size) 3-tuple.""" + _require_live() + mod = {"name": "CBA_A3", "steam_id": "450814997"} + url = _find_mod_folder(mod, _LIVE_INDEX) + if url is None: + raise _SkipTest("CBA_A3 not in index — skipping tuple shape check") + files = _list_mod_files(url, _LIVE_SESSION) + if not files: + raise _SkipTest("no files returned — skipping tuple shape check") + rel, file_url, size = files[0] + assert isinstance(rel, str) and rel, f"rel_path must be non-empty string, got {rel!r}" + assert isinstance(file_url, str) and file_url, f"file_url must be non-empty string" + assert isinstance(size, int) and size >= 0, f"size must be non-negative int, got {size!r}" + + +def _test_live_find_mod_by_name_fallback(): + """find_mod_folder can locate a mod by normalized name when steam_id is absent.""" + _require_live() + # Use a mod with no steam_id — name-only lookup + mod = {"name": "CBA_A3", "steam_id": ""} + url = _find_mod_folder(mod, _LIVE_INDEX) + assert url is not None, "CBA_A3 not found via name fallback" + + +test("live: build_server_index returns expected structure", _test_live_index_structure) +test("live: server has at least one mod folder", _test_live_index_has_folders) +test("live: by_steam_id populated from meta.cpp files", _test_live_index_has_steam_id_entries) +test("live: by_name populated for all @ folders", _test_live_index_has_name_entries) +test("live: find_mod_folder locates CBA_A3 by steam_id", _test_live_find_mod_by_steam_id) +test("live: CBA_A3 folder URL returns HTTP 200", _test_live_find_mod_url_is_reachable) +test("live: list_mod_files returns non-empty list for CBA_A3", _test_live_list_mod_files_returns_entries) +test("live: list_mod_files entries are (rel_path, url, size) tuples", _test_live_list_mod_files_tuple_shape) +test("live: find_mod_folder name fallback works (no steam_id)", _test_live_find_mod_by_name_fallback) + + # 
--------------------------------------------------------------------------- # Summary # ---------------------------------------------------------------------------