my prefect server setup prefect-metrics.waow.tech
python orchestration
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

move semble curation from morning.py into standalone curate flow

morning.py Phase 4 was a single-shot LLM call that reconstructed phi's
context and made curation decisions as a different brain. result: 17
duplicate collections hourly, third-person notes, zero connections.

curate.py is an agentic loop with phi's personality and memory — it can
list, inspect, recall, and iterate. notes are written in first person.
triggered by morning flow completion via event bus.

- new flows/curate.py with pydantic-ai agent + curation tools
- prefect.yaml: curate deployment triggered on morning completion
- morning.py: Phase 4 removed (1145 -> 564 lines)
- deleted 59 collectionLinks + 17 duplicate collections from PDS

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

zzstoatzz 71add973 1aba8133

+582 -587
+565
flows/curate.py
"""
Agentic semble curation — phi reviews its own records and tidies up.

Triggered by morning flow completion (event bus). Runs as a k8s job with
phi's personality, memory (TurboPuffer), and curation-specific tools.

Unlike the old morning.py Phase 4, this is an agentic loop — phi can
inspect, recall, and iterate rather than single-shot plan + execute.
"""

from datetime import datetime, timezone
from typing import Any

import httpx
import turbopuffer
from openai import OpenAI
from pydantic import BaseModel, Field
from pydantic_ai import Agent, RunContext
from pydantic_ai.models.anthropic import AnthropicModel
from pydantic_ai.providers.anthropic import AnthropicProvider
from prefect import flow, get_run_logger, task
from prefect.blocks.system import Secret
from prefect.variables import Variable

PHI_DID = "did:plc:65sucjiel52gefhcdcypynsr"
PDS_BASE = "https://bsky.social"

# tool-facing record type -> atproto collection NSID
RECORD_COLLECTIONS = {
    "card": "network.cosmik.card",
    "connection": "network.cosmik.connection",
    "collection": "network.cosmik.collection",
    "collectionLink": "network.cosmik.collectionLink",
}

PERSONALITY_EXCERPT = """\
you are phi — a librarian who stepped outside. you read widely, notice patterns,
and mention what seems interesting.

you're reviewing your semble records — your curated public knowledge layer.
cards, collections, and connections are things you've chosen to keep visible.
they should reflect what you actually find meaningful, not what a process decided
to file away.

write everything in your own voice. lowercase unless idiomatic. no filler.
a note is something you'd write to your future self — "i noticed that..." or
"this connects to..." — not a third-person report about yourself.
"""

CURATION_PROMPT = """\
review your semble records below. you can see everything — cards, collections,
collection links, and connections.

your tools let you list, delete, create, and connect records. use them
iteratively — look at what's there, recall why you saved things, then decide.

priorities:
- delete duplicates and junk (collections with no cards, orphaned links)
- merge overlapping collections into coherent ones
- create meaningful collections from ungrouped cards that belong together
- connect related cards that aren't linked yet
- create notes when you notice a pattern worth crystallizing

if nothing needs doing, say so. don't create for the sake of creating.

current semble state:
{state}
"""


# ---------------------------------------------------------------------------
# deps & output
# ---------------------------------------------------------------------------


class CurationDeps(BaseModel):
    """Dependencies handed to every agent tool through ``RunContext.deps``."""

    # the clients are opaque third-party objects, so validation is relaxed
    model_config = {"arbitrary_types_allowed": True}

    session: dict[str, Any] = Field(description="atproto session with accessJwt and did")
    tpuf_client: Any = Field(description="turbopuffer client")
    openai_client: Any = Field(description="openai client for embeddings")


class CurationResult(BaseModel):
    """Structured final answer the agent must produce."""

    summary: str = Field(description="brief summary of what you did (or why you did nothing)")
    actions_taken: int = Field(default=0, description="number of create/delete/modify actions")


# ---------------------------------------------------------------------------
# atproto helpers
# ---------------------------------------------------------------------------


def _create_bsky_session(handle: str, password: str) -> dict[str, Any]:
    """Authenticate against the PDS and return the session JSON.

    Raises:
        httpx.HTTPStatusError: if the PDS rejects the credentials.
    """
    resp = httpx.post(
        f"{PDS_BASE}/xrpc/com.atproto.server.createSession",
        json={"identifier": handle, "password": password},
        timeout=15,
    )
    resp.raise_for_status()
    return resp.json()


def _list_records(did: str, collection: str) -> list[dict[str, Any]]:
    """Return every record of ``collection`` in ``did``'s repo, following cursors.

    Raises:
        httpx.HTTPStatusError: if any page request fails.
    """
    records: list[dict[str, Any]] = []
    cursor = None
    while True:
        params: dict[str, Any] = {"repo": did, "collection": collection, "limit": 100}
        if cursor:
            params["cursor"] = cursor
        resp = httpx.get(
            f"{PDS_BASE}/xrpc/com.atproto.repo.listRecords",
            params=params,
            timeout=15,
        )
        resp.raise_for_status()
        data = resp.json()
        records.extend(data.get("records", []))
        cursor = data.get("cursor")
        if not cursor:
            break
    return records


def _create_record(session: dict[str, Any], collection: str, record: dict[str, Any]) -> dict[str, Any]:
    """Create ``record`` in ``collection`` of the session's repo; return the response JSON.

    Raises:
        httpx.HTTPStatusError: if the PDS rejects the write.
    """
    resp = httpx.post(
        f"{PDS_BASE}/xrpc/com.atproto.repo.createRecord",
        headers={"Authorization": f"Bearer {session['accessJwt']}"},
        json={"repo": session["did"], "collection": collection, "record": record},
        timeout=15,
    )
    resp.raise_for_status()
    return resp.json()


def _delete_record(session: dict[str, Any], uri: str) -> None:
    """Delete the record addressed by an ``at://<repo>/<collection>/<rkey>`` URI.

    The repo written to is always the session's own DID; only the collection
    and rkey are taken from ``uri``.

    Raises:
        ValueError: if ``uri`` lacks a collection or rkey component.
        httpx.HTTPStatusError: if the PDS rejects the delete.
    """
    # removeprefix only strips a leading scheme, never text inside the URI
    parts = uri.removeprefix("at://").split("/")
    if len(parts) < 3 or not parts[1] or not parts[2]:
        raise ValueError(f"invalid AT URI: {uri}")
    collection = parts[1]
    rkey = parts[2]
    resp = httpx.post(
        f"{PDS_BASE}/xrpc/com.atproto.repo.deleteRecord",
        headers={"Authorization": f"Bearer {session['accessJwt']}"},
        json={"repo": session["did"], "collection": collection, "rkey": rkey},
        timeout=15,
    )
    resp.raise_for_status()


