"""
discord_utils.py
================

Discord utility functions and helpers for the Discord ComfyUI bot.

This module provides reusable Discord-specific utilities including:

- Command decorators for validation
- Argument parsing helpers
- Message formatting utilities
- Discord UI components
"""

from __future__ import annotations

import functools
import re
from io import BytesIO
from typing import Any, Awaitable, Callable, Dict, Iterable, List, Optional, Tuple

import discord
from discord.ext import commands
from discord.ui import View

from config import COMFY_NOT_CONFIGURED_MSG


def require_comfy_client(func):
    """
    Decorator that validates bot.comfy exists before executing a command.

    The wrapped command is only invoked when ``ctx.bot`` has a ``comfy``
    attribute that is not ``None``. Otherwise ``COMFY_NOT_CONFIGURED_MSG`` is
    sent as a reply (without pinging the author) and the command body is
    skipped. This eliminates repeated validation code in every command.

    Usage
    -----
    @bot.command(name="generate")
    @require_comfy_client
    async def generate_command(ctx: commands.Context, *, args: str = ""):
        # bot.comfy is guaranteed to exist here
        await bot.comfy.generate_image(...)

    Parameters
    ----------
    func : callable
        The command coroutine function to wrap. Its first positional
        argument must be the command context.

    Returns
    -------
    callable
        The wrapped command function with ComfyClient validation.
    """

    @functools.wraps(func)
    async def wrapper(ctx: commands.Context, *args, **kwargs):
        # A missing attribute and an explicit None both mean "not configured".
        if getattr(ctx.bot, "comfy", None) is None:
            await ctx.reply(COMFY_NOT_CONFIGURED_MSG, mention_author=False)
            return None
        return await func(ctx, *args, **kwargs)

    return wrapper


def _compile_key_pattern(keys: Iterable[str]) -> Optional[re.Pattern[str]]:
    """
    Build a regex that matches any of ``keys`` at a label boundary.

    A key only matches when it is not immediately preceded by a word
    character, so ``"prompt:"`` does not match inside ``"negative_prompt:"``.
    Longer keys are tried first so that, at a given position, the most
    specific key wins. Empty keys are ignored.

    Returns ``None`` when there is no non-empty key to match.
    """
    candidates = sorted({key for key in keys if key}, key=lambda k: (-len(k), k))
    if not candidates:
        return None
    alternation = "|".join(re.escape(key) for key in candidates)
    return re.compile(rf"(?<!\w)(?:{alternation})")


def parse_labeled_args(args: str, keys: List[str]) -> Dict[str, Optional[str]]:
    """
    Parse labeled arguments from a command string.

    This parser handles Discord command arguments in the format:
    "key1:value1 key2:value2 ..."

    Each key's value runs from the end of the key's first occurrence up to
    the first occurrence of the next key found in the string (or to the end
    of the string). Values are stripped of surrounding whitespace and their
    case is preserved. Later repetitions of an already-seen key are left
    inside the preceding value.

    Keys are matched only at a label boundary (not directly after a letter,
    digit or underscore), so a short key such as ``"prompt:"`` is never
    mistaken for the tail of a longer one such as ``"negative_prompt:"``.

    Parameters
    ----------
    args : str
        The argument string to parse.
    keys : List[str]
        List of keys to extract (e.g., ["prompt:", "negative_prompt:"]).

    Returns
    -------
    Dict[str, Optional[str]]
        Dictionary mapping keys (with trailing colons removed) to their
        values. Keys that are not found, or whose value is empty, map to
        None.

    Examples
    --------
    >>> parse_labeled_args("prompt:a cat negative_prompt:blurry", ["prompt:", "negative_prompt:"])
    {'prompt': 'a cat', 'negative_prompt': 'blurry'}

    >>> parse_labeled_args("prompt:hello world", ["prompt:", "type:"])
    {'prompt': 'hello world', 'type': None}

    >>> parse_labeled_args("negative_prompt:blurry", ["prompt:", "negative_prompt:"])
    {'prompt': None, 'negative_prompt': 'blurry'}
    """
    result: Dict[str, Optional[str]] = {key.rstrip(":"): None for key in keys}

    pattern = _compile_key_pattern(keys)
    if pattern is None:
        return result

    # Keep only the first occurrence of each key; finditer scans left to
    # right, so the dict's insertion order is already positional order.
    first_hits: Dict[str, re.Match[str]] = {}
    for match in pattern.finditer(args):
        first_hits.setdefault(match.group(0), match)

    ordered = list(first_hits.values())
    for current, following in zip(ordered, ordered[1:] + [None]):
        value_end = following.start() if following is not None else len(args)
        value = args[current.end():value_end].strip()
        result[current.group(0).rstrip(":")] = value or None

    return result


