from __future__ import annotations

import json
import queue
import subprocess
import sys
import threading
from pathlib import Path
from typing import Optional

import customtkinter as ctk
from tkinter import messagebox

from gui._constants import (
    SIDEBAR_W,
    APP_TITLE,
    PROJECT_ROOT,
    SELECTION_FILE,
)
from gui._io import _QueueWriter
from gui.locales import t
from gui.wizard import SetupWizard
from gui.views.base import BaseView

# ---------------------------------------------------------------------------
# View name → class mapping (imported lazily to avoid circular imports)
# ---------------------------------------------------------------------------

_VIEW_NAMES = ("Dashboard", "Mods", "Tools", "Logs", "Settings")


def _get_view_class(name: str):
    """Return the view class registered under *name*.

    The view classes are imported inside the function so that importing this
    module does not import ``gui.views`` (avoids a circular import).

    Raises:
        KeyError: if *name* is not one of the known view names.
    """
    from gui.views import DashboardView, ModsView, ToolsView, LogsView, SettingsView

    return {
        "Dashboard": DashboardView,
        "Mods": ModsView,
        "Tools": ToolsView,
        "Logs": LogsView,
        "Settings": SettingsView,
    }[name]


# ---------------------------------------------------------------------------
# Main application
# ---------------------------------------------------------------------------

