Files
arma-modlist-tools/gui/app.py
Tran G. (Revernomad) Khoa 85bc406236 fix: smooth GUI during pipeline downloads and harden wizard connection test
GUI log batching (_poll_log now drains queue into a single CTkTextbox.insert
call per 80 ms tick instead of N calls, each with see("end") scroll).

_QueueWriter strips ANSI/CSI escape codes and bare \r before enqueuing so
tqdm progress output is legible in the log textbox. OSC sequences terminated
by both BEL (\x07) and ST (\x1b\) are handled.

Wizard "Test Connection" moved off the main thread: requests.get runs in a
daemon thread; result posted back via after(0, ...). Widget refs captured
before thread launch to prevent stale updates if user navigates away. Bare
except narrowed to TclError (destroyed-widget guard only).

Code quality: import os moved to module level in app.py; _read_raw_config()
helper extracted to deduplicate dual raw config.json reads; return type
annotations added to _get_view_class, _get_dashboard, and cfg property.

Tests: 11 new unit tests for _QueueWriter (RED -> GREEN on OSC-ST fix).
2026-04-08 17:27:25 +07:00

421 lines
16 KiB
Python

from __future__ import annotations
import json
import os
import queue
import subprocess
import sys
import threading
from pathlib import Path
from typing import TYPE_CHECKING, Optional
if TYPE_CHECKING:
from arma_modlist_tools.config import Config
from gui.views.dashboard import DashboardView
import customtkinter as ctk
from tkinter import messagebox
from gui._constants import (
SIDEBAR_W, APP_TITLE, PROJECT_ROOT, SELECTION_FILE,
)
from gui._io import _QueueWriter
from gui.locales import t
from gui.wizard import SetupWizard
from gui.views.base import BaseView
# ---------------------------------------------------------------------------
# View name → class mapping (imported lazily to avoid circular imports)
# ---------------------------------------------------------------------------
_VIEW_NAMES = ("Dashboard", "Mods", "Tools", "Logs", "Settings")
def _get_view_class(name: str) -> type[BaseView]:
from gui.views import DashboardView, ModsView, ToolsView, LogsView, SettingsView
return {
"Dashboard": DashboardView,
"Mods": ModsView,
"Tools": ToolsView,
"Logs": LogsView,
"Settings": SettingsView,
}[name]
# ---------------------------------------------------------------------------
# Main application
# ---------------------------------------------------------------------------
class ArmaModManagerApp(ctk.CTk):
def __init__(self) -> None:
super().__init__()
self.title(APP_TITLE)
self.geometry("980x640")
self.minsize(820, 560)
self._log_q: queue.Queue[str] = queue.Queue()
self._orig_stdout = sys.stdout
self._orig_stderr = sys.stderr
self._pipeline_running: bool = False
self._cfg = None
self._view_cache: dict[str, BaseView] = {}
self._active_name: str = ""
if not (PROJECT_ROOT / "config.json").exists():
self.after(200, self.open_wizard)
else:
self._load_config()
self._apply_startup_language()
self._build_layout()
self._poll_log()
# =========================================================================
# Public interface (used by views)
# =========================================================================
@property
def cfg(self) -> Optional["Config"]:
"""Loaded Config object, or None if config.json is missing/invalid."""
return self._cfg
@property
def pipeline_running(self) -> bool:
return self._pipeline_running
def navigate_to(self, view_name: str) -> None:
"""Switch the content area to the named view, refreshing its data."""
assert view_name in _VIEW_NAMES, f"Unknown view: {view_name}"
# Build view on first visit
if view_name not in self._view_cache:
cls = _get_view_class(view_name)
view = cls(self._content, self)
self._view_cache[view_name] = view
# Hide current
if self._active_name and self._active_name in self._view_cache:
old = self._view_cache[self._active_name]
old.grid_forget()
old.lower() # push canvas-based widgets below everything on Windows
# Show new
view = self._view_cache[view_name]
view.grid(row=0, column=0, sticky="nsew")
view.lift() # ensure it's above any lingering hidden views
view.refresh()
self._active_name = view_name
self._nav_select(view_name)
def post_log(self, text: str) -> None:
"""Thread-safe: enqueue text for the Logs panel."""
self._log_q.put(text)
def switch_language(self, lang: str) -> None:
"""Switch the UI language and refresh all cached views."""
from gui import locales
locales.set_language(lang)
self._save_language_pref(lang)
self._rebuild_nav_labels()
for view in self._view_cache.values():
view.refresh()
if self._active_name:
self.navigate_to(self._active_name)
def run_pipeline(self, selected_names: set[str]) -> None:
"""Start the background pipeline for the given preset filenames."""
if self._pipeline_running:
return
if len(selected_names) < 2:
messagebox.showwarning(
t("app.dlg_presets_title"),
t("app.dlg_presets_body"),
)
return
cfg = self._cfg
if not cfg:
messagebox.showwarning(t("app.dlg_setup_title"), t("app.dlg_setup_body"))
return
self._pipeline_running = True
self._get_dashboard().set_pipeline_ui(running=True)
self.navigate_to("Logs")
def worker() -> None:
# run.py calls fix_console_encoding() at import time, which needs
# the real sys.stdout.buffer. Import it before we redirect stdout.
from run import step_fetch, step_link
self._redirect_output()
try:
from arma_modlist_tools.parser import parse_modlist_html
from arma_modlist_tools.compare import compare_presets
# Step 1 — Parse selected presets
_hdr("Step 1 / 4", t("pipeline.step1_name"))
cfg.modlist_json.mkdir(exist_ok=True)
presets = []
for fp in sorted(cfg.modlist_html.glob("*.html")):
if fp.name not in selected_names:
print(f" SKIP {fp.name}")
continue
preset = parse_modlist_html(fp)
out = cfg.modlist_json / (preset["preset_name"] + ".json")
out.write_text(
json.dumps(preset, indent=2, ensure_ascii=False),
encoding="utf-8",
)
print(f" {fp.name}{out.name} ({preset['mod_count']} mods)")
presets.append(preset)
# Step 2 — Compare
_hdr("Step 2 / 4", t("pipeline.step2_name"))
result = compare_presets(*presets)
cfg.comparison.write_text(
json.dumps(result, indent=2, ensure_ascii=False),
encoding="utf-8",
)
total_unique = sum(v["mod_count"] for v in result["unique"].values())
print(f" Compared: {', '.join(result['compared_presets'])}")
print(f" Shared: {result['shared']['mod_count']} | "
f"Unique: {total_unique}")
# Step 3 — Fetch
_hdr("Step 3 / 4", t("pipeline.step3_name"))
step_fetch(cfg)
# Step 4 — Link
_hdr("Step 4 / 4", t("pipeline.step4_name"))
groups = (
sorted(p.name for p in cfg.downloads.iterdir() if p.is_dir())
if cfg.downloads.is_dir() else []
)
step_link(cfg, groups)
print("\n✓ Pipeline complete.\n")
except Exception as e:
print(f"\n✗ Error: {e}\n")
import traceback
traceback.print_exc()
finally:
self._restore_output()
self.after(0, self._pipeline_done)
threading.Thread(target=worker, daemon=True).start()
def run_tool(self, script_args: list[str]) -> None:
"""Run a maintenance script via subprocess, streaming output to Logs."""
script = script_args[0]
extra = script_args[1:]
def worker() -> None:
self.post_log(f"\n{''*50}\n {' '.join(script_args)}\n{''*50}\n")
try:
env = os.environ.copy()
env["PYTHONUNBUFFERED"] = "1"
proc = subprocess.Popen(
[sys.executable, "-u", str(PROJECT_ROOT / script)] + extra,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
cwd=str(PROJECT_ROOT),
env=env,
)
for line in iter(proc.stdout.readline, ""):
self.post_log(line)
proc.wait()
ok = proc.returncode == 0
done_msg = (
t("app.tool_done") if ok
else t("app.tool_exit_code", code=proc.returncode)
)
self.post_log(f"\n{done_msg}.\n")
except Exception as e:
self.post_log(f"\n{t('app.tool_failed', script=script, e=e)}\n")
threading.Thread(target=worker, daemon=True).start()
def load_selection(self) -> set[str]:
"""Return selected preset filenames, defaulting to all if no file saved."""
if SELECTION_FILE.exists():
try:
data = json.loads(SELECTION_FILE.read_text(encoding="utf-8"))
return set(data.get("selected", []))
except Exception:
pass
if self._cfg and self._cfg.modlist_html.is_dir():
return {f.name for f in self._cfg.modlist_html.glob("*.html")}
return set()
def save_selection(self, selected: set[str]) -> None:
SELECTION_FILE.write_text(
json.dumps({"selected": sorted(selected)}, indent=2),
encoding="utf-8",
)
def open_wizard(self) -> None:
SetupWizard(self, on_complete=self._after_wizard)
# =========================================================================
# Private — language
# =========================================================================
@staticmethod
def _read_raw_config() -> dict:
"""Return config.json as a raw dict, or {} on missing / parse error."""
try:
return json.loads((PROJECT_ROOT / "config.json").read_text(encoding="utf-8"))
except Exception:
return {}
def _apply_startup_language(self) -> None:
"""Read language preference from config.json and activate it."""
from gui import locales
lang = self._read_raw_config().get("ui", {}).get("language", "en")
locales.set_language(lang)
def _save_language_pref(self, lang: str) -> None:
"""Persist language preference into the 'ui' key of config.json."""
try:
raw = self._read_raw_config()
raw.setdefault("ui", {})["language"] = lang
(PROJECT_ROOT / "config.json").write_text(
json.dumps(raw, indent=2), encoding="utf-8"
)
except Exception:
pass # non-fatal — language preference simply resets next run
def _rebuild_nav_labels(self) -> None:
"""Retranslate the sidebar navigation button labels."""
for name, btn in self._nav_btns.items():
btn.configure(text=t(f"nav.{name.lower()}"))
# =========================================================================
# Private — layout
# =========================================================================
def _build_layout(self) -> None:
self.grid_columnconfigure(1, weight=1)
self.grid_rowconfigure(0, weight=1)
# Sidebar
sb = ctk.CTkFrame(self, width=SIDEBAR_W, corner_radius=0)
sb.grid(row=0, column=0, sticky="nsew")
sb.grid_propagate(False)
sb.grid_rowconfigure(10, weight=1)
ctk.CTkLabel(
sb, text=APP_TITLE,
font=ctk.CTkFont(size=15, weight="bold"),
wraplength=SIDEBAR_W - 16, justify="center",
).grid(row=0, column=0, padx=12, pady=(22, 14))
self._nav_btns: dict[str, ctk.CTkButton] = {}
for i, name in enumerate(_VIEW_NAMES, start=1):
b = ctk.CTkButton(
sb, text=t(f"nav.{name.lower()}"), width=SIDEBAR_W - 24,
anchor="w", command=lambda n=name: self.navigate_to(n),
fg_color="transparent",
hover_color=("gray80", "gray30"),
text_color=("gray10", "gray90"),
)
b.grid(row=i, column=0, padx=12, pady=3)
self._nav_btns[name] = b
# Content area
self._content = ctk.CTkFrame(self, fg_color="transparent", corner_radius=0)
self._content.grid(row=0, column=1, sticky="nsew")
self._content.grid_columnconfigure(0, weight=1)
self._content.grid_rowconfigure(0, weight=1)
self.navigate_to("Dashboard")
def _nav_select(self, name: str) -> None:
for lbl, btn in self._nav_btns.items():
btn.configure(
fg_color=("gray75", "gray25") if lbl == name else "transparent"
)
# =========================================================================
# Private — config
# =========================================================================
def _load_config(self) -> None:
try:
from arma_modlist_tools.config import load_config
self._cfg = load_config(PROJECT_ROOT / "config.json")
except Exception as e:
self._cfg = None
self.post_log(f"[config] {e}\n")
def _after_wizard(self) -> None:
self._load_config()
# grid_forget before popping — navigate_to can't hide a view that's
# already been removed from _view_cache, so we do it manually first.
for name in ("Dashboard", "Settings"):
view = self._view_cache.pop(name, None)
if view is not None:
view.grid_forget()
view.lower()
self._active_name = ""
self.navigate_to("Dashboard")
# =========================================================================
# Private — logging
# =========================================================================
def _poll_log(self) -> None:
parts: list[str] = []
try:
while True:
parts.append(self._log_q.get_nowait())
except queue.Empty:
pass
if parts:
logs_view = self._view_cache.get("Logs")
if logs_view is not None and hasattr(logs_view, "append"):
logs_view.append("".join(parts)) # type: ignore[attr-defined]
self.after(80, self._poll_log)
def _redirect_output(self) -> None:
writer = _QueueWriter(self._log_q)
sys.stdout = writer
sys.stderr = writer
def _restore_output(self) -> None:
sys.stdout = self._orig_stdout
sys.stderr = self._orig_stderr
# =========================================================================
# Private — pipeline lifecycle
# =========================================================================
def _pipeline_done(self) -> None:
self._pipeline_running = False
self._get_dashboard().set_pipeline_ui(running=False)
# Refresh all cached views so data is current without manual refresh
for view in self._view_cache.values():
view.refresh()
def _get_dashboard(self) -> "DashboardView":
from gui.views.dashboard import DashboardView
view = self._view_cache.get("Dashboard")
if not isinstance(view, DashboardView):
# Build it if not yet visited (shouldn't normally happen)
view = DashboardView(self._content, self)
self._view_cache["Dashboard"] = view
return view
# ---------------------------------------------------------------------------
# Module-level helper
# ---------------------------------------------------------------------------
def _hdr(step: str, name: str) -> None:
print(f"\n{'='*50}")
print(f" {step}: {name}")
print(f"{'='*50}\n")