def convert_image_bytes_to_discord_files(
    images: List[bytes], max_files: int = 4, prefix: str = "generated"
) -> List[discord.File]:
    """
    Convert a list of image bytes to Discord File objects.

    Files are named ``"{prefix}_{n}.png"`` with ``n`` starting at 1, in the
    same order as ``images``.

    Parameters
    ----------
    images : List[bytes]
        List of raw image data as bytes.
    max_files : int
        Maximum number of files to convert (default: 4). Values of zero or
        below produce an empty list.
    prefix : str
        Filename prefix for generated files (default: "generated").

    Returns
    -------
    List[discord.File]
        List of discord.File objects ready to send.
    """
    # Clamp at zero: a negative slice bound would count from the end instead
    # of yielding nothing.
    limit = max(max_files, 0)
    return [
        discord.File(BytesIO(img_bytes), filename=f"{prefix}_{number}.png")
        for number, img_bytes in enumerate(images[:limit], start=1)
    ]


async def send_queue_status(ctx: commands.Context, queue_size: int) -> None:
    """
    Send a queue status message to the channel.

    Parameters
    ----------
    ctx : commands.Context
        The command context.
    queue_size : int
        Current number of jobs in the queue.
    """
    await ctx.send(f"Queue size: {queue_size}", mention_author=False)


async def send_typing_with_callback(
    ctx: commands.Context, callback: Callable[[], Awaitable[Any]]
) -> Any:
    """
    Execute a callback while showing typing indicator.

    Parameters
    ----------
    ctx : commands.Context
        The command context.
    callback : callable
        Zero-argument async function to execute while typing.

    Returns
    -------
    Any
        The return value of the callback.
    """
    async with ctx.typing():
        return await callback()


def truncate_text(text: str, length: int = 50) -> str:
    """
    Truncate text to a maximum length with ellipsis.

    Parameters
    ----------
    text : str
        The text to truncate.
    length : int
        Maximum length of the returned string (default: 50).

    Returns
    -------
    str
        ``text`` unchanged if it already fits; otherwise a prefix of ``text``
        followed by "..." so that the result is exactly ``length`` characters
        long. When ``length`` is below 3 the ellipsis cannot fit, so the text
        is simply cut to ``length`` characters (empty for ``length <= 0``).
    """
    if len(text) <= length:
        return text
    if length < 3:
        # No room for "..."; a negative slice bound here would make the
        # result longer than requested, so hard-cut instead.
        return text[: max(length, 0)]
    return text[: length - 3] + "..."


def extract_arg_value(args: str, key: str) -> Tuple[Optional[str], str]:
    """
    Extract a single argument value from a labeled args string.

    This is a simpler alternative to parse_labeled_args for extracting just
    one value. The value is the first whitespace-delimited word following the
    first occurrence of ``key``. The key is matched only at a label boundary
    (not directly after a letter, digit or underscore), so ``"type:"`` does
    not match inside ``"subtype:"``.

    Parameters
    ----------
    args : str
        The full argument string.
    key : str
        The key to extract (e.g., "type:").

    Returns
    -------
    Tuple[Optional[str], str]
        A tuple of (extracted_value, remaining_args). ``remaining_args`` is
        the text before the key and the text after the value, joined by a
        single space. If the key is not found (or is empty), returns
        (None, original_args). If the key is present but nothing follows it,
        the value is None and the key is still removed from the remainder.

    Examples
    --------
    >>> extract_arg_value("type:input some other text", "type:")
    ('input', 'some other text')

    >>> extract_arg_value("a cat type:input on a mat", "type:")
    ('input', 'a cat on a mat')
    """
    pattern = _compile_key_pattern([key])
    match = pattern.search(args) if pattern is not None else None
    if match is None:
        return None, args

    before = args[: match.start()].strip()
    # First word after the key is the value; everything else is leftover.
    words = args[match.end():].strip().split(None, 1)
    value = words[0] if words else None
    after = words[1] if len(words) > 1 else ""

    remaining = " ".join(part for part in (before, after) if part)
    return value, remaining