""" media_uploader.py ================= Auto-uploads generated media files to the external storage server. On success the local file is deleted. Any files that fail to upload are left in place and will be retried automatically on the next call (i.e. the next time a generation completes). If no credentials are configured (MEDIA_UPLOAD_USER / MEDIA_UPLOAD_PASS env vars not set), flush_pending() is a no-op and files are left for the manual ttr!collect-videos command. Upload behaviour: - Files are categorised into image / gif / video / audio folders. - A ``folder`` form field is sent with each upload so the server can route the file into the correct subdirectory. - The current datetime is appended to each filename before uploading (e.g. ``output_20260225_143022.png``); the local filename is unchanged. Usage:: from media_uploader import flush_pending, get_stats await flush_pending(Path(config.comfy_output_path), config.media_upload_user, config.media_upload_pass) stats = get_stats() """ from __future__ import annotations import asyncio import logging import mimetypes import ssl from dataclasses import dataclass from datetime import datetime from pathlib import Path from typing import Optional import aiohttp logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Constants # --------------------------------------------------------------------------- UPLOAD_URL = "https://mediaup.revoluxiant.ddns.net/upload" # Media categories and their recognised extensions. CATEGORY_EXTENSIONS: dict[str, frozenset[str]] = { "image": frozenset({ ".png", ".jpg", ".jpeg", ".webp", ".bmp", ".tiff", ".tif", ".avif", ".heic", ".heif", ".svg", ".ico", }), "gif": frozenset({ ".gif", }), "video": frozenset({ ".mp4", ".webm", ".avi", ".mkv", ".mov", ".flv", ".ts", ".m2ts", ".m4v", ".wmv", }), "audio": frozenset({ ".mp3", ".wav", ".ogg", ".flac", ".aac", ".m4a", ".opus", ".wma", ".aiff", ".aif", }), } # Flat set of all recognised extensions (used for directory scanning). MEDIA_EXTENSIONS: frozenset[str] = frozenset().union(*CATEGORY_EXTENSIONS.values()) # Shared SSL context — server uses a self-signed cert. _ssl_ctx = ssl.create_default_context() _ssl_ctx.check_hostname = False _ssl_ctx.verify_mode = ssl.CERT_NONE # Prevents concurrent flush runs from uploading the same file twice. _flush_lock = asyncio.Lock() # --------------------------------------------------------------------------- # Stats # --------------------------------------------------------------------------- @dataclass class UploadStats: """Cumulative upload counters for the current bot session.""" total_attempted: int = 0 total_ok: int = 0 last_attempted: int = 0 last_ok: int = 0 @property def total_fail(self) -> int: return self.total_attempted - self.total_ok @property def last_fail(self) -> int: return self.last_attempted - self.last_ok @property def fail_rate_pct(self) -> float: if self.total_attempted == 0: return 0.0 return (self.total_fail / self.total_attempted) * 100.0 _stats = UploadStats() def get_stats() -> UploadStats: """Return the module-level upload stats (live reference).""" return _stats def is_running() -> bool: """Return True if a flush is currently in progress.""" return _flush_lock.locked() # --------------------------------------------------------------------------- # Internal helpers # --------------------------------------------------------------------------- def _content_type(filepath: Path) -> str: mime, _ = mimetypes.guess_type(filepath.name) return mime or "application/octet-stream" def _get_category(suffix: str) -> str: """Return the upload folder category for a file extension.""" s = suffix.lower() for category, extensions in CATEGORY_EXTENSIONS.items(): if s in extensions: logger.info(f"[_get_category] File category: {category}") return category logger.info(f"[_get_category] File category: other") return "other" def _build_upload_name(filepath: Path) -> str: """Return a filename with the current datetime appended before the extension. Example: ``ComfyUI_00042.png`` → ``20260225_143022_ComfyUI_00042.png`` """ timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") return f"{timestamp}_{filepath.stem}{filepath.suffix}" def _build_url(url, category: str) -> str: """Build url to upload to each specific folder""" url += "/comfyui" if category == "image": url += "/image" elif category == "video": url += "/video" elif category == "gif": url += "/gif" elif category == "audio": url += "/audio" else: url return url async def _upload_one(session: aiohttp.ClientSession, filepath: Path) -> bool: """Upload a single file. Returns True on HTTP 2xx.""" try: file_bytes = filepath.read_bytes() except OSError: logger.warning("Cannot read file for upload: %s", filepath) return False category = _get_category(filepath.suffix) upload_name = _build_upload_name(filepath) form = aiohttp.FormData() form.add_field( "file", file_bytes, filename=upload_name, content_type=_content_type(filepath), ) url = _build_url(UPLOAD_URL, category) logger.info(f"Uploading file to url: {url}") try: async with session.post( url, data=form, timeout=aiohttp.ClientTimeout(total=120), ) as resp: if resp.status // 100 == 2: return True body = await resp.text() logger.warning( "Upload rejected %s: HTTP %s — %s", upload_name, resp.status, body[:200], ) return False except Exception: logger.warning("Upload error for %s", upload_name, exc_info=True) return False # --------------------------------------------------------------------------- # Public API # --------------------------------------------------------------------------- async def flush_pending( output_path: Path, user: Optional[str], password: Optional[str], ) -> int: """ Scan *output_path* for media files, upload each to the storage server, and delete the local file on success. If *user* or *password* is falsy (not configured), returns 0 immediately and leaves all files in place for the manual ttr!collect-videos command. Files that fail to upload are left in place and will be retried on the next call. If a previous flush is still running the new call returns 0 immediately to avoid double-uploading. Returns the number of files successfully uploaded and deleted. """ if not user or not password: return 0 if _flush_lock.locked(): logger.debug("flush_pending already in progress, skipping") return 0 async with _flush_lock: try: entries = [ e for e in output_path.iterdir() if e.is_file() and e.suffix.lower() in MEDIA_EXTENSIONS ] except OSError: logger.warning("Cannot scan output directory: %s", output_path) return 0 if not entries: _stats.last_attempted = 0 _stats.last_ok = 0 return 0 logger.info("Auto-uploading %d pending media file(s)…", len(entries)) auth = aiohttp.BasicAuth(user, password) connector = aiohttp.TCPConnector(ssl=_ssl_ctx) uploaded = 0 async with aiohttp.ClientSession(connector=connector, auth=auth) as session: for filepath in entries: if await _upload_one(session, filepath): try: filepath.unlink() logger.info("Uploaded and deleted: %s", filepath.name) uploaded += 1 except OSError: logger.warning( "Uploaded but could not delete local file: %s", filepath ) # Update cumulative stats _stats.last_attempted = len(entries) _stats.last_ok = uploaded _stats.total_attempted += len(entries) _stats.total_ok += uploaded failed = len(entries) - uploaded if failed: logger.warning( "Auto-upload: %d ok, %d failed — will retry next generation.", uploaded, failed, ) elif uploaded: logger.info("Auto-upload complete: %d file(s) uploaded and deleted.", uploaded) return uploaded