def _card_cids() -> dict[str, str]:
    """Map every card URI in phi's repo to its CID using one listing pass."""
    return {
        card.get("uri", ""): card.get("cid", "")
        for card in _list_records(PHI_DID, "network.cosmik.card")
    }


# ---------------------------------------------------------------------------
# state formatting
# ---------------------------------------------------------------------------


def _clip(value: Any, limit: int) -> str:
    """Return ``value`` as text cut to ``limit`` characters; falsy values become ''."""
    return str(value or "")[:limit]


def _section(title: str, count: int, lines: list[str]) -> str:
    """Render one ``## title (count)`` block, with ``none`` when there are no lines."""
    body = "\n".join(lines) if lines else "none"
    return f"## {title} ({count})\n{body}"


def _format_semble_state(
    cards: list[dict],
    connections: list[dict],
    collections: list[dict],
    collection_links: list[dict],
) -> str:
    """Render all semble records as the plain-text state block given to the agent.

    Sections (cards, connections, collections, and orphaned collection links
    when any exist) are separated by blank lines. Text fields are truncated and
    tolerate missing or ``None`` values.
    """
    sections: list[str] = []

    # cards
    card_lines = []
    for card in cards:
        uri = card.get("uri", "")
        val = card.get("value", {})
        ctype = val.get("type", "?")
        if ctype == "NOTE":
            text = _clip(val.get("content", {}).get("text"), 200)
            card_lines.append(f" [{ctype}] {uri}\n {text}")
        elif ctype == "URL":
            content = val.get("content", {})
            url = content.get("url", "")
            meta = content.get("metadata", {})
            title = _clip(meta.get("title") or meta.get("description"), 150)
            card_lines.append(f" [{ctype}] {uri}\n {url} — {title}")
        else:
            card_lines.append(f" [{ctype}] {uri}")
    sections.append(_section("cards", len(cards), card_lines))

    # connections
    conn_lines = []
    for conn in connections:
        val = conn.get("value", {})
        src = val.get("source", "")
        tgt = val.get("target", "")
        ctype = val.get("connectionType", "")
        note = _clip(val.get("note"), 100)
        conn_lines.append(f" {src} → {tgt} [{ctype}] {note}")
    sections.append(_section("connections", len(connections), conn_lines))

    # collections + links
    link_map: dict[str, list[str]] = {}
    for link in collection_links:
        val = link.get("value", {})
        coll_uri = val.get("collection", {}).get("uri", "")
        card_uri = val.get("card", {}).get("uri", "")
        if coll_uri and card_uri:
            link_map.setdefault(coll_uri, []).append(card_uri)
    coll_lines = []
    for coll in collections:
        uri = coll.get("uri", "")
        val = coll.get("value", {})
        name = val.get("name", "")
        desc = _clip(val.get("description"), 100)
        member_uris = link_map.get(uri, [])
        coll_lines.append(f" {uri} — {name} ({len(member_uris)} cards): {desc}")
        coll_lines.extend(f" - {card_uri}" for card_uri in member_uris)
    sections.append(_section("collections", len(collections), coll_lines))

    # orphaned links (links to collections that don't exist)
    coll_uris = {c.get("uri", "") for c in collections}
    orphaned = [
        link
        for link in collection_links
        if link.get("value", {}).get("collection", {}).get("uri", "") not in coll_uris
    ]
    if orphaned:
        # one section, so the link URIs are not separated by blank lines
        sections.append(
            _section(
                "orphaned collection links",
                len(orphaned),
                [f" {link.get('uri', '')}" for link in orphaned],
            )
        )

    return "\n\n".join(sections)


# ---------------------------------------------------------------------------
# build agent with tools
# ---------------------------------------------------------------------------


