Files
Tran G. (Revernomad) Khoa 48637ffe90 feat: add migrate step to move mod folders between groups on preset change
Before step_fetch, scan all downloads/ subdirs and move any mod that
comparison.json now assigns to a different group. Matching uses steam_id
(via meta.cpp publishedid) first, normalized name as fallback.

Stale junctions in arma_dir are removed before the folder move so
step_link can re-create them pointing to the new location.

- New arma_modlist_tools/migrator.py: migrate_mod_groups()
- run.py: step_migrate(), --skip-migrate flag, wired into dispatch loop
- gui/app.py: step_migrate inserted as Step 3/5 between compare and fetch
- gui/locales.py: add step3/4/5 names (en + vi), renumber old 3->4, 4->5
- test_suite.py: 7 new migrator tests (158 total, 0 failed)
2026-04-14 15:08:40 +07:00

439 lines
16 KiB
Python

from __future__ import annotations
import json
import os
import queue
import subprocess
import sys
import threading
from pathlib import Path
from typing import TYPE_CHECKING, Optional
if TYPE_CHECKING:
from arma_modlist_tools.config import Config
from gui.views.dashboard import DashboardView
import customtkinter as ctk
from tkinter import messagebox
from gui._constants import (
SIDEBAR_W, APP_TITLE, PROJECT_ROOT, SELECTION_FILE,
)
from gui._io import _QueueWriter
from gui.locales import t
from gui.wizard import SetupWizard
from gui.views.base import BaseView
# ---------------------------------------------------------------------------
# View name → class mapping (imported lazily to avoid circular imports)
# ---------------------------------------------------------------------------
# Names of every navigable view. The sidebar creates one button per entry, in
# this order, and navigate_to() rejects any name that is not listed here.
_VIEW_NAMES = ("Dashboard", "Mods", "Tools", "Logs", "Settings")
def _get_view_class(name: str) -> type[BaseView]:
    """Return the view class registered under *name*.

    Raises ``KeyError`` if *name* is not one of the five known view names.
    """
    # Deferred to call time so that importing this module does not pull in
    # gui.views (which would create a circular import).
    from gui.views import DashboardView, ModsView, ToolsView, LogsView, SettingsView

    registry = {
        "Dashboard": DashboardView,
        "Mods": ModsView,
        "Tools": ToolsView,
        "Logs": LogsView,
        "Settings": SettingsView,
    }
    return registry[name]
