Full source for the-third-rev: Discord bot (discord.py), FastAPI web UI (React/TS/Vite/Tailwind), ComfyUI integration, generation history DB, preset manager, workflow inspector, and all supporting modules. Excluded from tracking: .env, invite_tokens.json, *.db (SQLite), current-workflow-changes.json, user_settings/, presets/, logs/, web-static/ (build output), frontend/node_modules/. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
144 lines
4.9 KiB
Python
144 lines
4.9 KiB
Python
"""GET /api/history; GET /api/history/{prompt_id}/images; GET /api/history/{prompt_id}/file/{filename};
|
|
POST /api/history/{prompt_id}/share; DELETE /api/history/{prompt_id}/share"""
|
|
from __future__ import annotations
|
|
|
|
import base64
|
|
from typing import Optional
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException, Query, Request
|
|
from fastapi.responses import Response
|
|
|
|
from web.auth import require_auth
|
|
|
|
router = APIRouter()
|
|
|
|
|
|
def _assert_owns(prompt_id: str, user: dict) -> None:
|
|
"""Raise 404 if the generation doesn't exist or doesn't belong to the user.
|
|
|
|
Returning the same 404 for both cases prevents leaking whether a
|
|
prompt_id exists to users who don't own it. Admins bypass this check.
|
|
"""
|
|
if user.get("admin"):
|
|
return
|
|
from generation_db import get_generation
|
|
gen = get_generation(prompt_id)
|
|
if gen is None or gen["user_label"] != user["sub"]:
|
|
raise HTTPException(404, "Not found")
|
|
|
|
|
|
@router.get("")
|
|
async def get_history(
|
|
user: dict = Depends(require_auth),
|
|
q: Optional[str] = Query(None, description="Keyword to search in overrides JSON"),
|
|
):
|
|
"""Return generation history. Admins see all; regular users see only their own.
|
|
Pass ?q=keyword to filter by prompt text or any override field."""
|
|
from generation_db import (
|
|
get_history as db_get_history,
|
|
get_history_for_user,
|
|
search_history,
|
|
search_history_for_user,
|
|
)
|
|
if q and q.strip():
|
|
if user.get("admin"):
|
|
return {"history": search_history(q.strip(), limit=50)}
|
|
return {"history": search_history_for_user(user["sub"], q.strip(), limit=50)}
|
|
if user.get("admin"):
|
|
return {"history": db_get_history(limit=50)}
|
|
return {"history": get_history_for_user(user["sub"], limit=50)}
|
|
|
|
|
|
@router.get("/{prompt_id}/images")
|
|
async def get_history_images(prompt_id: str, user: dict = Depends(require_auth)):
|
|
"""
|
|
Fetch output files for a past generation.
|
|
|
|
Returns base64-encoded blobs from the local SQLite DB — works even after
|
|
``flush_pending`` has deleted the files from disk.
|
|
"""
|
|
_assert_owns(prompt_id, user)
|
|
from generation_db import get_files
|
|
files = get_files(prompt_id)
|
|
if not files:
|
|
raise HTTPException(404, f"No files found for prompt_id {prompt_id!r}")
|
|
return {
|
|
"prompt_id": prompt_id,
|
|
"images": [
|
|
{
|
|
"filename": f["filename"],
|
|
"data": base64.b64encode(f["data"]).decode() if not f["mime_type"].startswith("video/") else None,
|
|
"mime_type": f["mime_type"],
|
|
}
|
|
for f in files
|
|
],
|
|
}
|
|
|
|
|
|
@router.get("/{prompt_id}/file/{filename}")
|
|
async def get_history_file(
|
|
prompt_id: str,
|
|
filename: str,
|
|
request: Request,
|
|
user: dict = Depends(require_auth),
|
|
):
|
|
"""Stream a single output file, with HTTP range request support for video seeking."""
|
|
_assert_owns(prompt_id, user)
|
|
from generation_db import get_files
|
|
files = get_files(prompt_id)
|
|
matched = next((f for f in files if f["filename"] == filename), None)
|
|
if matched is None:
|
|
raise HTTPException(404, f"File {filename!r} not found for prompt_id {prompt_id!r}")
|
|
|
|
data: bytes = matched["data"]
|
|
mime: str = matched["mime_type"]
|
|
total = len(data)
|
|
|
|
range_header = request.headers.get("range")
|
|
if range_header:
|
|
range_val = range_header.replace("bytes=", "")
|
|
start_str, _, end_str = range_val.partition("-")
|
|
start = int(start_str) if start_str else 0
|
|
end = int(end_str) if end_str else total - 1
|
|
end = min(end, total - 1)
|
|
chunk = data[start : end + 1]
|
|
return Response(
|
|
content=chunk,
|
|
status_code=206,
|
|
media_type=mime,
|
|
headers={
|
|
"Content-Range": f"bytes {start}-{end}/{total}",
|
|
"Accept-Ranges": "bytes",
|
|
"Content-Length": str(len(chunk)),
|
|
},
|
|
)
|
|
|
|
return Response(
|
|
content=data,
|
|
media_type=mime,
|
|
headers={
|
|
"Accept-Ranges": "bytes",
|
|
"Content-Length": str(total),
|
|
},
|
|
)
|
|
|
|
|
|
@router.post("/{prompt_id}/share")
|
|
async def create_generation_share(prompt_id: str, user: dict = Depends(require_auth)):
|
|
"""Create a share token for a generation. Only the owner may share."""
|
|
# Use the same 404-for-everything helper to avoid leaking prompt_id existence
|
|
_assert_owns(prompt_id, user)
|
|
from generation_db import create_share as db_create_share
|
|
token = db_create_share(prompt_id, user["sub"])
|
|
return {"share_token": token}
|
|
|
|
|
|
@router.delete("/{prompt_id}/share")
|
|
async def revoke_generation_share(prompt_id: str, user: dict = Depends(require_auth)):
|
|
"""Revoke a share token for a generation. Only the owner may revoke."""
|
|
from generation_db import revoke_share as db_revoke_share
|
|
deleted = db_revoke_share(prompt_id, user["sub"])
|
|
if not deleted:
|
|
raise HTTPException(404, "No active share found for this generation")
|
|
return {"ok": True}
|