#!/usr/bin/env python3
"""
Comprehensive test suite for rev-arma-modlist-tools.
Run:
python test_suite.py
Tests are grouped by module. Network tests (fetcher) are skipped if the server
is unreachable. Linker tests use temp directories so they never touch real paths.
"""
import json
import os
import sys
import tempfile
import textwrap
import traceback
from pathlib import Path
# ---------------------------------------------------------------------------
# Test harness (no external dependencies)
# ---------------------------------------------------------------------------
# Running tallies of test outcomes, updated by test() and skip().
_passed = 0
_failed = 0
_skipped = 0
# Name of the group most recently announced through group().
_current_group = ""
def group(name: str) -> None:
    """Remember *name* as the active test group and print its banner.

    The banner is a blank line, a 60-dash rule, the group name and a
    closing 60-dash rule.
    """
    global _current_group
    _current_group = name
    rule = "-" * 60
    print(f"\n{rule}")
    print(f" {name}")
    print(rule)
class _SkipTest(Exception):
    """Raised from inside a test function to have test() record it as skipped."""
def test(name: str, fn) -> None:
    """Run *fn* immediately and record the outcome under *name*.

    A normal return counts as a pass, a raised ``_SkipTest`` as a skip, and
    any other ``Exception`` as a failure whose traceback is printed line by
    line beneath the ``[FAIL]`` marker.
    """
    global _passed, _failed, _skipped
    try:
        fn()
        print(f" [PASS] {name}")
        _passed += 1
    except _SkipTest as reason:
        print(f" [SKIP] {name} ({reason})")
        _skipped += 1
    except Exception:
        print(f" [FAIL] {name}")
        trace_lines = traceback.format_exc().splitlines()
        for line in trace_lines:
            print(f" {line}")
        _failed += 1
def skip(name: str, reason: str) -> None:
    """Report *name* as skipped for *reason* without running anything."""
    global _skipped
    _skipped += 1
    print(f" [SKIP] {name} ({reason})")
def assert_eq(a, b, msg=""):
    """Raise AssertionError unless ``a == b``.

    Args:
        a: The value actually obtained.
        b: The value that was expected.
        msg: Optional text placed at the start of the failure message.

    Raises:
        AssertionError: When the values compare unequal. The message shows
            the repr of both values.
    """
    # Raise explicitly rather than using ``assert``: assert statements are
    # removed under ``python -O``, which would make every check pass silently.
    if not (a == b):
        raise AssertionError(f"{msg}\n expected: {b!r}\n got: {a!r}")
def assert_in(item, container, msg=""):
    """Raise AssertionError unless ``item in container``.

    Args:
        item: The element to look for.
        container: Any object supporting the ``in`` operator.
        msg: Optional text placed at the start of the failure message.

    Raises:
        AssertionError: When the membership test is false. The message shows
            the repr of both the item and the container.
    """
    # Raise explicitly rather than using ``assert``: assert statements are
    # removed under ``python -O``, which would make every check pass silently.
    if not (item in container):
        raise AssertionError(f"{msg}\n {item!r} not in {container!r}")
def assert_raises(exc_type, fn, *args, **kwargs):
    """Call ``fn(*args, **kwargs)`` and require it to raise *exc_type*.

    Returns None when the expected exception is raised. Exceptions of any
    other type propagate unchanged; a call that raises nothing results in an
    AssertionError.
    """
    try:
        fn(*args, **kwargs)
    except exc_type:
        return
    else:
        expected_name = exc_type.__name__
        raise AssertionError(f"Expected {expected_name} but no exception was raised")
# ---------------------------------------------------------------------------
# 1. compat
# ---------------------------------------------------------------------------
group("compat")
from arma_modlist_tools.compat import (
is_windows, is_linux, get_os_label, fix_console_encoding,
)
def _test_is_windows_linux_mutually_exclusive():
    """Check that is_windows() and is_linux() never both report True.

    Both may be False at the same time (an "Unknown" platform); only the
    both-True combination is rejected.
    """
    both_claimed = is_windows() and is_linux()
    assert not both_claimed, "Cannot be both Windows and Linux"
def _test_os_label_is_string():
    """Check that get_os_label() returns a non-empty string from the known set."""
    known_labels = {"Windows", "Windows Server", "Ubuntu", "Ubuntu Server", "Linux", "Unknown"}
    os_label = get_os_label()
    assert isinstance(os_label, str) and os_label, "OS label must be non-empty string"
    assert os_label in known_labels, f"Unexpected OS label: {os_label!r}"
def _test_fix_console_encoding_idempotent():
    """Call fix_console_encoding() twice in a row; neither call may raise."""
    for _ in range(2):
        fix_console_encoding()
def _test_is_windows_matches_platform():
    """Check that is_windows() is True exactly when sys.platform is "win32"."""
    # The former function-local ``import platform`` was never used.
    assert is_windows() == (sys.platform == "win32")
def _test_is_linux_matches_platform():
    """Check that is_linux() is True exactly when sys.platform is "linux"."""
    reported = is_linux()
    expected = sys.platform == "linux"
    assert reported == expected
def _test_os_label_consistent_with_platform():
    """Check that the OS label agrees with the platform predicates.

    Windows hosts must yield a label containing "Windows", Linux hosts one
    of the three Linux labels, and anything else exactly "Unknown".
    """
    os_label = get_os_label()
    if is_windows():
        assert "Windows" in os_label
        return
    if is_linux():
        assert os_label in {"Ubuntu", "Ubuntu Server", "Linux"}
        return
    assert os_label == "Unknown"
# Execute the compat checks. Each test() call runs its function immediately
# and records the outcome in the module-level counters.
test("is_windows and is_linux are mutually exclusive", _test_is_windows_linux_mutually_exclusive)
test("get_os_label returns a valid label", _test_os_label_is_string)
test("fix_console_encoding is idempotent", _test_fix_console_encoding_idempotent)
test("is_windows() matches sys.platform", _test_is_windows_matches_platform)
test("is_linux() matches sys.platform", _test_is_linux_matches_platform)
test("OS label is consistent with platform", _test_os_label_consistent_with_platform)
# ---------------------------------------------------------------------------
# 2. config
# ---------------------------------------------------------------------------
group("config")
from arma_modlist_tools.config import load_config, Config
# Complete configuration document used by the config tests: one "server"
# section (URL plus credentials) and one "paths" section.
_VALID_CONFIG = dict(
    server=dict(
        base_url="https://example.com/mods/",
        username="user",
        password="pass",
    ),
    paths=dict(
        arma_dir="C:\\arma3",
        downloads="downloads",
        modlist_html="modlist_html",
        modlist_json="modlist_json",
    ),
)
def _test_load_config_from_explicit_path():
    """Load _VALID_CONFIG from a temp file and check every exposed attribute."""
    with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as handle:
        json.dump(_VALID_CONFIG, handle)
        config_path = handle.name
    try:
        cfg = load_config(config_path)
        assert_eq(cfg.server_url, "https://example.com/mods/")
        assert_eq(cfg.server_auth, ("user", "pass"))
        expected_paths = (
            ("arma_dir", Path("C:\\arma3")),
            ("downloads", Path("downloads")),
            ("modlist_html", Path("modlist_html")),
            ("modlist_json", Path("modlist_json")),
            ("comparison", Path("modlist_json/comparison.json")),
            ("missing_report", Path("modlist_json/missing_report.json")),
        )
        for attr, expected in expected_paths:
            assert_eq(getattr(cfg, attr), expected)
    finally:
        # delete=False keeps the file after the with-block, so remove it here.
        os.unlink(config_path)
def _test_load_config_file_not_found():
    """load_config must raise FileNotFoundError for a path that does not exist."""
    missing_path = "nonexistent_1234567890.json"
    assert_raises(FileNotFoundError, load_config, missing_path)
def _test_load_config_missing_key_raises():
    """load_config must raise KeyError for a config holding only server.base_url."""
    incomplete = {"server": {"base_url": "x"}}  # missing paths
    with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as handle:
        json.dump(incomplete, handle)
        config_path = handle.name
    try:
        assert_raises(KeyError, load_config, config_path)
    finally:
        # delete=False keeps the file after the with-block, so remove it here.
        os.unlink(config_path)
def _test_config_derived_paths():
    """Check that comparison and missing_report live directly inside modlist_json."""
    with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as handle:
        json.dump(_VALID_CONFIG, handle)
        config_path = handle.name
    try:
        cfg = load_config(config_path)
        derived_files = (
            (cfg.comparison, "comparison.json"),
            (cfg.missing_report, "missing_report.json"),
        )
        for derived, filename in derived_files:
            # each derived file must live inside modlist_json
            assert derived.parent == cfg.modlist_json
            assert derived.name == filename
    finally:
        os.unlink(config_path)
def _test_load_config_from_cwd():
    """load_config() without args should find the real config.json at CWD.

    The working directory is switched to this file's directory for the call
    and always restored afterwards. If load_config() reports that it cannot
    find a configuration file, the test is skipped rather than failed: the
    real config is an environment artefact, and its absence says nothing
    about the code under test.
    """
    original_cwd = Path.cwd()
    # Resolve before chdir so a relative __file__ still points at this file.
    project_root = Path(__file__).resolve().parent
    os.chdir(project_root)
    try:
        try:
            cfg = load_config()
        except FileNotFoundError as exc:
            raise _SkipTest(f"no config.json discoverable from {project_root}") from exc
        assert cfg.server_url, "server_url must not be empty"
    finally:
        os.chdir(original_cwd)
# Execute the config checks. Each test() call runs its function immediately
# and records the outcome in the module-level counters.
test("load_config with explicit path", _test_load_config_from_explicit_path)
test("load_config raises FileNotFoundError for missing file", _test_load_config_file_not_found)
test("load_config raises KeyError for incomplete config", _test_load_config_missing_key_raises)
test("Config derived paths (comparison, missing_report)", _test_config_derived_paths)
test("load_config() auto-discovers config.json from CWD", _test_load_config_from_cwd)
# ---------------------------------------------------------------------------
# 3. parser
# ---------------------------------------------------------------------------
group("parser")
from arma_modlist_tools.parser import (
parse_mod_entry, parse_modlist_html, parse_modlist_dir,
)
import xml.etree.ElementTree as ET
# NOTE(review): the markup in these fixtures had been stripped down to bare
# "|" residue, which ET.fromstring() cannot parse. They are rebuilt here in the
# shape of an Arma 3 Launcher preset export, using only the values the tests
# below assert on. Confirm the element and attribute names against
# arma_modlist_tools.parser before relying on them.

# One Steam Workshop row: display name, source marker and workshop link.
_VALID_TR_XML = textwrap.dedent("""\
    <tr data-type="ModContainer">
      <td data-type="DisplayName">CBA_A3</td>
      <td>
        <span class="from-steam">Steam</span>
      </td>
      <td>
        <a href="https://steamcommunity.com/sharedfiles/filedetails/?id=450814997" data-type="Link">https://steamcommunity.com/sharedfiles/filedetails/?id=450814997</a>
      </td>
    </tr>
""")
# One local row: display name and source marker, no workshop link.
_LOCAL_TR_XML = textwrap.dedent("""\
    <tr data-type="ModContainer">
      <td data-type="DisplayName">My Local Mod</td>
      <td>
        <span class="from-local">Local</span>
      </td>
      <td></td>
    </tr>
""")
# One row without a DisplayName cell; parse_mod_entry is expected to reject it.
_NO_NAME_TR_XML = textwrap.dedent("""\
    <tr data-type="ModContainer">
      <td>
        <span class="from-steam">Steam</span>
      </td>
      <td></td>
    </tr>
""")
# Minimal preset document named "Test" holding ACE3, CBA_A3 and LocalMod.
_MINIMAL_HTML = textwrap.dedent("""\
    <?xml version="1.0" encoding="utf-8"?>
    <html>
      <head>
        <meta name="arma:Type" content="preset" />
        <meta name="arma:PresetName" content="Test" />
        <title>Test</title>
      </head>
      <body>
        <div class="mod-list">
          <table>
            <tr data-type="ModContainer">
              <td data-type="DisplayName">ACE3</td>
              <td>
                <span class="from-steam">Steam</span>
              </td>
              <td>
                <a href="https://steamcommunity.com/sharedfiles/filedetails/?id=463939057" data-type="Link">https://steamcommunity.com/sharedfiles/filedetails/?id=463939057</a>
              </td>
            </tr>
            <tr data-type="ModContainer">
              <td data-type="DisplayName">CBA_A3</td>
              <td>
                <span class="from-steam">Steam</span>
              </td>
              <td>
                <a href="https://steamcommunity.com/sharedfiles/filedetails/?id=450814997" data-type="Link">https://steamcommunity.com/sharedfiles/filedetails/?id=450814997</a>
              </td>
            </tr>
            <tr data-type="ModContainer">
              <td data-type="DisplayName">LocalMod</td>
              <td>
                <span class="from-local">Local</span>
              </td>
              <td></td>
            </tr>
          </table>
        </div>
      </body>
    </html>
""")
def _test_parse_mod_entry_steam():
    """Parse the Steam fixture row and check name, source, steam_id and url."""
    row = ET.fromstring(_VALID_TR_XML)
    parsed = parse_mod_entry(row)
    assert parsed is not None
    expected_fields = (
        ("name", "CBA_A3"),
        ("source", "steam"),
        ("steam_id", "450814997"),
    )
    for field, expected in expected_fields:
        assert_eq(parsed[field], expected)
    assert "450814997" in parsed["url"]
def _test_parse_mod_entry_local():
    """Parse the local fixture row: source is "local", steam_id and url are None."""
    row = ET.fromstring(_LOCAL_TR_XML)
    parsed = parse_mod_entry(row)
    assert parsed is not None
    for field, expected in (("name", "My Local Mod"), ("source", "local")):
        assert_eq(parsed[field], expected)
    assert parsed["steam_id"] is None
    assert parsed["url"] is None
def _test_parse_mod_entry_no_name_returns_none():
    """A row without a display name must make parse_mod_entry return None."""
    nameless_row = ET.fromstring(_NO_NAME_TR_XML)
    parsed = parse_mod_entry(nameless_row)
    assert parsed is None, "Entry without DisplayName must return None"
def _test_parse_modlist_html_from_file():
    """Write _MINIMAL_HTML to a temp file, parse it, and check the three mods."""
    with tempfile.NamedTemporaryFile(
        mode="w", suffix=".html", delete=False, encoding="utf-8"
    ) as handle:
        handle.write(_MINIMAL_HTML)
        html_path = Path(handle.name)
    try:
        preset = parse_modlist_html(html_path)
        assert_eq(preset["source_file"], html_path.name)
        assert_eq(preset["mod_count"], 3)
        mods = preset["mods"]
        assert len(mods) == 3
        names = [entry["name"] for entry in mods]
        for expected_name in ("ACE3", "CBA_A3", "LocalMod"):
            assert_in(expected_name, names)
        # steam IDs; None is expected because LocalMod has no steam_id
        steam_ids = {entry["steam_id"] for entry in mods}
        for expected_id in ("463939057", "450814997", None):
            assert_in(expected_id, steam_ids)
    finally:
        html_path.unlink()
