feat: implement phases 3-5 of Arma 3 UX enhancement plan

Phase 3 - Mod display names + split-pane selector:
- Parse mod.cpp/meta.cpp for display_name and workshop_id
- Rewrite ModList as two-pane available/selected interface

Phase 4 - Player kick/ban from Players tab:
- Add get_by_slot() to PlayerRepository
- Add get_rcon_client() class method to ThreadRegistry
- Add /players/{slot_id}/kick and /ban endpoints
- Rewrite PlayerTable with kick/ban modals and ban presets

Phase 5 - Historical log file browser:
- Add list_log_files() and get_log_file_path() to RPTParser
- Add logfiles_router with GET/download/DELETE endpoints
- Update LogViewer with collapsible log files section (download + delete)
This commit is contained in:
Tran G. (Revernomad) Khoa
2026-04-17 20:47:37 +07:00
parent fe3bd81cae
commit 5a62d21def
14 changed files with 890 additions and 88 deletions

View File

@@ -62,6 +62,36 @@ class RPTParser:
"message": (message or "").strip(),
}
def list_log_files(self, server_dir: Path) -> list[dict]:
"""Return all .rpt log files in server_dir/server/, newest first."""
profile_dir = server_dir / "server"
if not profile_dir.exists():
return []
files = []
for p in profile_dir.glob("*.rpt"):
try:
stat = p.stat()
files.append({
"filename": p.name,
"size_bytes": stat.st_size,
"modified_at": stat.st_mtime,
})
except OSError:
pass
files.sort(key=lambda f: f["modified_at"], reverse=True)
return files
def get_log_file_path(self, server_dir: Path, filename: str) -> Path | None:
"""Return the Path for a specific log file, or None if not found / path traversal attempt."""
import os
profile_dir = server_dir / "server"
target = (profile_dir / filename).resolve()
if not str(target).startswith(str(profile_dir.resolve())):
return None
if not target.exists() or target.suffix != ".rpt":
return None
return target
def get_log_file_resolver(self, server_id: int) -> Callable[[Path], Path | None]:
"""Return a callable that finds the current RPT log file."""
def resolver(server_dir: Path) -> Path | None:

View File

@@ -15,6 +15,24 @@ logger = logging.getLogger(__name__)
_MOD_DIR_PATTERN = re.compile(r"^@.+", re.IGNORECASE)
def _parse_mod_cpp(mod_dir: Path) -> str | None:
mod_cpp = mod_dir / "mod.cpp"
if not mod_cpp.exists():
return None
text = mod_cpp.read_text(errors="ignore")
m = re.search(r'name\s*=\s*"([^"]+)"', text, re.IGNORECASE)
return m.group(1) if m else None
def _parse_meta_cpp(mod_dir: Path) -> str | None:
meta_cpp = mod_dir / "meta.cpp"
if not meta_cpp.exists():
return None
text = meta_cpp.read_text(errors="ignore")
m = re.search(r'publishedid\s*=\s*(\d+)', text, re.IGNORECASE)
return m.group(1) if m else None
class Arma3ModData(BaseModel):
"""Mod data schema for Arma 3."""
workshop_id: str = ""
@@ -60,6 +78,8 @@ class Arma3ModManager:
"name": entry.name,
"path": str(entry.resolve()),
"size_bytes": size,
"display_name": _parse_mod_cpp(entry),
"workshop_id": _parse_meta_cpp(entry),
})
except OSError as exc:
raise AdapterError(f"Cannot scan mod directory: {exc}") from exc

View File

@@ -43,6 +43,12 @@ class PlayerRepository(BaseRepository):
},
)
def get_by_slot(self, server_id: int, slot_id: int) -> dict | None:
return self._fetchone(
"SELECT * FROM players WHERE server_id = :sid AND slot_id = :slot",
{"sid": server_id, "slot": str(slot_id)},
)
def clear(self, server_id: int) -> None:
self._execute("DELETE FROM players WHERE server_id = :sid", {"sid": server_id})

View File

@@ -0,0 +1,80 @@
"""Log file endpoints — list, download, and delete historical RPT log files."""
from __future__ import annotations
import logging
from typing import Annotated
from fastapi import APIRouter, Depends, HTTPException
from fastapi.responses import FileResponse
from sqlalchemy.engine import Connection
from adapters.registry import GameAdapterRegistry
from core.dal.server_repository import ServerRepository
from core.utils.file_utils import get_server_dir
from database import get_db
from dependencies import get_current_user, require_admin
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/servers/{server_id}/logfiles", tags=["logfiles"])
def _ok(data):
return {"success": True, "data": data, "error": None}
def _get_rpt_parser(server_id: int, db: Connection):
server = ServerRepository(db).get_by_id(server_id)
if server is None:
raise HTTPException(status_code=404, detail="Server not found")
adapter = GameAdapterRegistry.get(server["game_type"])
if not adapter.has_capability("log_parser"):
raise HTTPException(status_code=404, detail="Server does not support log files")
return adapter.get_log_parser(), get_server_dir(server_id)
@router.get("")
def list_log_files(
server_id: int,
db: Annotated[Connection, Depends(get_db)],
_user: Annotated[dict, Depends(get_current_user)],
) -> dict:
parser, server_dir = _get_rpt_parser(server_id, db)
files = parser.list_log_files(server_dir)
return _ok(files)
@router.get("/{filename}/download")
def download_log_file(
server_id: int,
filename: str,
db: Annotated[Connection, Depends(get_db)],
_user: Annotated[dict, Depends(get_current_user)],
):
parser, server_dir = _get_rpt_parser(server_id, db)
path = parser.get_log_file_path(server_dir, filename)
if path is None:
raise HTTPException(status_code=404, detail="Log file not found")
return FileResponse(
path=str(path),
filename=filename,
media_type="text/plain",
)
@router.delete("/{filename}")
def delete_log_file(
server_id: int,
filename: str,
db: Annotated[Connection, Depends(get_db)],
_admin: Annotated[dict, Depends(require_admin)],
) -> dict:
parser, server_dir = _get_rpt_parser(server_id, db)
path = parser.get_log_file_path(server_dir, filename)
if path is None:
raise HTTPException(status_code=404, detail="Log file not found")
try:
path.unlink()
except OSError as exc:
raise HTTPException(status_code=500, detail=f"Could not delete file: {exc}") from exc
return _ok({"message": f"{filename} deleted"})

