Full source for the-third-rev: Discord bot (discord.py), FastAPI web UI (React/TS/Vite/Tailwind), ComfyUI integration, generation history DB, preset manager, workflow inspector, and all supporting modules. Excluded from tracking: .env, invite_tokens.json, *.db (SQLite), current-workflow-changes.json, user_settings/, presets/, logs/, web-static/ (build output), frontend/node_modules/. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
252 lines
6.6 KiB
Python
252 lines
6.6 KiB
Python
"""
|
|
discord_utils.py
|
|
================
|
|
|
|
Discord utility functions and helpers for the Discord ComfyUI bot.
|
|
|
|
This module provides reusable Discord-specific utilities including:
|
|
- Command decorators for validation
|
|
- Argument parsing helpers
|
|
- Message formatting utilities
|
|
- Discord UI components
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import functools
|
|
from io import BytesIO
|
|
from typing import Dict, Optional, List, Tuple
|
|
|
|
import discord
|
|
from discord.ext import commands
|
|
from discord.ui import View
|
|
|
|
from config import COMFY_NOT_CONFIGURED_MSG
|
|
|
|
|
|
def require_comfy_client(func):
    """
    Guard a command so it only runs when the bot has a ComfyClient.

    The returned coroutine looks up ``ctx.bot.comfy``. When that attribute is
    missing or is ``None`` it replies with ``COMFY_NOT_CONFIGURED_MSG``
    (without mentioning the author) and returns ``None`` without invoking the
    command. Otherwise it awaits the wrapped command and returns its result.

    Usage
    -----
        @bot.command(name="generate")
        @require_comfy_client
        async def generate_command(ctx: commands.Context, *, args: str = ""):
            # bot.comfy is set whenever this body runs
            await bot.comfy.generate_image(...)

    Parameters
    ----------
    func : callable
        Async command function whose first positional argument is the
        command context.

    Returns
    -------
    callable
        Wrapper coroutine function carrying ``func``'s metadata
        (copied by ``functools.wraps``).
    """

    @functools.wraps(func)
    async def wrapper(ctx: commands.Context, *args, **kwargs):
        # A missing attribute and an explicit None are treated the same way.
        comfy_ready = getattr(ctx.bot, "comfy", None) is not None
        if comfy_ready:
            return await func(ctx, *args, **kwargs)

        await ctx.reply(COMFY_NOT_CONFIGURED_MSG, mention_author=False)
        return None

    return wrapper
|
|
|
|
|
|
def _is_shadowed_label(args: str, pos: int, key: str, keys: List[str]) -> bool:
    """Return True if ``key`` at ``pos`` lies inside an occurrence of a longer key."""
    for other in keys:
        if len(other) <= len(key):
            continue
        # Try every place ``key`` appears inside the longer label and check
        # whether that longer label is what actually sits in ``args`` here.
        offset = other.find(key)
        while offset != -1:
            start = pos - offset
            if start >= 0 and args.startswith(other, start):
                return True
            offset = other.find(key, offset + 1)
    return False


def _find_label(args: str, key: str, keys: List[str]) -> int:
    """Return the index of the first occurrence of ``key`` not shadowed by a longer key, or -1."""
    pos = args.find(key)
    while pos != -1:
        if not _is_shadowed_label(args, pos, key, keys):
            return pos
        pos = args.find(key, pos + 1)
    return -1


def parse_labeled_args(args: str, keys: List[str]) -> Dict[str, Optional[str]]:
    """
    Parse labeled arguments from a command string.

    This parser handles command arguments in the format:
        "key1:value1 key2:value2 ..."

    Each value runs from the end of its label to the start of the next
    recognised label (or the end of the string), is stripped of surrounding
    whitespace, and keeps its original case. Only the first occurrence of
    each label is used.

    A label that is a substring of another requested label (``"prompt:"``
    inside ``"negative_prompt:"``) is not matched inside the longer label,
    so ``"negative_prompt:blurry"`` does not populate ``prompt``.

    Parameters
    ----------
    args : str
        The argument string to parse.
    keys : List[str]
        Labels to extract, including the trailing colon
        (e.g., ["prompt:", "negative_prompt:"]). Empty labels are ignored.

    Returns
    -------
    Dict[str, Optional[str]]
        Dictionary mapping each label with trailing colons removed to its
        value. Labels that are absent, or present with an empty value, map
        to None.

    Examples
    --------
    >>> parse_labeled_args("prompt:a cat negative_prompt:blurry", ["prompt:", "negative_prompt:"])
    {'prompt': 'a cat', 'negative_prompt': 'blurry'}

    >>> parse_labeled_args("prompt:hello world", ["prompt:", "type:"])
    {'prompt': 'hello world', 'type': None}

    >>> parse_labeled_args("negative_prompt:blurry", ["prompt:", "negative_prompt:"])
    {'prompt': None, 'negative_prompt': 'blurry'}
    """
    result: Dict[str, Optional[str]] = {key.rstrip(":"): None for key in keys}

    # De-duplicate while preserving order; an empty label can never be located
    # meaningfully (and would make str.find match everywhere).
    unique_keys = [key for key in dict.fromkeys(keys) if key]

    # Locate each label once, then order the hits left-to-right.
    markers: List[Tuple[int, str]] = []
    for key in unique_keys:
        start = _find_label(args, key, unique_keys)
        if start != -1:
            markers.append((start, key))
    markers.sort()

    for i, (start, key) in enumerate(markers):
        value_start = start + len(key)
        # The value ends where the next recognised label begins.
        value_end = markers[i + 1][0] if i + 1 < len(markers) else len(args)
        value = args[value_start:value_end].strip()
        result[key.rstrip(":")] = value or None

    return result
|
|
|
|
|
|
def convert_image_bytes_to_discord_files(
    images: List[bytes], max_files: int = 4, prefix: str = "generated"
) -> List[discord.File]:
    """
    Wrap raw image payloads in ``discord.File`` objects.

    Images are taken in order and numbered from 1, so the files are named
    ``{prefix}_1.png``, ``{prefix}_2.png``, and so on. Conversion stops once
    ``max_files`` files have been produced.

    Parameters
    ----------
    images : List[bytes]
        Raw image data, one ``bytes`` object per image.
    max_files : int
        Upper bound on the number of files created (default: 4).
    prefix : str
        Filename prefix for the created files (default: "generated").

    Returns
    -------
    List[discord.File]
        The created file objects, at most ``max_files`` of them.
    """
    attachments = []
    for position, payload in enumerate(images, start=1):
        # ``position`` is 1-based, so exceeding max_files means the cap is hit.
        if position > max_files:
            break
        buffer = BytesIO(payload)
        buffer.seek(0)
        attachment = discord.File(buffer, filename=f"{prefix}_{position}.png")
        attachments.append(attachment)
    return attachments
|
|
|
|
|
|
async def send_queue_status(ctx: commands.Context, queue_size: int) -> None:
    """
    Post the current queue size via the command context.

    Parameters
    ----------
    ctx : commands.Context
        The command context used to send the message.
    queue_size : int
        Number of jobs to report in the message.
    """
    status_line = f"Queue size: {queue_size}"
    await ctx.send(status_line, mention_author=False)
|
|
|
|
|
|
async def send_typing_with_callback(ctx: commands.Context, callback):
    """
    Await ``callback()`` inside the ``ctx.typing()`` context manager.

    Parameters
    ----------
    ctx : commands.Context
        The command context whose ``typing()`` context manager is entered.
    callback : callable
        Zero-argument callable returning an awaitable.

    Returns
    -------
    Any
        Whatever awaiting ``callback()`` produced.
    """
    async with ctx.typing():
        outcome = await callback()
    return outcome
|
|
|
|
|
|
def truncate_text(text: str, length: int = 50) -> str:
    """
    Truncate text to a maximum length with ellipsis.

    The result is never longer than ``length`` characters (or empty when
    ``length`` is zero or negative).

    Parameters
    ----------
    text : str
        The text to truncate.
    length : int
        Maximum length of the returned string (default: 50).

    Returns
    -------
    str
        ``text`` unchanged if it already fits. Otherwise the leading
        ``length - 3`` characters followed by "...". When ``length`` is
        smaller than 3 there is no room for a full ellipsis, so a shortened
        run of dots of exactly ``length`` characters is returned.
    """
    if len(text) <= length:
        return text

    ellipsis = "..."
    if length < len(ellipsis):
        # A negative slice bound here would cut from the end of the text and
        # produce a result longer than requested.
        return ellipsis[: max(length, 0)]

    return text[: length - len(ellipsis)] + ellipsis
|
|
|
|
|
|
def extract_arg_value(args: str, key: str) -> Tuple[Optional[str], str]:
    """
    Extract a single argument value from a labeled args string.

    This is a simpler alternative to parse_labeled_args for extracting just
    one value: the value is the first whitespace-delimited word that follows
    the first occurrence of ``key``.

    Parameters
    ----------
    args : str
        The full argument string.
    key : str
        The key to extract (e.g., "type:").

    Returns
    -------
    Tuple[Optional[str], str]
        A tuple of (extracted_value, remaining_args). ``remaining_args`` is
        the text before the key and the text after the value, each stripped
        and joined by a single space. If the key is empty or not found,
        returns (None, original_args). If the key is present but nothing
        follows it, the extracted value is None.

    Examples
    --------
    >>> extract_arg_value("type:input some other text", "type:")
    ('input', 'some other text')

    >>> extract_arg_value("hello type:input world", "type:")
    ('input', 'hello world')
    """
    # An empty key cannot be located meaningfully; treat it as "not found".
    if not key:
        return None, args

    before, found, after = args.partition(key)
    if not found:
        return None, args

    # First word after the key is the value; everything else is left over.
    words = after.strip().split(None, 1)
    value = words[0] if words else None
    rest = words[1] if len(words) > 1 else ""

    # Keep text that preceded the key instead of discarding it.
    remaining = " ".join(part for part in (before.strip(), rest) if part)

    return value, remaining
|