"""Arma 3 ban manager — bidirectional sync between DB bans and BattlEye ban file.""" from __future__ import annotations import logging import os from pathlib import Path from pydantic import BaseModel from core.utils.file_utils import get_server_dir logger = logging.getLogger(__name__) _BANS_FILE = "battleye/bans.txt" class Arma3BanData(BaseModel): """Ban data schema for Arma 3.""" guid: str = "" ip: str = "" class Arma3BanManager: """ Implements BanManager protocol for Arma3 BattlEye. Also provides richer file-based operations for the ban endpoints. """ def __init__(self, server_id: int | None = None) -> None: self._server_id = server_id def _bans_path(self) -> Path: if self._server_id is None: raise ValueError("server_id required for file-based ban operations") server_dir = get_server_dir(self._server_id) return server_dir / _BANS_FILE # ── BanManager protocol methods ── def get_ban_file_path(self, server_dir: Path) -> Path: return server_dir / _BANS_FILE def sync_bans_to_file(self, bans: list[dict], ban_file: Path) -> None: """Write bans from DB to BattlEye ban file format.""" lines = [] for ban in bans: identifier = ban.get("player_uid") or ban.get("guid") or ban.get("ip", "") ban_type = ban.get("ban_type", "GUID") reason = ban.get("reason", "") duration = ban.get("duration_minutes", 0) reason_clean = reason.replace("\n", " ").replace("\r", "").strip() if identifier: lines.append(f"{ban_type} {identifier} {duration} {reason_clean}".strip()) ban_file.parent.mkdir(parents=True, exist_ok=True) tmp_path = str(ban_file) + ".tmp" try: with open(tmp_path, "w", encoding="utf-8") as f: f.write("\n".join(lines) + "\n" if lines else "") os.replace(tmp_path, str(ban_file)) except OSError as exc: self._safe_delete(tmp_path) raise def read_bans_from_file(self, ban_file: Path) -> list[dict]: """Read bans from BattlEye ban file into standard format.""" if not ban_file.exists(): return [] bans = [] for line_num, line in enumerate(ban_file.read_text(encoding="utf-8", errors="replace").splitlines(), 1): line = line.strip() if not line or line.startswith("//") or line.startswith("#"): continue parsed = self._parse_ban_line(line, line_num) if parsed: bans.append(parsed) return bans def get_ban_data_schema(self) -> type[BaseModel] | None: return Arma3BanData # ── Richer file-based operations (used by ban endpoints) ── def get_bans(self) -> list[dict]: """Read all bans from bans.txt. Returns list of dicts.""" bans_path = self._bans_path() if not bans_path.exists(): return [] bans = [] try: with open(bans_path, "r", encoding="utf-8", errors="replace") as f: for line_num, line in enumerate(f, 1): line = line.strip() if not line or line.startswith("#"): continue parsed = self._parse_ban_line(line, line_num) if parsed: bans.append(parsed) except OSError as exc: logger.error("Cannot read bans.txt: %s", exc) return bans def add_ban(self, identifier: str, ban_type: str, reason: str, duration_minutes: int) -> None: """Append a ban entry to bans.txt.""" reason_clean = reason.replace("\n", " ").replace("\r", "").strip() line = f"{ban_type} {identifier} {duration_minutes} {reason_clean}\n" bans_path = self._bans_path() bans_path.parent.mkdir(parents=True, exist_ok=True) try: with open(bans_path, "a", encoding="utf-8") as f: f.write(line) except OSError as exc: logger.error("Cannot write bans.txt: %s", exc) def remove_ban(self, identifier: str) -> bool: """Remove all ban entries matching the given identifier. Returns True if removed.""" bans_path = self._bans_path() if not bans_path.exists(): return False try: with open(bans_path, "r", encoding="utf-8", errors="replace") as f: lines = f.readlines() except OSError as exc: logger.error("Cannot read bans.txt: %s", exc) return False new_lines = [] removed = 0 for line in lines: stripped = line.strip() if stripped and not stripped.startswith("#"): parts = stripped.split(None, 3) if len(parts) >= 2 and parts[1] == identifier: removed += 1 continue new_lines.append(line) if removed == 0: return False tmp_path = str(bans_path) + ".tmp" try: with open(tmp_path, "w", encoding="utf-8") as f: f.writelines(new_lines) os.replace(tmp_path, str(bans_path)) except OSError as exc: self._safe_delete(tmp_path) logger.error("Cannot update bans.txt: %s", exc) return False return True # ── Internal ── def _parse_ban_line(self, line: str, line_num: int) -> dict | None: """Parse one ban line: TYPE IDENTIFIER DURATION REASON""" parts = line.split(None, 3) if len(parts) < 2: return None ban_type = parts[0].upper() if ban_type not in ("GUID", "IP"): return None identifier = parts[1] duration = 0 reason = "" if len(parts) >= 3: try: duration = int(parts[2]) except ValueError: duration = 0 if len(parts) >= 4: reason = parts[3] return { "type": ban_type, "identifier": identifier, "duration_minutes": duration, "reason": reason, "is_permanent": duration == 0, } @staticmethod def _safe_delete(path: str) -> None: try: os.unlink(path) except OSError as exc: logger.debug("Arma3BanManager: could not delete %s: %s", path, exc)