def _test_parse_modlist_html_not_found():
    """parse_modlist_html must raise FileNotFoundError for a missing file."""
    missing_file = "no_such_file_xyz.html"
    assert_raises(FileNotFoundError, parse_modlist_html, missing_file)
def _test_parse_modlist_dir_real_files():
    """Parse the modlist_html/ directory next to this file.

    Requires at least two presets, each non-empty and with a mod_count that
    matches the length of its mods list.
    """
    html_dir = Path(__file__).parent / "modlist_html"
    if not html_dir.is_dir():
        raise AssertionError(f"modlist_html directory not found: {html_dir}")
    presets = parse_modlist_dir(html_dir)
    found = len(presets)
    assert found >= 2, f"Expected >=2 presets, got {found}"
    for preset in presets:
        assert preset["mod_count"] > 0, f"{preset['source_file']} has 0 mods"
        assert len(preset["mods"]) == preset["mod_count"]
def _test_parse_modlist_dir_empty():
    """parse_modlist_dir on an empty directory must return an empty list."""
    with tempfile.TemporaryDirectory() as empty_dir:
        presets = parse_modlist_dir(empty_dir)
        assert_eq(presets, [])
def _test_parse_modlist_dir_not_a_dir():
    """parse_modlist_dir must raise NotADirectoryError for a non-existent path."""
    bogus_dir = "no_such_directory_xyz"
    assert_raises(NotADirectoryError, parse_modlist_dir, bogus_dir)
def _test_parse_modlist_dir_returns_sorted():
    """Create three presets out of order and check the results come back sorted."""
    with tempfile.TemporaryDirectory() as raw_dir:
        html_dir = Path(raw_dir)
        for filename in ("c_preset.html", "a_preset.html", "b_preset.html"):
            (html_dir / filename).write_text(_MINIMAL_HTML, encoding="utf-8")
        presets = parse_modlist_dir(html_dir)
        source_files = [preset["source_file"] for preset in presets]
        assert source_files == sorted(source_files), "parse_modlist_dir must return results sorted by filename"
# Execute the parser checks. Each test() call runs its function immediately
# and records the outcome in the module-level counters.
test("parse_mod_entry: Steam mod", _test_parse_mod_entry_steam)
test("parse_mod_entry: Local mod", _test_parse_mod_entry_local)
test("parse_mod_entry: No name returns None", _test_parse_mod_entry_no_name_returns_none)
test("parse_modlist_html: from temp HTML file", _test_parse_modlist_html_from_file)
test("parse_modlist_html: FileNotFoundError", _test_parse_modlist_html_not_found)
test("parse_modlist_dir: real modlist_html/ files", _test_parse_modlist_dir_real_files)
test("parse_modlist_dir: empty directory", _test_parse_modlist_dir_empty)
test("parse_modlist_dir: non-existent directory", _test_parse_modlist_dir_not_a_dir)
test("parse_modlist_dir: results are sorted by filename", _test_parse_modlist_dir_returns_sorted)
# ---------------------------------------------------------------------------
# 4. compare
# ---------------------------------------------------------------------------
group("compare")
from arma_modlist_tools.compare import compare_presets
# Preset_A: two mods also present in _P2 plus one mod found only here.
_P1 = {
    "preset_name": "Preset_A",
    "source_file": "preset_a.html",
    "mod_count": 3,
    "mods": [
        {"name": mod_name, "source": "steam", "url": None, "steam_id": workshop_id}
        for mod_name, workshop_id in (
            ("CBA_A3", "450814997"),
            ("ACE3", "463939057"),
            ("OnlyInA", "111111111"),
        )
    ],
}
# Preset_B: two mods also present in _P1 plus one mod found only here.
_P2 = {
    "preset_name": "Preset_B",
    "source_file": "preset_b.html",
    "mod_count": 3,
    "mods": [
        {"name": mod_name, "source": "steam", "url": None, "steam_id": workshop_id}
        for mod_name, workshop_id in (
            ("CBA_A3", "450814997"),
            ("ACE3", "463939057"),
            ("OnlyInB", "222222222"),
        )
    ],
}
# Preset_Local: one Steam mod and one local mod that has no steam_id.
_P_LOCAL = dict(
    preset_name="Preset_Local",
    source_file="preset_local.html",
    mod_count=2,
    mods=[
        dict(name="CBA_A3", source="steam", url=None, steam_id="450814997"),
        dict(name="MyLocalMod", source="local", url=None, steam_id=None),
    ],
)
# Preset_C: same three mods as _P1 under a different preset name and file.
_P_IDENTICAL = {
    "preset_name": "Preset_C",
    "source_file": "preset_c.html",
    "mod_count": 3,
    "mods": [
        {"name": mod_name, "source": "steam", "url": None, "steam_id": workshop_id}
        for mod_name, workshop_id in (
            ("CBA_A3", "450814997"),
            ("ACE3", "463939057"),
            ("OnlyInA", "111111111"),
        )
    ],
}
def _test_compare_basic_shared_and_unique():
    """Compare _P1 with _P2: two mods are shared and one is unique to each side."""
    outcome = compare_presets(_P1, _P2)
    assert_eq(sorted(outcome["compared_presets"]), ["Preset_A", "Preset_B"])

    shared = outcome["shared"]
    assert_eq({mod["steam_id"] for mod in shared["mods"]}, {"450814997", "463939057"})
    assert_eq(shared["mod_count"], 2)

    def unique_ids(preset_name):
        """Collect the steam_ids listed as unique to *preset_name*."""
        return {mod["steam_id"] for mod in outcome["unique"][preset_name]["mods"]}

    only_a = unique_ids("Preset_A")
    only_b = unique_ids("Preset_B")
    assert_eq(only_a, {"111111111"})
    assert_eq(only_b, {"222222222"})
def _test_compare_requires_two_or_more():
    """compare_presets must raise ValueError when given a single preset."""
    lone_preset = _P1
    assert_raises(ValueError, compare_presets, lone_preset)
def _test_compare_three_presets():
    """With three presets, only the mod present in all of them is shared."""
    outcome = compare_presets(_P1, _P2, _P_LOCAL)
    # Only CBA_A3 (450814997) is in all three
    common_ids = {mod["steam_id"] for mod in outcome["shared"]["mods"]}
    assert_eq(common_ids, {"450814997"})
    assert "Preset_Local" in outcome["unique"]
def _test_compare_identical_presets_all_shared():
    """Two presets with the same mods: all three shared, no unique mods."""
    outcome = compare_presets(_P1, _P_IDENTICAL)
    assert_eq(outcome["shared"]["mod_count"], 3)
    for unique_section in outcome["unique"].values():
        assert_eq(unique_section["mod_count"], 0)
def _test_compare_no_shared_mods():
    """Presets with no mod in common must produce an empty shared section."""
    exclusive_mod = {"name": "Exclusive", "source": "steam", "url": None, "steam_id": "999999999"}
    p_no_overlap = {
        "preset_name": "Preset_X",
        "source_file": "px.html",
        "mod_count": 1,
        "mods": [exclusive_mod],
    }
    shared = compare_presets(_P1, p_no_overlap)["shared"]
    assert_eq(shared["mod_count"], 0)
    assert_eq(len(shared["mods"]), 0)
def _test_compare_local_mod_uses_name_as_key():
    """Two presets holding the same local mod (no steam_id) must share it by name."""

    def single_local_preset(preset_name, source_file):
        """Build a one-mod preset containing a fresh copy of MyLocalMod."""
        return {
            "preset_name": preset_name,
            "source_file": source_file,
            "mod_count": 1,
            "mods": [{"name": "MyLocalMod", "source": "local", "url": None, "steam_id": None}],
        }

    p_a = single_local_preset("A", "a.html")
    p_b = single_local_preset("B", "b.html")
    shared = compare_presets(p_a, p_b)["shared"]
    assert_eq(shared["mod_count"], 1)
    assert_eq(shared["mods"][0]["name"], "MyLocalMod")
def _test_compare_preserves_all_fields():
    """Every shared mod in the result must still carry name, source and steam_id."""
    outcome = compare_presets(_P1, _P2)
    for shared_mod in outcome["shared"]["mods"]:
        for field in ("name", "source", "steam_id"):
            assert field in shared_mod
def _test_compare_result_counts_consistent():
    """For each input preset, shared count plus unique count must equal mod_count."""
    outcome = compare_presets(_P1, _P2)
    for source_preset in (_P1, _P2):
        pname = source_preset["preset_name"]
        shared_count = outcome["shared"]["mod_count"]
        unique_count = outcome["unique"][pname]["mod_count"]
        assert_eq(
            shared_count + unique_count,
            source_preset["mod_count"],
            f"shared + unique must equal total for {pname}",
        )
# Execute the compare checks. Each test() call runs its function immediately
# and records the outcome in the module-level counters.
test("compare_presets: shared and unique breakdown", _test_compare_basic_shared_and_unique)
test("compare_presets: requires >= 2 presets", _test_compare_requires_two_or_more)
test("compare_presets: three presets, intersection only", _test_compare_three_presets)
test("compare_presets: identical presets → all shared", _test_compare_identical_presets_all_shared)
test("compare_presets: no overlap → shared is empty", _test_compare_no_shared_mods)
test("compare_presets: local mod matched by name", _test_compare_local_mod_uses_name_as_key)
test("compare_presets: result mods retain all fields", _test_compare_preserves_all_fields)
test("compare_presets: shared + unique = total per preset", _test_compare_result_counts_consistent)
# ---------------------------------------------------------------------------
# 5. fetcher (pure functions only — no network)
# ---------------------------------------------------------------------------
group("fetcher (pure functions, no network)")
from arma_modlist_tools.fetcher import (
_normalize_name, _parse_meta_cpp, _folder_url, find_mod_folder,
)
def _test_normalize_name_strips_at():
    """_normalize_name must drop the leading "@" from folder names."""
    cases = (
        ("@ace", "ace"),
        ("@CBA_A3", "cbaa3"),
    )
    for raw_name, expected in cases:
        assert_eq(_normalize_name(raw_name), expected)
def _test_normalize_name_lowercase():
    """_normalize_name must lowercase its result ("@RHS_USAF" -> "rhsusaf")."""
    normalized = _normalize_name("@RHS_USAF")
    assert_eq(normalized, "rhsusaf")
def _test_normalize_name_removes_spaces_and_special():
    """_normalize_name must drop spaces and parentheses ("@My Mod (v2)" -> "mymodv2")."""
    normalized = _normalize_name("@My Mod (v2)")
    assert_eq(normalized, "mymodv2")
def _test_normalize_name_no_at():
    """A name without a leading "@" must pass through _normalize_name unchanged here."""
    normalized = _normalize_name("ace")
    assert_eq(normalized, "ace")
def _test_parse_meta_cpp_standard():
    """_parse_meta_cpp must extract the publishedid from a typical two-line meta.cpp."""
    meta_text = 'publishedid = 463939057;\nname = "ACE3";'
    published_id = _parse_meta_cpp(meta_text)
    assert_eq(published_id, "463939057")
def _test_parse_meta_cpp_no_spaces():
    """_parse_meta_cpp must accept "publishedid=123456;" written without spaces."""
    published_id = _parse_meta_cpp("publishedid=123456;")
    assert_eq(published_id, "123456")
def _test_parse_meta_cpp_case_insensitive():
    """_parse_meta_cpp must match the key regardless of case ("PublishedId")."""
    published_id = _parse_meta_cpp("PublishedId = 999;")
    assert_eq(published_id, "999")
def _test_parse_meta_cpp_missing():
    """_parse_meta_cpp must return None when the text has no publishedid entry."""
    published_id = _parse_meta_cpp("name = somemod;")
    assert published_id is None
def _test_folder_url_trailing_slash():
    """_folder_url must end with "/" and contain the folder name."""
    folder_url = _folder_url("https://example.com/mods", "@ace")
    assert folder_url.endswith("/"), "folder_url must end with /"
    assert "@ace" in folder_url
def _test_find_mod_folder_by_steam_id():
    """find_mod_folder must resolve a mod through the by_steam_id index."""
    cba_url = "https://example.com/mods/@cba_a3/"
    index = {"by_steam_id": {"450814997": cba_url}, "by_name": {}}
    mod = {"steam_id": "450814997", "name": "CBA_A3"}
    resolved = find_mod_folder(mod, index)
    assert_eq(resolved, "https://example.com/mods/@cba_a3/")
def _test_find_mod_folder_by_name_fallback():
    """Without a steam_id, find_mod_folder must resolve through by_name."""
    ace_url = "https://example.com/mods/@ace3/"
    index = {"by_steam_id": {}, "by_name": {"ace3": ace_url}}
    mod = {"steam_id": None, "name": "ACE3"}
    resolved = find_mod_folder(mod, index)
    assert_eq(resolved, "https://example.com/mods/@ace3/")
def _test_find_mod_folder_steam_id_preferred_over_name():
    """When both indexes match, the by_steam_id entry must win over by_name."""
    index = {
        "by_steam_id": {"111": "https://example.com/mods/@right/"},
        "by_name": {"mymod": "https://example.com/mods/@wrong/"},
    }
    mod = {"steam_id": "111", "name": "MyMod"}
    resolved = find_mod_folder(mod, index)
    assert_eq(resolved, "https://example.com/mods/@right/")
def _test_find_mod_folder_not_found():
    """find_mod_folder must return None when neither index holds the mod."""
    empty_index = {"by_steam_id": {}, "by_name": {}}
    mod = {"steam_id": "999", "name": "Missing"}
    resolved = find_mod_folder(mod, empty_index)
    assert resolved is None
def _test_find_mod_folder_normalized_name_match():
    """A mod named "@RHS_USAF" must match the by_name key "rhsusaf"."""
    rhs_url = "https://example.com/mods/@rhs_usaf/"
    index = {"by_steam_id": {}, "by_name": {"rhsusaf": rhs_url}}
    mod = {"steam_id": None, "name": "@RHS_USAF"}
    resolved = find_mod_folder(mod, index)
    assert_eq(resolved, "https://example.com/mods/@rhs_usaf/")