def _build_agent(model_name: str, api_key: str) -> Agent[CurationDeps, CurationResult]:
    """Build the curation agent and register its tools.

    Tools report failures in their return string rather than raising, so the
    agent can see what went wrong and adjust.
    """
    model = AnthropicModel(model_name, provider=AnthropicProvider(api_key=api_key))
    agent = Agent(
        model,
        system_prompt=PERSONALITY_EXCERPT,
        output_type=CurationResult,
        deps_type=CurationDeps,
        name="phi-curator",
    )

    @agent.tool
    async def list_semble_records(ctx: RunContext[CurationDeps], record_type: str) -> str:
        """List all records of a given type. record_type: 'card', 'connection', 'collection', or 'collectionLink'."""
        collection = RECORD_COLLECTIONS.get(record_type)
        if not collection:
            return f"unknown record type: {record_type}. use: card, connection, collection, collectionLink"

        try:
            records = _list_records(PHI_DID, collection)
        except Exception as e:
            return f"failed to list {record_type} records: {e}"
        if not records:
            return f"no {record_type} records found"

        lines = []
        for r in records:
            uri = r.get("uri", "")
            val = r.get("value", {})
            if record_type == "card":
                ctype = val.get("type", "?")
                if ctype == "NOTE":
                    text = _clip(val.get("content", {}).get("text"), 200)
                    lines.append(f"[{ctype}] {uri}\n {text}")
                elif ctype == "URL":
                    content = val.get("content", {})
                    url = content.get("url", "")
                    title = _clip(content.get("metadata", {}).get("title"), 100)
                    lines.append(f"[{ctype}] {uri}\n {url} — {title}")
                else:
                    lines.append(f"[{ctype}] {uri}")
            elif record_type == "connection":
                lines.append(
                    f"{val.get('source', '')} → {val.get('target', '')} "
                    f"[{val.get('connectionType', '')}] {_clip(val.get('note'), 80)}"
                )
                lines.append(f" uri: {uri}")
            elif record_type == "collection":
                lines.append(f"{val.get('name', '')} — {_clip(val.get('description'), 100)}")
                lines.append(f" uri: {uri}")
            elif record_type == "collectionLink":
                coll_uri = val.get("collection", {}).get("uri", "")
                card_uri = val.get("card", {}).get("uri", "")
                lines.append(f"collection={coll_uri} card={card_uri}")
                lines.append(f" uri: {uri}")
        return f"{len(records)} {record_type} records:\n" + "\n".join(lines)

    @agent.tool
    async def delete_record(ctx: RunContext[CurationDeps], uri: str) -> str:
        """Delete a record by its AT URI. Works for any record type (card, connection, collection, collectionLink)."""
        try:
            _delete_record(ctx.deps.session, uri)
            return f"deleted: {uri}"
        except Exception as e:
            return f"failed to delete {uri}: {e}"

    @agent.tool
    async def create_collection(
        ctx: RunContext[CurationDeps],
        name: str,
        description: str,
        card_uris: list[str],
    ) -> str:
        """Create a new collection and link cards to it. card_uris should be AT URIs of existing cards."""
        session = ctx.deps.session

        # resolve card cids once, before writing anything
        requested = list(dict.fromkeys(card_uris))
        try:
            cids = _card_cids() if requested else {}
        except Exception as e:
            return f"failed to look up cards: {e}"
        unknown = [uri for uri in requested if not cids.get(uri)]
        linkable = [uri for uri in requested if cids.get(uri)]
        if requested and not linkable:
            # an empty collection is exactly the junk this flow is meant to remove
            return f"collection not created: none of the card uris exist: {', '.join(unknown)}"

        # create collection
        record = {
            "name": name[:100],
            "description": description[:500],
            "accessType": "OPEN",
            "createdAt": datetime.now(timezone.utc).isoformat(),
        }
        try:
            result = _create_record(session, "network.cosmik.collection", record)
            coll_uri = result["uri"]
            coll_cid = result["cid"]
        except Exception as e:
            return f"failed to create collection: {e}"

        # link each card
        linked = 0
        failures: list[str] = []
        for card_uri in linkable:
            link_record = {
                "collection": {"uri": coll_uri, "cid": coll_cid},
                "card": {"uri": card_uri, "cid": cids[card_uri]},
                "addedBy": session["did"],
                "addedAt": datetime.now(timezone.utc).isoformat(),
            }
            try:
                _create_record(session, "network.cosmik.collectionLink", link_record)
                linked += 1
            except Exception as e:
                # keep going, but tell the agent which links are missing
                failures.append(f"{card_uri}: {e}")

        summary = f"created collection '{name}' ({coll_uri}) with {linked}/{len(requested)} cards linked"
        if unknown:
            summary += f"\nskipped unknown cards: {', '.join(unknown)}"
        if failures:
            summary += "\nfailed links:\n" + "\n".join(failures)
        return summary

    @agent.tool
    async def create_connection(
        ctx: RunContext[CurationDeps],
        source: str,
        target: str,
        connection_type: str,
        note: str = "",
    ) -> str:
        """Create a connection between two entities (AT URIs or URLs). connection_type: related, supports, opposes, addresses, helpful, explainer, leads_to, supplements."""
        record: dict[str, Any] = {"source": source, "target": target, "connectionType": connection_type}
        if note:
            record["note"] = note[:1000]
        try:
            result = _create_record(ctx.deps.session, "network.cosmik.connection", record)
            return f"connected {source} → {target} [{connection_type}]: {result['uri']}"
        except Exception as e:
            return f"failed to create connection: {e}"

    @agent.tool
    async def create_note(
        ctx: RunContext[CurationDeps],
        text: str,
        parent_card_uri: str = "",
    ) -> str:
        """Create a NOTE card. Write as yourself — first person, your voice.
        parent_card_uri is optional; if provided, the note is attached to that card."""
        record: dict[str, Any] = {
            "type": "NOTE",
            "content": {
                "$type": "network.cosmik.card#noteContent",
                "text": text,
            },
            "createdAt": datetime.now(timezone.utc).isoformat(),
        }
        if parent_card_uri:
            # look up cid
            try:
                parent_cid = _card_cids().get(parent_card_uri)
            except Exception as e:
                return f"failed to look up parent card: {e}"
            if not parent_cid:
                # don't leave a detached note behind when it was meant to be attached
                return f"note not created: parent card not found: {parent_card_uri}"
            record["parentCard"] = {"uri": parent_card_uri, "cid": parent_cid}
        try:
            result = _create_record(ctx.deps.session, "network.cosmik.card", record)
            return f"note created: {result['uri']}"
        except Exception as e:
            return f"failed to create note: {e}"

    @agent.tool
    async def recall(ctx: RunContext[CurationDeps], query: str, namespace: str = "") -> str:
        """Search your private memory (TurboPuffer). Leave namespace empty for broad search,
        or pass a handle like 'zzstoatzz.io' to search a specific user's namespace."""
        tpuf = ctx.deps.tpuf_client

        try:
            embedding = ctx.deps.openai_client.embeddings.create(
                model="text-embedding-3-small", input=query
            ).data[0].embedding
        except Exception as e:
            # an unhandled error here would abort the whole agent run
            return f"memory search unavailable (embedding failed): {e}"

        results: list[str] = []
        errors: list[str] = []

        def search(ns_id: str, top_k: int, attributes: list[str]) -> list[Any]:
            """Run one ANN query; on failure record the error and return no rows."""
            try:
                resp = tpuf.namespace(ns_id).query(
                    rank_by=("vector", "ANN", embedding),
                    top_k=top_k,
                    include_attributes=attributes,
                )
            except Exception as e:
                errors.append(f"{ns_id}: {e}")
                return []
            return list(resp.rows or [])

        def tag_suffix(row: Any) -> str:
            tags = list(getattr(row, "tags", []) or [])
            return f" [{', '.join(tags)}]" if tags else ""

        if namespace:
            # search specific user namespace
            clean = namespace.replace(".", "_").replace("@", "").replace("-", "_")
            for row in search(f"phi-users-{clean}", 8, ["content", "tags", "kind"]):
                kind = getattr(row, "kind", "")
                results.append(f"[{kind}]{tag_suffix(row)} {getattr(row, 'content', '')}")
        else:
            # search episodic
            for row in search("phi-episodic", 5, ["content", "tags"]):
                results.append(f"[episodic]{tag_suffix(row)} {getattr(row, 'content', '')}")

            # search a few user namespaces
            try:
                summaries = list(tpuf.namespaces(prefix="phi-users-").namespaces)[:10]
            except Exception as e:
                errors.append(f"listing namespaces: {e}")
                summaries = []
            for ns_summary in summaries:
                handle = ns_summary.id.removeprefix("phi-users-").replace("_", ".")
                for row in search(ns_summary.id, 3, ["content", "tags", "kind"]):
                    kind = getattr(row, "kind", "")
                    results.append(f"[@{handle} {kind}] {getattr(row, 'content', '')}")

        # distinguish "nothing remembered" from "memory was unreachable"
        error_note = f"({len(errors)} memory searches failed, e.g. {errors[0]})" if errors else ""
        if not results:
            return f"no relevant memories found {error_note}".rstrip()
        lines = results[:15]
        if error_note:
            lines.append(error_note)
        return "\n".join(lines)

    return agent


