personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

refactor(entities): relocate consolidation out of indexer + add fuzzy gate

Extract `consolidate_segment_entities` out of `think/indexer/journal.py`
into `think/entities/consolidation.py` as `consolidate_detected_entities`.
Insert a `find_matching_entity` gate (threshold 85) before any new slug
is written, and route all writes through `save_journal_entity` instead
of a direct `atomic_write`. Remove the implicit call from `scan_journal`
so it is pure with respect to `journal/entities/` state. Expose the
operation as `sol call entities consolidate [--full]`.

Also relocates `is_noise_entity` + `_NOISE_ENTITY_RE` to
`think/entities/core.py` so the new module does not cross into
`think.indexer`, and updates `apps/graph/routes.py` to follow.

This closes V1 in the layer-hygiene allowlist. Historical duplicates
(8 Jeremie-variant entities on the live journal) are NOT cleaned up
here — a separate merge pass will handle them.

Regression coverage: `test_scan_journal_is_pure_wrt_entity_state` in
`tests/test_journal_index.py` seeds a detection file and snapshots
`journal/entities/` across two full scans. Five unit/CLI tests in
`tests/test_entities_consolidation.py` cover create, fuzzy-skip,
unrelated-create, idempotence, and CLI dispatch.

Parent plan: vpe/workspace/plan-bundle-a-entity-write-ownership.md

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