# ---------------------------------------------------------------------------
# Main application
# ---------------------------------------------------------------------------
class ArmaModManagerApp(ctk.CTk):
    """Main application window.

    Owns the sidebar navigation, a cache of lazily-built content views, the
    log queue that feeds the Logs view, and the background runners for the
    parse/compare/migrate/fetch/link pipeline and for maintenance scripts.
    """

    def __init__(self) -> None:
        """Create the window, load config (or schedule the wizard), build the UI."""
        super().__init__()
        self.title(APP_TITLE)
        self.geometry("980x640")
        self.minsize(820, 560)

        self._log_q: queue.Queue[str] = queue.Queue()
        # Remember the real streams so they can be restored after a pipeline run.
        self._orig_stdout = sys.stdout
        self._orig_stderr = sys.stderr
        self._pipeline_running: bool = False
        self._cfg: Optional["Config"] = None
        self._view_cache: dict[str, BaseView] = {}
        self._active_name: str = ""

        if not (PROJECT_ROOT / "config.json").exists():
            # No config yet: open the setup wizard shortly after startup.
            self.after(200, self.open_wizard)
        else:
            self._load_config()
        self._apply_startup_language()
        self._build_layout()
        self._poll_log()

    # =========================================================================
    # Public interface (used by views)
    # =========================================================================
    @property
    def cfg(self) -> Optional["Config"]:
        """Loaded Config object, or None if config.json is missing/invalid."""
        return self._cfg

    @property
    def pipeline_running(self) -> bool:
        """True while the background pipeline thread is active."""
        return self._pipeline_running

    def navigate_to(self, view_name: str) -> None:
        """Switch the content area to the named view, refreshing its data."""
        assert view_name in _VIEW_NAMES, f"Unknown view: {view_name}"
        # Build view on first visit
        if view_name not in self._view_cache:
            cls = _get_view_class(view_name)
            view = cls(self._content, self)
            self._view_cache[view_name] = view
        # Hide current
        if self._active_name and self._active_name in self._view_cache:
            old = self._view_cache[self._active_name]
            old.grid_forget()
            old.lower()  # push canvas-based widgets below everything on Windows
        # Show new
        view = self._view_cache[view_name]
        view.grid(row=0, column=0, sticky="nsew")
        view.lift()  # ensure it's above any lingering hidden views
        view.refresh()
        self._active_name = view_name
        self._nav_select(view_name)

    def post_log(self, text: str) -> None:
        """Thread-safe: enqueue text for the Logs panel."""
        self._log_q.put(text)

    def switch_language(self, lang: str) -> None:
        """Switch the UI language and refresh all cached views."""
        from gui import locales

        locales.set_language(lang)
        self._save_language_pref(lang)
        self._rebuild_nav_labels()
        for view in self._view_cache.values():
            view.refresh()
        if self._active_name:
            self.navigate_to(self._active_name)

    def run_pipeline(self, selected_names: set[str]) -> None:
        """Start the background pipeline for the given preset filenames.

        Does nothing if a pipeline is already running. Shows a warning dialog
        and returns if fewer than two presets are selected or no config is
        loaded. Otherwise switches to the Logs view and runs the five steps
        on a daemon thread with stdout/stderr redirected into the log queue.
        """
        if self._pipeline_running:
            return
        if len(selected_names) < 2:
            messagebox.showwarning(
                t("app.dlg_presets_title"),
                t("app.dlg_presets_body"),
            )
            return
        cfg = self._cfg
        if not cfg:
            messagebox.showwarning(t("app.dlg_setup_title"), t("app.dlg_setup_body"))
            return

        self._pipeline_running = True
        self._get_dashboard().set_pipeline_ui(running=True)
        self.navigate_to("Logs")
        # Post an immediate banner so the log is never blank after clicking Start.
        _sep = "=" * 50
        self.post_log(f"\n{_sep}\n {t('pipeline.starting')}\n{_sep}\n\n")

        def worker() -> None:
            # run.py calls fix_console_encoding() at import time, which needs
            # the real sys.stdout.buffer. Import it before we redirect stdout.
            try:
                from run import step_migrate, step_fetch, step_link
            except Exception as import_err:
                # Format the message right here: the name bound by
                # `except ... as` is deleted when this clause exits, so a
                # callback deferred with after() could no longer read it.
                # post_log() only touches the queue, so it is safe to call
                # from this thread.
                self.post_log(f"\n✗ Failed to load pipeline: {import_err}\n")
                self.after(0, self._pipeline_done)
                return

            self._redirect_output()
            try:
                from arma_modlist_tools.parser import parse_modlist_html
                from arma_modlist_tools.compare import compare_presets

                # Step 1 — Parse selected presets
                _hdr("Step 1 / 5", t("pipeline.step1_name"))
                cfg.modlist_json.mkdir(exist_ok=True)
                presets = []
                for fp in sorted(cfg.modlist_html.glob("*.html")):
                    if fp.name not in selected_names:
                        print(f" SKIP {fp.name}")
                        continue
                    preset = parse_modlist_html(fp)
                    out = cfg.modlist_json / (preset["preset_name"] + ".json")
                    out.write_text(
                        json.dumps(preset, indent=2, ensure_ascii=False),
                        encoding="utf-8",
                    )
                    # Separate input and output names so they do not run together.
                    print(f" {fp.name} -> {out.name} ({preset['mod_count']} mods)")
                    presets.append(preset)

                # Step 2 — Compare
                _hdr("Step 2 / 5", t("pipeline.step2_name"))
                result = compare_presets(*presets)
                cfg.comparison.write_text(
                    json.dumps(result, indent=2, ensure_ascii=False),
                    encoding="utf-8",
                )
                total_unique = sum(v["mod_count"] for v in result["unique"].values())
                print(f" Compared: {', '.join(result['compared_presets'])}")
                print(f" Shared: {result['shared']['mod_count']} | "
                      f"Unique: {total_unique}")

                # Step 3 — Migrate
                _hdr("Step 3 / 5", t("pipeline.step3_name"))
                step_migrate(cfg)

                # Step 4 — Fetch
                _hdr("Step 4 / 5", t("pipeline.step4_name"))
                step_fetch(cfg)

                # Step 5 — Link every group directory found under downloads/
                _hdr("Step 5 / 5", t("pipeline.step5_name"))
                groups = (
                    sorted(p.name for p in cfg.downloads.iterdir() if p.is_dir())
                    if cfg.downloads.is_dir() else []
                )
                step_link(cfg, groups)
                print("\n✓ Pipeline complete.\n")
            except Exception as e:
                # Still redirected here, so the error and traceback land in the log.
                print(f"\n✗ Error: {e}\n")
                import traceback
                traceback.print_exc()
            finally:
                self._restore_output()
                self.after(0, self._pipeline_done)

        threading.Thread(target=worker, daemon=True).start()

    def run_tool(self, script_args: list[str]) -> None:
        """Run a maintenance script via subprocess, streaming output to Logs.

        ``script_args[0]`` is the script path relative to PROJECT_ROOT; the
        remaining items are passed to it as command-line arguments. The script
        runs on a daemon thread with the current interpreter.
        """
        script = script_args[0]
        extra = script_args[1:]

        def worker() -> None:
            # Visible rule around the command line (the previous expression
            # multiplied an empty string and therefore printed nothing).
            rule = "-" * 50
            self.post_log(f"\n{rule}\n {' '.join(script_args)}\n{rule}\n")
            try:
                env = os.environ.copy()
                env["PYTHONUNBUFFERED"] = "1"
                env["PYTHONUTF8"] = "1"
                proc = subprocess.Popen(
                    [sys.executable, "-u", str(PROJECT_ROOT / script)] + extra,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.STDOUT,
                    text=True,
                    encoding="utf-8",
                    errors="replace",
                    cwd=str(PROJECT_ROOT),
                    env=env,
                )
                # The context manager closes the stdout pipe and waits for the
                # child, even if forwarding a line raises.
                with proc:
                    for line in iter(proc.stdout.readline, ""):
                        self.post_log(line)
                ok = proc.returncode == 0
                done_msg = (
                    t("app.tool_done") if ok
                    else t("app.tool_exit_code", code=proc.returncode)
                )
                self.post_log(f"\n{done_msg}.\n")
            except Exception as e:
                self.post_log(f"\n{t('app.tool_failed', script=script, e=e)}\n")

        threading.Thread(target=worker, daemon=True).start()

    def load_selection(self) -> set[str]:
        """Return selected preset filenames, defaulting to all if no file saved."""
        if SELECTION_FILE.exists():
            try:
                data = json.loads(SELECTION_FILE.read_text(encoding="utf-8"))
                return set(data.get("selected", []))
            except Exception:
                # Unreadable or malformed selection file: fall back to the default.
                pass
        if self._cfg and self._cfg.modlist_html.is_dir():
            return {f.name for f in self._cfg.modlist_html.glob("*.html")}
        return set()

    def save_selection(self, selected: set[str]) -> None:
        """Write the selected preset filenames (sorted) to SELECTION_FILE as JSON."""
        SELECTION_FILE.write_text(
            json.dumps({"selected": sorted(selected)}, indent=2),
            encoding="utf-8",
        )

    def open_wizard(self) -> None:
        """Open the setup wizard; _after_wizard runs when it completes."""
        SetupWizard(self, on_complete=self._after_wizard)

    # =========================================================================
    # Private — language
    # =========================================================================
    @staticmethod
    def _read_raw_config() -> dict:
        """Return config.json as a raw dict, or {} on missing / parse error."""
        try:
            return json.loads((PROJECT_ROOT / "config.json").read_text(encoding="utf-8"))
        except Exception:
            return {}

    def _apply_startup_language(self) -> None:
        """Read language preference from config.json and activate it."""
        from gui import locales

        lang = self._read_raw_config().get("ui", {}).get("language", "en")
        locales.set_language(lang)

    def _save_language_pref(self, lang: str) -> None:
        """Persist language preference into the 'ui' key of config.json."""
        # NOTE(review): when config.json is missing or unparseable this writes a
        # file containing only the "ui" key. __init__ opens the wizard only when
        # the file does not exist, so that stub would suppress it on the next
        # launch. Confirm whether that is intended.
        try:
            raw = self._read_raw_config()
            raw.setdefault("ui", {})["language"] = lang
            (PROJECT_ROOT / "config.json").write_text(
                json.dumps(raw, indent=2), encoding="utf-8"
            )
        except Exception:
            pass  # non-fatal — language preference simply resets next run

    def _rebuild_nav_labels(self) -> None:
        """Retranslate the sidebar navigation button labels."""
        for name, btn in self._nav_btns.items():
            btn.configure(text=t(f"nav.{name.lower()}"))

    # =========================================================================
    # Private — layout
    # =========================================================================
    def _build_layout(self) -> None:
        """Create the sidebar and the content frame, then show the Dashboard."""
        self.grid_columnconfigure(1, weight=1)
        self.grid_rowconfigure(0, weight=1)

        # Sidebar
        sb = ctk.CTkFrame(self, width=SIDEBAR_W, corner_radius=0)
        sb.grid(row=0, column=0, sticky="nsew")
        sb.grid_propagate(False)
        sb.grid_rowconfigure(10, weight=1)
        ctk.CTkLabel(
            sb, text=APP_TITLE,
            font=ctk.CTkFont(size=15, weight="bold"),
            wraplength=SIDEBAR_W - 16, justify="center",
        ).grid(row=0, column=0, padx=12, pady=(22, 14))

        self._nav_btns: dict[str, ctk.CTkButton] = {}
        for i, name in enumerate(_VIEW_NAMES, start=1):
            b = ctk.CTkButton(
                sb, text=t(f"nav.{name.lower()}"), width=SIDEBAR_W - 24,
                # n=name binds the current name; a bare closure would see only
                # the last value of the loop variable.
                anchor="w", command=lambda n=name: self.navigate_to(n),
                fg_color="transparent",
                hover_color=("gray80", "gray30"),
                text_color=("gray10", "gray90"),
            )
            b.grid(row=i, column=0, padx=12, pady=3)
            self._nav_btns[name] = b

        # Content area
        self._content = ctk.CTkFrame(self, fg_color="transparent", corner_radius=0)
        self._content.grid(row=0, column=1, sticky="nsew")
        self._content.grid_columnconfigure(0, weight=1)
        self._content.grid_rowconfigure(0, weight=1)
        self.navigate_to("Dashboard")

    def _nav_select(self, name: str) -> None:
        """Highlight the sidebar button for *name* and clear the others."""
        for lbl, btn in self._nav_btns.items():
            btn.configure(
                fg_color=("gray75", "gray25") if lbl == name else "transparent"
            )

    # =========================================================================
    # Private — config
    # =========================================================================
    def _load_config(self) -> None:
        """Load config.json into self._cfg; on failure set None and log the error."""
        try:
            from arma_modlist_tools.config import load_config

            self._cfg = load_config(PROJECT_ROOT / "config.json")
        except Exception as e:
            self._cfg = None
            self.post_log(f"[config] {e}\n")

    def _after_wizard(self) -> None:
        """Reload config after the wizard and rebuild the Dashboard/Settings views."""
        self._load_config()
        # grid_forget before popping — navigate_to can't hide a view that's
        # already been removed from _view_cache, so we do it manually first.
        for name in ("Dashboard", "Settings"):
            view = self._view_cache.pop(name, None)
            if view is not None:
                view.grid_forget()
                view.lower()
        self._active_name = ""
        self.navigate_to("Dashboard")

    # =========================================================================
    # Private — logging
    # =========================================================================
    def _poll_log(self) -> None:
        """Move queued log text into the Logs view, then re-arm the 80 ms timer.

        Messages stay in the queue until a Logs view exists, so text posted
        before the view is first opened is shown once it is, not dropped.
        """
        logs_view = self._view_cache.get("Logs")
        if logs_view is not None:
            parts: list[str] = []
            try:
                while True:
                    parts.append(self._log_q.get_nowait())
            except queue.Empty:
                pass
            if parts and hasattr(logs_view, "append"):
                logs_view.append("".join(parts))  # type: ignore[attr-defined]
        self.after(80, self._poll_log)

    def _redirect_output(self) -> None:
        """Point sys.stdout and sys.stderr at a writer that feeds the log queue."""
        writer = _QueueWriter(self._log_q)
        sys.stdout = writer
        sys.stderr = writer

    def _restore_output(self) -> None:
        """Put back the stdout/stderr streams captured in __init__."""
        sys.stdout = self._orig_stdout
        sys.stderr = self._orig_stderr

    # =========================================================================
    # Private — pipeline lifecycle
    # =========================================================================
    def _pipeline_done(self) -> None:
        """Clear the running flag, reset the Dashboard controls, refresh views."""
        self._pipeline_running = False
        self._get_dashboard().set_pipeline_ui(running=False)
        # Refresh all cached views so data is current without manual refresh
        for view in self._view_cache.values():
            view.refresh()

    def _get_dashboard(self) -> "DashboardView":
        """Return the cached Dashboard view, building and caching it if absent."""
        from gui.views.dashboard import DashboardView

        view = self._view_cache.get("Dashboard")
        if not isinstance(view, DashboardView):
            # Build it if not yet visited (shouldn't normally happen)
            view = DashboardView(self._content, self)
            self._view_cache["Dashboard"] = view
        return view
# ---------------------------------------------------------------------------
# Module-level helper
# ---------------------------------------------------------------------------
def _hdr(step: str, name: str) -> None:
print(f"\n{'='*50}")
print(f" {step}: {name}")
print(f"{'='*50}\n")