Full source for the-third-rev: Discord bot (discord.py), FastAPI web UI (React/TS/Vite/Tailwind), ComfyUI integration, generation history DB, preset manager, workflow inspector, and all supporting modules. Excluded from tracking: .env, invite_tokens.json, *.db (SQLite), current-workflow-changes.json, user_settings/, presets/, logs/, web-static/ (build output), frontend/node_modules/. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
522 lines
20 KiB
Python
522 lines
20 KiB
Python
"""
|
|
status_monitor.py
|
|
=================
|
|
|
|
Live status dashboard for the Discord ComfyUI bot.
|
|
|
|
Edits a single pinned message in a designated log channel every
|
|
``update_interval`` seconds. Changed values are highlighted with
|
|
bold text and directional arrows/emoji so differences are immediately
|
|
obvious.
|
|
|
|
Change-highlighting rules
|
|
-------------------------
|
|
- Unchanged good state → 🟢 value
|
|
- Unchanged bad state → 🔴 value
|
|
- Changed → bad → ⚠️ **value**
|
|
- Changed → good → ✅ **value**
|
|
- Changed (neutral) → **value**
|
|
- Queue size changed → **N** ▲ or **N** ▼
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
import logging
import math
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Optional

import aiohttp
import discord

from commands.server import get_service_state, STATUS_EMOJI
from media_uploader import get_stats as get_upload_stats, is_running as upload_is_running, MEDIA_EXTENSIONS
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Module-level helpers (no discord.py dependency)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _hms_since(start: datetime) -> tuple[int, int, int]:
    """Return whole ``(hours, minutes, seconds)`` elapsed since *start*.

    A naive *start* is interpreted as UTC (the documented contract of the
    callers) instead of raising ``TypeError`` on the aware/naive subtraction.
    A *start* that lies in the future is clamped to zero elapsed time so the
    components are never negative.
    """
    if start.utcoffset() is None:
        start = start.replace(tzinfo=timezone.utc)
    elapsed = datetime.now(timezone.utc) - start
    total = max(0, int(elapsed.total_seconds()))
    hours, rem = divmod(total, 3600)
    minutes, seconds = divmod(rem, 60)
    return hours, minutes, seconds


def _format_uptime(start: datetime) -> str:
    """Return a human-readable uptime string from a UTC start time.

    The largest non-zero unit leads the string: ``"1h 2m 3s"``, ``"2m 3s"``
    or ``"3s"``.
    """
    h, m, s = _hms_since(start)
    if h:
        return f"{h}h {m}m {s}s"
    if m:
        return f"{m}m {s}s"
    return f"{s}s"


def _elapsed(start: datetime) -> str:
    """Return elapsed time since *start* (UTC) as ``H:MM:SS`` or ``M:SS``."""
    h, m, s = _hms_since(start)
    if h:
        return f"{h}:{m:02d}:{s:02d}"
    return f"{m}:{s:02d}"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# StatusMonitor
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class StatusMonitor:
    """
    Periodically edits a single Discord message with live bot status.

    Parameters
    ----------
    bot :
        The discord.ext.commands.Bot instance.
    channel_id : int
        ID of the Discord channel used for the dashboard message.
    update_interval : float
        Seconds between updates (default 10).
    """

    HEADER_MARKER = "📊"

    # Fixed offset for the secondary (GMT+7) clock shown next to UTC.
    _LOCAL_OFFSET = timedelta(hours=7)
    # Per-probe timeout (seconds) for the async ComfyUI checks.
    _CHECK_TIMEOUT = 4.0
    # Upper bound (seconds) for the raw REST call made during shutdown, so a
    # dead network cannot stall process exit.
    _SHUTDOWN_TIMEOUT = 5.0
    # Prompts longer than this are truncated on the dashboard.
    _PROMPT_PREVIEW_LEN = 50
    # Placeholder shown for values that are absent / not applicable.
    _DASH = "—"

    def __init__(self, bot, channel_id: int, update_interval: float = 10.0) -> None:
        self._bot = bot
        self._channel_id = channel_id
        self._interval = update_interval
        # Snapshot from the previous cycle; drives change highlighting.
        self._prev: dict[str, str] = {}
        # Cached dashboard message (looked up or created lazily).
        self._message: Optional[discord.Message] = None
        self._task: Optional[asyncio.Task] = None

    # ------------------------------------------------------------------
    # Public lifecycle
    # ------------------------------------------------------------------

    async def start(self) -> None:
        """Start the update loop (idempotent)."""
        if self._task is None or self._task.done():
            self._task = asyncio.create_task(self._update_loop())
            logger.info("StatusMonitor started for channel %s", self._channel_id)

    async def stop(self) -> None:
        """Cancel the update loop, then send shutdown notice (idempotent)."""
        if self._task and not self._task.done():
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
        # Sent regardless of task state: the task may already have been
        # cancelled by the time stop() is reached.
        await self._send_shutdown_message()
        logger.info("StatusMonitor stopped")

    async def _send_shutdown_message(self) -> None:
        """Immediately edit the dashboard message to show bot offline status.

        Uses a fresh aiohttp session with the bot token directly, because
        discord.py closes its own HTTP session before our finally block runs
        on Ctrl-C / task cancellation, making Message.edit() silently fail.

        The request is bounded by ``_SHUTDOWN_TIMEOUT``; any failure is
        logged as a warning and never raised.
        """
        if self._message is None:
            # No cached message; can't create one safely during shutdown.
            return

        timestamp = self._format_timestamp(datetime.now(timezone.utc))
        text = (
            f"{self.HEADER_MARKER} 🔴 **Bot Status Dashboard** — OFFLINE\n"
            f"-# Shut down at: {timestamp}\n"
            "\n"
            "Bot process has stopped."
        )
        try:
            token = self._bot.http.token
            url = (
                f"https://discord.com/api/v10/channels/"
                f"{self._channel_id}/messages/{self._message.id}"
            )
            headers = {
                "Authorization": f"Bot {token}",
                "Content-Type": "application/json",
            }
            timeout = aiohttp.ClientTimeout(total=self._SHUTDOWN_TIMEOUT)
            async with aiohttp.ClientSession(timeout=timeout) as session:
                async with session.patch(url, json={"content": text}, headers=headers) as resp:
                    if resp.status not in (200, 204):
                        logger.warning(
                            "StatusMonitor: shutdown edit returned HTTP %s", resp.status
                        )
        except Exception as exc:
            # Best effort only: shutdown must proceed even if the edit fails.
            logger.warning("StatusMonitor: could not send shutdown message: %s", exc)

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @classmethod
    def _format_timestamp(cls, now: datetime) -> str:
        """Render *now* as ``HH:MM:SS UTC (HH:MM:SS GMT+7)``."""
        local = now + cls._LOCAL_OFFSET
        return f"{now.strftime('%H:%M:%S UTC')} ({local.strftime('%H:%M:%S GMT+7')})"

    async def _get_or_create_message(self) -> Optional[discord.Message]:
        """
        Return the existing dashboard message or create a new one.

        Searches the last 20 messages in the channel for an existing
        dashboard (posted by this bot and containing the header marker).
        Returns ``None`` when the channel cannot be resolved or the message
        cannot be created; the caller retries on the next cycle.
        """
        channel = self._bot.get_channel(self._channel_id)
        if channel is None:
            try:
                channel = await self._bot.fetch_channel(self._channel_id)
            except discord.NotFound:
                logger.error("StatusMonitor: channel %s not found", self._channel_id)
                return None
            except discord.Forbidden:
                logger.error("StatusMonitor: no access to channel %s", self._channel_id)
                return None
            except discord.HTTPException as exc:
                # Any other API failure: report it here and retry next cycle.
                logger.warning("StatusMonitor: channel fetch failed: %s", exc)
                return None

        # Try to find an existing dashboard message
        try:
            async for msg in channel.history(limit=20):
                if msg.author == self._bot.user and self.HEADER_MARKER in msg.content:
                    return msg
        except discord.HTTPException as exc:
            # Fall through and try to post a fresh dashboard instead.
            logger.warning("StatusMonitor: history fetch failed: %s", exc)

        # None found — create a fresh one
        try:
            return await channel.send(
                f"{self.HEADER_MARKER} **Bot Status Dashboard**\n-# Initializing…"
            )
        except discord.HTTPException as exc:
            logger.error("StatusMonitor: could not create dashboard message: %s", exc)
            return None

    def _collect_sync(self) -> dict[str, str]:
        """
        Read bot/workflow state synchronously (no async calls).

        Returns a flat dict of string key → string value snapshots.
        ComfyUI reachability, service state and queue stats are seeded from
        the previous snapshot here and overwritten in _update_loop.
        """
        snap: dict[str, str] = {}
        bot = self._bot

        # --- Bot section ---
        lat = bot.latency
        # Latency can be NaN or infinite when no heartbeat is available;
        # round() raises on either, so only finite values are converted.
        latency_ms = round(lat * 1000) if (lat is not None and math.isfinite(lat)) else 0
        snap["latency"] = f"{latency_ms} ms"

        if hasattr(bot, "start_time"):
            snap["uptime"] = _format_uptime(bot.start_time)
        else:
            snap["uptime"] = "unknown"

        # --- Values filled asynchronously (carry over the last known state) ---
        snap["comfy_pending"] = self._prev.get("comfy_pending", "?")
        snap["comfy_running"] = self._prev.get("comfy_running", "?")
        snap["comfy_status"] = self._prev.get("comfy_status", "unknown")
        snap["service_state"] = self._prev.get("service_state", "unknown")

        snap.update(self._collect_comfy(getattr(bot, "comfy", None)))
        snap.update(self._collect_upload(getattr(bot, "config", None)))
        return snap

    @classmethod
    def _preview(cls, text: str) -> str:
        """Return *text* truncated for display, or the dash placeholder if empty."""
        if not text:
            return cls._DASH
        limit = cls._PROMPT_PREVIEW_LEN
        return text[:limit] + "…" if len(text) > limit else text

    @classmethod
    def _collect_comfy(cls, comfy) -> dict[str, str]:
        """Snapshot the ComfyUI client state (*comfy* may be ``None``)."""
        dash = cls._DASH
        if comfy is None:
            return {
                "comfy_server": "not configured",
                "workflow": dash,
                "prompt": dash,
                "neg_prompt": dash,
                "input_image": dash,
                "pinned_seed": dash,
                "last_seed": dash,
                "total_gen": "0",
            }

        out: dict[str, str] = {}
        out["comfy_server"] = getattr(comfy, "server_address", "unknown")

        wm = getattr(comfy, "workflow_manager", None)
        workflow_loaded = wm is not None and wm.get_workflow_template() is not None
        out["workflow"] = "loaded" if workflow_loaded else "none"

        sm = getattr(comfy, "state_manager", None)
        if sm is not None:
            changes = sm.get_changes()
            out["prompt"] = cls._preview(changes.get("prompt") or "")
            out["neg_prompt"] = cls._preview(changes.get("negative_prompt") or "")
            img = changes.get("input_image")
            out["input_image"] = Path(img).name if img else dash
            seed_pin = changes.get("seed")
            # No pinned seed means each generation draws a random one.
            out["pinned_seed"] = str(seed_pin) if seed_pin is not None else "random"
        else:
            out["prompt"] = dash
            out["neg_prompt"] = dash
            out["input_image"] = dash
            out["pinned_seed"] = dash

        last_seed = getattr(comfy, "last_seed", None)
        out["last_seed"] = str(last_seed) if last_seed is not None else dash
        out["total_gen"] = str(getattr(comfy, "total_generated", 0))
        return out

    @classmethod
    def _collect_upload(cls, config) -> dict[str, str]:
        """Snapshot the auto-upload state (*config* may be ``None``)."""
        dash = cls._DASH
        if not getattr(config, "media_upload_user", None):
            return {
                "upload_configured": "disabled",
                "upload_state": dash,
                "upload_pending": dash,
                "upload_session": dash,
                "upload_last": dash,
            }

        out: dict[str, str] = {"upload_configured": "enabled"}
        out["upload_state"] = "uploading" if upload_is_running() else "idle"

        # Pending: count media files sitting in the output directory
        output_path_str = getattr(config, "comfy_output_path", None)
        if output_path_str:
            try:
                pending = sum(
                    1 for e in Path(output_path_str).iterdir()
                    if e.is_file() and e.suffix.lower() in MEDIA_EXTENSIONS
                )
            except OSError:
                # Directory missing/unreadable: show zero rather than fail.
                pending = 0
            out["upload_pending"] = str(pending)
        else:
            out["upload_pending"] = dash

        us = get_upload_stats()
        if us.total_attempted > 0:
            out["upload_session"] = (
                f"{us.total_ok} ok, {us.total_fail} failed"
                f" ({us.fail_rate_pct:.1f}%)"
            )
        else:
            out["upload_session"] = "no uploads yet"

        if us.last_attempted > 0:
            out["upload_last"] = f"{us.last_ok} ok, {us.last_fail} failed"
        else:
            out["upload_last"] = dash
        return out

    async def _check_connection(self) -> str:
        """Async check whether ComfyUI is reachable.  Returns a plain string.

        Returns ``"not configured"`` when the bot has no ``comfy`` client,
        otherwise ``"reachable"`` or ``"unreachable"``.  A timeout or any
        error raised by the probe counts as unreachable.
        """
        comfy = getattr(self._bot, "comfy", None)
        if comfy is None:
            return "not configured"
        try:
            reachable = await asyncio.wait_for(
                comfy.check_connection(), timeout=self._CHECK_TIMEOUT
            )
        except Exception as exc:  # includes asyncio.TimeoutError
            logger.debug("StatusMonitor: connection check failed: %r", exc)
            return "unreachable"
        return "reachable" if reachable else "unreachable"

    async def _check_comfy_queue(self) -> dict[str, str]:
        """Fetch ComfyUI queue depths.  Returns {comfy_pending, comfy_running}.

        Both values are ``"?"`` when there is no client, the probe fails or
        times out, or it returns a falsy result.
        """
        unknown = {"comfy_pending": "?", "comfy_running": "?"}
        comfy = getattr(self._bot, "comfy", None)
        if comfy is None:
            return unknown
        try:
            q = await asyncio.wait_for(
                comfy.get_comfy_queue(), timeout=self._CHECK_TIMEOUT
            )
            if q:
                return {
                    "comfy_pending": str(len(q.get("queue_pending", []))),
                    "comfy_running": str(len(q.get("queue_running", []))),
                }
        except Exception as exc:
            # Best effort: the dashboard shows "?" instead of failing the cycle.
            logger.debug("StatusMonitor: queue check failed: %r", exc)
        return unknown

    async def _check_service_state(self) -> str:
        """Return the NSSM service state string for the configured ComfyUI service.

        Returns ``"unknown"`` when the bot has no config or no
        ``comfy_service_name`` is set.
        """
        config = getattr(self._bot, "config", None)
        if config is None:
            return "unknown"
        service_name = getattr(config, "comfy_service_name", None)
        if not service_name:
            return "unknown"
        return await get_service_state(service_name)

    # ------------------------------------------------------------------
    # Change-detection formatting
    # ------------------------------------------------------------------

    def _fmt(self, key: str, value: str, *, good: str, bad: str) -> str:
        """
        Format *value* with change-detection highlighting.

        Parameters
        ----------
        key :
            Snapshot key used to look up the previous value.
        value :
            Current value string.
        good :
            The value string that represents a "good" state.
        bad :
            The value string that represents a "bad" state.

        A value counts as changed only when a previous value exists and
        differs, so the very first render is never highlighted.
        """
        prev = self._prev.get(key)
        changed = prev is not None and prev != value
        is_good = value == good
        is_bad = value == bad

        if not changed:
            if is_good:
                return f"🟢 {value}"
            if is_bad:
                return f"🔴 {value}"
            return value

        # Value changed
        if is_bad:
            return f"⚠️ **{value}**"
        if is_good:
            return f"✅ **{value}**"
        return f"**{value}**"

    def _fmt_service_state(self, value: str) -> str:
        """Format NSSM service state with emoji and change-detection highlighting."""
        prev = self._prev.get("service_state")
        changed = prev is not None and prev != value
        if not changed:
            # Steady state: emoji from the shared table, ⚪ for unknown states.
            emoji = STATUS_EMOJI.get(value, "⚪")
            return f"{emoji} {value}"
        if value == "SERVICE_RUNNING":
            return f"✅ **{value}**"
        if value in ("SERVICE_STOPPED", "error", "timeout"):
            return f"⚠️ **{value}**"
        return f"**{value}**"

    def _fmt_queue_size(self, value: str, prev_key: str) -> str:
        """Format queue size with ▲/▼ arrows when changed.

        Non-numeric values (e.g. ``"?"``) are bolded without an arrow.
        """
        prev = self._prev.get(prev_key)
        if prev is None or prev == value:
            return value
        try:
            arrow = "▲" if int(value) > int(prev) else "▼"
        except ValueError:
            arrow = ""
        return f"**{value}** {arrow}" if arrow else f"**{value}**"

    # ------------------------------------------------------------------
    # Message assembly
    # ------------------------------------------------------------------

    def _build_message(self, snap: dict[str, str], now: datetime) -> str:
        """Assemble the full dashboard message string from *snap* at time *now*."""
        timestamp = self._format_timestamp(now)

        pending_fmt = self._fmt_queue_size(snap["comfy_pending"], "comfy_pending")
        running_fmt = self._fmt_queue_size(snap["comfy_running"], "comfy_running")
        http_fmt = self._fmt("comfy_status", snap["comfy_status"], good="reachable", bad="unreachable")
        svc_fmt = self._fmt_service_state(snap["service_state"])
        # good=""/bad="" → neutral fields: only bolded when they change.
        seed_fmt = self._fmt("last_seed", snap["last_seed"], good="", bad="")
        prompt_fmt = self._fmt("prompt", snap["prompt"], good="", bad="")
        neg_fmt = self._fmt("neg_prompt", snap["neg_prompt"], good="", bad="")
        image_fmt = self._fmt("input_image", snap["input_image"], good="", bad="")
        pinned_fmt = self._fmt("pinned_seed", snap["pinned_seed"], good="", bad="")

        lines = [
            f"{self.HEADER_MARKER} **Bot Status Dashboard**",
            f"-# Last updated: {timestamp}",
            "",
            "**Bot**",
            f"  Latency  : {snap['latency']}",
            f"  Uptime   : {snap['uptime']}",
            "",
            f"**ComfyUI** — `{snap['comfy_server']}`",
            f"  Service  : {svc_fmt}",
            f"  HTTP     : {http_fmt}",
            f"  Queue    : {running_fmt} running, {pending_fmt} pending",
            f"  Workflow : {snap['workflow']}",
            # Prompts are wrapped in Discord spoiler markup.
            f"  Prompt   : || {prompt_fmt} ||",
            f"  Neg      : || {neg_fmt} ||",
            f"  Image    : {image_fmt}",
            f"  Seed     : {pinned_fmt}",
            "",
            "**Last Generation**",
            f"  Seed     : {seed_fmt}",
            f"  Total    : {snap['total_gen']}",
        ]

        if snap["upload_configured"] == "enabled":
            upload_state_fmt = self._fmt(
                "upload_state", snap["upload_state"], good="idle", bad=""
            )
            upload_pending_fmt = self._fmt(
                "upload_pending", snap["upload_pending"], good="0", bad=""
            )
            upload_session_fmt = self._fmt("upload_session", snap["upload_session"], good="", bad="")
            upload_last_fmt = self._fmt("upload_last", snap["upload_last"], good="", bad="")
            lines += [
                "",
                "**Auto-Upload**",
                f"  State    : {upload_state_fmt}",
                f"  Pending  : {upload_pending_fmt}",
                f"  Session  : {upload_session_fmt}",
                f"  Last run : {upload_last_fmt}",
            ]
        else:
            lines += [
                "",
                "**Auto-Upload** — disabled *(set MEDIA_UPLOAD_USER / MEDIA_UPLOAD_PASS to enable)*",
            ]

        return "\n".join(lines)

    # ------------------------------------------------------------------
    # Update loop
    # ------------------------------------------------------------------

    async def _update_loop(self) -> None:
        """Background task: collect state, build message, edit in place.

        Runs until cancelled.  Any non-cancellation error in a cycle is
        logged and the loop continues after the usual sleep.
        """
        await self._bot.wait_until_ready()

        while True:
            try:
                now = datetime.now(timezone.utc)

                # Collect synchronous state
                snap = self._collect_sync()

                # Async checks run concurrently
                comfy_status, service_state, queue_stats = await asyncio.gather(
                    self._check_connection(),
                    self._check_service_state(),
                    self._check_comfy_queue(),
                )
                snap["comfy_status"] = comfy_status
                snap["service_state"] = service_state
                snap.update(queue_stats)

                # Build message text (compares against self._prev)
                text = self._build_message(snap, now)

                # Ensure we have a message to edit
                if self._message is None:
                    self._message = await self._get_or_create_message()

                if self._message is not None:
                    try:
                        await self._message.edit(content=text)
                    except discord.NotFound:
                        # Message was deleted — recreate next cycle
                        self._message = None
                    except (discord.HTTPException, OSError) as exc:
                        logger.warning("StatusMonitor: edit failed: %s", exc)

                # Save snapshot for next cycle's change detection
                self._prev = snap

            except asyncio.CancelledError:
                raise
            except Exception:
                logger.exception("StatusMonitor: unexpected error in update loop")

            await asyncio.sleep(self._interval)
|