Phase 3 - Mod display names + split-pane selector:
- Parse mod.cpp/meta.cpp for display_name and workshop_id
- Rewrite ModList as two-pane available/selected interface
Phase 4 - Player kick/ban from Players tab:
- Add get_by_slot() to PlayerRepository
- Add get_rcon_client() class method to ThreadRegistry
- Add /players/{slot_id}/kick and /ban endpoints
- Rewrite PlayerTable with kick/ban modals and ban presets
Phase 5 - Historical log file browser:
- Add list_log_files() and get_log_file_path() to RPTParser
- Add logfiles_router with list (GET), download (GET), and delete (DELETE) endpoints
- Update LogViewer with collapsible log files section (download + delete)
81 lines
2.6 KiB
Python
81 lines
2.6 KiB
Python
"""Log file endpoints — list, download, and delete historical RPT log files."""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
from typing import Annotated
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException
|
|
from fastapi.responses import FileResponse
|
|
from sqlalchemy.engine import Connection
|
|
|
|
from adapters.registry import GameAdapterRegistry
|
|
from core.dal.server_repository import ServerRepository
|
|
from core.utils.file_utils import get_server_dir
|
|
from database import get_db
|
|
from dependencies import get_current_user, require_admin
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
|
|
|
|
# Router for the log-file endpoints; every route registered on it is served
# under /servers/{server_id}/logfiles and tagged "logfiles".
router = APIRouter(prefix="/servers/{server_id}/logfiles", tags=["logfiles"])
|
|
|
|
|
|
def _ok(data):
|
|
return {"success": True, "data": data, "error": None}
|
|
|
|
|
|
def _get_rpt_parser(server_id: int, db: Connection):
    """Look up a server and return its log parser plus its server directory.

    Args:
        server_id: Identifier passed to ``ServerRepository.get_by_id`` and
            ``get_server_dir``.
        db: Database connection handed to ``ServerRepository``.

    Returns:
        A 2-tuple ``(log_parser, server_dir)``: the object returned by the
        adapter's ``get_log_parser()`` and the value of
        ``get_server_dir(server_id)``.

    Raises:
        HTTPException: 404 when no server row exists for ``server_id``, or
            when the adapter for the server's ``game_type`` does not report
            the ``"log_parser"`` capability.
    """
    record = ServerRepository(db).get_by_id(server_id)
    if record is None:
        raise HTTPException(status_code=404, detail="Server not found")

    # The adapter is selected by the server's game type.
    game_adapter = GameAdapterRegistry.get(record["game_type"])
    supports_logs = game_adapter.has_capability("log_parser")
    if not supports_logs:
        raise HTTPException(status_code=404, detail="Server does not support log files")

    log_parser = game_adapter.get_log_parser()
    log_dir = get_server_dir(server_id)
    return log_parser, log_dir
|
|
|
|
|
|
@router.get("")
def list_log_files(
    server_id: int,
    db: Annotated[Connection, Depends(get_db)],
    _user: Annotated[dict, Depends(get_current_user)],
) -> dict:
    """List the log files of one server.

    Resolves the server's log parser and directory via ``_get_rpt_parser``
    (which raises 404 for an unknown server or one without log support) and
    returns whatever ``parser.list_log_files(server_dir)`` yields, wrapped in
    the module's success envelope. ``_user`` is unused in the body; it is
    declared so the ``get_current_user`` dependency runs for this route.
    """
    log_parser, log_dir = _get_rpt_parser(server_id, db)
    return _ok(log_parser.list_log_files(log_dir))
|
|
|
|
|
|
@router.get("/{filename}/download")
def download_log_file(
    server_id: int,
    filename: str,
    db: Annotated[Connection, Depends(get_db)],
    _user: Annotated[dict, Depends(get_current_user)],
):
    """Send one log file of a server to the client as ``text/plain``.

    The file is located with ``parser.get_log_file_path(server_dir, filename)``
    and returned as a ``FileResponse`` that carries the requested ``filename``.
    ``_user`` is unused in the body; it is declared so the
    ``get_current_user`` dependency runs for this route.

    Raises:
        HTTPException: 404 when ``get_log_file_path`` returns ``None``, plus
            the 404s raised by ``_get_rpt_parser``.
    """
    log_parser, log_dir = _get_rpt_parser(server_id, db)

    # NOTE(review): ``filename`` comes straight from the URL; this endpoint
    # relies on get_log_file_path to reject names that do not belong to the
    # server's log directory - confirm that it does.
    resolved = log_parser.get_log_file_path(log_dir, filename)
    if resolved is not None:
        return FileResponse(
            path=str(resolved),
            filename=filename,
            media_type="text/plain",
        )
    raise HTTPException(status_code=404, detail="Log file not found")
|
|
|
|
|
|
@router.delete("/{filename}")
def delete_log_file(
    server_id: int,
    filename: str,
    db: Annotated[Connection, Depends(get_db)],
    _admin: Annotated[dict, Depends(require_admin)],
) -> dict:
    """Delete one log file of a server.

    The file is located with ``parser.get_log_file_path(server_dir, filename)``
    and removed with ``path.unlink()``. ``_admin`` is unused in the body; it is
    declared so the ``require_admin`` dependency runs for this route.

    Returns:
        The success envelope with ``{"message": "<filename> deleted"}``.

    Raises:
        HTTPException: 404 when the file cannot be found (either the lookup
            returns ``None`` or the file vanishes before it is unlinked), plus
            the 404s raised by ``_get_rpt_parser``; 500 when the removal fails
            with any other ``OSError``.
    """
    parser, server_dir = _get_rpt_parser(server_id, db)
    # presumably a pathlib.Path (it is unlinked below) - verify against the parser
    path = parser.get_log_file_path(server_dir, filename)
    if path is None:
        raise HTTPException(status_code=404, detail="Log file not found")
    try:
        path.unlink()
    except FileNotFoundError as exc:
        # The file can disappear between the lookup above and the unlink
        # (for example a concurrent delete). That is a "not found", not a
        # server fault, so answer exactly like the lookup miss does.
        raise HTTPException(status_code=404, detail="Log file not found") from exc
    except OSError as exc:
        raise HTTPException(status_code=500, detail=f"Could not delete file: {exc}") from exc
    return _ok({"message": f"{filename} deleted"})
|