View File

@@ -5,18 +5,28 @@ import logging
from typing import Annotated
from fastapi import APIRouter, Depends
from pydantic import BaseModel
from sqlalchemy.engine import Connection
from core.dal.player_repository import PlayerRepository
from core.servers.service import ServerService
from database import get_db
from dependencies import get_current_user
from dependencies import get_current_user, require_admin
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/servers/{server_id}/players", tags=["players"])
class KickRequest(BaseModel):
reason: str = "Kicked by admin"
class BanFromPlayerRequest(BaseModel):
reason: str = "Banned by admin"
duration_minutes: int | None = None
def _ok(data):
return {"success": True, "data": data, "error": None}
@@ -54,4 +64,31 @@ def player_history(
total, rows = player_repo.get_history(
server_id=server_id, limit=limit, offset=offset, search=search,
)
return _ok({"total": total, "items": rows})
return _ok({"total": total, "items": rows})
@router.post("/{slot_id}/kick")
def kick_player(
server_id: int,
slot_id: int,
body: KickRequest,
db: Annotated[Connection, Depends(get_db)],
_admin: Annotated[dict, Depends(require_admin)],
) -> dict:
ServerService(db).kick_player(server_id, slot_id, body.reason)
return _ok({"message": f"Player {slot_id} kicked"})
@router.post("/{slot_id}/ban")
def ban_player_from_list(
server_id: int,
slot_id: int,
body: BanFromPlayerRequest,
db: Annotated[Connection, Depends(get_db)],
admin: Annotated[dict, Depends(require_admin)],
) -> dict:
ban = ServerService(db).ban_from_player(
server_id, slot_id, body.reason, body.duration_minutes,
banned_by=admin["username"],
)
return _ok(ban)

View File

@@ -396,6 +396,56 @@ class ServerService:
data[field] = "***"
return sections
def kick_player(self, server_id: int, slot_id: int, reason: str) -> None:
from core.threads.thread_registry import ThreadRegistry
ra = ThreadRegistry.get_rcon_client(server_id)
if not ra or not ra.is_connected():
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail={"code": "RCON_NOT_CONNECTED", "message": "RCon not connected — server must be running"},
)
success = ra.kick_player(int(slot_id), reason)
if not success:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail={"code": "KICK_FAILED", "message": "Kick command failed"},
)
def ban_from_player(
self,
server_id: int,
slot_id: int,
reason: str,
duration_minutes: int | None,
banned_by: str,
) -> dict:
from datetime import datetime, timezone, timedelta
from core.dal.player_repository import PlayerRepository
from core.dal.ban_repository import BanRepository
player = PlayerRepository(self._db).get_by_slot(server_id, slot_id)
if not player:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail={"code": "NOT_FOUND", "message": "Player not found"},
)
expires_at = None
if duration_minutes is not None and duration_minutes > 0:
expires_at = (datetime.now(timezone.utc) + timedelta(minutes=duration_minutes)).isoformat()
from core.threads.thread_registry import ThreadRegistry
ra = ThreadRegistry.get_rcon_client(server_id)
if ra and ra.is_connected():
ra.ban_player(player["guid"], duration_minutes or 0, reason)
ban_repo = BanRepository(self._db)
ban_id = ban_repo.create(
server_id=server_id,
guid=player["guid"],
name=player["name"],
reason=reason,
banned_by=banned_by,
expires_at=expires_at,
)
return dict(ban_repo.get_by_id(ban_id))
def get_config_schema(self, server_id: int) -> dict:
server = self.get_server(server_id)
adapter = GameAdapterRegistry.get(server["game_type"])

View File

@@ -90,6 +90,20 @@ class ThreadRegistry:
if registry is not None:
registry._stop_all()
@classmethod
def get_rcon_client(cls, server_id: int):
"""Return the live Arma3RemoteAdmin client for a running server, or None."""
registry = cls._get_instance()
if registry is None:
return None
bundle = registry._bundles.get(server_id)
if bundle is None:
return None
poller = bundle.get("rcon_poller")
if poller is None or not poller.is_alive():
return None
return getattr(poller, "_client", None)
# ── Instance methods ──
def _start_server_threads(self, server_id: int, db) -> None:

View File

@@ -168,6 +168,7 @@ def create_app() -> FastAPI:
from core.servers.bans_router import router as bans_router
from core.servers.missions_router import router as missions_router
from core.servers.mods_router import router as mods_router
from core.servers.logfiles_router import router as logfiles_router
from core.websocket.router import router as ws_router
app.include_router(auth_router, prefix="/api")
@@ -178,6 +179,7 @@ def create_app() -> FastAPI:
app.include_router(bans_router, prefix="/api")
app.include_router(missions_router, prefix="/api")
app.include_router(mods_router, prefix="/api")
app.include_router(logfiles_router, prefix="/api")
app.include_router(ws_router)
return app