# Execute the fetcher pure-function checks. Each test() call runs its
# function immediately and records the outcome in the module-level counters.
test("_normalize_name: strips leading @", _test_normalize_name_strips_at)
test("_normalize_name: lowercases", _test_normalize_name_lowercase)
test("_normalize_name: removes spaces and special chars", _test_normalize_name_removes_spaces_and_special)
test("_normalize_name: no @ prefix still works", _test_normalize_name_no_at)
test("_parse_meta_cpp: standard format", _test_parse_meta_cpp_standard)
test("_parse_meta_cpp: no spaces", _test_parse_meta_cpp_no_spaces)
test("_parse_meta_cpp: case-insensitive", _test_parse_meta_cpp_case_insensitive)
test("_parse_meta_cpp: missing → None", _test_parse_meta_cpp_missing)
test("_folder_url: always trailing slash", _test_folder_url_trailing_slash)
test("find_mod_folder: by steam_id", _test_find_mod_folder_by_steam_id)
test("find_mod_folder: by name fallback", _test_find_mod_folder_by_name_fallback)
test("find_mod_folder: steam_id preferred over name", _test_find_mod_folder_steam_id_preferred_over_name)
test("find_mod_folder: not found → None", _test_find_mod_folder_not_found)
test("find_mod_folder: normalized @ name match", _test_find_mod_folder_normalized_name_match)
# ---- list_mod_updates (uses temp dirs, no network) ----
from arma_modlist_tools.fetcher import list_mod_updates
import unittest.mock as mock
def _test_list_mod_updates_all_uptodate():
    """All local files exist with matching sizes → empty result."""
    with tempfile.TemporaryDirectory() as raw_dir:
        dest = Path(raw_dir)
        # Create a local file whose size equals the size the server reports.
        addons_dir = dest / "addons"
        addons_dir.mkdir()
        (addons_dir / "mod.pbo").write_bytes(b"x" * 1000)
        server_files = [("addons/mod.pbo", "https://x.com/addons/mod.pbo", 1000)]
        # list_mod_files is patched so no network request is made.
        listing_patch = mock.patch(
            "arma_modlist_tools.fetcher.list_mod_files", return_value=server_files
        )
        with listing_patch:
            stale = list_mod_updates("https://x.com/", dest, None)
        assert_eq(stale, [], "No stale files expected when sizes match")
def _test_list_mod_updates_missing_file():
    """Local file does not exist → included."""
    with tempfile.TemporaryDirectory() as raw_dir:
        dest = Path(raw_dir)
        server_files = [("addons/new.pbo", "https://x.com/addons/new.pbo", 500)]
        # list_mod_files is patched so no network request is made.
        listing_patch = mock.patch(
            "arma_modlist_tools.fetcher.list_mod_files", return_value=server_files
        )
        with listing_patch:
            pending = list_mod_updates("https://x.com/", dest, None)
        assert_eq(len(pending), 1)
        assert_eq(pending[0][0], "addons/new.pbo")
def _test_list_mod_updates_size_mismatch():
    """Local file exists but size differs → included."""
    with tempfile.TemporaryDirectory() as raw_dir:
        dest = Path(raw_dir)
        addons_dir = dest / "addons"
        addons_dir.mkdir()
        (addons_dir / "mod.pbo").write_bytes(b"x" * 999)  # local: 999 bytes
        server_files = [("addons/mod.pbo", "https://x.com/addons/mod.pbo", 1000)]  # server: 1000
        listing_patch = mock.patch(
            "arma_modlist_tools.fetcher.list_mod_files", return_value=server_files
        )
        with listing_patch:
            pending = list_mod_updates("https://x.com/", dest, None)
        assert_eq(len(pending), 1)
        assert_eq(pending[0][2], 1000, "Returned entry must carry server size")
def _test_list_mod_updates_zero_server_size_skips_check():
    """Server reports size=0 (unknown) — only include if file is missing."""
    with tempfile.TemporaryDirectory() as raw_dir:
        dest = Path(raw_dir)
        addons_dir = dest / "addons"
        addons_dir.mkdir()
        (addons_dir / "mod.pbo").write_bytes(b"x" * 500)  # exists locally
        server_files = [("addons/mod.pbo", "https://x.com/addons/mod.pbo", 0)]
        listing_patch = mock.patch(
            "arma_modlist_tools.fetcher.list_mod_files", return_value=server_files
        )
        with listing_patch:
            pending = list_mod_updates("https://x.com/", dest, None)
        # size=0 → skip size check, file exists → not stale
        assert_eq(pending, [], "Existing file with server size=0 must not be re-downloaded")
def _test_list_mod_updates_mixed():
    """One up-to-date, one missing, one stale → two results."""
    with tempfile.TemporaryDirectory() as raw_dir:
        dest = Path(raw_dir)
        addons_dir = dest / "addons"
        addons_dir.mkdir()
        (addons_dir / "current.pbo").write_bytes(b"x" * 100)  # matches server
        (addons_dir / "stale.pbo").write_bytes(b"x" * 50)  # differs from server
        # missing.pbo is deliberately never created
        server_files = [
            ("addons/current.pbo", "https://x.com/addons/current.pbo", 100),
            ("addons/stale.pbo", "https://x.com/addons/stale.pbo", 200),
            ("addons/missing.pbo", "https://x.com/addons/missing.pbo", 300),
        ]
        listing_patch = mock.patch(
            "arma_modlist_tools.fetcher.list_mod_files", return_value=server_files
        )
        with listing_patch:
            pending = list_mod_updates("https://x.com/", dest, None)
        pending_names = {entry[0] for entry in pending}
        assert_eq(pending_names, {"addons/stale.pbo", "addons/missing.pbo"})
        assert "addons/current.pbo" not in pending_names
# Execute the list_mod_updates checks. Each test() call runs its function
# immediately and records the outcome in the module-level counters.
test("list_mod_updates: all files up-to-date → empty", _test_list_mod_updates_all_uptodate)
test("list_mod_updates: missing file → included", _test_list_mod_updates_missing_file)
test("list_mod_updates: size mismatch → included", _test_list_mod_updates_size_mismatch)
test("list_mod_updates: server size=0 → skip size check", _test_list_mod_updates_zero_server_size_skips_check)
test("list_mod_updates: mixed state (up-to-date, stale, missing)", _test_list_mod_updates_mixed)
# ---------------------------------------------------------------------------
# 6. reporter
# ---------------------------------------------------------------------------
group("reporter")
from arma_modlist_tools.reporter import build_missing_report, save_missing_report
# Comparison of Preset_A and Preset_B: two shared mods plus one unique mod
# per preset, four mods in total.
_COMPARISON = {
    "compared_presets": ["Preset_A", "Preset_B"],
    "shared": {
        "mod_count": 2,
        "mods": [
            {"name": mod_name, "steam_id": workshop_id, "url": None, "source": "steam"}
            for mod_name, workshop_id in (
                ("CBA_A3", "450814997"),
                ("ACE3", "463939057"),
            )
        ],
    },
    "unique": {
        preset_name: {
            "mod_count": 1,
            "mods": [{"name": mod_name, "steam_id": workshop_id, "url": None, "source": "steam"}],
        }
        for preset_name, mod_name, workshop_id in (
            ("Preset_A", "OnlyA", "111"),
            ("Preset_B", "OnlyB", "222"),
        )
    },
}
# Server index that holds a folder URL for every steam_id in _COMPARISON.
_INDEX_ALL = {
    "by_steam_id": dict(
        [
            ("450814997", "https://x.com/@cba/"),
            ("463939057", "https://x.com/@ace/"),
            ("111", "https://x.com/@onlya/"),
            ("222", "https://x.com/@onlyb/"),
        ]
    ),
    "by_name": {},
}
# Server index with no entries at all: every lookup misses.
_INDEX_NONE: dict = dict(by_steam_id={}, by_name={})
# Server index that holds only the two shared mods of _COMPARISON.
_INDEX_PARTIAL = {
    "by_steam_id": dict(
        [
            ("450814997", "https://x.com/@cba/"),
            ("463939057", "https://x.com/@ace/"),
        ]
    ),
    "by_name": {},
}
def _test_build_missing_report_all_found():
    """With every mod indexed, the report must list nothing as missing."""
    report = build_missing_report(_COMPARISON, _INDEX_ALL)
    expected_values = (
        ("total_mods", 4),
        ("on_server", 4),
        ("missing", 0),
        ("missing_mods", []),
    )
    for key, expected in expected_values:
        assert_eq(report[key], expected)
def _test_build_missing_report_all_missing():
    """With an empty index, all four mods must be reported missing."""
    report = build_missing_report(_COMPARISON, _INDEX_NONE)
    for key, expected in (("total_mods", 4), ("on_server", 0), ("missing", 4)):
        assert_eq(report[key], expected)
    assert_eq(len(report["missing_mods"]), 4)
def _test_build_missing_report_partial():
    """With only the shared mods indexed, the two unique mods must be missing."""
    report = build_missing_report(_COMPARISON, _INDEX_PARTIAL)
    assert_eq(report["on_server"], 2)
    assert_eq(report["missing"], 2)
    absent_ids = {entry["steam_id"] for entry in report["missing_mods"]}
    assert_eq(absent_ids, {"111", "222"})
def _test_build_missing_report_group_field():
    """Missing entries must cover the "shared", "Preset_A" and "Preset_B" groups."""
    report = build_missing_report(_COMPARISON, _INDEX_NONE)
    seen_groups = {entry["group"] for entry in report["missing_mods"]}
    for expected_group in ("shared", "Preset_A", "Preset_B"):
        assert_in(expected_group, seen_groups)
def _test_build_missing_report_shared_group_label():
    """Missing entries for the two shared mods must carry group="shared".

    The report is built against the empty index, so both shared mods
    (CBA_A3 and ACE3) are expected among the missing entries.
    """
    report = build_missing_report(_COMPARISON, _INDEX_NONE)
    shared_entries = [m for m in report["missing_mods"] if m["name"] in ("CBA_A3", "ACE3")]
    # Guard against a vacuous pass: if the filter matched nothing, the loop
    # below would check no entries and the test would succeed regardless.
    assert_eq(len(shared_entries), 2, "both shared mods must appear in missing_mods")
    for e in shared_entries:
        assert_eq(e["group"], "shared", f"{e['name']} must have group='shared'")
def _test_build_missing_report_has_timestamp():
    """The report must carry a string "generated_at" value containing "T"."""
    report = build_missing_report(_COMPARISON, _INDEX_ALL)
    assert "generated_at" in report
    stamp = report["generated_at"]
    assert isinstance(stamp, str)
    assert "T" in stamp  # ISO 8601 format
def _test_save_and_reload_missing_report():
    """Save a report to disk, read it back as JSON, and compare key counters."""
    report = build_missing_report(_COMPARISON, _INDEX_PARTIAL)
    with tempfile.TemporaryDirectory() as raw_dir:
        out = Path(raw_dir, "sub", "missing_report.json")
        save_missing_report(report, out)
        assert out.exists(), "save_missing_report must create the file"
        reloaded = json.loads(out.read_text(encoding="utf-8"))
        for key in ("missing", "total_mods"):
            assert_eq(reloaded[key], report[key])
def _test_save_missing_report_creates_parent_dirs():
    """save_missing_report must create missing parent directories of the target."""
    with tempfile.TemporaryDirectory() as raw_dir:
        nested_target = Path(raw_dir, "a", "b", "c", "report.json")
        save_missing_report({"test": True}, nested_target)
        assert nested_target.exists()
# Execute the reporter checks. Each test() call runs its function immediately
# and records the outcome in the module-level counters.
test("build_missing_report: all mods found", _test_build_missing_report_all_found)
test("build_missing_report: all mods missing", _test_build_missing_report_all_missing)
test("build_missing_report: partial coverage", _test_build_missing_report_partial)
test("build_missing_report: group field present on all entries", _test_build_missing_report_group_field)
test("build_missing_report: shared mods labeled group='shared'", _test_build_missing_report_shared_group_label)
test("build_missing_report: includes ISO timestamp", _test_build_missing_report_has_timestamp)
test("save_missing_report: write and reload roundtrip", _test_save_and_reload_missing_report)
test("save_missing_report: creates parent directories", _test_save_missing_report_creates_parent_dirs)
# ---------------------------------------------------------------------------
# 7. linker
# ---------------------------------------------------------------------------
group("linker")
from arma_modlist_tools.linker import (
get_mod_folders, get_link_status,
create_junction, remove_junction,
link_group, unlink_group,
_is_junction,
)
def _make_fake_mods(base: Path, names: list[str]) -> None:
    """Create one directory per name under *base*, each holding a dummy mod.cpp.

    The mod.cpp content is a single comment line naming the mod.
    """
    for mod_name in names:
        folder = base / mod_name
        folder.mkdir(parents=True)
        marker = folder / "mod.cpp"
        marker.write_text(f"// {mod_name}", encoding="utf-8")
def _test_get_mod_folders_finds_at_dirs():
    """get_mod_folders must return only the @-prefixed directories."""
    with tempfile.TemporaryDirectory() as raw_dir:
        base = Path(raw_dir)
        _make_fake_mods(base, ["@ace", "@cba_a3", "@rhs"])
        # Decoys that must be ignored: a plain directory and a plain file.
        (base / "not_a_mod").mkdir()
        (base / "readme.txt").write_text("x")
        found_names = [folder.name for folder in get_mod_folders(base)]
        assert_eq(sorted(found_names), ["@ace", "@cba_a3", "@rhs"])
        assert "not_a_mod" not in found_names
def _test_get_mod_folders_empty_dir():
    """get_mod_folders on an empty directory must return an empty list."""
    with tempfile.TemporaryDirectory() as raw_dir:
        found = get_mod_folders(Path(raw_dir))
        assert_eq(found, [])
def _test_get_mod_folders_nonexistent():
    """get_mod_folders on a non-existent path must return an empty list."""
    bogus_path = Path("no_such_dir_xyz_abc")
    assert_eq(get_mod_folders(bogus_path), [])
def _test_get_link_status_unlinked():
    """Before any linking, every status entry must report is_linked as falsy."""
    with tempfile.TemporaryDirectory() as src_raw, tempfile.TemporaryDirectory() as arma_raw:
        src_d, arma_d = Path(src_raw), Path(arma_raw)
        _make_fake_mods(src_d, ["@ace", "@cba_a3"])
        status = get_link_status(src_d, arma_d)
        assert_eq(len(status), 2)
        for entry in status:
            assert not entry["is_linked"], f"{entry['name']} should not be linked yet"
            assert entry["link_path"].parent == arma_d
            assert entry["source_path"].exists()
def _test_create_and_detect_junction():
    """create_junction must produce a link that _is_junction detects.

    The target's mod.cpp must also be reachable through the link.
    """
    with tempfile.TemporaryDirectory() as src_raw, tempfile.TemporaryDirectory() as arma_raw:
        src_d, arma_d = Path(src_raw), Path(arma_raw)
        _make_fake_mods(src_d, ["@mymod"])
        target = src_d / "@mymod"
        link = arma_d / "@mymod"
        created = create_junction(link, target)
        assert created, "create_junction must return True on success"
        assert link.exists(), "link must exist after creation"
        assert _is_junction(link), "_is_junction must detect the new junction"
        # The link must expose the files that live in the target directory.
        assert (link / "mod.cpp").exists(), "linked junction must expose target contents"
def _test_remove_junction():
    """remove_junction must delete the link and leave the target intact."""
    with tempfile.TemporaryDirectory() as src_raw, tempfile.TemporaryDirectory() as arma_raw:
        src_d, arma_d = Path(src_raw), Path(arma_raw)
        _make_fake_mods(src_d, ["@mymod"])
        target = src_d / "@mymod"
        link = arma_d / "@mymod"
        create_junction(link, target)
        assert _is_junction(link)
        removed, err = remove_junction(link)
        assert removed, f"remove_junction failed: {err}"
        assert not link.exists(), "junction must be gone after removal"
        # The target directory and its files must be untouched.
        assert target.exists(), "target directory must survive junction removal"
        assert (target / "mod.cpp").exists(), "target files must survive junction removal"
