Files
Tran G. (Revernomad) Khoa 6511353b55 feat: implement full backend + frontend server detail, settings, and create server pages
Backend:
- Complete FastAPI backend with 42+ REST endpoints (auth, servers, config,
  players, bans, missions, mods, games, system)
- Game adapter architecture with Arma 3 as first-class adapter
- WebSocket real-time events for status, metrics, logs, players
- Background thread system (process monitor, metrics, log tail, RCon poller)
- Fernet encryption for sensitive config fields at rest
- JWT auth with admin/viewer roles, bcrypt password hashing
- SQLite with WAL mode, parameterized queries, migration system
- APScheduler cleanup jobs for logs, metrics, events

Frontend:
- Server Detail page with 7 tabs (overview, config, players, bans,
  missions, mods, logs)
- Settings page with password change and admin user management
- Create Server wizard (4-step; known bug: silent validation failure)
- New hooks: useServerDetail, useAuth, useGames
- New components: ServerHeader, ConfigEditor, PlayerTable, BanTable,
  MissionList, ModList, LogViewer, PasswordChange, UserManager
- WebSocket onEvent callback for real-time log accumulation
- 120 unit tests passing (Vitest + React Testing Library)

Docs:
- Added .gitignore, CLAUDE.md, README.md
- Updated FRONTEND.md, ARCHITECTURE.md with current implementation state
- Added .env.example for backend configuration

Known issues:
- Create Server form: "Next" buttons don't validate before advancing,
  causing silent submit failure when fields are invalid
- Config sub-tabs need UX redesign for non-technical users
2026-04-17 11:58:34 +07:00

200 lines
6.4 KiB
Python

"""Arma 3 ban manager — bidirectional sync between DB bans and BattlEye ban file."""
from __future__ import annotations
import logging
import os
from pathlib import Path
from pydantic import BaseModel
from core.utils.file_utils import get_server_dir
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Location of the ban file, relative to a server's directory (it is joined
# onto the server directory wherever a ban file path is built).
_BANS_FILE = "battleye/bans.txt"
class Arma3BanData(BaseModel):
    """Pydantic model describing the Arma 3 ban data fields."""

    # Player GUID for the ban; defaults to an empty string.
    guid: str = ""
    # IP address for the ban; defaults to an empty string.
    ip: str = ""
