Files
comfy-discord-web/web/auth.py
Khoa (Revenovich) Tran Gia 6004b000a7 manual submit
2026-03-07 21:49:16 +07:00

132 lines
3.7 KiB
Python

"""
web/auth.py
===========
JWT authentication for the web UI.
Flow:
- POST /api/auth/login {token} → verify invite token → issue JWT in httpOnly cookie
- All /api/* require valid JWT via require_auth dependency
- POST /api/admin/login {password} → issue admin JWT (admin: true claim)
- WS /ws?token=<jwt> → authenticate via query param
JWT claims: {"sub": "<label>", "admin": bool, "exp": ...}
"""
from __future__ import annotations
import logging
from datetime import datetime, timedelta, timezone
from typing import Optional
from fastapi import Cookie, Depends, HTTPException, status
from fastapi.security import HTTPBearer
try:
from jose import JWTError, jwt
except ImportError:
jwt = None # type: ignore
JWTError = Exception # type: ignore
# Module-level logger; decode_jwt reports verification failures at DEBUG level.
logger = logging.getLogger(__name__)
# Signing algorithm used for both encoding and decoding tokens (HMAC-SHA256).
ALGORITHM = "HS256"
# Session cookie name. It matches the ``ttb_session`` parameter name of the
# cookie-reading dependencies in this module.
# NOTE(review): not referenced in the visible code — presumably used by the
# login routes when setting the cookie; confirm.
_COOKIE_NAME = "ttb_session"
def _get_secret() -> str:
    """
    Return the JWT signing key taken from the application config.

    Returns
    -------
    str
        The value of ``web_secret_key`` on the config object.

    Raises
    ------
    RuntimeError
        If the config object is missing, the key is empty, or the key is
        shorter than 32 characters.
    """
    # Imported at call time rather than module import time
    # (presumably to avoid a circular import with web.deps — confirm).
    from web.deps import get_config

    config = get_config()
    secret = config.web_secret_key if config else ""

    # Happy path first: a non-empty key of sufficient length.
    if secret and len(secret) >= 32:
        return secret

    if not secret:
        raise RuntimeError(
            "WEB_SECRET_KEY must be set in the environment — "
            "refusing to run with an insecure default."
        )
    raise RuntimeError(
        "WEB_SECRET_KEY is too short (got %d chars, need ≥ 32)." % len(secret)
    )
def create_jwt(label: str, *, admin: bool = False, expire_hours: int = 8) -> str:
    """
    Build and sign a JWT for the user identified by *label*.

    Parameters
    ----------
    label
        Stored in the ``sub`` claim.
    admin
        Stored in the ``admin`` claim.
    expire_hours
        Token lifetime in hours, counted from the current UTC time.

    Returns
    -------
    str
        The encoded token produced by ``jwt.encode``.

    Raises
    ------
    RuntimeError
        If python-jose could not be imported, or if the signing key is
        missing/too short (raised by ``_get_secret``).
    """
    if jwt is None:
        raise RuntimeError("python-jose is not installed (pip install python-jose[cryptography])")

    lifetime = timedelta(hours=expire_hours)
    claims = {
        "sub": label,
        "admin": admin,
        # Timezone-aware UTC expiry.
        "exp": datetime.now(timezone.utc) + lifetime,
    }
    signing_key = _get_secret()
    return jwt.encode(claims, signing_key, algorithm=ALGORITHM)
def decode_jwt(token: str) -> Optional[dict]:
    """
    Verify *token* and return its claims.

    Returns
    -------
    Optional[dict]
        The decoded payload, or ``None`` when python-jose is unavailable or
        ``jwt.decode`` raises ``JWTError``.

    Notes
    -----
    A ``RuntimeError`` from ``_get_secret`` (missing/short key) is not
    caught here and propagates to the caller.
    """
    if jwt is None:
        return None

    claims: Optional[dict] = None
    try:
        claims = jwt.decode(token, _get_secret(), algorithms=[ALGORITHM])
    except JWTError as err:
        # Failures are logged at DEBUG only; callers just see None.
        logger.debug("JWT decode failed: %s", err)
    return claims
def verify_ws_token(token: str) -> Optional[dict]:
    """
    Validate the JWT supplied as a WebSocket query parameter.

    Delegates to ``decode_jwt``; returns the decoded payload, or ``None``
    if verification fails.
    """
    claims = decode_jwt(token)
    return claims
# ---------------------------------------------------------------------------
# FastAPI dependencies
# ---------------------------------------------------------------------------
def require_auth(ttb_session: Optional[str] = Cookie(default=None)) -> dict:
    """
    FastAPI dependency enforcing a valid JWT in the ``ttb_session`` cookie.

    Returns
    -------
    dict
        The decoded JWT payload.

    Raises
    ------
    HTTPException 401
        ``"Not authenticated"`` when the cookie is absent or empty;
        ``"Invalid or expired token"`` when ``decode_jwt`` rejects it.
    """
    # Only attempt decoding when a cookie value was actually supplied.
    claims = decode_jwt(ttb_session) if ttb_session else None
    if claims is None:
        reason = "Invalid or expired token" if ttb_session else "Not authenticated"
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=reason,
        )
    return claims
def optional_auth(ttb_session: Optional[str] = Cookie(default=None)) -> Optional[dict]:
    """
    FastAPI dependency that reads the session cookie without enforcing it.

    Returns the decoded JWT payload, or ``None`` when the cookie is missing,
    empty, or fails verification. This function never raises a 401 itself.
    """
    return decode_jwt(ttb_session) if ttb_session else None
def require_admin(user: dict = Depends(require_auth)) -> dict:
    """
    FastAPI dependency that only admits tokens carrying an admin claim.

    Builds on ``require_auth``, so authentication failures surface from
    that dependency before this check runs.

    Returns
    -------
    dict
        The same payload handed over by ``require_auth``.

    Raises
    ------
    HTTPException 403
        If the payload's ``admin`` claim is missing or falsy.
    """
    if user.get("admin"):
        return user
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Admin access required",
    )