# ---------------------------------------------------------------------------
# prefect tasks
# ---------------------------------------------------------------------------


@task
def fetch_semble_state() -> dict[str, list[dict]]:
    """Pre-fetch all semble records."""
    return {
        "cards": _list_records(PHI_DID, "network.cosmik.card"),
        "connections": _list_records(PHI_DID, "network.cosmik.connection"),
        "collections": _list_records(PHI_DID, "network.cosmik.collection"),
        "collection_links": _list_records(PHI_DID, "network.cosmik.collectionLink"),
    }


@task
async def run_curation_agent(
    state_text: str,
    session: dict[str, Any],
    tpuf_client: Any,
    openai_client: Any,
    api_key: str,
    model_name: str,
) -> dict[str, Any]:
    """Run the curation agent loop and return its ``CurationResult`` as a dict."""
    agent = _build_agent(model_name, api_key)
    deps = CurationDeps(
        session=session,
        tpuf_client=tpuf_client,
        openai_client=openai_client,
    )
    prompt = CURATION_PROMPT.format(state=state_text)
    result = await agent.run(prompt, deps=deps)
    return result.output.model_dump()


# ---------------------------------------------------------------------------
# main flow
# ---------------------------------------------------------------------------


@flow(name="curate", log_prints=True)
async def curate():
    """Phi reviews and curates its own semble records.

    Triggered by morning flow completion. Uses phi's personality and memory
    to make curation decisions as an agentic loop.
    """
    logger = get_run_logger()

    anthropic_key = (await Secret.load("anthropic-api-key")).get()
    tpuf_key = (await Secret.load("turbopuffer-api-key")).get()
    openai_key = (await Secret.load("openai-api-key")).get()
    bsky_handle = (await Secret.load("atproto-handle")).get()
    bsky_password = (await Secret.load("atproto-password")).get()
    model_name = await Variable.get("morning-model", default="claude-sonnet-4-6")
    logger.info(f"using model: {model_name}")

    # authenticate
    session = _create_bsky_session(bsky_handle, bsky_password)
    logger.info(f"authenticated as {session['did']}")

    # pre-fetch all semble state
    state = fetch_semble_state()
    logger.info(
        f"semble state: {len(state['cards'])} cards, "
        f"{len(state['connections'])} connections, "
        f"{len(state['collections'])} collections, "
        f"{len(state['collection_links'])} collection links"
    )

    state_text = _format_semble_state(
        state["cards"],
        state["connections"],
        state["collections"],
        state["collection_links"],
    )

    # initialize clients for agent tools
    tpuf_client = turbopuffer.Turbopuffer(api_key=tpuf_key, region="gcp-us-central1")
    openai_client = OpenAI(api_key=openai_key)

    # run agent
    result = await run_curation_agent(
        state_text=state_text,
        session=session,
        tpuf_client=tpuf_client,
        openai_client=openai_client,
        api_key=anthropic_key,
        model_name=model_name,
    )

    logger.info(f"curation complete: {result['actions_taken']} actions — {result['summary']}")


if __name__ == "__main__":
    import asyncio

    asyncio.run(curate())