def _test_link_group_full_flow():
    """link_group must link all three fake mods and report the counts."""
    mod_names = ["@ace", "@cba_a3", "@rhs"]
    with tempfile.TemporaryDirectory() as src_raw, tempfile.TemporaryDirectory() as arma_raw:
        src_d, arma_d = Path(src_raw), Path(arma_raw)
        _make_fake_mods(src_d, mod_names)
        summary = link_group(src_d, arma_d)
        for key, expected in (("linked", 3), ("already_linked", 0), ("failed", 0)):
            assert_eq(summary[key], expected)
        # Every mod must now be present in arma_d as a junction.
        for name in mod_names:
            assert _is_junction(arma_d / name), f"{name} must be linked"
def _test_link_group_idempotent():
    """A second link_group call must report the mod as already_linked."""
    with tempfile.TemporaryDirectory() as src_raw, tempfile.TemporaryDirectory() as arma_raw:
        src_d, arma_d = Path(src_raw), Path(arma_raw)
        _make_fake_mods(src_d, ["@ace"])
        link_group(src_d, arma_d)
        second_run = link_group(src_d, arma_d)
        for key, expected in (("linked", 0), ("already_linked", 1), ("failed", 0)):
            assert_eq(second_run[key], expected)
def _test_unlink_group_removes_links():
    """unlink_group must remove both links and leave the source folders intact."""
    mod_names = ["@ace", "@cba_a3"]
    with tempfile.TemporaryDirectory() as src_raw, tempfile.TemporaryDirectory() as arma_raw:
        src_d, arma_d = Path(src_raw), Path(arma_raw)
        _make_fake_mods(src_d, mod_names)
        link_group(src_d, arma_d)
        summary = unlink_group(src_d, arma_d)
        for key, expected in (("unlinked", 2), ("not_linked", 0), ("failed", 0)):
            assert_eq(summary[key], expected)
        # Links must be gone while the sources are still there.
        for name in mod_names:
            assert not (arma_d / name).exists(), f"Link {name} must be gone"
            assert (src_d / name).exists(), f"Source {name} must survive"
def _test_unlink_group_nothing_linked():
    """unlink_group with no existing links must count the mod as not_linked."""
    with tempfile.TemporaryDirectory() as src_raw, tempfile.TemporaryDirectory() as arma_raw:
        src_d, arma_d = Path(src_raw), Path(arma_raw)
        _make_fake_mods(src_d, ["@ace"])
        summary = unlink_group(src_d, arma_d)
        assert_eq(summary["unlinked"], 0)
        assert_eq(summary["not_linked"], 1)
def _test_get_link_status_after_linking():
    """After link_group, get_link_status must report the mod as linked."""
    with tempfile.TemporaryDirectory() as src_raw, tempfile.TemporaryDirectory() as arma_raw:
        src_d, arma_d = Path(src_raw), Path(arma_raw)
        _make_fake_mods(src_d, ["@ace"])
        link_group(src_d, arma_d)
        status = get_link_status(src_d, arma_d)
        assert_eq(len(status), 1)
        only_entry = status[0]
        assert only_entry["is_linked"], "status must reflect active junction"
def _test_link_group_skips_existing_non_junction():
    """If arma_dir already has a regular folder with the same name, must not overwrite."""
    with tempfile.TemporaryDirectory() as src_raw, tempfile.TemporaryDirectory() as arma_raw:
        src_d, arma_d = Path(src_raw), Path(arma_raw)
        _make_fake_mods(src_d, ["@ace"])
        # Occupy the link path with a plain (non-junction) directory.
        occupied = arma_d / "@ace"
        occupied.mkdir()
        summary = link_group(src_d, arma_d)
        assert_eq(summary["failed"], 1, "Must report failure for path that exists but is not a junction")
        assert_eq(summary["linked"], 0)
# Execute the linker checks. Each test() call runs its function immediately
# and records the outcome in the module-level counters.
test("get_mod_folders: finds @-prefixed dirs only", _test_get_mod_folders_finds_at_dirs)
test("get_mod_folders: empty directory", _test_get_mod_folders_empty_dir)
test("get_mod_folders: nonexistent path → []", _test_get_mod_folders_nonexistent)
test("get_link_status: all unlinked initially", _test_get_link_status_unlinked)
test("create_junction + _is_junction detection", _test_create_and_detect_junction)
test("remove_junction: removes link, target untouched", _test_remove_junction)
test("link_group: full link flow", _test_link_group_full_flow)
test("link_group: idempotent (already_linked on second call)", _test_link_group_idempotent)
test("unlink_group: removes all links", _test_unlink_group_removes_links)
test("unlink_group: nothing linked → not_linked count", _test_unlink_group_nothing_linked)
test("get_link_status: reflects active junctions", _test_get_link_status_after_linking)
test("link_group: skips plain dir (fails safely)", _test_link_group_skips_existing_non_junction)
# ---------------------------------------------------------------------------
# 8. __init__ exports
# ---------------------------------------------------------------------------
group("__init__ public exports")
import arma_modlist_tools as pkg
# Names the arma_modlist_tools package is expected to expose at its top level.
# Both export tests below iterate this list: one checks presence via hasattr,
# the other checks that each present name is callable or a class.
_EXPECTED_EXPORTS = [
"parse_mod_entry", "parse_modlist_html", "parse_modlist_dir",
"compare_presets",
"make_session", "build_server_index", "find_mod_folder",
"list_mod_files", "list_mod_updates", "download_file", "download_mod_folder",
"get_mod_folders", "get_link_status",
"create_junction", "remove_junction",
"link_group", "unlink_group",
"load_config", "Config",
"is_windows", "is_linux", "get_os_label", "fix_console_encoding",
"build_missing_report", "save_missing_report",
"find_orphan_folders", "folder_size",
]
def _test_all_exports_present():
    """Every name in _EXPECTED_EXPORTS must be an attribute of the package."""
    missing = []
    for symbol in _EXPECTED_EXPORTS:
        if not hasattr(pkg, symbol):
            missing.append(symbol)
    assert not missing, f"Missing exports from arma_modlist_tools: {missing}"
def _test_exports_are_callable():
    """Every expected export that is present must be callable or a class."""
    non_callable = []
    for symbol in _EXPECTED_EXPORTS:
        if not hasattr(pkg, symbol):
            # Absent names are the presence test's concern, not this one's.
            continue
        exported = getattr(pkg, symbol)
        if callable(exported) or isinstance(exported, type):
            continue
        non_callable.append(symbol)
    assert not non_callable, f"Non-callable exports: {non_callable}"
# Run the two package-export checks (test() executes each function immediately).
test("all expected symbols exported", _test_all_exports_present)
test("all exported symbols are callable or types", _test_exports_are_callable)
# ---------------------------------------------------------------------------
# 9. check_names helpers
# ---------------------------------------------------------------------------
group("check_names helpers")
from check_names import (
_server_name_from_url, _read_local_steam_id,
_lookup_detailed, _resolve_status, _write_steam_id,
_build_comparison_id_map,
)
def _test_server_name_from_url():
    """Both URL forms (with and without a trailing slash) must yield the @-folder name."""
    expectations = (
        ("https://x.com/mods/@ace/", "@ace"),
        ("https://x.com/mods/@cba_a3", "@cba_a3"),
    )
    for url, folder in expectations:
        assert_eq(_server_name_from_url(url), folder)
def _test_read_local_steam_id_present():
    """A publishedid entry in meta.cpp must come back as the id string."""
    with tempfile.TemporaryDirectory() as tmp:
        mod_dir = Path(tmp)
        meta_file = mod_dir / "meta.cpp"
        meta_file.write_text('publishedid = 463939057;\nname = "ACE3";', encoding="utf-8")
        assert_eq(_read_local_steam_id(mod_dir), "463939057")
def _test_read_local_steam_id_missing():
    """A mod folder with no meta.cpp must produce None."""
    with tempfile.TemporaryDirectory() as tmp:
        found = _read_local_steam_id(Path(tmp))
        assert found is None
def _test_read_local_steam_id_no_id_in_file():
    """A meta.cpp that lacks a publishedid entry must produce None."""
    with tempfile.TemporaryDirectory() as tmp:
        mod_dir = Path(tmp)
        meta_file = mod_dir / "meta.cpp"
        meta_file.write_text('name = "LocalMod";\n', encoding="utf-8")
        found = _read_local_steam_id(mod_dir)
        assert found is None
def _test_lookup_detailed_by_steam_id():
    """A local publishedid present in by_steam_id resolves to that server folder."""
    server_index = {
        "by_steam_id": {"463939057": "https://x.com/@ace3/"},
        "by_name": {},
    }
    with tempfile.TemporaryDirectory() as tmp:
        mod_dir = Path(tmp)
        meta_file = mod_dir / "meta.cpp"
        meta_file.write_text("publishedid = 463939057;", encoding="utf-8")
        server_name, local_sid = _lookup_detailed("@ACE3", mod_dir, server_index)
        assert_eq(server_name, "@ace3")
        assert_eq(local_sid, "463939057")
def _test_lookup_detailed_by_name_fallback():
    """With no meta.cpp, the lookup must match through the by_name table."""
    server_index = {
        "by_steam_id": {},
        "by_name": {"cbaa3": "https://x.com/@cba_a3/"},
    }
    with tempfile.TemporaryDirectory() as tmp:
        server_name, local_sid = _lookup_detailed("@CBA_A3", Path(tmp), server_index)
        assert_eq(server_name, "@cba_a3")
        # The temp folder holds no meta.cpp, so there is no local id to report.
        assert local_sid is None
def _test_lookup_detailed_steam_id_beats_name():
    """When both tables could match, the steam-id entry must win over the name entry."""
    server_index = {
        "by_steam_id": {"111": "https://x.com/@correct/"},
        "by_name": {"mymod": "https://x.com/@wrong/"},
    }
    with tempfile.TemporaryDirectory() as tmp:
        mod_dir = Path(tmp)
        meta_file = mod_dir / "meta.cpp"
        meta_file.write_text("publishedid = 111;", encoding="utf-8")
        server_name, local_sid = _lookup_detailed("@MyMod", mod_dir, server_index)
        assert_eq(server_name, "@correct")
        assert_eq(local_sid, "111")
def _test_lookup_detailed_not_found():
    """An empty index must give no server name."""
    empty_index = {"by_steam_id": {}, "by_name": {}}
    with tempfile.TemporaryDirectory() as tmp:
        server_name, _local_sid = _lookup_detailed("@Unknown", Path(tmp), empty_index)
        assert server_name is None
def _test_lookup_detailed_returns_sid_even_when_not_on_server():
    """The local steam id is still reported when the server lookup finds nothing."""
    empty_index = {"by_steam_id": {}, "by_name": {}}
    with tempfile.TemporaryDirectory() as tmp:
        mod_dir = Path(tmp)
        meta_file = mod_dir / "meta.cpp"
        meta_file.write_text("publishedid = 999;", encoding="utf-8")
        server_name, local_sid = _lookup_detailed("@SomeMod", mod_dir, empty_index)
        assert server_name is None
        assert_eq(local_sid, "999")
# Register the URL / meta.cpp helper tests. These functions are defined above
# but were never passed to test(), so they silently never ran.
test("_server_name_from_url: with and without trailing slash", _test_server_name_from_url)
test("_read_local_steam_id: reads publishedid from meta.cpp", _test_read_local_steam_id_present)
test("_read_local_steam_id: no meta.cpp -> None", _test_read_local_steam_id_missing)
test("_read_local_steam_id: meta.cpp without publishedid -> None", _test_read_local_steam_id_no_id_in_file)
# Register the _lookup_detailed tests (test() executes each one immediately).
test("_lookup_detailed: matches by steam_id", _test_lookup_detailed_by_steam_id)
test("_lookup_detailed: falls back to name", _test_lookup_detailed_by_name_fallback)
test("_lookup_detailed: steam_id beats name fallback", _test_lookup_detailed_steam_id_beats_name)
test("_lookup_detailed: not found -> None", _test_lookup_detailed_not_found)
test("_lookup_detailed: returns local_sid even when not on server", _test_lookup_detailed_returns_sid_even_when_not_on_server)
# _resolve_status tests
def _test_resolve_status_ok():
    """Identical disk and server names give status OK and echo the server name."""
    verdict, shown = _resolve_status("@ace", "@ace", None, ok_disk_names={"@ace"})
    assert_eq(verdict, "OK")
    assert_eq(shown, "@ace")
def _test_resolve_status_mismatch():
    """A server name that differs from the disk name and is not an OK name is a MISMATCH."""
    verdict, shown = _resolve_status("@ACE3", "@ace3", None, ok_disk_names={"@cba_a3"})
    assert_eq(verdict, "MISMATCH")
    assert_eq(shown, "@ace3")
def _test_resolve_status_not_found():
    """No server name at all must be reported as NOT_ON_SERVER."""
    verdict, _shown = _resolve_status("@Unknown", None, None, ok_disk_names=set())
    assert_eq(verdict, "NOT_ON_SERVER")
def _test_resolve_status_id_collision():
    """A server name already claimed by an OK folder is an ID_COLLISION showing the local id."""
    claimed_names = {"@Realistic Ragdoll Physics"}
    verdict, shown = _resolve_status(
        "@NIArms All in One- ACE Compatibility",
        "@Realistic Ragdoll Physics",
        "1234567",
        ok_disk_names=claimed_names,
    )
    assert_eq(verdict, "ID_COLLISION")
    # The colliding steam id must be visible in the displayed column.
    assert "1234567" in shown
def _test_resolve_status_collision_without_sid():
    """The collision verdict must not depend on a local steam id being available."""
    claimed_names = {"@Realistic Ragdoll Physics"}
    verdict, _shown = _resolve_status(
        "@Some Mod",
        "@Realistic Ragdoll Physics",
        None,
        ok_disk_names=claimed_names,
    )
    assert_eq(verdict, "ID_COLLISION")
# Register the _resolve_status tests (test() executes each one immediately).
test("_resolve_status: OK match", _test_resolve_status_ok)
test("_resolve_status: genuine mismatch", _test_resolve_status_mismatch)
test("_resolve_status: not found", _test_resolve_status_not_found)
test("_resolve_status: ID_COLLISION shows bad steam_id", _test_resolve_status_id_collision)
test("_resolve_status: ID_COLLISION without local sid", _test_resolve_status_collision_without_sid)
# _write_steam_id tests
def _test_write_steam_id_updates_existing():
    """An existing publishedid is replaced and the other meta.cpp lines survive."""
    with tempfile.TemporaryDirectory() as tmp:
        mod_dir = Path(tmp)
        meta = mod_dir / "meta.cpp"
        meta.write_text('publishedid = 111;\nname = "OldMod";\n', encoding="utf-8")
        ok, _msg = _write_steam_id(mod_dir, "999")
        assert ok
        rewritten = meta.read_text(encoding="utf-8")
        assert "999" in rewritten
        assert "111" not in rewritten
        assert 'name = "OldMod"' in rewritten  # preserved
def _test_write_steam_id_creates_if_missing():
    """When no meta.cpp exists, one is created containing the new id."""
    with tempfile.TemporaryDirectory() as tmp:
        mod_dir = Path(tmp)
        ok, _msg = _write_steam_id(mod_dir, "12345")
        assert ok
        created = mod_dir / "meta.cpp"
        assert created.exists()
        assert "12345" in created.read_text(encoding="utf-8")
