my prefect server setup prefect-metrics.waow.tech
python orchestration
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

compact flow: populate source_uris on likes-derived observations

the bot now records provenance (source_uris: list[AtUri]) on every
observation row — sources are what the claim is anchored to. the likes
pipeline has the sources for free (the URIs of the posts nate liked that
fed the extraction), but was throwing them away.

changes:
- load_recent_liked_posts SQL now selects subject_uri alongside the
other columns
- LikesObservation gets a source_uris field (default empty). the LLM
doesn't see URIs in its prompt (only post text), so the orchestrator
attaches them post-extraction: every observation for an author gets
the full deduped ordered set of URIs from that author's batch of
liked posts — coarse but always-true
- USER_NAMESPACE_SCHEMA aligned with the bot: added status, supersedes,
source_uris columns that previously diverged
- write_likes_observations_to_turbopuffer writes status="active",
supersedes="", source_uris=[...] so bot reads (which filter on
status != superseded) find them

bot side (separate commit) reads source_uris defensively via
getattr(row, "source_uris", []) so older rows missing the column don't
break anything.

Co-Authored-By: Claude Opus 4 (1M context) <noreply@anthropic.com>

zzstoatzz 159993ff c8b24b6c

+23 -2
+23 -2
flows/compact.py
··· 290 290 default=None, 291 291 description="for UPDATE: the old observation content this replaces", 292 292 ) 293 + # populated post-extraction by the orchestrator from the liked-post URIs 294 + # that fed the LLM call. the model itself doesn't see URIs (only text). 295 + source_uris: list[str] = Field(default_factory=list) 293 296 294 297 295 298 class LikesExtractionResult(BaseModel): ··· 320 323 db = duckdb.connect(snap_path, read_only=True) 321 324 try: 322 325 rows = db.execute(""" 323 - SELECT author_handle, author_did, text, created_at, 326 + SELECT subject_uri, author_handle, author_did, text, created_at, 324 327 liked_at, embed_type, embed_text 325 328 FROM raw_liked_posts 326 329 WHERE liked_at >= (now() - INTERVAL '7 days')::VARCHAR ··· 332 335 return {} 333 336 db.close() 334 337 335 - columns = ["author_handle", "author_did", "text", "created_at", 338 + columns = ["subject_uri", "author_handle", "author_did", "text", "created_at", 336 339 "liked_at", "embed_type", "embed_text"] 337 340 by_author: dict[str, list[dict[str, str]]] = defaultdict(list) 338 341 for row in rows: ··· 451 454 452 455 USER_NAMESPACE_SCHEMA = { 453 456 "kind": {"type": "string", "filterable": True}, 457 + "status": {"type": "string", "filterable": True}, # active | superseded 454 458 "content": {"type": "string", "full_text_search": True}, 455 459 "tags": {"type": "[]string", "filterable": True}, 460 + "supersedes": {"type": "string"}, # id of observation this replaces 461 + "source_uris": {"type": "[]string"}, # AT-URIs backing the observation 456 462 "created_at": {"type": "string"}, 457 463 "updated_at": {"type": "string"}, 458 464 } ··· 512 518 "id": obs_id, 513 519 "vector": embedding, 514 520 "kind": "observation", 521 + "status": "active", 515 522 "content": content, 516 523 "tags": tags, 524 + "supersedes": "", 525 + "source_uris": list(obs.get("source_uris") or []), 517 526 "created_at": now, 518 527 "updated_at": now, 519 528 }], ··· 583 592 obs_dicts = await extract_likes_observations( 584 593 handle, liked_posts_text, existing, bsky_profile, pubs, anthropic_key, 585 594 ) 595 + 596 + # attach the URIs of the liked posts that fed this batch to every 597 + # observation. coarse — the LLM doesn't tell us which post produced 598 + # which observation, but always-true: each extracted claim was 599 + # justified by something in this batch. dedup, preserve order. 600 + batch_uris = list(dict.fromkeys( 601 + p.get("subject_uri", "") for p in posts if p.get("subject_uri") 602 + )) 603 + for obs in obs_dicts: 604 + if not obs.get("source_uris") and batch_uris: 605 + obs["source_uris"] = batch_uris 606 + 586 607 all_observations.extend(obs_dicts) 587 608 588 609 actionable = [o for o in all_observations if o.get("action", "").upper() != "NOOP"]