my prefect server setup prefect-metrics.waow.tech
python orchestration
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

add updated_at to observations + observation review in curate flow

step 1: compact.py gets updated_at in schema and likes writes.
step 2: curate.py gains observation-specific tools (list_users,
list_user_observations, deprecate_observation, update_observation)
and a second agent phase that reviews private observations for
contradictions, staleness, and near-duplicates.

also fixes ruff errors: ambiguous var name, unused exception var,
unused logger import.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

zzstoatzz 01167335 725b2384

+168 -9
+5 -1
flows/compact.py
··· 257 257 "content": {"type": "string", "full_text_search": True}, 258 258 "tags": {"type": "[]string", "filterable": True}, 259 259 "created_at": {"type": "string"}, 260 + "updated_at": {"type": "string"}, 260 261 }, 261 262 ) 262 263 ··· 453 454 "content": {"type": "string", "full_text_search": True}, 454 455 "tags": {"type": "[]string", "filterable": True}, 455 456 "created_at": {"type": "string"}, 457 + "updated_at": {"type": "string"}, 456 458 } 457 459 458 460 ··· 504 506 pass # proceed with upsert even if delete fails 505 507 506 508 obs_id = _observation_id(handle, content) 509 + now = datetime.now(timezone.utc).isoformat() 507 510 ns.write( 508 511 upsert_rows=[{ 509 512 "id": obs_id, ··· 511 514 "kind": "observation", 512 515 "content": content, 513 516 "tags": tags, 514 - "created_at": datetime.now(timezone.utc).isoformat(), 517 + "created_at": now, 518 + "updated_at": now, 515 519 }], 516 520 distance_metric="cosine_distance", 517 521 schema=USER_NAMESPACE_SCHEMA,
+163 -8
flows/curate.py
··· 18 18 from pydantic_ai import Agent, RunContext 19 19 from pydantic_ai.models.anthropic import AnthropicModel 20 20 from pydantic_ai.providers.anthropic import AnthropicProvider 21 - from prefect import flow, get_run_logger, task 21 + from prefect import flow, task 22 22 from prefect.blocks.system import Secret 23 23 from prefect.cache_policies import NONE 24 24 from prefect.variables import Variable 25 + 26 + from mps.phi import clean_handle 25 27 26 28 PHI_DID = "did:plc:65sucjiel52gefhcdcypynsr" 27 29 PDS_BASE = "https://bsky.social" ··· 60 62 {state} 61 63 """ 62 64 65 + OBSERVATION_REVIEW_PROMPT = """\ 66 + now review your private observations — the facts you've extracted from 67 + conversations with people. use list_users to see who you have memory about, 68 + then list_user_observations to inspect their observations. 69 + 70 + you can use recall to cross-reference against other memory before deciding. 71 + 72 + priorities: 73 + - look for contradictions between observations about the same person 74 + - look for observations that are clearly stale (someone's role or interest changed) 75 + - look for near-duplicates that write-time reconciliation missed 76 + - when in doubt, leave it alone — carrying a marginal observation is better than losing a real one 77 + 78 + use deprecate_observation to remove things that are wrong or redundant. 79 + use update_observation to correct or merge observations. 80 + 81 + if everything looks clean, say so. quality over quantity. 82 + """ 83 + 63 84 64 85 # --------------------------------------------------------------------------- 65 86 # deps & output ··· 215 236 216 237 # orphaned links (links to collections that don't exist) 217 238 coll_uris = {c.get("uri", "") for c in collections} 218 - orphaned = [l for l in collection_links if l.get("value", {}).get("collection", {}).get("uri", "") not in coll_uris] 239 + orphaned = [lnk for lnk in collection_links if lnk.get("value", {}).get("collection", {}).get("uri", "") not in coll_uris] 219 240 if orphaned: 220 241 sections.append(f"## orphaned collection links ({len(orphaned)})") 221 242 for link in orphaned: ··· 339 360 try: 340 361 _create_record(session, "network.cosmik.collectionLink", link_record) 341 362 linked += 1 342 - except Exception as e: 363 + except Exception: 343 364 pass # log but continue 344 365 345 366 return f"created collection '{name}' ({coll_uri}) with {linked}/{len(card_uris)} cards linked" ··· 461 482 return "no relevant memories found" 462 483 return "\n".join(results[:15]) 463 484 485 + # --- observation curation tools --- 486 + 487 + @agent.tool 488 + async def list_users(ctx: RunContext[CurationDeps]) -> str: 489 + """List all user namespaces you have memory about.""" 490 + tpuf = ctx.deps.tpuf_client 491 + try: 492 + page = tpuf.namespaces(prefix="phi-users-") 493 + handles = [] 494 + for ns_summary in page.namespaces: 495 + handle = ns_summary.id.removeprefix("phi-users-").replace("_", ".") 496 + handles.append(f"@{handle}") 497 + if not handles: 498 + return "no user namespaces found" 499 + return f"{len(handles)} users:\n" + "\n".join(handles) 500 + except Exception as e: 501 + return f"failed to list users: {e}" 502 + 503 + @agent.tool 504 + async def list_user_observations(ctx: RunContext[CurationDeps], handle: str) -> str: 505 + """List all observations for a user. Shows content, tags, timestamps, and row ID.""" 506 + tpuf = ctx.deps.tpuf_client 507 + ns_name = f"phi-users-{clean_handle(handle)}" 508 + ns = tpuf.namespace(ns_name) 509 + try: 510 + resp = ns.query( 511 + rank_by=("created_at", "desc"), 512 + top_k=50, 513 + filters={"kind": ["Eq", "observation"]}, 514 + include_attributes=["content", "tags", "created_at", "updated_at"], 515 + ) 516 + if not resp.rows: 517 + return f"no observations for @{handle}" 518 + lines = [] 519 + for row in resp.rows: 520 + tags = list(getattr(row, "tags", []) or []) 521 + tag_str = f" [{', '.join(tags)}]" if tags else "" 522 + created = getattr(row, "created_at", "?") 523 + updated = getattr(row, "updated_at", None) or "never" 524 + lines.append( 525 + f"id={row.id}{tag_str}\n" 526 + f" {row.content}\n" 527 + f" created: {created} | updated: {updated}" 528 + ) 529 + return f"{len(resp.rows)} observations for @{handle}:\n" + "\n".join(lines) 530 + except Exception as e: 531 + return f"failed to list observations for @{handle}: {e}" 532 + 533 + @agent.tool 534 + async def deprecate_observation( 535 + ctx: RunContext[CurationDeps], handle: str, observation_id: str, reason: str 536 + ) -> str: 537 + """Delete an observation from a user's namespace. Logs the reason.""" 538 + tpuf = ctx.deps.tpuf_client 539 + ns_name = f"phi-users-{clean_handle(handle)}" 540 + ns = tpuf.namespace(ns_name) 541 + try: 542 + ns.write(deletes=[observation_id]) 543 + return f"deprecated observation {observation_id} for @{handle}: {reason}" 544 + except Exception as e: 545 + return f"failed to deprecate {observation_id}: {e}" 546 + 547 + @agent.tool 548 + async def update_observation( 549 + ctx: RunContext[CurationDeps], 550 + handle: str, 551 + observation_id: str, 552 + new_content: str, 553 + new_tags: list[str], 554 + ) -> str: 555 + """Re-embed and overwrite an observation with corrected content. Sets fresh updated_at.""" 556 + tpuf = ctx.deps.tpuf_client 557 + openai = ctx.deps.openai_client 558 + ns_name = f"phi-users-{clean_handle(handle)}" 559 + ns = tpuf.namespace(ns_name) 560 + 561 + embedding = openai.embeddings.create( 562 + model="text-embedding-3-small", input=new_content 563 + ).data[0].embedding 564 + 565 + now = datetime.now(timezone.utc).isoformat() 566 + try: 567 + ns.write( 568 + upsert_rows=[{ 569 + "id": observation_id, 570 + "vector": embedding, 571 + "kind": "observation", 572 + "content": new_content, 573 + "tags": new_tags, 574 + "created_at": now, 575 + "updated_at": now, 576 + }], 577 + distance_metric="cosine_distance", 578 + schema={ 579 + "kind": {"type": "string", "filterable": True}, 580 + "content": {"type": "string", "full_text_search": True}, 581 + "tags": {"type": "[]string", "filterable": True}, 582 + "created_at": {"type": "string"}, 583 + "updated_at": {"type": "string"}, 584 + }, 585 + ) 586 + return f"updated observation {observation_id} for @{handle}: {new_content[:80]}" 587 + except Exception as e: 588 + return f"failed to update {observation_id}: {e}" 589 + 464 590 return agent 465 591 466 592 ··· 501 627 return result.output.model_dump() 502 628 503 629 630 + @task(cache_policy=NONE) 631 + async def run_observation_review( 632 + session: dict[str, Any], 633 + tpuf_client: Any, 634 + openai_client: Any, 635 + api_key: str, 636 + model_name: str, 637 + ) -> dict[str, Any]: 638 + """Run the observation review agent loop.""" 639 + agent = _build_agent(model_name, api_key) 640 + deps = CurationDeps( 641 + session=session, 642 + tpuf_client=tpuf_client, 643 + openai_client=openai_client, 644 + ) 645 + result = await agent.run(OBSERVATION_REVIEW_PROMPT, deps=deps) 646 + return result.output.model_dump() 647 + 648 + 504 649 # --------------------------------------------------------------------------- 505 650 # main flow 506 651 # --------------------------------------------------------------------------- ··· 513 658 Triggered by morning flow completion. Uses phi's personality and memory 514 659 to make curation decisions as an agentic loop. 515 660 """ 516 - logger = get_run_logger() 517 - 518 661 anthropic_key = (await Secret.load("anthropic-api-key")).get() 519 662 tpuf_key = (await Secret.load("turbopuffer-api-key")).get() 520 663 openai_key = (await Secret.load("openai-api-key")).get() ··· 547 690 tpuf_client = turbopuffer.Turbopuffer(api_key=tpuf_key, region="gcp-us-central1") 548 691 openai_client = OpenAI(api_key=openai_key) 549 692 550 - # run agent 551 - result = await run_curation_agent( 693 + # phase 1: semble record curation 694 + semble_result = await run_curation_agent( 552 695 state_text=state_text, 553 696 session=session, 554 697 tpuf_client=tpuf_client, ··· 556 699 api_key=anthropic_key, 557 700 model_name=model_name, 558 701 ) 702 + print(f"semble curation: {semble_result['actions_taken']} actions — {semble_result['summary']}") 559 703 560 - print(f"curation complete: {result['actions_taken']} actions — {result['summary']}") 704 + # phase 2: observation review 705 + obs_result = await run_observation_review( 706 + session=session, 707 + tpuf_client=tpuf_client, 708 + openai_client=openai_client, 709 + api_key=anthropic_key, 710 + model_name=model_name, 711 + ) 712 + print(f"observation review: {obs_result['actions_taken']} actions — {obs_result['summary']}") 713 + 714 + total_actions = semble_result["actions_taken"] + obs_result["actions_taken"] 715 + print(f"curation complete: {total_actions} total actions") 561 716 562 717 563 718 if __name__ == "__main__":