def _test_write_steam_id_appends_if_no_line():
    """A meta.cpp without a publishedid line gains the id and keeps its content."""
    with tempfile.TemporaryDirectory() as tmp:
        mod_dir = Path(tmp)
        meta = mod_dir / "meta.cpp"
        meta.write_text('name = "NoId";\n', encoding="utf-8")
        ok, _msg = _write_steam_id(mod_dir, "777")
        assert ok
        updated = meta.read_text(encoding="utf-8")
        assert "777" in updated
        assert 'name = "NoId"' in updated
# Register the _write_steam_id tests (test() executes each one immediately).
test("_write_steam_id: updates existing publishedid", _test_write_steam_id_updates_existing)
test("_write_steam_id: creates meta.cpp if missing", _test_write_steam_id_creates_if_missing)
test("_write_steam_id: appends if no publishedid line", _test_write_steam_id_appends_if_no_line)
# _build_comparison_id_map tests
def _test_build_comparison_id_map_reads_json():
    """The id map must contain the steam ids of both shared and unique mods.

    A throwaway config.json points every path at one temp directory, and a
    fake comparison.json is written into that same directory.
    """
    from arma_modlist_tools.config import load_config as _lc

    shared_mod = {"name": "CBA_A3", "steam_id": "450814997", "url": None, "source": "steam"}
    unique_mod = {"name": "ACE3", "steam_id": "463939057", "url": None, "source": "steam"}
    comparison = {
        "compared_presets": ["A", "B"],
        "shared": {"mod_count": 1, "mods": [shared_mod]},
        "unique": {
            "A": {"mod_count": 1, "mods": [unique_mod]},
            "B": {"mod_count": 0, "mods": []},
        },
    }
    with tempfile.TemporaryDirectory() as d:
        work_dir = Path(d)
        path_keys = ("arma_dir", "downloads", "modlist_html", "modlist_json")
        cfg_data = {
            "server": {"base_url": "x", "username": "u", "password": "p"},
            "paths": {key: d for key in path_keys},
        }
        cfg_file = work_dir / "config.json"
        cfg_file.write_text(json.dumps(cfg_data), encoding="utf-8")
        cfg = _lc(str(cfg_file))
        # Write a fake comparison.json
        (work_dir / "comparison.json").write_text(json.dumps(comparison), encoding="utf-8")
        id_map = _build_comparison_id_map(cfg)
        for key, expected_id in (("cbaa3", "450814997"), ("ace3", "463939057")):
            assert_in(key, id_map)
            assert_eq(id_map[key][0], expected_id)
# Register the _build_comparison_id_map test (test() executes it immediately).
test("_build_comparison_id_map: reads steam_ids from comparison.json", _test_build_comparison_id_map_reads_json)
# ---------------------------------------------------------------------------
# 10. Integration: parse → compare → reporter (offline)
# ---------------------------------------------------------------------------
group("integration: parse → compare → reporter (offline)")
def _test_end_to_end_offline():
    """Parse the HTML presets next to this file, compare them, and build two reports.

    The first report uses an empty server index (every mod must be missing);
    the second uses an index containing every mod (nothing may be missing).
    """
    html_dir = Path(__file__).parent / "modlist_html"
    presets = parse_modlist_dir(html_dir)
    assert len(presets) >= 2, "Need >=2 HTML presets for integration test"

    comparison = compare_presets(*presets)
    unique_total = sum(d["mod_count"] for d in comparison["unique"].values())
    total = comparison["shared"]["mod_count"] + unique_total
    assert total > 0, "Comparison must have mods"

    # Build report against empty index (all missing)
    empty_index = {"by_steam_id": {}, "by_name": {}}
    report_all_missing = build_missing_report(comparison, empty_index)
    assert_eq(report_all_missing["missing"], total)

    # Build report against full index (all found): mods with a steam id are
    # indexed by id, the rest by normalised name.
    every_mod = list(comparison["shared"]["mods"])
    for d in comparison["unique"].values():
        every_mod.extend(d["mods"])
    full_by_id = {}
    full_by_name = {}
    for i, mod in enumerate(every_mod):
        if mod.get("steam_id"):
            full_by_id[mod["steam_id"]] = f"https://x.com/@mod{i}/"
        else:
            full_by_name[_normalize_name(mod["name"])] = f"https://x.com/@mod{i}/"
    full_index = {"by_steam_id": full_by_id, "by_name": full_by_name}
    report_all_found = build_missing_report(comparison, full_index)
    assert_eq(report_all_found["missing"], 0)
def _test_comparison_json_consistent_with_html():
    """Check that the on-disk comparison.json agrees with the HTML presets.

    Users may compare only a subset of the available presets, so the test
    requires just that each preset named in comparison.json has an HTML file,
    then re-compares exactly those presets and checks the mod counts.
    Skips when comparison.json is absent or fewer than two presets match.
    """
    here = Path(__file__).parent
    html_dir = here / "modlist_html"
    json_file = here / "modlist_json" / "comparison.json"
    if not json_file.exists():
        raise _SkipTest("comparison.json not found (run pipeline first)")

    available_presets = {p.stem for p in html_dir.glob("*.html")}
    on_disk = json.loads(json_file.read_text(encoding="utf-8"))

    # Every preset referenced in comparison.json must have a source HTML file
    for pname in on_disk["compared_presets"]:
        assert pname in available_presets, (
            f"comparison.json references '{pname}' but no matching HTML file found"
        )

    # Re-compare only the presets that were actually used on disk
    selected = []
    for preset in parse_modlist_dir(html_dir):
        if preset["preset_name"] in on_disk["compared_presets"]:
            selected.append(preset)
    if len(selected) < 2:
        raise _SkipTest("fewer than 2 matching HTML presets available")

    fresh = compare_presets(*selected)
    assert_eq(fresh["shared"]["mod_count"], on_disk["shared"]["mod_count"])
    for pname in fresh["compared_presets"]:
        fresh_count = fresh["unique"][pname]["mod_count"]
        disk_count = on_disk["unique"][pname]["mod_count"]
        assert_eq(fresh_count, disk_count, f"unique mod count mismatch for {pname}")
# Register the offline integration tests (test() executes each one immediately).
test("end-to-end offline pipeline (parse → compare → report)", _test_end_to_end_offline)
test("comparison.json on disk matches fresh parse+compare", _test_comparison_json_consistent_with_html)
# ---------------------------------------------------------------------------
# 11. gui._io — QueueWriter
# ---------------------------------------------------------------------------
group("gui._io — QueueWriter")
import importlib.util as _ilu
import queue as _queue_mod
# Load gui/_io.py without triggering gui/__init__.py (which requires customtkinter)
# The module is built from its file path (gui/_io.py next to this script) and
# executed in place; only the _QueueWriter class is taken from it for the tests.
_io_spec = _ilu.spec_from_file_location("gui._io", Path(__file__).parent / "gui" / "_io.py")
_io_mod = _ilu.module_from_spec(_io_spec)
_io_spec.loader.exec_module(_io_mod)
_QueueWriter = _io_mod._QueueWriter
def _qw():
    """Create a new queue plus a _QueueWriter bound to it; return (writer, queue)."""
    sink = _queue_mod.Queue()
    writer = _QueueWriter(sink)
    return writer, sink
def _test_qw_clean_text_passes_through():
    """Text without control sequences must reach the queue unchanged."""
    writer, sink = _qw()
    writer.write("hello\n")
    delivered = sink.get_nowait()
    assert_eq(delivered, "hello\n")
def _test_qw_strips_csi_escape_sequences():
    """CSI colour codes must be removed, leaving only the visible text."""
    writer, sink = _qw()
    writer.write("\x1b[32mGreen\x1b[0m")
    delivered = sink.get_nowait()
    assert_eq(delivered, "Green")
def _test_qw_strips_osc_escape_sequences():
    """A BEL-terminated OSC sequence must be removed from the output."""
    writer, sink = _qw()
    writer.write("\x1b]0;window title\x07visible text")
    delivered = sink.get_nowait()
    assert_eq(delivered, "visible text")
def _test_qw_bare_cr_becomes_newline():
    """A lone carriage return must be delivered as a line feed."""
    writer, sink = _qw()
    writer.write("first\rsecond")
    delivered = sink.get_nowait()
    assert_eq(delivered, "first\nsecond")
def _test_qw_crlf_normalised_to_lf():
    """A CRLF pair must collapse to a single line feed."""
    writer, sink = _qw()
    writer.write("line1\r\nline2")
    delivered = sink.get_nowait()
    assert_eq(delivered, "line1\nline2")
def _test_qw_empty_string_not_enqueued():
    """Writing an empty string must leave the queue empty."""
    writer, sink = _qw()
    writer.write("")
    queue_is_empty = sink.empty()
    assert queue_is_empty, "empty write should not enqueue anything"
def _test_qw_only_ansi_not_enqueued():
    """A write made up solely of escape codes must leave the queue empty."""
    writer, sink = _qw()
    writer.write("\x1b[32m\x1b[0m")
    queue_is_empty = sink.empty()
    assert queue_is_empty, "write that strips to empty should not enqueue"
def _test_qw_returns_original_byte_length():
    """write() must return len() of the raw input, escape codes included."""
    writer, _sink = _qw()
    raw = "\x1b[32mhello\x1b[0m"
    reported = writer.write(raw)
    assert_eq(reported, len(raw))
def _test_qw_tqdm_progress_line():
    """A tqdm-style line starting with \\r must arrive starting with a newline."""
    writer, sink = _qw()
    progress_line = "\r 50%|█████ | 1/2 [00:01<00:01]"
    writer.write(progress_line)
    out = sink.get_nowait()
    assert out.startswith("\n"), f"expected leading newline, got {out!r}"
    assert "50%" in out
def _test_qw_mixed_ansi_and_cr():
    """Escape codes and a bare CR in one write must both be handled; the text survives."""
    writer, sink = _qw()
    writer.write("\x1b[1m\rProgress: 75%\x1b[0m")
    delivered = sink.get_nowait()
    assert "\x1b" not in delivered, "ANSI codes should be stripped"
    assert "\r" not in delivered, "bare CR should be converted"
    assert "75%" in delivered
# Register the QueueWriter tests (test() executes each one immediately).
test("clean text passes through unchanged", _test_qw_clean_text_passes_through)
test("CSI escape sequences stripped", _test_qw_strips_csi_escape_sequences)
test("OSC escape sequences stripped", _test_qw_strips_osc_escape_sequences)
test("bare CR converted to newline", _test_qw_bare_cr_becomes_newline)
test("CRLF normalised to LF", _test_qw_crlf_normalised_to_lf)
test("empty string not enqueued", _test_qw_empty_string_not_enqueued)
test("all-ANSI write not enqueued", _test_qw_only_ansi_not_enqueued)
test("write() returns original length", _test_qw_returns_original_byte_length)
test("tqdm \\r progress line becomes newline", _test_qw_tqdm_progress_line)
test("mixed ANSI + CR stripped and converted", _test_qw_mixed_ansi_and_cr)
def _test_qw_osc_st_terminator():
    """An OSC sequence closed by ST (ESC \\) must be removed like the BEL form."""
    writer, sink = _qw()
    writer.write("\x1b]0;title\x1b\\visible text")
    delivered = sink.get_nowait()
    assert_eq(delivered, "visible text")
# Register the ST-terminated OSC test (test() executes it immediately).
test("OSC ST-terminated sequence stripped", _test_qw_osc_st_terminator)
# ---------------------------------------------------------------------------
# 12. cleaner — find_orphan_folders
# ---------------------------------------------------------------------------
group("cleaner — find_orphan_folders")
from arma_modlist_tools.cleaner import find_orphan_folders, folder_size
def _mk_mod_dir(root: Path, group: str, name: str, files: list[str] | None = None) -> Path:
"""Create a mock mod folder under root/group/@name with optional dummy files."""
mod_dir = root / group / f"@{name}"
mod_dir.mkdir(parents=True, exist_ok=True)
for fname in (files or []):
f = mod_dir / fname
f.write_bytes(b"x" * 1024)
return mod_dir
# Comparison fixture shared by the cleaner tests: one shared mod (CBA_A3),
# one mod unique to preset "A" (ACE3), and an empty preset "B".
_COMPARISON_BASE = {
"compared_presets": ["A", "B"],
"shared": {"mod_count": 1, "mods": [{"name": "CBA_A3", "steam_id": "1", "url": None, "source": "steam"}]},
"unique": {
"A": {"mod_count": 1, "mods": [{"name": "ACE3", "steam_id": "2", "url": None, "source": "steam"}]},
"B": {"mod_count": 0, "mods": []},
},
}
def _test_orphan_empty_downloads():
    """A downloads path that does not exist must give an empty orphan list."""
    with tempfile.TemporaryDirectory() as tmp:
        absent_dir = Path(tmp) / "nonexistent"
        result = find_orphan_folders(absent_dir, _COMPARISON_BASE)
        assert result == []
def _test_orphan_none_when_all_match():
    """Folders that all correspond to mods in the comparison are not orphans."""
    with tempfile.TemporaryDirectory() as tmp:
        downloads = Path(tmp)
        for group_name, mod_name in (("shared", "CBA_A3"), ("A", "ACE3")):
            _mk_mod_dir(downloads, group_name, mod_name)
        result = find_orphan_folders(downloads, _COMPARISON_BASE)
        assert result == [], f"Expected no orphans, got {result}"
def _test_orphan_detects_removed_mod():
    """A folder whose mod is absent from the comparison is the only orphan reported."""
    with tempfile.TemporaryDirectory() as tmp:
        downloads = Path(tmp)
        _mk_mod_dir(downloads, "shared", "CBA_A3")
        _mk_mod_dir(downloads, "shared", "OldMod")  # not in comparison
        orphans = find_orphan_folders(downloads, _COMPARISON_BASE)
        assert len(orphans) == 1
        only = orphans[0]
        assert_eq(only["name"], "@OldMod")
        assert_eq(only["group"], "shared")
def _test_orphan_detects_removed_group():
    """A mod under a group folder that the comparison no longer names is an orphan."""
    with tempfile.TemporaryDirectory() as tmp:
        downloads = Path(tmp)
        _mk_mod_dir(downloads, "shared", "CBA_A3")
        _mk_mod_dir(downloads, "OldPreset", "SomeMod")  # group no longer in comparison
        orphans = find_orphan_folders(downloads, _COMPARISON_BASE)
        assert len(orphans) == 1
        assert_eq(orphans[0]["group"], "OldPreset")
def _test_orphan_normalised_name_matches():
    """A folder named @CBA A3 must be matched to the mod named CBA_A3."""
    with tempfile.TemporaryDirectory() as tmp:
        # Space-separated folder name; expected to normalise like "CBA_A3".
        spaced_dir = Path(tmp) / "shared" / "@CBA A3"
        spaced_dir.mkdir(parents=True)
        result = find_orphan_folders(Path(tmp), _COMPARISON_BASE)
        assert result == [], f"Normalised name should match, got {result}"
