my prefect server setup prefect-metrics.waow.tech
python orchestration
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

add weave flow — tag dedup, relationship discovery, cosmik promotion

new flow triggered on transform completion (parallel with compact and brief).
reads tags from all tpuf namespaces, deduplicates via embedding similarity +
LLM confirmation, discovers tag relationships, stores edges in phi-tag-relationships
namespace, and selectively promotes high-confidence relationships to cosmik records.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

+903
+869
flows/weave.py
··· 1 + """ 2 + Weave phi's memory graph — deduplicate tags, discover relationships, inject edges, 3 + and selectively promote hard-fought knowledge to cosmik records. 4 + 5 + Triggered on transform completion (parallel with compact and brief). 6 + Reads from TurboPuffer directly (no DuckDB needed — tags/observations live in tpuf). 7 + """ 8 + 9 + import hashlib 10 + from collections import defaultdict 11 + from datetime import datetime, timedelta, timezone 12 + from typing import Any 13 + 14 + import httpx 15 + import turbopuffer 16 + from openai import OpenAI 17 + from pydantic import BaseModel, Field 18 + from pydantic_ai import Agent 19 + from pydantic_ai.models.anthropic import AnthropicModel 20 + from pydantic_ai.providers.anthropic import AnthropicProvider 21 + from prefect import flow, get_run_logger, task 22 + from prefect.blocks.system import Secret 23 + from prefect.cache_policies import CachePolicy 24 + from prefect.context import TaskRunContext 25 + 26 + from mps.phi import TagMerge, TagRelationship 27 + 28 + PHI_DID = "did:plc:65sucjiel52gefhcdcypynsr" 29 + TAG_REL_NAMESPACE = "phi-tag-relationships" 30 + TAG_REL_SCHEMA = { 31 + "tag_a": {"type": "string", "filterable": True}, 32 + "tag_b": {"type": "string", "filterable": True}, 33 + "relationship_type": {"type": "string", "filterable": True}, 34 + "confidence": {"type": "float"}, 35 + "evidence": {"type": "string"}, 36 + } 37 + 38 + 39 + # --------------------------------------------------------------------------- 40 + # helpers 41 + # --------------------------------------------------------------------------- 42 + 43 + 44 + def cosine_similarity(a: list[float], b: list[float]) -> float: 45 + dot = sum(x * y for x, y in zip(a, b)) 46 + norm_a = sum(x * x for x in a) ** 0.5 47 + norm_b = sum(x * x for x in b) ** 0.5 48 + if norm_a == 0 or norm_b == 0: 49 + return 0.0 50 + return dot / (norm_a * norm_b) 51 + 52 + 53 + def _rel_id(tag_a: str, tag_b: str) -> str: 54 + """Deterministic ID for a tag relationship (order-independent).""" 55 + pair = tuple(sorted([tag_a, tag_b])) 56 + return f"rel-{pair[0]}-{pair[1]}" 57 + 58 + 59 + # --------------------------------------------------------------------------- 60 + # cache policies 61 + # --------------------------------------------------------------------------- 62 + 63 + 64 + class ByTagsHash(CachePolicy): 65 + """Cache by hash of all tags. Skip LLM if tag set unchanged.""" 66 + 67 + def compute_key( 68 + self, 69 + task_ctx: TaskRunContext, 70 + inputs: dict[str, Any], 71 + flow_parameters: dict[str, Any], 72 + **kwargs: Any, 73 + ) -> str | None: 74 + tags_text = inputs.get("tags_text") 75 + if not tags_text: 76 + return None 77 + h = hashlib.md5(tags_text.encode()).hexdigest()[:12] 78 + return f"weave-tags/{h}" 79 + 80 + 81 + # --------------------------------------------------------------------------- 82 + # phase 1: tag collection + deduplication 83 + # --------------------------------------------------------------------------- 84 + 85 + 86 + @task 87 + def collect_all_tags(tpuf_key: str) -> dict[str, Any]: 88 + """Read all tags from user namespaces + episodic, with co-occurrence data.""" 89 + client = turbopuffer.Turbopuffer(api_key=tpuf_key, region="gcp-us-central1") 90 + 91 + tag_info: dict[str, dict[str, Any]] = defaultdict( 92 + lambda: {"count": 0, "users": set(), "samples": [], "episodic_count": 0} 93 + ) 94 + cooccurrences: dict[tuple[str, str], int] = defaultdict(int) 95 + user_tag_sets: dict[str, set[str]] = defaultdict(set) 96 + 97 + # scan user namespaces 98 + page = client.namespaces(prefix="phi-users-") 99 + for ns_summary in page.namespaces: 100 + handle = ns_summary.id.removeprefix("phi-users-").replace("_", ".") 101 + ns = client.namespace(ns_summary.id) 102 + try: 103 + response = ns.query( 104 + rank_by=("vector", "ANN", [0.5] * 1536), 105 + top_k=200, 106 + filters={"kind": ["Eq", "observation"]}, 107 + include_attributes=["content", "tags"], 108 + ) 109 + if response.rows: 110 + for row in response.rows: 111 + row_tags = sorted(getattr(row, "tags", []) or []) 112 + for tag in row_tags: 113 + info = tag_info[tag] 114 + info["count"] += 1 115 + info["users"].add(handle) 116 + user_tag_sets[handle].add(tag) 117 + if len(info["samples"]) < 3: 118 + info["samples"].append(row.content[:200]) 119 + # co-occurrence within same observation 120 + for i, t1 in enumerate(row_tags): 121 + for t2 in row_tags[i + 1 :]: 122 + cooccurrences[(t1, t2)] += 1 123 + except Exception: 124 + pass 125 + 126 + # scan episodic namespace 127 + try: 128 + ns = client.namespace("phi-episodic") 129 + response = ns.query( 130 + rank_by=("vector", "ANN", [0.5] * 1536), 131 + top_k=200, 132 + include_attributes=["content", "tags"], 133 + ) 134 + if response.rows: 135 + for row in response.rows: 136 + row_tags = sorted(getattr(row, "tags", []) or []) 137 + for tag in row_tags: 138 + info = tag_info[tag] 139 + info["episodic_count"] += 1 140 + if len(info["samples"]) < 3: 141 + info["samples"].append(row.content[:200]) 142 + for i, t1 in enumerate(row_tags): 143 + for t2 in row_tags[i + 1 :]: 144 + cooccurrences[(t1, t2)] += 1 145 + except Exception: 146 + pass 147 + 148 + # serialize for prefect (sets -> lists, tuple keys -> string keys) 149 + return { 150 + "tag_info": { 151 + tag: {**info, "users": list(info["users"])} 152 + for tag, info in tag_info.items() 153 + }, 154 + "cooccurrences": { 155 + f"{t1}|{t2}": count for (t1, t2), count in cooccurrences.items() 156 + }, 157 + "user_tag_sets": {h: list(tags) for h, tags in user_tag_sets.items()}, 158 + } 159 + 160 + 161 + @task 162 + def embed_tags(openai_key: str, tags: list[str]) -> dict[str, list[float]]: 163 + """Batch-embed all tag names with text-embedding-3-small.""" 164 + if not tags: 165 + return {} 166 + client = OpenAI(api_key=openai_key) 167 + response = client.embeddings.create(model="text-embedding-3-small", input=tags) 168 + return {tags[i]: response.data[i].embedding for i in range(len(tags))} 169 + 170 + 171 + class MergeProposal(BaseModel): 172 + merges: list[TagMerge] = Field(default_factory=list) 173 + 174 + 175 + @task( 176 + cache_policy=ByTagsHash(), 177 + cache_expiration=timedelta(hours=4), 178 + persist_result=True, 179 + result_serializer="json", 180 + ) 181 + async def identify_tag_merges( 182 + tags_text: str, 183 + tag_info: dict[str, dict], 184 + tag_embeddings: dict[str, list[float]], 185 + api_key: str, 186 + ) -> list[dict[str, Any]]: 187 + """Cluster tags by embedding similarity, LLM confirms merges.""" 188 + tags = list(tag_embeddings.keys()) 189 + 190 + # find high-similarity pairs (>= 0.85) as merge candidates 191 + candidates: list[tuple[str, str, float]] = [] 192 + for i, t1 in enumerate(tags): 193 + for t2 in tags[i + 1 :]: 194 + sim = cosine_similarity(tag_embeddings[t1], tag_embeddings[t2]) 195 + if sim >= 0.85: 196 + candidates.append((t1, t2, sim)) 197 + 198 + if not candidates: 199 + return [] 200 + 201 + candidates.sort(key=lambda x: -x[2]) 202 + candidates_text = "\n".join( 203 + f"- \"{t1}\" <-> \"{t2}\" (similarity: {sim:.3f})\n" 204 + f" {t1} context: {(tag_info.get(t1) or {}).get('samples', [''])[0][:100]}\n" 205 + f" {t2} context: {(tag_info.get(t2) or {}).get('samples', [''])[0][:100]}" 206 + for t1, t2, sim in candidates[:30] # cap at 30 pairs 207 + ) 208 + 209 + model = AnthropicModel( 210 + "claude-haiku-4-5", provider=AnthropicProvider(api_key=api_key) 211 + ) 212 + agent = Agent( 213 + model, 214 + system_prompt=( 215 + "you review tag merge candidates for a memory graph. for each pair, decide:\n" 216 + "- MERGE: same concept — pick the canonical form\n" 217 + "- RELATE: distinct but related — don't merge\n" 218 + "- SKIP: not meaningfully related despite embedding similarity\n\n" 219 + "prefer lowercase, hyphenated canonical forms (e.g. 'ai-systems' not 'AI_systems').\n" 220 + "group transitive merges (if a merges with b and b merges with c, " 221 + "produce one merge with canonical + all aliases).\n" 222 + "put RELATE pairs in the 'related' field of the merge they're closest to, " 223 + "or omit if they don't belong to any merge group." 224 + ), 225 + output_type=MergeProposal, 226 + name="tag-merger", 227 + ) 228 + 229 + result = await agent.run(f"merge candidates:\n{candidates_text}") 230 + # serialize to dicts for prefect result persistence 231 + return [m.model_dump() for m in result.output.merges] 232 + 233 + 234 + @task 235 + def apply_tag_merges( 236 + tpuf_key: str, 237 + merges: list[dict[str, Any]], 238 + ) -> int: 239 + """Rewrite tags in turbopuffer observations to use canonical forms.""" 240 + if not merges: 241 + return 0 242 + 243 + alias_map: dict[str, str] = {} 244 + for merge in merges: 245 + for alias in merge["aliases"]: 246 + alias_map[alias] = merge["canonical"] 247 + 248 + client = turbopuffer.Turbopuffer(api_key=tpuf_key, region="gcp-us-central1") 249 + updated = 0 250 + 251 + # collect all namespaces to scan 252 + ns_ids: list[str] = [] 253 + page = client.namespaces(prefix="phi-users-") 254 + ns_ids.extend(ns.id for ns in page.namespaces) 255 + ns_ids.append("phi-episodic") 256 + 257 + for ns_id in ns_ids: 258 + ns = client.namespace(ns_id) 259 + is_user_ns = ns_id.startswith("phi-users-") 260 + 261 + try: 262 + kwargs: dict[str, Any] = { 263 + "rank_by": ("vector", "ANN", [0.5] * 1536), 264 + "top_k": 200, 265 + "include_attributes": ["content", "tags", "created_at", "vector"], 266 + } 267 + if is_user_ns: 268 + kwargs["filters"] = {"kind": ["Eq", "observation"]} 269 + kwargs["include_attributes"].append("kind") 270 + else: 271 + kwargs["include_attributes"].append("source") 272 + response = ns.query(**kwargs) 273 + except Exception: 274 + continue 275 + 276 + if not response.rows: 277 + continue 278 + 279 + rows_to_upsert = [] 280 + for row in response.rows: 281 + old_tags = list(getattr(row, "tags", []) or []) 282 + new_tags = [alias_map.get(t, t) for t in old_tags] 283 + # deduplicate preserving order 284 + seen: set[str] = set() 285 + deduped = [] 286 + for t in new_tags: 287 + if t not in seen: 288 + seen.add(t) 289 + deduped.append(t) 290 + 291 + if deduped != old_tags: 292 + vec = getattr(row, "vector", None) 293 + if not vec: 294 + continue 295 + row_data: dict[str, Any] = { 296 + "id": row.id, 297 + "vector": vec, 298 + "content": row.content, 299 + "tags": deduped, 300 + "created_at": getattr( 301 + row, "created_at", datetime.now(timezone.utc).isoformat() 302 + ), 303 + } 304 + if is_user_ns: 305 + row_data["kind"] = "observation" 306 + else: 307 + row_data["source"] = getattr(row, "source", "tool") 308 + rows_to_upsert.append(row_data) 309 + 310 + if rows_to_upsert: 311 + schema: dict[str, Any] = { 312 + "content": {"type": "string", "full_text_search": True}, 313 + "tags": {"type": "[]string", "filterable": True}, 314 + "created_at": {"type": "string"}, 315 + } 316 + if is_user_ns: 317 + schema["kind"] = {"type": "string", "filterable": True} 318 + else: 319 + schema["source"] = {"type": "string", "filterable": True} 320 + 321 + ns.write( 322 + upsert_rows=rows_to_upsert, 323 + distance_metric="cosine_distance", 324 + schema=schema, 325 + ) 326 + updated += len(rows_to_upsert) 327 + 328 + return updated 329 + 330 + 331 + # --------------------------------------------------------------------------- 332 + # phase 2: tag relationship discovery 333 + # --------------------------------------------------------------------------- 334 + 335 + 336 + class RelationshipProposal(BaseModel): 337 + relationships: list[TagRelationship] = Field(default_factory=list) 338 + 339 + 340 + @task( 341 + cache_policy=ByTagsHash(), 342 + cache_expiration=timedelta(hours=4), 343 + persist_result=True, 344 + result_serializer="json", 345 + ) 346 + async def discover_tag_relationships( 347 + tags_text: str, 348 + tag_info: dict[str, dict], 349 + tag_embeddings: dict[str, list[float]], 350 + cooccurrences: dict[str, int], 351 + user_tag_sets: dict[str, list[str]], 352 + merged_aliases: set[str], 353 + api_key: str, 354 + ) -> list[dict[str, Any]]: 355 + """Score and LLM-confirm relationships between non-merged tags.""" 356 + tags = [t for t in tag_embeddings if t not in merged_aliases] 357 + 358 + # score tag pairs by combined signal 359 + scored: list[tuple[str, str, float, str]] = [] # (a, b, score, reason) 360 + for i, t1 in enumerate(tags): 361 + for t2 in tags[i + 1 :]: 362 + sim = cosine_similarity(tag_embeddings[t1], tag_embeddings[t2]) 363 + if sim < 0.4: 364 + continue 365 + 366 + # co-occurrence score 367 + pair_key = "|".join(sorted([t1, t2])) 368 + cooccur = cooccurrences.get(pair_key, 0) 369 + 370 + # shared users score 371 + shared_users = sum( 372 + 1 373 + for tags_list in user_tag_sets.values() 374 + if t1 in tags_list and t2 in tags_list 375 + ) 376 + 377 + # combine signals 378 + score = sim * 0.5 379 + if cooccur > 0: 380 + score += min(cooccur / 5, 0.3) # cap at 0.3 381 + if shared_users > 0: 382 + score += min(shared_users / 3, 0.2) # cap at 0.2 383 + 384 + if score >= 0.5: 385 + reason = f"sim={sim:.2f}, cooccur={cooccur}, shared_users={shared_users}" 386 + scored.append((t1, t2, score, reason)) 387 + 388 + if not scored: 389 + return [] 390 + 391 + scored.sort(key=lambda x: -x[2]) 392 + candidates_text = "\n".join( 393 + f"- \"{t1}\" <-> \"{t2}\" (score: {score:.2f}, {reason})\n" 394 + f" {t1}: {(tag_info.get(t1) or {}).get('samples', [''])[0][:100]}\n" 395 + f" {t2}: {(tag_info.get(t2) or {}).get('samples', [''])[0][:100]}" 396 + for t1, t2, score, reason in scored[:30] 397 + ) 398 + 399 + model = AnthropicModel( 400 + "claude-haiku-4-5", provider=AnthropicProvider(api_key=api_key) 401 + ) 402 + agent = Agent( 403 + model, 404 + system_prompt=( 405 + "you review candidate tag relationships for a memory graph belonging to phi, " 406 + "a bluesky bot that remembers conversations.\n\n" 407 + "for each pair, decide if there's a genuine conceptual relationship:\n" 408 + "- RELATED: broadly connected concepts\n" 409 + "- SUBTOPIC: one is a narrower form of the other\n" 410 + "- OVERLAPPING: partially shared meaning\n" 411 + "- SKIP: co-occurrence is coincidental, not conceptual\n\n" 412 + "assign confidence 0.0-1.0. be honest — surface co-occurrence ≠ real relationship.\n" 413 + "provide brief evidence for each accepted relationship." 414 + ), 415 + output_type=RelationshipProposal, 416 + name="tag-relator", 417 + ) 418 + 419 + result = await agent.run(f"relationship candidates:\n{candidates_text}") 420 + return [r.model_dump() for r in result.output.relationships] 421 + 422 + 423 + # --------------------------------------------------------------------------- 424 + # phase 3: store relationships in turbopuffer 425 + # --------------------------------------------------------------------------- 426 + 427 + 428 + @task 429 + def store_tag_relationships( 430 + tpuf_key: str, 431 + openai_key: str, 432 + relationships: list[dict[str, Any]], 433 + ) -> int: 434 + """Write tag relationships to phi-tag-relationships namespace.""" 435 + if not relationships: 436 + return 0 437 + 438 + client = turbopuffer.Turbopuffer(api_key=tpuf_key, region="gcp-us-central1") 439 + openai_client = OpenAI(api_key=openai_key) 440 + ns = client.namespace(TAG_REL_NAMESPACE) 441 + 442 + # batch embed relationship descriptions for vector field 443 + texts = [f"{r['tag_a']} — {r['tag_b']}: {r['evidence']}" for r in relationships] 444 + embeddings = openai_client.embeddings.create( 445 + model="text-embedding-3-small", input=texts 446 + ) 447 + 448 + rows = [] 449 + for i, rel in enumerate(relationships): 450 + rows.append( 451 + { 452 + "id": _rel_id(rel["tag_a"], rel["tag_b"]), 453 + "vector": embeddings.data[i].embedding, 454 + "tag_a": rel["tag_a"], 455 + "tag_b": rel["tag_b"], 456 + "relationship_type": rel["relationship_type"], 457 + "confidence": rel["confidence"], 458 + "evidence": rel["evidence"], 459 + } 460 + ) 461 + 462 + ns.write( 463 + upsert_rows=rows, 464 + distance_metric="cosine_distance", 465 + schema=TAG_REL_SCHEMA, 466 + ) 467 + return len(rows) 468 + 469 + 470 + # --------------------------------------------------------------------------- 471 + # phase 4: cosmik promotion 472 + # --------------------------------------------------------------------------- 473 + 474 + 475 + def _create_bsky_session(handle: str, password: str) -> dict[str, Any]: 476 + """Authenticate with bsky and return session (accessJwt, did).""" 477 + resp = httpx.post( 478 + "https://bsky.social/xrpc/com.atproto.server.createSession", 479 + json={"identifier": handle, "password": password}, 480 + timeout=15, 481 + ) 482 + resp.raise_for_status() 483 + return resp.json() 484 + 485 + 486 + def _list_cosmik_cards(did: str) -> list[dict[str, Any]]: 487 + """List all cosmik cards for a DID (public, no auth needed).""" 488 + cards: list[dict[str, Any]] = [] 489 + cursor = None 490 + while True: 491 + params: dict[str, Any] = { 492 + "repo": did, 493 + "collection": "network.cosmik.card", 494 + "limit": 100, 495 + } 496 + if cursor: 497 + params["cursor"] = cursor 498 + resp = httpx.get( 499 + "https://bsky.social/xrpc/com.atproto.repo.listRecords", 500 + params=params, 501 + timeout=15, 502 + ) 503 + resp.raise_for_status() 504 + data = resp.json() 505 + cards.extend(data.get("records", [])) 506 + cursor = data.get("cursor") 507 + if not cursor: 508 + break 509 + return cards 510 + 511 + 512 + def _list_cosmik_connections(did: str) -> list[dict[str, Any]]: 513 + """List existing cosmik connections (public, no auth).""" 514 + conns: list[dict[str, Any]] = [] 515 + cursor = None 516 + while True: 517 + params: dict[str, Any] = { 518 + "repo": did, 519 + "collection": "network.cosmik.connection", 520 + "limit": 100, 521 + } 522 + if cursor: 523 + params["cursor"] = cursor 524 + resp = httpx.get( 525 + "https://bsky.social/xrpc/com.atproto.repo.listRecords", 526 + params=params, 527 + timeout=15, 528 + ) 529 + resp.raise_for_status() 530 + data = resp.json() 531 + conns.extend(data.get("records", [])) 532 + cursor = data.get("cursor") 533 + if not cursor: 534 + break 535 + return conns 536 + 537 + 538 + def _create_pds_record( 539 + session: dict[str, Any], collection: str, record: dict[str, Any] 540 + ) -> dict[str, Any]: 541 + """Create a record on PDS via XRPC. Returns {uri, cid}.""" 542 + resp = httpx.post( 543 + "https://bsky.social/xrpc/com.atproto.repo.createRecord", 544 + headers={"Authorization": f"Bearer {session['accessJwt']}"}, 545 + json={ 546 + "repo": session["did"], 547 + "collection": collection, 548 + "record": record, 549 + }, 550 + timeout=15, 551 + ) 552 + resp.raise_for_status() 553 + return resp.json() 554 + 555 + 556 + def _match_cards_to_tags( 557 + cards: list[dict[str, Any]], 558 + tag_info: dict[str, dict], 559 + openai_key: str, 560 + ) -> dict[str, list[dict[str, Any]]]: 561 + """Match cosmik cards to tags by content similarity. 562 + 563 + Returns tag -> list of cards that are relevant to that tag. 564 + """ 565 + if not cards or not tag_info: 566 + return {} 567 + 568 + openai_client = OpenAI(api_key=openai_key) 569 + 570 + # embed card content 571 + card_texts = [] 572 + for card in cards: 573 + val = card.get("value", {}) 574 + if val.get("type") == "NOTE": 575 + card_texts.append(val.get("content", {}).get("text", "")[:500]) 576 + elif val.get("type") == "URL": 577 + content = val.get("content", {}) 578 + card_texts.append( 579 + f"{content.get('title', '')} {content.get('description', '')}".strip() 580 + or content.get("url", "") 581 + ) 582 + else: 583 + card_texts.append("") 584 + 585 + # filter out empty 586 + valid = [(i, t) for i, t in enumerate(card_texts) if t] 587 + if not valid: 588 + return {} 589 + 590 + card_embeddings = openai_client.embeddings.create( 591 + model="text-embedding-3-small", input=[t for _, t in valid] 592 + ) 593 + card_vecs = {valid[j][0]: card_embeddings.data[j].embedding for j in range(len(valid))} 594 + 595 + # embed tags 596 + tags = list(tag_info.keys()) 597 + tag_embeddings = openai_client.embeddings.create( 598 + model="text-embedding-3-small", input=tags 599 + ) 600 + tag_vecs = {tags[j]: tag_embeddings.data[j].embedding for j in range(len(tags))} 601 + 602 + # match: for each tag, find cards with similarity >= 0.5 603 + result: dict[str, list[dict[str, Any]]] = defaultdict(list) 604 + for tag, tvec in tag_vecs.items(): 605 + for card_idx, cvec in card_vecs.items(): 606 + if cosine_similarity(tvec, cvec) >= 0.5: 607 + result[tag].append(cards[card_idx]) 608 + 609 + return dict(result) 610 + 611 + 612 + @task 613 + def promote_connections( 614 + session: dict[str, Any], 615 + relationships: list[dict[str, Any]], 616 + tag_cards: dict[str, list[dict[str, Any]]], 617 + tag_info: dict[str, dict], 618 + existing_connections: list[dict[str, Any]], 619 + ) -> int: 620 + """Promote high-confidence relationships to cosmik connections.""" 621 + # index existing connections by source+target for idempotency 622 + existing_pairs: set[tuple[str, str]] = set() 623 + for conn in existing_connections: 624 + val = conn.get("value", {}) 625 + existing_pairs.add((val.get("source", ""), val.get("target", ""))) 626 + 627 + created = 0 628 + for rel in relationships: 629 + if rel["confidence"] < 0.8: 630 + continue 631 + 632 + tag_a, tag_b = rel["tag_a"], rel["tag_b"] 633 + cards_a = tag_cards.get(tag_a, []) 634 + cards_b = tag_cards.get(tag_b, []) 635 + 636 + if not cards_a or not cards_b: 637 + continue 638 + 639 + # check observation support: >= 3 observations across >= 2 users, or >= 5 episodic 640 + info_a = tag_info.get(tag_a, {}) 641 + info_b = tag_info.get(tag_b, {}) 642 + total_obs = info_a.get("count", 0) + info_b.get("count", 0) 643 + total_users = len( 644 + set(info_a.get("users", [])) | set(info_b.get("users", [])) 645 + ) 646 + total_episodic = info_a.get("episodic_count", 0) + info_b.get( 647 + "episodic_count", 0 648 + ) 649 + 650 + if not ( 651 + (total_obs >= 3 and total_users >= 2) or total_episodic >= 5 652 + ): 653 + continue 654 + 655 + # use first card from each tag as source/target 656 + source_uri = cards_a[0]["uri"] 657 + target_uri = cards_b[0]["uri"] 658 + 659 + if (source_uri, target_uri) in existing_pairs: 660 + continue 661 + 662 + record = { 663 + "source": source_uri, 664 + "target": target_uri, 665 + "connectionType": "related", 666 + "note": rel["evidence"][:1000], 667 + } 668 + 669 + try: 670 + _create_pds_record(session, "network.cosmik.connection", record) 671 + existing_pairs.add((source_uri, target_uri)) 672 + created += 1 673 + except Exception: 674 + pass 675 + 676 + return created 677 + 678 + 679 + @task 680 + def promote_collections( 681 + session: dict[str, Any], 682 + relationships: list[dict[str, Any]], 683 + tag_cards: dict[str, list[dict[str, Any]]], 684 + ) -> int: 685 + """Promote tag clusters with >= 3 cards to cosmik collections.""" 686 + # build clusters: group tags that are connected by relationships 687 + adj: dict[str, set[str]] = defaultdict(set) 688 + for rel in relationships: 689 + if rel["confidence"] >= 0.7: 690 + adj[rel["tag_a"]].add(rel["tag_b"]) 691 + adj[rel["tag_b"]].add(rel["tag_a"]) 692 + 693 + # find connected components via BFS 694 + visited: set[str] = set() 695 + clusters: list[set[str]] = [] 696 + for tag in adj: 697 + if tag in visited: 698 + continue 699 + cluster: set[str] = set() 700 + queue = [tag] 701 + while queue: 702 + t = queue.pop() 703 + if t in visited: 704 + continue 705 + visited.add(t) 706 + cluster.add(t) 707 + queue.extend(adj[t] - visited) 708 + if len(cluster) >= 2: 709 + clusters.append(cluster) 710 + 711 + created = 0 712 + for cluster in clusters: 713 + # collect all unique cards in this cluster 714 + cluster_cards: list[dict[str, Any]] = [] 715 + seen_uris: set[str] = set() 716 + for tag in cluster: 717 + for card in tag_cards.get(tag, []): 718 + if card["uri"] not in seen_uris: 719 + seen_uris.add(card["uri"]) 720 + cluster_cards.append(card) 721 + 722 + if len(cluster_cards) < 3: 723 + continue 724 + 725 + # derive collection name from tags 726 + name = " + ".join(sorted(cluster)[:3]) 727 + if len(cluster) > 3: 728 + name += f" (+{len(cluster) - 3})" 729 + 730 + record = { 731 + "name": name[:100], 732 + "accessType": "OPEN", 733 + "description": f"phi's notes on: {', '.join(sorted(cluster))}"[:500], 734 + "createdAt": datetime.now(timezone.utc).isoformat(), 735 + } 736 + 737 + try: 738 + coll_result = _create_pds_record( 739 + session, "network.cosmik.collection", record 740 + ) 741 + coll_uri = coll_result["uri"] 742 + coll_cid = coll_result["cid"] 743 + 744 + # link cards to collection 745 + for card in cluster_cards: 746 + link_record = { 747 + "collection": {"uri": coll_uri, "cid": coll_cid}, 748 + "card": {"uri": card["uri"], "cid": card["cid"]}, 749 + "addedBy": session["did"], 750 + "addedAt": datetime.now(timezone.utc).isoformat(), 751 + } 752 + try: 753 + _create_pds_record( 754 + session, "network.cosmik.collectionLink", link_record 755 + ) 756 + except Exception: 757 + pass 758 + 759 + created += 1 760 + except Exception: 761 + pass 762 + 763 + return created 764 + 765 + 766 + # --------------------------------------------------------------------------- 767 + # main flow 768 + # --------------------------------------------------------------------------- 769 + 770 + 771 + @flow(name="weave", log_prints=True) 772 + async def weave(): 773 + """Weave phi's memory graph: deduplicate tags, discover relationships, 774 + inject graph edges, and selectively promote to cosmik.""" 775 + logger = get_run_logger() 776 + 777 + tpuf_key = (await Secret.load("turbopuffer-api-key")).get() 778 + openai_key = (await Secret.load("openai-api-key")).get() 779 + anthropic_key = (await Secret.load("anthropic-api-key")).get() 780 + 781 + # --- phase 1: collect and deduplicate tags --- 782 + tag_data = collect_all_tags(tpuf_key) 783 + tag_info: dict[str, dict] = tag_data["tag_info"] 784 + cooccurrences: dict[str, int] = tag_data["cooccurrences"] 785 + user_tag_sets: dict[str, list[str]] = tag_data["user_tag_sets"] 786 + 787 + tags = sorted(tag_info.keys()) 788 + if not tags: 789 + print("no tags found, nothing to weave") 790 + return 791 + 792 + print(f"collected {len(tags)} unique tags across all namespaces") 793 + tag_embeddings = embed_tags(openai_key, tags) 794 + 795 + tags_text = "\n".join(tags) 796 + merge_dicts = await identify_tag_merges( 797 + tags_text, tag_info, tag_embeddings, anthropic_key 798 + ) 799 + 800 + # collect all aliases for filtering in phase 2 801 + merged_aliases: set[str] = set() 802 + if merge_dicts: 803 + for m in merge_dicts: 804 + merged_aliases.update(m["aliases"]) 805 + updated = apply_tag_merges(tpuf_key, merge_dicts) 806 + print( 807 + f"phase 1: merged {len(merged_aliases)} aliases into " 808 + f"{len(merge_dicts)} canonical tags, updated {updated} observations" 809 + ) 810 + else: 811 + print("phase 1: no tag merges needed") 812 + 813 + # --- phase 2: discover relationships --- 814 + rel_dicts = await discover_tag_relationships( 815 + tags_text, 816 + tag_info, 817 + tag_embeddings, 818 + cooccurrences, 819 + user_tag_sets, 820 + merged_aliases, 821 + anthropic_key, 822 + ) 823 + print(f"phase 2: discovered {len(rel_dicts)} tag relationships") 824 + 825 + # --- phase 3: store in turbopuffer --- 826 + if rel_dicts: 827 + stored = store_tag_relationships(tpuf_key, openai_key, rel_dicts) 828 + print(f"phase 3: stored {stored} relationships in {TAG_REL_NAMESPACE}") 829 + else: 830 + print("phase 3: no relationships to store") 831 + 832 + # --- phase 4: cosmik promotion --- 833 + try: 834 + bsky_handle = (await Secret.load("atproto-handle")).get() 835 + bsky_password = (await Secret.load("atproto-password")).get() 836 + except Exception: 837 + print("phase 4: skipped — atproto secrets not configured") 838 + return 839 + 840 + if not rel_dicts: 841 + print("phase 4: no relationships to promote") 842 + return 843 + 844 + session = _create_bsky_session(bsky_handle, bsky_password) 845 + cards = _list_cosmik_cards(PHI_DID) 846 + existing_conns = _list_cosmik_connections(PHI_DID) 847 + print(f"phase 4: found {len(cards)} existing cards, {len(existing_conns)} connections") 848 + 849 + if not cards: 850 + print("phase 4: no cosmik cards exist yet — skipping promotion") 851 + return 852 + 853 + tag_cards = _match_cards_to_tags(cards, tag_info, openai_key) 854 + tags_with_cards = [t for t in tag_cards if tag_cards[t]] 855 + print(f"phase 4: matched cards to {len(tags_with_cards)} tags") 856 + 857 + conn_count = promote_connections( 858 + session, rel_dicts, tag_cards, tag_info, existing_conns 859 + ) 860 + coll_count = promote_collections(session, rel_dicts, tag_cards) 861 + print( 862 + f"phase 4: promoted {conn_count} connections, {coll_count} collections to cosmik" 863 + ) 864 + 865 + 866 + if __name__ == "__main__": 867 + import asyncio 868 + 869 + asyncio.run(weave())
+23
packages/mps/src/mps/phi.py
··· 2 2 3 3 from dataclasses import dataclass, field 4 4 5 + from pydantic import BaseModel, Field 6 + 5 7 6 8 def clean_handle(handle: str) -> str: 7 9 """Sanitize handle to match bot's namespace naming convention.""" ··· 11 13 def restore_handle(ns_id: str) -> str: 12 14 """Reverse-map a namespace ID back to a handle (best-effort).""" 13 15 return ns_id.removeprefix("phi-users-").replace("_", ".") 16 + 17 + 18 + class TagMerge(BaseModel): 19 + """A set of tags to merge into one canonical form.""" 20 + 21 + canonical: str = Field(description="the preferred tag name (lowercase, hyphenated)") 22 + aliases: list[str] = Field(description="tags to rewrite to canonical") 23 + related: list[str] = Field( 24 + default_factory=list, 25 + description="distinct but related tags — link, don't merge", 26 + ) 27 + 28 + 29 + class TagRelationship(BaseModel): 30 + """A discovered relationship between two distinct tags.""" 31 + 32 + tag_a: str 33 + tag_b: str 34 + relationship_type: str = Field(description="e.g. related, subtopic, overlapping") 35 + confidence: float = Field(ge=0.0, le=1.0) 36 + evidence: str = Field(description="brief explanation of why these are related") 14 37 15 38 16 39 @dataclass
+11
prefect.yaml
··· 85 85 match_related: 86 86 prefect.resource.name: "transform" 87 87 prefect.resource.role: "deployment" 88 + 89 + - name: weave 90 + entrypoint: flows/weave.py:weave 91 + work_pool: *k8s 92 + triggers: 93 + - type: event 94 + expect: 95 + - "prefect.flow-run.Completed" 96 + match_related: 97 + prefect.resource.name: "transform" 98 + prefect.resource.role: "deployment"