my prefect server setup prefect-metrics.waow.tech
python orchestration
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

add like ingestion to hourly ingest flow

new data source: nate's bluesky likes fetched from PDS via
com.atproto.repo.listRecords. fetched in parallel with tangled/phi,
persisted sequentially to raw_likes table. dbt staging model deduplicates.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

+117 -1
+1
analytics/models/staging/_sources.yml
··· 7 7 - name: raw_tangled_items 8 8 - name: raw_phi_observations 9 9 - name: raw_phi_interactions 10 + - name: raw_likes
+8
analytics/models/staging/stg_likes.sql
··· 1 + -- bootstrap: ensure table exists even before likes have been ingested 2 + {{ config(pre_hook="CREATE TABLE IF NOT EXISTS raw_likes (at_uri VARCHAR PRIMARY KEY, subject_uri VARCHAR, created_at VARCHAR, fetched_at TIMESTAMP DEFAULT now())") }} 3 + 4 + -- dedup by at_uri, keep most recent fetch 5 + SELECT DISTINCT ON (at_uri) 6 + at_uri, subject_uri, created_at, fetched_at 7 + FROM {{ source('raw', 'raw_likes') }} 8 + ORDER BY at_uri, fetched_at DESC
+26 -1
flows/ingest.py
··· 24 24 25 25 from mps.db import ( 26 26 write_github_issues, 27 + write_likes, 27 28 write_phi_interactions, 28 29 write_phi_observations, 29 30 write_tangled_items, 30 31 ) 31 32 from mps.github import IssueOrPR, IssueRef, gh_headers 32 33 from mps.phi import PhiInteraction, PhiObservation, restore_handle 34 + from mps.likes import LikeRecord, fetch_likes 33 35 from mps.tangled import PDS_BASE, TangledItem, fetch_items, fetch_repo_at_uris 34 36 35 37 GITHUB_API = "https://api.github.com" ··· 255 257 256 258 257 259 @task 260 + def fetch_nate_likes() -> list[LikeRecord]: 261 + """Fetch recent likes from nate's PDS.""" 262 + logger = get_run_logger() 263 + with httpx.Client(base_url=PDS_BASE, timeout=30) as client: 264 + likes = fetch_likes(client) 265 + logger.info(f"fetched {len(likes)} likes from PDS") 266 + return likes 267 + 268 + 269 + @task 270 + def persist_likes(items: list[LikeRecord]) -> int: 271 + return write_likes(items, _db_path()) 272 + 273 + 274 + @task 258 275 def persist_phi( 259 276 observations: list[PhiObservation], 260 277 interactions: list[PhiInteraction], ··· 297 314 298 315 token = load_token() 299 316 300 - # kick off tangled + phi fetches immediately (no deps on github token) 317 + # kick off tangled + phi + likes fetches immediately (no deps on github token) 301 318 tangled_future = fetch_all_tangled_items.submit() 319 + likes_future = fetch_nate_likes.submit() 302 320 303 321 tpuf_key = Secret.load("turbopuffer-api-key").get() 304 322 phi_future = fetch_phi_memory.submit(tpuf_key) ··· 329 347 330 348 phi_observations, phi_interactions = phi_future.result() 331 349 350 + likes = likes_future.result() 351 + logger.info(f"fetched {len(likes)} likes") 352 + 332 353 # sequential writes — same process, no DuckDB lock contention 333 354 if gh_items: 334 355 total = persist_github(gh_items) ··· 337 358 if tangled_items: 338 359 total = persist_tangled(tangled_items) 339 360 logger.info(f"persisted {len(tangled_items)} tangled rows; {total} total in raw_tangled_items") 361 + 362 + if likes: 363 + total = persist_likes(likes) 364 + logger.info(f"persisted {len(likes)} likes; {total} total in raw_likes") 340 365 341 366 if phi_observations or phi_interactions: 342 367 obs_total, ix_total = persist_phi(phi_observations, phi_interactions)
+30
packages/mps/src/mps/db.py
··· 4 4 5 5 import duckdb 6 6 7 + from mps.likes import LikeRecord 7 8 from mps.phi import PhiInteraction, PhiObservation 9 + 10 + 11 + def write_likes(items: list[LikeRecord], db_path: str) -> int: 12 + """Upsert like records into raw_likes. Returns total row count.""" 13 + con = duckdb.connect(db_path) 14 + con.execute(""" 15 + CREATE TABLE IF NOT EXISTS raw_likes ( 16 + at_uri VARCHAR PRIMARY KEY, 17 + subject_uri VARCHAR, 18 + created_at VARCHAR, 19 + fetched_at TIMESTAMP DEFAULT now() 20 + ) 21 + """) 22 + rows = [ 23 + ( 24 + item.at_uri, item.subject_uri, 25 + item.created_at, 26 + datetime.datetime.now(datetime.UTC), 27 + ) 28 + for item in items 29 + ] 30 + if rows: 31 + con.executemany( 32 + "INSERT OR REPLACE INTO raw_likes VALUES (?, ?, ?, ?)", 33 + rows, 34 + ) 35 + count = con.execute("SELECT count(*) FROM raw_likes").fetchone()[0] 36 + con.close() 37 + return count 8 38 9 39 10 40 def write_github_issues(items: list, db_path: str) -> int:
+52
packages/mps/src/mps/likes.py
··· 1 + """Fetch like records from nate's PDS.""" 2 + 3 + from dataclasses import dataclass 4 + 5 + import httpx 6 + 7 + PDS_BASE = "https://pds.zzstoatzz.io" 8 + DID = "did:plc:xbtmt2zjwlrfegqvch7fboei" 9 + XRPC = f"{PDS_BASE}/xrpc/com.atproto.repo.listRecords" 10 + 11 + 12 + @dataclass 13 + class LikeRecord: 14 + at_uri: str # like record URI (primary key) 15 + subject_uri: str # the post that was liked 16 + created_at: str # ISO timestamp from like record 17 + 18 + 19 + def fetch_likes(client: httpx.Client) -> list[LikeRecord]: 20 + """Fetch like records from nate's PDS. Cursor-based pagination.""" 21 + cursor: str | None = None 22 + items: list[LikeRecord] = [] 23 + 24 + while True: 25 + params: dict[str, str | int] = { 26 + "repo": DID, 27 + "collection": "app.bsky.feed.like", 28 + "limit": 100, 29 + } 30 + if cursor: 31 + params["cursor"] = cursor 32 + 33 + resp = client.get(XRPC, params=params) 34 + resp.raise_for_status() 35 + data = resp.json() 36 + 37 + for record in data.get("records", []): 38 + value = record.get("value", {}) 39 + subject = value.get("subject", {}) 40 + items.append( 41 + LikeRecord( 42 + at_uri=record["uri"], 43 + subject_uri=subject.get("uri", ""), 44 + created_at=value.get("createdAt", ""), 45 + ) 46 + ) 47 + 48 + cursor = data.get("cursor") 49 + if not cursor: 50 + break 51 + 52 + return items