_find_folder in mods.py now has a fourth fallback: reads publishedid from meta.cpp inside each candidate folder and matches against mod["steam_id"]. Fixes mods appearing as "not downloaded" when the folder name on disk differs from the name in the modlist but the mod content (meta.cpp) is correct. Also adds 8 tests covering all four match strategies and edge cases.
2365 lines
89 KiB
Python
2365 lines
89 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
Comprehensive test suite for rev-arma-modlist-tools.
|
||
|
||
Run:
|
||
python test_suite.py
|
||
|
||
Tests are grouped by module. Network tests (fetcher) are skipped if the server
|
||
is unreachable. Linker tests use temp directories so they never touch real paths.
|
||
"""
|
||
|
||
import json
|
||
import os
|
||
import sys
|
||
import tempfile
|
||
import textwrap
|
||
import traceback
|
||
from pathlib import Path
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Test harness (no external dependencies)
|
||
# ---------------------------------------------------------------------------
|
||
|
||
_passed = _failed = _skipped = 0
|
||
_current_group = ""
|
||
|
||
|
||
def group(name: str) -> None:
|
||
global _current_group
|
||
_current_group = name
|
||
print(f"\n{'-'*60}")
|
||
print(f" {name}")
|
||
print(f"{'-'*60}")
|
||
|
||
|
||
class _SkipTest(Exception):
|
||
pass
|
||
|
||
|
||
def test(name: str, fn) -> None:
|
||
global _passed, _failed, _skipped
|
||
try:
|
||
fn()
|
||
print(f" [PASS] {name}")
|
||
_passed += 1
|
||
except _SkipTest as exc:
|
||
print(f" [SKIP] {name} ({exc})")
|
||
_skipped += 1
|
||
except Exception as exc:
|
||
print(f" [FAIL] {name}")
|
||
for line in traceback.format_exc().splitlines():
|
||
print(f" {line}")
|
||
_failed += 1
|
||
|
||
|
||
def skip(name: str, reason: str) -> None:
|
||
global _skipped
|
||
print(f" [SKIP] {name} ({reason})")
|
||
_skipped += 1
|
||
|
||
|
||
def assert_eq(a, b, msg=""):
|
||
assert a == b, f"{msg}\n expected: {b!r}\n got: {a!r}"
|
||
|
||
|
||
def assert_in(item, container, msg=""):
|
||
assert item in container, f"{msg}\n {item!r} not in {container!r}"
|
||
|
||
|
||
def assert_raises(exc_type, fn, *args, **kwargs):
|
||
try:
|
||
fn(*args, **kwargs)
|
||
except exc_type:
|
||
return
|
||
raise AssertionError(f"Expected {exc_type.__name__} but no exception was raised")
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# 1. compat
|
||
# ---------------------------------------------------------------------------
|
||
|
||
group("compat")
|
||
|
||
from arma_modlist_tools.compat import (
|
||
is_windows, is_linux, get_os_label, fix_console_encoding,
|
||
)
|
||
|
||
|
||
def _test_is_windows_linux_mutually_exclusive():
|
||
# Exactly one of is_windows()/is_linux() can be True; both can be False (Unknown)
|
||
assert not (is_windows() and is_linux()), "Cannot be both Windows and Linux"
|
||
|
||
|
||
def _test_os_label_is_string():
|
||
label = get_os_label()
|
||
assert isinstance(label, str) and label, "OS label must be non-empty string"
|
||
valid = {"Windows", "Windows Server", "Ubuntu", "Ubuntu Server", "Linux", "Unknown"}
|
||
assert label in valid, f"Unexpected OS label: {label!r}"
|
||
|
||
|
||
def _test_fix_console_encoding_idempotent():
|
||
# Calling it twice must not raise
|
||
fix_console_encoding()
|
||
fix_console_encoding()
|
||
|
||
|
||
def _test_is_windows_matches_platform():
|
||
import platform
|
||
assert is_windows() == (sys.platform == "win32")
|
||
|
||
|
||
def _test_is_linux_matches_platform():
|
||
assert is_linux() == (sys.platform == "linux")
|
||
|
||
|
||
def _test_os_label_consistent_with_platform():
|
||
label = get_os_label()
|
||
if is_windows():
|
||
assert "Windows" in label
|
||
elif is_linux():
|
||
assert label in {"Ubuntu", "Ubuntu Server", "Linux"}
|
||
else:
|
||
assert label == "Unknown"
|
||
|
||
|
||
test("is_windows and is_linux are mutually exclusive", _test_is_windows_linux_mutually_exclusive)
|
||
test("get_os_label returns a valid label", _test_os_label_is_string)
|
||
test("fix_console_encoding is idempotent", _test_fix_console_encoding_idempotent)
|
||
test("is_windows() matches sys.platform", _test_is_windows_matches_platform)
|
||
test("is_linux() matches sys.platform", _test_is_linux_matches_platform)
|
||
test("OS label is consistent with platform", _test_os_label_consistent_with_platform)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# 2. config
|
||
# ---------------------------------------------------------------------------
|
||
|
||
group("config")
|
||
|
||
from arma_modlist_tools.config import load_config, Config
|
||
|
||
_VALID_CONFIG = {
|
||
"server": {
|
||
"base_url": "https://example.com/mods/",
|
||
"username": "user",
|
||
"password": "pass",
|
||
},
|
||
"paths": {
|
||
"arma_dir": "C:\\arma3",
|
||
"downloads": "downloads",
|
||
"modlist_html": "modlist_html",
|
||
"modlist_json": "modlist_json",
|
||
},
|
||
}
|
||
|
||
|
||
def _test_load_config_from_explicit_path():
|
||
with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
|
||
json.dump(_VALID_CONFIG, f)
|
||
tmp = f.name
|
||
try:
|
||
cfg = load_config(tmp)
|
||
assert_eq(cfg.server_url, "https://example.com/mods/")
|
||
assert_eq(cfg.server_auth, ("user", "pass"))
|
||
assert_eq(cfg.arma_dir, Path("C:\\arma3"))
|
||
assert_eq(cfg.downloads, Path("downloads"))
|
||
assert_eq(cfg.modlist_html, Path("modlist_html"))
|
||
assert_eq(cfg.modlist_json, Path("modlist_json"))
|
||
assert_eq(cfg.comparison, Path("modlist_json/comparison.json"))
|
||
assert_eq(cfg.missing_report, Path("modlist_json/missing_report.json"))
|
||
finally:
|
||
os.unlink(tmp)
|
||
|
||
|
||
def _test_load_config_file_not_found():
|
||
assert_raises(FileNotFoundError, load_config, "nonexistent_1234567890.json")
|
||
|
||
|
||
def _test_load_config_missing_key_raises():
|
||
bad = {"server": {"base_url": "x"}} # missing paths
|
||
with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
|
||
json.dump(bad, f)
|
||
tmp = f.name
|
||
try:
|
||
assert_raises(KeyError, load_config, tmp)
|
||
finally:
|
||
os.unlink(tmp)
|
||
|
||
|
||
def _test_config_derived_paths():
|
||
with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
|
||
json.dump(_VALID_CONFIG, f)
|
||
tmp = f.name
|
||
try:
|
||
cfg = load_config(tmp)
|
||
# comparison must live inside modlist_json
|
||
assert cfg.comparison.parent == cfg.modlist_json
|
||
assert cfg.comparison.name == "comparison.json"
|
||
# missing_report must live inside modlist_json
|
||
assert cfg.missing_report.parent == cfg.modlist_json
|
||
assert cfg.missing_report.name == "missing_report.json"
|
||
finally:
|
||
os.unlink(tmp)
|
||
|
||
|
||
def _test_load_config_from_cwd():
|
||
"""load_config() without args should find the real config.json at CWD."""
|
||
original_cwd = Path.cwd()
|
||
project_root = Path(__file__).parent
|
||
os.chdir(project_root)
|
||
try:
|
||
cfg = load_config()
|
||
assert cfg.server_url, "server_url must not be empty"
|
||
finally:
|
||
os.chdir(original_cwd)
|
||
|
||
|
||
test("load_config with explicit path", _test_load_config_from_explicit_path)
|
||
test("load_config raises FileNotFoundError for missing file", _test_load_config_file_not_found)
|
||
test("load_config raises KeyError for incomplete config", _test_load_config_missing_key_raises)
|
||
test("Config derived paths (comparison, missing_report)", _test_config_derived_paths)
|
||
test("load_config() auto-discovers config.json from CWD", _test_load_config_from_cwd)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# 3. parser
|
||
# ---------------------------------------------------------------------------
|
||
|
||
group("parser")
|
||
|
||
from arma_modlist_tools.parser import (
|
||
parse_mod_entry, parse_modlist_html, parse_modlist_dir,
|
||
)
|
||
import xml.etree.ElementTree as ET
|
||
|
||
_VALID_TR_XML = textwrap.dedent("""\
|
||
<tr data-type="ModContainer">
|
||
<td data-type="DisplayName">CBA_A3</td>
|
||
<td>
|
||
<span class="from-steam"></span>
|
||
<a data-type="Link" href="https://steamcommunity.com/sharedfiles/filedetails/?id=450814997"></a>
|
||
</td>
|
||
</tr>
|
||
""")
|
||
|
||
_LOCAL_TR_XML = textwrap.dedent("""\
|
||
<tr data-type="ModContainer">
|
||
<td data-type="DisplayName">My Local Mod</td>
|
||
<td>
|
||
<span class="from-local"></span>
|
||
</td>
|
||
</tr>
|
||
""")
|
||
|
||
_NO_NAME_TR_XML = textwrap.dedent("""\
|
||
<tr data-type="ModContainer">
|
||
<td>
|
||
<a data-type="Link" href="https://steamcommunity.com/sharedfiles/filedetails/?id=123"></a>
|
||
</td>
|
||
</tr>
|
||
""")
|
||
|
||
_MINIMAL_HTML = textwrap.dedent("""\
|
||
<?xml version="1.0" encoding="utf-8"?>
|
||
<html>
|
||
<head><title>Test</title></head>
|
||
<body>
|
||
<table>
|
||
<tr data-type="ModContainer">
|
||
<td data-type="DisplayName">ACE3</td>
|
||
<td><span class="from-steam"></span>
|
||
<a data-type="Link" href="https://steamcommunity.com/sharedfiles/filedetails/?id=463939057"></a>
|
||
</td>
|
||
</tr>
|
||
<tr data-type="ModContainer">
|
||
<td data-type="DisplayName">CBA_A3</td>
|
||
<td><span class="from-steam"></span>
|
||
<a data-type="Link" href="https://steamcommunity.com/sharedfiles/filedetails/?id=450814997"></a>
|
||
</td>
|
||
</tr>
|
||
<tr data-type="ModContainer">
|
||
<td data-type="DisplayName">LocalMod</td>
|
||
<td><span class="from-local"></span></td>
|
||
</tr>
|
||
</table>
|
||
</body>
|
||
</html>
|
||
""")
|
||
|
||
|
||
def _test_parse_mod_entry_steam():
|
||
tr = ET.fromstring(_VALID_TR_XML)
|
||
entry = parse_mod_entry(tr)
|
||
assert entry is not None
|
||
assert_eq(entry["name"], "CBA_A3")
|
||
assert_eq(entry["source"], "steam")
|
||
assert_eq(entry["steam_id"], "450814997")
|
||
assert "450814997" in entry["url"]
|
||
|
||
|
||
def _test_parse_mod_entry_local():
|
||
tr = ET.fromstring(_LOCAL_TR_XML)
|
||
entry = parse_mod_entry(tr)
|
||
assert entry is not None
|
||
assert_eq(entry["name"], "My Local Mod")
|
||
assert_eq(entry["source"], "local")
|
||
assert entry["steam_id"] is None
|
||
assert entry["url"] is None
|
||
|
||
|
||
def _test_parse_mod_entry_no_name_returns_none():
|
||
tr = ET.fromstring(_NO_NAME_TR_XML)
|
||
entry = parse_mod_entry(tr)
|
||
assert entry is None, "Entry without DisplayName must return None"
|
||
|
||
|
||
def _test_parse_modlist_html_from_file():
|
||
with tempfile.NamedTemporaryFile(
|
||
mode="w", suffix=".html", delete=False, encoding="utf-8"
|
||
) as f:
|
||
f.write(_MINIMAL_HTML)
|
||
tmp = Path(f.name)
|
||
try:
|
||
preset = parse_modlist_html(tmp)
|
||
assert_eq(preset["source_file"], tmp.name)
|
||
assert_eq(preset["mod_count"], 3)
|
||
assert len(preset["mods"]) == 3
|
||
names = [m["name"] for m in preset["mods"]]
|
||
assert_in("ACE3", names)
|
||
assert_in("CBA_A3", names)
|
||
assert_in("LocalMod", names)
|
||
# steam IDs
|
||
steam_ids = {m["steam_id"] for m in preset["mods"]}
|
||
assert_in("463939057", steam_ids)
|
||
assert_in("450814997", steam_ids)
|
||
assert_in(None, steam_ids) # LocalMod has no steam_id
|
||
finally:
|
||
tmp.unlink()
|
||
|
||
|
||
def _test_parse_modlist_html_not_found():
|
||
assert_raises(FileNotFoundError, parse_modlist_html, "no_such_file_xyz.html")
|
||
|
||
|
||
def _test_parse_modlist_dir_real_files():
|
||
d = Path(__file__).parent / "modlist_html"
|
||
if not d.is_dir():
|
||
raise AssertionError(f"modlist_html directory not found: {d}")
|
||
presets = parse_modlist_dir(d)
|
||
assert len(presets) >= 2, f"Expected >=2 presets, got {len(presets)}"
|
||
for p in presets:
|
||
assert p["mod_count"] > 0, f"{p['source_file']} has 0 mods"
|
||
assert len(p["mods"]) == p["mod_count"]
|
||
|
||
|
||
def _test_parse_modlist_dir_empty():
|
||
with tempfile.TemporaryDirectory() as d:
|
||
result = parse_modlist_dir(d)
|
||
assert_eq(result, [])
|
||
|
||
|
||
def _test_parse_modlist_dir_not_a_dir():
|
||
assert_raises(NotADirectoryError, parse_modlist_dir, "no_such_directory_xyz")
|
||
|
||
|
||
def _test_parse_modlist_dir_returns_sorted():
|
||
with tempfile.TemporaryDirectory() as d:
|
||
d = Path(d)
|
||
for name in ["c_preset.html", "a_preset.html", "b_preset.html"]:
|
||
f = d / name
|
||
f.write_text(_MINIMAL_HTML, encoding="utf-8")
|
||
presets = parse_modlist_dir(d)
|
||
names = [p["source_file"] for p in presets]
|
||
assert names == sorted(names), "parse_modlist_dir must return results sorted by filename"
|
||
|
||
|
||
test("parse_mod_entry: Steam mod", _test_parse_mod_entry_steam)
|
||
test("parse_mod_entry: Local mod", _test_parse_mod_entry_local)
|
||
test("parse_mod_entry: No name returns None", _test_parse_mod_entry_no_name_returns_none)
|
||
test("parse_modlist_html: from temp HTML file", _test_parse_modlist_html_from_file)
|
||
test("parse_modlist_html: FileNotFoundError", _test_parse_modlist_html_not_found)
|
||
test("parse_modlist_dir: real modlist_html/ files", _test_parse_modlist_dir_real_files)
|
||
test("parse_modlist_dir: empty directory", _test_parse_modlist_dir_empty)
|
||
test("parse_modlist_dir: non-existent directory", _test_parse_modlist_dir_not_a_dir)
|
||
test("parse_modlist_dir: results are sorted by filename", _test_parse_modlist_dir_returns_sorted)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# 4. compare
|
||
# ---------------------------------------------------------------------------
|
||
|
||
group("compare")
|
||
|
||
from arma_modlist_tools.compare import compare_presets
|
||
|
||
_P1 = {
|
||
"preset_name": "Preset_A",
|
||
"source_file": "preset_a.html",
|
||
"mod_count": 3,
|
||
"mods": [
|
||
{"name": "CBA_A3", "source": "steam", "url": None, "steam_id": "450814997"},
|
||
{"name": "ACE3", "source": "steam", "url": None, "steam_id": "463939057"},
|
||
{"name": "OnlyInA", "source": "steam", "url": None, "steam_id": "111111111"},
|
||
],
|
||
}
|
||
|
||
_P2 = {
|
||
"preset_name": "Preset_B",
|
||
"source_file": "preset_b.html",
|
||
"mod_count": 3,
|
||
"mods": [
|
||
{"name": "CBA_A3", "source": "steam", "url": None, "steam_id": "450814997"},
|
||
{"name": "ACE3", "source": "steam", "url": None, "steam_id": "463939057"},
|
||
{"name": "OnlyInB", "source": "steam", "url": None, "steam_id": "222222222"},
|
||
],
|
||
}
|
||
|
||
_P_LOCAL = {
|
||
"preset_name": "Preset_Local",
|
||
"source_file": "preset_local.html",
|
||
"mod_count": 2,
|
||
"mods": [
|
||
{"name": "CBA_A3", "source": "steam", "url": None, "steam_id": "450814997"},
|
||
{"name": "MyLocalMod", "source": "local", "url": None, "steam_id": None},
|
||
],
|
||
}
|
||
|
||
_P_IDENTICAL = {
|
||
"preset_name": "Preset_C",
|
||
"source_file": "preset_c.html",
|
||
"mod_count": 3,
|
||
"mods": [
|
||
{"name": "CBA_A3", "source": "steam", "url": None, "steam_id": "450814997"},
|
||
{"name": "ACE3", "source": "steam", "url": None, "steam_id": "463939057"},
|
||
{"name": "OnlyInA", "source": "steam", "url": None, "steam_id": "111111111"},
|
||
],
|
||
}
|
||
|
||
|
||
def _test_compare_basic_shared_and_unique():
|
||
result = compare_presets(_P1, _P2)
|
||
assert_eq(sorted(result["compared_presets"]), ["Preset_A", "Preset_B"])
|
||
shared_ids = {m["steam_id"] for m in result["shared"]["mods"]}
|
||
assert_eq(shared_ids, {"450814997", "463939057"})
|
||
assert_eq(result["shared"]["mod_count"], 2)
|
||
unique_a = {m["steam_id"] for m in result["unique"]["Preset_A"]["mods"]}
|
||
unique_b = {m["steam_id"] for m in result["unique"]["Preset_B"]["mods"]}
|
||
assert_eq(unique_a, {"111111111"})
|
||
assert_eq(unique_b, {"222222222"})
|
||
|
||
|
||
def _test_compare_requires_two_or_more():
|
||
assert_raises(ValueError, compare_presets, _P1)
|
||
|
||
|
||
def _test_compare_three_presets():
|
||
result = compare_presets(_P1, _P2, _P_LOCAL)
|
||
# Only CBA_A3 (450814997) is in all three
|
||
shared_ids = {m["steam_id"] for m in result["shared"]["mods"]}
|
||
assert_eq(shared_ids, {"450814997"})
|
||
assert "Preset_Local" in result["unique"]
|
||
|
||
|
||
def _test_compare_identical_presets_all_shared():
|
||
result = compare_presets(_P1, _P_IDENTICAL)
|
||
assert_eq(result["shared"]["mod_count"], 3)
|
||
for data in result["unique"].values():
|
||
assert_eq(data["mod_count"], 0)
|
||
|
||
|
||
def _test_compare_no_shared_mods():
|
||
p_no_overlap = {
|
||
"preset_name": "Preset_X",
|
||
"source_file": "px.html",
|
||
"mod_count": 1,
|
||
"mods": [{"name": "Exclusive", "source": "steam", "url": None, "steam_id": "999999999"}],
|
||
}
|
||
result = compare_presets(_P1, p_no_overlap)
|
||
assert_eq(result["shared"]["mod_count"], 0)
|
||
assert_eq(len(result["shared"]["mods"]), 0)
|
||
|
||
|
||
def _test_compare_local_mod_uses_name_as_key():
|
||
# Two presets share a local mod by name (no steam_id)
|
||
p_a = {
|
||
"preset_name": "A",
|
||
"source_file": "a.html",
|
||
"mod_count": 1,
|
||
"mods": [{"name": "MyLocalMod", "source": "local", "url": None, "steam_id": None}],
|
||
}
|
||
p_b = {
|
||
"preset_name": "B",
|
||
"source_file": "b.html",
|
||
"mod_count": 1,
|
||
"mods": [{"name": "MyLocalMod", "source": "local", "url": None, "steam_id": None}],
|
||
}
|
||
result = compare_presets(p_a, p_b)
|
||
assert_eq(result["shared"]["mod_count"], 1)
|
||
assert_eq(result["shared"]["mods"][0]["name"], "MyLocalMod")
|
||
|
||
|
||
def _test_compare_preserves_all_fields():
|
||
result = compare_presets(_P1, _P2)
|
||
for mod in result["shared"]["mods"]:
|
||
assert "name" in mod
|
||
assert "source" in mod
|
||
assert "steam_id" in mod
|
||
|
||
|
||
def _test_compare_result_counts_consistent():
|
||
result = compare_presets(_P1, _P2)
|
||
for preset in (_P1, _P2):
|
||
pname = preset["preset_name"]
|
||
total = result["shared"]["mod_count"] + result["unique"][pname]["mod_count"]
|
||
assert_eq(total, preset["mod_count"],
|
||
f"shared + unique must equal total for {pname}")
|
||
|
||
|
||
test("compare_presets: shared and unique breakdown", _test_compare_basic_shared_and_unique)
|
||
test("compare_presets: requires >= 2 presets", _test_compare_requires_two_or_more)
|
||
test("compare_presets: three presets, intersection only", _test_compare_three_presets)
|
||
test("compare_presets: identical presets → all shared", _test_compare_identical_presets_all_shared)
|
||
test("compare_presets: no overlap → shared is empty", _test_compare_no_shared_mods)
|
||
test("compare_presets: local mod matched by name", _test_compare_local_mod_uses_name_as_key)
|
||
test("compare_presets: result mods retain all fields", _test_compare_preserves_all_fields)
|
||
test("compare_presets: shared + unique = total per preset", _test_compare_result_counts_consistent)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# 5. fetcher (pure functions only — no network)
|
||
# ---------------------------------------------------------------------------
|
||
|
||
group("fetcher (pure functions, no network)")
|
||
|
||
from arma_modlist_tools.fetcher import (
|
||
_normalize_name, _parse_meta_cpp, _folder_url, find_mod_folder,
|
||
)
|
||
|
||
|
||
def _test_normalize_name_strips_at():
|
||
assert_eq(_normalize_name("@ace"), "ace")
|
||
assert_eq(_normalize_name("@CBA_A3"), "cbaa3")
|
||
|
||
|
||
def _test_normalize_name_lowercase():
|
||
assert_eq(_normalize_name("@RHS_USAF"), "rhsusaf")
|
||
|
||
|
||
def _test_normalize_name_removes_spaces_and_special():
|
||
assert_eq(_normalize_name("@My Mod (v2)"), "mymodv2")
|
||
|
||
|
||
def _test_normalize_name_no_at():
|
||
assert_eq(_normalize_name("ace"), "ace")
|
||
|
||
|
||
def _test_parse_meta_cpp_standard():
|
||
txt = 'publishedid = 463939057;\nname = "ACE3";'
|
||
assert_eq(_parse_meta_cpp(txt), "463939057")
|
||
|
||
|
||
def _test_parse_meta_cpp_no_spaces():
|
||
assert_eq(_parse_meta_cpp("publishedid=123456;"), "123456")
|
||
|
||
|
||
def _test_parse_meta_cpp_case_insensitive():
|
||
assert_eq(_parse_meta_cpp("PublishedId = 999;"), "999")
|
||
|
||
|
||
def _test_parse_meta_cpp_missing():
|
||
assert _parse_meta_cpp("name = somemod;") is None
|
||
|
||
|
||
def _test_folder_url_trailing_slash():
|
||
url = _folder_url("https://example.com/mods", "@ace")
|
||
assert url.endswith("/"), "folder_url must end with /"
|
||
assert "@ace" in url
|
||
|
||
|
||
def _test_find_mod_folder_by_steam_id():
|
||
index = {
|
||
"by_steam_id": {"450814997": "https://example.com/mods/@cba_a3/"},
|
||
"by_name": {},
|
||
}
|
||
mod = {"steam_id": "450814997", "name": "CBA_A3"}
|
||
url = find_mod_folder(mod, index)
|
||
assert_eq(url, "https://example.com/mods/@cba_a3/")
|
||
|
||
|
||
def _test_find_mod_folder_by_name_fallback():
|
||
index = {
|
||
"by_steam_id": {},
|
||
"by_name": {"ace3": "https://example.com/mods/@ace3/"},
|
||
}
|
||
mod = {"steam_id": None, "name": "ACE3"}
|
||
url = find_mod_folder(mod, index)
|
||
assert_eq(url, "https://example.com/mods/@ace3/")
|
||
|
||
|
||
def _test_find_mod_folder_steam_id_preferred_over_name():
|
||
index = {
|
||
"by_steam_id": {"111": "https://example.com/mods/@right/"},
|
||
"by_name": {"mymod": "https://example.com/mods/@wrong/"},
|
||
}
|
||
mod = {"steam_id": "111", "name": "MyMod"}
|
||
url = find_mod_folder(mod, index)
|
||
assert_eq(url, "https://example.com/mods/@right/")
|
||
|
||
|
||
def _test_find_mod_folder_not_found():
|
||
index = {"by_steam_id": {}, "by_name": {}}
|
||
mod = {"steam_id": "999", "name": "Missing"}
|
||
assert find_mod_folder(mod, index) is None
|
||
|
||
|
||
def _test_find_mod_folder_normalized_name_match():
|
||
index = {
|
||
"by_steam_id": {},
|
||
"by_name": {"rhsusaf": "https://example.com/mods/@rhs_usaf/"},
|
||
}
|
||
mod = {"steam_id": None, "name": "@RHS_USAF"}
|
||
url = find_mod_folder(mod, index)
|
||
assert_eq(url, "https://example.com/mods/@rhs_usaf/")
|
||
|
||
|
||
test("_normalize_name: strips leading @", _test_normalize_name_strips_at)
|
||
test("_normalize_name: lowercases", _test_normalize_name_lowercase)
|
||
test("_normalize_name: removes spaces and special chars", _test_normalize_name_removes_spaces_and_special)
|
||
test("_normalize_name: no @ prefix still works", _test_normalize_name_no_at)
|
||
test("_parse_meta_cpp: standard format", _test_parse_meta_cpp_standard)
|
||
test("_parse_meta_cpp: no spaces", _test_parse_meta_cpp_no_spaces)
|
||
test("_parse_meta_cpp: case-insensitive", _test_parse_meta_cpp_case_insensitive)
|
||
test("_parse_meta_cpp: missing → None", _test_parse_meta_cpp_missing)
|
||
test("_folder_url: always trailing slash", _test_folder_url_trailing_slash)
|
||
test("find_mod_folder: by steam_id", _test_find_mod_folder_by_steam_id)
|
||
test("find_mod_folder: by name fallback", _test_find_mod_folder_by_name_fallback)
|
||
test("find_mod_folder: steam_id preferred over name", _test_find_mod_folder_steam_id_preferred_over_name)
|
||
test("find_mod_folder: not found → None", _test_find_mod_folder_not_found)
|
||
test("find_mod_folder: normalized @ name match", _test_find_mod_folder_normalized_name_match)
|
||
|
||
# ---- list_mod_updates (uses temp dirs, no network) ----
|
||
|
||
from arma_modlist_tools.fetcher import list_mod_updates
|
||
import unittest.mock as mock
|
||
|
||
|
||
def _test_list_mod_updates_all_uptodate():
|
||
"""All local files exist with matching sizes → empty result."""
|
||
with tempfile.TemporaryDirectory() as d:
|
||
dest = Path(d)
|
||
# Create local files matching server sizes
|
||
(dest / "addons").mkdir()
|
||
(dest / "addons" / "mod.pbo").write_bytes(b"x" * 1000)
|
||
server_files = [("addons/mod.pbo", "https://x.com/addons/mod.pbo", 1000)]
|
||
with mock.patch("arma_modlist_tools.fetcher.list_mod_files", return_value=server_files):
|
||
result = list_mod_updates("https://x.com/", dest, None)
|
||
assert_eq(result, [], "No stale files expected when sizes match")
|
||
|
||
|
||
def _test_list_mod_updates_missing_file():
|
||
"""Local file does not exist → included."""
|
||
with tempfile.TemporaryDirectory() as d:
|
||
dest = Path(d)
|
||
server_files = [("addons/new.pbo", "https://x.com/addons/new.pbo", 500)]
|
||
with mock.patch("arma_modlist_tools.fetcher.list_mod_files", return_value=server_files):
|
||
result = list_mod_updates("https://x.com/", dest, None)
|
||
assert_eq(len(result), 1)
|
||
assert_eq(result[0][0], "addons/new.pbo")
|
||
|
||
|
||
def _test_list_mod_updates_size_mismatch():
|
||
"""Local file exists but size differs → included."""
|
||
with tempfile.TemporaryDirectory() as d:
|
||
dest = Path(d)
|
||
(dest / "addons").mkdir()
|
||
(dest / "addons" / "mod.pbo").write_bytes(b"x" * 999) # local: 999 bytes
|
||
server_files = [("addons/mod.pbo", "https://x.com/addons/mod.pbo", 1000)] # server: 1000
|
||
with mock.patch("arma_modlist_tools.fetcher.list_mod_files", return_value=server_files):
|
||
result = list_mod_updates("https://x.com/", dest, None)
|
||
assert_eq(len(result), 1)
|
||
assert_eq(result[0][2], 1000, "Returned entry must carry server size")
|
||
|
||
|
||
def _test_list_mod_updates_zero_server_size_skips_check():
|
||
"""Server reports size=0 (unknown) — only include if file is missing."""
|
||
with tempfile.TemporaryDirectory() as d:
|
||
dest = Path(d)
|
||
(dest / "addons").mkdir()
|
||
(dest / "addons" / "mod.pbo").write_bytes(b"x" * 500) # exists locally
|
||
server_files = [("addons/mod.pbo", "https://x.com/addons/mod.pbo", 0)]
|
||
with mock.patch("arma_modlist_tools.fetcher.list_mod_files", return_value=server_files):
|
||
result = list_mod_updates("https://x.com/", dest, None)
|
||
# size=0 → skip size check, file exists → not stale
|
||
assert_eq(result, [], "Existing file with server size=0 must not be re-downloaded")
|
||
|
||
|
||
def _test_list_mod_updates_mixed():
|
||
"""One up-to-date, one missing, one stale → two results."""
|
||
with tempfile.TemporaryDirectory() as d:
|
||
dest = Path(d)
|
||
(dest / "addons").mkdir()
|
||
(dest / "addons" / "current.pbo").write_bytes(b"x" * 100) # matches server
|
||
(dest / "addons" / "stale.pbo").write_bytes(b"x" * 50) # differs from server
|
||
# missing.pbo does not exist
|
||
server_files = [
|
||
("addons/current.pbo", "https://x.com/addons/current.pbo", 100),
|
||
("addons/stale.pbo", "https://x.com/addons/stale.pbo", 200),
|
||
("addons/missing.pbo", "https://x.com/addons/missing.pbo", 300),
|
||
]
|
||
with mock.patch("arma_modlist_tools.fetcher.list_mod_files", return_value=server_files):
|
||
result = list_mod_updates("https://x.com/", dest, None)
|
||
result_names = {r[0] for r in result}
|
||
assert_eq(result_names, {"addons/stale.pbo", "addons/missing.pbo"})
|
||
assert "addons/current.pbo" not in result_names
|
||
|
||
|
||
test("list_mod_updates: all files up-to-date → empty", _test_list_mod_updates_all_uptodate)
|
||
test("list_mod_updates: missing file → included", _test_list_mod_updates_missing_file)
|
||
test("list_mod_updates: size mismatch → included", _test_list_mod_updates_size_mismatch)
|
||
test("list_mod_updates: server size=0 → skip size check", _test_list_mod_updates_zero_server_size_skips_check)
|
||
test("list_mod_updates: mixed state (up-to-date, stale, missing)", _test_list_mod_updates_mixed)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# 6. reporter
|
||
# ---------------------------------------------------------------------------
|
||
|
||
group("reporter")
|
||
|
||
from arma_modlist_tools.reporter import build_missing_report, save_missing_report
|
||
|
||
_COMPARISON = {
|
||
"compared_presets": ["Preset_A", "Preset_B"],
|
||
"shared": {
|
||
"mod_count": 2,
|
||
"mods": [
|
||
{"name": "CBA_A3", "steam_id": "450814997", "url": None, "source": "steam"},
|
||
{"name": "ACE3", "steam_id": "463939057", "url": None, "source": "steam"},
|
||
],
|
||
},
|
||
"unique": {
|
||
"Preset_A": {
|
||
"mod_count": 1,
|
||
"mods": [{"name": "OnlyA", "steam_id": "111", "url": None, "source": "steam"}],
|
||
},
|
||
"Preset_B": {
|
||
"mod_count": 1,
|
||
"mods": [{"name": "OnlyB", "steam_id": "222", "url": None, "source": "steam"}],
|
||
},
|
||
},
|
||
}
|
||
|
||
_INDEX_ALL = {
|
||
"by_steam_id": {
|
||
"450814997": "https://x.com/@cba/",
|
||
"463939057": "https://x.com/@ace/",
|
||
"111": "https://x.com/@onlya/",
|
||
"222": "https://x.com/@onlyb/",
|
||
},
|
||
"by_name": {},
|
||
}
|
||
|
||
_INDEX_NONE: dict = {"by_steam_id": {}, "by_name": {}}
|
||
|
||
_INDEX_PARTIAL = {
|
||
"by_steam_id": {
|
||
"450814997": "https://x.com/@cba/",
|
||
"463939057": "https://x.com/@ace/",
|
||
},
|
||
"by_name": {},
|
||
}
|
||
|
||
|
||
def _test_build_missing_report_all_found():
|
||
report = build_missing_report(_COMPARISON, _INDEX_ALL)
|
||
assert_eq(report["total_mods"], 4)
|
||
assert_eq(report["on_server"], 4)
|
||
assert_eq(report["missing"], 0)
|
||
assert_eq(report["missing_mods"], [])
|
||
|
||
|
||
def _test_build_missing_report_all_missing():
|
||
report = build_missing_report(_COMPARISON, _INDEX_NONE)
|
||
assert_eq(report["total_mods"], 4)
|
||
assert_eq(report["on_server"], 0)
|
||
assert_eq(report["missing"], 4)
|
||
assert_eq(len(report["missing_mods"]), 4)
|
||
|
||
|
||
def _test_build_missing_report_partial():
|
||
report = build_missing_report(_COMPARISON, _INDEX_PARTIAL)
|
||
assert_eq(report["on_server"], 2)
|
||
assert_eq(report["missing"], 2)
|
||
missing_ids = {m["steam_id"] for m in report["missing_mods"]}
|
||
assert_eq(missing_ids, {"111", "222"})
|
||
|
||
|
||
def _test_build_missing_report_group_field():
|
||
report = build_missing_report(_COMPARISON, _INDEX_NONE)
|
||
groups = {m["group"] for m in report["missing_mods"]}
|
||
assert_in("shared", groups)
|
||
assert_in("Preset_A", groups)
|
||
assert_in("Preset_B", groups)
|
||
|
||
|
||
def _test_build_missing_report_shared_group_label():
|
||
report = build_missing_report(_COMPARISON, _INDEX_NONE)
|
||
shared_entries = [m for m in report["missing_mods"] if m["name"] in ("CBA_A3", "ACE3")]
|
||
for e in shared_entries:
|
||
assert_eq(e["group"], "shared", f"{e['name']} must have group='shared'")
|
||
|
||
|
||
def _test_build_missing_report_has_timestamp():
|
||
report = build_missing_report(_COMPARISON, _INDEX_ALL)
|
||
assert "generated_at" in report
|
||
assert isinstance(report["generated_at"], str)
|
||
assert "T" in report["generated_at"] # ISO 8601 format
|
||
|
||
|
||
def _test_save_and_reload_missing_report():
|
||
report = build_missing_report(_COMPARISON, _INDEX_PARTIAL)
|
||
with tempfile.TemporaryDirectory() as d:
|
||
out = Path(d) / "sub" / "missing_report.json"
|
||
save_missing_report(report, out)
|
||
assert out.exists(), "save_missing_report must create the file"
|
||
loaded = json.loads(out.read_text(encoding="utf-8"))
|
||
assert_eq(loaded["missing"], report["missing"])
|
||
assert_eq(loaded["total_mods"], report["total_mods"])
|
||
|
||
|
||
def _test_save_missing_report_creates_parent_dirs():
|
||
with tempfile.TemporaryDirectory() as d:
|
||
deep = Path(d) / "a" / "b" / "c" / "report.json"
|
||
save_missing_report({"test": True}, deep)
|
||
assert deep.exists()
|
||
|
||
|
||
test("build_missing_report: all mods found", _test_build_missing_report_all_found)
|
||
test("build_missing_report: all mods missing", _test_build_missing_report_all_missing)
|
||
test("build_missing_report: partial coverage", _test_build_missing_report_partial)
|
||
test("build_missing_report: group field present on all entries", _test_build_missing_report_group_field)
|
||
test("build_missing_report: shared mods labeled group='shared'", _test_build_missing_report_shared_group_label)
|
||
test("build_missing_report: includes ISO timestamp", _test_build_missing_report_has_timestamp)
|
||
test("save_missing_report: write and reload roundtrip", _test_save_and_reload_missing_report)
|
||
test("save_missing_report: creates parent directories", _test_save_missing_report_creates_parent_dirs)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# 7. linker
|
||
# ---------------------------------------------------------------------------
|
||
|
||
group("linker")
|
||
|
||
from arma_modlist_tools.linker import (
|
||
get_mod_folders, get_link_status,
|
||
create_junction, remove_junction,
|
||
link_group, unlink_group,
|
||
_is_junction,
|
||
)
|
||
|
||
|
||
def _make_fake_mods(base: Path, names: list[str]) -> None:
|
||
"""Create @ModName directories with a dummy file inside each."""
|
||
for name in names:
|
||
mod_dir = base / name
|
||
mod_dir.mkdir(parents=True)
|
||
(mod_dir / "mod.cpp").write_text(f"// {name}", encoding="utf-8")
|
||
|
||
|
||
def _test_get_mod_folders_finds_at_dirs():
|
||
with tempfile.TemporaryDirectory() as d:
|
||
d = Path(d)
|
||
_make_fake_mods(d, ["@ace", "@cba_a3", "@rhs"])
|
||
(d / "not_a_mod").mkdir() # should be ignored
|
||
(d / "readme.txt").write_text("x") # should be ignored
|
||
folders = get_mod_folders(d)
|
||
names = [f.name for f in folders]
|
||
assert_eq(sorted(names), ["@ace", "@cba_a3", "@rhs"])
|
||
assert "not_a_mod" not in names
|
||
|
||
|
||
def _test_get_mod_folders_empty_dir():
|
||
with tempfile.TemporaryDirectory() as d:
|
||
folders = get_mod_folders(Path(d))
|
||
assert_eq(folders, [])
|
||
|
||
|
||
def _test_get_mod_folders_nonexistent():
|
||
result = get_mod_folders(Path("no_such_dir_xyz_abc"))
|
||
assert_eq(result, [])
|
||
|
||
|
||
def _test_get_link_status_unlinked():
|
||
with tempfile.TemporaryDirectory() as src_d, tempfile.TemporaryDirectory() as arma_d:
|
||
src_d = Path(src_d)
|
||
arma_d = Path(arma_d)
|
||
_make_fake_mods(src_d, ["@ace", "@cba_a3"])
|
||
status = get_link_status(src_d, arma_d)
|
||
assert_eq(len(status), 2)
|
||
for s in status:
|
||
assert not s["is_linked"], f"{s['name']} should not be linked yet"
|
||
assert s["link_path"].parent == arma_d
|
||
assert s["source_path"].exists()
|
||
|
||
|
||
def _test_create_and_detect_junction():
|
||
with tempfile.TemporaryDirectory() as src_d, tempfile.TemporaryDirectory() as arma_d:
|
||
src_d = Path(src_d)
|
||
arma_d = Path(arma_d)
|
||
_make_fake_mods(src_d, ["@mymod"])
|
||
target = src_d / "@mymod"
|
||
link = arma_d / "@mymod"
|
||
ok = create_junction(link, target)
|
||
assert ok, "create_junction must return True on success"
|
||
assert link.exists(), "link must exist after creation"
|
||
assert _is_junction(link), "_is_junction must detect the new junction"
|
||
# Verify junction points to the right files
|
||
assert (link / "mod.cpp").exists(), "linked junction must expose target contents"
|
||
|
||
|
||
def _test_remove_junction():
|
||
with tempfile.TemporaryDirectory() as src_d, tempfile.TemporaryDirectory() as arma_d:
|
||
src_d = Path(src_d)
|
||
arma_d = Path(arma_d)
|
||
_make_fake_mods(src_d, ["@mymod"])
|
||
target = src_d / "@mymod"
|
||
link = arma_d / "@mymod"
|
||
create_junction(link, target)
|
||
assert _is_junction(link)
|
||
ok, err = remove_junction(link)
|
||
assert ok, f"remove_junction failed: {err}"
|
||
assert not link.exists(), "junction must be gone after removal"
|
||
# Target must be untouched
|
||
assert target.exists(), "target directory must survive junction removal"
|
||
assert (target / "mod.cpp").exists(), "target files must survive junction removal"
|
||
|
||
|
||
def _test_link_group_full_flow():
|
||
with tempfile.TemporaryDirectory() as src_d, tempfile.TemporaryDirectory() as arma_d:
|
||
src_d = Path(src_d)
|
||
arma_d = Path(arma_d)
|
||
_make_fake_mods(src_d, ["@ace", "@cba_a3", "@rhs"])
|
||
result = link_group(src_d, arma_d)
|
||
assert_eq(result["linked"], 3)
|
||
assert_eq(result["already_linked"], 0)
|
||
assert_eq(result["failed"], 0)
|
||
# All links exist
|
||
for name in ["@ace", "@cba_a3", "@rhs"]:
|
||
assert _is_junction(arma_d / name), f"{name} must be linked"
|
||
|
||
|
||
def _test_link_group_idempotent():
|
||
with tempfile.TemporaryDirectory() as src_d, tempfile.TemporaryDirectory() as arma_d:
|
||
src_d = Path(src_d)
|
||
arma_d = Path(arma_d)
|
||
_make_fake_mods(src_d, ["@ace"])
|
||
link_group(src_d, arma_d)
|
||
result2 = link_group(src_d, arma_d)
|
||
assert_eq(result2["linked"], 0)
|
||
assert_eq(result2["already_linked"], 1)
|
||
assert_eq(result2["failed"], 0)
|
||
|
||
|
||
def _test_unlink_group_removes_links():
|
||
with tempfile.TemporaryDirectory() as src_d, tempfile.TemporaryDirectory() as arma_d:
|
||
src_d = Path(src_d)
|
||
arma_d = Path(arma_d)
|
||
_make_fake_mods(src_d, ["@ace", "@cba_a3"])
|
||
link_group(src_d, arma_d)
|
||
result = unlink_group(src_d, arma_d)
|
||
assert_eq(result["unlinked"], 2)
|
||
assert_eq(result["not_linked"], 0)
|
||
assert_eq(result["failed"], 0)
|
||
# Links gone, sources still there
|
||
for name in ["@ace", "@cba_a3"]:
|
||
assert not (arma_d / name).exists(), f"Link {name} must be gone"
|
||
assert (src_d / name).exists(), f"Source {name} must survive"
|
||
|
||
|
||
def _test_unlink_group_nothing_linked():
|
||
with tempfile.TemporaryDirectory() as src_d, tempfile.TemporaryDirectory() as arma_d:
|
||
src_d = Path(src_d)
|
||
arma_d = Path(arma_d)
|
||
_make_fake_mods(src_d, ["@ace"])
|
||
result = unlink_group(src_d, arma_d)
|
||
assert_eq(result["unlinked"], 0)
|
||
assert_eq(result["not_linked"], 1)
|
||
|
||
|
||
def _test_get_link_status_after_linking():
|
||
with tempfile.TemporaryDirectory() as src_d, tempfile.TemporaryDirectory() as arma_d:
|
||
src_d = Path(src_d)
|
||
arma_d = Path(arma_d)
|
||
_make_fake_mods(src_d, ["@ace"])
|
||
link_group(src_d, arma_d)
|
||
status = get_link_status(src_d, arma_d)
|
||
assert_eq(len(status), 1)
|
||
assert status[0]["is_linked"], "status must reflect active junction"
|
||
|
||
|
||
def _test_link_group_skips_existing_non_junction():
|
||
"""If arma_dir already has a regular folder with the same name, must not overwrite."""
|
||
with tempfile.TemporaryDirectory() as src_d, tempfile.TemporaryDirectory() as arma_d:
|
||
src_d = Path(src_d)
|
||
arma_d = Path(arma_d)
|
||
_make_fake_mods(src_d, ["@ace"])
|
||
# Create a plain (non-junction) directory at the link path
|
||
(arma_d / "@ace").mkdir()
|
||
result = link_group(src_d, arma_d)
|
||
assert_eq(result["failed"], 1, "Must report failure for path that exists but is not a junction")
|
||
assert_eq(result["linked"], 0)
|
||
|
||
|
||
test("get_mod_folders: finds @-prefixed dirs only", _test_get_mod_folders_finds_at_dirs)
|
||
test("get_mod_folders: empty directory", _test_get_mod_folders_empty_dir)
|
||
test("get_mod_folders: nonexistent path → []", _test_get_mod_folders_nonexistent)
|
||
test("get_link_status: all unlinked initially", _test_get_link_status_unlinked)
|
||
test("create_junction + _is_junction detection", _test_create_and_detect_junction)
|
||
test("remove_junction: removes link, target untouched", _test_remove_junction)
|
||
test("link_group: full link flow", _test_link_group_full_flow)
|
||
test("link_group: idempotent (already_linked on second call)", _test_link_group_idempotent)
|
||
test("unlink_group: removes all links", _test_unlink_group_removes_links)
|
||
test("unlink_group: nothing linked → not_linked count", _test_unlink_group_nothing_linked)
|
||
test("get_link_status: reflects active junctions", _test_get_link_status_after_linking)
|
||
test("link_group: skips plain dir (fails safely)", _test_link_group_skips_existing_non_junction)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# 8. __init__ exports
|
||
# ---------------------------------------------------------------------------
|
||
|
||
group("__init__ public exports")
|
||
|
||
import arma_modlist_tools as pkg
|
||
|
||
_EXPECTED_EXPORTS = [
|
||
"parse_mod_entry", "parse_modlist_html", "parse_modlist_dir",
|
||
"compare_presets",
|
||
"make_session", "build_server_index", "find_mod_folder",
|
||
"list_mod_files", "list_mod_updates", "download_file", "download_mod_folder",
|
||
"get_mod_folders", "get_link_status",
|
||
"create_junction", "remove_junction",
|
||
"link_group", "unlink_group",
|
||
"load_config", "Config",
|
||
"is_windows", "is_linux", "get_os_label", "fix_console_encoding",
|
||
"build_missing_report", "save_missing_report",
|
||
"find_orphan_folders", "folder_size",
|
||
]
|
||
|
||
|
||
def _test_all_exports_present():
|
||
missing = [name for name in _EXPECTED_EXPORTS if not hasattr(pkg, name)]
|
||
assert not missing, f"Missing exports from arma_modlist_tools: {missing}"
|
||
|
||
|
||
def _test_exports_are_callable():
|
||
non_callable = [
|
||
name for name in _EXPECTED_EXPORTS
|
||
if hasattr(pkg, name) and not callable(getattr(pkg, name))
|
||
and not isinstance(getattr(pkg, name), type)
|
||
]
|
||
assert not non_callable, f"Non-callable exports: {non_callable}"
|
||
|
||
|
||
test("all expected symbols exported", _test_all_exports_present)
|
||
test("all exported symbols are callable or types", _test_exports_are_callable)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# 9. check_names helpers
|
||
# ---------------------------------------------------------------------------
|
||
|
||
group("check_names helpers")
|
||
|
||
from check_names import (
|
||
_server_name_from_url, _read_local_steam_id,
|
||
_lookup_detailed, _resolve_status, _write_steam_id,
|
||
_build_comparison_id_map,
|
||
)
|
||
|
||
|
||
def _test_server_name_from_url():
|
||
assert_eq(_server_name_from_url("https://x.com/mods/@ace/"), "@ace")
|
||
assert_eq(_server_name_from_url("https://x.com/mods/@cba_a3"), "@cba_a3")
|
||
|
||
|
||
def _test_read_local_steam_id_present():
|
||
with tempfile.TemporaryDirectory() as d:
|
||
d = Path(d)
|
||
(d / "meta.cpp").write_text('publishedid = 463939057;\nname = "ACE3";', encoding="utf-8")
|
||
assert_eq(_read_local_steam_id(d), "463939057")
|
||
|
||
|
||
def _test_read_local_steam_id_missing():
|
||
with tempfile.TemporaryDirectory() as d:
|
||
assert _read_local_steam_id(Path(d)) is None
|
||
|
||
|
||
def _test_read_local_steam_id_no_id_in_file():
|
||
with tempfile.TemporaryDirectory() as d:
|
||
d = Path(d)
|
||
(d / "meta.cpp").write_text('name = "LocalMod";\n', encoding="utf-8")
|
||
assert _read_local_steam_id(d) is None
|
||
|
||
|
||
def _test_lookup_detailed_by_steam_id():
|
||
index = {
|
||
"by_steam_id": {"463939057": "https://x.com/@ace3/"},
|
||
"by_name": {},
|
||
}
|
||
with tempfile.TemporaryDirectory() as d:
|
||
d = Path(d)
|
||
(d / "meta.cpp").write_text("publishedid = 463939057;", encoding="utf-8")
|
||
server_name, local_sid = _lookup_detailed("@ACE3", d, index)
|
||
assert_eq(server_name, "@ace3")
|
||
assert_eq(local_sid, "463939057")
|
||
|
||
|
||
def _test_lookup_detailed_by_name_fallback():
|
||
index = {
|
||
"by_steam_id": {},
|
||
"by_name": {"cbaa3": "https://x.com/@cba_a3/"},
|
||
}
|
||
with tempfile.TemporaryDirectory() as d:
|
||
server_name, local_sid = _lookup_detailed("@CBA_A3", Path(d), index)
|
||
assert_eq(server_name, "@cba_a3")
|
||
assert local_sid is None # no meta.cpp
|
||
|
||
|
||
def _test_lookup_detailed_steam_id_beats_name():
|
||
index = {
|
||
"by_steam_id": {"111": "https://x.com/@correct/"},
|
||
"by_name": {"mymod": "https://x.com/@wrong/"},
|
||
}
|
||
with tempfile.TemporaryDirectory() as d:
|
||
d = Path(d)
|
||
(d / "meta.cpp").write_text("publishedid = 111;", encoding="utf-8")
|
||
server_name, local_sid = _lookup_detailed("@MyMod", d, index)
|
||
assert_eq(server_name, "@correct")
|
||
assert_eq(local_sid, "111")
|
||
|
||
|
||
def _test_lookup_detailed_not_found():
|
||
index = {"by_steam_id": {}, "by_name": {}}
|
||
with tempfile.TemporaryDirectory() as d:
|
||
server_name, local_sid = _lookup_detailed("@Unknown", Path(d), index)
|
||
assert server_name is None
|
||
|
||
|
||
def _test_lookup_detailed_returns_sid_even_when_not_on_server():
|
||
"""local_sid should be returned even when server lookup fails."""
|
||
index = {"by_steam_id": {}, "by_name": {}}
|
||
with tempfile.TemporaryDirectory() as d:
|
||
d = Path(d)
|
||
(d / "meta.cpp").write_text("publishedid = 999;", encoding="utf-8")
|
||
server_name, local_sid = _lookup_detailed("@SomeMod", d, index)
|
||
assert server_name is None
|
||
assert_eq(local_sid, "999")
|
||
|
||
|
||
test("_lookup_detailed: matches by steam_id", _test_lookup_detailed_by_steam_id)
|
||
test("_lookup_detailed: falls back to name", _test_lookup_detailed_by_name_fallback)
|
||
test("_lookup_detailed: steam_id beats name fallback", _test_lookup_detailed_steam_id_beats_name)
|
||
test("_lookup_detailed: not found -> None", _test_lookup_detailed_not_found)
|
||
test("_lookup_detailed: returns local_sid even when not on server", _test_lookup_detailed_returns_sid_even_when_not_on_server)
|
||
|
||
# _resolve_status tests
|
||
|
||
def _test_resolve_status_ok():
|
||
status, col = _resolve_status("@ace", "@ace", None, ok_disk_names={"@ace"})
|
||
assert_eq(status, "OK")
|
||
assert_eq(col, "@ace")
|
||
|
||
def _test_resolve_status_mismatch():
|
||
status, col = _resolve_status("@ACE3", "@ace3", None, ok_disk_names={"@cba_a3"})
|
||
assert_eq(status, "MISMATCH")
|
||
assert_eq(col, "@ace3")
|
||
|
||
def _test_resolve_status_not_found():
|
||
status, col = _resolve_status("@Unknown", None, None, ok_disk_names=set())
|
||
assert_eq(status, "NOT_ON_SERVER")
|
||
|
||
def _test_resolve_status_id_collision():
|
||
ok = {"@Realistic Ragdoll Physics"}
|
||
status, col = _resolve_status(
|
||
"@NIArms All in One- ACE Compatibility",
|
||
"@Realistic Ragdoll Physics",
|
||
"1234567",
|
||
ok_disk_names=ok,
|
||
)
|
||
assert_eq(status, "ID_COLLISION")
|
||
assert "1234567" in col # bad steam_id shown in output
|
||
|
||
def _test_resolve_status_collision_without_sid():
|
||
ok = {"@Realistic Ragdoll Physics"}
|
||
status, col = _resolve_status(
|
||
"@Some Mod", "@Realistic Ragdoll Physics", None, ok_disk_names=ok
|
||
)
|
||
assert_eq(status, "ID_COLLISION")
|
||
|
||
test("_resolve_status: OK match", _test_resolve_status_ok)
|
||
test("_resolve_status: genuine mismatch", _test_resolve_status_mismatch)
|
||
test("_resolve_status: not found", _test_resolve_status_not_found)
|
||
test("_resolve_status: ID_COLLISION shows bad steam_id", _test_resolve_status_id_collision)
|
||
test("_resolve_status: ID_COLLISION without local sid", _test_resolve_status_collision_without_sid)
|
||
|
||
# _write_steam_id tests
|
||
|
||
def _test_write_steam_id_updates_existing():
|
||
with tempfile.TemporaryDirectory() as d:
|
||
d = Path(d)
|
||
meta = d / "meta.cpp"
|
||
meta.write_text('publishedid = 111;\nname = "OldMod";\n', encoding="utf-8")
|
||
ok, msg = _write_steam_id(d, "999")
|
||
assert ok
|
||
assert "999" in meta.read_text(encoding="utf-8")
|
||
assert "111" not in meta.read_text(encoding="utf-8")
|
||
assert 'name = "OldMod"' in meta.read_text(encoding="utf-8") # preserved
|
||
|
||
def _test_write_steam_id_creates_if_missing():
|
||
with tempfile.TemporaryDirectory() as d:
|
||
d = Path(d)
|
||
ok, msg = _write_steam_id(d, "12345")
|
||
assert ok
|
||
meta = d / "meta.cpp"
|
||
assert meta.exists()
|
||
assert "12345" in meta.read_text(encoding="utf-8")
|
||
|
||
def _test_write_steam_id_appends_if_no_line():
|
||
with tempfile.TemporaryDirectory() as d:
|
||
d = Path(d)
|
||
meta = d / "meta.cpp"
|
||
meta.write_text('name = "NoId";\n', encoding="utf-8")
|
||
ok, msg = _write_steam_id(d, "777")
|
||
assert ok
|
||
content = meta.read_text(encoding="utf-8")
|
||
assert "777" in content
|
||
assert 'name = "NoId"' in content
|
||
|
||
test("_write_steam_id: updates existing publishedid", _test_write_steam_id_updates_existing)
|
||
test("_write_steam_id: creates meta.cpp if missing", _test_write_steam_id_creates_if_missing)
|
||
test("_write_steam_id: appends if no publishedid line", _test_write_steam_id_appends_if_no_line)
|
||
|
||
# _build_comparison_id_map tests
|
||
|
||
def _test_build_comparison_id_map_reads_json():
|
||
comparison = {
|
||
"compared_presets": ["A", "B"],
|
||
"shared": {"mod_count": 1, "mods": [
|
||
{"name": "CBA_A3", "steam_id": "450814997", "url": None, "source": "steam"},
|
||
]},
|
||
"unique": {
|
||
"A": {"mod_count": 1, "mods": [
|
||
{"name": "ACE3", "steam_id": "463939057", "url": None, "source": "steam"},
|
||
]},
|
||
"B": {"mod_count": 0, "mods": []},
|
||
},
|
||
}
|
||
with tempfile.TemporaryDirectory() as d:
|
||
cfg_data = {
|
||
"server": {"base_url": "x", "username": "u", "password": "p"},
|
||
"paths": {"arma_dir": d, "downloads": d,
|
||
"modlist_html": d, "modlist_json": d},
|
||
}
|
||
import json as _json
|
||
cfg_file = Path(d) / "config.json"
|
||
cfg_file.write_text(_json.dumps(cfg_data), encoding="utf-8")
|
||
from arma_modlist_tools.config import load_config as _lc
|
||
cfg = _lc(str(cfg_file))
|
||
# Write a fake comparison.json
|
||
comp_path = Path(d) / "comparison.json"
|
||
comp_path.write_text(_json.dumps(comparison), encoding="utf-8")
|
||
id_map = _build_comparison_id_map(cfg)
|
||
|
||
assert_in("cbaa3", id_map)
|
||
assert_eq(id_map["cbaa3"][0], "450814997")
|
||
assert_in("ace3", id_map)
|
||
assert_eq(id_map["ace3"][0], "463939057")
|
||
|
||
test("_build_comparison_id_map: reads steam_ids from comparison.json", _test_build_comparison_id_map_reads_json)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# 10. Integration: parse → compare → reporter (offline)
|
||
# ---------------------------------------------------------------------------
|
||
|
||
group("integration: parse → compare → reporter (offline)")
|
||
|
||
|
||
def _test_end_to_end_offline():
|
||
"""Parse real HTML files, compare them, build a mock report."""
|
||
html_dir = Path(__file__).parent / "modlist_html"
|
||
presets = parse_modlist_dir(html_dir)
|
||
assert len(presets) >= 2, "Need >=2 HTML presets for integration test"
|
||
|
||
comparison = compare_presets(*presets)
|
||
total = comparison["shared"]["mod_count"] + sum(
|
||
d["mod_count"] for d in comparison["unique"].values()
|
||
)
|
||
assert total > 0, "Comparison must have mods"
|
||
|
||
# Build report against empty index (all missing)
|
||
report_all_missing = build_missing_report(comparison, {"by_steam_id": {}, "by_name": {}})
|
||
assert_eq(report_all_missing["missing"], total)
|
||
|
||
# Build report against full index (all found)
|
||
full_by_id = {
|
||
mod["steam_id"]: f"https://x.com/@mod{i}/"
|
||
for i, mod in enumerate(
|
||
comparison["shared"]["mods"]
|
||
+ [m for d in comparison["unique"].values() for m in d["mods"]]
|
||
)
|
||
if mod.get("steam_id")
|
||
}
|
||
full_by_name = {
|
||
_normalize_name(mod["name"]): f"https://x.com/@mod{i}/"
|
||
for i, mod in enumerate(
|
||
comparison["shared"]["mods"]
|
||
+ [m for d in comparison["unique"].values() for m in d["mods"]]
|
||
)
|
||
if not mod.get("steam_id")
|
||
}
|
||
report_all_found = build_missing_report(
|
||
comparison, {"by_steam_id": full_by_id, "by_name": full_by_name}
|
||
)
|
||
assert_eq(report_all_found["missing"], 0)
|
||
|
||
|
||
def _test_comparison_json_consistent_with_html():
|
||
"""The on-disk comparison.json must be internally consistent with the HTML files.
|
||
|
||
The pipeline lets users compare a *subset* of available presets, so we only
|
||
verify that every preset listed in comparison.json has a matching HTML file —
|
||
not that all HTML files were included.
|
||
"""
|
||
html_dir = Path(__file__).parent / "modlist_html"
|
||
json_file = Path(__file__).parent / "modlist_json" / "comparison.json"
|
||
if not json_file.exists():
|
||
raise _SkipTest("comparison.json not found (run pipeline first)")
|
||
|
||
available_presets = {p.stem for p in html_dir.glob("*.html")}
|
||
on_disk = json.loads(json_file.read_text(encoding="utf-8"))
|
||
|
||
# Every preset referenced in comparison.json must have a source HTML file
|
||
for pname in on_disk["compared_presets"]:
|
||
assert pname in available_presets, (
|
||
f"comparison.json references '{pname}' but no matching HTML file found"
|
||
)
|
||
|
||
# Re-compare only the presets that were actually used on disk
|
||
selected = [p for p in parse_modlist_dir(html_dir)
|
||
if p["preset_name"] in on_disk["compared_presets"]]
|
||
if len(selected) < 2:
|
||
raise _SkipTest("fewer than 2 matching HTML presets available")
|
||
|
||
fresh = compare_presets(*selected)
|
||
assert_eq(fresh["shared"]["mod_count"], on_disk["shared"]["mod_count"])
|
||
for pname in fresh["compared_presets"]:
|
||
assert_eq(
|
||
fresh["unique"][pname]["mod_count"],
|
||
on_disk["unique"][pname]["mod_count"],
|
||
f"unique mod count mismatch for {pname}",
|
||
)
|
||
|
||
|
||
test("end-to-end offline pipeline (parse → compare → report)", _test_end_to_end_offline)
|
||
test("comparison.json on disk matches fresh parse+compare", _test_comparison_json_consistent_with_html)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# 11. gui._io — QueueWriter
|
||
# ---------------------------------------------------------------------------
|
||
|
||
group("gui._io — QueueWriter")
|
||
|
||
import importlib.util as _ilu
|
||
import queue as _queue_mod
|
||
|
||
# Load gui/_io.py without triggering gui/__init__.py (which requires customtkinter)
|
||
_io_spec = _ilu.spec_from_file_location("gui._io", Path(__file__).parent / "gui" / "_io.py")
|
||
_io_mod = _ilu.module_from_spec(_io_spec)
|
||
_io_spec.loader.exec_module(_io_mod)
|
||
_QueueWriter = _io_mod._QueueWriter
|
||
|
||
|
||
def _qw():
|
||
"""Return a fresh (writer, queue) pair."""
|
||
q = _queue_mod.Queue()
|
||
return _QueueWriter(q), q
|
||
|
||
|
||
def _test_qw_clean_text_passes_through():
|
||
w, q = _qw()
|
||
w.write("hello\n")
|
||
assert_eq(q.get_nowait(), "hello\n")
|
||
|
||
|
||
def _test_qw_strips_csi_escape_sequences():
|
||
w, q = _qw()
|
||
w.write("\x1b[32mGreen\x1b[0m")
|
||
assert_eq(q.get_nowait(), "Green")
|
||
|
||
|
||
def _test_qw_strips_osc_escape_sequences():
|
||
w, q = _qw()
|
||
w.write("\x1b]0;window title\x07visible text")
|
||
assert_eq(q.get_nowait(), "visible text")
|
||
|
||
|
||
def _test_qw_bare_cr_becomes_newline():
|
||
w, q = _qw()
|
||
w.write("first\rsecond")
|
||
assert_eq(q.get_nowait(), "first\nsecond")
|
||
|
||
|
||
def _test_qw_crlf_normalised_to_lf():
|
||
w, q = _qw()
|
||
w.write("line1\r\nline2")
|
||
assert_eq(q.get_nowait(), "line1\nline2")
|
||
|
||
|
||
def _test_qw_empty_string_not_enqueued():
|
||
w, q = _qw()
|
||
w.write("")
|
||
assert q.empty(), "empty write should not enqueue anything"
|
||
|
||
|
||
def _test_qw_only_ansi_not_enqueued():
|
||
w, q = _qw()
|
||
w.write("\x1b[32m\x1b[0m")
|
||
assert q.empty(), "write that strips to empty should not enqueue"
|
||
|
||
|
||
def _test_qw_returns_original_byte_length():
|
||
w, q = _qw()
|
||
raw = "\x1b[32mhello\x1b[0m"
|
||
result = w.write(raw)
|
||
assert_eq(result, len(raw))
|
||
|
||
|
||
def _test_qw_tqdm_progress_line():
|
||
"""tqdm uses \\r to overwrite in-place; should become a newline in the log."""
|
||
w, q = _qw()
|
||
w.write("\r 50%|█████ | 1/2 [00:01<00:01]")
|
||
out = q.get_nowait()
|
||
assert out.startswith("\n"), f"expected leading newline, got {out!r}"
|
||
assert "50%" in out
|
||
|
||
|
||
def _test_qw_mixed_ansi_and_cr():
|
||
w, q = _qw()
|
||
w.write("\x1b[1m\rProgress: 75%\x1b[0m")
|
||
out = q.get_nowait()
|
||
assert "\x1b" not in out, "ANSI codes should be stripped"
|
||
assert "\r" not in out, "bare CR should be converted"
|
||
assert "75%" in out
|
||
|
||
|
||
test("clean text passes through unchanged", _test_qw_clean_text_passes_through)
|
||
test("CSI escape sequences stripped", _test_qw_strips_csi_escape_sequences)
|
||
test("OSC escape sequences stripped", _test_qw_strips_osc_escape_sequences)
|
||
test("bare CR converted to newline", _test_qw_bare_cr_becomes_newline)
|
||
test("CRLF normalised to LF", _test_qw_crlf_normalised_to_lf)
|
||
test("empty string not enqueued", _test_qw_empty_string_not_enqueued)
|
||
test("all-ANSI write not enqueued", _test_qw_only_ansi_not_enqueued)
|
||
test("write() returns original length", _test_qw_returns_original_byte_length)
|
||
test("tqdm \\r progress line becomes newline", _test_qw_tqdm_progress_line)
|
||
test("mixed ANSI + CR stripped and converted", _test_qw_mixed_ansi_and_cr)
|
||
|
||
|
||
def _test_qw_osc_st_terminator():
|
||
"""OSC sequences ended with ST (ESC \\) must also be stripped."""
|
||
w, q = _qw()
|
||
w.write("\x1b]0;title\x1b\\visible text")
|
||
assert_eq(q.get_nowait(), "visible text")
|
||
|
||
|
||
test("OSC ST-terminated sequence stripped", _test_qw_osc_st_terminator)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# 12. cleaner — find_orphan_folders
|
||
# ---------------------------------------------------------------------------
|
||
|
||
group("cleaner — find_orphan_folders")
|
||
|
||
from arma_modlist_tools.cleaner import find_orphan_folders, folder_size
|
||
|
||
|
||
def _mk_mod_dir(root: Path, group: str, name: str, files: list[str] | None = None) -> Path:
|
||
"""Create a mock mod folder under root/group/@name with optional dummy files."""
|
||
mod_dir = root / group / f"@{name}"
|
||
mod_dir.mkdir(parents=True, exist_ok=True)
|
||
for fname in (files or []):
|
||
f = mod_dir / fname
|
||
f.write_bytes(b"x" * 1024)
|
||
return mod_dir
|
||
|
||
|
||
_COMPARISON_BASE = {
|
||
"compared_presets": ["A", "B"],
|
||
"shared": {"mod_count": 1, "mods": [{"name": "CBA_A3", "steam_id": "1", "url": None, "source": "steam"}]},
|
||
"unique": {
|
||
"A": {"mod_count": 1, "mods": [{"name": "ACE3", "steam_id": "2", "url": None, "source": "steam"}]},
|
||
"B": {"mod_count": 0, "mods": []},
|
||
},
|
||
}
|
||
|
||
|
||
def _test_orphan_empty_downloads():
|
||
with tempfile.TemporaryDirectory() as d:
|
||
result = find_orphan_folders(Path(d) / "nonexistent", _COMPARISON_BASE)
|
||
assert result == []
|
||
|
||
|
||
def _test_orphan_none_when_all_match():
|
||
with tempfile.TemporaryDirectory() as d:
|
||
d = Path(d)
|
||
_mk_mod_dir(d, "shared", "CBA_A3")
|
||
_mk_mod_dir(d, "A", "ACE3")
|
||
result = find_orphan_folders(d, _COMPARISON_BASE)
|
||
assert result == [], f"Expected no orphans, got {result}"
|
||
|
||
|
||
def _test_orphan_detects_removed_mod():
|
||
with tempfile.TemporaryDirectory() as d:
|
||
d = Path(d)
|
||
_mk_mod_dir(d, "shared", "CBA_A3")
|
||
_mk_mod_dir(d, "shared", "OldMod") # not in comparison
|
||
result = find_orphan_folders(d, _COMPARISON_BASE)
|
||
assert len(result) == 1
|
||
assert_eq(result[0]["name"], "@OldMod")
|
||
assert_eq(result[0]["group"], "shared")
|
||
|
||
|
||
def _test_orphan_detects_removed_group():
|
||
with tempfile.TemporaryDirectory() as d:
|
||
d = Path(d)
|
||
_mk_mod_dir(d, "shared", "CBA_A3")
|
||
_mk_mod_dir(d, "OldPreset", "SomeMod") # group no longer in comparison
|
||
result = find_orphan_folders(d, _COMPARISON_BASE)
|
||
assert len(result) == 1
|
||
assert_eq(result[0]["group"], "OldPreset")
|
||
|
||
|
||
def _test_orphan_normalised_name_matches():
|
||
"""A folder named @CBA A3 should match mod named CBA_A3 (normalised)."""
|
||
with tempfile.TemporaryDirectory() as d:
|
||
d = Path(d)
|
||
# Create folder with spaces — normalises to "cbaa3" same as "CBA_A3"
|
||
mod_dir = d / "shared" / "@CBA A3"
|
||
mod_dir.mkdir(parents=True)
|
||
result = find_orphan_folders(d, _COMPARISON_BASE)
|
||
assert result == [], f"Normalised name should match, got {result}"
|
||
|
||
|
||
def _test_orphan_size_reported():
|
||
with tempfile.TemporaryDirectory() as d:
|
||
d = Path(d)
|
||
mod_dir = _mk_mod_dir(d, "shared", "OldMod", files=["a.pbo", "b.pbo"])
|
||
result = find_orphan_folders(d, _COMPARISON_BASE)
|
||
assert len(result) == 1
|
||
assert result[0]["size"] == 2048 # 2 × 1024 bytes
|
||
|
||
|
||
def _test_orphan_ignores_non_at_folders():
|
||
"""Only @-prefixed directories are considered mod folders."""
|
||
with tempfile.TemporaryDirectory() as d:
|
||
d = Path(d)
|
||
non_mod = d / "shared" / "keys"
|
||
non_mod.mkdir(parents=True)
|
||
_mk_mod_dir(d, "shared", "CBA_A3")
|
||
result = find_orphan_folders(d, _COMPARISON_BASE)
|
||
assert result == []
|
||
|
||
|
||
def _test_folder_size_recursive():
|
||
with tempfile.TemporaryDirectory() as d:
|
||
d = Path(d)
|
||
(d / "a.pbo").write_bytes(b"x" * 512)
|
||
sub = d / "sub"
|
||
sub.mkdir()
|
||
(sub / "b.pbo").write_bytes(b"x" * 512)
|
||
assert_eq(folder_size(d), 1024)
|
||
|
||
|
||
test("find_orphan_folders: empty downloads dir returns []", _test_orphan_empty_downloads)
|
||
test("find_orphan_folders: no orphans when all mods match", _test_orphan_none_when_all_match)
|
||
test("find_orphan_folders: detects mod removed from comparison", _test_orphan_detects_removed_mod)
|
||
test("find_orphan_folders: entire removed group flagged as orphan", _test_orphan_detects_removed_group)
|
||
test("find_orphan_folders: normalised name matches (spaces vs underscores)", _test_orphan_normalised_name_matches)
|
||
test("find_orphan_folders: orphan size summed correctly", _test_orphan_size_reported)
|
||
test("find_orphan_folders: non-@ folders ignored", _test_orphan_ignores_non_at_folders)
|
||
test("folder_size: sums files recursively", _test_folder_size_recursive)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# 13. E2E — clean_orphans.py CLI (subprocess)
|
||
# ---------------------------------------------------------------------------
|
||
|
||
group("e2e — clean_orphans.py CLI")
|
||
|
||
import subprocess as _subprocess
|
||
|
||
|
||
def _make_e2e_root(base: Path, comparison: dict) -> Path:
|
||
"""Create a self-contained project root with config.json + comparison.json
|
||
and a downloads/ directory. Returns the root path."""
|
||
root = base / "project"
|
||
root.mkdir()
|
||
dl = root / "downloads"
|
||
dl.mkdir()
|
||
json_dir = root / "modlist_json"
|
||
json_dir.mkdir()
|
||
arma = root / "arma3server"
|
||
arma.mkdir()
|
||
|
||
cfg = {
|
||
"server": {"base_url": "https://example.com/", "username": "u", "password": "p"},
|
||
"paths": {
|
||
"arma_dir": str(arma),
|
||
"downloads": str(dl),
|
||
"modlist_html": str(root / "modlist_html"),
|
||
"modlist_json": str(json_dir),
|
||
},
|
||
}
|
||
(root / "config.json").write_text(json.dumps(cfg), encoding="utf-8")
|
||
(json_dir / "comparison.json").write_text(
|
||
json.dumps(comparison), encoding="utf-8"
|
||
)
|
||
return root
|
||
|
||
|
||
def _run_clean_orphans(root: Path, *extra_args: str) -> _subprocess.CompletedProcess:
|
||
"""Run clean_orphans.py from the given project root."""
|
||
script = str(Path(__file__).parent / "clean_orphans.py")
|
||
return _subprocess.run(
|
||
[sys.executable, script] + list(extra_args),
|
||
cwd=str(root),
|
||
capture_output=True,
|
||
text=True,
|
||
)
|
||
|
||
|
||
_E2E_COMPARISON = {
|
||
"compared_presets": ["A", "B"],
|
||
"shared": {"mod_count": 1, "mods": [{"name": "CBA_A3", "steam_id": "1", "url": None, "source": "steam"}]},
|
||
"unique": {
|
||
"A": {"mod_count": 1, "mods": [{"name": "ACE3", "steam_id": "2", "url": None, "source": "steam"}]},
|
||
"B": {"mod_count": 0, "mods": []},
|
||
},
|
||
}
|
||
|
||
|
||
def _test_e2e_dry_run_lists_orphans():
|
||
with tempfile.TemporaryDirectory() as d:
|
||
root = _make_e2e_root(Path(d), _E2E_COMPARISON)
|
||
dl = root / "downloads"
|
||
(dl / "shared" / "@CBA_A3").mkdir(parents=True) # known
|
||
orphan = dl / "shared" / "@OldMod"
|
||
orphan.mkdir(parents=True)
|
||
(orphan / "file.pbo").write_bytes(b"x" * 512)
|
||
|
||
result = _run_clean_orphans(root, "--dry-run")
|
||
|
||
assert result.returncode == 0, f"Expected 0, got {result.returncode}\n{result.stderr}"
|
||
assert "@OldMod" in result.stdout, "Orphan not listed in dry-run output"
|
||
assert orphan.exists(), "--dry-run must not delete files"
|
||
|
||
|
||
def _test_e2e_no_orphans_clean_exit():
|
||
with tempfile.TemporaryDirectory() as d:
|
||
root = _make_e2e_root(Path(d), _E2E_COMPARISON)
|
||
dl = root / "downloads"
|
||
(dl / "shared" / "@CBA_A3").mkdir(parents=True)
|
||
(dl / "A" / "@ACE3").mkdir(parents=True)
|
||
|
||
result = _run_clean_orphans(root, "--dry-run")
|
||
|
||
assert result.returncode == 0
|
||
assert "No orphans" in result.stdout
|
||
|
||
|
||
def _test_e2e_yes_deletes_orphans():
|
||
with tempfile.TemporaryDirectory() as d:
|
||
root = _make_e2e_root(Path(d), _E2E_COMPARISON)
|
||
dl = root / "downloads"
|
||
(dl / "shared" / "@CBA_A3").mkdir(parents=True)
|
||
orphan = dl / "shared" / "@OldMod"
|
||
orphan.mkdir(parents=True)
|
||
(orphan / "file.pbo").write_bytes(b"data")
|
||
|
||
result = _run_clean_orphans(root, "--yes")
|
||
|
||
assert result.returncode == 0, f"Expected 0\n{result.stderr}"
|
||
assert not orphan.exists(), "Orphan should have been deleted"
|
||
assert "Deleted" in result.stdout
|
||
|
||
|
||
def _test_e2e_yes_preserves_known_mods():
|
||
with tempfile.TemporaryDirectory() as d:
|
||
root = _make_e2e_root(Path(d), _E2E_COMPARISON)
|
||
dl = root / "downloads"
|
||
known = dl / "shared" / "@CBA_A3"
|
||
known.mkdir(parents=True)
|
||
(known / "cba.pbo").write_bytes(b"keep me")
|
||
orphan = dl / "shared" / "@GoneMod"
|
||
orphan.mkdir(parents=True)
|
||
|
||
result = _run_clean_orphans(root, "--yes")
|
||
|
||
assert result.returncode == 0
|
||
assert known.exists(), "Known mod must NOT be deleted"
|
||
assert not orphan.exists(), "Orphan must be deleted"
|
||
|
||
|
||
def _test_e2e_missing_comparison_exits_1():
|
||
with tempfile.TemporaryDirectory() as d:
|
||
root = _make_e2e_root(Path(d), _E2E_COMPARISON)
|
||
# Remove the comparison.json so the script can't find it
|
||
(root / "modlist_json" / "comparison.json").unlink()
|
||
|
||
result = _run_clean_orphans(root, "--dry-run")
|
||
|
||
assert result.returncode == 1, "Should exit 1 when comparison.json missing"
|
||
assert "ERROR" in result.stdout
|
||
|
||
|
||
def _test_e2e_removed_group_flagged():
|
||
"""A group folder that no longer exists in comparison.json is fully orphaned."""
|
||
with tempfile.TemporaryDirectory() as d:
|
||
root = _make_e2e_root(Path(d), _E2E_COMPARISON)
|
||
dl = root / "downloads"
|
||
old_group = dl / "OldPreset" / "@SomeMod"
|
||
old_group.mkdir(parents=True)
|
||
|
||
result = _run_clean_orphans(root, "--dry-run")
|
||
|
||
assert result.returncode == 0
|
||
assert "@SomeMod" in result.stdout
|
||
|
||
|
||
test("e2e dry-run: lists orphans, does not delete", _test_e2e_dry_run_lists_orphans)
|
||
test("e2e dry-run: exits 0 when downloads clean", _test_e2e_no_orphans_clean_exit)
|
||
test("e2e --yes: deletes orphans", _test_e2e_yes_deletes_orphans)
|
||
test("e2e --yes: preserves known mods", _test_e2e_yes_preserves_known_mods)
|
||
test("e2e missing comparison.json: exits 1", _test_e2e_missing_comparison_exits_1)
|
||
test("e2e removed group: all mods flagged as orphans", _test_e2e_removed_group_flagged)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# 14. Coverage gap tests
|
||
# ---------------------------------------------------------------------------
|
||
|
||
group("coverage gaps — cleaner, config, parser, linker, compat")
|
||
|
||
import os as _os
|
||
from unittest.mock import patch as _patch, MagicMock as _MagicMock
|
||
from arma_modlist_tools.config import load_config as _load_config
|
||
from arma_modlist_tools import compat as _compat_mod
|
||
from arma_modlist_tools import linker as _linker_mod
|
||
from arma_modlist_tools.parser import _source_from_class, parse_modlist_html
|
||
|
||
|
||
# ── cleaner.py ───────────────────────────────────────────────────────────────
|
||
|
||
def _test_cleaner_skips_file_in_downloads_root():
|
||
"""A plain file at downloads/{file} (not a dir) is silently skipped."""
|
||
with tempfile.TemporaryDirectory() as d:
|
||
d = Path(d)
|
||
(d / "shared").mkdir()
|
||
(d / "shared" / "@CBA_A3").mkdir()
|
||
(d / "readme.txt").write_text("not a group dir") # triggers the skip branch
|
||
result = find_orphan_folders(d, _COMPARISON_BASE)
|
||
assert result == []
|
||
|
||
|
||
test("cleaner: plain file in downloads root is skipped (not treated as group)", _test_cleaner_skips_file_in_downloads_root)
|
||
|
||
|
||
# ── config.py ────────────────────────────────────────────────────────────────
|
||
|
||
def _test_load_config_fallback_to_root_path():
|
||
"""load_config() falls back to root_path (project root) when CWD has no config.json."""
|
||
old_cwd = _os.getcwd()
|
||
with tempfile.TemporaryDirectory() as d:
|
||
_os.chdir(d) # CWD has no config.json
|
||
try:
|
||
cfg = _load_config() # must find via root_path (project root has config.json)
|
||
finally:
|
||
_os.chdir(old_cwd) # restore BEFORE tempdir cleanup to avoid WinError 32
|
||
assert cfg is not None
|
||
|
||
|
||
def _test_load_config_raises_when_not_found():
|
||
"""load_config() raises FileNotFoundError when neither search path has config.json."""
|
||
old_cwd = _os.getcwd()
|
||
with tempfile.TemporaryDirectory() as d:
|
||
_os.chdir(d) # no config.json in CWD
|
||
try:
|
||
# Patch __file__ inside config.py so root_path also points somewhere without config.json
|
||
with _patch("arma_modlist_tools.config.__file__",
|
||
str(Path(d) / "fake" / "arma_modlist_tools" / "config.py")):
|
||
try:
|
||
_load_config()
|
||
assert False, "Should have raised FileNotFoundError"
|
||
except FileNotFoundError:
|
||
pass
|
||
finally:
|
||
_os.chdir(old_cwd)
|
||
|
||
|
||
test("config: load_config() falls back to root_path when CWD has no config.json", _test_load_config_fallback_to_root_path)
|
||
test("config: load_config() raises FileNotFoundError when config.json not found anywhere", _test_load_config_raises_when_not_found)
|
||
|
||
|
||
# ── parser.py ────────────────────────────────────────────────────────────────
|
||
|
||
def _test_source_from_class_unknown():
|
||
"""_source_from_class returns 'unknown' for unrecognised CSS class strings."""
|
||
assert_eq(_source_from_class("from-workshop"), "unknown")
|
||
assert_eq(_source_from_class(""), "unknown")
|
||
assert_eq(_source_from_class("some-other-class"), "unknown")
|
||
|
||
|
||
def _test_parse_modlist_html_skips_rows_without_name():
|
||
"""Rows with no DisplayName td return None and are excluded from results.
|
||
Also exercises the `continue` branch for non-ModContainer <tr> elements.
|
||
"""
|
||
# Row 1: valid mod
|
||
# Row 2: no DisplayName td at all → parse_mod_entry returns None → skipped
|
||
# Row 3: regular header row (no data-type="ModContainer") → parser skips via continue
|
||
html = textwrap.dedent("""\
|
||
<?xml version="1.0" encoding="utf-8"?>
|
||
<html>
|
||
<body>
|
||
<table>
|
||
<tr>
|
||
<th>Header row (no ModContainer attr)</th>
|
||
</tr>
|
||
<tr data-type="ModContainer">
|
||
<td data-type="DisplayName">Real Mod</td>
|
||
<td><span class="from-steam"></span>
|
||
<a data-type="Link" href="https://steamcommunity.com/sharedfiles/filedetails/?id=123"></a>
|
||
</td>
|
||
</tr>
|
||
<tr data-type="ModContainer">
|
||
<td data-type="SomeOtherType">no display name here</td>
|
||
</tr>
|
||
</table>
|
||
</body>
|
||
</html>
|
||
""")
|
||
with tempfile.NamedTemporaryFile(mode="w", suffix=".html", delete=False,
|
||
encoding="utf-8") as f:
|
||
f.write(html)
|
||
fname = f.name
|
||
try:
|
||
result = parse_modlist_html(fname)
|
||
assert_eq(result["mod_count"], 1)
|
||
assert_eq(result["mods"][0]["name"], "Real Mod")
|
||
finally:
|
||
Path(fname).unlink(missing_ok=True)
|
||
|
||
|
||
test("parser: _source_from_class returns 'unknown' for unrecognised class", _test_source_from_class_unknown)
|
||
test("parser: rows with empty DisplayName are skipped", _test_parse_modlist_html_skips_rows_without_name)
|
||
|
||
|
||
# ── linker.py ────────────────────────────────────────────────────────────────
|
||
|
||
def _test_remove_junction_oserror():
|
||
"""remove_junction returns (False, message) when OSError occurs."""
|
||
with tempfile.TemporaryDirectory() as d:
|
||
non_existent = Path(d) / "ghost_link"
|
||
ok, err = _linker_mod.remove_junction(non_existent)
|
||
assert not ok
|
||
assert err # error message is non-empty
|
||
|
||
|
||
def _test_link_group_records_failed_when_create_returns_false():
|
||
"""link_group counts a failure when create_junction returns False."""
|
||
with tempfile.TemporaryDirectory() as d:
|
||
d = Path(d)
|
||
group_dir = d / "shared"
|
||
arma_dir = d / "arma"
|
||
group_dir.mkdir()
|
||
arma_dir.mkdir()
|
||
(group_dir / "@ace").mkdir()
|
||
|
||
with _patch("arma_modlist_tools.linker.create_junction", return_value=False):
|
||
result = _linker_mod.link_group(group_dir, arma_dir)
|
||
|
||
assert_eq(result["failed"], 1)
|
||
assert "ace" in " ".join(result["errors"].keys()).lower()
|
||
|
||
|
||
def _test_unlink_group_records_failed_when_remove_errors():
|
||
"""unlink_group counts a failure when remove_junction returns an error."""
|
||
with tempfile.TemporaryDirectory() as d:
|
||
d = Path(d)
|
||
group_dir = d / "shared"
|
||
arma_dir = d / "arma"
|
||
group_dir.mkdir()
|
||
arma_dir.mkdir()
|
||
(group_dir / "@ace").mkdir()
|
||
|
||
# Pretend it's already linked so unlink_group tries to remove it
|
||
with _patch("arma_modlist_tools.linker.get_link_status") as mock_status, \
|
||
_patch("arma_modlist_tools.linker.remove_junction", return_value=(False, "perm denied")):
|
||
mock_status.return_value = [{
|
||
"name": "@ace",
|
||
"source_path": group_dir / "@ace",
|
||
"link_path": arma_dir / "@ace",
|
||
"is_linked": True,
|
||
}]
|
||
result = _linker_mod.unlink_group(group_dir, arma_dir)
|
||
|
||
assert_eq(result["failed"], 1)
|
||
assert result["errors"]
|
||
|
||
|
||
def _test_is_junction_linux_path():
|
||
"""_is_junction uses os.path.islink on Linux."""
|
||
with _patch("arma_modlist_tools.linker.is_windows", return_value=False), \
|
||
_patch("os.path.islink", return_value=True) as mock_islink:
|
||
result = _linker_mod._is_junction(Path("/fake/path"))
|
||
assert result is True
|
||
mock_islink.assert_called_once()
|
||
|
||
|
||
def _test_create_junction_linux_success():
|
||
"""create_junction calls os.symlink on Linux (mocked — Windows lacks symlink perms)."""
|
||
with tempfile.TemporaryDirectory() as d:
|
||
link_path = Path(d) / "link"
|
||
target = Path(d) / "target"
|
||
with _patch("arma_modlist_tools.linker.is_windows", return_value=False), \
|
||
_patch("os.symlink") as mock_sym:
|
||
ok = _linker_mod.create_junction(link_path, target)
|
||
assert ok
|
||
mock_sym.assert_called_once_with(str(target), str(link_path))
|
||
|
||
|
||
def _test_create_junction_linux_oserror():
|
||
"""create_junction returns False if os.symlink raises OSError on Linux."""
|
||
with tempfile.TemporaryDirectory() as d:
|
||
link_path = Path(d) / "link"
|
||
with _patch("arma_modlist_tools.linker.is_windows", return_value=False), \
|
||
_patch("os.symlink", side_effect=OSError("perm")):
|
||
ok = _linker_mod.create_junction(link_path, Path(d) / "target")
|
||
assert not ok
|
||
|
||
|
||
def _test_remove_junction_linux():
|
||
"""remove_junction calls os.unlink on Linux (mocked — Windows lacks symlink perms)."""
|
||
with tempfile.TemporaryDirectory() as d:
|
||
link = Path(d) / "link"
|
||
with _patch("arma_modlist_tools.linker.is_windows", return_value=False), \
|
||
_patch("os.unlink") as mock_unlink:
|
||
ok, err = _linker_mod.remove_junction(link)
|
||
assert ok
|
||
assert err == ""
|
||
mock_unlink.assert_called_once_with(str(link))
|
||
|
||
|
||
test("linker: remove_junction returns (False, msg) on OSError", _test_remove_junction_oserror)
|
||
test("linker: link_group records failure when create_junction -> False", _test_link_group_records_failed_when_create_returns_false)
|
||
test("linker: unlink_group records failure when remove_junction errors", _test_unlink_group_records_failed_when_remove_errors)
|
||
test("linker: _is_junction uses os.path.islink on Linux", _test_is_junction_linux_path)
|
||
test("linker: create_junction calls os.symlink on Linux (success)", _test_create_junction_linux_success)
|
||
test("linker: create_junction returns False if os.symlink raises", _test_create_junction_linux_oserror)
|
||
test("linker: remove_junction calls os.unlink on Linux", _test_remove_junction_linux)
|
||
|
||
|
||
# ── compat.py ────────────────────────────────────────────────────────────────
|
||
|
||
def _test_get_os_label_windows_server():
|
||
"""get_os_label returns 'Windows Server' when version string contains 'Server'."""
|
||
with _patch("arma_modlist_tools.compat.is_windows", return_value=True), \
|
||
_patch("platform.version", return_value="10.0.17763 Windows Server 2019"):
|
||
label = _compat_mod.get_os_label()
|
||
assert_eq(label, "Windows Server")
|
||
|
||
|
||
def _test_get_os_label_linux_ubuntu_desktop():
|
||
with _patch("arma_modlist_tools.compat.is_windows", return_value=False), \
|
||
_patch("arma_modlist_tools.compat.is_linux", return_value=True), \
|
||
_patch("arma_modlist_tools.compat._read_os_release",
|
||
return_value={"NAME": "Ubuntu"}), \
|
||
_patch("arma_modlist_tools.compat._is_headless", return_value=False):
|
||
label = _compat_mod.get_os_label()
|
||
assert_eq(label, "Ubuntu")
|
||
|
||
|
||
def _test_get_os_label_linux_ubuntu_server():
|
||
with _patch("arma_modlist_tools.compat.is_windows", return_value=False), \
|
||
_patch("arma_modlist_tools.compat.is_linux", return_value=True), \
|
||
_patch("arma_modlist_tools.compat._read_os_release",
|
||
return_value={"NAME": "Ubuntu"}), \
|
||
_patch("arma_modlist_tools.compat._is_headless", return_value=True):
|
||
label = _compat_mod.get_os_label()
|
||
assert_eq(label, "Ubuntu Server")
|
||
|
||
|
||
def _test_get_os_label_linux_other():
|
||
with _patch("arma_modlist_tools.compat.is_windows", return_value=False), \
|
||
_patch("arma_modlist_tools.compat.is_linux", return_value=True), \
|
||
_patch("arma_modlist_tools.compat._read_os_release",
|
||
return_value={"NAME": "Debian GNU/Linux"}):
|
||
label = _compat_mod.get_os_label()
|
||
assert_eq(label, "Linux")
|
||
|
||
|
||
def _test_get_os_label_unknown_platform():
|
||
with _patch("arma_modlist_tools.compat.is_windows", return_value=False), \
|
||
_patch("arma_modlist_tools.compat.is_linux", return_value=False):
|
||
label = _compat_mod.get_os_label()
|
||
assert_eq(label, "Unknown")
|
||
|
||
|
||
def _test_read_os_release_parses_file():
|
||
content = 'NAME="Ubuntu"\nVERSION_ID="22.04"\n# comment\nID=ubuntu\n'
|
||
import io as _io
|
||
with _patch("builtins.open", return_value=_io.StringIO(content)):
|
||
result = _compat_mod._read_os_release()
|
||
assert_eq(result.get("NAME"), "Ubuntu")
|
||
assert_eq(result.get("VERSION_ID"), "22.04")
|
||
|
||
|
||
def _test_read_os_release_handles_missing_file():
|
||
with _patch("builtins.open", side_effect=OSError("no such file")):
|
||
result = _compat_mod._read_os_release()
|
||
assert_eq(result, {})
|
||
|
||
|
||
def _test_is_headless_with_display():
|
||
with _patch.dict("os.environ", {"DISPLAY": ":0"}, clear=False):
|
||
assert not _compat_mod._is_headless()
|
||
|
||
|
||
def _test_is_headless_without_display():
|
||
with _patch.dict("os.environ", {}, clear=True):
|
||
assert _compat_mod._is_headless()
|
||
|
||
|
||
def _test_fix_console_encoding_non_windows_noop():
|
||
"""fix_console_encoding is a no-op on non-Windows."""
|
||
import io as _io
|
||
original_stdout = sys.stdout
|
||
with _patch("arma_modlist_tools.compat.is_windows", return_value=False):
|
||
_compat_mod.fix_console_encoding()
|
||
assert sys.stdout is original_stdout
|
||
|
||
|
||
def _test_fix_console_encoding_already_utf8():
|
||
"""fix_console_encoding skips wrapping when stdout is already UTF-8."""
|
||
# encoding is a readonly attribute on real TextIOWrapper, so use a fake stdout
|
||
fake_stdout = _MagicMock()
|
||
fake_stdout.encoding = "utf-8"
|
||
original_stdout = sys.stdout
|
||
try:
|
||
with _patch("arma_modlist_tools.compat.is_windows", return_value=True):
|
||
sys.stdout = fake_stdout
|
||
_compat_mod.fix_console_encoding()
|
||
assert sys.stdout is fake_stdout # still the same object (not wrapped)
|
||
finally:
|
||
sys.stdout = original_stdout
|
||
|
||
|
||
test("compat: get_os_label returns 'Windows Server' on Windows Server", _test_get_os_label_windows_server)
|
||
test("compat: get_os_label returns 'Ubuntu' on Ubuntu desktop", _test_get_os_label_linux_ubuntu_desktop)
|
||
test("compat: get_os_label returns 'Ubuntu Server' on headless Ubuntu", _test_get_os_label_linux_ubuntu_server)
|
||
test("compat: get_os_label returns 'Linux' on non-Ubuntu Linux", _test_get_os_label_linux_other)
|
||
test("compat: get_os_label returns 'Unknown' on unrecognised platform", _test_get_os_label_unknown_platform)
|
||
test("compat: _read_os_release parses key=value pairs", _test_read_os_release_parses_file)
|
||
test("compat: _read_os_release returns {} when file missing", _test_read_os_release_handles_missing_file)
|
||
test("compat: _is_headless returns False when DISPLAY is set", _test_is_headless_with_display)
|
||
test("compat: _is_headless returns True when no display env vars", _test_is_headless_without_display)
|
||
def _test_fix_console_encoding_none_stdout():
|
||
"""fix_console_encoding is a no-op when sys.stdout is None (pythonw.exe)."""
|
||
original_stdout = sys.stdout
|
||
try:
|
||
with _patch("arma_modlist_tools.compat.is_windows", return_value=True):
|
||
sys.stdout = None
|
||
_compat_mod.fix_console_encoding() # must not raise
|
||
assert sys.stdout is None
|
||
finally:
|
||
sys.stdout = original_stdout
|
||
|
||
|
||
test("compat: fix_console_encoding is no-op on non-Windows", _test_fix_console_encoding_non_windows_noop)
|
||
test("compat: fix_console_encoding skips when stdout already UTF-8", _test_fix_console_encoding_already_utf8)
|
||
test("compat: fix_console_encoding is no-op when stdout is None", _test_fix_console_encoding_none_stdout)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# 15. Live-server fetcher tests (skipped when server unreachable)
|
||
# ---------------------------------------------------------------------------
|
||
|
||
group("fetcher — live server (skipped if unreachable)")
|
||
|
||
import requests as _requests
|
||
from arma_modlist_tools.fetcher import (
|
||
make_session as _make_session,
|
||
build_server_index as _build_server_index,
|
||
find_mod_folder as _find_mod_folder,
|
||
list_mod_files as _list_mod_files,
|
||
)
|
||
|
||
# ── One-time setup: try to load config and probe the server ─────────────────
|
||
|
||
_LIVE_INDEX: dict | None = None
|
||
_LIVE_SESSION: "_requests.Session | None" = None
|
||
_LIVE_BASE_URL: str = ""
|
||
_LIVE_SKIP_REASON: str = ""
|
||
|
||
try:
|
||
_live_cfg = _load_config()
|
||
_LIVE_BASE_URL = _live_cfg.server_url
|
||
_live_auth = _live_cfg.server_auth
|
||
# Quick reachability probe — just GET the root, no JSON parsing
|
||
_probe = _requests.get(_LIVE_BASE_URL, auth=_live_auth, timeout=8)
|
||
_probe.raise_for_status()
|
||
# Reachable → build the index (one network round-trip per @ folder)
|
||
_LIVE_SESSION = _make_session(_live_auth)
|
||
_LIVE_INDEX = _build_server_index(_LIVE_BASE_URL, _live_auth)
|
||
except Exception as _live_exc:
|
||
_LIVE_SKIP_REASON = str(_live_exc)
|
||
|
||
|
||
def _require_live() -> None:
|
||
"""Raise _SkipTest if the server is unreachable."""
|
||
if _LIVE_INDEX is None:
|
||
raise _SkipTest(f"server unreachable: {_LIVE_SKIP_REASON}")
|
||
|
||
|
||
# ── Tests ────────────────────────────────────────────────────────────────────
|
||
|
||
def _test_live_index_structure():
|
||
"""build_server_index returns the expected three-key structure."""
|
||
_require_live()
|
||
assert "by_steam_id" in _LIVE_INDEX
|
||
assert "by_name" in _LIVE_INDEX
|
||
assert "folders" in _LIVE_INDEX
|
||
|
||
|
||
def _test_live_index_has_folders():
|
||
"""The server must have at least one mod folder."""
|
||
_require_live()
|
||
assert len(_LIVE_INDEX["folders"]) > 0, "Expected at least one folder on server"
|
||
|
||
|
||
def _test_live_index_has_steam_id_entries():
|
||
"""At least some folders must have parseable meta.cpp (steam_id index populated)."""
|
||
_require_live()
|
||
assert len(_LIVE_INDEX["by_steam_id"]) > 0, "Expected at least one steam_id entry"
|
||
|
||
|
||
def _test_live_index_has_name_entries():
|
||
"""Every @ folder adds an entry to by_name (normalized)."""
|
||
_require_live()
|
||
assert len(_LIVE_INDEX["by_name"]) > 0, "Expected at least one by_name entry"
|
||
|
||
|
||
def _test_live_find_mod_by_steam_id():
|
||
"""find_mod_folder locates CBA_A3 by its known steam_id (450814997)."""
|
||
_require_live()
|
||
mod = {"name": "CBA_A3", "steam_id": "450814997"}
|
||
url = _find_mod_folder(mod, _LIVE_INDEX)
|
||
assert url is not None, "CBA_A3 not found by steam_id — is it on the server?"
|
||
assert "@" in url.lower() or "cba" in url.lower(), f"URL looks wrong: {url}"
|
||
|
||
|
||
def _test_live_find_mod_url_is_reachable():
|
||
"""The URL returned for CBA_A3 must respond with HTTP 200."""
|
||
_require_live()
|
||
mod = {"name": "CBA_A3", "steam_id": "450814997"}
|
||
url = _find_mod_folder(mod, _LIVE_INDEX)
|
||
if url is None:
|
||
raise _SkipTest("CBA_A3 not in index — skipping reachability check")
|
||
r = _LIVE_SESSION.get(url, timeout=10)
|
||
assert r.status_code == 200, f"Expected 200, got {r.status_code} for {url}"
|
||
|
||
|
||
def _test_live_list_mod_files_returns_entries():
|
||
"""list_mod_files returns a non-empty list for CBA_A3."""
|
||
_require_live()
|
||
mod = {"name": "CBA_A3", "steam_id": "450814997"}
|
||
url = _find_mod_folder(mod, _LIVE_INDEX)
|
||
if url is None:
|
||
raise _SkipTest("CBA_A3 not in index — skipping list_mod_files check")
|
||
files = _list_mod_files(url, _LIVE_SESSION)
|
||
assert len(files) > 0, "CBA_A3 has no files? Unexpected."
|
||
|
||
|
||
def _test_live_list_mod_files_tuple_shape():
|
||
"""Each entry from list_mod_files is a (rel_path, url, size) 3-tuple."""
|
||
_require_live()
|
||
mod = {"name": "CBA_A3", "steam_id": "450814997"}
|
||
url = _find_mod_folder(mod, _LIVE_INDEX)
|
||
if url is None:
|
||
raise _SkipTest("CBA_A3 not in index — skipping tuple shape check")
|
||
files = _list_mod_files(url, _LIVE_SESSION)
|
||
if not files:
|
||
raise _SkipTest("no files returned — skipping tuple shape check")
|
||
rel, file_url, size = files[0]
|
||
assert isinstance(rel, str) and rel, f"rel_path must be non-empty string, got {rel!r}"
|
||
assert isinstance(file_url, str) and file_url, f"file_url must be non-empty string"
|
||
assert isinstance(size, int) and size >= 0, f"size must be non-negative int, got {size!r}"
|
||
|
||
|
||
def _test_live_find_mod_by_name_fallback():
|
||
"""find_mod_folder can locate a mod by normalized name when steam_id is absent."""
|
||
_require_live()
|
||
# Use a mod with no steam_id — name-only lookup
|
||
mod = {"name": "CBA_A3", "steam_id": ""}
|
||
url = _find_mod_folder(mod, _LIVE_INDEX)
|
||
assert url is not None, "CBA_A3 not found via name fallback"
|
||
|
||
|
||
test("live: build_server_index returns expected structure", _test_live_index_structure)
|
||
test("live: server has at least one mod folder", _test_live_index_has_folders)
|
||
test("live: by_steam_id populated from meta.cpp files", _test_live_index_has_steam_id_entries)
|
||
test("live: by_name populated for all @ folders", _test_live_index_has_name_entries)
|
||
test("live: find_mod_folder locates CBA_A3 by steam_id", _test_live_find_mod_by_steam_id)
|
||
test("live: CBA_A3 folder URL returns HTTP 200", _test_live_find_mod_url_is_reachable)
|
||
test("live: list_mod_files returns non-empty list for CBA_A3", _test_live_list_mod_files_returns_entries)
|
||
test("live: list_mod_files entries are (rel_path, url, size) tuples", _test_live_list_mod_files_tuple_shape)
|
||
test("live: find_mod_folder name fallback works (no steam_id)", _test_live_find_mod_by_name_fallback)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# gui.views.mods — _find_folder
|
||
# ---------------------------------------------------------------------------
|
||
|
||
group("gui.views.mods — _find_folder")
|
||
|
||
import importlib.util as _mods_ilu
|
||
|
||
_mods_spec = _mods_ilu.spec_from_file_location(
|
||
"gui.views.mods", Path(__file__).parent / "gui" / "views" / "mods.py"
|
||
)
|
||
_mods_mod = _mods_ilu.module_from_spec(_mods_spec)
|
||
# Stub out customtkinter and gui imports so the module loads without a display
|
||
import types as _types
|
||
import sys as _sys
|
||
|
||
for _stub in ("customtkinter", "gui", "gui._constants", "gui.locales",
|
||
"gui.views", "gui.views.base"):
|
||
if _stub not in _sys.modules:
|
||
_sys.modules[_stub] = _types.ModuleType(_stub)
|
||
|
||
# Provide the colour constants the module references at import time
|
||
_sys.modules["gui._constants"].COLOR_OK = "#4CAF50"
|
||
_sys.modules["gui._constants"].COLOR_ERROR = "#F44336"
|
||
_sys.modules["gui._constants"].COLOR_WARN = "#FF9800"
|
||
_sys.modules["gui._constants"].COLOR_RUNNING = "#2196F3"
|
||
|
||
# Stub BaseView so the class body does not fail
|
||
_base_stub = _sys.modules["gui.views.base"] = _types.ModuleType("gui.views.base")
|
||
_base_stub.BaseView = object
|
||
|
||
# Stub locales.t so string calls don't crash
|
||
_sys.modules["gui.locales"].t = lambda key, **kw: key
|
||
|
||
_mods_spec.loader.exec_module(_mods_mod)
|
||
_find_folder_fn = _mods_mod._find_folder
|
||
|
||
|
||
def _test_ff_returns_none_for_missing_group(tmp_path):
|
||
assert _find_folder_fn(tmp_path / "nonexistent", "MyMod") is None
|
||
|
||
|
||
def _test_ff_exact_match(tmp_path):
|
||
(tmp_path / "@MyMod").mkdir()
|
||
result = _find_folder_fn(tmp_path, "MyMod")
|
||
assert result == tmp_path / "@MyMod"
|
||
|
||
|
||
def _test_ff_case_insensitive_match(tmp_path):
|
||
(tmp_path / "@mymod").mkdir()
|
||
result = _find_folder_fn(tmp_path, "MyMod")
|
||
assert result == tmp_path / "@mymod"
|
||
|
||
|
||
def _test_ff_normalized_match(tmp_path):
|
||
# Folder on disk uses underscores; modlist name uses spaces — both normalize to same string
|
||
(tmp_path / "@My_Mod_Edition").mkdir()
|
||
result = _find_folder_fn(tmp_path, "My Mod Edition")
|
||
assert result == tmp_path / "@My_Mod_Edition"
|
||
|
||
|
||
def _test_ff_steam_id_fallback(tmp_path):
|
||
"""Folder name bears no resemblance to mod name but meta.cpp has correct ID."""
|
||
folder = tmp_path / "@ServerCanonicalName"
|
||
folder.mkdir()
|
||
(folder / "meta.cpp").write_text('publishedid = 123456789;\nname = "Some Mod";\n')
|
||
result = _find_folder_fn(tmp_path, "Completely Different Name", steam_id="123456789")
|
||
assert result == folder
|
||
|
||
|
||
def _test_ff_steam_id_no_false_positive(tmp_path):
|
||
"""Wrong steam_id in meta.cpp must not match."""
|
||
folder = tmp_path / "@WrongMod"
|
||
folder.mkdir()
|
||
(folder / "meta.cpp").write_text('publishedid = 999999999;\n')
|
||
result = _find_folder_fn(tmp_path, "My Mod", steam_id="123456789")
|
||
assert result is None
|
||
|
||
|
||
def _test_ff_steam_id_skipped_when_none(tmp_path):
|
||
"""No steam_id supplied → meta.cpp is never read (no false positives)."""
|
||
folder = tmp_path / "@SomeFolder"
|
||
folder.mkdir()
|
||
(folder / "meta.cpp").write_text('publishedid = 123456789;\n')
|
||
result = _find_folder_fn(tmp_path, "My Mod", steam_id=None)
|
||
assert result is None
|
||
|
||
|
||
def _test_ff_missing_meta_cpp_skipped(tmp_path):
|
||
"""Folders without meta.cpp are silently skipped in the steam_id pass."""
|
||
folder = tmp_path / "@NoMeta"
|
||
folder.mkdir()
|
||
result = _find_folder_fn(tmp_path, "My Mod", steam_id="123456789")
|
||
assert result is None
|
||
|
||
|
||
# Wrap tmp_path calls in lambdas that supply a temp dir
|
||
def _with_tmp(fn):
|
||
def wrapper():
|
||
with tempfile.TemporaryDirectory() as d:
|
||
fn(Path(d))
|
||
return wrapper
|
||
|
||
|
||
test("_find_folder: None when group dir missing",
|
||
_with_tmp(_test_ff_returns_none_for_missing_group))
|
||
test("_find_folder: exact @ModName match",
|
||
_with_tmp(_test_ff_exact_match))
|
||
test("_find_folder: case-insensitive name match",
|
||
_with_tmp(_test_ff_case_insensitive_match))
|
||
test("_find_folder: normalized name match (punctuation differs)",
|
||
_with_tmp(_test_ff_normalized_match))
|
||
test("_find_folder: steam_id fallback via meta.cpp",
|
||
_with_tmp(_test_ff_steam_id_fallback))
|
||
test("_find_folder: wrong steam_id in meta.cpp is not a match",
|
||
_with_tmp(_test_ff_steam_id_no_false_positive))
|
||
test("_find_folder: no steam_id supplied → meta.cpp not checked",
|
||
_with_tmp(_test_ff_steam_id_skipped_when_none))
|
||
test("_find_folder: missing meta.cpp silently skipped",
|
||
_with_tmp(_test_ff_missing_meta_cpp_skipped))
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Summary
|
||
# ---------------------------------------------------------------------------
|
||
|
||
total = _passed + _failed + _skipped
|
||
print(f"\n{'='*60}")
|
||
print(f" Results: {_passed} passed, {_failed} failed, {_skipped} skipped ({total} total)")
|
||
print(f"{'='*60}\n")
|
||
|
||
if _failed:
|
||
sys.exit(1)
|