fix: address design review ACT NOW items (6 risk gaps)
- Add migrate_config() to ConfigGenerator protocol for schema version upgrades
- Add per-server operation lock to ProcessManager to prevent start/stop races
- Add busy_timeout retry/backoff strategy (exponential: 1s, 2s, 4s) for DB lock exhaustion
- Add ConfigForm testing strategy and error boundary for malformed schemas
- Add schema cache invalidation on adapter version change
- Add ConfigMigrationError to typed adapter exceptions
This commit is contained in:
27
THREADING.md
27
THREADING.md
@@ -137,8 +137,16 @@ This means:
|
||||
# Each background thread creates its own SQLAlchemy connection
|
||||
# from the same engine (WAL mode allows concurrent reads)
|
||||
# PRAGMA busy_timeout=5000 prevents "database is locked" errors
|
||||
#
|
||||
# If busy_timeout is exhausted (5s), the write fails with
|
||||
# OperationalError. Background threads retry with exponential
|
||||
# backoff: 1s, 2s, 4s — then log and skip the tick.
|
||||
# API request handlers retry up to 2 times with 1s backoff,
|
||||
# then return 503 "database temporarily unavailable".
|
||||
|
||||
class BaseServerThread(threading.Thread):
|
||||
_db_retry_delays = [1.0, 2.0, 4.0] # seconds, exponential backoff
|
||||
|
||||
def run(self):
|
||||
engine = get_engine()
|
||||
self._db = engine.connect()
|
||||
@@ -147,6 +155,13 @@ class BaseServerThread(threading.Thread):
|
||||
while not self._stop_event.is_set():
|
||||
try:
|
||||
self.tick()
|
||||
except OperationalError as e:
|
||||
if "database is locked" in str(e):
|
||||
retried = self._retry_db_write(self.tick)
|
||||
if not retried:
|
||||
logger.warning(f"{self.name}: DB locked after all retries, skipping tick")
|
||||
else:
|
||||
self.on_error(e)
|
||||
except Exception as e:
|
||||
self.on_error(e)
|
||||
self._stop_event.wait(self.interval)
|
||||
@@ -155,6 +170,18 @@ class BaseServerThread(threading.Thread):
|
||||
finally:
|
||||
self.teardown()
|
||||
self._db.close()
|
||||
|
||||
def _retry_db_write(self, fn, max_retries=3):
    """Call ``fn`` again after each backoff delay until it succeeds.

    Uses the first ``max_retries`` entries of ``self._db_retry_delays`` as
    the wait before each attempt.

    Returns True as soon as one call to ``fn`` finishes without raising
    OperationalError. Returns False if the stop event is set once a wait
    ends, or if every attempt raised OperationalError. Any other exception
    raised by ``fn`` propagates to the caller.
    """
    backoff_schedule = self._db_retry_delays[:max_retries]
    for pause in backoff_schedule:
        # Wait on the stop event instead of sleeping so a stop request
        # can end the backoff early.
        self._stop_event.wait(pause)
        if self._stop_event.is_set():
            return False
        try:
            fn()
        except OperationalError:
            # Still failing; move on to the next delay in the schedule.
            pass
        else:
            return True
    return False
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
Reference in New Issue
Block a user