def _test_orphan_size_reported():
    """The orphan entry's size must equal the total bytes of the files it holds."""
    with tempfile.TemporaryDirectory() as tmp:
        downloads = Path(tmp)
        _mk_mod_dir(downloads, "shared", "OldMod", files=["a.pbo", "b.pbo"])
        orphans = find_orphan_folders(downloads, _COMPARISON_BASE)
        assert len(orphans) == 1
        # Two dummy files of 1024 bytes each.
        assert orphans[0]["size"] == 2048
def _test_orphan_ignores_non_at_folders():
    """A directory without the @ prefix must never be reported as an orphan."""
    with tempfile.TemporaryDirectory() as tmp:
        downloads = Path(tmp)
        plain_dir = downloads / "shared" / "keys"
        plain_dir.mkdir(parents=True)
        _mk_mod_dir(downloads, "shared", "CBA_A3")
        orphans = find_orphan_folders(downloads, _COMPARISON_BASE)
        assert orphans == []
def _test_folder_size_recursive():
    """folder_size must count files in subdirectories as well as the top level."""
    with tempfile.TemporaryDirectory() as tmp:
        top = Path(tmp)
        half = b"x" * 512
        (top / "a.pbo").write_bytes(half)
        nested = top / "sub"
        nested.mkdir()
        (nested / "b.pbo").write_bytes(half)
        assert_eq(folder_size(top), 1024)
# Register the cleaner tests (test() executes each one immediately).
test("find_orphan_folders: empty downloads dir returns []", _test_orphan_empty_downloads)
test("find_orphan_folders: no orphans when all mods match", _test_orphan_none_when_all_match)
test("find_orphan_folders: detects mod removed from comparison", _test_orphan_detects_removed_mod)
test("find_orphan_folders: entire removed group flagged as orphan", _test_orphan_detects_removed_group)
test("find_orphan_folders: normalised name matches (spaces vs underscores)", _test_orphan_normalised_name_matches)
test("find_orphan_folders: orphan size summed correctly", _test_orphan_size_reported)
test("find_orphan_folders: non-@ folders ignored", _test_orphan_ignores_non_at_folders)
test("folder_size: sums files recursively", _test_folder_size_recursive)
# ---------------------------------------------------------------------------
# 13. E2E — clean_orphans.py CLI (subprocess)
# ---------------------------------------------------------------------------
group("e2e — clean_orphans.py CLI")
import subprocess as _subprocess
def _make_e2e_root(base: Path, comparison: dict) -> Path:
"""Create a self-contained project root with config.json + comparison.json
and a downloads/ directory. Returns the root path."""
root = base / "project"
root.mkdir()
dl = root / "downloads"
dl.mkdir()
json_dir = root / "modlist_json"
json_dir.mkdir()
arma = root / "arma3server"
arma.mkdir()
cfg = {
"server": {"base_url": "https://example.com/", "username": "u", "password": "p"},
"paths": {
"arma_dir": str(arma),
"downloads": str(dl),
"modlist_html": str(root / "modlist_html"),
"modlist_json": str(json_dir),
},
}
(root / "config.json").write_text(json.dumps(cfg), encoding="utf-8")
(json_dir / "comparison.json").write_text(
json.dumps(comparison), encoding="utf-8"
)
return root
def _run_clean_orphans(root: Path, *extra_args: str) -> _subprocess.CompletedProcess:
"""Run clean_orphans.py from the given project root."""
script = str(Path(__file__).parent / "clean_orphans.py")
return _subprocess.run(
[sys.executable, script] + list(extra_args),
cwd=str(root),
capture_output=True,
text=True,
)
# Comparison fixture written to comparison.json by the e2e tests: one shared
# mod (CBA_A3), one mod unique to preset "A" (ACE3), and an empty preset "B".
_E2E_COMPARISON = {
"compared_presets": ["A", "B"],
"shared": {"mod_count": 1, "mods": [{"name": "CBA_A3", "steam_id": "1", "url": None, "source": "steam"}]},
"unique": {
"A": {"mod_count": 1, "mods": [{"name": "ACE3", "steam_id": "2", "url": None, "source": "steam"}]},
"B": {"mod_count": 0, "mods": []},
},
}
def _test_e2e_dry_run_lists_orphans():
    """--dry-run must print the orphan folder's name and leave it on disk."""
    with tempfile.TemporaryDirectory() as tmp:
        root = _make_e2e_root(Path(tmp), _E2E_COMPARISON)
        shared_dir = root / "downloads" / "shared"
        (shared_dir / "@CBA_A3").mkdir(parents=True)  # known
        orphan = shared_dir / "@OldMod"
        orphan.mkdir(parents=True)
        (orphan / "file.pbo").write_bytes(b"x" * 512)

        result = _run_clean_orphans(root, "--dry-run")

        assert result.returncode == 0, f"Expected 0, got {result.returncode}\n{result.stderr}"
        assert "@OldMod" in result.stdout, "Orphan not listed in dry-run output"
        assert orphan.exists(), "--dry-run must not delete files"
def _test_e2e_no_orphans_clean_exit():
    """With only known mods downloaded, the script exits 0 and prints 'No orphans'."""
    with tempfile.TemporaryDirectory() as tmp:
        root = _make_e2e_root(Path(tmp), _E2E_COMPARISON)
        downloads = root / "downloads"
        for known in (downloads / "shared" / "@CBA_A3", downloads / "A" / "@ACE3"):
            known.mkdir(parents=True)

        outcome = _run_clean_orphans(root, "--dry-run")

        assert outcome.returncode == 0
        assert "No orphans" in outcome.stdout
def _test_e2e_yes_deletes_orphans():
    """--yes must remove the orphan folder and print 'Deleted'."""
    with tempfile.TemporaryDirectory() as tmp:
        root = _make_e2e_root(Path(tmp), _E2E_COMPARISON)
        shared_dir = root / "downloads" / "shared"
        (shared_dir / "@CBA_A3").mkdir(parents=True)
        orphan = shared_dir / "@OldMod"
        orphan.mkdir(parents=True)
        (orphan / "file.pbo").write_bytes(b"data")

        result = _run_clean_orphans(root, "--yes")

        assert result.returncode == 0, f"Expected 0\n{result.stderr}"
        assert not orphan.exists(), "Orphan should have been deleted"
        assert "Deleted" in result.stdout
def _test_e2e_yes_preserves_known_mods():
    """--yes must delete only the orphan; a mod named in the comparison stays."""
    with tempfile.TemporaryDirectory() as tmp:
        root = _make_e2e_root(Path(tmp), _E2E_COMPARISON)
        shared_dir = root / "downloads" / "shared"
        known = shared_dir / "@CBA_A3"
        known.mkdir(parents=True)
        (known / "cba.pbo").write_bytes(b"keep me")
        orphan = shared_dir / "@GoneMod"
        orphan.mkdir(parents=True)

        outcome = _run_clean_orphans(root, "--yes")

        assert outcome.returncode == 0
        assert known.exists(), "Known mod must NOT be deleted"
        assert not orphan.exists(), "Orphan must be deleted"
def _test_e2e_missing_comparison_exits_1():
    """Without comparison.json the script must exit 1 and print ERROR on stdout."""
    with tempfile.TemporaryDirectory() as tmp:
        root = _make_e2e_root(Path(tmp), _E2E_COMPARISON)
        # Delete the file the helper just wrote so the script cannot find it.
        comparison_file = root / "modlist_json" / "comparison.json"
        comparison_file.unlink()

        outcome = _run_clean_orphans(root, "--dry-run")

        assert outcome.returncode == 1, "Should exit 1 when comparison.json missing"
        assert "ERROR" in outcome.stdout
def _test_e2e_removed_group_flagged():
    """A mod under a group folder absent from comparison.json is listed by --dry-run."""
    with tempfile.TemporaryDirectory() as tmp:
        root = _make_e2e_root(Path(tmp), _E2E_COMPARISON)
        stale_mod = root / "downloads" / "OldPreset" / "@SomeMod"
        stale_mod.mkdir(parents=True)

        outcome = _run_clean_orphans(root, "--dry-run")

        assert outcome.returncode == 0
        assert "@SomeMod" in outcome.stdout
# Register the clean_orphans.py end-to-end tests (test() executes each one immediately).
test("e2e dry-run: lists orphans, does not delete", _test_e2e_dry_run_lists_orphans)
test("e2e dry-run: exits 0 when downloads clean", _test_e2e_no_orphans_clean_exit)
test("e2e --yes: deletes orphans", _test_e2e_yes_deletes_orphans)
test("e2e --yes: preserves known mods", _test_e2e_yes_preserves_known_mods)
test("e2e missing comparison.json: exits 1", _test_e2e_missing_comparison_exits_1)
test("e2e removed group: all mods flagged as orphans", _test_e2e_removed_group_flagged)
# ---------------------------------------------------------------------------
# 14. Coverage gap tests
# ---------------------------------------------------------------------------
group("coverage gaps — cleaner, config, parser, linker, compat")
import os as _os
from unittest.mock import patch as _patch, MagicMock as _MagicMock
from arma_modlist_tools.config import load_config as _load_config
from arma_modlist_tools import compat as _compat_mod
from arma_modlist_tools import linker as _linker_mod
from arma_modlist_tools.parser import _source_from_class, parse_modlist_html
# ── cleaner.py ───────────────────────────────────────────────────────────────
def _test_cleaner_skips_file_in_downloads_root():
    """A regular file directly under downloads/ must not produce an orphan entry."""
    with tempfile.TemporaryDirectory() as tmp:
        downloads = Path(tmp)
        (downloads / "shared" / "@CBA_A3").mkdir(parents=True)
        # A plain file where group directories normally live.
        (downloads / "readme.txt").write_text("not a group dir")
        orphans = find_orphan_folders(downloads, _COMPARISON_BASE)
        assert orphans == []
# Register the cleaner coverage-gap test (test() executes it immediately).
test("cleaner: plain file in downloads root is skipped (not treated as group)", _test_cleaner_skips_file_in_downloads_root)
# ── config.py ────────────────────────────────────────────────────────────────
def _test_load_config_fallback_to_root_path():
    """load_config() must still return a config when the CWD holds no config.json.

    The test switches into an empty temp directory, so any config found must
    come from the loader's fallback location.
    """
    saved_cwd = _os.getcwd()
    with tempfile.TemporaryDirectory() as empty_dir:
        _os.chdir(empty_dir)  # CWD has no config.json
        try:
            loaded = _load_config()
        finally:
            # Leave the temp dir before it is cleaned up (avoids WinError 32).
            _os.chdir(saved_cwd)
        assert loaded is not None
def _test_load_config_raises_when_not_found():
    """load_config() must raise FileNotFoundError when no search path has config.json.

    The CWD is switched to an empty temp directory, and the config module's
    ``__file__`` is patched to a path under that same directory so the
    module-relative search location has no config.json either.

    Uses the suite's assert_raises helper instead of ``assert False`` inside a
    try block: a bare assert is stripped under ``python -O``, which would let
    the test pass even when nothing is raised.
    """
    old_cwd = _os.getcwd()
    with tempfile.TemporaryDirectory() as d:
        fake_module_file = str(Path(d) / "fake" / "arma_modlist_tools" / "config.py")
        _os.chdir(d)  # no config.json in CWD
        try:
            with _patch("arma_modlist_tools.config.__file__", fake_module_file):
                assert_raises(FileNotFoundError, _load_config)
        finally:
            # Leave the temp dir before TemporaryDirectory tries to remove it.
            _os.chdir(old_cwd)
# Register the config tests (test() executes each one immediately).
test("config: load_config() falls back to root_path when CWD has no config.json", _test_load_config_fallback_to_root_path)
test("config: load_config() raises FileNotFoundError when config.json not found anywhere", _test_load_config_raises_when_not_found)
# ── parser.py ────────────────────────────────────────────────────────────────
def _test_source_from_class_unknown():
    """Each of these CSS class strings must be classified as 'unknown'."""
    for css_class in ("from-workshop", "", "some-other-class"):
        assert_eq(_source_from_class(css_class), "unknown")
def _test_parse_modlist_html_skips_rows_without_name():
    """Rows with no DisplayName td return None and are excluded from results.

    Also exercises the `continue` branch for non-ModContainer elements.

    The fixture is written to a named temp file with delete=False so it stays
    on disk after the with-block closes it; the path is then handed to
    parse_modlist_html and the file is removed in the finally clause.
    """
    # Row 1: valid mod
    # Row 2: no DisplayName td at all → parse_mod_entry returns None → skipped
    # Row 3: regular header row (no data-type="ModContainer") → parser skips via continue
    # NOTE(review): the literal below contains no data-type="ModContainer" or
    # DisplayName markup as the row comments describe — it looks like the HTML
    # tags were lost from this fixture; confirm against the parser's input format.
    html = textwrap.dedent("""\
| Header row (no ModContainer attr) |
| Real Mod |
|
| no display name here |
""")
    with tempfile.NamedTemporaryFile(mode="w", suffix=".html", delete=False,
                                     encoding="utf-8") as f:
        f.write(html)
        fname = f.name
    try:
        result = parse_modlist_html(fname)
        # Exactly one mod is expected to survive parsing.
        assert_eq(result["mod_count"], 1)
        assert_eq(result["mods"][0]["name"], "Real Mod")
    finally:
        # missing_ok keeps cleanup from masking an earlier failure.
        Path(fname).unlink(missing_ok=True)
# Register the parser tests (test() executes each one immediately).
test("parser: _source_from_class returns 'unknown' for unrecognised class", _test_source_from_class_unknown)
test("parser: rows with empty DisplayName are skipped", _test_parse_modlist_html_skips_rows_without_name)
# ── linker.py ────────────────────────────────────────────────────────────────
def _test_remove_junction_oserror():
    """Removing a path that does not exist must give a falsy ok flag and a non-empty message."""
    with tempfile.TemporaryDirectory() as tmp:
        ghost = Path(tmp) / "ghost_link"
        ok, err = _linker_mod.remove_junction(ghost)
        assert not ok
        assert err  # error message is non-empty
def _test_link_group_records_failed_when_create_returns_false():
    """A False result from create_junction must be counted and recorded under the mod name."""
    with tempfile.TemporaryDirectory() as tmp:
        base = Path(tmp)
        group_dir = base / "shared"
        arma_dir = base / "arma"
        for folder in (group_dir, arma_dir):
            folder.mkdir()
        (group_dir / "@ace").mkdir()
        with _patch("arma_modlist_tools.linker.create_junction", return_value=False):
            outcome = _linker_mod.link_group(group_dir, arma_dir)
        assert_eq(outcome["failed"], 1)
        error_names = " ".join(outcome["errors"].keys()).lower()
        assert "ace" in error_names
def _test_unlink_group_records_failed_when_remove_errors():
    """An error tuple from remove_junction must be counted and stored in errors."""
    with tempfile.TemporaryDirectory() as tmp:
        base = Path(tmp)
        group_dir = base / "shared"
        arma_dir = base / "arma"
        for folder in (group_dir, arma_dir):
            folder.mkdir()
        (group_dir / "@ace").mkdir()
        # Report the mod as linked so unlink_group attempts the removal.
        linked_entry = {
            "name": "@ace",
            "source_path": group_dir / "@ace",
            "link_path": arma_dir / "@ace",
            "is_linked": True,
        }
        with _patch("arma_modlist_tools.linker.get_link_status") as mock_status:
            mock_status.return_value = [linked_entry]
            with _patch("arma_modlist_tools.linker.remove_junction", return_value=(False, "perm denied")):
                outcome = _linker_mod.unlink_group(group_dir, arma_dir)
        assert_eq(outcome["failed"], 1)
        assert outcome["errors"]
