my prefect server setup prefect-metrics.waow.tech
python orchestration
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

bridge liked posts into phi's knowledge pipeline

resolve liked post content at ingest time (batch getPosts), extract
per-author observations in compact via LLM with structured output
(ADD/UPDATE/NOOP reconciliation), write as kind="observation" to
TurboPuffer user namespaces. observations flow through existing
morning flow, compact summaries, and phi runtime unchanged.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

+474 -2
+1
analytics/models/staging/_sources.yml
··· 8 8 - name: raw_phi_observations 9 9 - name: raw_phi_interactions 10 10 - name: raw_likes 11 + - name: raw_liked_posts
+9
analytics/models/staging/stg_liked_posts.sql
··· 1 + -- bootstrap: ensure table exists even before liked posts have been resolved 2 + {{ config(pre_hook="CREATE TABLE IF NOT EXISTS raw_liked_posts (subject_uri VARCHAR PRIMARY KEY, author_handle VARCHAR, author_did VARCHAR, text VARCHAR, created_at VARCHAR, liked_at VARCHAR, embed_type VARCHAR, embed_text VARCHAR, fetched_at TIMESTAMP DEFAULT now())") }} 3 + 4 + -- dedup by subject_uri, keep most recent fetch 5 + SELECT DISTINCT ON (subject_uri) 6 + subject_uri, author_handle, author_did, text, 7 + created_at, liked_at, embed_type, embed_text, fetched_at 8 + FROM {{ source('raw', 'raw_liked_posts') }} 9 + ORDER BY subject_uri, fetched_at DESC
+300
flows/compact.py
··· 1 1 """ 2 2 Synthesize per-user relationship summaries from phi's observations + interactions. 3 + Extract observations from liked posts and write to TurboPuffer. 3 4 4 5 Reads from dbt mart (int_phi_user_profiles) and staging models, sends to LLM, 5 6 writes summaries back to TurboPuffer where phi can consume them at conversation time. ··· 10 11 import hashlib 11 12 import os 12 13 import shutil 14 + from collections import defaultdict 13 15 from datetime import datetime, timedelta, timezone 14 16 from typing import Any 15 17 ··· 17 19 import httpx 18 20 import turbopuffer 19 21 from openai import OpenAI 22 + from pydantic import BaseModel, Field 20 23 from pydantic_ai import Agent 21 24 from pydantic_ai.models.anthropic import AnthropicModel 22 25 from pydantic_ai.providers.anthropic import AnthropicProvider ··· 258 261 ) 259 262 260 263 264 + # --- likes observation extraction --- 265 + 266 + LIKES_SYSTEM_PROMPT = """\ 267 + you extract observations about a bluesky user from context nate provided by liking their posts. 268 + you're given what phi already knows (if anything), the user's profile, their posts that nate liked, 269 + and any publications they have. 270 + 271 + produce 1-3 atomic observations. each should be a concrete fact (what they work on, what they write 272 + about, a specific project, a notable take). include 1-3 lowercase tags. 273 + 274 + for each observation, specify an action: 275 + - ADD: genuinely new fact not covered by existing knowledge 276 + - UPDATE: existing knowledge is stale or incomplete — provide the merged version 277 + - NOOP: this is already known — skip 278 + 279 + if you can't say anything meaningful beyond what's already known, return empty. 280 + use lowercase. be concrete, not vague.""" 281 + 282 + 283 + class LikesObservation(BaseModel): 284 + author_handle: str 285 + content: str = Field(description="one atomic fact about this person") 286 + tags: list[str] = Field(description="1-3 lowercase tags") 287 + action: str = Field(description="ADD, UPDATE, or NOOP") 288 + supersedes_content: str | None = Field( 289 + default=None, 290 + description="for UPDATE: the old observation content this replaces", 291 + ) 292 + 293 + 294 + class LikesExtractionResult(BaseModel): 295 + observations: list[LikesObservation] = [] 296 + 297 + 298 + class ByLikedPostsHash(CachePolicy): 299 + """Cache likes extraction by author handle + liked posts content hash.""" 300 + 301 + def compute_key( 302 + self, 303 + task_ctx: TaskRunContext, 304 + inputs: dict[str, Any], 305 + flow_parameters: dict[str, Any], 306 + **kwargs: Any, 307 + ) -> str | None: 308 + handle = inputs.get("handle") 309 + liked_posts_text = inputs.get("liked_posts_text") 310 + if not handle or not liked_posts_text: 311 + return None 312 + h = hashlib.md5(liked_posts_text.encode()).hexdigest()[:12] 313 + return f"likes-obs/{handle}/{h}" 314 + 315 + 316 + @task 317 + def load_recent_liked_posts(snap_path: str) -> dict[str, list[dict[str, str]]]: 318 + """Load liked posts from last 7 days, grouped by author handle.""" 319 + db = duckdb.connect(snap_path, read_only=True) 320 + try: 321 + rows = db.execute(""" 322 + SELECT author_handle, author_did, text, created_at, 323 + liked_at, embed_type, embed_text 324 + FROM raw_liked_posts 325 + WHERE liked_at >= (now() - INTERVAL '7 days')::VARCHAR 326 + AND author_handle != '' 327 + ORDER BY liked_at DESC 328 + """).fetchall() 329 + except duckdb.CatalogException: 330 + db.close() 331 + return {} 332 + db.close() 333 + 334 + columns = ["author_handle", "author_did", "text", "created_at", 335 + "liked_at", "embed_type", "embed_text"] 336 + by_author: dict[str, list[dict[str, str]]] = defaultdict(list) 337 + for row in rows: 338 + post = dict(zip(columns, row)) 339 + by_author[post["author_handle"]].append(post) 340 + return dict(by_author) 341 + 342 + 343 + @task 344 + def query_existing_knowledge(tpuf_key: str, handle: str) -> str: 345 + """Query TurboPuffer for existing observations about this author.""" 346 + client = turbopuffer.Turbopuffer(api_key=tpuf_key, region="gcp-us-central1") 347 + ns_name = f"phi-users-{clean_handle(handle)}" 348 + ns = client.namespace(ns_name) 349 + 350 + try: 351 + resp = ns.query( 352 + rank_by=("created_at", "desc"), 353 + top_k=10, 354 + filters={"kind": ["Eq", "observation"]}, 355 + include_attributes=["content", "tags", "created_at"], 356 + ) 357 + if not resp.rows: 358 + return "no prior knowledge" 359 + lines = [] 360 + for row in resp.rows: 361 + tags = getattr(row, "tags", []) or [] 362 + tag_str = f" [{', '.join(tags)}]" if tags else "" 363 + lines.append(f"- {row.content}{tag_str}") 364 + return "\n".join(lines) 365 + except Exception: 366 + return "no prior knowledge" 367 + 368 + 369 + @task 370 + def search_publications(handle: str) -> str: 371 + """Check pub-search for long-form writing by this author.""" 372 + try: 373 + resp = httpx.get( 374 + "https://leaflet-search-backend.fly.dev/api/search", 375 + params={"author": handle, "limit": 5}, 376 + timeout=10, 377 + ) 378 + if resp.status_code != 200: 379 + return "" 380 + results = resp.json().get("results", []) 381 + if not results: 382 + return "" 383 + lines = [] 384 + for r in results: 385 + title = r.get("title", "") 386 + url = r.get("url", "") 387 + lines.append(f"- {title} ({url})" if url else f"- {title}") 388 + return "publications:\n" + "\n".join(lines) 389 + except Exception: 390 + return "" 391 + 392 + 393 + def _format_liked_posts(posts: list[dict[str, str]]) -> str: 394 + """Format liked posts as text for LLM input.""" 395 + lines = [] 396 + for p in posts: 397 + text = p.get("text", "").strip() 398 + embed = "" 399 + if p.get("embed_type") and p.get("embed_text"): 400 + embed = f" [{p['embed_type']}: {p['embed_text'][:200]}]" 401 + if text or embed: 402 + lines.append(f"- {text}{embed}") 403 + return "\n".join(lines) 404 + 405 + 406 + @task( 407 + cache_policy=ByLikedPostsHash(), 408 + cache_expiration=timedelta(hours=4), 409 + persist_result=True, 410 + result_serializer="json", 411 + ) 412 + async def extract_likes_observations( 413 + handle: str, 414 + liked_posts_text: str, 415 + existing_knowledge: str, 416 + bsky_profile: dict[str, str] | None, 417 + publications: str, 418 + api_key: str, 419 + ) -> list[dict[str, Any]]: 420 + """LLM extraction of observations from liked posts. Cached by posts hash.""" 421 + model = AnthropicModel("claude-haiku-4-5", provider=AnthropicProvider(api_key=api_key)) 422 + agent = Agent( 423 + model, 424 + system_prompt=LIKES_SYSTEM_PROMPT, 425 + output_type=LikesExtractionResult, 426 + name="likes-observer", 427 + ) 428 + 429 + profile_section = f"handle: @{handle}\n" 430 + if bsky_profile: 431 + if bsky_profile.get("display_name"): 432 + profile_section += f"display name: {bsky_profile['display_name']}\n" 433 + if bsky_profile.get("bio"): 434 + profile_section += f"bio: {bsky_profile['bio']}\n" 435 + 436 + prompt = ( 437 + f"author profile:\n{profile_section}\n" 438 + f"existing knowledge about @{handle}:\n{existing_knowledge}\n\n" 439 + f"posts nate liked by this author:\n{liked_posts_text}\n\n" 440 + f"{publications}" 441 + ) 442 + result = await agent.run(prompt) 443 + return [obs.model_dump() for obs in result.output.observations] 444 + 445 + 446 + def _observation_id(handle: str, content: str) -> str: 447 + """Deterministic ID for a likes-derived observation.""" 448 + return hashlib.sha256(f"user-{handle}-observation-{content}".encode()).hexdigest()[:16] 449 + 450 + 451 + USER_NAMESPACE_SCHEMA = { 452 + "kind": {"type": "string", "filterable": True}, 453 + "content": {"type": "string", "full_text_search": True}, 454 + "tags": {"type": "[]string", "filterable": True}, 455 + "created_at": {"type": "string"}, 456 + } 457 + 458 + 459 + @task 460 + def write_likes_observations_to_turbopuffer( 461 + tpuf_key: str, 462 + openai_key: str, 463 + observations: list[dict[str, Any]], 464 + ): 465 + """Embed and write likes-derived observations to TurboPuffer user namespaces.""" 466 + logger = get_run_logger() 467 + openai_client = OpenAI(api_key=openai_key) 468 + client = turbopuffer.Turbopuffer(api_key=tpuf_key, region="gcp-us-central1") 469 + 470 + added = 0 471 + updated = 0 472 + 473 + for obs in observations: 474 + action = obs.get("action", "NOOP").upper() 475 + if action == "NOOP": 476 + continue 477 + 478 + handle = obs["author_handle"] 479 + content = obs["content"] 480 + tags = obs.get("tags", []) 481 + ns_name = f"phi-users-{clean_handle(handle)}" 482 + ns = client.namespace(ns_name) 483 + 484 + embedding = openai_client.embeddings.create( 485 + model="text-embedding-3-small", input=content, 486 + ).data[0].embedding 487 + 488 + if action == "UPDATE" and obs.get("supersedes_content"): 489 + # find and delete the old observation by semantic similarity 490 + try: 491 + resp = ns.query( 492 + rank_by=("vector", "ANN", embedding), 493 + top_k=3, 494 + filters={"kind": ["Eq", "observation"]}, 495 + include_attributes=["content"], 496 + ) 497 + if resp.rows: 498 + old_content = obs["supersedes_content"].lower() 499 + for row in resp.rows: 500 + if old_content in row.content.lower() or row.content.lower() in old_content: 501 + ns.write(deletes=[row.id], distance_metric="cosine_distance") 502 + break 503 + except Exception: 504 + pass # proceed with upsert even if delete fails 505 + 506 + obs_id = _observation_id(handle, content) 507 + ns.write( 508 + upsert_rows=[{ 509 + "id": obs_id, 510 + "vector": embedding, 511 + "kind": "observation", 512 + "content": content, 513 + "tags": tags, 514 + "created_at": datetime.now(timezone.utc).isoformat(), 515 + }], 516 + distance_metric="cosine_distance", 517 + schema=USER_NAMESPACE_SCHEMA, 518 + ) 519 + 520 + if action == "ADD": 521 + added += 1 522 + elif action == "UPDATE": 523 + updated += 1 524 + 525 + logger.info(f"likes observations: {added} added, {updated} updated, " 526 + f"{sum(1 for o in observations if o.get('action', '').upper() == 'NOOP')} skipped") 527 + 528 + 261 529 @flow(name="compact", log_prints=True) 262 530 async def compact(): 263 531 """Synthesize per-user relationship summaries from phi's memory.""" ··· 288 556 ) 289 557 write_summary_to_turbopuffer(tpuf_key, openai_key, handle, summary) 290 558 logger.info(f"@{handle}: compacted {profile['observation_count']} observations") 559 + 560 + # --- phase 2: extract observations from liked posts --- 561 + liked_by_author = load_recent_liked_posts(snap_path) 562 + if liked_by_author: 563 + # limit to top 15 most-liked unique authors 564 + top_authors = sorted( 565 + liked_by_author.items(), key=lambda kv: len(kv[1]), reverse=True, 566 + )[:15] 567 + logger.info(f"extracting observations from {len(top_authors)} liked authors") 568 + 569 + all_observations: list[dict[str, Any]] = [] 570 + for handle, posts in top_authors: 571 + liked_posts_text = _format_liked_posts(posts) 572 + if not liked_posts_text.strip(): 573 + continue 574 + 575 + bsky_profile = resolve_bsky_profile(handle) 576 + existing = query_existing_knowledge(tpuf_key, handle) 577 + pubs = search_publications(handle) 578 + 579 + obs_dicts = await extract_likes_observations( 580 + handle, liked_posts_text, existing, bsky_profile, pubs, anthropic_key, 581 + ) 582 + all_observations.extend(obs_dicts) 583 + 584 + actionable = [o for o in all_observations if o.get("action", "").upper() != "NOOP"] 585 + if actionable: 586 + write_likes_observations_to_turbopuffer(tpuf_key, openai_key, actionable) 587 + logger.info(f"extracted {len(all_observations)} observations from liked posts " 588 + f"({len(actionable)} actionable)") 589 + else: 590 + logger.info("no recent liked posts to extract observations from") 291 591 292 592 293 593 if __name__ == "__main__":
+87 -1
flows/ingest.py
··· 24 24 25 25 from mps.db import ( 26 26 write_github_issues, 27 + write_liked_posts, 27 28 write_likes, 28 29 write_phi_interactions, 29 30 write_phi_observations, ··· 31 32 ) 32 33 from mps.github import IssueOrPR, IssueRef, gh_headers 33 34 from mps.phi import PhiInteraction, PhiObservation, restore_handle 34 - from mps.likes import LikeRecord, fetch_likes 35 + from mps.likes import LikeRecord, LikedPost, fetch_likes, summarize_embed 35 36 from mps.tangled import PDS_BASE, TangledItem, fetch_items, fetch_repo_at_uris 36 37 37 38 GITHUB_API = "https://api.github.com" ··· 272 273 273 274 274 275 @task 276 + def resolve_liked_posts(db_path: str) -> list[LikedPost]: 277 + """Find recent unresolved likes and batch-resolve post content via public API.""" 278 + import duckdb 279 + 280 + logger = get_run_logger() 281 + 282 + con = duckdb.connect(db_path) 283 + # bootstrap raw_liked_posts if it doesn't exist yet 284 + con.execute(""" 285 + CREATE TABLE IF NOT EXISTS raw_liked_posts ( 286 + subject_uri VARCHAR PRIMARY KEY, 287 + author_handle VARCHAR, 288 + author_did VARCHAR, 289 + text VARCHAR, 290 + created_at VARCHAR, 291 + liked_at VARCHAR, 292 + embed_type VARCHAR, 293 + embed_text VARCHAR, 294 + fetched_at TIMESTAMP DEFAULT now() 295 + ) 296 + """) 297 + # find likes from last 7 days not yet resolved 298 + rows = con.execute(""" 299 + SELECT l.subject_uri, l.created_at AS liked_at 300 + FROM raw_likes l 301 + LEFT JOIN raw_liked_posts lp ON l.subject_uri = lp.subject_uri 302 + WHERE lp.subject_uri IS NULL 303 + AND l.created_at >= (now() - INTERVAL '7 days')::VARCHAR 304 + ORDER BY l.created_at DESC 305 + LIMIT 200 306 + """).fetchall() 307 + con.close() 308 + 309 + if not rows: 310 + logger.info("no unresolved likes to fetch") 311 + return [] 312 + 313 + uri_to_liked_at = {row[0]: row[1] for row in rows} 314 + uris = list(uri_to_liked_at.keys()) 315 + logger.info(f"resolving {len(uris)} liked posts via public API") 316 + 317 + posts: list[LikedPost] = [] 318 + with httpx.Client(timeout=15) as client: 319 + # getPosts accepts up to 25 URIs per call 320 + for i in range(0, len(uris), 25): 321 + batch = uris[i : i + 25] 322 + resp = client.get( 323 + "https://public.api.bsky.app/xrpc/app.bsky.feed.getPosts", 324 + params=[("uris", u) for u in batch], 325 + ) 326 + if resp.status_code != 200: 327 + logger.warning(f"getPosts returned {resp.status_code} for batch {i}") 328 + continue 329 + for post in resp.json().get("posts", []): 330 + uri = post.get("uri", "") 331 + record = post.get("record", {}) 332 + author = post.get("author", {}) 333 + embed_type, embed_text = summarize_embed(post.get("embed") or record.get("embed") or {}) 334 + posts.append(LikedPost( 335 + subject_uri=uri, 336 + author_handle=author.get("handle", ""), 337 + author_did=author.get("did", ""), 338 + text=record.get("text", ""), 339 + created_at=record.get("createdAt", ""), 340 + liked_at=uri_to_liked_at.get(uri, ""), 341 + embed_type=embed_type, 342 + embed_text=embed_text, 343 + )) 344 + 345 + logger.info(f"resolved {len(posts)} liked posts") 346 + return posts 347 + 348 + 349 + @task 350 + def persist_liked_posts(items: list[LikedPost]) -> int: 351 + return write_liked_posts(items, _db_path()) 352 + 353 + 354 + @task 275 355 def persist_phi( 276 356 observations: list[PhiObservation], 277 357 interactions: list[PhiInteraction], ··· 362 442 if likes: 363 443 total = persist_likes(likes) 364 444 logger.info(f"persisted {len(likes)} likes; {total} total in raw_likes") 445 + 446 + # resolve liked post content for recent unresolved likes 447 + liked_posts = resolve_liked_posts(_db_path()) 448 + if liked_posts: 449 + lp_total = persist_liked_posts(liked_posts) 450 + logger.info(f"resolved {len(liked_posts)} liked posts; {lp_total} total in raw_liked_posts") 365 451 366 452 if phi_observations or phi_interactions: 367 453 obs_total, ix_total = persist_phi(phi_observations, phi_interactions)
+36 -1
packages/mps/src/mps/db.py
··· 4 4 5 5 import duckdb 6 6 7 - from mps.likes import LikeRecord 7 + from mps.likes import LikeRecord, LikedPost 8 8 from mps.phi import PhiInteraction, PhiObservation 9 9 10 10 ··· 33 33 rows, 34 34 ) 35 35 count = con.execute("SELECT count(*) FROM raw_likes").fetchone()[0] 36 + con.close() 37 + return count 38 + 39 + 40 + def write_liked_posts(items: list[LikedPost], db_path: str) -> int: 41 + """Upsert resolved liked posts into raw_liked_posts. Returns total row count.""" 42 + con = duckdb.connect(db_path) 43 + con.execute(""" 44 + CREATE TABLE IF NOT EXISTS raw_liked_posts ( 45 + subject_uri VARCHAR PRIMARY KEY, 46 + author_handle VARCHAR, 47 + author_did VARCHAR, 48 + text VARCHAR, 49 + created_at VARCHAR, 50 + liked_at VARCHAR, 51 + embed_type VARCHAR, 52 + embed_text VARCHAR, 53 + fetched_at TIMESTAMP DEFAULT now() 54 + ) 55 + """) 56 + rows = [ 57 + ( 58 + item.subject_uri, item.author_handle, item.author_did, 59 + item.text, item.created_at, item.liked_at, 60 + item.embed_type, item.embed_text, 61 + datetime.datetime.now(datetime.UTC), 62 + ) 63 + for item in items 64 + ] 65 + if rows: 66 + con.executemany( 67 + "INSERT OR REPLACE INTO raw_liked_posts VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", 68 + rows, 69 + ) 70 + count = con.execute("SELECT count(*) FROM raw_liked_posts").fetchone()[0] 36 71 con.close() 37 72 return count 38 73
+41
packages/mps/src/mps/likes.py
··· 16 16 created_at: str # ISO timestamp from like record 17 17 18 18 19 + @dataclass 20 + class LikedPost: 21 + subject_uri: str # the post URI (primary key) 22 + author_handle: str 23 + author_did: str 24 + text: str 25 + created_at: str # post's creation time 26 + liked_at: str # when nate liked it 27 + embed_type: str = "" # external/images/record/video/"" 28 + embed_text: str = "" # link card title+url, image alt, quote text 29 + 30 + 31 + def summarize_embed(embed: dict) -> tuple[str, str]: 32 + """Extract type + summary text from a post embed (raw API JSON).""" 33 + if not embed: 34 + return "", "" 35 + typ = embed.get("$type", "") 36 + if "external" in typ: 37 + ext = embed.get("external", {}) 38 + title = ext.get("title", "") 39 + uri = ext.get("uri", "") 40 + desc = ext.get("description", "") 41 + parts = [p for p in [title, uri, desc] if p] 42 + return "external", " — ".join(parts) 43 + if "images" in typ: 44 + alts = [img.get("alt", "") for img in embed.get("images", [])] 45 + return "images", " | ".join(a for a in alts if a) 46 + if "record" in typ: 47 + rec = embed.get("record", {}) 48 + # recordWithMedia wraps record + media 49 + if "record" in rec: 50 + rec = rec["record"] 51 + value = rec.get("value", rec) 52 + text = value.get("text", "") 53 + return "record", text[:500] if text else "" 54 + if "video" in typ: 55 + alt = embed.get("alt", "") 56 + return "video", alt 57 + return "", "" 58 + 59 + 19 60 def fetch_likes(client: httpx.Client) -> list[LikeRecord]: 20 61 """Fetch like records from nate's PDS. Cursor-based pagination.""" 21 62 cursor: str | None = None