"""POST /api/admin/login; GET/POST/DELETE /api/admin/tokens""" from __future__ import annotations import hmac import logging from fastapi import APIRouter, Depends, HTTPException, Request, Response, status from pydantic import BaseModel from web.auth import create_jwt, require_admin from web.deps import get_config from web.login_guard import get_guard, get_real_ip router = APIRouter() _COOKIE = "ttb_session" audit = logging.getLogger("audit") class AdminLoginRequest(BaseModel): password: str class CreateTokenRequest(BaseModel): label: str admin: bool = False @router.post("/login") async def admin_login(body: AdminLoginRequest, request: Request, response: Response): """Admin password login → admin JWT cookie.""" config = get_config() expected_pw = config.admin_password if config else None ip = get_real_ip(request) get_guard().check(ip) # Constant-time comparison to prevent timing attacks if not expected_pw or not hmac.compare_digest(body.password, expected_pw): get_guard().record_failure(ip) audit.info("admin.login ip=%s success=False", ip) raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Wrong password") get_guard().record_success(ip) audit.info("admin.login ip=%s success=True", ip) expire_hours = config.web_jwt_expire_hours if config else 8 jwt_token = create_jwt("admin", admin=True, expire_hours=expire_hours) response.set_cookie( _COOKIE, jwt_token, httponly=True, secure=True, samesite="strict", max_age=expire_hours * 3600, ) return {"label": "admin", "admin": True} @router.get("/tokens") async def list_tokens(_: dict = Depends(require_admin)): """List all invite tokens (hashes shown, labels safe).""" from token_store import list_tokens as _list config = get_config() token_file = config.web_token_file if config else "invite_tokens.json" records = _list(token_file) # Don't return hashes to the UI return [{"id": r["id"], "label": r["label"], "admin": r.get("admin", False), "created_at": r.get("created_at")} for r in records] 
@router.post("/tokens")
async def create_token(body: CreateTokenRequest, _: dict = Depends(require_admin)):
    """Issue a new invite token and return its plaintext.

    The token is created through ``token_store.create_token`` using the
    label and admin flag from the request body. The response carries the
    plaintext token together with the submitted label and admin flag.
    """
    from token_store import create_token as _store_create

    cfg = get_config()
    # Use the default token file when no config is available.
    if cfg:
        path = cfg.web_token_file
    else:
        path = "invite_tokens.json"

    new_token = _store_create(body.label, path, admin=body.admin)
    return {"token": new_token, "label": body.label, "admin": body.admin}


@router.delete("/tokens/{token_id}")
async def revoke_token(token_id: str, _: dict = Depends(require_admin)):
    """Revoke the invite token identified by ``token_id``.

    Returns ``{"ok": True}`` when the token store reports success.

    Raises:
        HTTPException: 404 when the token store reports a falsy result
            for ``token_id``.
    """
    from token_store import revoke_token as _store_revoke

    cfg = get_config()
    # Use the default token file when no config is available.
    if cfg:
        path = cfg.web_token_file
    else:
        path = "invite_tokens.json"

    if _store_revoke(token_id, path):
        return {"ok": True}
    raise HTTPException(status_code=404, detail="Token not found")