Backend: - Complete FastAPI backend with 42+ REST endpoints (auth, servers, config, players, bans, missions, mods, games, system) - Game adapter architecture with Arma 3 as first-class adapter - WebSocket real-time events for status, metrics, logs, players - Background thread system (process monitor, metrics, log tail, RCon poller) - Fernet encryption for sensitive config fields at rest - JWT auth with admin/viewer roles, bcrypt password hashing - SQLite with WAL mode, parameterized queries, migration system - APScheduler cleanup jobs for logs, metrics, events Frontend: - Server Detail page with 7 tabs (overview, config, players, bans, missions, mods, logs) - Settings page with password change and admin user management - Create Server wizard (4-step; known bug: silent validation failure) - New hooks: useServerDetail, useAuth, useGames - New components: ServerHeader, ConfigEditor, PlayerTable, BanTable, MissionList, ModList, LogViewer, PasswordChange, UserManager - WebSocket onEvent callback for real-time log accumulation - 120 unit tests passing (Vitest + React Testing Library) Docs: - Added .gitignore, CLAUDE.md, README.md - Updated FRONTEND.md, ARCHITECTURE.md with current implementation state - Added .env.example for backend configuration Known issues: - Create Server form: "Next" buttons don't validate before advancing, causing silent submit failure when fields are invalid - Config sub-tabs need UX redesign for non-technical users
200 lines
6.4 KiB
Python
200 lines
6.4 KiB
Python
"""Arma 3 ban manager — bidirectional sync between DB bans and BattlEye ban file."""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import os
|
|
from pathlib import Path
|
|
|
|
from pydantic import BaseModel
|
|
|
|
from core.utils.file_utils import get_server_dir
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
_BANS_FILE = "battleye/bans.txt"
|
|
|
|
|
|
class Arma3BanData(BaseModel):
|
|
"""Ban data schema for Arma 3."""
|
|
guid: str = ""
|
|
ip: str = ""
|
|
|
|
|
|
class Arma3BanManager:
|
|
"""
|
|
Implements BanManager protocol for Arma3 BattlEye.
|
|
|
|
Also provides richer file-based operations for the ban endpoints.
|
|
"""
|
|
|
|
def __init__(self, server_id: int | None = None) -> None:
|
|
self._server_id = server_id
|
|
|
|
def _bans_path(self) -> Path:
|
|
if self._server_id is None:
|
|
raise ValueError("server_id required for file-based ban operations")
|
|
server_dir = get_server_dir(self._server_id)
|
|
return server_dir / _BANS_FILE
|
|
|
|
# ── BanManager protocol methods ──
|
|
|
|
def get_ban_file_path(self, server_dir: Path) -> Path:
|
|
return server_dir / _BANS_FILE
|
|
|
|
def sync_bans_to_file(self, bans: list[dict], ban_file: Path) -> None:
|
|
"""Write bans from DB to BattlEye ban file format."""
|
|
lines = []
|
|
for ban in bans:
|
|
identifier = ban.get("player_uid") or ban.get("guid") or ban.get("ip", "")
|
|
ban_type = ban.get("ban_type", "GUID")
|
|
reason = ban.get("reason", "")
|
|
duration = ban.get("duration_minutes", 0)
|
|
reason_clean = reason.replace("\n", " ").replace("\r", "").strip()
|
|
if identifier:
|
|
lines.append(f"{ban_type} {identifier} {duration} {reason_clean}".strip())
|
|
|
|
ban_file.parent.mkdir(parents=True, exist_ok=True)
|
|
tmp_path = str(ban_file) + ".tmp"
|
|
try:
|
|
with open(tmp_path, "w", encoding="utf-8") as f:
|
|
f.write("\n".join(lines) + "\n" if lines else "")
|
|
os.replace(tmp_path, str(ban_file))
|
|
except OSError as exc:
|
|
self._safe_delete(tmp_path)
|
|
raise
|
|
|
|
def read_bans_from_file(self, ban_file: Path) -> list[dict]:
|
|
"""Read bans from BattlEye ban file into standard format."""
|
|
if not ban_file.exists():
|
|
return []
|
|
|
|
bans = []
|
|
for line_num, line in enumerate(ban_file.read_text(encoding="utf-8", errors="replace").splitlines(), 1):
|
|
line = line.strip()
|
|
if not line or line.startswith("//") or line.startswith("#"):
|
|
continue
|
|
|
|
parsed = self._parse_ban_line(line, line_num)
|
|
if parsed:
|
|
bans.append(parsed)
|
|
|
|
return bans
|
|
|
|
def get_ban_data_schema(self) -> type[BaseModel] | None:
|
|
return Arma3BanData
|
|
|
|
# ── Richer file-based operations (used by ban endpoints) ──
|
|
|
|
def get_bans(self) -> list[dict]:
|
|
"""Read all bans from bans.txt. Returns list of dicts."""
|
|
bans_path = self._bans_path()
|
|
if not bans_path.exists():
|
|
return []
|
|
|
|
bans = []
|
|
try:
|
|
with open(bans_path, "r", encoding="utf-8", errors="replace") as f:
|
|
for line_num, line in enumerate(f, 1):
|
|
line = line.strip()
|
|
if not line or line.startswith("#"):
|
|
continue
|
|
parsed = self._parse_ban_line(line, line_num)
|
|
if parsed:
|
|
bans.append(parsed)
|
|
except OSError as exc:
|
|
logger.error("Cannot read bans.txt: %s", exc)
|
|
|
|
return bans
|
|
|
|
def add_ban(self, identifier: str, ban_type: str, reason: str, duration_minutes: int) -> None:
|
|
"""Append a ban entry to bans.txt."""
|
|
reason_clean = reason.replace("\n", " ").replace("\r", "").strip()
|
|
line = f"{ban_type} {identifier} {duration_minutes} {reason_clean}\n"
|
|
|
|
bans_path = self._bans_path()
|
|
bans_path.parent.mkdir(parents=True, exist_ok=True)
|
|
|
|
try:
|
|
with open(bans_path, "a", encoding="utf-8") as f:
|
|
f.write(line)
|
|
except OSError as exc:
|
|
logger.error("Cannot write bans.txt: %s", exc)
|
|
|
|
def remove_ban(self, identifier: str) -> bool:
|
|
"""Remove all ban entries matching the given identifier. Returns True if removed."""
|
|
bans_path = self._bans_path()
|
|
if not bans_path.exists():
|
|
return False
|
|
|
|
try:
|
|
with open(bans_path, "r", encoding="utf-8", errors="replace") as f:
|
|
lines = f.readlines()
|
|
except OSError as exc:
|
|
logger.error("Cannot read bans.txt: %s", exc)
|
|
return False
|
|
|
|
new_lines = []
|
|
removed = 0
|
|
for line in lines:
|
|
stripped = line.strip()
|
|
if stripped and not stripped.startswith("#"):
|
|
parts = stripped.split(None, 3)
|
|
if len(parts) >= 2 and parts[1] == identifier:
|
|
removed += 1
|
|
continue
|
|
new_lines.append(line)
|
|
|
|
if removed == 0:
|
|
return False
|
|
|
|
tmp_path = str(bans_path) + ".tmp"
|
|
try:
|
|
with open(tmp_path, "w", encoding="utf-8") as f:
|
|
f.writelines(new_lines)
|
|
os.replace(tmp_path, str(bans_path))
|
|
except OSError as exc:
|
|
self._safe_delete(tmp_path)
|
|
logger.error("Cannot update bans.txt: %s", exc)
|
|
return False
|
|
|
|
return True
|
|
|
|
# ── Internal ──
|
|
|
|
def _parse_ban_line(self, line: str, line_num: int) -> dict | None:
|
|
"""Parse one ban line: TYPE IDENTIFIER DURATION REASON"""
|
|
parts = line.split(None, 3)
|
|
if len(parts) < 2:
|
|
return None
|
|
|
|
ban_type = parts[0].upper()
|
|
if ban_type not in ("GUID", "IP"):
|
|
return None
|
|
|
|
identifier = parts[1]
|
|
duration = 0
|
|
reason = ""
|
|
|
|
if len(parts) >= 3:
|
|
try:
|
|
duration = int(parts[2])
|
|
except ValueError:
|
|
duration = 0
|
|
|
|
if len(parts) >= 4:
|
|
reason = parts[3]
|
|
|
|
return {
|
|
"type": ban_type,
|
|
"identifier": identifier,
|
|
"duration_minutes": duration,
|
|
"reason": reason,
|
|
"is_permanent": duration == 0,
|
|
}
|
|
|
|
@staticmethod
|
|
def _safe_delete(path: str) -> None:
|
|
try:
|
|
os.unlink(path)
|
|
except OSError as exc:
|
|
logger.debug("Arma3BanManager: could not delete %s: %s", path, exc) |