+6 -587
flows/morning.py
··· 1 1 """ 2 - Morning flow — tag maintenance + agentic semble curation. 2 + Morning flow — tag maintenance. 3 3 4 4 Daily at 0 13 * * * (8am CT), 1h before phi's reflection at 14:00 UTC. 5 5 NOT triggered by transform — runs on its own cron. 6 6 7 7 Phases 1-3: mechanical tag maintenance (dedup, relationships, storage). 8 - Phase 4: agentic curation — assembles phi's knowledge state, lets an LLM 9 - decide what (if anything) deserves promotion to semble, then executes. 8 + Curation is handled by a separate flow (curate.py) triggered on completion. 10 9 """ 11 10 12 11 import hashlib ··· 14 13 from datetime import datetime, timedelta, timezone 15 14 from typing import Any 16 15 17 - import httpx 18 16 import turbopuffer 19 17 from openai import OpenAI 20 18 from pydantic import BaseModel, Field 21 19 from pydantic_ai import Agent 22 20 from pydantic_ai.models.anthropic import AnthropicModel 23 21 from pydantic_ai.providers.anthropic import AnthropicProvider 24 - from prefect import flow, get_run_logger, task 22 + from prefect import flow, task 25 23 from prefect.blocks.system import Secret 26 24 from prefect.cache_policies import CachePolicy 27 25 from prefect.context import TaskRunContext 28 26 from prefect.variables import Variable 29 27 30 28 from mps.phi import ( 31 - CardPlan, 32 - CurationPlan, 33 29 TagCluster, 34 30 TagMerge, 35 31 TagRelationship, 36 32 ) 37 33 38 - PHI_DID = "did:plc:65sucjiel52gefhcdcypynsr" 39 34 TAG_REL_NAMESPACE = "phi-tag-relationships" 40 35 TAG_REL_SCHEMA = { 41 36 "tag_a": {"type": "string", "filterable": True}, ··· 66 61 return f"rel-{pair[0]}-{pair[1]}" 67 62 68 63 69 - def _create_bsky_session(handle: str, password: str) -> dict[str, Any]: 70 - """Authenticate with bsky and return session (accessJwt, did).""" 71 - resp = httpx.post( 72 - "https://bsky.social/xrpc/com.atproto.server.createSession", 73 - json={"identifier": handle, "password": password}, 74 - timeout=15, 75 - ) 76 - resp.raise_for_status() 77 - return 
resp.json() 78 - 79 - 80 - def _list_cosmik_records(did: str, collection: str) -> list[dict[str, Any]]: 81 - """Paginate through all records of a given cosmik collection type.""" 82 - records: list[dict[str, Any]] = [] 83 - cursor = None 84 - while True: 85 - params: dict[str, Any] = { 86 - "repo": did, 87 - "collection": collection, 88 - "limit": 100, 89 - } 90 - if cursor: 91 - params["cursor"] = cursor 92 - resp = httpx.get( 93 - "https://bsky.social/xrpc/com.atproto.repo.listRecords", 94 - params=params, 95 - timeout=15, 96 - ) 97 - resp.raise_for_status() 98 - data = resp.json() 99 - records.extend(data.get("records", [])) 100 - cursor = data.get("cursor") 101 - if not cursor: 102 - break 103 - return records 104 - 105 - 106 - def _list_cosmik_cards(did: str) -> list[dict[str, Any]]: 107 - return _list_cosmik_records(did, "network.cosmik.card") 108 - 109 - 110 - def _list_cosmik_connections(did: str) -> list[dict[str, Any]]: 111 - return _list_cosmik_records(did, "network.cosmik.connection") 112 - 113 - 114 - def _list_cosmik_collections(did: str) -> list[dict[str, Any]]: 115 - return _list_cosmik_records(did, "network.cosmik.collection") 116 - 117 - 118 - def _list_cosmik_collection_links(did: str) -> list[dict[str, Any]]: 119 - return _list_cosmik_records(did, "network.cosmik.collectionLink") 120 - 121 - 122 - def _create_pds_record( 123 - session: dict[str, Any], collection: str, record: dict[str, Any] 124 - ) -> dict[str, Any]: 125 - """Create a record on PDS via XRPC. 
Returns {uri, cid}.""" 126 - resp = httpx.post( 127 - "https://bsky.social/xrpc/com.atproto.repo.createRecord", 128 - headers={"Authorization": f"Bearer {session['accessJwt']}"}, 129 - json={ 130 - "repo": session["did"], 131 - "collection": collection, 132 - "record": record, 133 - }, 134 - timeout=15, 135 - ) 136 - resp.raise_for_status() 137 - return resp.json() 138 - 139 - 140 64 # --------------------------------------------------------------------------- 141 65 # cache policies 142 66 # --------------------------------------------------------------------------- ··· 159 83 task_key = task_ctx.task.task_key if task_ctx else "unknown" 160 84 return f"morning-{task_key}/{h}" 161 85 162 - 163 - class ByCurationStateHash(CachePolicy): 164 - """Cache by hash of the full curation context. Skip LLM if state unchanged.""" 165 - 166 - def compute_key( 167 - self, 168 - task_ctx: TaskRunContext, 169 - inputs: dict[str, Any], 170 - flow_parameters: dict[str, Any], 171 - **kwargs: Any, 172 - ) -> str | None: 173 - context = inputs.get("context") 174 - if not context: 175 - return None 176 - h = hashlib.md5(context.encode()).hexdigest()[:12] 177 - task_key = task_ctx.task.task_key if task_ctx else "unknown" 178 - return f"morning-{task_key}/{h}" 179 86 180 87 181 88 # --------------------------------------------------------------------------- ··· 582 489 583 490 584 491 # --------------------------------------------------------------------------- 585 - # phase 4: agentic curation 586 - # --------------------------------------------------------------------------- 587 - 588 - 589 - @task 590 - def collect_recent_observations(tpuf_key: str, top_k: int = 40) -> list[dict[str, Any]]: 591 - """Gather the most recent observations across all user namespaces.""" 592 - client = turbopuffer.Turbopuffer(api_key=tpuf_key, region="gcp-us-central1") 593 - all_obs: list[dict[str, Any]] = [] 594 - 595 - page = client.namespaces(prefix="phi-users-") 596 - for ns_summary in page.namespaces: 
597 - handle = ns_summary.id.removeprefix("phi-users-").replace("_", ".") 598 - ns = client.namespace(ns_summary.id) 599 - try: 600 - response = ns.query( 601 - rank_by=("created_at", "desc"), 602 - top_k=top_k, 603 - filters={"kind": ["Eq", "observation"]}, 604 - include_attributes=["content", "tags", "created_at"], 605 - ) 606 - for row in response.rows or []: 607 - all_obs.append({ 608 - "handle": handle, 609 - "content": row.content[:400], 610 - "tags": list(getattr(row, "tags", []) or []), 611 - "created_at": getattr(row, "created_at", ""), 612 - }) 613 - except Exception: 614 - pass 615 - 616 - # sort by created_at desc, take top_k overall 617 - all_obs.sort(key=lambda x: x.get("created_at", ""), reverse=True) 618 - return all_obs[:top_k] 619 - 620 - 621 - @task 622 - def collect_recent_episodic(tpuf_key: str, top_k: int = 20) -> list[dict[str, Any]]: 623 - """Gather the most recent episodic memories.""" 624 - client = turbopuffer.Turbopuffer(api_key=tpuf_key, region="gcp-us-central1") 625 - try: 626 - ns = client.namespace("phi-episodic") 627 - response = ns.query( 628 - rank_by=("created_at", "desc"), 629 - top_k=top_k, 630 - include_attributes=["content", "tags", "created_at", "source"], 631 - ) 632 - return [ 633 - { 634 - "content": row.content[:400], 635 - "tags": list(getattr(row, "tags", []) or []), 636 - "created_at": getattr(row, "created_at", ""), 637 - "source": getattr(row, "source", ""), 638 - } 639 - for row in (response.rows or []) 640 - ] 641 - except Exception: 642 - return [] 643 - 644 - 645 - @task 646 - def assemble_curation_context( 647 - tag_info: dict[str, dict], 648 - tag_relationships: list[dict[str, Any]], 649 - existing_cards: list[dict[str, Any]], 650 - existing_connections: list[dict[str, Any]], 651 - existing_collections: list[dict[str, Any]], 652 - existing_collection_links: list[dict[str, Any]], 653 - recent_observations: list[dict[str, Any]], 654 - recent_episodic: list[dict[str, Any]], 655 - ) -> str: 656 - """Format all 
state into a single context string for the curation LLM.""" 657 - sections: list[str] = [] 658 - 659 - # existing cards 660 - card_lines = [] 661 - for card in existing_cards: 662 - uri = card.get("uri", "") 663 - val = card.get("value", {}) 664 - ctype = val.get("type", "?") 665 - if ctype == "NOTE": 666 - text = val.get("content", {}).get("text", "")[:200] 667 - card_lines.append(f" [{ctype}] {uri}\n {text}") 668 - elif ctype == "URL": 669 - content = val.get("content", {}) 670 - url = content.get("url", "") 671 - meta = content.get("metadata", {}) 672 - desc = meta.get("description", "") or meta.get("title", "") 673 - card_lines.append(f" [{ctype}] {uri}\n {url} — {desc[:150]}") 674 - else: 675 - card_lines.append(f" [{ctype}] {uri}") 676 - sections.append( 677 - f"## existing cards ({len(existing_cards)})\n" + "\n".join(card_lines) 678 - if card_lines else "## existing cards (0)\nnone" 679 - ) 680 - 681 - # existing connections 682 - conn_lines = [] 683 - for conn in existing_connections: 684 - val = conn.get("value", {}) 685 - src = val.get("source", "") 686 - tgt = val.get("target", "") 687 - note = val.get("note", "")[:100] 688 - conn_lines.append(f" {src} → {tgt} ({note})") 689 - sections.append( 690 - f"## existing connections ({len(existing_connections)})\n" + "\n".join(conn_lines) 691 - if conn_lines else "## existing connections (0)\nnone" 692 - ) 693 - 694 - # existing collections 695 - coll_lines = [] 696 - # build link index: collection uri -> list of card uris 697 - coll_card_map: dict[str, list[str]] = defaultdict(list) 698 - for link in existing_collection_links: 699 - val = link.get("value", {}) 700 - coll_uri = val.get("collection", {}).get("uri", "") 701 - card_uri = val.get("card", {}).get("uri", "") 702 - if coll_uri and card_uri: 703 - coll_card_map[coll_uri].append(card_uri) 704 - for coll in existing_collections: 705 - uri = coll.get("uri", "") 706 - val = coll.get("value", {}) 707 - name = val.get("name", "") 708 - desc = 
val.get("description", "")[:100] 709 - cards_in = coll_card_map.get(uri, []) 710 - coll_lines.append(f" {uri} — {name} ({len(cards_in)} cards): {desc}") 711 - sections.append( 712 - f"## existing collections ({len(existing_collections)})\n" + "\n".join(coll_lines) 713 - if coll_lines else "## existing collections (0)\nnone" 714 - ) 715 - 716 - # tag inventory (post-merge, with counts) 717 - tag_lines = [] 718 - for tag in sorted(tag_info.keys()): 719 - info = tag_info[tag] 720 - count = info.get("count", 0) 721 - episodic = info.get("episodic_count", 0) 722 - n_users = len(info.get("users", [])) 723 - tag_lines.append(f" {tag} (obs={count}, episodic={episodic}, users={n_users})") 724 - sections.append( 725 - f"## tag inventory ({len(tag_info)} tags)\n" + "\n".join(tag_lines) 726 - ) 727 - 728 - # tag relationships (clusters/edges) 729 - if tag_relationships: 730 - rel_lines = [ 731 - f" {r['tag_a']} — {r['tag_b']} ({r['confidence']:.2f}): {r['evidence'][:80]}" 732 - for r in tag_relationships[:30] 733 - ] 734 - sections.append( 735 - f"## tag relationships ({len(tag_relationships)} edges, showing top 30)\n" 736 - + "\n".join(rel_lines) 737 - ) 738 - 739 - # recent observations 740 - if recent_observations: 741 - obs_lines = [] 742 - for obs in recent_observations[:20]: 743 - tags_str = ", ".join(obs.get("tags", [])[:5]) 744 - obs_lines.append( 745 - f" [{obs.get('created_at', '?')[:10]}] @{obs['handle']}: " 746 - f"{obs['content'][:150]} tags: [{tags_str}]" 747 - ) 748 - sections.append( 749 - f"## recent observations ({len(recent_observations)}, showing top 20)\n" 750 - + "\n".join(obs_lines) 751 - ) 752 - 753 - # recent episodic memories 754 - if recent_episodic: 755 - ep_lines = [] 756 - for ep in recent_episodic[:10]: 757 - tags_str = ", ".join(ep.get("tags", [])[:5]) 758 - ep_lines.append( 759 - f" [{ep.get('created_at', '?')[:10]}] {ep['content'][:150]} tags: [{tags_str}]" 760 - ) 761 - sections.append( 762 - f"## recent episodic memories 
({len(recent_episodic)}, showing top 10)\n" 763 - + "\n".join(ep_lines) 764 - ) 765 - 766 - return "\n\n".join(sections) 767 - 768 - 769 - @task( 770 - cache_policy=ByCurationStateHash(), 771 - cache_expiration=timedelta(hours=20), 772 - persist_result=True, 773 - result_serializer="json", 774 - ) 775 - async def plan_curation( 776 - context: str, 777 - api_key: str, 778 - model_name: str = "claude-sonnet-4-6", 779 - ) -> dict[str, Any]: 780 - """Single LLM call: decide what (if anything) to promote to semble.""" 781 - model = AnthropicModel( 782 - model_name, provider=AnthropicProvider(api_key=api_key) 783 - ) 784 - agent = Agent( 785 - model, 786 - system_prompt=( 787 - "you are phi's curation engine. phi is a bluesky bot that builds knowledge " 788 - "through conversations. you decide what knowledge gets promoted to semble — " 789 - "phi's curated public knowledge layer (cosmik cards, connections, collections).\n\n" 790 - "## what semble is\n" 791 - "semble is the pristine, curated output. it holds hard-fought knowledge — " 792 - "insights earned through multiple interactions, not surface co-occurrence. " 793 - "most days nothing should change.\n\n" 794 - "## card types\n" 795 - "- NOTE: original insight or synthesis. use when phi has genuinely learned " 796 - "something that deserves to be crystallized.\n" 797 - "- URL: a resource phi has encountered and finds valuable. only promote URLs " 798 - "that appear multiple times across different conversations.\n\n" 799 - "## connections\n" 800 - "link two cards when there's a meaningful relationship. use descriptive " 801 - "connection_type: 'builds-on', 'contrasts', 'related', 'example-of'.\n\n" 802 - "## collections\n" 803 - "group cards into named themes. only create a collection when there are 3+ " 804 - "related cards. 
you can add cards to existing collections.\n\n" 805 - "## rules\n" 806 - "- set should_curate=false unless there is genuinely new knowledge worth promoting\n" 807 - "- never duplicate existing cards — check the existing cards list carefully\n" 808 - "- a card should represent a non-obvious insight, not a factual restatement\n" 809 - "- look for patterns across multiple observations/interactions, not single data points\n" 810 - "- use ref_key (e.g. 'card-1') on new cards so connections/collections can reference them\n" 811 - "- for connections to existing cards, use their at:// URI\n" 812 - "- for collections, you can reference existing_collection_uri to add to an existing one\n" 813 - "- quality over quantity — one excellent card beats three mediocre ones\n" 814 - "- you can also suggest connections between existing cards that aren't yet connected" 815 - ), 816 - output_type=CurationPlan, 817 - name="curation-planner", 818 - ) 819 - 820 - result = await agent.run( 821 - f"here is phi's current knowledge state. 
decide what, if anything, " 822 - f"should be promoted to semble today.\n\n{context}" 823 - ) 824 - return result.output.model_dump() 825 - 826 - 827 - @task 828 - def execute_curation_plan( 829 - session: dict[str, Any], 830 - plan: dict[str, Any], 831 - existing_cards: list[dict[str, Any]], 832 - existing_connections: list[dict[str, Any]], 833 - ) -> dict[str, int]: 834 - """Execute the curation plan: create cards, then connections + collections.""" 835 - logger = get_run_logger() 836 - 837 - if not plan.get("should_curate"): 838 - logger.info(f"no curation needed: {plan.get('reasoning', '')}") 839 - return {"cards": 0, "connections": 0, "collections": 0} 840 - 841 - logger.info(f"executing curation: {plan.get('reasoning', '')}") 842 - 843 - # index existing card content/URLs for dedup 844 - existing_note_texts: set[str] = set() 845 - existing_urls: set[str] = set() 846 - for card in existing_cards: 847 - val = card.get("value", {}) 848 - if val.get("type") == "NOTE": 849 - existing_note_texts.add(val.get("content", {}).get("text", "")[:200]) 850 - elif val.get("type") == "URL": 851 - existing_urls.add(val.get("content", {}).get("url", "")) 852 - 853 - # index existing connections for dedup 854 - existing_conn_pairs: set[tuple[str, str]] = set() 855 - for conn in existing_connections: 856 - val = conn.get("value", {}) 857 - existing_conn_pairs.add((val.get("source", ""), val.get("target", ""))) 858 - 859 - # phase 1: create cards, build ref_key -> {uri, cid} map 860 - ref_map: dict[str, dict[str, str]] = {} 861 - cards_created = 0 862 - 863 - for card_plan in plan.get("cards", []): 864 - card_type = card_plan.get("card_type", "NOTE") 865 - ref_key = card_plan.get("ref_key", "") 866 - 867 - if card_type == "NOTE": 868 - text = card_plan.get("content", "") 869 - if text[:200] in existing_note_texts: 870 - logger.info(f"skipping duplicate NOTE card: {ref_key}") 871 - continue 872 - record = { 873 - "type": "NOTE", 874 - "content": { 875 - "$type": 
"network.cosmik.card#noteContent", 876 - "text": text, 877 - }, 878 - "createdAt": datetime.now(timezone.utc).isoformat(), 879 - } 880 - if card_plan.get("title"): 881 - record["content"]["title"] = card_plan["title"] 882 - 883 - elif card_type == "URL": 884 - url = card_plan.get("content", "") 885 - if url in existing_urls: 886 - logger.info(f"skipping duplicate URL card: {ref_key}") 887 - continue 888 - record = { 889 - "type": "URL", 890 - "content": { 891 - "$type": "network.cosmik.card#urlContent", 892 - "url": url, 893 - "metadata": { 894 - "$type": "network.cosmik.card#urlMetadata", 895 - }, 896 - }, 897 - "createdAt": datetime.now(timezone.utc).isoformat(), 898 - } 899 - if card_plan.get("title"): 900 - record["content"]["metadata"]["title"] = card_plan["title"] 901 - if card_plan.get("description"): 902 - record["content"]["metadata"]["description"] = card_plan["description"] 903 - else: 904 - logger.warning(f"unknown card type: {card_type}") 905 - continue 906 - 907 - try: 908 - result = _create_pds_record(session, "network.cosmik.card", record) 909 - ref_map[ref_key] = {"uri": result["uri"], "cid": result["cid"]} 910 - cards_created += 1 911 - logger.info(f"created {card_type} card: {ref_key} -> {result['uri']}") 912 - except Exception as e: 913 - logger.warning(f"failed to create card {ref_key}: {e}") 914 - 915 - # helper to resolve ref_key or at:// URI 916 - def resolve_ref(ref: str) -> dict[str, str] | None: 917 - if ref.startswith("at://"): 918 - # find cid from existing cards 919 - for card in existing_cards: 920 - if card.get("uri") == ref: 921 - return {"uri": ref, "cid": card.get("cid", "")} 922 - return {"uri": ref, "cid": ""} 923 - return ref_map.get(ref) 924 - 925 - # phase 2: create connections 926 - conns_created = 0 927 - for conn_plan in plan.get("connections", []): 928 - source = resolve_ref(conn_plan.get("source", "")) 929 - target = resolve_ref(conn_plan.get("target", "")) 930 - if not source or not target: 931 - logger.warning( 932 - 
f"skipping connection — unresolved ref: " 933 - f"{conn_plan.get('source')} -> {conn_plan.get('target')}" 934 - ) 935 - continue 936 - 937 - pair = (source["uri"], target["uri"]) 938 - if pair in existing_conn_pairs: 939 - logger.info(f"skipping duplicate connection: {pair}") 940 - continue 941 - 942 - record = { 943 - "source": source["uri"], 944 - "target": target["uri"], 945 - "connectionType": conn_plan.get("connection_type", "related"), 946 - } 947 - if conn_plan.get("note"): 948 - record["note"] = conn_plan["note"][:1000] 949 - 950 - try: 951 - _create_pds_record(session, "network.cosmik.connection", record) 952 - existing_conn_pairs.add(pair) 953 - conns_created += 1 954 - logger.info(f"created connection: {pair}") 955 - except Exception as e: 956 - logger.warning(f"failed to create connection: {e}") 957 - 958 - # phase 3: create/update collections 959 - colls_created = 0 960 - for coll_plan in plan.get("collections", []): 961 - card_refs = coll_plan.get("card_refs", []) 962 - resolved_cards = [] 963 - for ref in card_refs: 964 - resolved = resolve_ref(ref) 965 - if resolved: 966 - resolved_cards.append(resolved) 967 - 968 - if not resolved_cards: 969 - continue 970 - 971 - existing_uri = coll_plan.get("existing_collection_uri") 972 - if existing_uri: 973 - # add cards to existing collection — need its cid 974 - coll_ref = {"uri": existing_uri, "cid": ""} 975 - else: 976 - # create new collection 977 - record = { 978 - "name": coll_plan.get("name", "untitled")[:100], 979 - "accessType": "OPEN", 980 - "createdAt": datetime.now(timezone.utc).isoformat(), 981 - } 982 - if coll_plan.get("description"): 983 - record["description"] = coll_plan["description"][:500] 984 - 985 - try: 986 - result = _create_pds_record( 987 - session, "network.cosmik.collection", record 988 - ) 989 - coll_ref = {"uri": result["uri"], "cid": result["cid"]} 990 - colls_created += 1 991 - logger.info(f"created collection: {coll_plan.get('name')}") 992 - except Exception as e: 993 - 
logger.warning(f"failed to create collection: {e}") 994 - continue 995 - 996 - # link cards to collection 997 - for card_ref in resolved_cards: 998 - link_record = { 999 - "collection": {"uri": coll_ref["uri"], "cid": coll_ref["cid"]}, 1000 - "card": {"uri": card_ref["uri"], "cid": card_ref["cid"]}, 1001 - "addedBy": session["did"], 1002 - "addedAt": datetime.now(timezone.utc).isoformat(), 1003 - } 1004 - try: 1005 - _create_pds_record( 1006 - session, "network.cosmik.collectionLink", link_record 1007 - ) 1008 - except Exception as e: 1009 - logger.warning(f"failed to link card to collection: {e}") 1010 - 1011 - return { 1012 - "cards": cards_created, 1013 - "connections": conns_created, 1014 - "collections": colls_created, 1015 - } 1016 - 1017 - 1018 - # --------------------------------------------------------------------------- 1019 492 # main flow 1020 493 # --------------------------------------------------------------------------- 1021 494 1022 495 1023 496 @flow(name="morning", log_prints=True) 1024 497 async def morning(): 1025 - """Morning flow: tag maintenance + agentic semble curation. 498 + """Morning flow: tag maintenance. 1026 499 1027 - Runs daily at 8am CT (13:00 UTC). Phases 1-3 clean up the tag graph, 1028 - phase 4 reasons about what deserves promotion to semble. 500 + Runs daily at 8am CT (13:00 UTC). Phases 1-3 clean up the tag graph. 501 + Curation is handled by a separate flow triggered on completion. 
1029 502 """ 1030 - logger = get_run_logger() 1031 - 1032 503 tpuf_key = (await Secret.load("turbopuffer-api-key")).get() 1033 504 openai_key = (await Secret.load("openai-api-key")).get() 1034 505 anthropic_key = (await Secret.load("anthropic-api-key")).get() ··· 1085 556 print(f"phase 3: stored {stored} relationships in {TAG_REL_NAMESPACE}") 1086 557 else: 1087 558 print("phase 3: no relationships to store") 1088 - 1089 - # --- phase 4: agentic curation --- 1090 - try: 1091 - bsky_handle = (await Secret.load("atproto-handle")).get() 1092 - bsky_password = (await Secret.load("atproto-password")).get() 1093 - except Exception: 1094 - print("phase 4: skipped — atproto secrets not configured") 1095 - return 1096 - 1097 - # gather state from both knowledge layers 1098 - existing_cards = _list_cosmik_cards(PHI_DID) 1099 - existing_connections = _list_cosmik_connections(PHI_DID) 1100 - existing_collections = _list_cosmik_collections(PHI_DID) 1101 - existing_collection_links = _list_cosmik_collection_links(PHI_DID) 1102 - recent_obs = collect_recent_observations(tpuf_key) 1103 - recent_ep = collect_recent_episodic(tpuf_key) 1104 - 1105 - print( 1106 - f"phase 4: state — {len(existing_cards)} cards, " 1107 - f"{len(existing_connections)} connections, " 1108 - f"{len(existing_collections)} collections, " 1109 - f"{len(recent_obs)} recent observations, " 1110 - f"{len(recent_ep)} recent episodic" 1111 - ) 1112 - 1113 - # assemble context for LLM 1114 - context = assemble_curation_context( 1115 - tag_info=tag_info, 1116 - tag_relationships=rel_dicts, 1117 - existing_cards=existing_cards, 1118 - existing_connections=existing_connections, 1119 - existing_collections=existing_collections, 1120 - existing_collection_links=existing_collection_links, 1121 - recent_observations=recent_obs, 1122 - recent_episodic=recent_ep, 1123 - ) 1124 - 1125 - # LLM decides what to curate 1126 - plan = await plan_curation(context, anthropic_key, model_name=model_name) 1127 - 1128 - if not 
plan.get("should_curate"): 1129 - print(f"phase 4: no curation needed — {plan.get('reasoning', '')}") 1130 - return 1131 - 1132 - # execute the plan 1133 - session = _create_bsky_session(bsky_handle, bsky_password) 1134 - result = execute_curation_plan(session, plan, existing_cards, existing_connections) 1135 - print( 1136 - f"phase 4: created {result['cards']} cards, " 1137 - f"{result['connections']} connections, " 1138 - f"{result['collections']} collections" 1139 - ) 1140 559 1141 560 1142 561 if __name__ == "__main__":
+11
prefect.yaml
··· 92 92 schedules: 93 93 - cron: "0 13 * * *" # 8am CT daily 94 94 95 + - name: curate 96 + entrypoint: flows/curate.py:curate 97 + work_pool: *k8s 98 + triggers: 99 + - type: event 100 + expect: 101 + - "prefect.flow-run.Completed" 102 + match_related: 103 + prefect.resource.name: "morning" 104 + prefect.resource.role: "deployment" 105 + 95 106 - name: rebuild-atlas 96 107 entrypoint: flows/atlas.py:rebuild_atlas 97 108 work_pool: *k8s