my prefect server setup prefect-metrics.waow.tech
python orchestration
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

replace weave with morning flow — agentic semble curation

weave's mechanical promotion (threshold-based cards/connections/collections)
produced 4 cards and 0 connections. replace with daily morning flow that
assembles both knowledge layers (tpuf + semble state), lets an LLM decide
what deserves promotion, then executes the plan atomically.

phases 1-3 (tag maintenance) unchanged. phase 4 is now:
assemble_curation_context → plan_curation → execute_curation_plan

schedule: 0 13 * * * (8am CT), 1h before phi's daily reflection.
NOT triggered by transform — runs on its own cron.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

+1204 -1030
+1145
flows/morning.py
··· 1 + """ 2 + Morning flow — tag maintenance + agentic semble curation. 3 + 4 + Daily at 0 13 * * * (8am CT), 1h before phi's reflection at 14:00 UTC. 5 + NOT triggered by transform — runs on its own cron. 6 + 7 + Phases 1-3: mechanical tag maintenance (dedup, relationships, storage). 8 + Phase 4: agentic curation — assembles phi's knowledge state, lets an LLM 9 + decide what (if anything) deserves promotion to semble, then executes. 10 + """ 11 + 12 + import hashlib 13 + from collections import defaultdict 14 + from datetime import datetime, timedelta, timezone 15 + from typing import Any 16 + 17 + import httpx 18 + import turbopuffer 19 + from openai import OpenAI 20 + from pydantic import BaseModel, Field 21 + from pydantic_ai import Agent 22 + from pydantic_ai.models.anthropic import AnthropicModel 23 + from pydantic_ai.providers.anthropic import AnthropicProvider 24 + from prefect import flow, get_run_logger, task 25 + from prefect.blocks.system import Secret 26 + from prefect.cache_policies import CachePolicy 27 + from prefect.context import TaskRunContext 28 + from prefect.variables import Variable 29 + 30 + from mps.phi import ( 31 + CardPlan, 32 + CurationPlan, 33 + TagCluster, 34 + TagMerge, 35 + TagRelationship, 36 + ) 37 + 38 + PHI_DID = "did:plc:65sucjiel52gefhcdcypynsr" 39 + TAG_REL_NAMESPACE = "phi-tag-relationships" 40 + TAG_REL_SCHEMA = { 41 + "tag_a": {"type": "string", "filterable": True}, 42 + "tag_b": {"type": "string", "filterable": True}, 43 + "relationship_type": {"type": "string", "filterable": True}, 44 + "confidence": {"type": "float"}, 45 + "evidence": {"type": "string"}, 46 + } 47 + 48 + 49 + # --------------------------------------------------------------------------- 50 + # helpers 51 + # --------------------------------------------------------------------------- 52 + 53 + 54 + def cosine_similarity(a: list[float], b: list[float]) -> float: 55 + dot = sum(x * y for x, y in zip(a, b)) 56 + norm_a = sum(x * x for x in a) ** 0.5 57 + norm_b = sum(x * x for x in b) ** 0.5 58 + if norm_a == 0 or norm_b == 0: 59 + return 0.0 60 + return dot / (norm_a * norm_b) 61 + 62 + 63 + def _rel_id(tag_a: str, tag_b: str) -> str: 64 + """Deterministic ID for a tag relationship (order-independent).""" 65 + pair = tuple(sorted([tag_a, tag_b])) 66 + return f"rel-{pair[0]}-{pair[1]}" 67 + 68 + 69 + def _create_bsky_session(handle: str, password: str) -> dict[str, Any]: 70 + """Authenticate with bsky and return session (accessJwt, did).""" 71 + resp = httpx.post( 72 + "https://bsky.social/xrpc/com.atproto.server.createSession", 73 + json={"identifier": handle, "password": password}, 74 + timeout=15, 75 + ) 76 + resp.raise_for_status() 77 + return resp.json() 78 + 79 + 80 + def _list_cosmik_records(did: str, collection: str) -> list[dict[str, Any]]: 81 + """Paginate through all records of a given cosmik collection type.""" 82 + records: list[dict[str, Any]] = [] 83 + cursor = None 84 + while True: 85 + params: dict[str, Any] = { 86 + "repo": did, 87 + "collection": collection, 88 + "limit": 100, 89 + } 90 + if cursor: 91 + params["cursor"] = cursor 92 + resp = httpx.get( 93 + "https://bsky.social/xrpc/com.atproto.repo.listRecords", 94 + params=params, 95 + timeout=15, 96 + ) 97 + resp.raise_for_status() 98 + data = resp.json() 99 + records.extend(data.get("records", [])) 100 + cursor = data.get("cursor") 101 + if not cursor: 102 + break 103 + return records 104 + 105 + 106 + def _list_cosmik_cards(did: str) -> list[dict[str, Any]]: 107 + return _list_cosmik_records(did, "network.cosmik.card") 108 + 109 + 110 + def _list_cosmik_connections(did: str) -> list[dict[str, Any]]: 111 + return _list_cosmik_records(did, "network.cosmik.connection") 112 + 113 + 114 + def _list_cosmik_collections(did: str) -> list[dict[str, Any]]: 115 + return _list_cosmik_records(did, "network.cosmik.collection") 116 + 117 + 118 + def _list_cosmik_collection_links(did: str) -> list[dict[str, Any]]: 119 + return _list_cosmik_records(did, "network.cosmik.collectionLink") 120 + 121 + 122 + def _create_pds_record( 123 + session: dict[str, Any], collection: str, record: dict[str, Any] 124 + ) -> dict[str, Any]: 125 + """Create a record on PDS via XRPC. Returns {uri, cid}.""" 126 + resp = httpx.post( 127 + "https://bsky.social/xrpc/com.atproto.repo.createRecord", 128 + headers={"Authorization": f"Bearer {session['accessJwt']}"}, 129 + json={ 130 + "repo": session["did"], 131 + "collection": collection, 132 + "record": record, 133 + }, 134 + timeout=15, 135 + ) 136 + resp.raise_for_status() 137 + return resp.json() 138 + 139 + 140 + # --------------------------------------------------------------------------- 141 + # cache policies 142 + # --------------------------------------------------------------------------- 143 + 144 + 145 + class ByTagsHash(CachePolicy): 146 + """Cache by hash of all tags, scoped per task. Skip LLM if tag set unchanged.""" 147 + 148 + def compute_key( 149 + self, 150 + task_ctx: TaskRunContext, 151 + inputs: dict[str, Any], 152 + flow_parameters: dict[str, Any], 153 + **kwargs: Any, 154 + ) -> str | None: 155 + tags_text = inputs.get("tags_text") 156 + if not tags_text: 157 + return None 158 + h = hashlib.md5(tags_text.encode()).hexdigest()[:12] 159 + task_key = task_ctx.task.task_key if task_ctx else "unknown" 160 + return f"morning-{task_key}/{h}" 161 + 162 + 163 + class ByCurationStateHash(CachePolicy): 164 + """Cache by hash of the full curation context. Skip LLM if state unchanged.""" 165 + 166 + def compute_key( 167 + self, 168 + task_ctx: TaskRunContext, 169 + inputs: dict[str, Any], 170 + flow_parameters: dict[str, Any], 171 + **kwargs: Any, 172 + ) -> str | None: 173 + context = inputs.get("context") 174 + if not context: 175 + return None 176 + h = hashlib.md5(context.encode()).hexdigest()[:12] 177 + task_key = task_ctx.task.task_key if task_ctx else "unknown" 178 + return f"morning-{task_key}/{h}" 179 + 180 + 181 + # --------------------------------------------------------------------------- 182 + # phase 1: tag collection + deduplication 183 + # --------------------------------------------------------------------------- 184 + 185 + 186 + @task 187 + def collect_all_tags(tpuf_key: str) -> dict[str, Any]: 188 + """Read all tags from user namespaces + episodic, with co-occurrence data.""" 189 + client = turbopuffer.Turbopuffer(api_key=tpuf_key, region="gcp-us-central1") 190 + 191 + tag_info: dict[str, dict[str, Any]] = defaultdict( 192 + lambda: {"count": 0, "users": set(), "samples": [], "episodic_count": 0} 193 + ) 194 + cooccurrences: dict[tuple[str, str], int] = defaultdict(int) 195 + user_tag_sets: dict[str, set[str]] = defaultdict(set) 196 + 197 + # scan user namespaces 198 + page = client.namespaces(prefix="phi-users-") 199 + for ns_summary in page.namespaces: 200 + handle = ns_summary.id.removeprefix("phi-users-").replace("_", ".") 201 + ns = client.namespace(ns_summary.id) 202 + try: 203 + response = ns.query( 204 + rank_by=("vector", "ANN", [0.5] * 1536), 205 + top_k=200, 206 + filters={"kind": ["Eq", "observation"]}, 207 + include_attributes=["content", "tags"], 208 + ) 209 + if response.rows: 210 + for row in response.rows: 211 + row_tags = sorted(getattr(row, "tags", []) or []) 212 + for tag in row_tags: 213 + info = tag_info[tag] 214 + info["count"] += 1 215 + info["users"].add(handle) 216 + user_tag_sets[handle].add(tag) 217 + if len(info["samples"]) < 3: 218 + info["samples"].append(row.content[:200]) 219 + # co-occurrence within same observation 220 + for i, t1 in enumerate(row_tags): 221 + for t2 in row_tags[i + 1 :]: 222 + cooccurrences[(t1, t2)] += 1 223 + except Exception: 224 + pass 225 + 226 + # scan episodic namespace 227 + try: 228 + ns = client.namespace("phi-episodic") 229 + response = ns.query( 230 + rank_by=("vector", "ANN", [0.5] * 1536), 231 + top_k=200, 232 + include_attributes=["content", "tags"], 233 + ) 234 + if response.rows: 235 + for row in response.rows: 236 + row_tags = sorted(getattr(row, "tags", []) or []) 237 + for tag in row_tags: 238 + info = tag_info[tag] 239 + info["episodic_count"] += 1 240 + if len(info["samples"]) < 3: 241 + info["samples"].append(row.content[:200]) 242 + for i, t1 in enumerate(row_tags): 243 + for t2 in row_tags[i + 1 :]: 244 + cooccurrences[(t1, t2)] += 1 245 + except Exception: 246 + pass 247 + 248 + # serialize for prefect (sets -> lists, tuple keys -> string keys) 249 + return { 250 + "tag_info": { 251 + tag: {**info, "users": list(info["users"])} 252 + for tag, info in tag_info.items() 253 + }, 254 + "cooccurrences": { 255 + f"{t1}|{t2}": count for (t1, t2), count in cooccurrences.items() 256 + }, 257 + "user_tag_sets": {h: list(tags) for h, tags in user_tag_sets.items()}, 258 + } 259 + 260 + 261 + @task 262 + def embed_tags(openai_key: str, tags: list[str]) -> dict[str, list[float]]: 263 + """Batch-embed all tag names with text-embedding-3-small.""" 264 + if not tags: 265 + return {} 266 + client = OpenAI(api_key=openai_key) 267 + response = client.embeddings.create(model="text-embedding-3-small", input=tags) 268 + return {tags[i]: response.data[i].embedding for i in range(len(tags))} 269 + 270 + 271 + class MergeProposal(BaseModel): 272 + merges: list[TagMerge] = Field(default_factory=list) 273 + 274 + 275 + @task( 276 + cache_policy=ByTagsHash(), 277 + cache_expiration=timedelta(hours=4), 278 + persist_result=True, 279 + result_serializer="json", 280 + ) 281 + async def identify_tag_merges( 282 + tags_text: str, 283 + tag_info: dict[str, dict], 284 + tag_embeddings: dict[str, list[float]], 285 + api_key: str, 286 + model_name: str = "claude-sonnet-4-6", 287 + ) -> list[dict[str, Any]]: 288 + """Give the LLM the full tag inventory and let it propose consolidations.""" 289 + tag_lines = [] 290 + for tag in sorted(tag_info.keys()): 291 + info = tag_info[tag] 292 + users = info.get("users", []) 293 + count = info.get("count", 0) 294 + episodic = info.get("episodic_count", 0) 295 + sample = (info.get("samples") or [""])[0][:120] 296 + tag_lines.append( 297 + f" {tag} (obs={count}, episodic={episodic}, users={len(users)})" 298 + f"\n sample: {sample}" 299 + ) 300 + 301 + inventory = "\n".join(tag_lines) 302 + 303 + model = AnthropicModel( 304 + model_name, provider=AnthropicProvider(api_key=api_key) 305 + ) 306 + agent = Agent( 307 + model, 308 + system_prompt=( 309 + "you are consolidating the tag vocabulary for phi's memory graph.\n\n" 310 + "you will receive the full inventory of tags with usage counts and sample " 311 + "observations. your job:\n\n" 312 + "1. MERGE tags that are the same concept with different surface forms.\n" 313 + " examples: 'attestation' / 'self-attestation' → canonical: 'attestation'\n" 314 + " 'ai_systems' / 'bot' / 'system-improvement' → canonical: 'ai-systems'\n\n" 315 + "2. mark tags that are RELATED but distinct — these should link, not merge.\n" 316 + " example: 'epistemology' / 'social-epistemology' → related, not merged\n\n" 317 + "rules:\n" 318 + "- prefer lowercase, hyphenated canonical forms\n" 319 + "- group transitive merges into one entry\n" 320 + "- put related (but not merged) tags in the 'related' field\n" 321 + "- only merge when you're confident they're the same concept\n" 322 + "- it's fine to return zero merges if the tags are already clean\n" 323 + "- look for underscored vs hyphenated variants, singular/plural, " 324 + "abbreviations, and overlapping concepts" 325 + ), 326 + output_type=MergeProposal, 327 + name="tag-merger", 328 + ) 329 + 330 + result = await agent.run(f"full tag inventory ({len(tag_info)} tags):\n{inventory}") 331 + return [m.model_dump() for m in result.output.merges] 332 + 333 + 334 + @task 335 + def apply_tag_merges( 336 + tpuf_key: str, 337 + merges: list[dict[str, Any]], 338 + ) -> int: 339 + """Rewrite tags in turbopuffer observations to use canonical forms.""" 340 + if not merges: 341 + return 0 342 + 343 + alias_map: dict[str, str] = {} 344 + for merge in merges: 345 + for alias in merge["aliases"]: 346 + alias_map[alias] = merge["canonical"] 347 + 348 + client = turbopuffer.Turbopuffer(api_key=tpuf_key, region="gcp-us-central1") 349 + updated = 0 350 + 351 + ns_ids: list[str] = [] 352 + page = client.namespaces(prefix="phi-users-") 353 + ns_ids.extend(ns.id for ns in page.namespaces) 354 + ns_ids.append("phi-episodic") 355 + 356 + for ns_id in ns_ids: 357 + ns = client.namespace(ns_id) 358 + is_user_ns = ns_id.startswith("phi-users-") 359 + 360 + try: 361 + kwargs: dict[str, Any] = { 362 + "rank_by": ("vector", "ANN", [0.5] * 1536), 363 + "top_k": 200, 364 + "include_attributes": ["content", "tags", "created_at", "vector"], 365 + } 366 + if is_user_ns: 367 + kwargs["filters"] = {"kind": ["Eq", "observation"]} 368 + kwargs["include_attributes"].append("kind") 369 + else: 370 + kwargs["include_attributes"].append("source") 371 + response = ns.query(**kwargs) 372 + except Exception: 373 + continue 374 + 375 + if not response.rows: 376 + continue 377 + 378 + rows_to_upsert = [] 379 + for row in response.rows: 380 + old_tags = list(getattr(row, "tags", []) or []) 381 + new_tags = [alias_map.get(t, t) for t in old_tags] 382 + seen: set[str] = set() 383 + deduped = [] 384 + for t in new_tags: 385 + if t not in seen: 386 + seen.add(t) 387 + deduped.append(t) 388 + 389 + if deduped != old_tags: 390 + vec = getattr(row, "vector", None) 391 + if not vec: 392 + continue 393 + row_data: dict[str, Any] = { 394 + "id": row.id, 395 + "vector": vec, 396 + "content": row.content, 397 + "tags": deduped, 398 + "created_at": getattr( 399 + row, "created_at", datetime.now(timezone.utc).isoformat() 400 + ), 401 + } 402 + if is_user_ns: 403 + row_data["kind"] = "observation" 404 + else: 405 + row_data["source"] = getattr(row, "source", "tool") 406 + rows_to_upsert.append(row_data) 407 + 408 + if rows_to_upsert: 409 + schema: dict[str, Any] = { 410 + "content": {"type": "string", "full_text_search": True}, 411 + "tags": {"type": "[]string", "filterable": True}, 412 + "created_at": {"type": "string"}, 413 + } 414 + if is_user_ns: 415 + schema["kind"] = {"type": "string", "filterable": True} 416 + else: 417 + schema["source"] = {"type": "string", "filterable": True} 418 + 419 + ns.write( 420 + upsert_rows=rows_to_upsert, 421 + distance_metric="cosine_distance", 422 + schema=schema, 423 + ) 424 + updated += len(rows_to_upsert) 425 + 426 + return updated 427 + 428 + 429 + # --------------------------------------------------------------------------- 430 + # phase 2: tag relationship discovery 431 + # --------------------------------------------------------------------------- 432 + 433 + 434 + class ClusterProposal(BaseModel): 435 + clusters: list[TagCluster] = Field(default_factory=list) 436 + 437 + 438 + @task( 439 + cache_policy=ByTagsHash(), 440 + cache_expiration=timedelta(hours=4), 441 + persist_result=True, 442 + result_serializer="json", 443 + ) 444 + async def discover_tag_relationships( 445 + tags_text: str, 446 + tag_info: dict[str, dict], 447 + tag_embeddings: dict[str, list[float]], 448 + cooccurrences: dict[str, int], 449 + user_tag_sets: dict[str, list[str]], 450 + merged_aliases: set[str], 451 + api_key: str, 452 + model_name: str = "claude-sonnet-4-6", 453 + ) -> list[dict[str, Any]]: 454 + """Ask the LLM to identify thematic clusters, then derive pairwise edges.""" 455 + tags = [t for t in sorted(tag_info.keys()) if t not in merged_aliases] 456 + 457 + cooccur_hints: dict[str, list[str]] = defaultdict(list) 458 + for pair_key, count in cooccurrences.items(): 459 + t1, t2 = pair_key.split("|", 1) 460 + if t1 in merged_aliases or t2 in merged_aliases: 461 + continue 462 + if count >= 2: 463 + cooccur_hints[t1].append(f"{t2} ({count}x)") 464 + cooccur_hints[t2].append(f"{t1} ({count}x)") 465 + 466 + tag_lines = [] 467 + for tag in tags: 468 + info = tag_info.get(tag, {}) 469 + count = info.get("count", 0) 470 + episodic = info.get("episodic_count", 0) 471 + n_users = len(info.get("users", [])) 472 + sample = (info.get("samples") or [""])[0][:120] 473 + cooccur_str = "" 474 + if tag in cooccur_hints: 475 + cooccur_str = f"\n co-occurs with: {', '.join(cooccur_hints[tag][:5])}" 476 + tag_lines.append( 477 + f" {tag} (obs={count}, episodic={episodic}, users={n_users})" 478 + f"\n sample: {sample}{cooccur_str}" 479 + ) 480 + 481 + inventory = "\n".join(tag_lines) 482 + 483 + model = AnthropicModel( 484 + model_name, provider=AnthropicProvider(api_key=api_key) 485 + ) 486 + agent = Agent( 487 + model, 488 + system_prompt=( 489 + "you are organizing phi's memory tags into thematic clusters.\n" 490 + "phi is a bluesky bot that remembers conversations and builds knowledge.\n\n" 491 + "you will receive the full tag inventory with usage counts, sample observations, " 492 + "and co-occurrence data. your job: identify thematic clusters — groups of tags " 493 + "that belong to the same area of phi's knowledge or experience.\n\n" 494 + "examples of good clusters:\n" 495 + "- 'phi's epistemic concerns': epistemology, memory, confabulation, self-attestation\n" 496 + "- 'technical interests': programming, ai-systems, infrastructure\n" 497 + "- 'social dynamics': community, trust, collaboration\n\n" 498 + "rules:\n" 499 + "- a tag can appear in multiple clusters (topics overlap)\n" 500 + "- clusters should have 2-8 tags — small enough to be coherent\n" 501 + "- assign cohesion 0.0-1.0: how tightly the tags relate to the cluster theme\n" 502 + "- name each cluster concisely (2-4 words)\n" 503 + "- describe what ties the tags together\n" 504 + "- not every tag needs a cluster — singletons with no thematic neighbors are fine to skip\n" 505 + "- use the sample observations and co-occurrence data to inform your groupings" 506 + ), 507 + output_type=ClusterProposal, 508 + name="tag-clusterer", 509 + ) 510 + 511 + result = await agent.run( 512 + f"full tag inventory ({len(tags)} tags after merges):\n{inventory}" 513 + ) 514 + 515 + relationships: list[dict[str, Any]] = [] 516 + seen_pairs: set[tuple[str, str]] = set() 517 + for cluster in result.output.clusters: 518 + members = [t for t in cluster.tags if t in set(tags)] 519 + for i, t1 in enumerate(members): 520 + for t2 in members[i + 1 :]: 521 + pair = tuple(sorted([t1, t2])) 522 + if pair in seen_pairs: 523 + continue 524 + seen_pairs.add(pair) 525 + relationships.append( 526 + TagRelationship( 527 + tag_a=pair[0], 528 + tag_b=pair[1], 529 + relationship_type="related", 530 + confidence=cluster.cohesion, 531 + evidence=f"[{cluster.name}] {cluster.description}", 532 + ).model_dump() 533 + ) 534 + 535 + return relationships 536 + 537 + 538 + # --------------------------------------------------------------------------- 539 + # phase 3: store relationships in turbopuffer 540 + # --------------------------------------------------------------------------- 541 + 542 + 543 + @task 544 + def store_tag_relationships( 545 + tpuf_key: str, 546 + openai_key: str, 547 + relationships: list[dict[str, Any]], 548 + ) -> int: 549 + """Write tag relationships to phi-tag-relationships namespace.""" 550 + if not relationships: 551 + return 0 552 + 553 + client = turbopuffer.Turbopuffer(api_key=tpuf_key, region="gcp-us-central1") 554 + openai_client = OpenAI(api_key=openai_key) 555 + ns = client.namespace(TAG_REL_NAMESPACE) 556 + 557 + texts = [f"{r['tag_a']} — {r['tag_b']}: {r['evidence']}" for r in relationships] 558 + embeddings = openai_client.embeddings.create( 559 + model="text-embedding-3-small", input=texts 560 + ) 561 + 562 + rows = [] 563 + for i, rel in enumerate(relationships): 564 + rows.append( 565 + { 566 + "id": _rel_id(rel["tag_a"], rel["tag_b"]), 567 + "vector": embeddings.data[i].embedding, 568 + "tag_a": rel["tag_a"], 569 + "tag_b": rel["tag_b"], 570 + "relationship_type": rel["relationship_type"], 571 + "confidence": rel["confidence"], 572 + "evidence": rel["evidence"], 573 + } 574 + ) 575 + 576 + ns.write( 577 + upsert_rows=rows, 578 + distance_metric="cosine_distance", 579 + schema=TAG_REL_SCHEMA, 580 + ) 581 + return len(rows) 582 + 583 + 584 + # --------------------------------------------------------------------------- 585 + # phase 4: agentic curation 586 + # --------------------------------------------------------------------------- 587 + 588 + 589 + @task 590 + def collect_recent_observations(tpuf_key: str, top_k: int = 40) -> list[dict[str, Any]]: 591 + """Gather the most recent observations across all user namespaces.""" 592 + client = turbopuffer.Turbopuffer(api_key=tpuf_key, region="gcp-us-central1") 593 + all_obs: list[dict[str, Any]] = [] 594 + 595 + page = client.namespaces(prefix="phi-users-") 596 + for ns_summary in page.namespaces: 597 + handle = ns_summary.id.removeprefix("phi-users-").replace("_", ".") 598 + ns = client.namespace(ns_summary.id) 599 + try: 600 + response = ns.query( 601 + rank_by=("created_at", "desc"), 602 + top_k=top_k, 603 + filters={"kind": ["Eq", "observation"]}, 604 + include_attributes=["content", "tags", "created_at"], 605 + ) 606 + for row in response.rows or []: 607 + all_obs.append({ 608 + "handle": handle, 609 + "content": row.content[:400], 610 + "tags": list(getattr(row, "tags", []) or []), 611 + "created_at": getattr(row, "created_at", ""), 612 + }) 613 + except Exception: 614 + pass 615 + 616 + # sort by created_at desc, take top_k overall 617 + all_obs.sort(key=lambda x: x.get("created_at", ""), reverse=True) 618 + return all_obs[:top_k] 619 + 620 + 621 + @task 622 + def collect_recent_episodic(tpuf_key: str, top_k: int = 20) -> list[dict[str, Any]]: 623 + """Gather the most recent episodic memories.""" 624 + client = turbopuffer.Turbopuffer(api_key=tpuf_key, region="gcp-us-central1") 625 + try: 626 + ns = client.namespace("phi-episodic") 627 + response = ns.query( 628 + rank_by=("created_at", "desc"), 629 + top_k=top_k, 630 + include_attributes=["content", "tags", "created_at", "source"], 631 + ) 632 + return [ 633 + { 634 + "content": row.content[:400], 635 + "tags": list(getattr(row, "tags", []) or []), 636 + "created_at": getattr(row, "created_at", ""), 637 + "source": getattr(row, "source", ""), 638 + } 639 + for row in (response.rows or []) 640 + ] 641 + except Exception: 642 + return [] 643 + 644 + 645 + @task 646 + def assemble_curation_context( 647 + tag_info: dict[str, dict], 648 + tag_relationships: list[dict[str, Any]], 649 + existing_cards: list[dict[str, Any]], 650 + existing_connections: list[dict[str, Any]], 651 + existing_collections: list[dict[str, Any]], 652 + existing_collection_links: list[dict[str, Any]], 653 + recent_observations: list[dict[str, Any]], 654 + recent_episodic: list[dict[str, Any]], 655 + ) -> str: 656 + """Format all state into a single context string for the curation LLM.""" 657 + sections: list[str] = [] 658 + 659 + # existing cards 660 + card_lines = [] 661 + for card in existing_cards: 662 + uri = card.get("uri", "") 663 + val = card.get("value", {}) 664 + ctype = val.get("type", "?") 665 + if ctype == "NOTE": 666 + text = val.get("content", {}).get("text", "")[:200] 667 + card_lines.append(f" [{ctype}] {uri}\n {text}") 668 + elif ctype == "URL": 669 + content = val.get("content", {}) 670 + url = content.get("url", "") 671 + meta = content.get("metadata", {}) 672 + desc = meta.get("description", "") or meta.get("title", "") 673 + card_lines.append(f" [{ctype}] {uri}\n {url} — {desc[:150]}") 674 + else: 675 + card_lines.append(f" [{ctype}] {uri}") 676 + sections.append( 677 + f"## existing cards ({len(existing_cards)})\n" + "\n".join(card_lines) 678 + if card_lines else "## existing cards (0)\nnone" 679 + ) 680 + 681 + # existing connections 682 + conn_lines = [] 683 + for conn in existing_connections: 684 + val = conn.get("value", {}) 685 + src = val.get("source", "") 686 + tgt = val.get("target", "") 687 + note = val.get("note", "")[:100] 688 + conn_lines.append(f" {src} → {tgt} ({note})") 689 + sections.append( 690 + f"## existing connections ({len(existing_connections)})\n" + "\n".join(conn_lines) 691 + if conn_lines else "## existing connections (0)\nnone" 692 + ) 693 + 694 + # existing collections 695 + coll_lines = [] 696 + # build link index: collection uri -> list of card uris 697 + coll_card_map: dict[str, list[str]] = defaultdict(list) 698 + for link in existing_collection_links: 699 + val = link.get("value", {}) 700 + coll_uri = val.get("collection", {}).get("uri", "") 701 + card_uri = val.get("card", {}).get("uri", "") 702 + if coll_uri and card_uri: 703 + coll_card_map[coll_uri].append(card_uri) 704 + for coll in existing_collections: 705 + uri = coll.get("uri", "") 706 + val = coll.get("value", {}) 707 + name = val.get("name", "") 708 + desc = val.get("description", "")[:100] 709 + cards_in = coll_card_map.get(uri, []) 710 + coll_lines.append(f" {uri} — {name} ({len(cards_in)} cards): {desc}") 711 + sections.append( 712 + f"## existing collections ({len(existing_collections)})\n" + "\n".join(coll_lines) 713 + if coll_lines else "## existing collections (0)\nnone" 714 + ) 715 + 716 + # tag inventory (post-merge, with counts) 717 + tag_lines = [] 718 + for tag in sorted(tag_info.keys()): 719 + info = tag_info[tag] 720 + count = info.get("count", 0) 721 + episodic = info.get("episodic_count", 0) 722 + n_users = len(info.get("users", [])) 723 + tag_lines.append(f" {tag} (obs={count}, episodic={episodic}, users={n_users})") 724 + sections.append( 725 + f"## tag inventory ({len(tag_info)} tags)\n" + "\n".join(tag_lines) 726 + ) 727 + 728 + # tag relationships (clusters/edges) 729 + if tag_relationships: 730 + rel_lines = [ 731 + f" {r['tag_a']} — {r['tag_b']} ({r['confidence']:.2f}): {r['evidence'][:80]}" 732 + for r in tag_relationships[:30] 733 + ] 734 + sections.append( 735 + f"## tag relationships ({len(tag_relationships)} edges, showing top 30)\n" 736 + + "\n".join(rel_lines) 737 + ) 738 + 739 + # recent observations 740 + if recent_observations: 741 + obs_lines = [] 742 + for obs in recent_observations[:20]: 743 + tags_str = ", ".join(obs.get("tags", [])[:5]) 744 + obs_lines.append( 745 + f" [{obs.get('created_at', '?')[:10]}] @{obs['handle']}: " 746 + f"{obs['content'][:150]} tags: [{tags_str}]" 747 + ) 748 + sections.append( 749 + f"## recent observations ({len(recent_observations)}, showing top 20)\n" 750 + + "\n".join(obs_lines) 751 + ) 752 + 753 + # recent episodic memories 754 + if recent_episodic: 755 + ep_lines = [] 756 + for ep in recent_episodic[:10]: 757 + tags_str = ", ".join(ep.get("tags", [])[:5]) 758 + ep_lines.append( 759 + f" [{ep.get('created_at', '?')[:10]}] {ep['content'][:150]} tags: [{tags_str}]" 760 + ) 761 + sections.append( 762 + f"## recent episodic memories ({len(recent_episodic)}, showing top 10)\n" 763 + + "\n".join(ep_lines) 764 + ) 765 + 766 + return "\n\n".join(sections) 767 + 768 + 769 + @task( 770 + cache_policy=ByCurationStateHash(), 771 + cache_expiration=timedelta(hours=20), 772 + persist_result=True, 773 + result_serializer="json", 774 + ) 775 + async def plan_curation( 776 + context: str, 777 + api_key: str, 778 + model_name: str = "claude-sonnet-4-6", 779 + ) -> dict[str, Any]: 780 + """Single LLM call: decide what (if anything) to promote to semble.""" 781 + model = AnthropicModel( 782 + model_name, provider=AnthropicProvider(api_key=api_key) 783 + ) 784 + agent = Agent( 785 + model, 786 + system_prompt=( 787 + "you are phi's curation engine. phi is a bluesky bot that builds knowledge " 788 + "through conversations. you decide what knowledge gets promoted to semble — " 789 + "phi's curated public knowledge layer (cosmik cards, connections, collections).\n\n" 790 + "## what semble is\n" 791 + "semble is the pristine, curated output. it holds hard-fought knowledge — " 792 + "insights earned through multiple interactions, not surface co-occurrence. " 793 + "most days nothing should change.\n\n" 794 + "## card types\n" 795 + "- NOTE: original insight or synthesis. use when phi has genuinely learned " 796 + "something that deserves to be crystallized.\n" 797 + "- URL: a resource phi has encountered and finds valuable. only promote URLs " 798 + "that appear multiple times across different conversations.\n\n" 799 + "## connections\n" 800 + "link two cards when there's a meaningful relationship. use descriptive " 801 + "connection_type: 'builds-on', 'contrasts', 'related', 'example-of'.\n\n" 802 + "## collections\n" 803 + "group cards into named themes. only create a collection when there are 3+ " 804 + "related cards. you can add cards to existing collections.\n\n" 805 + "## rules\n" 806 + "- set should_curate=false unless there is genuinely new knowledge worth promoting\n" 807 + "- never duplicate existing cards — check the existing cards list carefully\n" 808 + "- a card should represent a non-obvious insight, not a factual restatement\n" 809 + "- look for patterns across multiple observations/interactions, not single data points\n" 810 + "- use ref_key (e.g. 'card-1') on new cards so connections/collections can reference them\n" 811 + "- for connections to existing cards, use their at:// URI\n" 812 + "- for collections, you can reference existing_collection_uri to add to an existing one\n" 813 + "- quality over quantity — one excellent card beats three mediocre ones\n" 814 + "- you can also suggest connections between existing cards that aren't yet connected" 815 + ), 816 + output_type=CurationPlan, 817 + name="curation-planner", 818 + ) 819 + 820 + result = await agent.run( 821 + f"here is phi's current knowledge state. decide what, if anything, " 822 + f"should be promoted to semble today.\n\n{context}" 823 + ) 824 + return result.output.model_dump() 825 + 826 + 827 + @task 828 + def execute_curation_plan( 829 + session: dict[str, Any], 830 + plan: dict[str, Any], 831 + existing_cards: list[dict[str, Any]], 832 + existing_connections: list[dict[str, Any]], 833 + ) -> dict[str, int]: 834 + """Execute the curation plan: create cards, then connections + collections.""" 835 + logger = get_run_logger() 836 + 837 + if not plan.get("should_curate"): 838 + logger.info(f"no curation needed: {plan.get('reasoning', '')}") 839 + return {"cards": 0, "connections": 0, "collections": 0} 840 + 841 + logger.info(f"executing curation: {plan.get('reasoning', '')}") 842 + 843 + # index existing card content/URLs for dedup 844 + existing_note_texts: set[str] = set() 845 + existing_urls: set[str] = set() 846 + for card in existing_cards: 847 + val = card.get("value", {}) 848 + if val.get("type") == "NOTE": 849 + existing_note_texts.add(val.get("content", {}).get("text", "")[:200]) 850 + elif val.get("type") == "URL": 851 + existing_urls.add(val.get("content", {}).get("url", "")) 852 + 853 + # index existing connections for dedup 854 + existing_conn_pairs: set[tuple[str, str]] = set() 855 + for conn in existing_connections: 856 + val = conn.get("value", {}) 857 + existing_conn_pairs.add((val.get("source", ""), val.get("target", ""))) 858 + 859 + # phase 1: create cards, build ref_key -> {uri, cid} map 860 + ref_map: dict[str, dict[str, str]] = {} 861 + cards_created = 0 862 + 863 + for card_plan in plan.get("cards", []): 864 + card_type = card_plan.get("card_type", "NOTE") 865 + ref_key = card_plan.get("ref_key", "") 866 + 867 + if card_type == "NOTE": 868 + text = card_plan.get("content", "") 869 + if text[:200] in existing_note_texts: 870 + logger.info(f"skipping duplicate NOTE card: {ref_key}") 871 + continue 872 + record = { 873 + "type": "NOTE", 874 + "content": { 875 + "$type": "network.cosmik.card#noteContent", 876 + "text": text, 877 + }, 878 + "createdAt": datetime.now(timezone.utc).isoformat(), 879 + } 880 + if card_plan.get("title"): 881 + record["content"]["title"] = card_plan["title"] 882 + 883 + elif card_type == "URL": 884 + url = card_plan.get("content", "") 885 + if url in existing_urls: 886 + logger.info(f"skipping duplicate URL card: {ref_key}") 887 + continue 888 + record = { 889 + "type": "URL", 890 + "content": { 891 + "$type": "network.cosmik.card#urlContent", 892 + "url": url, 893 + "metadata": { 894 + "$type": "network.cosmik.card#urlMetadata", 895 + }, 896 + }, 897 + "createdAt": datetime.now(timezone.utc).isoformat(), 898 + } 899 + if card_plan.get("title"): 900 + record["content"]["metadata"]["title"] = card_plan["title"] 901 + if card_plan.get("description"): 902 + record["content"]["metadata"]["description"] = card_plan["description"] 903 + else: 904 + logger.warning(f"unknown card type: {card_type}") 905 + continue 906 + 907 + try: 908 + result = _create_pds_record(session, "network.cosmik.card", record) 909 + ref_map[ref_key] = {"uri": result["uri"], "cid": result["cid"]} 910 + cards_created += 1 911 + logger.info(f"created {card_type} card: {ref_key} -> {result['uri']}") 912 + except Exception as e: 913 + logger.warning(f"failed to create card {ref_key}: {e}") 914 + 915 + # helper to resolve ref_key or at:// URI 916 + def resolve_ref(ref: str) -> dict[str, str] | None: 917 + if ref.startswith("at://"): 918 + # find cid from existing cards 919 + for card in existing_cards: 920 + if card.get("uri") == ref: 921 + return {"uri": ref, "cid": card.get("cid", "")} 922 + return {"uri": ref, "cid": ""} 923 + return ref_map.get(ref) 924 + 925 + # phase 2: create connections 926 + conns_created = 0 927 + for conn_plan in plan.get("connections", []): 928 + source = resolve_ref(conn_plan.get("source", "")) 929 + target = resolve_ref(conn_plan.get("target", "")) 930 + if not source or not target: 931 + logger.warning( 932 + f"skipping connection — unresolved ref: " 933 + f"{conn_plan.get('source')} -> {conn_plan.get('target')}" 934 + ) 935 + continue 936 + 937 + pair = (source["uri"], target["uri"]) 938 + if pair in existing_conn_pairs: 939 + logger.info(f"skipping duplicate connection: {pair}") 940 + continue 941 + 942 + record = { 943 + "source": source["uri"], 944 + "target": target["uri"], 945 + "connectionType": conn_plan.get("connection_type", "related"), 946 + } 947 + if conn_plan.get("note"): 948 + record["note"] = conn_plan["note"][:1000] 949 + 950 + try: 951 + _create_pds_record(session, "network.cosmik.connection", record) 952 + existing_conn_pairs.add(pair) 953 + conns_created += 1 954 + logger.info(f"created connection: {pair}") 955 + except Exception as e: 956 + logger.warning(f"failed to create connection: {e}") 957 + 958 + # phase 3: create/update collections 959 + colls_created = 0 960 + for coll_plan in plan.get("collections", []): 961 + card_refs = coll_plan.get("card_refs", []) 962 + resolved_cards = [] 963 + for ref in card_refs: 964 + resolved = resolve_ref(ref) 965 + if resolved: 966 + resolved_cards.append(resolved) 967 + 968 + if not resolved_cards: 969 + continue 970 + 971 + existing_uri = coll_plan.get("existing_collection_uri") 972 + if existing_uri: 973 + # add cards to existing collection — need its cid 974 + coll_ref = {"uri": existing_uri, "cid": ""} 975 + else: 976 + # create new collection 977 + record = { 978 + "name": coll_plan.get("name", "untitled")[:100], 979 + "accessType": "OPEN", 980 + "createdAt": datetime.now(timezone.utc).isoformat(), 981 + } 982 + if coll_plan.get("description"): 983 + record["description"] = coll_plan["description"][:500] 984 + 985 + try: 986 + result = _create_pds_record( 987 + session, "network.cosmik.collection", record 988 + ) 989 + coll_ref = {"uri": result["uri"], "cid": result["cid"]} 990 + colls_created += 1 991 + logger.info(f"created collection: {coll_plan.get('name')}") 992 + except Exception as e: 993 + logger.warning(f"failed to create collection: {e}") 994 + continue 995 + 996 + # link cards to collection 997 + for card_ref in resolved_cards: 998 + link_record = { 999 + "collection": {"uri": coll_ref["uri"], "cid": coll_ref["cid"]}, 1000 + "card": {"uri": card_ref["uri"], "cid": card_ref["cid"]}, 1001 + "addedBy": session["did"], 1002 + "addedAt": datetime.now(timezone.utc).isoformat(), 1003 + } 1004 + try: 1005 + _create_pds_record( 1006 + session, "network.cosmik.collectionLink", link_record 1007 + ) 1008 + except Exception as e: 1009 + logger.warning(f"failed to link card to collection: {e}") 1010 + 1011 + return { 1012 + "cards": cards_created, 1013 + "connections": conns_created, 1014 + "collections": colls_created, 1015 + } 1016 + 1017 + 1018 + # --------------------------------------------------------------------------- 1019 + # main flow 1020 + # --------------------------------------------------------------------------- 1021 + 1022 + 1023 + @flow(name="morning", log_prints=True) 1024 + async def morning(): 1025 + """Morning flow: tag maintenance + agentic semble curation. 1026 + 1027 + Runs daily at 8am CT (13:00 UTC). Phases 1-3 clean up the tag graph, 1028 + phase 4 reasons about what deserves promotion to semble. 1029 + """ 1030 + logger = get_run_logger() 1031 + 1032 + tpuf_key = (await Secret.load("turbopuffer-api-key")).get() 1033 + openai_key = (await Secret.load("openai-api-key")).get() 1034 + anthropic_key = (await Secret.load("anthropic-api-key")).get() 1035 + model_name = await Variable.get("morning-model", default="claude-sonnet-4-6") 1036 + print(f"using model: {model_name}") 1037 + 1038 + # --- phase 1: collect and deduplicate tags --- 1039 + tag_data = collect_all_tags(tpuf_key) 1040 + tag_info: dict[str, dict] = tag_data["tag_info"] 1041 + cooccurrences: dict[str, int] = tag_data["cooccurrences"] 1042 + user_tag_sets: dict[str, list[str]] = tag_data["user_tag_sets"] 1043 + 1044 + tags = sorted(tag_info.keys()) 1045 + if not tags: 1046 + print("no tags found, nothing to do") 1047 + return 1048 + 1049 + print(f"collected {len(tags)} unique tags across all namespaces") 1050 + tag_embeddings = embed_tags(openai_key, tags) 1051 + 1052 + tags_text = "\n".join(tags) 1053 + merge_dicts = await identify_tag_merges( 1054 + tags_text, tag_info, tag_embeddings, anthropic_key, model_name=model_name 1055 + ) 1056 + 1057 + merged_aliases: set[str] = set() 1058 + if merge_dicts: 1059 + for m in merge_dicts: 1060 + merged_aliases.update(m["aliases"]) 1061 + updated = apply_tag_merges(tpuf_key, merge_dicts) 1062 + print( 1063 + f"phase 1: merged {len(merged_aliases)} aliases into " 1064 + f"{len(merge_dicts)} canonical tags, updated {updated} observations" 1065 + ) 1066 + else: 1067 + print("phase 1: no tag merges needed") 1068 + 1069 + # --- phase 2: discover relationships --- 1070 + rel_dicts = await discover_tag_relationships( 1071 + tags_text, 1072 + tag_info, 1073 + tag_embeddings, 1074 + cooccurrences, 1075 + user_tag_sets, 1076 + merged_aliases, 1077 + anthropic_key, 1078 + model_name=model_name, 1079 + ) 1080 + print(f"phase 2: discovered {len(rel_dicts)} tag relationships") 1081 + 1082 + # --- phase 3: store in turbopuffer --- 1083 + if rel_dicts: 1084 + stored = store_tag_relationships(tpuf_key, openai_key, rel_dicts) 1085 + print(f"phase 3: stored {stored} relationships in {TAG_REL_NAMESPACE}") 1086 + else: 1087 + print("phase 3: no relationships to store") 1088 + 1089 + # --- phase 4: agentic curation --- 1090 + try: 1091 + bsky_handle = (await Secret.load("atproto-handle")).get() 1092 + bsky_password = (await Secret.load("atproto-password")).get() 1093 + except Exception: 1094 + print("phase 4: skipped — atproto secrets not configured") 1095 + return 1096 + 1097 + # gather state from both knowledge layers 1098 + existing_cards = _list_cosmik_cards(PHI_DID) 1099 + existing_connections = _list_cosmik_connections(PHI_DID) 1100 + existing_collections = _list_cosmik_collections(PHI_DID) 1101 + existing_collection_links = _list_cosmik_collection_links(PHI_DID) 1102 + recent_obs = collect_recent_observations(tpuf_key) 1103 + recent_ep = collect_recent_episodic(tpuf_key) 1104 + 1105 + print( 1106 + f"phase 4: state — {len(existing_cards)} cards, " 1107 + f"{len(existing_connections)} connections, " 1108 + f"{len(existing_collections)} collections, " 1109 + f"{len(recent_obs)} recent observations, " 1110 + f"{len(recent_ep)} recent episodic" 1111 + ) 1112 + 1113 + # assemble context for LLM 1114 + context = assemble_curation_context( 1115 + tag_info=tag_info, 1116 + tag_relationships=rel_dicts, 1117 + existing_cards=existing_cards, 1118 + existing_connections=existing_connections, 1119 + existing_collections=existing_collections, 1120 + existing_collection_links=existing_collection_links, 1121 + recent_observations=recent_obs, 1122 + recent_episodic=recent_ep, 1123 + ) 1124 + 1125 + # LLM decides what to curate 1126 + plan = await plan_curation(context, anthropic_key, model_name=model_name) 1127 + 1128 + if not plan.get("should_curate"): 1129 + print(f"phase 4: no curation needed — {plan.get('reasoning', '')}") 1130 + return 1131 + 1132 + # execute the plan 1133 + session = _create_bsky_session(bsky_handle, bsky_password) 1134 + result = execute_curation_plan(session, plan, existing_cards, existing_connections) 1135 + print( 1136 + f"phase 4: created {result['cards']} cards, " 1137 + f"{result['connections']} connections, " 1138 + f"{result['collections']} collections" 1139 + ) 1140 + 1141 + 1142 + if __name__ == "__main__": 1143 + import asyncio 1144 + 1145 + asyncio.run(morning())
-1021
flows/weave.py
··· 1 - """ 2 - Weave phi's memory graph — deduplicate tags, discover relationships, inject edges, 3 - and selectively promote hard-fought knowledge to cosmik records. 4 - 5 - Triggered on transform completion (parallel with compact and brief). 6 - Reads from TurboPuffer directly (no DuckDB needed — tags/observations live in tpuf). 7 - """ 8 - 9 - import hashlib 10 - import re 11 - from collections import defaultdict 12 - from datetime import datetime, timedelta, timezone 13 - from typing import Any 14 - 15 - import httpx 16 - import turbopuffer 17 - from openai import OpenAI 18 - from pydantic import BaseModel, Field 19 - from pydantic_ai import Agent 20 - from pydantic_ai.models.anthropic import AnthropicModel 21 - from pydantic_ai.providers.anthropic import AnthropicProvider 22 - from prefect import flow, get_run_logger, task 23 - from prefect.blocks.system import Secret 24 - from prefect.cache_policies import CachePolicy 25 - from prefect.context import TaskRunContext 26 - from prefect.variables import Variable 27 - 28 - from mps.phi import TagCluster, TagMerge, TagRelationship 29 - 30 - PHI_DID = "did:plc:65sucjiel52gefhcdcypynsr" 31 - TAG_REL_NAMESPACE = "phi-tag-relationships" 32 - TAG_REL_SCHEMA = { 33 - "tag_a": {"type": "string", "filterable": True}, 34 - "tag_b": {"type": "string", "filterable": True}, 35 - "relationship_type": {"type": "string", "filterable": True}, 36 - "confidence": {"type": "float"}, 37 - "evidence": {"type": "string"}, 38 - } 39 - 40 - 41 - # --------------------------------------------------------------------------- 42 - # helpers 43 - # --------------------------------------------------------------------------- 44 - 45 - 46 - def cosine_similarity(a: list[float], b: list[float]) -> float: 47 - dot = sum(x * y for x, y in zip(a, b)) 48 - norm_a = sum(x * x for x in a) ** 0.5 49 - norm_b = sum(x * x for x in b) ** 0.5 50 - if norm_a == 0 or norm_b == 0: 51 - return 0.0 52 - return dot / (norm_a * norm_b) 53 - 54 - 55 - def _rel_id(tag_a: str, tag_b: str) -> str: 56 - """Deterministic ID for a tag relationship (order-independent).""" 57 - pair = tuple(sorted([tag_a, tag_b])) 58 - return f"rel-{pair[0]}-{pair[1]}" 59 - 60 - 61 - # --------------------------------------------------------------------------- 62 - # cache policies 63 - # --------------------------------------------------------------------------- 64 - 65 - 66 - class ByTagsHash(CachePolicy): 67 - """Cache by hash of all tags, scoped per task. Skip LLM if tag set unchanged.""" 68 - 69 - def compute_key( 70 - self, 71 - task_ctx: TaskRunContext, 72 - inputs: dict[str, Any], 73 - flow_parameters: dict[str, Any], 74 - **kwargs: Any, 75 - ) -> str | None: 76 - tags_text = inputs.get("tags_text") 77 - if not tags_text: 78 - return None 79 - h = hashlib.md5(tags_text.encode()).hexdigest()[:12] 80 - task_key = task_ctx.task.task_key if task_ctx else "unknown" 81 - return f"weave-{task_key}/{h}" 82 - 83 - 84 - # --------------------------------------------------------------------------- 85 - # phase 1: tag collection + deduplication 86 - # --------------------------------------------------------------------------- 87 - 88 - 89 - @task 90 - def collect_all_tags(tpuf_key: str) -> dict[str, Any]: 91 - """Read all tags from user namespaces + episodic, with co-occurrence data.""" 92 - client = turbopuffer.Turbopuffer(api_key=tpuf_key, region="gcp-us-central1") 93 - 94 - tag_info: dict[str, dict[str, Any]] = defaultdict( 95 - lambda: {"count": 0, "users": set(), "samples": [], "episodic_count": 0} 96 - ) 97 - cooccurrences: dict[tuple[str, str], int] = defaultdict(int) 98 - user_tag_sets: dict[str, set[str]] = defaultdict(set) 99 - 100 - # scan user namespaces 101 - page = client.namespaces(prefix="phi-users-") 102 - for ns_summary in page.namespaces: 103 - handle = ns_summary.id.removeprefix("phi-users-").replace("_", ".") 104 - ns = client.namespace(ns_summary.id) 105 - try: 106 - response = ns.query( 107 - rank_by=("vector", "ANN", [0.5] * 1536), 108 - top_k=200, 109 - filters={"kind": ["Eq", "observation"]}, 110 - include_attributes=["content", "tags"], 111 - ) 112 - if response.rows: 113 - for row in response.rows: 114 - row_tags = sorted(getattr(row, "tags", []) or []) 115 - for tag in row_tags: 116 - info = tag_info[tag] 117 - info["count"] += 1 118 - info["users"].add(handle) 119 - user_tag_sets[handle].add(tag) 120 - if len(info["samples"]) < 3: 121 - info["samples"].append(row.content[:200]) 122 - # co-occurrence within same observation 123 - for i, t1 in enumerate(row_tags): 124 - for t2 in row_tags[i + 1 :]: 125 - cooccurrences[(t1, t2)] += 1 126 - except Exception: 127 - pass 128 - 129 - # scan episodic namespace 130 - try: 131 - ns = client.namespace("phi-episodic") 132 - response = ns.query( 133 - rank_by=("vector", "ANN", [0.5] * 1536), 134 - top_k=200, 135 - include_attributes=["content", "tags"], 136 - ) 137 - if response.rows: 138 - for row in response.rows: 139 - row_tags = sorted(getattr(row, "tags", []) or []) 140 - for tag in row_tags: 141 - info = tag_info[tag] 142 - info["episodic_count"] += 1 143 - if len(info["samples"]) < 3: 144 - info["samples"].append(row.content[:200]) 145 - for i, t1 in enumerate(row_tags): 146 - for t2 in row_tags[i + 1 :]: 147 - cooccurrences[(t1, t2)] += 1 148 - except Exception: 149 - pass 150 - 151 - # serialize for prefect (sets -> lists, tuple keys -> string keys) 152 - return { 153 - "tag_info": { 154 - tag: {**info, "users": list(info["users"])} 155 - for tag, info in tag_info.items() 156 - }, 157 - "cooccurrences": { 158 - f"{t1}|{t2}": count for (t1, t2), count in cooccurrences.items() 159 - }, 160 - "user_tag_sets": {h: list(tags) for h, tags in user_tag_sets.items()}, 161 - } 162 - 163 - 164 - @task 165 - def embed_tags(openai_key: str, tags: list[str]) -> dict[str, list[float]]: 166 - """Batch-embed all tag names with text-embedding-3-small.""" 167 - if not tags: 168 - return {} 169 - client = OpenAI(api_key=openai_key) 170 - response = client.embeddings.create(model="text-embedding-3-small", input=tags) 171 - return {tags[i]: response.data[i].embedding for i in range(len(tags))} 172 - 173 - 174 - class MergeProposal(BaseModel): 175 - merges: list[TagMerge] = Field(default_factory=list) 176 - 177 - 178 - @task( 179 - cache_policy=ByTagsHash(), 180 - cache_expiration=timedelta(hours=4), 181 - persist_result=True, 182 - result_serializer="json", 183 - ) 184 - async def identify_tag_merges( 185 - tags_text: str, 186 - tag_info: dict[str, dict], 187 - tag_embeddings: dict[str, list[float]], 188 - api_key: str, 189 - model_name: str = "claude-sonnet-4-6", 190 - ) -> list[dict[str, Any]]: 191 - """Give the LLM the full tag inventory and let it propose consolidations.""" 192 - # format every tag with usage context so the LLM can make informed decisions 193 - tag_lines = [] 194 - for tag in sorted(tag_info.keys()): 195 - info = tag_info[tag] 196 - users = info.get("users", []) 197 - count = info.get("count", 0) 198 - episodic = info.get("episodic_count", 0) 199 - sample = (info.get("samples") or [""])[0][:120] 200 - tag_lines.append( 201 - f" {tag} (obs={count}, episodic={episodic}, users={len(users)})" 202 - f"\n sample: {sample}" 203 - ) 204 - 205 - inventory = "\n".join(tag_lines) 206 - 207 - model = AnthropicModel( 208 - model_name, provider=AnthropicProvider(api_key=api_key) 209 - ) 210 - agent = Agent( 211 - model, 212 - system_prompt=( 213 - "you are consolidating the tag vocabulary for phi's memory graph.\n\n" 214 - "you will receive the full inventory of tags with usage counts and sample " 215 - "observations. your job:\n\n" 216 - "1. MERGE tags that are the same concept with different surface forms.\n" 217 - " examples: 'attestation' / 'self-attestation' → canonical: 'attestation'\n" 218 - " 'ai_systems' / 'bot' / 'system-improvement' → canonical: 'ai-systems'\n\n" 219 - "2. mark tags that are RELATED but distinct — these should link, not merge.\n" 220 - " example: 'epistemology' / 'social-epistemology' → related, not merged\n\n" 221 - "rules:\n" 222 - "- prefer lowercase, hyphenated canonical forms\n" 223 - "- group transitive merges into one entry\n" 224 - "- put related (but not merged) tags in the 'related' field\n" 225 - "- only merge when you're confident they're the same concept\n" 226 - "- it's fine to return zero merges if the tags are already clean\n" 227 - "- look for underscored vs hyphenated variants, singular/plural, " 228 - "abbreviations, and overlapping concepts" 229 - ), 230 - output_type=MergeProposal, 231 - name="tag-merger", 232 - ) 233 - 234 - result = await agent.run(f"full tag inventory ({len(tag_info)} tags):\n{inventory}") 235 - return [m.model_dump() for m in result.output.merges] 236 - 237 - 238 - @task 239 - def apply_tag_merges( 240 - tpuf_key: str, 241 - merges: list[dict[str, Any]], 242 - ) -> int: 243 - """Rewrite tags in turbopuffer observations to use canonical forms.""" 244 - if not merges: 245 - return 0 246 - 247 - alias_map: dict[str, str] = {} 248 - for merge in merges: 249 - for alias in merge["aliases"]: 250 - alias_map[alias] = merge["canonical"] 251 - 252 - client = turbopuffer.Turbopuffer(api_key=tpuf_key, region="gcp-us-central1") 253 - updated = 0 254 - 255 - # collect all namespaces to scan 256 - ns_ids: list[str] = [] 257 - page = client.namespaces(prefix="phi-users-") 258 - ns_ids.extend(ns.id for ns in page.namespaces) 259 - ns_ids.append("phi-episodic") 260 - 261 - for ns_id in ns_ids: 262 - ns = client.namespace(ns_id) 263 - is_user_ns = ns_id.startswith("phi-users-") 264 - 265 - try: 266 - kwargs: dict[str, Any] = { 267 - "rank_by": ("vector", "ANN", [0.5] * 1536), 268 - "top_k": 200, 269 - "include_attributes": ["content", "tags", "created_at", "vector"], 270 - } 271 - if is_user_ns: 272 - kwargs["filters"] = {"kind": ["Eq", "observation"]} 273 - kwargs["include_attributes"].append("kind") 274 - else: 275 - kwargs["include_attributes"].append("source") 276 - response = ns.query(**kwargs) 277 - except Exception: 278 - continue 279 - 280 - if not response.rows: 281 - continue 282 - 283 - rows_to_upsert = [] 284 - for row in response.rows: 285 - old_tags = list(getattr(row, "tags", []) or []) 286 - new_tags = [alias_map.get(t, t) for t in old_tags] 287 - # deduplicate preserving order 288 - seen: set[str] = set() 289 - deduped = [] 290 - for t in new_tags: 291 - if t not in seen: 292 - seen.add(t) 293 - deduped.append(t) 294 - 295 - if deduped != old_tags: 296 - vec = getattr(row, "vector", None) 297 - if not vec: 298 - continue 299 - row_data: dict[str, Any] = { 300 - "id": row.id, 301 - "vector": vec, 302 - "content": row.content, 303 - "tags": deduped, 304 - "created_at": getattr( 305 - row, "created_at", datetime.now(timezone.utc).isoformat() 306 - ), 307 - } 308 - if is_user_ns: 309 - row_data["kind"] = "observation" 310 - else: 311 - row_data["source"] = getattr(row, "source", "tool") 312 - rows_to_upsert.append(row_data) 313 - 314 - if rows_to_upsert: 315 - schema: dict[str, Any] = { 316 - "content": {"type": "string", "full_text_search": True}, 317 - "tags": {"type": "[]string", "filterable": True}, 318 - "created_at": {"type": "string"}, 319 - } 320 - if is_user_ns: 321 - schema["kind"] = {"type": "string", "filterable": True} 322 - else: 323 - schema["source"] = {"type": "string", "filterable": True} 324 - 325 - ns.write( 326 - upsert_rows=rows_to_upsert, 327 - distance_metric="cosine_distance", 328 - schema=schema, 329 - ) 330 - updated += len(rows_to_upsert) 331 - 332 - return updated 333 - 334 - 335 - # --------------------------------------------------------------------------- 336 - # phase 2: tag relationship discovery 337 - # --------------------------------------------------------------------------- 338 - 339 - 340 - class ClusterProposal(BaseModel): 341 - clusters: list[TagCluster] = Field(default_factory=list) 342 - 343 - 344 - @task( 345 - cache_policy=ByTagsHash(), 346 - cache_expiration=timedelta(hours=4), 347 - persist_result=True, 348 - result_serializer="json", 349 - ) 350 - async def discover_tag_relationships( 351 - tags_text: str, 352 - tag_info: dict[str, dict], 353 - tag_embeddings: dict[str, list[float]], 354 - cooccurrences: dict[str, int], 355 - user_tag_sets: dict[str, list[str]], 356 - merged_aliases: set[str], 357 - api_key: str, 358 - model_name: str = "claude-sonnet-4-6", 359 - ) -> list[dict[str, Any]]: 360 - """Ask the LLM to identify thematic clusters, then derive pairwise edges.""" 361 - tags = [t for t in sorted(tag_info.keys()) if t not in merged_aliases] 362 - 363 - # precompute co-occurrence hints 364 - cooccur_hints: dict[str, list[str]] = defaultdict(list) 365 - for pair_key, count in cooccurrences.items(): 366 - t1, t2 = pair_key.split("|", 1) 367 - if t1 in merged_aliases or t2 in merged_aliases: 368 - continue 369 - if count >= 2: 370 - cooccur_hints[t1].append(f"{t2} ({count}x)") 371 - cooccur_hints[t2].append(f"{t1} ({count}x)") 372 - 373 - # format tag inventory with context 374 - tag_lines = [] 375 - for tag in tags: 376 - info = tag_info.get(tag, {}) 377 - count = info.get("count", 0) 378 - episodic = info.get("episodic_count", 0) 379 - n_users = len(info.get("users", [])) 380 - sample = (info.get("samples") or [""])[0][:120] 381 - cooccur_str = "" 382 - if tag in cooccur_hints: 383 - cooccur_str = f"\n co-occurs with: {', '.join(cooccur_hints[tag][:5])}" 384 - tag_lines.append( 385 - f" {tag} (obs={count}, episodic={episodic}, users={n_users})" 386 - f"\n sample: {sample}{cooccur_str}" 387 - ) 388 - 389 - inventory = "\n".join(tag_lines) 390 - 391 - model = AnthropicModel( 392 - model_name, provider=AnthropicProvider(api_key=api_key) 393 - ) 394 - agent = Agent( 395 - model, 396 - system_prompt=( 397 - "you are organizing phi's memory tags into thematic clusters.\n" 398 - "phi is a bluesky bot that remembers conversations and builds knowledge.\n\n" 399 - "you will receive the full tag inventory with usage counts, sample observations, " 400 - "and co-occurrence data. your job: identify thematic clusters — groups of tags " 401 - "that belong to the same area of phi's knowledge or experience.\n\n" 402 - "examples of good clusters:\n" 403 - "- 'phi's epistemic concerns': epistemology, memory, confabulation, self-attestation\n" 404 - "- 'technical interests': programming, ai-systems, infrastructure\n" 405 - "- 'social dynamics': community, trust, collaboration\n\n" 406 - "rules:\n" 407 - "- a tag can appear in multiple clusters (topics overlap)\n" 408 - "- clusters should have 2-8 tags — small enough to be coherent\n" 409 - "- assign cohesion 0.0-1.0: how tightly the tags relate to the cluster theme\n" 410 - "- name each cluster concisely (2-4 words)\n" 411 - "- describe what ties the tags together\n" 412 - "- not every tag needs a cluster — singletons with no thematic neighbors are fine to skip\n" 413 - "- use the sample observations and co-occurrence data to inform your groupings" 414 - ), 415 - output_type=ClusterProposal, 416 - name="tag-clusterer", 417 - ) 418 - 419 - result = await agent.run( 420 - f"full tag inventory ({len(tags)} tags after merges):\n{inventory}" 421 - ) 422 - 423 - # derive pairwise relationships from cluster membership 424 - relationships: list[dict[str, Any]] = [] 425 - seen_pairs: set[tuple[str, str]] = set() 426 - for cluster in result.output.clusters: 427 - members = [t for t in cluster.tags if t in set(tags)] 428 - for i, t1 in enumerate(members): 429 - for t2 in members[i + 1 :]: 430 - pair = tuple(sorted([t1, t2])) 431 - if pair in seen_pairs: 432 - continue 433 - seen_pairs.add(pair) 434 - relationships.append( 435 - TagRelationship( 436 - tag_a=pair[0], 437 - tag_b=pair[1], 438 - relationship_type="related", 439 - confidence=cluster.cohesion, 440 - evidence=f"[{cluster.name}] {cluster.description}", 441 - ).model_dump() 442 - ) 443 - 444 - return relationships 445 - 446 - 447 - # --------------------------------------------------------------------------- 448 - # phase 3: store relationships in turbopuffer 449 - # --------------------------------------------------------------------------- 450 - 451 - 452 - @task 453 - def store_tag_relationships( 454 - tpuf_key: str, 455 - openai_key: str, 456 - relationships: list[dict[str, Any]], 457 - ) -> int: 458 - """Write tag relationships to phi-tag-relationships namespace.""" 459 - if not relationships: 460 - return 0 461 - 462 - client = turbopuffer.Turbopuffer(api_key=tpuf_key, region="gcp-us-central1") 463 - openai_client = OpenAI(api_key=openai_key) 464 - ns = client.namespace(TAG_REL_NAMESPACE) 465 - 466 - # batch embed relationship descriptions for vector field 467 - texts = [f"{r['tag_a']} — {r['tag_b']}: {r['evidence']}" for r in relationships] 468 - embeddings = openai_client.embeddings.create( 469 - model="text-embedding-3-small", input=texts 470 - ) 471 - 472 - rows = [] 473 - for i, rel in enumerate(relationships): 474 - rows.append( 475 - { 476 - "id": _rel_id(rel["tag_a"], rel["tag_b"]), 477 - "vector": embeddings.data[i].embedding, 478 - "tag_a": rel["tag_a"], 479 - "tag_b": rel["tag_b"], 480 - "relationship_type": rel["relationship_type"], 481 - "confidence": rel["confidence"], 482 - "evidence": rel["evidence"], 483 - } 484 - ) 485 - 486 - ns.write( 487 - upsert_rows=rows, 488 - distance_metric="cosine_distance", 489 - schema=TAG_REL_SCHEMA, 490 - ) 491 - return len(rows) 492 - 493 - 494 - # --------------------------------------------------------------------------- 495 - # phase 4: cosmik promotion 496 - # --------------------------------------------------------------------------- 497 - 498 - 499 - def _create_bsky_session(handle: str, password: str) -> dict[str, Any]: 500 - """Authenticate with bsky and return session (accessJwt, did).""" 501 - resp = httpx.post( 502 - "https://bsky.social/xrpc/com.atproto.server.createSession", 503 - json={"identifier": handle, "password": password}, 504 - timeout=15, 505 - ) 506 - resp.raise_for_status() 507 - return resp.json() 508 - 509 - 510 - def _list_cosmik_cards(did: str) -> list[dict[str, Any]]: 511 - """List all cosmik cards for a DID (public, no auth needed).""" 512 - cards: list[dict[str, Any]] = [] 513 - cursor = None 514 - while True: 515 - params: dict[str, Any] = { 516 - "repo": did, 517 - "collection": "network.cosmik.card", 518 - "limit": 100, 519 - } 520 - if cursor: 521 - params["cursor"] = cursor 522 - resp = httpx.get( 523 - "https://bsky.social/xrpc/com.atproto.repo.listRecords", 524 - params=params, 525 - timeout=15, 526 - ) 527 - resp.raise_for_status() 528 - data = resp.json() 529 - cards.extend(data.get("records", [])) 530 - cursor = data.get("cursor") 531 - if not cursor: 532 - break 533 - return cards 534 - 535 - 536 - def _list_cosmik_connections(did: str) -> list[dict[str, Any]]: 537 - """List existing cosmik connections (public, no auth).""" 538 - conns: list[dict[str, Any]] = [] 539 - cursor = None 540 - while True: 541 - params: dict[str, Any] = { 542 - "repo": did, 543 - "collection": "network.cosmik.connection", 544 - "limit": 100, 545 - } 546 - if cursor: 547 - params["cursor"] = cursor 548 - resp = httpx.get( 549 - "https://bsky.social/xrpc/com.atproto.repo.listRecords", 550 - params=params, 551 - timeout=15, 552 - ) 553 - resp.raise_for_status() 554 - data = resp.json() 555 - conns.extend(data.get("records", [])) 556 - cursor = data.get("cursor") 557 - if not cursor: 558 - break 559 - return conns 560 - 561 - 562 - def _create_pds_record( 563 - session: dict[str, Any], collection: str, record: dict[str, Any] 564 - ) -> dict[str, Any]: 565 - """Create a record on PDS via XRPC. Returns {uri, cid}.""" 566 - resp = httpx.post( 567 - "https://bsky.social/xrpc/com.atproto.repo.createRecord", 568 - headers={"Authorization": f"Bearer {session['accessJwt']}"}, 569 - json={ 570 - "repo": session["did"], 571 - "collection": collection, 572 - "record": record, 573 - }, 574 - timeout=15, 575 - ) 576 - resp.raise_for_status() 577 - return resp.json() 578 - 579 - 580 - def _card_text(card: dict[str, Any]) -> str: 581 - """Extract searchable text from a cosmik card (handles both old and new format).""" 582 - val = card.get("value", {}) 583 - if val.get("type") == "NOTE": 584 - return val.get("content", {}).get("text", "")[:500] 585 - if val.get("type") == "URL": 586 - content = val.get("content", {}) 587 - # new format: metadata nested under content.metadata 588 - meta = content.get("metadata", {}) 589 - title = meta.get("title", "") or content.get("title", "") 590 - desc = meta.get("description", "") or content.get("description", "") 591 - return f"{title} {desc}".strip() or content.get("url", "") 592 - return "" 593 - 594 - 595 - def _match_cards_to_tags( 596 - cards: list[dict[str, Any]], 597 - tag_info: dict[str, dict], 598 - openai_key: str, 599 - ) -> dict[str, list[dict[str, Any]]]: 600 - """Match cosmik cards to tags by content similarity. 601 - 602 - Returns tag -> list of cards that are relevant to that tag. 603 - """ 604 - if not cards or not tag_info: 605 - return {} 606 - 607 - openai_client = OpenAI(api_key=openai_key) 608 - 609 - card_texts = [_card_text(c) for c in cards] 610 - valid = [(i, t) for i, t in enumerate(card_texts) if t] 611 - if not valid: 612 - return {} 613 - 614 - card_embeddings = openai_client.embeddings.create( 615 - model="text-embedding-3-small", input=[t for _, t in valid] 616 - ) 617 - card_vecs = {valid[j][0]: card_embeddings.data[j].embedding for j in range(len(valid))} 618 - 619 - tags = list(tag_info.keys()) 620 - tag_embeddings = openai_client.embeddings.create( 621 - model="text-embedding-3-small", input=tags 622 - ) 623 - tag_vecs = {tags[j]: tag_embeddings.data[j].embedding for j in range(len(tags))} 624 - 625 - result: dict[str, list[dict[str, Any]]] = defaultdict(list) 626 - for tag, tvec in tag_vecs.items(): 627 - for card_idx, cvec in card_vecs.items(): 628 - if cosine_similarity(tvec, cvec) >= 0.5: 629 - result[tag].append(cards[card_idx]) 630 - 631 - return dict(result) 632 - 633 - 634 - _URL_RE = re.compile(r"https?://[^\s,)>\]]+") 635 - 636 - 637 - @task 638 - def create_cluster_cards( 639 - session: dict[str, Any], 640 - tpuf_key: str, 641 - tag_info: dict[str, dict], 642 - existing_cards: list[dict[str, Any]], 643 - ) -> dict[str, Any]: 644 - """Create cosmik URL cards for URLs found in phi's observations. 645 - 646 - Scans all observations and episodic memories for URLs, creates cards for 647 - those that don't exist yet. Returns {cards: [...], tag_cards: {tag: [card]}}. 648 - The tag_cards mapping is built from observation-level tag associations, 649 - which is far more reliable than embedding-based matching. 650 - """ 651 - logger = get_run_logger() 652 - 653 - # index existing card URLs for dedup 654 - existing_urls: set[str] = set() 655 - for card in existing_cards: 656 - val = card.get("value", {}) 657 - if val.get("type") == "URL": 658 - existing_urls.add(val.get("content", {}).get("url", "")) 659 - 660 - # scan turbopuffer for URLs in observation content 661 - client = turbopuffer.Turbopuffer(api_key=tpuf_key, region="gcp-us-central1") 662 - url_evidence: dict[str, dict[str, Any]] = {} 663 - 664 - ns_ids: list[str] = ["phi-episodic"] 665 - page = client.namespaces(prefix="phi-users-") 666 - ns_ids.extend(ns.id for ns in page.namespaces) 667 - 668 - for ns_id in ns_ids: 669 - ns = client.namespace(ns_id) 670 - try: 671 - kwargs: dict[str, Any] = { 672 - "rank_by": ("vector", "ANN", [0.5] * 1536), 673 - "top_k": 200, 674 - "include_attributes": ["content", "tags"], 675 - } 676 - if ns_id.startswith("phi-users-"): 677 - kwargs["filters"] = {"kind": ["Eq", "observation"]} 678 - response = ns.query(**kwargs) 679 - except Exception: 680 - continue 681 - 682 - for row in response.rows or []: 683 - content = row.content or "" 684 - row_tags = list(getattr(row, "tags", []) or []) 685 - for url in _URL_RE.findall(content): 686 - url = url.rstrip(".,;:!?") 687 - if url not in url_evidence: 688 - url_evidence[url] = { 689 - "tags": set(), 690 - "context": content[:300], 691 - "count": 0, 692 - } 693 - url_evidence[url]["tags"].update(row_tags) 694 - url_evidence[url]["count"] += 1 695 - 696 - # build tag->card mapping from observation-level associations 697 - tag_cards: dict[str, list[dict[str, Any]]] = defaultdict(list) 698 - 699 - # map existing cards to tags via their URLs 700 - existing_by_url: dict[str, dict[str, Any]] = {} 701 - for card in existing_cards: 702 - val = card.get("value", {}) 703 - if val.get("type") == "URL": 704 - card_url = val.get("content", {}).get("url", "") 705 - existing_by_url[card_url] = card 706 - if card_url in url_evidence: 707 - for tag in url_evidence[card_url]["tags"]: 708 - tag_cards[tag].append(card) 709 - 710 - # find new URLs not yet represented as cards 711 - new_urls = { 712 - url: ev for url, ev in url_evidence.items() if url not in existing_urls 713 - } 714 - 715 - if not new_urls: 716 - logger.info("no new URLs found in observations") 717 - return {"cards": existing_cards, "tag_cards": dict(tag_cards)} 718 - 719 - # sort by evidence strength 720 - ranked = sorted( 721 - new_urls.items(), 722 - key=lambda x: x[1]["count"] * len(x[1]["tags"]), 723 - reverse=True, 724 - ) 725 - 726 - new_cards = list(existing_cards) 727 - created = 0 728 - for url, evidence in ranked[:20]: 729 - tags_str = ", ".join(sorted(evidence["tags"])[:5]) 730 - record = { 731 - "type": "URL", 732 - "content": { 733 - "$type": "network.cosmik.card#urlContent", 734 - "url": url, 735 - "metadata": { 736 - "$type": "network.cosmik.card#urlMetadata", 737 - "description": f"discussed in context of: {tags_str}", 738 - }, 739 - }, 740 - "createdAt": datetime.now(timezone.utc).isoformat(), 741 - } 742 - try: 743 - result = _create_pds_record(session, "network.cosmik.card", record) 744 - card_entry = { 745 - "uri": result["uri"], 746 - "cid": result["cid"], 747 - "value": record, 748 - } 749 - new_cards.append(card_entry) 750 - # map this card to its observation tags 751 - for tag in evidence["tags"]: 752 - tag_cards[tag].append(card_entry) 753 - existing_urls.add(url) 754 - created += 1 755 - logger.info(f"created URL card: {url}") 756 - except Exception as e: 757 - logger.warning(f"failed to create card for {url}: {e}") 758 - 759 - logger.info(f"created {created} new URL cards from observations") 760 - return {"cards": new_cards, "tag_cards": dict(tag_cards)} 761 - 762 - 763 - @task 764 - def promote_connections( 765 - session: dict[str, Any], 766 - relationships: list[dict[str, Any]], 767 - tag_cards: dict[str, list[dict[str, Any]]], 768 - tag_info: dict[str, dict], 769 - existing_connections: list[dict[str, Any]], 770 - ) -> int: 771 - """Promote high-confidence relationships to cosmik connections.""" 772 - # index existing connections by source+target for idempotency 773 - existing_pairs: set[tuple[str, str]] = set() 774 - for conn in existing_connections: 775 - val = conn.get("value", {}) 776 - existing_pairs.add((val.get("source", ""), val.get("target", ""))) 777 - 778 - created = 0 779 - for rel in relationships: 780 - if rel["confidence"] < 0.8: 781 - continue 782 - 783 - tag_a, tag_b = rel["tag_a"], rel["tag_b"] 784 - cards_a = tag_cards.get(tag_a, []) 785 - cards_b = tag_cards.get(tag_b, []) 786 - 787 - if not cards_a or not cards_b: 788 - continue 789 - 790 - # check observation support: >= 3 observations across >= 2 users, or >= 5 episodic 791 - info_a = tag_info.get(tag_a, {}) 792 - info_b = tag_info.get(tag_b, {}) 793 - total_obs = info_a.get("count", 0) + info_b.get("count", 0) 794 - total_users = len( 795 - set(info_a.get("users", [])) | set(info_b.get("users", [])) 796 - ) 797 - total_episodic = info_a.get("episodic_count", 0) + info_b.get( 798 - "episodic_count", 0 799 - ) 800 - 801 - if not ( 802 - (total_obs >= 3 and total_users >= 2) or total_episodic >= 5 803 - ): 804 - continue 805 - 806 - # use first card from each tag as source/target 807 - source_uri = cards_a[0]["uri"] 808 - target_uri = cards_b[0]["uri"] 809 - 810 - if (source_uri, target_uri) in existing_pairs: 811 - continue 812 - 813 - record = { 814 - "source": source_uri, 815 - "target": target_uri, 816 - "connectionType": "related", 817 - "note": rel["evidence"][:1000], 818 - } 819 - 820 - try: 821 - _create_pds_record(session, "network.cosmik.connection", record) 822 - existing_pairs.add((source_uri, target_uri)) 823 - created += 1 824 - except Exception: 825 - pass 826 - 827 - return created 828 - 829 - 830 - @task 831 - def promote_collections( 832 - session: dict[str, Any], 833 - relationships: list[dict[str, Any]], 834 - tag_cards: dict[str, list[dict[str, Any]]], 835 - ) -> int: 836 - """Promote tag clusters with >= 3 cards to cosmik collections.""" 837 - # build clusters: group tags that are connected by relationships 838 - adj: dict[str, set[str]] = defaultdict(set) 839 - for rel in relationships: 840 - if rel["confidence"] >= 0.7: 841 - adj[rel["tag_a"]].add(rel["tag_b"]) 842 - adj[rel["tag_b"]].add(rel["tag_a"]) 843 - 844 - # find connected components via BFS 845 - visited: set[str] = set() 846 - clusters: list[set[str]] = [] 847 - for tag in adj: 848 - if tag in visited: 849 - continue 850 - cluster: set[str] = set() 851 - queue = [tag] 852 - while queue: 853 - t = queue.pop() 854 - if t in visited: 855 - continue 856 - visited.add(t) 857 - cluster.add(t) 858 - queue.extend(adj[t] - visited) 859 - if len(cluster) >= 2: 860 - clusters.append(cluster) 861 - 862 - created = 0 863 - for cluster in clusters: 864 - # collect all unique cards in this cluster 865 - cluster_cards: list[dict[str, Any]] = [] 866 - seen_uris: set[str] = set() 867 - for tag in cluster: 868 - for card in tag_cards.get(tag, []): 869 - if card["uri"] not in seen_uris: 870 - seen_uris.add(card["uri"]) 871 - cluster_cards.append(card) 872 - 873 - if len(cluster_cards) < 3: 874 - continue 875 - 876 - # derive collection name from tags 877 - name = " + ".join(sorted(cluster)[:3]) 878 - if len(cluster) > 3: 879 - name += f" (+{len(cluster) - 3})" 880 - 881 - record = { 882 - "name": name[:100], 883 - "accessType": "OPEN", 884 - "description": f"phi's notes on: {', '.join(sorted(cluster))}"[:500], 885 - "createdAt": datetime.now(timezone.utc).isoformat(), 886 - } 887 - 888 - try: 889 - coll_result = _create_pds_record( 890 - session, "network.cosmik.collection", record 891 - ) 892 - coll_uri = coll_result["uri"] 893 - coll_cid = coll_result["cid"] 894 - 895 - # link cards to collection 896 - for card in cluster_cards: 897 - link_record = { 898 - "collection": {"uri": coll_uri, "cid": coll_cid}, 899 - "card": {"uri": card["uri"], "cid": card["cid"]}, 900 - "addedBy": session["did"], 901 - "addedAt": datetime.now(timezone.utc).isoformat(), 902 - } 903 - try: 904 - _create_pds_record( 905 - session, "network.cosmik.collectionLink", link_record 906 - ) 907 - except Exception: 908 - pass 909 - 910 - created += 1 911 - except Exception: 912 - pass 913 - 914 - return created 915 - 916 - 917 - # --------------------------------------------------------------------------- 918 - # main flow 919 - # --------------------------------------------------------------------------- 920 - 921 - 922 - @flow(name="weave", log_prints=True) 923 - async def weave(): 924 - """Weave phi's memory graph: deduplicate tags, discover relationships, 925 - inject graph edges, and selectively promote to cosmik.""" 926 - logger = get_run_logger() 927 - 928 - tpuf_key = (await Secret.load("turbopuffer-api-key")).get() 929 - openai_key = (await Secret.load("openai-api-key")).get() 930 - anthropic_key = (await Secret.load("anthropic-api-key")).get() 931 - model_name = await Variable.get("weave-model", default="claude-sonnet-4-6") 932 - print(f"using model: {model_name}") 933 - 934 - # --- phase 1: collect and deduplicate tags --- 935 - tag_data = collect_all_tags(tpuf_key) 936 - tag_info: dict[str, dict] = tag_data["tag_info"] 937 - cooccurrences: dict[str, int] = tag_data["cooccurrences"] 938 - user_tag_sets: dict[str, list[str]] = tag_data["user_tag_sets"] 939 - 940 - tags = sorted(tag_info.keys()) 941 - if not tags: 942 - print("no tags found, nothing to weave") 943 - return 944 - 945 - print(f"collected {len(tags)} unique tags across all namespaces") 946 - tag_embeddings = embed_tags(openai_key, tags) 947 - 948 - tags_text = "\n".join(tags) 949 - merge_dicts = await identify_tag_merges( 950 - tags_text, tag_info, tag_embeddings, anthropic_key, model_name=model_name 951 - ) 952 - 953 - # collect all aliases for filtering in phase 2 954 - merged_aliases: set[str] = set() 955 - if merge_dicts: 956 - for m in merge_dicts: 957 - merged_aliases.update(m["aliases"]) 958 - updated = apply_tag_merges(tpuf_key, merge_dicts) 959 - print( 960 - f"phase 1: merged {len(merged_aliases)} aliases into " 961 - f"{len(merge_dicts)} canonical tags, updated {updated} observations" 962 - ) 963 - else: 964 - print("phase 1: no tag merges needed") 965 - 966 - # --- phase 2: discover relationships --- 967 - rel_dicts = await discover_tag_relationships( 968 - tags_text, 969 - tag_info, 970 - tag_embeddings, 971 - cooccurrences, 972 - user_tag_sets, 973 - merged_aliases, 974 - anthropic_key, 975 - model_name=model_name, 976 - ) 977 - print(f"phase 2: discovered {len(rel_dicts)} tag relationships") 978 - 979 - # --- phase 3: store in turbopuffer --- 980 - if rel_dicts: 981 - stored = store_tag_relationships(tpuf_key, openai_key, rel_dicts) 982 - print(f"phase 3: stored {stored} relationships in {TAG_REL_NAMESPACE}") 983 - else: 984 - print("phase 3: no relationships to store") 985 - 986 - # --- phase 4: cosmik promotion --- 987 - try: 988 - bsky_handle = (await Secret.load("atproto-handle")).get() 989 - bsky_password = (await Secret.load("atproto-password")).get() 990 - except Exception: 991 - print("phase 4: skipped — atproto secrets not configured") 992 - return 993 - 994 - if not rel_dicts: 995 - print("phase 4: no relationships to promote") 996 - return 997 - 998 - session = _create_bsky_session(bsky_handle, bsky_password) 999 - existing_cards = _list_cosmik_cards(PHI_DID) 1000 - existing_conns = _list_cosmik_connections(PHI_DID) 1001 - print(f"phase 4: found {len(existing_cards)} existing cards, {len(existing_conns)} connections") 1002 - 1003 - # create cards from URLs in observations + build tag->card mapping 1004 - card_result = create_cluster_cards(session, tpuf_key, tag_info, existing_cards) 1005 - cards = card_result["cards"] 1006 - tag_cards = card_result["tag_cards"] 1007 - print(f"phase 4: {len(cards)} total cards, {len(tag_cards)} tags with cards") 1008 - 1009 - conn_count = promote_connections( 1010 - session, rel_dicts, tag_cards, tag_info, existing_conns 1011 - ) 1012 - coll_count = promote_collections(session, rel_dicts, tag_cards) 1013 - print( 1014 - f"phase 4: promoted {conn_count} connections, {coll_count} collections to cosmik" 1015 - ) 1016 - 1017 - 1018 - if __name__ == "__main__": 1019 - import asyncio 1020 - 1021 - asyncio.run(weave())
+55
packages/mps/src/mps/phi.py
··· 47 47 evidence: str = Field(description="brief explanation of why these are related") 48 48 49 49 50 + class CardPlan(BaseModel): 51 + """A card to create on semble.""" 52 + 53 + card_type: str = Field(description="NOTE or URL") 54 + content: str = Field(description="text content (NOTE) or URL string (URL)") 55 + title: str | None = Field(default=None, description="optional title for the card") 56 + description: str | None = Field( 57 + default=None, description="optional description / context" 58 + ) 59 + ref_key: str = Field( 60 + description="local reference key (e.g. 'card-1') for cross-referencing within the plan" 61 + ) 62 + 63 + 64 + class ConnectionPlan(BaseModel): 65 + """A connection to create between two cards.""" 66 + 67 + source: str = Field( 68 + description="at:// URI of existing card, or ref_key of a new card in this plan" 69 + ) 70 + target: str = Field( 71 + description="at:// URI of existing card, or ref_key of a new card in this plan" 72 + ) 73 + connection_type: str = Field(description="e.g. related, builds-on, contrasts") 74 + note: str | None = Field(default=None, description="brief explanation of the link") 75 + 76 + 77 + class CollectionPlan(BaseModel): 78 + """A collection to create or update.""" 79 + 80 + name: str = Field(description="collection name (concise, 2-5 words)") 81 + description: str | None = Field(default=None) 82 + card_refs: list[str] = Field( 83 + description="at:// URIs or ref_keys of cards to include" 84 + ) 85 + existing_collection_uri: str | None = Field( 86 + default=None, 87 + description="if set, add cards to this existing collection instead of creating new", 88 + ) 89 + 90 + 91 + class CurationPlan(BaseModel): 92 + """Structured output from the curation LLM — what (if anything) to promote to semble.""" 93 + 94 + should_curate: bool = Field( 95 + description="false most days — only true when there is genuinely new knowledge worth promoting" 96 + ) 97 + reasoning: str = Field( 98 + description="brief explanation of why curation is or isn't warranted" 99 + ) 100 + cards: list[CardPlan] = Field(default_factory=list) 101 + connections: list[ConnectionPlan] = Field(default_factory=list) 102 + collections: list[CollectionPlan] = Field(default_factory=list) 103 + 104 + 50 105 @dataclass 51 106 class PhiObservation: 52 107 handle: str
+4 -9
prefect.yaml
··· 86 86 prefect.resource.name: "transform" 87 87 prefect.resource.role: "deployment" 88 88 89 - - name: weave 90 - entrypoint: flows/weave.py:weave 89 + - name: morning 90 + entrypoint: flows/morning.py:morning 91 91 work_pool: *k8s 92 - triggers: 93 - - type: event 94 - expect: 95 - - "prefect.flow-run.Completed" 96 - match_related: 97 - prefect.resource.name: "transform" 98 - prefect.resource.role: "deployment" 92 + schedules: 93 + - cron: "0 13 * * *" # 8am CT daily