class Arma3BanManager:
    """
    Implements BanManager protocol for Arma3 BattlEye.

    Also provides richer file-based operations for the ban endpoints.

    Every ban is stored as one line of the form
    ``TYPE IDENTIFIER DURATION REASON`` where TYPE is ``GUID`` or ``IP``.
    Blank lines and lines starting with ``//`` or ``#`` are ignored.
    """

    # Prefixes that mark a whole line as a comment in the ban file.
    _COMMENT_PREFIXES = ("//", "#")

    def __init__(self, server_id: int | None = None) -> None:
        """Store the server id; it is only needed by the file-based operations."""
        self._server_id = server_id

    def _bans_path(self) -> Path:
        """Return the ban file path for the configured server.

        Raises:
            ValueError: if the manager was created without a ``server_id``.
        """
        if self._server_id is None:
            raise ValueError("server_id required for file-based ban operations")
        server_dir = get_server_dir(self._server_id)
        return server_dir / _BANS_FILE

    # ── BanManager protocol methods ──

    def get_ban_file_path(self, server_dir: Path) -> Path:
        """Return the ban file path inside ``server_dir``."""
        return server_dir / _BANS_FILE

    def sync_bans_to_file(self, bans: list[dict], ban_file: Path) -> None:
        """Write bans from DB to BattlEye ban file format.

        The identifier is taken from ``player_uid``, then ``guid``, then ``ip``;
        entries without any identifier are skipped. Missing or ``None`` values
        fall back to ``"GUID"`` (type), ``0`` (duration) and ``""`` (reason).
        The file is replaced atomically via a temporary sibling file.

        Raises:
            OSError: if the file cannot be written (the temp file is removed).
        """
        lines = []
        for ban in bans:
            identifier = ban.get("player_uid") or ban.get("guid") or ban.get("ip") or ""
            if not identifier:
                continue
            # ``or`` (rather than a .get default) also covers keys that are
            # present but hold None, e.g. NULL columns coming from the DB.
            ban_type = ban.get("ban_type") or "GUID"
            duration = ban.get("duration_minutes") or 0
            lines.append(
                self._format_ban_line(ban_type, identifier, duration, ban.get("reason"))
            )

        ban_file.parent.mkdir(parents=True, exist_ok=True)
        content = ("\n".join(lines) + "\n") if lines else ""
        self._write_atomic(ban_file, content)

    def read_bans_from_file(self, ban_file: Path) -> list[dict]:
        """Read bans from BattlEye ban file into standard format.

        Returns an empty list when ``ban_file`` does not exist.
        """
        if not ban_file.exists():
            return []
        text = ban_file.read_text(encoding="utf-8", errors="replace")
        return list(self._iter_bans(text.splitlines()))

    def get_ban_data_schema(self) -> type[BaseModel] | None:
        """Return the schema class used for Arma 3 ban data."""
        return Arma3BanData

    # ── Richer file-based operations (used by ban endpoints) ──

    def get_bans(self) -> list[dict]:
        """Read all bans from bans.txt. Returns list of dicts.

        Read errors are logged, not raised; whatever was parsed before the
        error is still returned.
        """
        bans_path = self._bans_path()
        if not bans_path.exists():
            return []
        bans: list[dict] = []
        try:
            with open(bans_path, "r", encoding="utf-8", errors="replace") as f:
                # Explicit loop so entries parsed before an I/O error are kept.
                for parsed in self._iter_bans(f):
                    bans.append(parsed)
        except OSError as exc:
            logger.error("Cannot read bans.txt: %s", exc)
        return bans

    def add_ban(self, identifier: str, ban_type: str, reason: str, duration_minutes: int) -> None:
        """Append a ban entry to bans.txt.

        Line breaks in ``reason`` are flattened so the entry stays on one line.
        Write errors are logged, not raised.
        """
        entry = self._format_ban_line(ban_type, identifier, duration_minutes, reason) + "\n"
        bans_path = self._bans_path()
        bans_path.parent.mkdir(parents=True, exist_ok=True)
        try:
            # A hand-edited file may lack a final newline; without this guard
            # the new entry would be glued onto the previous line.
            if self._ends_without_newline(bans_path):
                entry = "\n" + entry
            with open(bans_path, "a", encoding="utf-8") as f:
                f.write(entry)
        except OSError as exc:
            logger.error("Cannot write bans.txt: %s", exc)

    def remove_ban(self, identifier: str) -> bool:
        """Remove all ban entries matching the given identifier. Returns True if removed.

        Only the identifier (second field) is compared; the ban type is not.
        Comment and blank lines are always kept. I/O errors are logged and
        reported as ``False``.
        """
        bans_path = self._bans_path()
        if not bans_path.exists():
            return False
        try:
            with open(bans_path, "r", encoding="utf-8", errors="replace") as f:
                lines = f.readlines()
        except OSError as exc:
            logger.error("Cannot read bans.txt: %s", exc)
            return False

        kept = [line for line in lines if not self._entry_matches(line, identifier)]
        if len(kept) == len(lines):
            return False

        try:
            self._write_atomic(bans_path, "".join(kept))
        except OSError as exc:
            logger.error("Cannot update bans.txt: %s", exc)
            return False
        return True

    # ── Internal ──

    def _iter_bans(self, lines):
        """Yield a parsed ban dict for every valid entry in ``lines``.

        ``lines`` is any iterable of strings (an open text file or a list).
        Blank lines, comment lines and unparseable lines are skipped.
        """
        for line_num, raw in enumerate(lines, 1):
            line = raw.strip()
            if not line or line.startswith(self._COMMENT_PREFIXES):
                continue
            parsed = self._parse_ban_line(line, line_num)
            if parsed:
                yield parsed

    def _entry_matches(self, line: str, identifier: str) -> bool:
        """Return True if ``line`` is a ban entry whose identifier equals ``identifier``."""
        stripped = line.strip()
        if not stripped or stripped.startswith(self._COMMENT_PREFIXES):
            return False
        # maxsplit=2 is enough to isolate the identifier as its own field.
        parts = stripped.split(None, 2)
        return len(parts) >= 2 and parts[1] == identifier

    def _parse_ban_line(self, line: str, line_num: int) -> dict | None:
        """Parse one ban line: TYPE IDENTIFIER DURATION REASON

        Returns ``None`` for lines with fewer than two fields or a type other
        than GUID/IP (case-insensitive). A missing or non-integer duration is
        read as 0, which is reported as a permanent ban. ``line_num`` is used
        only for diagnostics.
        """
        parts = line.split(None, 3)
        if len(parts) < 2:
            logger.debug("bans.txt line %d skipped: expected TYPE and IDENTIFIER", line_num)
            return None
        ban_type = parts[0].upper()
        if ban_type not in ("GUID", "IP"):
            logger.debug("bans.txt line %d skipped: unknown ban type %r", line_num, parts[0])
            return None

        duration = 0
        if len(parts) >= 3:
            try:
                duration = int(parts[2])
            except ValueError:
                duration = 0
        reason = parts[3] if len(parts) >= 4 else ""

        return {
            "type": ban_type,
            "identifier": parts[1],
            "duration_minutes": duration,
            "reason": reason,
            "is_permanent": duration == 0,
        }

    @staticmethod
    def _clean_reason(reason) -> str:
        """Flatten ``reason`` to a single trimmed line; ``None`` becomes ``""``."""
        return str(reason or "").replace("\n", " ").replace("\r", "").strip()

    @classmethod
    def _format_ban_line(cls, ban_type, identifier, duration, reason) -> str:
        """Build one ban file line (without the trailing newline)."""
        return f"{ban_type} {identifier} {duration} {cls._clean_reason(reason)}".strip()

    @staticmethod
    def _ends_without_newline(path: Path) -> bool:
        """Return True if ``path`` exists, is non-empty and lacks a final line break."""
        try:
            with open(path, "rb") as f:
                f.seek(0, os.SEEK_END)
                if f.tell() == 0:
                    return False
                f.seek(-1, os.SEEK_END)
                return f.read(1) not in (b"\n", b"\r")
        except FileNotFoundError:
            return False

    @classmethod
    def _write_atomic(cls, target: Path, content: str) -> None:
        """Write ``content`` to a temp sibling of ``target`` and swap it into place.

        Raises:
            OSError: on any write/replace failure, after removing the temp file.
        """
        tmp_path = str(target) + ".tmp"
        try:
            with open(tmp_path, "w", encoding="utf-8") as f:
                f.write(content)
            os.replace(tmp_path, str(target))
        except OSError:
            cls._safe_delete(tmp_path)
            raise

    @staticmethod
    def _safe_delete(path: str) -> None:
        """Delete ``path`` if possible; failures are logged at debug level only."""
        try:
            os.unlink(path)
        except OSError as exc:
            logger.debug("Arma3BanManager: could not delete %s: %s", path, exc)