Before step_fetch, scan all downloads/ subdirs and move any mod that comparison.json now assigns to a different group. Matching uses steam_id (via meta.cpp publishedid) first, with the normalized name as a fallback. Stale junctions in arma_dir are removed before the folder move so step_link can re-create them pointing to the new location.

- New arma_modlist_tools/migrator.py: migrate_mod_groups()
- run.py: step_migrate(), --skip-migrate flag, wired into dispatch loop
- gui/app.py: step_migrate inserted as Step 3/5 between compare and fetch
- gui/locales.py: add step3/4/5 names (en + vi), renumber old 3->4, 4->5
- test_suite.py: 7 new migrator tests (158 total, 0 failed)
439 lines
16 KiB
Python
439 lines
16 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import queue
|
|
import subprocess
|
|
import sys
|
|
import threading
|
|
from pathlib import Path
|
|
from typing import TYPE_CHECKING, Optional
|
|
|
|
if TYPE_CHECKING:
|
|
from arma_modlist_tools.config import Config
|
|
from gui.views.dashboard import DashboardView
|
|
|
|
import customtkinter as ctk
|
|
from tkinter import messagebox
|
|
|
|
from gui._constants import (
|
|
SIDEBAR_W, APP_TITLE, PROJECT_ROOT, SELECTION_FILE,
|
|
)
|
|
from gui._io import _QueueWriter
|
|
from gui.locales import t
|
|
from gui.wizard import SetupWizard
|
|
from gui.views.base import BaseView
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# View name → class mapping (imported lazily to avoid circular imports)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Names of the navigable views.  The sidebar creates one button per entry, in
# this order, and navigate_to() only accepts names listed here.
_VIEW_NAMES = ("Dashboard", "Mods", "Tools", "Logs", "Settings")
|
|
|
|
|
|
def _get_view_class(name: str) -> type[BaseView]:
    """Return the view class registered under *name*.

    The view classes are imported inside the function rather than at module
    level (the file's own note says this avoids circular imports).

    Raises:
        KeyError: if *name* is not one of the five known view names.
    """
    from gui.views import DashboardView, ModsView, ToolsView, LogsView, SettingsView

    registry = {
        "Dashboard": DashboardView,
        "Mods": ModsView,
        "Tools": ToolsView,
        "Logs": LogsView,
        "Settings": SettingsView,
    }
    return registry[name]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Main application
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class ArmaModManagerApp(ctk.CTk):
    """Main application window: a sidebar of navigation buttons plus a content area.

    Views are created lazily on first visit and cached by name in
    ``_view_cache``.  Log text is passed through ``_log_q`` and delivered to
    the Logs view by ``_poll_log`` on the Tk event loop, so background
    threads can log through :meth:`post_log`.
    """

    def __init__(self) -> None:
        """Create the window, load config (or schedule the wizard), build the UI."""
        super().__init__()
        self.title(APP_TITLE)
        self.geometry("980x640")
        self.minsize(820, 560)

        self._log_q: queue.Queue[str] = queue.Queue()
        # Remember the real streams so _restore_output() can undo the redirect.
        self._orig_stdout = sys.stdout
        self._orig_stderr = sys.stderr
        self._pipeline_running: bool = False
        self._cfg: Optional["Config"] = None
        self._view_cache: dict[str, BaseView] = {}
        self._active_name: str = ""

        if not (PROJECT_ROOT / "config.json").exists():
            # No config yet: open the setup wizard shortly after startup.
            self.after(200, self.open_wizard)
        else:
            self._load_config()

        self._apply_startup_language()
        self._build_layout()
        self._poll_log()

    # =========================================================================
    # Public interface (used by views)
    # =========================================================================

    @property
    def cfg(self) -> Optional["Config"]:
        """Loaded Config object, or None if config.json is missing/invalid."""
        return self._cfg

    @property
    def pipeline_running(self) -> bool:
        """True while a pipeline started by run_pipeline() has not finished."""
        return self._pipeline_running

    def navigate_to(self, view_name: str) -> None:
        """Switch the content area to the named view, refreshing its data."""
        assert view_name in _VIEW_NAMES, f"Unknown view: {view_name}"

        # Build view on first visit
        if view_name not in self._view_cache:
            cls = _get_view_class(view_name)
            view = cls(self._content, self)
            self._view_cache[view_name] = view

        # Hide current
        if self._active_name and self._active_name in self._view_cache:
            old = self._view_cache[self._active_name]
            old.grid_forget()
            old.lower()  # push canvas-based widgets below everything on Windows

        # Show new
        view = self._view_cache[view_name]
        view.grid(row=0, column=0, sticky="nsew")
        view.lift()  # ensure it's above any lingering hidden views
        view.refresh()
        self._active_name = view_name
        self._nav_select(view_name)

    def post_log(self, text: str) -> None:
        """Thread-safe: enqueue text for the Logs panel."""
        self._log_q.put(text)

    def switch_language(self, lang: str) -> None:
        """Switch the UI language and refresh all cached views."""
        from gui import locales

        locales.set_language(lang)
        self._save_language_pref(lang)
        self._rebuild_nav_labels()
        for view in self._view_cache.values():
            view.refresh()
        if self._active_name:
            self.navigate_to(self._active_name)

    def run_pipeline(self, selected_names: set[str]) -> None:
        """Start the background pipeline for the given preset filenames.

        Returns immediately if a pipeline is already running.  Shows a
        warning dialog and returns if fewer than two names are selected or
        no config is loaded.  Otherwise switches to the Logs view and runs
        the five steps (parse, compare, migrate, fetch, link) on a daemon
        thread, with stdout/stderr redirected into the log queue while the
        steps run.
        """
        if self._pipeline_running:
            return
        if len(selected_names) < 2:
            messagebox.showwarning(
                t("app.dlg_presets_title"),
                t("app.dlg_presets_body"),
            )
            return

        cfg = self._cfg
        if not cfg:
            messagebox.showwarning(t("app.dlg_setup_title"), t("app.dlg_setup_body"))
            return

        self._pipeline_running = True
        self._get_dashboard().set_pipeline_ui(running=True)
        self.navigate_to("Logs")
        # Post an immediate banner so the log is never blank after clicking Start.
        _sep = "=" * 50
        self.post_log(f"\n{_sep}\n {t('pipeline.starting')}\n{_sep}\n\n")

        def worker() -> None:
            # run.py calls fix_console_encoding() at import time, which needs
            # the real sys.stdout.buffer.  Import it before we redirect stdout.
            try:
                from run import step_migrate, step_fetch, step_link
            except Exception as import_err:
                # Build the message now: the name bound by ``except ... as`` is
                # unbound when this clause exits, so a callback that reads it
                # later would fail with NameError.  post_log() only enqueues,
                # so it can be called from this thread directly.
                self.post_log(f"\n✗ Failed to load pipeline: {import_err}\n")
                self.after(0, self._pipeline_done)
                return

            self._redirect_output()
            try:
                from arma_modlist_tools.parser import parse_modlist_html
                from arma_modlist_tools.compare import compare_presets

                # Step 1 — Parse selected presets
                _hdr("Step 1 / 5", t("pipeline.step1_name"))
                cfg.modlist_json.mkdir(exist_ok=True)
                presets = []
                for fp in sorted(cfg.modlist_html.glob("*.html")):
                    if fp.name not in selected_names:
                        print(f" SKIP {fp.name}")
                        continue
                    preset = parse_modlist_html(fp)
                    out = cfg.modlist_json / (preset["preset_name"] + ".json")
                    out.write_text(
                        json.dumps(preset, indent=2, ensure_ascii=False),
                        encoding="utf-8",
                    )
                    print(f" {fp.name} → {out.name} ({preset['mod_count']} mods)")
                    presets.append(preset)

                # Step 2 — Compare
                _hdr("Step 2 / 5", t("pipeline.step2_name"))
                result = compare_presets(*presets)
                cfg.comparison.write_text(
                    json.dumps(result, indent=2, ensure_ascii=False),
                    encoding="utf-8",
                )
                total_unique = sum(v["mod_count"] for v in result["unique"].values())
                print(f" Compared: {', '.join(result['compared_presets'])}")
                print(f" Shared: {result['shared']['mod_count']} | "
                      f"Unique: {total_unique}")

                # Step 3 — Migrate
                _hdr("Step 3 / 5", t("pipeline.step3_name"))
                step_migrate(cfg)

                # Step 4 — Fetch
                _hdr("Step 4 / 5", t("pipeline.step4_name"))
                step_fetch(cfg)

                # Step 5 — Link every group directory found under downloads
                _hdr("Step 5 / 5", t("pipeline.step5_name"))
                groups = (
                    sorted(p.name for p in cfg.downloads.iterdir() if p.is_dir())
                    if cfg.downloads.is_dir() else []
                )
                step_link(cfg, groups)

                print("\n✓ Pipeline complete.\n")

            except Exception as e:
                # Printed while output is still redirected, so the error and
                # traceback end up in the Logs view.
                print(f"\n✗ Error: {e}\n")
                import traceback
                traceback.print_exc()
            finally:
                self._restore_output()
                self.after(0, self._pipeline_done)

        threading.Thread(target=worker, daemon=True).start()

    def run_tool(self, script_args: list[str]) -> None:
        """Run a maintenance script via subprocess, streaming output to Logs.

        ``script_args[0]`` is the script path relative to PROJECT_ROOT; the
        remaining items are passed to it as command-line arguments.  The
        script runs on a daemon thread with the current interpreter, and its
        combined stdout/stderr is posted to the log line by line.
        """
        script = script_args[0]
        extra = script_args[1:]

        def worker() -> None:
            rule = "─" * 50
            self.post_log(f"\n{rule}\n {' '.join(script_args)}\n{rule}\n")
            try:
                env = os.environ.copy()
                env["PYTHONUNBUFFERED"] = "1"
                env["PYTHONUTF8"] = "1"
                # The context manager closes the stdout pipe and waits for the
                # child on exit, including when reading raises.
                with subprocess.Popen(
                    [sys.executable, "-u", str(PROJECT_ROOT / script)] + extra,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.STDOUT,
                    text=True,
                    encoding="utf-8",
                    errors="replace",
                    cwd=str(PROJECT_ROOT),
                    env=env,
                ) as proc:
                    for line in proc.stdout:
                        self.post_log(line)
                ok = proc.returncode == 0
                done_msg = (
                    t("app.tool_done") if ok
                    else t("app.tool_exit_code", code=proc.returncode)
                )
                self.post_log(f"\n{done_msg}.\n")
            except Exception as e:
                self.post_log(f"\n{t('app.tool_failed', script=script, e=e)}\n")

        threading.Thread(target=worker, daemon=True).start()

    def load_selection(self) -> set[str]:
        """Return selected preset filenames, defaulting to all if no file saved."""
        if SELECTION_FILE.exists():
            try:
                data = json.loads(SELECTION_FILE.read_text(encoding="utf-8"))
                return set(data.get("selected", []))
            except Exception:
                # Best effort: an unreadable or malformed selection file falls
                # through to the "everything selected" default below.
                pass
        if self._cfg and self._cfg.modlist_html.is_dir():
            return {f.name for f in self._cfg.modlist_html.glob("*.html")}
        return set()

    def save_selection(self, selected: set[str]) -> None:
        """Write the selected preset filenames, sorted, to SELECTION_FILE as JSON."""
        SELECTION_FILE.write_text(
            json.dumps({"selected": sorted(selected)}, indent=2),
            encoding="utf-8",
        )

    def open_wizard(self) -> None:
        """Open the setup wizard; _after_wizard runs when it completes."""
        SetupWizard(self, on_complete=self._after_wizard)

    # =========================================================================
    # Private — language
    # =========================================================================

    @staticmethod
    def _read_raw_config() -> dict:
        """Return config.json as a raw dict, or {} on missing / parse error.

        A file whose top-level JSON value is not an object also yields {},
        so callers can always use dict methods on the result.
        """
        try:
            raw = json.loads((PROJECT_ROOT / "config.json").read_text(encoding="utf-8"))
        except Exception:
            return {}
        return raw if isinstance(raw, dict) else {}

    def _apply_startup_language(self) -> None:
        """Read language preference from config.json and activate it.

        Falls back to "en" when the preference is absent or not a string.
        """
        from gui import locales

        ui = self._read_raw_config().get("ui")
        lang = ui.get("language", "en") if isinstance(ui, dict) else "en"
        if not isinstance(lang, str):
            lang = "en"
        locales.set_language(lang)

    def _save_language_pref(self, lang: str) -> None:
        """Persist language preference into the 'ui' key of config.json.

        If config.json exists but cannot be parsed as a JSON object, it is
        left untouched rather than overwritten, so the rest of the user's
        configuration is not lost.
        """
        config_path = PROJECT_ROOT / "config.json"
        try:
            if config_path.exists():
                raw = json.loads(config_path.read_text(encoding="utf-8"))
                if not isinstance(raw, dict):
                    return
            else:
                raw = {}
            raw.setdefault("ui", {})["language"] = lang
            config_path.write_text(json.dumps(raw, indent=2), encoding="utf-8")
        except Exception:
            pass  # non-fatal — language preference simply resets next run

    def _rebuild_nav_labels(self) -> None:
        """Retranslate the sidebar navigation button labels."""
        for name, btn in self._nav_btns.items():
            btn.configure(text=t(f"nav.{name.lower()}"))

    # =========================================================================
    # Private — layout
    # =========================================================================

    def _build_layout(self) -> None:
        """Create the sidebar and content frame, then show the Dashboard."""
        self.grid_columnconfigure(1, weight=1)
        self.grid_rowconfigure(0, weight=1)

        # Sidebar
        sb = ctk.CTkFrame(self, width=SIDEBAR_W, corner_radius=0)
        sb.grid(row=0, column=0, sticky="nsew")
        sb.grid_propagate(False)
        sb.grid_rowconfigure(10, weight=1)

        ctk.CTkLabel(
            sb, text=APP_TITLE,
            font=ctk.CTkFont(size=15, weight="bold"),
            wraplength=SIDEBAR_W - 16, justify="center",
        ).grid(row=0, column=0, padx=12, pady=(22, 14))

        # One navigation button per view, in rows 1..N below the title.
        self._nav_btns: dict[str, ctk.CTkButton] = {}
        for i, name in enumerate(_VIEW_NAMES, start=1):
            b = ctk.CTkButton(
                sb, text=t(f"nav.{name.lower()}"), width=SIDEBAR_W - 24,
                # n=name binds the current name; a plain closure would see
                # only the last loop value.
                anchor="w", command=lambda n=name: self.navigate_to(n),
                fg_color="transparent",
                hover_color=("gray80", "gray30"),
                text_color=("gray10", "gray90"),
            )
            b.grid(row=i, column=0, padx=12, pady=3)
            self._nav_btns[name] = b

        # Content area
        self._content = ctk.CTkFrame(self, fg_color="transparent", corner_radius=0)
        self._content.grid(row=0, column=1, sticky="nsew")
        self._content.grid_columnconfigure(0, weight=1)
        self._content.grid_rowconfigure(0, weight=1)

        self.navigate_to("Dashboard")

    def _nav_select(self, name: str) -> None:
        """Highlight the sidebar button for *name* and clear the others."""
        for lbl, btn in self._nav_btns.items():
            btn.configure(
                fg_color=("gray75", "gray25") if lbl == name else "transparent"
            )

    # =========================================================================
    # Private — config
    # =========================================================================

    def _load_config(self) -> None:
        """Load config.json into ``_cfg``; on failure set None and log the error."""
        try:
            from arma_modlist_tools.config import load_config
            self._cfg = load_config(PROJECT_ROOT / "config.json")
        except Exception as e:
            self._cfg = None
            self.post_log(f"[config] {e}\n")

    def _after_wizard(self) -> None:
        """Reload config after the wizard and rebuild the Dashboard and Settings views."""
        self._load_config()
        # grid_forget before popping — navigate_to can't hide a view that's
        # already been removed from _view_cache, so we do it manually first.
        for name in ("Dashboard", "Settings"):
            view = self._view_cache.pop(name, None)
            if view is not None:
                view.grid_forget()
                view.lower()
        self._active_name = ""
        self.navigate_to("Dashboard")

    # =========================================================================
    # Private — logging
    # =========================================================================

    def _poll_log(self) -> None:
        """Deliver queued log text to the Logs view, then reschedule in 80 ms.

        The queue is only drained once the Logs view has been built.  Text
        posted earlier (for example a config error during startup) stays
        queued and is delivered when the view first exists.
        """
        logs_view = self._view_cache.get("Logs")
        if logs_view is not None:
            parts: list[str] = []
            try:
                while True:
                    parts.append(self._log_q.get_nowait())
            except queue.Empty:
                pass
            if parts and hasattr(logs_view, "append"):
                logs_view.append("".join(parts))  # type: ignore[attr-defined]
        self.after(80, self._poll_log)

    def _redirect_output(self) -> None:
        """Point sys.stdout and sys.stderr at a writer that feeds the log queue."""
        writer = _QueueWriter(self._log_q)
        sys.stdout = writer
        sys.stderr = writer

    def _restore_output(self) -> None:
        """Restore the sys.stdout / sys.stderr captured in __init__."""
        sys.stdout = self._orig_stdout
        sys.stderr = self._orig_stderr

    # =========================================================================
    # Private — pipeline lifecycle
    # =========================================================================

    def _pipeline_done(self) -> None:
        """Mark the pipeline finished, reset the Dashboard UI, refresh views."""
        self._pipeline_running = False
        self._get_dashboard().set_pipeline_ui(running=False)
        # Refresh all cached views so data is current without manual refresh
        for view in self._view_cache.values():
            view.refresh()

    def _get_dashboard(self) -> "DashboardView":
        """Return the cached Dashboard view, creating and caching it if needed."""
        from gui.views.dashboard import DashboardView

        view = self._view_cache.get("Dashboard")
        if not isinstance(view, DashboardView):
            # Build it if not yet visited (shouldn't normally happen)
            view = DashboardView(self._content, self)
            self._view_cache["Dashboard"] = view
        return view
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Module-level helper
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _hdr(step: str, name: str) -> None:
    """Print a step banner: a blank line, a 50-character rule, the title, the rule, a blank line."""
    rule = "=" * 50
    for line in (f"\n{rule}", f" {step}: {name}", f"{rule}\n"):
        print(line)
|