class ArmaModManagerApp(ctk.CTk):
    """Main window: a sidebar of navigation buttons plus a content area.

    Views are built on first visit and kept in ``_view_cache``. Text destined
    for the Logs view is passed through ``_log_q`` and delivered by
    ``_poll_log``, which reschedules itself with ``after``.
    """

    def __init__(self) -> None:
        """Create the window, load config (or schedule the wizard), build the UI."""
        super().__init__()
        self.title(APP_TITLE)
        self.geometry("980x640")
        self.minsize(820, 560)

        self._log_q: queue.Queue[str] = queue.Queue()
        # Kept so _restore_output can undo _redirect_output.
        self._orig_stdout = sys.stdout
        self._orig_stderr = sys.stderr
        self._pipeline_running: bool = False
        self._cfg = None
        self._view_cache: dict[str, BaseView] = {}
        self._active_name: str = ""

        if not (PROJECT_ROOT / "config.json").exists():
            # No config yet: open the setup wizard shortly after start-up.
            self.after(200, self.open_wizard)
        else:
            self._load_config()

        self._apply_startup_language()
        self._build_layout()
        self._poll_log()

    # =========================================================================
    # Public interface (used by views)
    # =========================================================================

    @property
    def cfg(self):
        """Loaded Config object, or None if config.json is missing/invalid."""
        return self._cfg

    @property
    def pipeline_running(self) -> bool:
        """True while the background pipeline started by run_pipeline is active."""
        return self._pipeline_running

    def navigate_to(self, view_name: str) -> None:
        """Switch the content area to the named view, refreshing its data."""
        assert view_name in _VIEW_NAMES, f"Unknown view: {view_name}"

        # Build view on first visit
        if view_name not in self._view_cache:
            cls = _get_view_class(view_name)
            view = cls(self._content, self)
            self._view_cache[view_name] = view

        # Hide current
        if self._active_name and self._active_name in self._view_cache:
            old = self._view_cache[self._active_name]
            old.grid_forget()
            old.lower()  # push canvas-based widgets below everything on Windows

        # Show new
        view = self._view_cache[view_name]
        view.grid(row=0, column=0, sticky="nsew")
        view.lift()  # ensure it's above any lingering hidden views
        view.refresh()
        self._active_name = view_name
        self._nav_select(view_name)

    def post_log(self, text: str) -> None:
        """Thread-safe: enqueue text for the Logs panel."""
        self._log_q.put(text)

    def switch_language(self, lang: str) -> None:
        """Switch the UI language and refresh all cached views."""
        from gui import locales

        locales.set_language(lang)
        self._save_language_pref(lang)
        self._rebuild_nav_labels()
        for view in self._view_cache.values():
            view.refresh()
        if self._active_name:
            self.navigate_to(self._active_name)

    def run_pipeline(self, selected_names: set[str]) -> None:
        """Start the background pipeline for the given preset filenames.

        Does nothing if a pipeline is already running. Shows a warning dialog
        and returns when fewer than two presets are selected or when no config
        is loaded. Otherwise switches to the Logs view and runs the four steps
        (parse, compare, fetch, link) on a daemon thread with stdout/stderr
        redirected into the log queue.
        """
        if self._pipeline_running:
            return
        if len(selected_names) < 2:
            messagebox.showwarning(
                t("app.dlg_presets_title"),
                t("app.dlg_presets_body"),
            )
            return
        cfg = self._cfg
        if not cfg:
            messagebox.showwarning(t("app.dlg_setup_title"), t("app.dlg_setup_body"))
            return

        self._pipeline_running = True
        self._get_dashboard().set_pipeline_ui(running=True)
        self.navigate_to("Logs")

        def worker() -> None:
            # run.py calls fix_console_encoding() at import time, which needs
            # the real sys.stdout.buffer. Import it before we redirect stdout.
            try:
                from run import step_fetch, step_link
            except Exception as e:
                # Without this guard a failed import would end the thread
                # before the try/finally below, leaving _pipeline_running
                # stuck at True.
                import traceback

                self.post_log(f"\n✗ Error: {e}\n")
                self.post_log(traceback.format_exc())
                self.after(0, self._pipeline_done)
                return

            self._redirect_output()
            try:
                from arma_modlist_tools.parser import parse_modlist_html
                from arma_modlist_tools.compare import compare_presets

                # Step 1 — Parse selected presets
                _hdr("Step 1 / 4", t("pipeline.step1_name"))
                cfg.modlist_json.mkdir(exist_ok=True)
                presets = []
                for fp in sorted(cfg.modlist_html.glob("*.html")):
                    if fp.name not in selected_names:
                        print(f" SKIP {fp.name}")
                        continue
                    preset = parse_modlist_html(fp)
                    out = cfg.modlist_json / (preset["preset_name"] + ".json")
                    out.write_text(
                        json.dumps(preset, indent=2, ensure_ascii=False),
                        encoding="utf-8",
                    )
                    print(f" {fp.name} → {out.name} ({preset['mod_count']} mods)")
                    presets.append(preset)

                # Step 2 — Compare
                _hdr("Step 2 / 4", t("pipeline.step2_name"))
                result = compare_presets(*presets)
                cfg.comparison.write_text(
                    json.dumps(result, indent=2, ensure_ascii=False),
                    encoding="utf-8",
                )
                total_unique = sum(v["mod_count"] for v in result["unique"].values())
                print(f" Compared: {', '.join(result['compared_presets'])}")
                print(f" Shared: {result['shared']['mod_count']} | "
                      f"Unique: {total_unique}")

                # Step 3 — Fetch
                _hdr("Step 3 / 4", t("pipeline.step3_name"))
                step_fetch(cfg)

                # Step 4 — Link
                _hdr("Step 4 / 4", t("pipeline.step4_name"))
                groups = (
                    sorted(p.name for p in cfg.downloads.iterdir() if p.is_dir())
                    if cfg.downloads.is_dir()
                    else []
                )
                step_link(cfg, groups)

                print("\n✓ Pipeline complete.\n")
            except Exception as e:
                # Top-level boundary of the worker thread: report into the
                # (redirected) log instead of letting the thread die silently.
                print(f"\n✗ Error: {e}\n")
                import traceback

                traceback.print_exc()
            finally:
                self._restore_output()
                self.after(0, self._pipeline_done)

        threading.Thread(target=worker, daemon=True).start()

    def run_tool(self, script_args: list[str]) -> None:
        """Run a maintenance script via subprocess, streaming output to Logs.

        ``script_args[0]`` is the script path relative to PROJECT_ROOT; the
        remaining items are passed to it as command-line arguments. The child
        runs on a daemon thread and each output line is posted to the log
        queue as it arrives.
        """
        import os

        script = script_args[0]
        extra = script_args[1:]

        def worker() -> None:
            self.post_log(f"\n{'─'*50}\n {' '.join(script_args)}\n{'─'*50}\n")
            try:
                env = os.environ.copy()
                env["PYTHONUNBUFFERED"] = "1"
                # The context manager closes the stdout pipe and waits for the
                # child even if streaming is interrupted by an exception.
                with subprocess.Popen(
                    [sys.executable, "-u", str(PROJECT_ROOT / script)] + extra,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.STDOUT,
                    text=True,
                    # An undecodable byte must not abort log streaming.
                    errors="replace",
                    cwd=str(PROJECT_ROOT),
                    env=env,
                ) as proc:
                    for line in proc.stdout:
                        self.post_log(line)
                ok = proc.returncode == 0
                done_msg = (
                    t("app.tool_done")
                    if ok
                    else t("app.tool_exit_code", code=proc.returncode)
                )
                self.post_log(f"\n{done_msg}.\n")
            except Exception as e:
                self.post_log(f"\n{t('app.tool_failed', script=script, e=e)}\n")

        threading.Thread(target=worker, daemon=True).start()

    def load_selection(self) -> set[str]:
        """Return selected preset filenames, defaulting to all if no file saved."""
        if SELECTION_FILE.exists():
            try:
                data = json.loads(SELECTION_FILE.read_text(encoding="utf-8"))
                return set(data.get("selected", []))
            except Exception:
                # Best effort: an unreadable or malformed selection file falls
                # through to the "all presets" default below.
                pass
        if self._cfg and self._cfg.modlist_html.is_dir():
            return {f.name for f in self._cfg.modlist_html.glob("*.html")}
        return set()

    def save_selection(self, selected: set[str]) -> None:
        """Write the selected preset filenames (sorted) to SELECTION_FILE as JSON."""
        SELECTION_FILE.write_text(
            json.dumps({"selected": sorted(selected)}, indent=2),
            encoding="utf-8",
        )

    def open_wizard(self) -> None:
        """Open the setup wizard; ``_after_wizard`` runs when it completes."""
        SetupWizard(self, on_complete=self._after_wizard)

    # =========================================================================
    # Private — language
    # =========================================================================

    def _apply_startup_language(self) -> None:
        """Read language preference from config.json and activate it."""
        from gui import locales

        lang = "en"
        cfg_path = PROJECT_ROOT / "config.json"
        if cfg_path.exists():
            try:
                raw = json.loads(cfg_path.read_text(encoding="utf-8"))
                lang = raw.get("ui", {}).get("language", "en")
            except Exception:
                # Unreadable or malformed config: keep the "en" default.
                pass
        locales.set_language(lang)

    def _save_language_pref(self, lang: str) -> None:
        """Persist language preference into the 'ui' key of config.json."""
        cfg_path = PROJECT_ROOT / "config.json"
        try:
            raw = json.loads(cfg_path.read_text(encoding="utf-8"))
            raw.setdefault("ui", {})["language"] = lang
            cfg_path.write_text(json.dumps(raw, indent=2), encoding="utf-8")
        except Exception:
            pass  # non-fatal — language preference simply resets next run

    def _rebuild_nav_labels(self) -> None:
        """Retranslate the sidebar navigation button labels."""
        for name, btn in self._nav_btns.items():
            btn.configure(text=t(f"nav.{name.lower()}"))

    # =========================================================================
    # Private — layout
    # =========================================================================

    def _build_layout(self) -> None:
        """Create the sidebar and content frame, then show the Dashboard."""
        self.grid_columnconfigure(1, weight=1)
        self.grid_rowconfigure(0, weight=1)

        # Sidebar
        sb = ctk.CTkFrame(self, width=SIDEBAR_W, corner_radius=0)
        sb.grid(row=0, column=0, sticky="nsew")
        sb.grid_propagate(False)
        sb.grid_rowconfigure(10, weight=1)

        ctk.CTkLabel(
            sb,
            text=APP_TITLE,
            font=ctk.CTkFont(size=15, weight="bold"),
            wraplength=SIDEBAR_W - 16,
            justify="center",
        ).grid(row=0, column=0, padx=12, pady=(22, 14))

        self._nav_btns: dict[str, ctk.CTkButton] = {}
        for i, name in enumerate(_VIEW_NAMES, start=1):
            b = ctk.CTkButton(
                sb,
                text=t(f"nav.{name.lower()}"),
                width=SIDEBAR_W - 24,
                anchor="w",
                # n=name binds the current name; a plain closure would see
                # only the last loop value.
                command=lambda n=name: self.navigate_to(n),
                fg_color="transparent",
                hover_color=("gray80", "gray30"),
                text_color=("gray10", "gray90"),
            )
            b.grid(row=i, column=0, padx=12, pady=3)
            self._nav_btns[name] = b

        # Content area
        self._content = ctk.CTkFrame(self, fg_color="transparent", corner_radius=0)
        self._content.grid(row=0, column=1, sticky="nsew")
        self._content.grid_columnconfigure(0, weight=1)
        self._content.grid_rowconfigure(0, weight=1)

        self.navigate_to("Dashboard")

    def _nav_select(self, name: str) -> None:
        """Highlight the sidebar button for *name* and reset the others."""
        for lbl, btn in self._nav_btns.items():
            btn.configure(
                fg_color=("gray75", "gray25") if lbl == name else "transparent"
            )

    # =========================================================================
    # Private — config
    # =========================================================================

    def _load_config(self) -> None:
        """Load config.json into ``_cfg``; on failure set None and log the error."""
        try:
            from arma_modlist_tools.config import load_config

            self._cfg = load_config(PROJECT_ROOT / "config.json")
        except Exception as e:
            self._cfg = None
            self.post_log(f"[config] {e}\n")

    def _after_wizard(self) -> None:
        """Reload config after the wizard and show a freshly built Dashboard."""
        self._load_config()

        # Hide whatever is currently showing. _active_name is cleared below,
        # so navigate_to would not hide it and it would stay gridded
        # underneath the new Dashboard.
        current = self._view_cache.get(self._active_name)
        if current is not None:
            current.grid_forget()
            current.lower()

        # grid_forget before popping — navigate_to can't hide a view that's
        # already been removed from _view_cache, so we do it manually first.
        for name in ("Dashboard", "Settings"):
            view = self._view_cache.pop(name, None)
            if view is not None:
                view.grid_forget()
                view.lower()
        self._active_name = ""
        self.navigate_to("Dashboard")

    # =========================================================================
    # Private — logging
    # =========================================================================

    def _poll_log(self) -> None:
        """Deliver queued log text to the Logs view, then reschedule in 80 ms.

        Nothing is taken off the queue until the Logs view has been built, so
        text posted earlier (e.g. a config error at start-up) stays buffered
        and is shown on the first visit instead of being discarded.
        """
        from gui.views.logs import LogsView

        logs_view = self._view_cache.get("Logs")
        if isinstance(logs_view, LogsView):
            try:
                while True:
                    logs_view.append(self._log_q.get_nowait())
            except queue.Empty:
                pass
        self.after(80, self._poll_log)

    def _redirect_output(self) -> None:
        """Point sys.stdout and sys.stderr at a writer feeding the log queue."""
        writer = _QueueWriter(self._log_q)
        sys.stdout = writer
        sys.stderr = writer

    def _restore_output(self) -> None:
        """Restore the sys.stdout/sys.stderr objects captured in __init__."""
        sys.stdout = self._orig_stdout
        sys.stderr = self._orig_stderr

    # =========================================================================
    # Private — pipeline lifecycle
    # =========================================================================

    def _pipeline_done(self) -> None:
        """Clear the running flag, reset the dashboard UI, refresh cached views."""
        self._pipeline_running = False
        self._get_dashboard().set_pipeline_ui(running=False)
        # Refresh all cached views so data is current without manual refresh
        for view in self._view_cache.values():
            view.refresh()

    def _get_dashboard(self):
        """Return the cached Dashboard view, building and caching it if absent."""
        from gui.views.dashboard import DashboardView

        view = self._view_cache.get("Dashboard")
        if not isinstance(view, DashboardView):
            # Build it if not yet visited (shouldn't normally happen)
            view = DashboardView(self._content, self)
            self._view_cache["Dashboard"] = view
        return view


# ---------------------------------------------------------------------------
# Module-level helper
# ---------------------------------------------------------------------------
def _hdr(step: str, name: str) -> None:
    """Print a banner for a pipeline step.

    Output is a blank line, a 50-character ``=`` rule, the ``step: name``
    line, the rule again, and a trailing blank line.
    """
    rule = "=" * 50
    print("\n" + rule)
    print(" " + step + ": " + name)
    print(rule + "\n")