def _test_is_junction_linux_path():
    """With is_windows() forced to False, _is_junction must consult os.path.islink once."""
    with _patch("arma_modlist_tools.linker.is_windows", return_value=False):
        with _patch("os.path.islink", return_value=True) as mock_islink:
            verdict = _linker_mod._is_junction(Path("/fake/path"))
    assert verdict is True
    mock_islink.assert_called_once()
def _test_create_junction_linux_success():
    """With is_windows() forced to False, create_junction must call os.symlink(target, link).

    os.symlink is mocked so the test does not need symlink permissions.
    """
    with tempfile.TemporaryDirectory() as tmp:
        base = Path(tmp)
        link_path = base / "link"
        target = base / "target"
        with _patch("arma_modlist_tools.linker.is_windows", return_value=False):
            with _patch("os.symlink") as mock_sym:
                created = _linker_mod.create_junction(link_path, target)
        assert created
        mock_sym.assert_called_once_with(str(target), str(link_path))
def _test_create_junction_linux_oserror():
    """An OSError from os.symlink must surface as a falsy return, not an exception."""
    with tempfile.TemporaryDirectory() as tmp:
        base = Path(tmp)
        link_path = base / "link"
        with _patch("arma_modlist_tools.linker.is_windows", return_value=False):
            with _patch("os.symlink", side_effect=OSError("perm")):
                created = _linker_mod.create_junction(link_path, base / "target")
        assert not created
def _test_remove_junction_linux():
    """With is_windows() forced to False, remove_junction must call os.unlink on the link.

    os.unlink is mocked, so nothing is actually removed.
    """
    with tempfile.TemporaryDirectory() as tmp:
        link = Path(tmp) / "link"
        with _patch("arma_modlist_tools.linker.is_windows", return_value=False):
            with _patch("os.unlink") as mock_unlink:
                ok, err = _linker_mod.remove_junction(link)
        assert ok
        assert err == ""
        mock_unlink.assert_called_once_with(str(link))
# Register the linker coverage-gap tests (test() executes each one immediately).
test("linker: remove_junction returns (False, msg) on OSError", _test_remove_junction_oserror)
test("linker: link_group records failure when create_junction -> False", _test_link_group_records_failed_when_create_returns_false)
test("linker: unlink_group records failure when remove_junction errors", _test_unlink_group_records_failed_when_remove_errors)
test("linker: _is_junction uses os.path.islink on Linux", _test_is_junction_linux_path)
test("linker: create_junction calls os.symlink on Linux (success)", _test_create_junction_linux_success)
test("linker: create_junction returns False if os.symlink raises", _test_create_junction_linux_oserror)
test("linker: remove_junction calls os.unlink on Linux", _test_remove_junction_linux)
# ── compat.py ────────────────────────────────────────────────────────────────
def _test_get_os_label_windows_server():
    """On Windows with 'Server' in platform.version(), the label must be 'Windows Server'."""
    with _patch("arma_modlist_tools.compat.is_windows", return_value=True):
        with _patch("platform.version", return_value="10.0.17763 Windows Server 2019"):
            label = _compat_mod.get_os_label()
    assert_eq(label, "Windows Server")
def _test_get_os_label_linux_ubuntu_desktop():
    """Expect 'Ubuntu' for a Linux host whose os-release NAME is Ubuntu and which is not headless."""
    with (
        _patch("arma_modlist_tools.compat.is_windows", return_value=False),
        _patch("arma_modlist_tools.compat.is_linux", return_value=True),
        _patch("arma_modlist_tools.compat._read_os_release", return_value={"NAME": "Ubuntu"}),
        _patch("arma_modlist_tools.compat._is_headless", return_value=False),
    ):
        os_label = _compat_mod.get_os_label()
    assert_eq(os_label, "Ubuntu")
def _test_get_os_label_linux_ubuntu_server():
    """Expect 'Ubuntu Server' for a Linux host whose os-release NAME is Ubuntu and which is headless."""
    with (
        _patch("arma_modlist_tools.compat.is_windows", return_value=False),
        _patch("arma_modlist_tools.compat.is_linux", return_value=True),
        _patch("arma_modlist_tools.compat._read_os_release", return_value={"NAME": "Ubuntu"}),
        _patch("arma_modlist_tools.compat._is_headless", return_value=True),
    ):
        os_label = _compat_mod.get_os_label()
    assert_eq(os_label, "Ubuntu Server")
def _test_get_os_label_linux_other():
    """Expect the generic 'Linux' label when os-release NAME is a non-Ubuntu distribution."""
    with (
        _patch("arma_modlist_tools.compat.is_windows", return_value=False),
        _patch("arma_modlist_tools.compat.is_linux", return_value=True),
        _patch("arma_modlist_tools.compat._read_os_release", return_value={"NAME": "Debian GNU/Linux"}),
    ):
        os_label = _compat_mod.get_os_label()
    assert_eq(os_label, "Linux")
def _test_get_os_label_unknown_platform():
    """Expect 'Unknown' when neither is_windows() nor is_linux() reports True."""
    with (
        _patch("arma_modlist_tools.compat.is_windows", return_value=False),
        _patch("arma_modlist_tools.compat.is_linux", return_value=False),
    ):
        os_label = _compat_mod.get_os_label()
    assert_eq(os_label, "Unknown")
def _test_read_os_release_parses_file():
    """_read_os_release exposes quoted KEY="value" lines as unquoted dict entries."""
    import io as _io

    os_release_text = (
        'NAME="Ubuntu"\n'
        'VERSION_ID="22.04"\n'
        "# comment\n"
        "ID=ubuntu\n"
    )
    # Any open() call made while patched receives this in-memory file.
    fake_file = _io.StringIO(os_release_text)
    with _patch("builtins.open", return_value=fake_file):
        parsed = _compat_mod._read_os_release()
    assert_eq(parsed.get("NAME"), "Ubuntu")
    assert_eq(parsed.get("VERSION_ID"), "22.04")
def _test_read_os_release_handles_missing_file():
    """An OSError raised by open() must produce an empty dict instead of propagating."""
    open_failure = OSError("no such file")
    with _patch("builtins.open", side_effect=open_failure):
        parsed = _compat_mod._read_os_release()
    assert_eq(parsed, {})
def _test_is_headless_with_display():
    """With DISPLAY present in the environment, _is_headless() must be falsy."""
    env_overlay = {"DISPLAY": ":0"}
    # clear=False: the variable is added on top of the existing environment.
    with _patch.dict("os.environ", env_overlay, clear=False):
        headless = _compat_mod._is_headless()
    assert not headless
def _test_is_headless_without_display():
    """With a completely empty environment, _is_headless() must be truthy."""
    # clear=True wipes every variable for the duration of the block.
    with _patch.dict("os.environ", {}, clear=True):
        headless = _compat_mod._is_headless()
    assert headless
def _test_fix_console_encoding_non_windows_noop():
    """fix_console_encoding is a no-op on non-Windows.

    The check is that ``sys.stdout`` is still the very same object after the
    call made with ``is_windows()`` patched to return False.
    """
    original_stdout = sys.stdout
    try:
        with _patch("arma_modlist_tools.compat.is_windows", return_value=False):
            _compat_mod.fix_console_encoding()
        assert sys.stdout is original_stdout
    finally:
        # Restore unconditionally (as the sibling stdout tests do): if the
        # function under test wrongly replaced stdout, the replacement must
        # not leak into the tests that run afterwards.
        sys.stdout = original_stdout
def _test_fix_console_encoding_already_utf8():
    """fix_console_encoding leaves stdout alone when it already reports UTF-8."""
    # A real TextIOWrapper exposes ``encoding`` read-only, hence the mock stand-in.
    utf8_stdout = _MagicMock()
    utf8_stdout.encoding = "utf-8"
    saved_stdout = sys.stdout
    try:
        with _patch("arma_modlist_tools.compat.is_windows", return_value=True):
            sys.stdout = utf8_stdout
            _compat_mod.fix_console_encoding()
            stdout_after = sys.stdout
            # Same object afterwards means no wrapper was installed.
            assert stdout_after is utf8_stdout
    finally:
        sys.stdout = saved_stdout
# Run the compat tests for get_os_label, _read_os_release and _is_headless;
# each test() call executes its function immediately, in the order listed.
test("compat: get_os_label returns 'Windows Server' on Windows Server", _test_get_os_label_windows_server)
test("compat: get_os_label returns 'Ubuntu' on Ubuntu desktop", _test_get_os_label_linux_ubuntu_desktop)
test("compat: get_os_label returns 'Ubuntu Server' on headless Ubuntu", _test_get_os_label_linux_ubuntu_server)
test("compat: get_os_label returns 'Linux' on non-Ubuntu Linux", _test_get_os_label_linux_other)
test("compat: get_os_label returns 'Unknown' on unrecognised platform", _test_get_os_label_unknown_platform)
test("compat: _read_os_release parses key=value pairs", _test_read_os_release_parses_file)
test("compat: _read_os_release returns {} when file missing", _test_read_os_release_handles_missing_file)
test("compat: _is_headless returns False when DISPLAY is set", _test_is_headless_with_display)
test("compat: _is_headless returns True when no display env vars", _test_is_headless_without_display)
def _test_fix_console_encoding_none_stdout():
    """fix_console_encoding tolerates sys.stdout being None (the pythonw.exe situation)."""
    saved_stdout = sys.stdout
    try:
        with _patch("arma_modlist_tools.compat.is_windows", return_value=True):
            sys.stdout = None
            # The call itself is the first check: it must not raise.
            _compat_mod.fix_console_encoding()
            stdout_after = sys.stdout
            assert stdout_after is None
    finally:
        sys.stdout = saved_stdout
# Run the three fix_console_encoding tests (all of them are defined above this point).
test("compat: fix_console_encoding is no-op on non-Windows", _test_fix_console_encoding_non_windows_noop)
test("compat: fix_console_encoding skips when stdout already UTF-8", _test_fix_console_encoding_already_utf8)
test("compat: fix_console_encoding is no-op when stdout is None", _test_fix_console_encoding_none_stdout)
# ---------------------------------------------------------------------------
# 15. Live-server fetcher tests (skipped when server unreachable)
# ---------------------------------------------------------------------------
group("fetcher — live server (skipped if unreachable)")
import requests as _requests
from arma_modlist_tools.fetcher import (
    make_session as _make_session,
    build_server_index as _build_server_index,
    find_mod_folder as _find_mod_folder,
    list_mod_files as _list_mod_files,
)
# ── One-time setup: try to load config and probe the server ─────────────────
# The live tests below read these module-level values; _LIVE_INDEX staying
# None is the signal that every live test must be skipped.
_LIVE_INDEX: dict | None = None
_LIVE_SESSION: "_requests.Session | None" = None
_LIVE_BASE_URL: str = ""
_LIVE_SKIP_REASON: str = ""
try:
    _live_cfg = _load_config()
    _LIVE_BASE_URL = _live_cfg.server_url
    _live_auth = _live_cfg.server_auth
    # Quick reachability probe — just GET the root, no JSON parsing
    _probe = _requests.get(_LIVE_BASE_URL, auth=_live_auth, timeout=8)
    _probe.raise_for_status()
    # Reachable → build the index (one network round-trip per @ folder)
    _LIVE_SESSION = _make_session(_live_auth)
    _LIVE_INDEX = _build_server_index(_LIVE_BASE_URL, _live_auth)
except Exception as _live_exc:
    # Deliberately broad: any failure here (config, network, HTTP status,
    # indexing) only disables the live tests, it must not abort the suite.
    # The class name is included because str(exc) alone can be empty or
    # cryptic (e.g. a message-less timeout, or KeyError('server_url')).
    _LIVE_SKIP_REASON = f"{type(_live_exc).__name__}: {_live_exc}"
def _require_live() -> None:
    """Skip the calling test (by raising _SkipTest) when no live server index was built."""
    if _LIVE_INDEX is not None:
        return
    raise _SkipTest(f"server unreachable: {_LIVE_SKIP_REASON}")
# ── Tests ────────────────────────────────────────────────────────────────────
def _test_live_index_structure():
    """The live index exposes the 'by_steam_id', 'by_name' and 'folders' keys."""
    _require_live()
    for expected_key in ("by_steam_id", "by_name", "folders"):
        assert expected_key in _LIVE_INDEX
def _test_live_index_has_folders():
    """The live index must list at least one folder."""
    _require_live()
    folder_count = len(_LIVE_INDEX["folders"])
    assert folder_count > 0, "Expected at least one folder on server"
def _test_live_index_has_steam_id_entries():
    """The by_steam_id part of the live index must not be empty."""
    _require_live()
    steam_id_count = len(_LIVE_INDEX["by_steam_id"])
    assert steam_id_count > 0, "Expected at least one steam_id entry"
def _test_live_index_has_name_entries():
    """The by_name part of the live index must not be empty."""
    _require_live()
    name_count = len(_LIVE_INDEX["by_name"])
    assert name_count > 0, "Expected at least one by_name entry"
def _test_live_find_mod_by_steam_id():
    """find_mod_folder resolves CBA_A3 (steam_id 450814997) to a plausible-looking URL."""
    _require_live()
    cba = {"name": "CBA_A3", "steam_id": "450814997"}
    found_url = _find_mod_folder(cba, _LIVE_INDEX)
    assert found_url is not None, "CBA_A3 not found by steam_id — is it on the server?"
    # Loose sanity check on the URL text only; nothing is fetched here.
    lowered = found_url.lower()
    assert "@" in lowered or "cba" in lowered, f"URL looks wrong: {found_url}"
def _test_live_find_mod_url_is_reachable():
    """A GET on the URL found for CBA_A3 must answer with HTTP 200 (skipped if CBA_A3 is absent)."""
    _require_live()
    found_url = _find_mod_folder({"name": "CBA_A3", "steam_id": "450814997"}, _LIVE_INDEX)
    if found_url is None:
        raise _SkipTest("CBA_A3 not in index — skipping reachability check")
    response = _LIVE_SESSION.get(found_url, timeout=10)
    assert response.status_code == 200, f"Expected 200, got {response.status_code} for {found_url}"
def _test_live_list_mod_files_returns_entries():
    """list_mod_files yields at least one entry for CBA_A3 (skipped if CBA_A3 is absent)."""
    _require_live()
    found_url = _find_mod_folder({"name": "CBA_A3", "steam_id": "450814997"}, _LIVE_INDEX)
    if found_url is None:
        raise _SkipTest("CBA_A3 not in index — skipping list_mod_files check")
    entries = _list_mod_files(found_url, _LIVE_SESSION)
    entry_count = len(entries)
    assert entry_count > 0, "CBA_A3 has no files? Unexpected."
