"""Arma 3 RCon service — remote admin via BattlEye RCon protocol."""

from __future__ import annotations

import logging
import socket
import struct
from typing import Any

from pydantic import BaseModel

logger = logging.getLogger(__name__)


class Arma3PlayerData(BaseModel):
    """Player data schema for Arma 3."""

    name: str
    ping: int = 0
    guid: str = ""


class Arma3RConClient:
    """RCon client for a single server connection over UDP.

    The socket is opened lazily on the first command and reused until
    ``disconnect()`` is called or a transport error occurs.

    NOTE(review): the published BattlEye RCon specification frames packets
    as ``'BE'`` + CRC32 + ``0xFF`` + type.  This client emits a simpler
    ``0xFF`` / type / length / checksum frame; the wire format is kept
    exactly as it was — confirm it against the target server.
    """

    def __init__(self, host: str, port: int, password: str) -> None:
        self._host = host
        self._port = port
        self._password = password
        self._sock: socket.socket | None = None

    def _connect(self) -> None:
        """Open the UDP socket and log in, unless already connected.

        Raises:
            ConnectionError: if the login reply is missing or malformed.
            OSError: on socket failures, including the 5 s timeout.
        """
        if self._sock is not None:
            return
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        # _login() reads self._sock, so publish the socket before logging in.
        self._sock = sock
        try:
            sock.settimeout(5.0)
            sock.connect((self._host, self._port))
            self._login()
        except Exception:
            # A failed login must not leave a half-initialised socket behind:
            # later calls would skip the login and treat it as authenticated.
            self.disconnect()
            raise

    def _login(self) -> None:
        """Send the login packet and validate the first byte of the reply.

        Raises:
            ConnectionError: if there is no socket, or the reply is empty or
                does not start with ``0xFF``.
        """
        if self._sock is None:
            raise ConnectionError("Not connected")
        packet = self._build_packet(0x00, self._password.encode("utf-8"))
        self._sock.send(packet)
        response = self._sock.recv(4096)
        if not response or response[0] != 0xFF:
            raise ConnectionError("RCon login failed")

    @staticmethod
    def _compute_checksum(data: bytes) -> bytes:
        """Return the one-byte checksum of *data*: ``sum(data) & 0xFF``."""
        return bytes([sum(data) & 0xFF])

    @staticmethod
    def _build_packet(packet_type: int, payload: bytes) -> bytes:
        """Frame *payload* as ``0xFF, type, length, checksum, payload``.

        The length field is a single byte, so it wraps modulo 256 for
        payloads longer than 255 bytes.
        NOTE(review): that wrap is inherited behaviour — confirm whether the
        server relies on the length byte before sending long messages.
        """
        header = bytes([0xFF, packet_type, len(payload) & 0xFF])
        return header + Arma3RConClient._compute_checksum(payload) + payload

    def send_command(self, command: str, timeout: float = 5.0) -> str | None:
        """Send *command* and return the decoded reply text.

        Args:
            command: Command text, sent UTF-8 encoded.
            timeout: Socket timeout applied to this send/receive.

        Returns:
            The reply with its first two bytes stripped, decoded as UTF-8
            with replacement characters, or ``None`` when the reply is two
            bytes or shorter or when any error occurred (errors are logged,
            never raised).
        """
        try:
            self._connect()
            if self._sock is None:
                return None
            self._sock.settimeout(timeout)
            self._sock.send(self._build_packet(0x01, command.encode("utf-8")))
            response = self._sock.recv(4096)
        except OSError as exc:
            logger.error("RCon command error: %s", exc)
            # Drop the socket after a transport failure so that a late
            # datagram cannot be mistaken for the next command's reply; the
            # next call reconnects and logs in again.
            self.disconnect()
            return None
        except Exception as exc:
            logger.error("RCon command error: %s", exc)
            return None
        if len(response) > 2:
            return response[2:].decode("utf-8", errors="replace")
        return None

    def get_players(self) -> list[dict[str, Any]]:
        """Return the connected players parsed from the ``players`` command.

        Rows are expected in the BattlEye layout
        ``<#> <ip:port> <ping> <guid>(<status>) <name>``; the name is the
        remainder of the line and may contain spaces.  Header, separator and
        summary lines are skipped.

        Returns:
            One dict per player with keys ``slot_id`` (str), ``name`` (str),
            ``guid`` (str, verification suffix removed) and ``ping`` (int,
            ``0`` when not numeric).  Empty list if the command failed.
        """
        result = self.send_command("players")
        if result is None:
            return []

        players: list[dict[str, Any]] = []
        for raw_line in result.split("\n"):
            line = raw_line.strip()
            if not line or line.startswith(("(", "total")):
                continue
            parts = line.split(maxsplit=4)
            # Player rows have five columns and start with a numeric slot;
            # this also rejects the "[#] [IP Address]:[Port] ..." header row,
            # which happens to split into five tokens as well.
            if len(parts) < 5 or not parts[0].isdigit():
                continue
            slot_id, _address, ping, guid, name = parts
            players.append({
                "slot_id": slot_id,
                "name": name,
                # Strip the "(OK)" / "(?)" verification marker from the GUID.
                "guid": guid.partition("(")[0],
                "ping": int(ping) if ping.isdecimal() else 0,
            })
        return players

    def kick_player(self, identifier: str, reason: str = "") -> bool:
        """Kick *identifier*, optionally with *reason*.

        Returns ``True`` if the server sent a non-empty reply.
        """
        cmd = f"kick {identifier}"
        if reason:
            cmd += f" {reason}"
        return self.send_command(cmd) is not None

    def ban_player(self, identifier: str, duration_minutes: int, reason: str) -> bool:
        """Ban *identifier* for *duration_minutes* with *reason*.

        Returns ``True`` if the server sent a non-empty reply.
        """
        cmd = f"ban {identifier} {duration_minutes} {reason}"
        return self.send_command(cmd) is not None

    def say_all(self, message: str) -> bool:
        """Broadcast *message* to every player.

        BattlEye's syntax is ``say <player#> <text>``; player number ``-1``
        addresses all players.

        Returns ``True`` if the server sent a non-empty reply.
        """
        return self.send_command(f"say -1 {message}") is not None

    def shutdown(self) -> bool:
        """Send ``#shutdown``; ``True`` if the server sent a non-empty reply."""
        return self.send_command("#shutdown") is not None

    def keepalive(self) -> None:
        """Send an empty command to keep the session active."""
        # send_command() already logs and swallows every failure, so no
        # additional error handling is needed here.
        self.send_command("")

    def disconnect(self) -> None:
        """Close the socket if one is open; errors while closing are logged."""
        if self._sock is None:
            return
        try:
            self._sock.close()
        except Exception as exc:
            logger.debug("Arma3RConClient: error closing socket: %s", exc)
        finally:
            self._sock = None


class Arma3RConService:
    """Factory for Arma 3 RCon clients."""

    def create_client(self, host: str, port: int, password: str) -> Arma3RConClient:
        """Return a new, not-yet-connected client for the given server."""
        return Arma3RConClient(host, port, password)

    def get_startup_delay(self) -> float:
        """Return the startup delay (presumably seconds — confirm with caller)."""
        return 30.0

    def get_poll_interval(self) -> float:
        """Return the poll interval (presumably seconds — confirm with caller)."""
        return 10.0

    def get_player_data_schema(self) -> type[BaseModel] | None:
        """Return the pydantic model describing a player record."""
        return Arma3PlayerData