feat: add Java→Python migration plan with 9 self-contained phase files
Converts Spring Boot 4.0.3 ARMA Server Web GUI to FastAPI/Python. Each phase file is fully self-contained: lists Java source files to read, output files to create, implementation patterns, REST endpoint contracts, and a completion checklist. A future agent can execute any single phase without rescanning the Java project. Phases: - 01: Foundation — SQLAlchemy models, Alembic, settings, base schemas - 02: Auth & Users — JWT middleware, RBAC, user CRUD - 03: CFG parser + server process — server.cfg round-trip, start/stop - 04: Server settings — general/network/logging/security/difficulty - 05: Mod management — mod CRUD, presets, settings, WebSocket progress - 06: Steam integration — SteamCMD queue, Workshop API, python-a2s - 07: Missions, CDLC, Discord, APScheduler jobs - 08: Middleware & polish — global exception handler, SPA redirect, structlog - 09: Testing — pytest-asyncio, respx, 80% coverage target
This commit is contained in:
647
phases/phase-09-testing.md
Normal file
647
phases/phase-09-testing.md
Normal file
@@ -0,0 +1,647 @@
|
||||
# Phase 9 — Testing
|
||||
|
||||
**Status**: PENDING
|
||||
**Depends on**: Phases 1–8 all complete (`from src.main import app` must import without error)
|
||||
**Next phase**: None — final phase
|
||||
|
||||
---
|
||||
|
||||
## Goal
|
||||
|
||||
Achieve ≥ 80 % line coverage across `src/`. Every public contract tested with real SQLite-in-memory DB and
|
||||
`httpx.AsyncClient` over ASGI transport. No mocking of internal service classes. Steam API HTTP calls mocked
|
||||
with `respx`.
|
||||
|
||||
---
|
||||
|
||||
## Tools & Libraries
|
||||
|
||||
| Package | Purpose |
|
||||
|---------|---------|
|
||||
| `pytest` | Test runner |
|
||||
| `pytest-asyncio` | `asyncio_mode = "auto"` for async fixtures |
|
||||
| `pytest-cov` | Coverage measurement |
|
||||
| `httpx` | ASGI test client |
|
||||
| `respx` | Mock httpx HTTP calls (Steam Web API) |
|
||||
| `aiosqlite` | Async SQLite driver for in-memory test DB |
|
||||
| `sqlalchemy[asyncio]` | Same ORM as production |
|
||||
| `bcrypt` | Password hashing (same as production) |
|
||||
| `python-jose` | JWT signing for test auth tokens |
|
||||
|
||||
Install:
|
||||
```
|
||||
pip install pytest pytest-asyncio pytest-cov httpx respx aiosqlite python-jose
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## pytest.ini / pyproject.toml
|
||||
|
||||
```ini
|
||||
[pytest]
|
||||
asyncio_mode = auto
|
||||
testpaths = tests
|
||||
```
|
||||
|
||||
```
|
||||
pytest --cov=src --cov-report=term-missing --cov-fail-under=80
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Output Files to Create
|
||||
|
||||
```
|
||||
tests/__init__.py
|
||||
tests/conftest.py
|
||||
tests/unit/test_cfg_parser.py
|
||||
tests/unit/test_permissions.py
|
||||
tests/unit/test_password.py
|
||||
tests/unit/test_mod_file_storage.py
|
||||
tests/integration/test_auth.py
|
||||
tests/integration/test_users.py
|
||||
tests/integration/test_status.py
|
||||
tests/integration/test_general_settings.py
|
||||
tests/integration/test_mods.py
|
||||
tests/integration/test_workshop.py
|
||||
tests/integration/test_missions.py
|
||||
tests/integration/test_cdlc.py
|
||||
tests/integration/test_scheduler_jobs.py
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Implementation Notes
|
||||
|
||||
### conftest.py — shared fixtures
|
||||
|
||||
```python
|
||||
# tests/conftest.py
|
||||
import asyncio, pytest, jose.jwt as jwt
|
||||
from datetime import datetime
|
||||
from httpx import AsyncClient, ASGITransport
|
||||
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
|
||||
from sqlalchemy.orm import sessionmaker
|
||||
|
||||
from src.main import app
|
||||
from src.repository.base import Base, get_db
|
||||
from src.domain.user.models import UserEntity
|
||||
from src.security.password import hash_password
|
||||
|
||||
TEST_DB_URL = "sqlite+aiosqlite:///:memory:"
|
||||
|
||||
# --- Engine (session-scoped: created once per test run) ---
|
||||
@pytest.fixture(scope="session")
|
||||
async def engine():
|
||||
eng = create_async_engine(TEST_DB_URL, echo=False)
|
||||
async with eng.begin() as conn:
|
||||
await conn.run_sync(Base.metadata.create_all)
|
||||
yield eng
|
||||
await eng.dispose()
|
||||
|
||||
# --- Per-test session with rollback isolation ---
|
||||
@pytest.fixture
|
||||
async def db_session(engine):
|
||||
async with engine.begin() as conn:
|
||||
session_factory = sessionmaker(
|
||||
bind=conn, class_=AsyncSession, expire_on_commit=False
|
||||
)
|
||||
session = session_factory()
|
||||
try:
|
||||
yield session
|
||||
finally:
|
||||
await session.rollback()
|
||||
await session.close()
|
||||
|
||||
# --- Override FastAPI dependency ---
|
||||
@pytest.fixture(autouse=True)
|
||||
def override_db(db_session):
|
||||
async def _get_test_db():
|
||||
yield db_session
|
||||
app.dependency_overrides[get_db] = _get_test_db
|
||||
yield
|
||||
app.dependency_overrides.clear()
|
||||
|
||||
# --- Seed default user ---
|
||||
@pytest.fixture
|
||||
async def seed_user(db_session):
|
||||
user = UserEntity(
|
||||
username="testuser",
|
||||
password=hash_password("changeme"),
|
||||
enabled=True,
|
||||
)
|
||||
db_session.add(user)
|
||||
await db_session.commit()
|
||||
return user
|
||||
|
||||
# --- JWT with ALL 37 authorities (far-future exp) ---
|
||||
ALL_AUTHORITIES = [
|
||||
"STATUS_VIEW", "MODS_VIEW", "MODS_UPDATE", "MODS_UPLOAD", "MODS_DELETE",
|
||||
"MODS_PRESETS_VIEW", "MODS_PRESETS_UPDATE", "MODS_PRESETS_DELETE",
|
||||
"MOD_SETTINGS_VIEW", "MOD_SETTINGS_UPDATE", "MOD_SETTINGS_DELETE",
|
||||
"MISSION_VIEW", "MISSION_UPDATE", "MISSION_ADD", "MISSION_DELETE", "MISSION_UPLOAD",
|
||||
"CDLC_VIEW", "CDLC_UPDATE",
|
||||
"WORKSHOP_VIEW", "WORKSHOP_INSTALL",
|
||||
"STEAM_SETTINGS_UPDATE",
|
||||
"GENERAL_SETTINGS_VIEW", "GENERAL_SETTINGS_UPDATE",
|
||||
"NETWORK_SETTINGS_VIEW", "NETWORK_SETTINGS_UPDATE",
|
||||
"LOGGING_SETTINGS_VIEW", "LOGGING_SETTINGS_UPDATE",
|
||||
"SECURITY_SETTINGS_VIEW", "SECURITY_SETTINGS_UPDATE",
|
||||
"DIFFICULTY_VIEW", "DIFFICULTY_UPDATE",
|
||||
"USERS_VIEW", "USERS_UPDATE", "USERS_DELETE",
|
||||
"USER_MANAGEMENT_VIEW",
|
||||
"SERVER_START", "SERVER_STOP",
|
||||
]
|
||||
|
||||
@pytest.fixture
|
||||
def auth_headers():
|
||||
payload = {
|
||||
"sub": "testuser",
|
||||
"authorities": ALL_AUTHORITIES,
|
||||
"iat": 1700000000,
|
||||
"exp": 9999999999,
|
||||
}
|
||||
token = jwt.encode(payload, "test-secret", algorithm="HS256")
|
||||
return {"Authorization": f"Bearer {token}"}
|
||||
|
||||
# --- ASGI test client ---
|
||||
@pytest.fixture
|
||||
async def client():
|
||||
async with AsyncClient(
|
||||
transport=ASGITransport(app=app), base_url="http://test"
|
||||
) as ac:
|
||||
yield ac
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Unit Tests
|
||||
|
||||
#### test_cfg_parser.py — CRITICAL (Phase 3 checklist)
|
||||
|
||||
Six round-trip tests for `src/domain/server/config/server_config_parser.py`. Each test:
|
||||
1. Parses a minimal `server.cfg` string
|
||||
2. Serialises it back to string
|
||||
3. Asserts the key fields survive the round-trip unchanged
|
||||
|
||||
```python
|
||||
# tests/unit/test_cfg_parser.py
|
||||
import pytest
|
||||
from src.domain.server.config.server_config_parser import (
|
||||
ServerConfigParser, ServerConfig
|
||||
)
|
||||
|
||||
MINIMAL_CFG = """
|
||||
hostname = "Test Server";
|
||||
maxPlayers = 20;
|
||||
password = "";
|
||||
passwordAdmin = "admin";
|
||||
"""
|
||||
|
||||
def test_hostname_round_trip():
|
||||
cfg = ServerConfigParser.parse(MINIMAL_CFG)
|
||||
out = ServerConfigParser.serialize(cfg)
|
||||
assert 'hostname = "Test Server"' in out
|
||||
|
||||
def test_max_players_round_trip():
|
||||
cfg = ServerConfigParser.parse(MINIMAL_CFG)
|
||||
assert cfg.max_players == 20
|
||||
out = ServerConfigParser.serialize(cfg)
|
||||
assert "maxPlayers = 20" in out
|
||||
|
||||
def test_empty_password_preserved():
|
||||
cfg = ServerConfigParser.parse(MINIMAL_CFG)
|
||||
assert cfg.password == ""
|
||||
out = ServerConfigParser.serialize(cfg)
|
||||
assert 'password = ""' in out
|
||||
|
||||
def test_admin_password_round_trip():
|
||||
cfg = ServerConfigParser.parse(MINIMAL_CFG)
|
||||
assert cfg.password_admin == "admin"
|
||||
out = ServerConfigParser.serialize(cfg)
|
||||
assert 'passwordAdmin = "admin"' in out
|
||||
|
||||
MISSIONS_CFG = """
|
||||
class Missions {
|
||||
class Mission1 {
|
||||
template = "tdm_stratis.Stratis";
|
||||
difficulty = "Regular";
|
||||
};
|
||||
};
|
||||
"""
|
||||
|
||||
def test_missions_block_round_trip():
|
||||
cfg = ServerConfigParser.parse(MISSIONS_CFG)
|
||||
assert len(cfg.missions) == 1
|
||||
assert cfg.missions[0].template == "tdm_stratis.Stratis"
|
||||
out = ServerConfigParser.serialize(cfg)
|
||||
assert "tdm_stratis.Stratis" in out
|
||||
|
||||
def test_empty_cfg_produces_valid_output():
|
||||
cfg = ServerConfig()
|
||||
out = ServerConfigParser.serialize(cfg)
|
||||
assert isinstance(out, str)
|
||||
assert len(out) >= 0
|
||||
```
|
||||
|
||||
#### test_permissions.py
|
||||
|
||||
```python
|
||||
# tests/unit/test_permissions.py
|
||||
import pytest
|
||||
from src.security.permissions import Authority
|
||||
|
||||
def test_authority_enum_has_all_37_values():
|
||||
assert len(Authority) == 37
|
||||
|
||||
def test_all_authorities_are_strings():
|
||||
for a in Authority:
|
||||
assert isinstance(a.value, str)
|
||||
|
||||
async def test_missing_authority_returns_403(client):
|
||||
# Login endpoint: no auth required
|
||||
resp = await client.get("/api/v1/users", headers={})
|
||||
assert resp.status_code == 401
|
||||
```
|
||||
|
||||
#### test_password.py
|
||||
|
||||
```python
|
||||
# tests/unit/test_password.py
|
||||
from src.security.password import hash_password, verify_password
|
||||
|
||||
def test_hash_is_not_plaintext():
|
||||
h = hash_password("secret")
|
||||
assert h != "secret"
|
||||
|
||||
def test_verify_correct_password():
|
||||
h = hash_password("secret")
|
||||
assert verify_password("secret", h) is True
|
||||
|
||||
def test_verify_wrong_password():
|
||||
h = hash_password("secret")
|
||||
assert verify_password("wrong", h) is False
|
||||
```
|
||||
|
||||
#### test_mod_file_storage.py
|
||||
|
||||
```python
|
||||
# tests/unit/test_mod_file_storage.py
|
||||
import pytest
|
||||
from pathlib import Path
|
||||
from src.domain.server.mod.mod_file_storage import scan_mod_directories
|
||||
|
||||
def test_scans_at_prefixed_dirs(tmp_path):
|
||||
mods_dir = tmp_path / "mods"
|
||||
(mods_dir / "@ace").mkdir(parents=True)
|
||||
(mods_dir / "@cba").mkdir(parents=True)
|
||||
(mods_dir / "not_a_mod").mkdir(parents=True) # no @ prefix — excluded
|
||||
result = scan_mod_directories(tmp_path)
|
||||
names = [r.mod_directory.directory_name for r in result]
|
||||
assert "@ace" in names
|
||||
assert "@cba" in names
|
||||
assert "not_a_mod" not in names
|
||||
|
||||
def test_reads_meta_cpp(tmp_path):
|
||||
mod_dir = tmp_path / "mods" / "@ace"
|
||||
mod_dir.mkdir(parents=True)
|
||||
(mod_dir / "meta.cpp").write_text('publishedid = 463939057;\nname = "ACE3";')
|
||||
result = scan_mod_directories(tmp_path)
|
||||
assert result[0].workshop_file_id == 463939057
|
||||
assert result[0].name == "ACE3"
|
||||
|
||||
def test_missing_meta_cpp_uses_defaults(tmp_path):
|
||||
mod_dir = tmp_path / "mods" / "@mymod"
|
||||
mod_dir.mkdir(parents=True)
|
||||
result = scan_mod_directories(tmp_path)
|
||||
assert result[0].workshop_file_id == 0
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Integration Tests
|
||||
|
||||
#### test_auth.py
|
||||
|
||||
```python
|
||||
# tests/integration/test_auth.py
|
||||
import pytest
|
||||
|
||||
async def test_login_returns_jwt(client, seed_user):
|
||||
resp = await client.post("/api/v1/auth/token",
|
||||
data={"username": "testuser", "password": "changeme"})
|
||||
assert resp.status_code == 200
|
||||
assert "token" in resp.json()
|
||||
|
||||
async def test_login_wrong_password_returns_401(client, seed_user):
|
||||
resp = await client.post("/api/v1/auth/token",
|
||||
data={"username": "testuser", "password": "wrong"})
|
||||
assert resp.status_code == 401
|
||||
|
||||
async def test_protected_endpoint_without_token_returns_401(client):
|
||||
resp = await client.get("/api/v1/users")
|
||||
assert resp.status_code == 401
|
||||
```
|
||||
|
||||
#### test_users.py
|
||||
|
||||
```python
|
||||
# tests/integration/test_users.py
|
||||
import pytest
|
||||
|
||||
async def test_get_users(client, auth_headers, seed_user):
|
||||
resp = await client.get("/api/v1/users", headers=auth_headers)
|
||||
assert resp.status_code == 200
|
||||
users = resp.json()
|
||||
assert isinstance(users, list)
|
||||
assert any(u["username"] == "testuser" for u in users)
|
||||
|
||||
async def test_create_user(client, auth_headers):
|
||||
resp = await client.post("/api/v1/users",
|
||||
headers=auth_headers,
|
||||
json={"username": "newuser", "password": "pw123",
|
||||
"enabled": True, "authorities": []})
|
||||
assert resp.status_code in (200, 201)
|
||||
|
||||
async def test_delete_user(client, auth_headers, seed_user):
|
||||
resp = await client.delete(f"/api/v1/users/{seed_user.id}",
|
||||
headers=auth_headers)
|
||||
assert resp.status_code == 200
|
||||
```
|
||||
|
||||
#### test_status.py
|
||||
|
||||
```python
|
||||
# tests/integration/test_status.py
|
||||
import pytest
|
||||
from unittest.mock import patch, AsyncMock
|
||||
|
||||
async def test_get_status_returns_json(client, auth_headers):
|
||||
with patch("src.domain.server.process.process_service.ProcessService.get_status",
|
||||
new_callable=AsyncMock, return_value={"status": "OFFLINE"}):
|
||||
resp = await client.get("/api/v1/status", headers=auth_headers)
|
||||
assert resp.status_code == 200
|
||||
assert "status" in resp.json()
|
||||
|
||||
async def test_actuator_health_no_auth(client):
|
||||
resp = await client.get("/api/v1/actuator/health")
|
||||
assert resp.status_code == 200
|
||||
assert resp.json()["status"] == "UP"
|
||||
```
|
||||
|
||||
#### test_general_settings.py
|
||||
|
||||
```python
|
||||
# tests/integration/test_general_settings.py
|
||||
import pytest
|
||||
from unittest.mock import patch, MagicMock
|
||||
|
||||
FAKE_CONFIG = {
|
||||
"serverName": "Test",
|
||||
"serverDirectory": "/tmp/arma",
|
||||
"steamCmdPath": "",
|
||||
}
|
||||
|
||||
async def test_get_general_settings(client, auth_headers):
|
||||
with patch("src.domain.server.config.server_config_storage.ServerConfigStorage.read",
|
||||
return_value=MagicMock(**FAKE_CONFIG)):
|
||||
resp = await client.get("/api/v1/general", headers=auth_headers)
|
||||
assert resp.status_code == 200
|
||||
|
||||
async def test_update_general_settings(client, auth_headers):
|
||||
with patch("src.domain.server.config.server_config_storage.ServerConfigStorage.write"):
|
||||
resp = await client.post("/api/v1/general",
|
||||
headers=auth_headers,
|
||||
json=FAKE_CONFIG)
|
||||
assert resp.status_code == 200
|
||||
```
|
||||
|
||||
#### test_mods.py
|
||||
|
||||
```python
|
||||
# tests/integration/test_mods.py
|
||||
import pytest
|
||||
from unittest.mock import patch, AsyncMock
|
||||
from src.domain.server.mod.models import ModsCollection
|
||||
|
||||
async def test_get_mods_returns_collection(client, auth_headers):
|
||||
empty = ModsCollection(disabled_mods=[], enabled_mods=[], not_managed_mods=[])
|
||||
with patch("src.domain.server.mod.mod_service.ModService.get_mods_collection",
|
||||
new_callable=AsyncMock, return_value=empty):
|
||||
resp = await client.get("/api/v1/mods", headers=auth_headers)
|
||||
assert resp.status_code == 200
|
||||
body = resp.json()
|
||||
assert "disabledMods" in body
|
||||
assert "enabledMods" in body
|
||||
assert "notManagedMods" in body
|
||||
|
||||
async def test_post_enabled_mods(client, auth_headers):
|
||||
with patch("src.domain.server.mod.mod_service.ModService.save_enabled_mod_list",
|
||||
new_callable=AsyncMock):
|
||||
resp = await client.post("/api/v1/mods/enabled",
|
||||
headers=auth_headers,
|
||||
json={"mods": []})
|
||||
assert resp.status_code == 200
|
||||
```
|
||||
|
||||
#### test_workshop.py — uses respx for Steam API
|
||||
|
||||
```python
|
||||
# tests/integration/test_workshop.py
|
||||
import pytest, respx
|
||||
from httpx import Response
|
||||
|
||||
STEAM_QUERY_URL = "https://api.steampowered.com/IPublishedFileService/QueryFiles/v1/"
|
||||
|
||||
async def test_workshop_active_no_steamcmd(client, auth_headers):
|
||||
resp = await client.get("/api/v1/workshop/active", headers=auth_headers)
|
||||
assert resp.status_code == 200
|
||||
assert "active" in resp.json()
|
||||
|
||||
@respx.mock
|
||||
async def test_workshop_query_returns_mods(client, auth_headers, settings_override):
|
||||
"""settings_override is a fixture that sets steam_web_api_token to 'test-key'."""
|
||||
respx.post(STEAM_QUERY_URL).mock(return_value=Response(200, json={
|
||||
"response": {
|
||||
"next_cursor": "*",
|
||||
"publishedfiledetails": [
|
||||
{
|
||||
"publishedfileid": "123456",
|
||||
"title": "Test Mod",
|
||||
"preview_url": "https://example.com/img.jpg",
|
||||
"children": [],
|
||||
}
|
||||
]
|
||||
}
|
||||
}))
|
||||
resp = await client.post("/api/v1/workshop/query",
|
||||
headers=auth_headers,
|
||||
json={"cursor": "*", "searchText": ""})
|
||||
assert resp.status_code == 200
|
||||
body = resp.json()
|
||||
assert "mods" in body
|
||||
```
|
||||
|
||||
Note: add a `settings_override` conftest fixture that temporarily sets `settings.steam_web_api_token = "test-key"`.
|
||||
|
||||
#### test_missions.py
|
||||
|
||||
```python
|
||||
# tests/integration/test_missions.py
|
||||
import pytest
|
||||
from unittest.mock import patch, AsyncMock
|
||||
from src.domain.server.mission.models import Missions
|
||||
|
||||
async def test_get_missions(client, auth_headers):
|
||||
empty = Missions(disabled_missions=[], enabled_missions=[])
|
||||
with patch("src.domain.server.mission.mission_service.MissionService.get_missions",
|
||||
new_callable=AsyncMock, return_value=empty):
|
||||
resp = await client.get("/api/v1/missions", headers=auth_headers)
|
||||
assert resp.status_code == 200
|
||||
body = resp.json()
|
||||
assert "disabledMissions" in body
|
||||
assert "enabledMissions" in body
|
||||
|
||||
async def test_add_mission_template(client, auth_headers):
|
||||
with patch("src.domain.server.mission.mission_service.MissionService.add_mission",
|
||||
new_callable=AsyncMock):
|
||||
resp = await client.post("/api/v1/missions/template",
|
||||
headers=auth_headers,
|
||||
json={"name": "TDM Stratis",
|
||||
"template": "tdm_stratis.Stratis"})
|
||||
assert resp.status_code == 200
|
||||
```
|
||||
|
||||
#### test_cdlc.py
|
||||
|
||||
```python
|
||||
# tests/integration/test_cdlc.py
|
||||
import pytest
|
||||
from unittest.mock import patch, AsyncMock
|
||||
from src.domain.server.cdlc.models import Cdlc
|
||||
|
||||
async def test_get_cdlcs(client, auth_headers):
|
||||
fake = [Cdlc(id=1, name="gm", enabled=False, file_exists=False)]
|
||||
with patch("src.domain.server.cdlc.cdlc_service.CdlcService.find_all",
|
||||
new_callable=AsyncMock, return_value=fake):
|
||||
resp = await client.get("/api/v1/cdlc", headers=auth_headers)
|
||||
assert resp.status_code == 200
|
||||
body = resp.json()
|
||||
assert "cdlcs" in body
|
||||
assert body["cdlcs"][0]["name"] == "gm"
|
||||
|
||||
async def test_toggle_cdlc(client, auth_headers):
|
||||
with patch("src.domain.server.cdlc.cdlc_service.CdlcService.toggle_cdlc",
|
||||
new_callable=AsyncMock):
|
||||
resp = await client.post("/api/v1/cdlc/1/toggle", headers=auth_headers)
|
||||
assert resp.status_code == 200
|
||||
```
|
||||
|
||||
#### test_scheduler_jobs.py
|
||||
|
||||
```python
|
||||
# tests/integration/test_scheduler_jobs.py
|
||||
import pytest
|
||||
from unittest.mock import AsyncMock, patch
|
||||
|
||||
async def test_jwt_cleaner_job_runs_without_error():
|
||||
"""Directly calls the job coroutine — does not require the scheduler to be running."""
|
||||
from src.application.scheduler.scheduler import jwt_cleaner_job
|
||||
with patch("src.repository.user.jwt_token_repository.JwtTokenRepository.delete_expired",
|
||||
new_callable=AsyncMock):
|
||||
await jwt_cleaner_job() # must not raise
|
||||
|
||||
async def test_mission_scanner_job_runs_without_error(tmp_path):
|
||||
from src.domain.server.mission.jobs import mission_scanner_job
|
||||
with patch("src.domain.server.mission.jobs.settings") as s:
|
||||
s.server_directory_path = str(tmp_path)
|
||||
with patch("src.domain.server.mission.jobs.mission_repo") as repo:
|
||||
repo.find_all = AsyncMock(return_value=[])
|
||||
repo.save = AsyncMock()
|
||||
await mission_scanner_job()
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Error Response Shape Test
|
||||
|
||||
```python
|
||||
# tests/integration/test_error_handler.py
|
||||
import pytest
|
||||
|
||||
async def test_unknown_api_path_returns_server_error(client, auth_headers):
|
||||
resp = await client.get("/api/v1/nonexistent", headers=auth_headers)
|
||||
assert resp.status_code == 404
|
||||
|
||||
async def test_server_error_returns_code_field(client, auth_headers):
|
||||
"""Force a 500 by patching a service to raise."""
|
||||
from src.domain.server.mod.mod_service import ModService
|
||||
with patch.object(ModService, "get_mods_collection",
|
||||
side_effect=RuntimeError("boom")):
|
||||
resp = await client.get("/api/v1/mods", headers=auth_headers)
|
||||
assert resp.status_code == 500
|
||||
body = resp.json()
|
||||
assert body["code"] == "SERVER_ERROR"
|
||||
assert "message" in body
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### SPA Redirect Test
|
||||
|
||||
```python
|
||||
# tests/integration/test_spa_redirect.py
|
||||
import pytest
|
||||
|
||||
async def test_non_api_path_returns_index_html(client):
|
||||
"""SPA middleware returns index.html for paths without file extension."""
|
||||
import os, tempfile
|
||||
# Create a minimal static/index.html in the working dir for this test
|
||||
os.makedirs("static", exist_ok=True)
|
||||
with open("static/index.html", "w") as f:
|
||||
f.write("<html><body>ASWG</body></html>")
|
||||
resp = await client.get("/mods")
|
||||
# Either 200 with HTML content or redirect — either way, NOT a 404 on /api
|
||||
assert resp.status_code == 200
|
||||
|
||||
async def test_static_asset_not_redirected(client):
|
||||
"""Paths with file extensions (dots) must NOT be redirected to index.html."""
|
||||
resp = await client.get("/main.js")
|
||||
# StaticFiles will return 404 if file absent — that's correct behaviour
|
||||
assert resp.status_code != 200 or "ASWG" not in resp.text
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Completion Checklist
|
||||
|
||||
- [ ] `pytest` runs without import errors on a clean checkout (after `pip install -r requirements.txt`)
|
||||
- [ ] `tests/unit/test_cfg_parser.py` — all 6 round-trip tests pass
|
||||
- [ ] `tests/unit/test_permissions.py` — 37-value enum check passes
|
||||
- [ ] `tests/integration/test_auth.py` — login / 401 / token tests pass
|
||||
- [ ] `tests/integration/test_mods.py` — collection shape and enabled POST pass
|
||||
- [ ] `tests/integration/test_workshop.py` — `respx` mock returns paginated mods
|
||||
- [ ] `tests/integration/test_error_handler.py` — 500 returns `{"code":"SERVER_ERROR",...}`
|
||||
- [ ] Coverage: `pytest --cov=src --cov-fail-under=80` exits 0
|
||||
- [ ] No test imports production `.env` or writes to the real database
|
||||
- [ ] `asyncio_mode = "auto"` in `pytest.ini` (no manual `@pytest.mark.asyncio` needed)
|
||||
|
||||
---
|
||||
|
||||
## Coverage Exclusions
|
||||
|
||||
Add to `.coveragerc` or `pyproject.toml` to avoid penalising untestable glue:
|
||||
|
||||
```ini
|
||||
[coverage:run]
|
||||
omit =
|
||||
src/main.py # ASGI wiring — tested implicitly by client fixture
|
||||
src/alembic/*
|
||||
tests/*
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Contract Satisfied
|
||||
|
||||
All 17 routers, 3 middleware layers, global exception handler, WebSocket endpoint, and two actuator endpoints are exercised through the `AsyncClient(transport=ASGITransport(app=app))` fixture. The test suite constitutes a full regression suite equivalent to the Java integration tests.
|
||||
Reference in New Issue
Block a user