def _test_live_list_mod_files_tuple_shape():
    """The first entry from list_mod_files is a (rel_path, url, size) 3-tuple.

    Only ``files[0]`` is sampled. The test skips itself when CBA_A3 is not in
    the index or when no files are returned for it.
    """
    _require_live()
    mod = {"name": "CBA_A3", "steam_id": "450814997"}
    url = _find_mod_folder(mod, _LIVE_INDEX)
    if url is None:
        raise _SkipTest("CBA_A3 not in index — skipping tuple shape check")
    files = _list_mod_files(url, _LIVE_SESSION)
    if not files:
        raise _SkipTest("no files returned — skipping tuple shape check")
    # Unpacking doubles as the arity check: anything but 3 items raises here.
    rel, file_url, size = files[0]
    assert isinstance(rel, str) and rel, f"rel_path must be non-empty string, got {rel!r}"
    # Report the offending value like the neighbouring assertions do.
    assert isinstance(file_url, str) and file_url, f"file_url must be non-empty string, got {file_url!r}"
    assert isinstance(size, int) and size >= 0, f"size must be non-negative int, got {size!r}"
def _test_live_find_mod_by_name_fallback():
    """find_mod_folder still finds CBA_A3 when the mod entry carries an empty steam_id."""
    _require_live()
    # Empty steam_id: only the name is left to look the mod up by.
    name_only_mod = {"name": "CBA_A3", "steam_id": ""}
    found_url = _find_mod_folder(name_only_mod, _LIVE_INDEX)
    assert found_url is not None, "CBA_A3 not found via name fallback"
# Run the live-server tests. Every one of them starts with _require_live(), so
# they are all reported as skipped when the setup above could not build an index.
test("live: build_server_index returns expected structure", _test_live_index_structure)
test("live: server has at least one mod folder", _test_live_index_has_folders)
test("live: by_steam_id populated from meta.cpp files", _test_live_index_has_steam_id_entries)
test("live: by_name populated for all @ folders", _test_live_index_has_name_entries)
test("live: find_mod_folder locates CBA_A3 by steam_id", _test_live_find_mod_by_steam_id)
test("live: CBA_A3 folder URL returns HTTP 200", _test_live_find_mod_url_is_reachable)
test("live: list_mod_files returns non-empty list for CBA_A3", _test_live_list_mod_files_returns_entries)
test("live: list_mod_files entries are (rel_path, url, size) tuples", _test_live_list_mod_files_tuple_shape)
test("live: find_mod_folder name fallback works (no steam_id)", _test_live_find_mod_by_name_fallback)
# ---------------------------------------------------------------------------
# gui.views.mods — _find_folder
# ---------------------------------------------------------------------------
group("gui.views.mods — _find_folder")
import importlib.util as _mods_ilu
# Load gui/views/mods.py straight from its file path, next to this script.
_mods_spec = _mods_ilu.spec_from_file_location(
    "gui.views.mods", Path(__file__).parent / "gui" / "views" / "mods.py"
)
_mods_mod = _mods_ilu.module_from_spec(_mods_spec)
# customtkinter and the gui package are replaced by empty stand-ins so the
# module can be executed without a display.
import types as _types
import sys as _sys
for _stub in (
    "customtkinter",
    "gui",
    "gui._constants",
    "gui.locales",
    "gui.views",
    "gui.views.base",
):
    # setdefault keeps whatever is already registered and only fills the gaps.
    _sys.modules.setdefault(_stub, _types.ModuleType(_stub))
# Colour constants the module reads at import time.
vars(_sys.modules["gui._constants"]).update(
    COLOR_OK="#4CAF50",
    COLOR_ERROR="#F44336",
    COLOR_WARN="#FF9800",
    COLOR_RUNNING="#2196F3",
)
# gui.views.base is always replaced by a fresh stub whose BaseView is plain
# ``object``, so the class body in mods.py can be evaluated.
_base_stub = _types.ModuleType("gui.views.base")
_sys.modules["gui.views.base"] = _base_stub
_base_stub.BaseView = object
# locales.t simply echoes the key back, so translation lookups cannot fail.
setattr(_sys.modules["gui.locales"], "t", lambda key, **kw: key)
_mods_spec.loader.exec_module(_mods_mod)
_find_folder_fn = getattr(_mods_mod, "_find_folder")
def _test_ff_returns_none_for_missing_group(tmp_path):
    """A group directory that does not exist yields None."""
    missing_group = tmp_path / "nonexistent"
    found = _find_folder_fn(missing_group, "MyMod")
    assert found is None
def _test_ff_exact_match(tmp_path):
    """A folder named exactly '@' + mod name is returned."""
    expected = tmp_path / "@MyMod"
    expected.mkdir()
    found = _find_folder_fn(tmp_path, "MyMod")
    assert found == expected
def _test_ff_case_insensitive_match(tmp_path):
    """A folder whose name differs from the mod name only by letter case is returned."""
    expected = tmp_path / "@mymod"
    expected.mkdir()
    found = _find_folder_fn(tmp_path, "MyMod")
    assert found == expected
def _test_ff_normalized_match(tmp_path):
    """Underscores on disk versus spaces in the modlist name must still match."""
    # The on-disk folder uses underscores while the requested name uses spaces.
    expected = tmp_path / "@My_Mod_Edition"
    expected.mkdir()
    found = _find_folder_fn(tmp_path, "My Mod Edition")
    assert found == expected
def _test_ff_steam_id_fallback(tmp_path):
    """A folder with an unrelated name is found through the publishedid in its meta.cpp."""
    mod_dir = tmp_path / "@ServerCanonicalName"
    mod_dir.mkdir()
    meta_file = mod_dir / "meta.cpp"
    meta_file.write_text('publishedid = 123456789;\nname = "Some Mod";\n')
    found = _find_folder_fn(tmp_path, "Completely Different Name", steam_id="123456789")
    assert found == mod_dir
def _test_ff_steam_id_no_false_positive(tmp_path):
    """A meta.cpp carrying a different publishedid must not produce a match."""
    mod_dir = tmp_path / "@WrongMod"
    mod_dir.mkdir()
    meta_file = mod_dir / "meta.cpp"
    meta_file.write_text('publishedid = 999999999;\n')
    found = _find_folder_fn(tmp_path, "My Mod", steam_id="123456789")
    assert found is None
def _test_ff_steam_id_skipped_when_none(tmp_path):
    """With steam_id=None, a folder is not matched on the strength of its meta.cpp."""
    mod_dir = tmp_path / "@SomeFolder"
    mod_dir.mkdir()
    meta_file = mod_dir / "meta.cpp"
    meta_file.write_text('publishedid = 123456789;\n')
    found = _find_folder_fn(tmp_path, "My Mod", steam_id=None)
    assert found is None
def _test_ff_missing_meta_cpp_skipped(tmp_path):
    """A folder lacking meta.cpp yields None (and no error) when a steam_id is supplied."""
    mod_dir = tmp_path / "@NoMeta"
    mod_dir.mkdir()
    found = _find_folder_fn(tmp_path, "My Mod", steam_id="123456789")
    assert found is None
# Wrap tmp_path calls in lambdas that supply a temp dir
def _with_tmp(fn):
    """Adapt a one-argument test into a zero-argument callable.

    The returned callable creates a fresh temporary directory, passes it to
    *fn* as a ``Path``, and removes the directory when *fn* finishes. The
    result of *fn* is discarded.
    """
    def wrapper():
        scratch = tempfile.TemporaryDirectory()
        with scratch as scratch_name:
            fn(Path(scratch_name))

    return wrapper
# Run the _find_folder tests; _with_tmp gives each one its own temporary directory.
test("_find_folder: None when group dir missing",
_with_tmp(_test_ff_returns_none_for_missing_group))
test("_find_folder: exact @ModName match",
_with_tmp(_test_ff_exact_match))
test("_find_folder: case-insensitive name match",
_with_tmp(_test_ff_case_insensitive_match))
test("_find_folder: normalized name match (punctuation differs)",
_with_tmp(_test_ff_normalized_match))
test("_find_folder: steam_id fallback via meta.cpp",
_with_tmp(_test_ff_steam_id_fallback))
test("_find_folder: wrong steam_id in meta.cpp is not a match",
_with_tmp(_test_ff_steam_id_no_false_positive))
test("_find_folder: no steam_id supplied → meta.cpp not checked",
_with_tmp(_test_ff_steam_id_skipped_when_none))
test("_find_folder: missing meta.cpp silently skipped",
_with_tmp(_test_ff_missing_meta_cpp_skipped))
# ---------------------------------------------------------------------------
# migrator — migrate_mod_groups
# ---------------------------------------------------------------------------
group("migrator — migrate_mod_groups")
# Short private alias; the tests below call it positionally with a downloads
# directory, an Arma directory (or None) and a comparison dict.
from arma_modlist_tools.migrator import migrate_mod_groups as _migrate
def _make_mod(dl: Path, grp: str, folder: str, steam_id: str | None = None) -> Path:
"""Create a minimal mod folder under downloads/group/folder."""
d = dl / grp / folder
d.mkdir(parents=True)
if steam_id:
(d / "meta.cpp").write_text(f"publishedid = {steam_id};\n", encoding="utf-8")
(d / "dummy.pbo").write_bytes(b"\x00" * 8)
return d
def _test_migrate_already_correct():
    """A mod already sitting in its target group is counted as correct and not moved."""
    comparison = {
        "shared": {"mods": [{"name": "CBA_A3", "steam_id": "450814997"}]},
        "unique": {},
    }
    with tempfile.TemporaryDirectory() as tmp_dir:
        downloads = Path(tmp_dir) / "downloads"
        _make_mod(downloads, "shared", "@CBA_A3", "450814997")
        summary = _migrate(downloads, None, comparison)
    assert_eq(summary["moved"], 0)
    assert_eq(summary["skipped_correct"], 1)
def _test_migrate_moves_by_steam_id():
    """A mod stored under group 'A' ends up under 'A_v1', the group that lists its steam_id."""
    comparison = {
        "shared": {"mods": []},
        "unique": {"A_v1": {"mods": [{"name": "ACE3", "steam_id": "463939057"}]}},
    }
    with tempfile.TemporaryDirectory() as tmp_dir:
        downloads = Path(tmp_dir) / "downloads"
        source_dir = _make_mod(downloads, "A", "@ACE3", "463939057")
        summary = _migrate(downloads, None, comparison)
        relocated = downloads / "A_v1" / "@ACE3"
        assert_eq(summary["moved"], 1)
        assert not source_dir.exists(), "old folder must be gone"
        assert relocated.exists(), "new folder must exist"
        assert (relocated / "dummy.pbo").exists(), "files preserved"
def _test_migrate_moves_by_normalized_name():
    """Without meta.cpp and with steam_id None, the mod is still moved to 'shared' (matched by name)."""
    comparison = {
        "shared": {"mods": [{"name": "CBA_A3", "steam_id": None}]},
        "unique": {"A": {"mods": []}},
    }
    with tempfile.TemporaryDirectory() as tmp_dir:
        downloads = Path(tmp_dir) / "downloads"
        _make_mod(downloads, "A", "@CBA_A3", steam_id=None)
        summary = _migrate(downloads, None, comparison)
        assert_eq(summary["moved"], 1)
        relocated = downloads / "shared" / "@CBA_A3"
        assert relocated.exists()
def _test_migrate_skips_dest_exists():
    """When the mod exists in both the old and the target group, nothing is moved."""
    comparison = {
        "shared": {"mods": [{"name": "CBA_A3", "steam_id": "450814997"}]},
        "unique": {},
    }
    with tempfile.TemporaryDirectory() as tmp_dir:
        downloads = Path(tmp_dir) / "downloads"
        # Same mod present twice: once misplaced, once at the destination.
        for group_name in ("A", "shared"):
            _make_mod(downloads, group_name, "@CBA_A3", "450814997")
        summary = _migrate(downloads, None, comparison)
    assert_eq(summary["moved"], 0)
    assert_eq(summary["skipped_dest_exists"], 1)
def _test_migrate_skips_not_on_disk():
    """A mod listed in the comparison but absent from an empty downloads dir is reported as not found."""
    comparison = {
        "shared": {"mods": [{"name": "CBA_A3", "steam_id": "450814997"}]},
        "unique": {},
    }
    with tempfile.TemporaryDirectory() as tmp_dir:
        downloads = Path(tmp_dir) / "downloads"
        downloads.mkdir()
        summary = _migrate(downloads, None, comparison)
    assert_eq(summary["skipped_not_found"], 1)
    assert_eq(summary["moved"], 0)
def _test_migrate_removes_stale_junction():
    """A junction in the Arma dir pointing at the old location is removed when the mod is moved."""
    from arma_modlist_tools.linker import create_junction, _is_junction

    comparison = {
        "shared": {"mods": [{"name": "ACE3", "steam_id": "463939057"}]},
        "unique": {},
    }
    with (
        tempfile.TemporaryDirectory() as downloads_root,
        tempfile.TemporaryDirectory() as arma_root,
    ):
        downloads = Path(downloads_root) / "downloads"
        arma_dir = Path(arma_root)
        source_dir = _make_mod(downloads, "A", "@ACE3", "463939057")
        junction = arma_dir / "@ACE3"
        create_junction(junction, source_dir)
        assert _is_junction(junction), "precondition: junction must exist"
        summary = _migrate(downloads, arma_dir, comparison)
        assert_eq(summary["moved"], 1)
        assert_eq(summary["junction_removed"], 1)
        assert not _is_junction(junction), "stale junction must be removed"
        relocated = downloads / "shared" / "@ACE3"
        assert relocated.exists()
def _test_migrate_missing_downloads_dir():
    """A downloads path that does not exist moves nothing and reports the mod as not found."""
    comparison = {
        "shared": {"mods": [{"name": "X", "steam_id": "1"}]},
        "unique": {},
    }
    with tempfile.TemporaryDirectory() as tmp_dir:
        absent_downloads = Path(tmp_dir) / "nonexistent"
        summary = _migrate(absent_downloads, None, comparison)
    assert_eq(summary["moved"], 0)
    assert_eq(summary["skipped_not_found"], 1)
# Run the migrator tests; each test() call executes its function immediately,
# in the order listed.
test("migrator: already in correct group → no move", _test_migrate_already_correct)
test("migrator: moves mod by steam_id to new group", _test_migrate_moves_by_steam_id)
test("migrator: moves mod by normalized name (no meta.cpp)", _test_migrate_moves_by_normalized_name)
test("migrator: skips when destination already exists", _test_migrate_skips_dest_exists)
test("migrator: skips mod not on disk", _test_migrate_skips_not_on_disk)
test("migrator: removes stale junction before moving", _test_migrate_removes_stale_junction)
test("migrator: no-op when downloads dir missing", _test_migrate_missing_downloads_dir)
# ---------------------------------------------------------------------------
# Summary
# ---------------------------------------------------------------------------
# Print the final tally between two 60-character rules; a non-zero exit
# status is produced only when at least one test failed.
total = sum((_passed, _failed, _skipped))
print("\n" + "=" * 60)
print(f" Results: {_passed} passed, {_failed} failed, {_skipped} skipped ({total} total)")
print("=" * 60 + "\n")
if _failed:
    raise SystemExit(1)