Files
comfy-discord-web/face_db.py
Khoa (Revenovich) Tran Gia 6004b000a7 manual submit
2026-03-07 21:49:16 +07:00

716 lines
27 KiB
Python

"""
face_db.py
==========
SQLite persistence for face detection and identity data.
Two tables:
persons : one row per named person
face_detections : one detected face per row, linked to source media
"""
from __future__ import annotations
import json
import sqlite3
from datetime import datetime, timezone
from pathlib import Path
import numpy as np
_DB_PATH: Path = Path(__file__).parent / "faces.db"
_SCHEMA = """
CREATE TABLE IF NOT EXISTS persons (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
created_at TEXT NOT NULL
);
CREATE UNIQUE INDEX IF NOT EXISTS idx_persons_name ON persons(LOWER(name));
CREATE TABLE IF NOT EXISTS person_aliases (
id INTEGER PRIMARY KEY AUTOINCREMENT,
person_id INTEGER NOT NULL REFERENCES persons(id) ON DELETE CASCADE,
alias TEXT NOT NULL,
created_at TEXT NOT NULL
);
CREATE UNIQUE INDEX IF NOT EXISTS idx_aliases_alias ON person_aliases(LOWER(alias));
CREATE TABLE IF NOT EXISTS face_groups (
id INTEGER PRIMARY KEY AUTOINCREMENT,
label TEXT,
threshold REAL NOT NULL,
is_manual INTEGER NOT NULL DEFAULT 0,
created_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS face_detections (
id INTEGER PRIMARY KEY AUTOINCREMENT,
person_id INTEGER REFERENCES persons(id) ON DELETE SET NULL,
source_type TEXT NOT NULL,
source_id INTEGER NOT NULL,
frame_index INTEGER NOT NULL DEFAULT 0,
face_index INTEGER NOT NULL DEFAULT 0,
embedding BLOB,
bbox_json TEXT,
created_at TEXT NOT NULL,
identified_at TEXT
);
CREATE INDEX IF NOT EXISTS idx_fd_source ON face_detections(source_type, source_id);
CREATE INDEX IF NOT EXISTS idx_fd_person ON face_detections(person_id);
"""
def _apply_migrations(conn: sqlite3.Connection) -> None:
"""Apply schema migrations that cannot be expressed in CREATE TABLE IF NOT EXISTS."""
# Migration 0: Add group_id column
try:
conn.execute(
"ALTER TABLE face_detections ADD COLUMN group_id INTEGER REFERENCES face_groups(id) ON DELETE SET NULL"
)
conn.commit()
except sqlite3.OperationalError:
pass # column already exists
conn.execute("CREATE INDEX IF NOT EXISTS idx_fd_group ON face_detections(group_id)")
conn.commit()
# Migration A: Make embedding nullable (rebuild table if NOT NULL constraint exists)
ddl_row = conn.execute(
"SELECT sql FROM sqlite_master WHERE type='table' AND name='face_detections'"
).fetchone()
import re as _re
_ddl_normalized = " ".join((ddl_row["sql"] or "").split()) if ddl_row else ""
if "embedding BLOB NOT NULL" in _ddl_normalized:
conn.execute("""
CREATE TABLE face_detections_new (
id INTEGER PRIMARY KEY AUTOINCREMENT,
person_id INTEGER REFERENCES persons(id) ON DELETE SET NULL,
source_type TEXT NOT NULL,
source_id INTEGER NOT NULL,
frame_index INTEGER NOT NULL DEFAULT 0,
face_index INTEGER NOT NULL DEFAULT 0,
embedding BLOB,
bbox_json TEXT,
created_at TEXT NOT NULL,
identified_at TEXT,
group_id INTEGER REFERENCES face_groups(id) ON DELETE SET NULL
)
""")
conn.execute(
"INSERT INTO face_detections_new "
"SELECT id, person_id, source_type, source_id, frame_index, face_index, "
"embedding, bbox_json, created_at, identified_at, group_id FROM face_detections"
)
conn.execute("DROP TABLE face_detections")
conn.execute("ALTER TABLE face_detections_new RENAME TO face_detections")
conn.execute("CREATE INDEX IF NOT EXISTS idx_fd_source ON face_detections(source_type, source_id)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_fd_person ON face_detections(person_id)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_fd_group ON face_detections(group_id)")
conn.commit()
# Migration B: NULL out all existing output embeddings (optimize storage)
result = conn.execute(
"UPDATE face_detections SET embedding = NULL WHERE source_type = 'output' AND embedding IS NOT NULL"
)
conn.commit()
if result.rowcount:
import logging as _logging
_logging.getLogger(__name__).info("Migration B: cleared %d output embedding(s)", result.rowcount)
def _connect(db_path: Path | None = None) -> sqlite3.Connection:
path = db_path if db_path is not None else _DB_PATH
conn = sqlite3.connect(str(path), check_same_thread=False)
conn.row_factory = sqlite3.Row
return conn
def init_db(db_path: Path = _DB_PATH) -> None:
"""Create tables if they don't exist and apply schema migrations."""
global _DB_PATH
_DB_PATH = db_path
with _connect(db_path) as conn:
conn.executescript(_SCHEMA)
conn.commit()
_apply_migrations(conn)
def get_or_create_person(name: str) -> tuple[int, bool]:
"""Find or create a person by name (case-insensitive). Resolves aliases. Returns (person_id, created)."""
with _connect() as conn:
row = conn.execute(
"SELECT id FROM persons WHERE LOWER(name) = LOWER(?)", (name,)
).fetchone()
if row:
return row["id"], False
row = conn.execute(
"SELECT person_id FROM person_aliases WHERE LOWER(alias) = LOWER(?)", (name,)
).fetchone()
if row:
return row["person_id"], False
created_at = datetime.now(timezone.utc).isoformat()
cur = conn.execute(
"INSERT INTO persons (name, created_at) VALUES (?, ?)",
(name, created_at),
)
conn.commit()
return cur.lastrowid, True # type: ignore[return-value]
def person_name_exists(name: str) -> bool:
"""Return True if a person with that name or alias (case-insensitive) already exists."""
with _connect() as conn:
row = conn.execute(
"""SELECT 1 FROM persons WHERE LOWER(name) = LOWER(?)
UNION
SELECT 1 FROM person_aliases WHERE LOWER(alias) = LOWER(?)
LIMIT 1""",
(name, name),
).fetchone()
return row is not None
def get_person_by_name(name: str) -> dict | None:
"""Return person row dict by name (case-insensitive), or None."""
with _connect() as conn:
row = conn.execute(
"SELECT id, name, created_at FROM persons WHERE LOWER(name) = LOWER(?)", (name,)
).fetchone()
return dict(row) if row else None
def list_persons() -> list[dict]:
"""Return all persons sorted by name, each with their aliases list and face_count."""
with _connect() as conn:
persons = conn.execute(
"""SELECT p.id, p.name, p.created_at,
(SELECT COUNT(*) FROM face_detections WHERE person_id = p.id) AS face_count
FROM persons p ORDER BY p.name"""
).fetchall()
result = []
for p in persons:
d = dict(p)
aliases = conn.execute(
"SELECT id, alias FROM person_aliases WHERE person_id = ? ORDER BY alias",
(d["id"],),
).fetchall()
d["aliases"] = [dict(a) for a in aliases]
result.append(d)
return result
def get_unidentified_input_detections(limit: int = 50, offset: int = 0) -> tuple[list[dict], int]:
"""Return paginated ungrouped unidentified face detections from input images, plus total count."""
with _connect() as conn:
total = conn.execute(
"SELECT COUNT(*) FROM face_detections WHERE person_id IS NULL AND group_id IS NULL AND source_type = 'input'"
).fetchone()[0]
rows = conn.execute(
"""SELECT id, source_id, face_index, bbox_json, created_at
FROM face_detections
WHERE person_id IS NULL AND group_id IS NULL AND source_type = 'input'
ORDER BY id
LIMIT ? OFFSET ?""",
(limit, offset),
).fetchall()
return [dict(r) for r in rows], total
def add_alias(person_id: int, alias: str) -> tuple[int, bool]:
"""Add an alias to a person. Returns (alias_id, created). Raises ValueError if alias taken."""
if person_name_exists(alias):
raise ValueError(f"Name or alias '{alias}' is already taken")
created_at = datetime.now(timezone.utc).isoformat()
with _connect() as conn:
cur = conn.execute(
"INSERT INTO person_aliases (person_id, alias, created_at) VALUES (?, ?, ?)",
(person_id, alias, created_at),
)
conn.commit()
return cur.lastrowid, True # type: ignore[return-value]
def remove_alias(alias_id: int) -> None:
"""Delete an alias row by id."""
with _connect() as conn:
conn.execute("DELETE FROM person_aliases WHERE id = ?", (alias_id,))
conn.commit()
def count_detections_for_person(person_id: int) -> int:
"""Return number of detections linked to this person."""
with _connect() as conn:
row = conn.execute(
"SELECT COUNT(*) FROM face_detections WHERE person_id = ?", (person_id,)
).fetchone()
return row[0]
def insert_detection(
source_type: str,
source_id: int,
embedding: "np.ndarray | None",
bbox: dict,
frame_index: int = 0,
face_index: int = 0,
person_id: int | None = None,
) -> int:
"""Insert a face detection row. Returns the new row id."""
embedding_bytes = embedding.astype(np.float32).tobytes() if embedding is not None else None
bbox_json = json.dumps(bbox)
created_at = datetime.now(timezone.utc).isoformat()
identified_at = created_at if person_id is not None else None
with _connect() as conn:
cur = conn.execute(
"""
INSERT INTO face_detections
(person_id, source_type, source_id, frame_index, face_index,
embedding, bbox_json, created_at, identified_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(person_id, source_type, source_id, frame_index, face_index,
embedding_bytes, bbox_json, created_at, identified_at),
)
conn.commit()
return cur.lastrowid # type: ignore[return-value]
def link_detection_to_person(detection_id: int, person_id: int) -> None:
"""Associate a detection with a named person."""
identified_at = datetime.now(timezone.utc).isoformat()
with _connect() as conn:
conn.execute(
"UPDATE face_detections SET person_id = ?, identified_at = ? WHERE id = ?",
(person_id, identified_at, detection_id),
)
conn.commit()
prune_person_output_embeddings(person_id)
def prune_person_output_embeddings(person_id: int, max_keep: int = 5) -> int:
"""Delete oldest output embeddings for a person if they exceed max_keep.
Input embeddings are never pruned. Returns number deleted."""
with _connect() as conn:
rows = conn.execute(
"""SELECT id FROM face_detections
WHERE person_id = ? AND source_type = 'output' AND embedding IS NOT NULL
ORDER BY id ASC""",
(person_id,),
).fetchall()
excess = rows[:max(0, len(rows) - max_keep)]
if not excess:
return 0
ids = [r["id"] for r in excess]
placeholders = ",".join("?" * len(ids))
conn.execute(
f"UPDATE face_detections SET embedding = NULL WHERE id IN ({placeholders})", ids
)
conn.commit()
return len(excess)
def get_detection(detection_id: int) -> dict | None:
"""Return a single face_detection row by id, or None."""
with _connect() as conn:
row = conn.execute(
"SELECT * FROM face_detections WHERE id = ?", (detection_id,)
).fetchone()
return dict(row) if row else None
def get_detections_for_source(source_type: str, source_id: int) -> list[dict]:
"""Return all face detections for a given source."""
with _connect() as conn:
rows = conn.execute(
"SELECT * FROM face_detections WHERE source_type = ? AND source_id = ?",
(source_type, source_id),
).fetchall()
return [dict(r) for r in rows]
def get_all_embeddings() -> list[dict]:
"""Return all identified detections with their embeddings as np.ndarray."""
with _connect() as conn:
rows = conn.execute(
"SELECT id, person_id, embedding FROM face_detections "
"WHERE person_id IS NOT NULL AND embedding IS NOT NULL"
).fetchall()
result = []
for row in rows:
emb = np.frombuffer(bytes(row["embedding"]), dtype=np.float32).copy()
result.append({"id": row["id"], "person_id": row["person_id"], "embedding": emb})
return result
# ---------------------------------------------------------------------------
# Face group CRUD
# ---------------------------------------------------------------------------
def create_group(threshold: float, label: str | None = None, is_manual: bool = False) -> int:
"""Insert a new face_groups row and return its id."""
created_at = datetime.now(timezone.utc).isoformat()
with _connect() as conn:
cur = conn.execute(
"INSERT INTO face_groups (label, threshold, is_manual, created_at) VALUES (?, ?, ?, ?)",
(label, threshold, int(is_manual), created_at),
)
conn.commit()
return cur.lastrowid # type: ignore[return-value]
def assign_detection_to_group(detection_id: int, group_id: int | None) -> None:
"""Set or clear the group_id on a face_detections row."""
with _connect() as conn:
conn.execute(
"UPDATE face_detections SET group_id = ? WHERE id = ?",
(group_id, detection_id),
)
conn.commit()
def clear_all_groups() -> None:
"""Remove all groups: NULL all group_id references then DELETE all face_groups rows."""
with _connect() as conn:
conn.execute("UPDATE face_detections SET group_id = NULL WHERE group_id IS NOT NULL")
conn.execute("DELETE FROM face_groups")
conn.commit()
def get_groups_with_detections() -> list[dict]:
"""Return groups that have ≥ 2 unidentified detections, with detection_ids and preview_ids."""
with _connect() as conn:
groups = conn.execute(
"SELECT id, label, threshold, is_manual, created_at FROM face_groups ORDER BY id"
).fetchall()
result = []
for g in groups:
det_rows = conn.execute(
"SELECT id FROM face_detections WHERE group_id = ? AND person_id IS NULL ORDER BY id",
(g["id"],),
).fetchall()
det_ids = [r["id"] for r in det_rows]
if len(det_ids) < 2:
continue
result.append({
**dict(g),
"count": len(det_ids),
"detection_ids": det_ids,
"preview_ids": det_ids[:4],
})
return result
def get_group_detections(group_id: int) -> list[dict]:
"""Return unidentified detections belonging to a group."""
with _connect() as conn:
rows = conn.execute(
"""SELECT id, source_type, face_index, created_at
FROM face_detections
WHERE group_id = ? AND person_id IS NULL
ORDER BY id""",
(group_id,),
).fetchall()
return [dict(r) for r in rows]
def merge_groups(keep_id: int, discard_id: int) -> None:
"""Move all detections from discard_id to keep_id and delete the discard group."""
with _connect() as conn:
conn.execute(
"UPDATE face_detections SET group_id = ? WHERE group_id = ?",
(keep_id, discard_id),
)
conn.execute("DELETE FROM face_groups WHERE id = ?", (discard_id,))
conn.commit()
def remove_detection_from_group(detection_id: int) -> None:
"""Unassign a detection from its group (set group_id = NULL)."""
assign_detection_to_group(detection_id, None)
def delete_singleton_groups() -> None:
"""Delete groups that have fewer than 2 unidentified members, NULL-ing their refs first."""
with _connect() as conn:
rows = conn.execute(
"""SELECT fg.id FROM face_groups fg
LEFT JOIN face_detections fd ON fd.group_id = fg.id AND fd.person_id IS NULL
GROUP BY fg.id
HAVING COUNT(fd.id) < 2"""
).fetchall()
for row in rows:
gid = row["id"]
conn.execute("UPDATE face_detections SET group_id = NULL WHERE group_id = ?", (gid,))
conn.execute("DELETE FROM face_groups WHERE id = ?", (gid,))
conn.commit()
def delete_group(group_id: int) -> None:
"""Delete a single face_groups row (does not NULL detections — caller must have already cleared them)."""
with _connect() as conn:
conn.execute("DELETE FROM face_groups WHERE id = ?", (group_id,))
conn.commit()
def get_unidentified_embeddings() -> list[dict]:
"""Return embeddings for all unidentified detections (both source types)."""
with _connect() as conn:
rows = conn.execute(
"SELECT id, embedding FROM face_detections WHERE person_id IS NULL AND embedding IS NOT NULL"
).fetchall()
result = []
for row in rows:
emb = np.frombuffer(bytes(row["embedding"]), dtype=np.float32).copy()
result.append({"id": row["id"], "embedding": emb})
return result
def get_all_group_embeddings_with_threshold() -> list[dict]:
"""Return per-group embedding lists along with the group threshold."""
with _connect() as conn:
groups = conn.execute("SELECT id, threshold FROM face_groups").fetchall()
result = []
for g in groups:
rows = conn.execute(
"SELECT embedding FROM face_detections "
"WHERE group_id = ? AND person_id IS NULL AND embedding IS NOT NULL",
(g["id"],),
).fetchall()
embeddings = [
np.frombuffer(bytes(r["embedding"]), dtype=np.float32).copy()
for r in rows
]
if embeddings:
result.append({
"group_id": g["id"],
"threshold": g["threshold"],
"embeddings": embeddings,
})
return result
def get_person_embeddings(person_id: int) -> list[np.ndarray]:
"""Return all embedding arrays for a given person (only rows with non-null embedding)."""
with _connect() as conn:
rows = conn.execute(
"SELECT embedding FROM face_detections WHERE person_id = ? AND embedding IS NOT NULL",
(person_id,),
).fetchall()
return [np.frombuffer(bytes(r["embedding"]), dtype=np.float32).copy() for r in rows]
def get_ungrouped_unidentified_embeddings() -> list[dict]:
"""Return embeddings for unidentified detections that have no group assigned."""
with _connect() as conn:
rows = conn.execute(
"SELECT id, embedding FROM face_detections "
"WHERE person_id IS NULL AND group_id IS NULL AND embedding IS NOT NULL"
).fetchall()
result = []
for row in rows:
emb = np.frombuffer(bytes(row["embedding"]), dtype=np.float32).copy()
result.append({"id": row["id"], "embedding": emb})
return result
# ---------------------------------------------------------------------------
# New person management functions
# ---------------------------------------------------------------------------
def get_person(person_id: int) -> dict | None:
"""Return {id, name, created_at, aliases: [{id, alias}]} for a person, or None."""
with _connect() as conn:
row = conn.execute(
"SELECT id, name, created_at FROM persons WHERE id = ?", (person_id,)
).fetchone()
if row is None:
return None
d = dict(row)
aliases = conn.execute(
"SELECT id, alias FROM person_aliases WHERE person_id = ? ORDER BY alias",
(person_id,),
).fetchall()
d["aliases"] = [dict(a) for a in aliases]
return d
def get_detections_for_person(
person_id: int, limit: int = 50, offset: int = 0
) -> tuple[list[dict], int]:
"""Return paginated detections for a person and total count (no embedding column)."""
with _connect() as conn:
total = conn.execute(
"SELECT COUNT(*) FROM face_detections WHERE person_id = ?", (person_id,)
).fetchone()[0]
rows = conn.execute(
"""SELECT id, source_type, source_id, face_index, frame_index, bbox_json,
created_at, identified_at
FROM face_detections
WHERE person_id = ?
ORDER BY id
LIMIT ? OFFSET ?""",
(person_id, limit, offset),
).fetchall()
return [dict(r) for r in rows], total
def unidentify_detection(detection_id: int) -> None:
"""Remove the person association from a detection."""
with _connect() as conn:
conn.execute(
"UPDATE face_detections SET person_id = NULL, identified_at = NULL WHERE id = ?",
(detection_id,),
)
conn.commit()
def rename_person(person_id: int, new_name: str) -> None:
"""
Rename a person. Allows case-only renames for the same person.
Raises ValueError if new_name is taken by a different person or any alias.
"""
with _connect() as conn:
# Check if name is taken by a different person
row = conn.execute(
"SELECT id FROM persons WHERE LOWER(name) = LOWER(?)", (new_name,)
).fetchone()
if row and row["id"] != person_id:
raise ValueError(f"Name '{new_name}' is already taken by another person")
# Check if name is taken by any alias (aliases belong to any person)
alias_row = conn.execute(
"SELECT person_id FROM person_aliases WHERE LOWER(alias) = LOWER(?)", (new_name,)
).fetchone()
if alias_row:
raise ValueError(f"Name '{new_name}' is already used as an alias")
conn.execute("UPDATE persons SET name = ? WHERE id = ?", (new_name, person_id))
conn.commit()
def delete_person(person_id: int) -> None:
"""
Delete a person: NULL all their detections, delete aliases, delete person row.
Does not rely on FK pragma (which is not enabled by default).
"""
with _connect() as conn:
conn.execute(
"UPDATE face_detections SET person_id = NULL, identified_at = NULL WHERE person_id = ?",
(person_id,),
)
conn.execute("DELETE FROM person_aliases WHERE person_id = ?", (person_id,))
conn.execute("DELETE FROM persons WHERE id = ?", (person_id,))
conn.commit()
def remove_person_from_source_ids(source_ids: list[int], person_id: int, source_type: str = "output") -> int:
"""Delete face_detections linking person_id to any of the given source_ids. Returns deleted count."""
if not source_ids:
return 0
placeholders = ",".join("?" * len(source_ids))
with _connect() as conn:
cur = conn.execute(
f"DELETE FROM face_detections WHERE source_type = ? AND person_id = ? AND source_id IN ({placeholders})",
(source_type, person_id, *source_ids),
)
conn.commit()
return cur.rowcount
def get_persons_for_source_ids(source_ids: list[int], source_type: str = "output") -> list[dict]:
"""Return distinct [{id, name}] for persons detected in the given source IDs."""
if not source_ids:
return []
placeholders = ",".join("?" * len(source_ids))
with _connect() as conn:
rows = conn.execute(
f"""SELECT DISTINCT p.id, p.name
FROM face_detections fd JOIN persons p ON p.id = fd.person_id
WHERE fd.source_type = ? AND fd.source_id IN ({placeholders}) AND fd.person_id IS NOT NULL
ORDER BY p.name""",
(source_type, *source_ids),
).fetchall()
return [dict(r) for r in rows]
def get_source_ids_for_person_query(name_query: str, source_type: str) -> list[int]:
"""Return distinct source_id values where a person/alias matching the substring appears."""
if not name_query.strip():
return []
pattern = f"%{name_query}%"
with _connect() as conn:
rows = conn.execute(
"""SELECT DISTINCT fd.source_id
FROM face_detections fd
JOIN persons p ON p.id = fd.person_id
LEFT JOIN person_aliases pa ON pa.person_id = p.id
WHERE fd.source_type = ? AND fd.person_id IS NOT NULL
AND (LOWER(p.name) LIKE LOWER(?) OR LOWER(pa.alias) LIKE LOWER(?))""",
(source_type, pattern, pattern),
).fetchall()
return [r["source_id"] for r in rows]
def get_persons_for_source_id_map(source_ids: list[int], source_type: str) -> dict[int, list[str]]:
"""Return {source_id: [person_name, …]} for the given source IDs (identified faces only)."""
if not source_ids:
return {}
placeholders = ",".join("?" * len(source_ids))
with _connect() as conn:
rows = conn.execute(
f"""SELECT fd.source_id, p.name
FROM face_detections fd JOIN persons p ON p.id = fd.person_id
WHERE fd.source_type = ? AND fd.source_id IN ({placeholders}) AND fd.person_id IS NOT NULL
ORDER BY fd.source_id, p.name""",
(source_type, *source_ids),
).fetchall()
result: dict[int, list[str]] = {sid: [] for sid in source_ids}
for row in rows:
sid = row["source_id"]
name = row["name"]
if name not in result[sid]:
result[sid].append(name)
return result
def update_detection_embedding(detection_id: int, embedding: "np.ndarray") -> None:
"""Update the embedding for an existing detection row."""
embedding_bytes = embedding.astype(np.float32).tobytes()
with _connect() as conn:
conn.execute(
"UPDATE face_detections SET embedding = ? WHERE id = ?",
(embedding_bytes, detection_id),
)
conn.commit()
def get_null_embedding_output_source_ids() -> list[int]:
"""Distinct source_ids of output detections with NULL embeddings and stored bbox."""
with _connect() as conn:
rows = conn.execute(
"""SELECT DISTINCT source_id FROM face_detections
WHERE source_type = 'output' AND embedding IS NULL
AND bbox_json IS NOT NULL AND bbox_json != '{}'"""
).fetchall()
return [r["source_id"] for r in rows]
def merge_persons(survivor_id: int, other_id: int) -> None:
"""
Absorb other_id into survivor_id: move all detections and aliases, then delete other_id.
Raises ValueError if both ids are the same.
"""
if survivor_id == other_id:
raise ValueError("Cannot merge person into themselves")
with _connect() as conn:
conn.execute(
"UPDATE face_detections SET person_id = ? WHERE person_id = ?",
(survivor_id, other_id),
)
conn.execute(
"UPDATE person_aliases SET person_id = ? WHERE person_id = ?",
(survivor_id, other_id),
)
conn.execute("DELETE FROM persons WHERE id = ?", (other_id,))
conn.commit()