personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

indexer: consolidate segment entity detections into journal entity store

Add consolidate_segment_entities() to think/indexer/journal.py. Called
automatically from scan_journal() before scan_entities() when full=True.

Globs **/agents/entities.jsonl across all day/stream/segment directories,
deduplicates by (name.lower(), type.lower()) with longest-description wins,
writes entities/<slug>/entity.json with source="detected". Skips existing
entity.json files unconditionally to preserve user-managed records. Handles
slug collisions by appending _2, _3, etc.

Fixes P0 blocker: entity graph, strength scoring, and entity intelligence had
no signal on new journals because per-segment agents/entities.jsonl files were
never read into the journal entity store.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

+165 -2
+165 -2
think/indexer/journal.py
··· 26 26 from pathlib import Path 27 27 from typing import Any 28 28 29 - from think.entities.core import entity_slug 29 + from think.entities.core import atomic_write, entity_slug 30 30 from think.formatters import ( 31 31 extract_path_metadata, 32 32 find_formattable_files, ··· 34 34 get_formatter, 35 35 load_jsonl, 36 36 ) 37 - from think.utils import DATE_RE, get_journal 37 + from think.utils import DATE_RE, get_journal, now_ms 38 38 39 39 logger = logging.getLogger(__name__) 40 40 ··· 1235 1235 return bool(to_index or removed) 1236 1236 1237 1237 1238 + def consolidate_segment_entities(journal: str, full: bool = False) -> int: 1239 + """Consolidate per-segment entity detections into the journal entity store. 1240 + 1241 + Reads agents/entities.jsonl files from all day/stream/segment directories, 1242 + deduplicates by (name, type), and writes to entities/<slug>/entity.json for 1243 + new entities. Skips any entity whose entity.json already exists (preserves 1244 + user-managed and previously-consolidated records). 1245 + 1246 + Args: 1247 + journal: Path to journal root directory 1248 + full: If True, scan all day directories. If False, scan today only. 1249 + 1250 + Returns: 1251 + Number of new journal entities written. 1252 + """ 1253 + from datetime import datetime 1254 + 1255 + journal_path = Path(journal) 1256 + today = datetime.now().strftime("%Y%m%d") 1257 + 1258 + # Collect all matching segment entity files across day/stream/segment dirs 1259 + segment_files = [] 1260 + for path in journal_path.glob("**/agents/entities.jsonl"): 1261 + if not path.is_file(): 1262 + continue 1263 + try: 1264 + day = path.relative_to(journal_path).parts[0] 1265 + except (ValueError, IndexError): 1266 + continue 1267 + if not DATE_RE.fullmatch(day): 1268 + continue # Not a journal day directory 1269 + if full or day == today: 1270 + segment_files.append(path) 1271 + 1272 + if not segment_files: 1273 + logger.info( 1274 + "consolidated 0 entities from 0 segment files → 0 new journal entities written" 1275 + ) 1276 + return 0 1277 + 1278 + # Collect and deduplicate entities from all segment files. 1279 + # Key: (name.lower().strip(), type.lower().strip()) 1280 + # Value: {"name": ..., "type": ..., "description": ...} 1281 + seen: dict[tuple[str, str], dict] = {} 1282 + 1283 + for seg_file in segment_files: 1284 + try: 1285 + with open(seg_file, encoding="utf-8") as f: 1286 + for raw in f: 1287 + raw = raw.strip() 1288 + if not raw: 1289 + continue 1290 + try: 1291 + data = json.loads(raw) 1292 + except json.JSONDecodeError as e: 1293 + logger.warning( 1294 + "Skipping malformed JSONL in %s: %s", seg_file, e 1295 + ) 1296 + continue 1297 + 1298 + name = (data.get("name") or "").strip() 1299 + etype = (data.get("type") or "").strip() 1300 + description = (data.get("description") or "").strip() 1301 + 1302 + if not name or not etype: 1303 + continue 1304 + if is_noise_entity(name): 1305 + continue 1306 + 1307 + key = (name.lower(), etype.lower()) 1308 + if key not in seen: 1309 + seen[key] = { 1310 + "name": name, 1311 + "type": etype, 1312 + "description": description, 1313 + } 1314 + elif len(description) > len(seen[key]["description"]): 1315 + # Longest description wins 1316 + seen[key]["description"] = description 1317 + except OSError as e: 1318 + logger.warning("Skipping %s: %s", seg_file, e) 1319 + continue 1320 + 1321 + total_entities = len(seen) 1322 + 1323 + if not seen: 1324 + logger.info( 1325 + "consolidated 0 entities from %d segment files → 0 new journal entities written", 1326 + len(segment_files), 1327 + ) 1328 + return 0 1329 + 1330 + # Write new entities, skipping any entity.json that already exists. 1331 + ts = now_ms() 1332 + written = 0 1333 + 1334 + for (name_lower, _type_lower), data in seen.items(): 1335 + name = data["name"] 1336 + etype = data["type"] 1337 + description = data["description"] 1338 + 1339 + base_slug = entity_slug(name) 1340 + if not base_slug: 1341 + continue 1342 + 1343 + # Resolve slug: skip if entity already exists for this name, handle 1344 + # collisions (different entity at same slug) by appending _2, _3, etc. 1345 + final_slug = None 1346 + for attempt in range(1, 102): 1347 + candidate = base_slug if attempt == 1 else f"{base_slug}_{attempt}" 1348 + candidate_path = journal_path / "entities" / candidate / "entity.json" 1349 + 1350 + if not candidate_path.exists(): 1351 + final_slug = candidate 1352 + break 1353 + 1354 + # Path exists — check if it's the same entity 1355 + try: 1356 + with open(candidate_path, encoding="utf-8") as f: 1357 + existing = json.load(f) 1358 + if (existing.get("name") or "").lower().strip() == name_lower: 1359 + # Entity already exists (prior run or user-created), skip 1360 + break 1361 + # Different entity occupies this slug — try next suffix 1362 + except (json.JSONDecodeError, OSError): 1363 + # Unreadable file — treat as occupied, try next suffix 1364 + continue 1365 + else: 1366 + logger.warning("Too many slug collisions for '%s', skipping", name) 1367 + continue 1368 + 1369 + if final_slug is None: 1370 + continue # Entity already exists 1371 + 1372 + entity: dict[str, Any] = { 1373 + "id": final_slug, 1374 + "name": name, 1375 + "type": etype, 1376 + "source": "detected", 1377 + "created_at": ts, 1378 + "updated_at": ts, 1379 + } 1380 + if description: 1381 + entity["description"] = description 1382 + 1383 + entity_path = journal_path / "entities" / final_slug / "entity.json" 1384 + try: 1385 + content = json.dumps(entity, ensure_ascii=False, indent=2) + "\n" 1386 + atomic_write(entity_path, content, prefix=".entity_") 1387 + written += 1 1388 + except OSError as e: 1389 + logger.warning("Failed to write entity %s: %s", final_slug, e) 1390 + 1391 + logger.info( 1392 + "consolidated %d entities from %d segment files → %d new journal entities written", 1393 + total_entities, 1394 + len(segment_files), 1395 + written, 1396 + ) 1397 + return written 1398 + 1399 + 1238 1400 def scan_journal(journal: str, verbose: bool = False, full: bool = False) -> bool: 1239 1401 """Scan and index journal content. 1240 1402 ··· 1318 1480 "%s indexed, %s removed in %.2f seconds", len(to_index), len(removed), elapsed 1319 1481 ) 1320 1482 1483 + consolidate_segment_entities(journal, full=full) 1321 1484 entity_changed = scan_entities(journal, conn, verbose=verbose, full=full) 1322 1485 signal_changed = scan_signals(journal, conn, verbose=verbose, full=full) 1323 1486