GUI log batching (_poll_log now drains queue into a single CTkTextbox.insert
call per 80 ms tick instead of N calls, each with see("end") scroll).
_QueueWriter strips ANSI/CSI escape codes and bare \r before enqueuing so
tqdm progress output is legible in the log textbox. OSC sequences terminated
by both BEL (\x07) and ST (\x1b\) are handled.
Wizard "Test Connection" moved off the main thread: requests.get runs in a
daemon thread; result posted back via after(0, ...). Widget refs captured
before thread launch to prevent stale updates if user navigates away. Bare
except narrowed to TclError (destroyed-widget guard only).
Code quality: import os moved to module level in app.py; _read_raw_config()
helper extracted to deduplicate dual raw config.json reads; return type
annotations added to _get_view_class, _get_dashboard, and cfg property.
Tests: 11 new unit tests for _QueueWriter (RED -> GREEN on OSC-ST fix).
421 lines
16 KiB
Python
421 lines
16 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import queue
|
|
import subprocess
|
|
import sys
|
|
import threading
|
|
from pathlib import Path
|
|
from typing import TYPE_CHECKING, Optional
|
|
|
|
if TYPE_CHECKING:
|
|
from arma_modlist_tools.config import Config
|
|
from gui.views.dashboard import DashboardView
|
|
|
|
import customtkinter as ctk
|
|
from tkinter import messagebox
|
|
|
|
from gui._constants import (
|
|
SIDEBAR_W, APP_TITLE, PROJECT_ROOT, SELECTION_FILE,
|
|
)
|
|
from gui._io import _QueueWriter
|
|
from gui.locales import t
|
|
from gui.wizard import SetupWizard
|
|
from gui.views.base import BaseView
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# View name → class mapping (imported lazily to avoid circular imports)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_VIEW_NAMES = ("Dashboard", "Mods", "Tools", "Logs", "Settings")
|
|
|
|
|
|
def _get_view_class(name: str) -> type[BaseView]:
|
|
from gui.views import DashboardView, ModsView, ToolsView, LogsView, SettingsView
|
|
return {
|
|
"Dashboard": DashboardView,
|
|
"Mods": ModsView,
|
|
"Tools": ToolsView,
|
|
"Logs": LogsView,
|
|
"Settings": SettingsView,
|
|
}[name]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Main application
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class ArmaModManagerApp(ctk.CTk):
|
|
|
|
def __init__(self) -> None:
|
|
super().__init__()
|
|
self.title(APP_TITLE)
|
|
self.geometry("980x640")
|
|
self.minsize(820, 560)
|
|
|
|
self._log_q: queue.Queue[str] = queue.Queue()
|
|
self._orig_stdout = sys.stdout
|
|
self._orig_stderr = sys.stderr
|
|
self._pipeline_running: bool = False
|
|
self._cfg = None
|
|
self._view_cache: dict[str, BaseView] = {}
|
|
self._active_name: str = ""
|
|
|
|
if not (PROJECT_ROOT / "config.json").exists():
|
|
self.after(200, self.open_wizard)
|
|
else:
|
|
self._load_config()
|
|
|
|
self._apply_startup_language()
|
|
self._build_layout()
|
|
self._poll_log()
|
|
|
|
# =========================================================================
|
|
# Public interface (used by views)
|
|
# =========================================================================
|
|
|
|
@property
|
|
def cfg(self) -> Optional["Config"]:
|
|
"""Loaded Config object, or None if config.json is missing/invalid."""
|
|
return self._cfg
|
|
|
|
@property
|
|
def pipeline_running(self) -> bool:
|
|
return self._pipeline_running
|
|
|
|
def navigate_to(self, view_name: str) -> None:
|
|
"""Switch the content area to the named view, refreshing its data."""
|
|
assert view_name in _VIEW_NAMES, f"Unknown view: {view_name}"
|
|
|
|
# Build view on first visit
|
|
if view_name not in self._view_cache:
|
|
cls = _get_view_class(view_name)
|
|
view = cls(self._content, self)
|
|
self._view_cache[view_name] = view
|
|
|
|
# Hide current
|
|
if self._active_name and self._active_name in self._view_cache:
|
|
old = self._view_cache[self._active_name]
|
|
old.grid_forget()
|
|
old.lower() # push canvas-based widgets below everything on Windows
|
|
|
|
# Show new
|
|
view = self._view_cache[view_name]
|
|
view.grid(row=0, column=0, sticky="nsew")
|
|
view.lift() # ensure it's above any lingering hidden views
|
|
view.refresh()
|
|
self._active_name = view_name
|
|
self._nav_select(view_name)
|
|
|
|
def post_log(self, text: str) -> None:
|
|
"""Thread-safe: enqueue text for the Logs panel."""
|
|
self._log_q.put(text)
|
|
|
|
def switch_language(self, lang: str) -> None:
|
|
"""Switch the UI language and refresh all cached views."""
|
|
from gui import locales
|
|
locales.set_language(lang)
|
|
self._save_language_pref(lang)
|
|
self._rebuild_nav_labels()
|
|
for view in self._view_cache.values():
|
|
view.refresh()
|
|
if self._active_name:
|
|
self.navigate_to(self._active_name)
|
|
|
|
def run_pipeline(self, selected_names: set[str]) -> None:
|
|
"""Start the background pipeline for the given preset filenames."""
|
|
if self._pipeline_running:
|
|
return
|
|
if len(selected_names) < 2:
|
|
messagebox.showwarning(
|
|
t("app.dlg_presets_title"),
|
|
t("app.dlg_presets_body"),
|
|
)
|
|
return
|
|
|
|
cfg = self._cfg
|
|
if not cfg:
|
|
messagebox.showwarning(t("app.dlg_setup_title"), t("app.dlg_setup_body"))
|
|
return
|
|
|
|
self._pipeline_running = True
|
|
self._get_dashboard().set_pipeline_ui(running=True)
|
|
self.navigate_to("Logs")
|
|
|
|
def worker() -> None:
|
|
# run.py calls fix_console_encoding() at import time, which needs
|
|
# the real sys.stdout.buffer. Import it before we redirect stdout.
|
|
from run import step_fetch, step_link
|
|
self._redirect_output()
|
|
try:
|
|
from arma_modlist_tools.parser import parse_modlist_html
|
|
from arma_modlist_tools.compare import compare_presets
|
|
|
|
# Step 1 — Parse selected presets
|
|
_hdr("Step 1 / 4", t("pipeline.step1_name"))
|
|
cfg.modlist_json.mkdir(exist_ok=True)
|
|
presets = []
|
|
for fp in sorted(cfg.modlist_html.glob("*.html")):
|
|
if fp.name not in selected_names:
|
|
print(f" SKIP {fp.name}")
|
|
continue
|
|
preset = parse_modlist_html(fp)
|
|
out = cfg.modlist_json / (preset["preset_name"] + ".json")
|
|
out.write_text(
|
|
json.dumps(preset, indent=2, ensure_ascii=False),
|
|
encoding="utf-8",
|
|
)
|
|
print(f" {fp.name} → {out.name} ({preset['mod_count']} mods)")
|
|
presets.append(preset)
|
|
|
|
# Step 2 — Compare
|
|
_hdr("Step 2 / 4", t("pipeline.step2_name"))
|
|
result = compare_presets(*presets)
|
|
cfg.comparison.write_text(
|
|
json.dumps(result, indent=2, ensure_ascii=False),
|
|
encoding="utf-8",
|
|
)
|
|
total_unique = sum(v["mod_count"] for v in result["unique"].values())
|
|
print(f" Compared: {', '.join(result['compared_presets'])}")
|
|
print(f" Shared: {result['shared']['mod_count']} | "
|
|
f"Unique: {total_unique}")
|
|
|
|
# Step 3 — Fetch
|
|
_hdr("Step 3 / 4", t("pipeline.step3_name"))
|
|
step_fetch(cfg)
|
|
|
|
# Step 4 — Link
|
|
_hdr("Step 4 / 4", t("pipeline.step4_name"))
|
|
groups = (
|
|
sorted(p.name for p in cfg.downloads.iterdir() if p.is_dir())
|
|
if cfg.downloads.is_dir() else []
|
|
)
|
|
step_link(cfg, groups)
|
|
|
|
print("\n✓ Pipeline complete.\n")
|
|
|
|
except Exception as e:
|
|
print(f"\n✗ Error: {e}\n")
|
|
import traceback
|
|
traceback.print_exc()
|
|
finally:
|
|
self._restore_output()
|
|
self.after(0, self._pipeline_done)
|
|
|
|
threading.Thread(target=worker, daemon=True).start()
|
|
|
|
def run_tool(self, script_args: list[str]) -> None:
|
|
"""Run a maintenance script via subprocess, streaming output to Logs."""
|
|
script = script_args[0]
|
|
extra = script_args[1:]
|
|
|
|
def worker() -> None:
|
|
self.post_log(f"\n{'─'*50}\n {' '.join(script_args)}\n{'─'*50}\n")
|
|
try:
|
|
env = os.environ.copy()
|
|
env["PYTHONUNBUFFERED"] = "1"
|
|
proc = subprocess.Popen(
|
|
[sys.executable, "-u", str(PROJECT_ROOT / script)] + extra,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.STDOUT,
|
|
text=True,
|
|
cwd=str(PROJECT_ROOT),
|
|
env=env,
|
|
)
|
|
for line in iter(proc.stdout.readline, ""):
|
|
self.post_log(line)
|
|
proc.wait()
|
|
ok = proc.returncode == 0
|
|
done_msg = (
|
|
t("app.tool_done") if ok
|
|
else t("app.tool_exit_code", code=proc.returncode)
|
|
)
|
|
self.post_log(f"\n{done_msg}.\n")
|
|
except Exception as e:
|
|
self.post_log(f"\n{t('app.tool_failed', script=script, e=e)}\n")
|
|
|
|
threading.Thread(target=worker, daemon=True).start()
|
|
|
|
def load_selection(self) -> set[str]:
|
|
"""Return selected preset filenames, defaulting to all if no file saved."""
|
|
if SELECTION_FILE.exists():
|
|
try:
|
|
data = json.loads(SELECTION_FILE.read_text(encoding="utf-8"))
|
|
return set(data.get("selected", []))
|
|
except Exception:
|
|
pass
|
|
if self._cfg and self._cfg.modlist_html.is_dir():
|
|
return {f.name for f in self._cfg.modlist_html.glob("*.html")}
|
|
return set()
|
|
|
|
def save_selection(self, selected: set[str]) -> None:
|
|
SELECTION_FILE.write_text(
|
|
json.dumps({"selected": sorted(selected)}, indent=2),
|
|
encoding="utf-8",
|
|
)
|
|
|
|
def open_wizard(self) -> None:
|
|
SetupWizard(self, on_complete=self._after_wizard)
|
|
|
|
# =========================================================================
|
|
# Private — language
|
|
# =========================================================================
|
|
|
|
@staticmethod
|
|
def _read_raw_config() -> dict:
|
|
"""Return config.json as a raw dict, or {} on missing / parse error."""
|
|
try:
|
|
return json.loads((PROJECT_ROOT / "config.json").read_text(encoding="utf-8"))
|
|
except Exception:
|
|
return {}
|
|
|
|
def _apply_startup_language(self) -> None:
|
|
"""Read language preference from config.json and activate it."""
|
|
from gui import locales
|
|
lang = self._read_raw_config().get("ui", {}).get("language", "en")
|
|
locales.set_language(lang)
|
|
|
|
def _save_language_pref(self, lang: str) -> None:
|
|
"""Persist language preference into the 'ui' key of config.json."""
|
|
try:
|
|
raw = self._read_raw_config()
|
|
raw.setdefault("ui", {})["language"] = lang
|
|
(PROJECT_ROOT / "config.json").write_text(
|
|
json.dumps(raw, indent=2), encoding="utf-8"
|
|
)
|
|
except Exception:
|
|
pass # non-fatal — language preference simply resets next run
|
|
|
|
def _rebuild_nav_labels(self) -> None:
|
|
"""Retranslate the sidebar navigation button labels."""
|
|
for name, btn in self._nav_btns.items():
|
|
btn.configure(text=t(f"nav.{name.lower()}"))
|
|
|
|
# =========================================================================
|
|
# Private — layout
|
|
# =========================================================================
|
|
|
|
def _build_layout(self) -> None:
|
|
self.grid_columnconfigure(1, weight=1)
|
|
self.grid_rowconfigure(0, weight=1)
|
|
|
|
# Sidebar
|
|
sb = ctk.CTkFrame(self, width=SIDEBAR_W, corner_radius=0)
|
|
sb.grid(row=0, column=0, sticky="nsew")
|
|
sb.grid_propagate(False)
|
|
sb.grid_rowconfigure(10, weight=1)
|
|
|
|
ctk.CTkLabel(
|
|
sb, text=APP_TITLE,
|
|
font=ctk.CTkFont(size=15, weight="bold"),
|
|
wraplength=SIDEBAR_W - 16, justify="center",
|
|
).grid(row=0, column=0, padx=12, pady=(22, 14))
|
|
|
|
self._nav_btns: dict[str, ctk.CTkButton] = {}
|
|
for i, name in enumerate(_VIEW_NAMES, start=1):
|
|
b = ctk.CTkButton(
|
|
sb, text=t(f"nav.{name.lower()}"), width=SIDEBAR_W - 24,
|
|
anchor="w", command=lambda n=name: self.navigate_to(n),
|
|
fg_color="transparent",
|
|
hover_color=("gray80", "gray30"),
|
|
text_color=("gray10", "gray90"),
|
|
)
|
|
b.grid(row=i, column=0, padx=12, pady=3)
|
|
self._nav_btns[name] = b
|
|
|
|
# Content area
|
|
self._content = ctk.CTkFrame(self, fg_color="transparent", corner_radius=0)
|
|
self._content.grid(row=0, column=1, sticky="nsew")
|
|
self._content.grid_columnconfigure(0, weight=1)
|
|
self._content.grid_rowconfigure(0, weight=1)
|
|
|
|
self.navigate_to("Dashboard")
|
|
|
|
def _nav_select(self, name: str) -> None:
|
|
for lbl, btn in self._nav_btns.items():
|
|
btn.configure(
|
|
fg_color=("gray75", "gray25") if lbl == name else "transparent"
|
|
)
|
|
|
|
# =========================================================================
|
|
# Private — config
|
|
# =========================================================================
|
|
|
|
def _load_config(self) -> None:
|
|
try:
|
|
from arma_modlist_tools.config import load_config
|
|
self._cfg = load_config(PROJECT_ROOT / "config.json")
|
|
except Exception as e:
|
|
self._cfg = None
|
|
self.post_log(f"[config] {e}\n")
|
|
|
|
def _after_wizard(self) -> None:
|
|
self._load_config()
|
|
# grid_forget before popping — navigate_to can't hide a view that's
|
|
# already been removed from _view_cache, so we do it manually first.
|
|
for name in ("Dashboard", "Settings"):
|
|
view = self._view_cache.pop(name, None)
|
|
if view is not None:
|
|
view.grid_forget()
|
|
view.lower()
|
|
self._active_name = ""
|
|
self.navigate_to("Dashboard")
|
|
|
|
# =========================================================================
|
|
# Private — logging
|
|
# =========================================================================
|
|
|
|
def _poll_log(self) -> None:
|
|
parts: list[str] = []
|
|
try:
|
|
while True:
|
|
parts.append(self._log_q.get_nowait())
|
|
except queue.Empty:
|
|
pass
|
|
if parts:
|
|
logs_view = self._view_cache.get("Logs")
|
|
if logs_view is not None and hasattr(logs_view, "append"):
|
|
logs_view.append("".join(parts)) # type: ignore[attr-defined]
|
|
self.after(80, self._poll_log)
|
|
|
|
def _redirect_output(self) -> None:
|
|
writer = _QueueWriter(self._log_q)
|
|
sys.stdout = writer
|
|
sys.stderr = writer
|
|
|
|
def _restore_output(self) -> None:
|
|
sys.stdout = self._orig_stdout
|
|
sys.stderr = self._orig_stderr
|
|
|
|
# =========================================================================
|
|
# Private — pipeline lifecycle
|
|
# =========================================================================
|
|
|
|
def _pipeline_done(self) -> None:
|
|
self._pipeline_running = False
|
|
self._get_dashboard().set_pipeline_ui(running=False)
|
|
# Refresh all cached views so data is current without manual refresh
|
|
for view in self._view_cache.values():
|
|
view.refresh()
|
|
|
|
def _get_dashboard(self) -> "DashboardView":
|
|
from gui.views.dashboard import DashboardView
|
|
view = self._view_cache.get("Dashboard")
|
|
if not isinstance(view, DashboardView):
|
|
# Build it if not yet visited (shouldn't normally happen)
|
|
view = DashboardView(self._content, self)
|
|
self._view_cache["Dashboard"] = view
|
|
return view
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Module-level helper
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _hdr(step: str, name: str) -> None:
|
|
print(f"\n{'='*50}")
|
|
print(f" {step}: {name}")
|
|
print(f"{'='*50}\n")
|