+354 -189
+10
apps/entities/call.py
··· 12 12 13 13 import typer 14 14 15 + from think.entities.consolidation import consolidate_detected_entities 15 16 from think.entities.core import entity_slug, is_valid_entity_type 16 17 from think.entities.journal import ( 17 18 clear_journal_entity_cache, ··· 426 427 params={"entity": entity, "name": resolved_name, "aka": aka_value}, 427 428 ) 428 429 typer.echo(f"Added alias '{aka_value}' to '{resolved_name}'.") 430 + 431 + 432 + @app.command() 433 + def consolidate( 434 + full: bool = typer.Option(False, "--full", help="Scan all days, not just today."), 435 + ) -> None: 436 + """Consolidate segment-detected entities into journal identities.""" 437 + n = consolidate_detected_entities(get_journal(), full=full) 438 + typer.echo(f"Wrote {n} new entities.") 429 439 430 440 431 441 @app.command("observations")
+1 -1
apps/graph/routes.py
··· 11 11 12 12 from flask import Blueprint, jsonify, render_template, request 13 13 14 + from think.entities.core import is_noise_entity 14 15 from think.indexer.journal import ( 15 16 get_entity_intelligence, 16 17 get_entity_strength, 17 18 get_journal_index, 18 19 get_principal_entity_names, 19 - is_noise_entity, 20 20 ) 21 21 22 22 graph_bp = Blueprint(
+1 -1
docs/coding-standards.md
··· 67 67 68 68 | Domain | Write-owning module(s) | 69 69 |--------|------------------------| 70 - | Entities (`entities/*/entity.json`, `entities/*/*.npz`) | `think/entities/saving.py` + `apps/entities/call.py` | 70 + | Entities (`entities/*/entity.json`, `entities/*/*.npz`) | `think/entities/journal.py` + `think/entities/consolidation.py` + `think/entities/saving.py` + `apps/entities/call.py` | 71 71 | Facets (`facets/*/facet.json`, `facets/*/relationships/`) | `think/facets.py` + `apps/facets/*` (if/when created) | 72 72 | Observations (`observations.jsonl`) | `think/entities/observations.py` | 73 73 | Activities (`facets/*/activities/*.jsonl`) | `think/activities.py` |
-3
scripts/check_layer_hygiene.py
··· 109 109 # 110 110 # Audit ref: vpe/workspace/solstone-layer-violations-audit.md (extro repo). 111 111 ALLOWLIST: dict[str, str] = { 112 - # TODO(V1): consolidate_segment_entities() stealth-writes entities from 113 - # the indexer. Remove after Bundle A (entity-write ownership) lands. 114 - "think/indexer/journal.py": "V1", 115 112 # TODO(V2): seed_entities() creates entities from importer shared code. 116 113 # Indirect writes go through save_journal_entity(), so the direct-write 117 114 # grep does not flag the file today. Keep the entry so the file is
+133
tests/test_entities_consolidation.py
··· 1 + # SPDX-License-Identifier: AGPL-3.0-only 2 + # Copyright (c) 2026 sol pbc 3 + 4 + import json 5 + import logging 6 + from datetime import datetime 7 + from pathlib import Path 8 + 9 + from typer.testing import CliRunner 10 + 11 + from think.call import call_app 12 + from think.entities.consolidation import consolidate_detected_entities 13 + from think.entities.journal import save_journal_entity 14 + 15 + runner = CliRunner() 16 + 17 + 18 + def _seed_detection( 19 + journal_path: Path, 20 + detection: dict[str, str], 21 + *, 22 + day: str | None = None, 23 + segment: str = "120000_300", 24 + ) -> Path: 25 + day = day or datetime.now().strftime("%Y%m%d") 26 + talents_dir = journal_path / "chronicle" / day / "default" / segment / "talents" 27 + talents_dir.mkdir(parents=True, exist_ok=True) 28 + path = talents_dir / "entities.jsonl" 29 + path.write_text(json.dumps(detection) + "\n", encoding="utf-8") 30 + return path 31 + 32 + 33 + def _entity_file_count(journal_path: Path) -> int: 34 + return sum(1 for _ in (journal_path / "entities").glob("*/entity.json")) 35 + 36 + 37 + def test_consolidate_writes_new_entity(journal_copy): 38 + journal_path = Path(journal_copy) 39 + _seed_detection( 40 + journal_path, 41 + { 42 + "name": "Zephyr Quartz Index", 43 + "type": "Project", 44 + "description": "Unique regression seed", 45 + }, 46 + ) 47 + 48 + written = consolidate_detected_entities(str(journal_path), full=True) 49 + 50 + assert written == 1 51 + entity_path = journal_path / "entities" / "zephyr_quartz_index" / "entity.json" 52 + assert entity_path.exists() 53 + entity = json.loads(entity_path.read_text(encoding="utf-8")) 54 + assert entity == { 55 + "id": "zephyr_quartz_index", 56 + "name": "Zephyr Quartz Index", 57 + "type": "Project", 58 + "source": "detected", 59 + "created_at": entity["created_at"], 60 + "updated_at": entity["updated_at"], 61 + "description": "Unique regression seed", 62 + } 63 + 64 + 65 + def test_consolidate_skips_fuzzy_match(journal_copy, caplog): 66 + journal_path = Path(journal_copy) 67 + before_count = _entity_file_count(journal_path) 68 + save_journal_entity( 69 + {"id": "jeremie_miller", "name": "Jeremie Miller", "type": "Person"} 70 + ) 71 + _seed_detection( 72 + journal_path, 73 + { 74 + "name": "Jeremee Miler", 75 + "type": "Person", 76 + "description": "Should fuzzy-match and skip", 77 + }, 78 + ) 79 + 80 + with caplog.at_level(logging.INFO): 81 + written = consolidate_detected_entities(str(journal_path), full=True) 82 + 83 + assert written == 0 84 + assert _entity_file_count(journal_path) == before_count + 1 85 + assert ( 86 + "consolidate: 1 detections, 1 matched-skipped (1 fuzzy, 0 exact), 0 new entities" 87 + in caplog.text 88 + ) 89 + 90 + 91 + def test_consolidate_creates_for_unrelated_name(journal_copy): 92 + journal_path = Path(journal_copy) 93 + save_journal_entity( 94 + {"id": "jeremie_miller", "name": "Jeremie Miller", "type": "Person"} 95 + ) 96 + _seed_detection( 97 + journal_path, 98 + { 99 + "name": "Quillon Vastworth", 100 + "type": "Person", 101 + "description": "Unique unrelated entity", 102 + }, 103 + ) 104 + 105 + written = consolidate_detected_entities(str(journal_path), full=True) 106 + 107 + assert written == 1 108 + assert (journal_path / "entities" / "quillon_vastworth" / "entity.json").exists() 109 + 110 + 111 + def test_consolidate_is_idempotent(journal_copy): 112 + journal_path = Path(journal_copy) 113 + _seed_detection( 114 + journal_path, 115 + { 116 + "name": "Zephyr Quartz Index", 117 + "type": "Project", 118 + "description": "Unique regression seed", 119 + }, 120 + ) 121 + 122 + first = consolidate_detected_entities(str(journal_path), full=True) 123 + second = consolidate_detected_entities(str(journal_path), full=True) 124 + 125 + assert first == 1 126 + assert second == 0 127 + 128 + 129 + def test_cli_consolidate_dispatches(journal_copy): 130 + result = runner.invoke(call_app, ["entities", "consolidate", "--full"]) 131 + 132 + assert result.exit_code == 0 133 + assert "Wrote" in result.stdout
+39
tests/test_journal_index.py
··· 3 3 4 4 """Tests for the unified journal index.""" 5 5 6 + import hashlib 6 7 import json 7 8 import os 8 9 from datetime import datetime ··· 1742 1743 ).fetchone()[0] 1743 1744 conn.close() 1744 1745 assert count1 == count2 1746 + 1747 + 1748 + def test_scan_journal_is_pure_wrt_entity_state(journal_copy): 1749 + """scan_journal must not mutate journal/entities/ state.""" 1750 + from think.indexer.journal import scan_journal 1751 + 1752 + journal_path = Path(journal_copy) 1753 + today = datetime.now().strftime("%Y%m%d") 1754 + segment_dir = ( 1755 + journal_path / "chronicle" / today / "default" / "120000_300" / "talents" 1756 + ) 1757 + segment_dir.mkdir(parents=True) 1758 + (segment_dir / "entities.jsonl").write_text( 1759 + '{"name":"Zephyr Quartz Index","type":"Project","description":"Unique regression seed"}\n', 1760 + encoding="utf-8", 1761 + ) 1762 + 1763 + def snapshot_entities(root: Path) -> list[tuple[str, str]]: 1764 + entries = [] 1765 + for path in sorted((root / "entities").rglob("*")): 1766 + if not path.is_file(): 1767 + continue 1768 + rel = path.relative_to(root).as_posix() 1769 + digest = hashlib.sha256(path.read_bytes()).hexdigest() 1770 + entries.append((rel, digest)) 1771 + return entries 1772 + 1773 + snap_before = snapshot_entities(journal_path) 1774 + scan_journal(str(journal_path), full=True) 1775 + snap_between = snapshot_entities(journal_path) 1776 + scan_journal(str(journal_path), full=True) 1777 + snap_after = snapshot_entities(journal_path) 1778 + 1779 + assert snap_before == snap_between == snap_after, ( 1780 + "scan_journal() mutated journal/entities/ — see " 1781 + "vpe/workspace/plan-bundle-a-entity-write-ownership.md and " 1782 + "docs/coding-standards.md § L6" 1783 + )
+155
think/entities/consolidation.py
··· 1 + # SPDX-License-Identifier: AGPL-3.0-only 2 + # Copyright (c) 2026 sol pbc 3 + 4 + """Consolidate segment-detected entities into journal identities.""" 5 + 6 + import json 7 + import logging 8 + from datetime import datetime 9 + from pathlib import Path 10 + 11 + from think.entities.core import entity_slug, is_noise_entity 12 + from think.entities.journal import load_all_journal_entities, save_journal_entity 13 + from think.entities.matching import MatchTier, find_matching_entity 14 + from think.utils import CHRONICLE_DIR, DATE_RE, now_ms 15 + 16 + logger = logging.getLogger(__name__) 17 + 18 + 19 + def consolidate_detected_entities( 20 + journal: str, 21 + full: bool = False, 22 + fuzzy_threshold: int = 85, 23 + ) -> int: 24 + """Consolidate segment-detected entities into journal identities.""" 25 + journal_path = Path(journal) 26 + day_root = ( 27 + journal_path / CHRONICLE_DIR 28 + if (journal_path / CHRONICLE_DIR).is_dir() 29 + else journal_path 30 + ) 31 + today = datetime.now().strftime("%Y%m%d") 32 + 33 + segment_files = [] 34 + for path in day_root.glob("**/talents/entities.jsonl"): 35 + if not path.is_file(): 36 + continue 37 + try: 38 + day = path.relative_to(day_root).parts[0] 39 + except (ValueError, IndexError): 40 + continue 41 + if not DATE_RE.fullmatch(day): 42 + continue 43 + if full or day == today: 44 + segment_files.append(path) 45 + 46 + seen: dict[tuple[str, str], dict[str, str]] = {} 47 + for seg_file in segment_files: 48 + try: 49 + with open(seg_file, encoding="utf-8") as f: 50 + for raw in f: 51 + raw = raw.strip() 52 + if not raw: 53 + continue 54 + try: 55 + data = json.loads(raw) 56 + except json.JSONDecodeError as e: 57 + logger.warning( 58 + "Skipping malformed JSONL in %s: %s", seg_file, e 59 + ) 60 + continue 61 + 62 + name = (data.get("name") or "").strip() 63 + etype = (data.get("type") or "").strip() 64 + description = (data.get("description") or "").strip() 65 + 66 + if not name or not etype or is_noise_entity(name): 67 + continue 68 + 69 + key = (name.lower(), etype.lower()) 70 + if key not in seen: 71 + seen[key] = { 72 + "name": name, 73 + "type": etype, 74 + "description": description, 75 + } 76 + elif len(description) > len(seen[key]["description"]): 77 + seen[key]["description"] = description 78 + except OSError as e: 79 + logger.warning("Skipping %s: %s", seg_file, e) 80 + 81 + entities_list = list(load_all_journal_entities().values()) 82 + total_detections = len(seen) 83 + fuzzy_count = 0 84 + exact_count = 0 85 + new_count = 0 86 + ts = now_ms() 87 + 88 + for (name_lower, _type_lower), data in seen.items(): 89 + name = data["name"] 90 + etype = data["type"] 91 + description = data["description"] 92 + 93 + match = find_matching_entity( 94 + name, entities_list, fuzzy_threshold=fuzzy_threshold 95 + ) 96 + if match: 97 + if match.tier == MatchTier.FUZZY: 98 + fuzzy_count += 1 99 + else: 100 + exact_count += 1 101 + continue 102 + 103 + base_slug = entity_slug(name) 104 + if not base_slug: 105 + continue 106 + 107 + final_slug = None 108 + for attempt in range(1, 102): 109 + candidate = base_slug if attempt == 1 else f"{base_slug}_{attempt}" 110 + candidate_path = journal_path / "entities" / candidate / "entity.json" 111 + 112 + if not candidate_path.exists(): 113 + final_slug = candidate 114 + break 115 + 116 + try: 117 + with open(candidate_path, encoding="utf-8") as f: 118 + existing = json.load(f) 119 + if (existing.get("name") or "").lower().strip() == name_lower: 120 + break 121 + except (json.JSONDecodeError, OSError): 122 + continue 123 + else: 124 + logger.warning("Too many slug collisions for '%s', skipping", name) 125 + continue 126 + 127 + if final_slug is None: 128 + continue 129 + 130 + entity = { 131 + "id": final_slug, 132 + "name": name, 133 + "type": etype, 134 + "source": "detected", 135 + "created_at": ts, 136 + "updated_at": ts, 137 + } 138 + if description: 139 + entity["description"] = description 140 + 141 + try: 142 + save_journal_entity(entity) 143 + new_count += 1 144 + except OSError as e: 145 + logger.warning("Failed to write entity %s: %s", final_slug, e) 146 + 147 + logger.info( 148 + "consolidate: %d detections, %d matched-skipped (%d fuzzy, %d exact), %d new entities", 149 + total_detections, 150 + fuzzy_count + exact_count, 151 + fuzzy_count, 152 + exact_count, 153 + new_count, 154 + ) 155 + return new_count
+14
think/entities/core.py
··· 42 42 # Maximum length for entity slug before truncation 43 43 MAX_ENTITY_SLUG_LENGTH = 200 44 44 45 + # Noise entity patterns — transcript artifacts that should not be indexed. 46 + # Matches "Speaker N", "Unknown/Unidentified <word>" (single-word role). 47 + # Multi-word patterns ("Speaker Diarization") and bare "Unknown" are kept. 48 + _NOISE_ENTITY_RE = re.compile( 49 + r"^Speaker \d+(?:\s*\(.*\))?$" 50 + r"|^(?:Unknown|Unidentified) \w+(?:\s*\d+)?(?:\s*\(.*\))?$", 51 + re.IGNORECASE, 52 + ) 53 + 45 54 46 55 def get_identity_names() -> list[str]: 47 56 """Get all names/aliases for the journal principal from identity config. ··· 137 146 return bool( 138 147 re.match(r"^[A-Za-z0-9 ]+$", etype) and re.search(r"[A-Za-z0-9]", etype) 139 148 ) 149 + 150 + 151 + def is_noise_entity(name: str) -> bool: 152 + """Return True if the entity name is a transcript artifact.""" 153 + return bool(_NOISE_ENTITY_RE.match(name)) 140 154 141 155 142 156 def entity_slug(name: str) -> str:
+1 -184
think/indexer/journal.py
··· 27 27 from pathlib import Path 28 28 from typing import Any 29 29 30 - from think.entities.core import atomic_write, entity_slug 30 + from think.entities.core import entity_slug, is_noise_entity 31 31 from think.formatters import ( 32 32 extract_path_metadata, 33 33 find_formattable_files, ··· 41 41 DATE_RE, 42 42 get_journal, 43 43 journal_relative_path, 44 - now_ms, 45 44 resolve_journal_path, 46 45 segment_key, 47 46 segment_parse, 48 47 ) 49 48 50 49 logger = logging.getLogger(__name__) 51 - 52 - # Noise entity patterns — transcript artifacts that should not be indexed. 53 - # Matches "Speaker N", "Unknown/Unidentified <word>" (single-word role). 54 - # Multi-word patterns ("Speaker Diarization") and bare "Unknown" are kept. 55 - _NOISE_ENTITY_RE = re.compile( 56 - r"^Speaker \d+(?:\s*\(.*\))?$" 57 - r"|^(?:Unknown|Unidentified) \w+(?:\s*\d+)?(?:\s*\(.*\))?$", 58 - re.IGNORECASE, 59 - ) 60 - 61 - 62 - def is_noise_entity(name: str) -> bool: 63 - """Return True if the entity name is a transcript artifact (noise).""" 64 - return bool(_NOISE_ENTITY_RE.match(name)) 65 50 66 51 67 52 def _strip_md_formatting(text: str) -> str: ··· 1479 1464 return count 1480 1465 1481 1466 1482 - def consolidate_segment_entities(journal: str, full: bool = False) -> int: 1483 - """Consolidate per-segment entity detections into the journal entity store. 1484 - 1485 - Reads agents/entities.jsonl files from all day/stream/segment directories, 1486 - deduplicates by (name, type), and writes to entities/<slug>/entity.json for 1487 - new entities. Skips any entity whose entity.json already exists (preserves 1488 - user-managed and previously-consolidated records). 1489 - 1490 - Args: 1491 - journal: Path to journal root directory 1492 - full: If True, scan all day directories. If False, scan today only. 1493 - 1494 - Returns: 1495 - Number of new journal entities written. 1496 - """ 1497 - from datetime import datetime 1498 - 1499 - journal_path = Path(journal) 1500 - day_root = ( 1501 - journal_path / CHRONICLE_DIR 1502 - if (journal_path / CHRONICLE_DIR).is_dir() 1503 - else journal_path 1504 - ) 1505 - today = datetime.now().strftime("%Y%m%d") 1506 - 1507 - # Collect all matching segment entity files across day/stream/segment dirs 1508 - segment_files = [] 1509 - for path in day_root.glob("**/talents/entities.jsonl"): 1510 - if not path.is_file(): 1511 - continue 1512 - try: 1513 - day = path.relative_to(day_root).parts[0] 1514 - except (ValueError, IndexError): 1515 - continue 1516 - if not DATE_RE.fullmatch(day): 1517 - continue # Not a journal day directory 1518 - if full or day == today: 1519 - segment_files.append(path) 1520 - 1521 - if not segment_files: 1522 - logger.info( 1523 - "consolidated 0 entities from 0 segment files → 0 new journal entities written" 1524 - ) 1525 - return 0 1526 - 1527 - # Collect and deduplicate entities from all segment files. 1528 - # Key: (name.lower().strip(), type.lower().strip()) 1529 - # Value: {"name": ..., "type": ..., "description": ...} 1530 - seen: dict[tuple[str, str], dict] = {} 1531 - 1532 - for seg_file in segment_files: 1533 - try: 1534 - with open(seg_file, encoding="utf-8") as f: 1535 - for raw in f: 1536 - raw = raw.strip() 1537 - if not raw: 1538 - continue 1539 - try: 1540 - data = json.loads(raw) 1541 - except json.JSONDecodeError as e: 1542 - logger.warning( 1543 - "Skipping malformed JSONL in %s: %s", seg_file, e 1544 - ) 1545 - continue 1546 - 1547 - name = (data.get("name") or "").strip() 1548 - etype = (data.get("type") or "").strip() 1549 - description = (data.get("description") or "").strip() 1550 - 1551 - if not name or not etype: 1552 - continue 1553 - if is_noise_entity(name): 1554 - continue 1555 - 1556 - key = (name.lower(), etype.lower()) 1557 - if key not in seen: 1558 - seen[key] = { 1559 - "name": name, 1560 - "type": etype, 1561 - "description": description, 1562 - } 1563 - elif len(description) > len(seen[key]["description"]): 1564 - # Longest description wins 1565 - seen[key]["description"] = description 1566 - except OSError as e: 1567 - logger.warning("Skipping %s: %s", seg_file, e) 1568 - continue 1569 - 1570 - total_entities = len(seen) 1571 - 1572 - if not seen: 1573 - logger.info( 1574 - "consolidated 0 entities from %d segment files → 0 new journal entities written", 1575 - len(segment_files), 1576 - ) 1577 - return 0 1578 - 1579 - # Write new entities, skipping any entity.json that already exists. 1580 - ts = now_ms() 1581 - written = 0 1582 - 1583 - for (name_lower, _type_lower), data in seen.items(): 1584 - name = data["name"] 1585 - etype = data["type"] 1586 - description = data["description"] 1587 - 1588 - base_slug = entity_slug(name) 1589 - if not base_slug: 1590 - continue 1591 - 1592 - # Resolve slug: skip if entity already exists for this name, handle 1593 - # collisions (different entity at same slug) by appending _2, _3, etc. 1594 - final_slug = None 1595 - for attempt in range(1, 102): 1596 - candidate = base_slug if attempt == 1 else f"{base_slug}_{attempt}" 1597 - candidate_path = journal_path / "entities" / candidate / "entity.json" 1598 - 1599 - if not candidate_path.exists(): 1600 - final_slug = candidate 1601 - break 1602 - 1603 - # Path exists — check if it's the same entity 1604 - try: 1605 - with open(candidate_path, encoding="utf-8") as f: 1606 - existing = json.load(f) 1607 - if (existing.get("name") or "").lower().strip() == name_lower: 1608 - # Entity already exists (prior run or user-created), skip 1609 - break 1610 - # Different entity occupies this slug — try next suffix 1611 - except (json.JSONDecodeError, OSError): 1612 - # Unreadable file — treat as occupied, try next suffix 1613 - continue 1614 - else: 1615 - logger.warning("Too many slug collisions for '%s', skipping", name) 1616 - continue 1617 - 1618 - if final_slug is None: 1619 - continue # Entity already exists 1620 - 1621 - entity: dict[str, Any] = { 1622 - "id": final_slug, 1623 - "name": name, 1624 - "type": etype, 1625 - "source": "detected", 1626 - "created_at": ts, 1627 - "updated_at": ts, 1628 - } 1629 - if description: 1630 - entity["description"] = description 1631 - 1632 - entity_path = journal_path / "entities" / final_slug / "entity.json" 1633 - try: 1634 - content = json.dumps(entity, ensure_ascii=False, indent=2) + "\n" 1635 - atomic_write(entity_path, content, prefix=".entity_") 1636 - written += 1 1637 - except OSError as e: 1638 - logger.warning("Failed to write entity %s: %s", final_slug, e) 1639 - 1640 - logger.info( 1641 - "consolidated %d entities from %d segment files → %d new journal entities written", 1642 - total_entities, 1643 - len(segment_files), 1644 - written, 1645 - ) 1646 - return written 1647 - 1648 - 1649 1467 def scan_journal(journal: str, verbose: bool = False, full: bool = False) -> bool: 1650 1468 """Scan and index journal content. 1651 1469 ··· 1758 1576 len(affected_segments), 1759 1577 ) 1760 1578 1761 - consolidate_segment_entities(journal, full=full) 1762 1579 entity_changed = scan_entities(journal, conn, verbose=verbose, full=full) 1763 1580 signal_changed = scan_signals(journal, conn, verbose=verbose, full=full) 1764 1581