my prefect server setup prefect-metrics.waow.tech
python orchestration
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

weave: create URL cards from observations, fix metadata format

- add create_cluster_cards task that scans the turbopuffer (tpuf) namespaces
phi-episodic and phi-users-* for URLs in observations and creates cosmik URL cards before promotion
- fix _match_cards_to_tags to read nested content.metadata format
- solves a chicken-and-egg problem: promotion needs existing cards to link, but no cards had been created yet

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

+130 -21
+130 -21
flows/weave.py
··· 7 7 """ 8 8 9 9 import hashlib 10 + import re 10 11 from collections import defaultdict 11 12 from datetime import datetime, timedelta, timezone 12 13 from typing import Any ··· 576 577 return resp.json() 577 578 578 579 580 + def _card_text(card: dict[str, Any]) -> str: 581 + """Extract searchable text from a cosmik card (handles both old and new format).""" 582 + val = card.get("value", {}) 583 + if val.get("type") == "NOTE": 584 + return val.get("content", {}).get("text", "")[:500] 585 + if val.get("type") == "URL": 586 + content = val.get("content", {}) 587 + # new format: metadata nested under content.metadata 588 + meta = content.get("metadata", {}) 589 + title = meta.get("title", "") or content.get("title", "") 590 + desc = meta.get("description", "") or content.get("description", "") 591 + return f"{title} {desc}".strip() or content.get("url", "") 592 + return "" 593 + 594 + 579 595 def _match_cards_to_tags( 580 596 cards: list[dict[str, Any]], 581 597 tag_info: dict[str, dict], ··· 590 606 591 607 openai_client = OpenAI(api_key=openai_key) 592 608 593 - # embed card content 594 - card_texts = [] 595 - for card in cards: 596 - val = card.get("value", {}) 597 - if val.get("type") == "NOTE": 598 - card_texts.append(val.get("content", {}).get("text", "")[:500]) 599 - elif val.get("type") == "URL": 600 - content = val.get("content", {}) 601 - card_texts.append( 602 - f"{content.get('title', '')} {content.get('description', '')}".strip() 603 - or content.get("url", "") 604 - ) 605 - else: 606 - card_texts.append("") 607 - 608 - # filter out empty 609 + card_texts = [_card_text(c) for c in cards] 609 610 valid = [(i, t) for i, t in enumerate(card_texts) if t] 610 611 if not valid: 611 612 return {} ··· 615 616 ) 616 617 card_vecs = {valid[j][0]: card_embeddings.data[j].embedding for j in range(len(valid))} 617 618 618 - # embed tags 619 619 tags = list(tag_info.keys()) 620 620 tag_embeddings = openai_client.embeddings.create( 621 621 
model="text-embedding-3-small", input=tags 622 622 ) 623 623 tag_vecs = {tags[j]: tag_embeddings.data[j].embedding for j in range(len(tags))} 624 624 625 - # match: for each tag, find cards with similarity >= 0.5 626 625 result: dict[str, list[dict[str, Any]]] = defaultdict(list) 627 626 for tag, tvec in tag_vecs.items(): 628 627 for card_idx, cvec in card_vecs.items(): ··· 630 629 result[tag].append(cards[card_idx]) 631 630 632 631 return dict(result) 632 + 633 + 634 + _URL_RE = re.compile(r"https?://[^\s,)>\]]+") 635 + 636 + 637 + @task 638 + def create_cluster_cards( 639 + session: dict[str, Any], 640 + tpuf_key: str, 641 + tag_info: dict[str, dict], 642 + existing_cards: list[dict[str, Any]], 643 + ) -> list[dict[str, Any]]: 644 + """Create cosmik URL cards for URLs found in phi's observations. 645 + 646 + Scans all observations and episodic memories for URLs, creates cards for 647 + those that don't exist yet. Returns the full updated card list. 648 + """ 649 + logger = get_run_logger() 650 + 651 + # index existing card URLs for dedup 652 + existing_urls: set[str] = set() 653 + for card in existing_cards: 654 + val = card.get("value", {}) 655 + if val.get("type") == "URL": 656 + existing_urls.add(val.get("content", {}).get("url", "")) 657 + 658 + # scan turbopuffer for URLs in observation content 659 + client = turbopuffer.Turbopuffer(api_key=tpuf_key, region="gcp-us-central1") 660 + url_evidence: dict[str, dict[str, Any]] = {} 661 + 662 + ns_ids: list[str] = ["phi-episodic"] 663 + page = client.namespaces(prefix="phi-users-") 664 + ns_ids.extend(ns.id for ns in page.namespaces) 665 + 666 + for ns_id in ns_ids: 667 + ns = client.namespace(ns_id) 668 + try: 669 + kwargs: dict[str, Any] = { 670 + "rank_by": ("vector", "ANN", [0.5] * 1536), 671 + "top_k": 200, 672 + "include_attributes": ["content", "tags"], 673 + } 674 + if ns_id.startswith("phi-users-"): 675 + kwargs["filters"] = {"kind": ["Eq", "observation"]} 676 + response = ns.query(**kwargs) 677 + except 
Exception: 678 + continue 679 + 680 + for row in response.rows or []: 681 + content = row.content or "" 682 + row_tags = list(getattr(row, "tags", []) or []) 683 + for url in _URL_RE.findall(content): 684 + url = url.rstrip(".,;:!?") 685 + if url in existing_urls: 686 + continue 687 + if url not in url_evidence: 688 + url_evidence[url] = { 689 + "tags": set(), 690 + "context": content[:300], 691 + "count": 0, 692 + } 693 + url_evidence[url]["tags"].update(row_tags) 694 + url_evidence[url]["count"] += 1 695 + 696 + if not url_evidence: 697 + logger.info("no new URLs found in observations") 698 + return existing_cards 699 + 700 + # sort by evidence strength (count * tag breadth) 701 + ranked = sorted( 702 + url_evidence.items(), 703 + key=lambda x: x[1]["count"] * len(x[1]["tags"]), 704 + reverse=True, 705 + ) 706 + 707 + new_cards = list(existing_cards) 708 + created = 0 709 + for url, evidence in ranked[:20]: # cap at 20 new cards per run 710 + tags_str = ", ".join(sorted(evidence["tags"])[:5]) 711 + record = { 712 + "type": "URL", 713 + "content": { 714 + "$type": "network.cosmik.card#urlContent", 715 + "url": url, 716 + "metadata": { 717 + "$type": "network.cosmik.card#urlMetadata", 718 + "description": f"discussed in context of: {tags_str}", 719 + }, 720 + }, 721 + "createdAt": datetime.now(timezone.utc).isoformat(), 722 + } 723 + try: 724 + result = _create_pds_record(session, "network.cosmik.card", record) 725 + new_cards.append({ 726 + "uri": result["uri"], 727 + "cid": result["cid"], 728 + "value": record, 729 + }) 730 + existing_urls.add(url) 731 + created += 1 732 + logger.info(f"created URL card: {url}") 733 + except Exception as e: 734 + logger.warning(f"failed to create card for {url}: {e}") 735 + 736 + logger.info(f"created {created} new URL cards from observations") 737 + return new_cards 633 738 634 739 635 740 @task ··· 868 973 return 869 974 870 975 session = _create_bsky_session(bsky_handle, bsky_password) 871 - cards = _list_cosmik_cards(PHI_DID) 
976 + existing_cards = _list_cosmik_cards(PHI_DID) 872 977 existing_conns = _list_cosmik_connections(PHI_DID) 873 - print(f"phase 4: found {len(cards)} existing cards, {len(existing_conns)} connections") 978 + print(f"phase 4: found {len(existing_cards)} existing cards, {len(existing_conns)} connections") 979 + 980 + # create cards from URLs in observations before trying to link 981 + cards = create_cluster_cards(session, tpuf_key, tag_info, existing_cards) 982 + print(f"phase 4: {len(cards)} total cards after creation") 874 983 875 984 if not cards: 876 - print("phase 4: no cosmik cards exist yet — skipping promotion") 985 + print("phase 4: no cosmik cards — skipping promotion") 877 986 return 878 987 879 988 tag_cards = _match_cards_to_tags(cards, tag_info, openai_key)