my prefect server setup prefect-metrics.waow.tech
python orchestration
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

merge gh-notifications + tangled-items into single ingest flow

both sources fetched concurrently in one flow, DuckDB writes sequential
in the same process — eliminates the staggered cron hack. enrich now
triggers on ingest completion instead of tangled-items.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

zzstoatzz 9bcca77b 83bfa1f5

+122 -153
+18 -19
README.md
··· 3 3 [hub](https://hub.waow.tech) · [grafana](https://prefect-metrics.waow.tech/d/executive-overview/executive-overview?orgId=1&from=now-6h&to=now&timezone=browser) 4 4 5 5 ``` 6 - github API ──► gh-notifications ──► raw_github_issues ──┐ 7 - (hourly) │ 8 - 9 - enrich (dbt) 10 - [on complete ✓] 11 - 12 - tangled PDS ──► tangled-items ───► raw_tangled_items ────┘ 13 - (hourly) │ 14 - 15 - hub_action_items 16 - (top 200) 17 - 18 - ┌──────────────┼──────────┐ 19 - ▼ ▼ ▼ 20 - curate /api/cards hub UI 21 - [on complete ✓] 22 - 23 - 24 - briefing.json 6 + github API ──┐ 7 + ├──► ingest ──► raw_github_issues ──┐ 8 + tangled PDS ─┘ (hourly) raw_tangled_items ──┤ 9 + 10 + enrich (dbt) 11 + [on ingest ✓] 12 + 13 + 14 + hub_action_items 15 + (top 200) 16 + 17 + ┌──────────────┼──────────┐ 18 + ▼ ▼ ▼ 19 + curate /api/cards hub UI 20 + [on enrich ✓] 21 + 22 + 23 + briefing.json 25 24 ``` 26 25 27 26 see [docs/hub.md](docs/hub.md) for the full pipeline breakdown.
+23 -26
docs/hub.md
··· 4 4 5 5 ## data sources 6 6 7 - two ingestion flows run hourly on cron, staggered because DuckDB only allows one writer process at a time. downstream flows (enrich, curate) are event-driven via deployment triggers — they only run when upstream completes: 7 + a single `ingest` flow runs hourly on cron and fetches both data sources concurrently, then writes to DuckDB sequentially (same process = no single-writer lock contention). downstream flows (enrich, curate) are event-driven via deployment triggers — they only run when upstream completes. 8 8 9 - **gh-notifications** (`flows/gh_notifications.py`) — fetches github notifications (issues + PRs) and open items authored by `zzstoatzz` via the search API. each issue is cached by repo+number for 24h. persists to `raw_github_issues`. 9 + **github** — fetches notifications (issues + PRs) and open items authored by `zzstoatzz` via the search API. each issue is cached by repo+number for 24h. persists to `raw_github_issues`. 10 10 11 - **tangled-items** (`flows/tangled_items.py`) — fetches issues, PRs, and comments from the tangled.org PDS (`pds.zzstoatzz.io`) via AT Protocol's `com.atproto.repo.listRecords`. no auth needed — records are public. targets repos: zat, zlay, plyr.fm, at-me, pollz, typeahead. persists to `raw_tangled_items`. 11 + **tangled.org** — fetches issues, PRs, and comments from the PDS (`pds.zzstoatzz.io`) via AT Protocol's `com.atproto.repo.listRecords`. no auth needed — records are public. targets repos: zat, zlay, plyr.fm, at-me, pollz, typeahead. persists to `raw_tangled_items`. 12 12 13 13 ## pipeline 14 14 15 15 ``` 16 - github API ──► gh-notifications ──► raw_github_issues ──┐ 17 - (hourly :00) │ 18 - 19 - ┌─── enrich (dbt) ───┐ 20 - │ [on tangled-items ✓] │ 21 - └────────────────────┘ 22 - 23 - tangled PDS ──► tangled-items ───► raw_tangled_items ────┘ 24 - (hourly :02) │ 25 - 26 - hub_action_items 27 - (mart, top 200) 28 - 29 - ┌──────────────┼──────────────┐ 30 - ▼ ▼ ▼ 31 - curate /api/cards.json +page.svelte 32 - [on enrich ✓] (SSR loader) 33 - 34 - 35 - briefing.json ──► /api/briefing.json 16 + github API ──┐ 17 + ├──► ingest ──► raw_github_issues ──┐ 18 + tangled PDS ─┘ (hourly) raw_tangled_items ──┤ 19 + 20 + enrich (dbt) 21 + [on ingest ✓] 22 + 23 + 24 + hub_action_items 25 + (mart, top 200) 26 + 27 + ┌──────────────┼──────────────┐ 28 + ▼ ▼ ▼ 29 + curate /api/cards.json +page.svelte 30 + [on enrich ✓] (SSR loader) 31 + 32 + 33 + briefing.json ──► /api/briefing.json 36 34 ``` 37 35 38 36 ## flows ··· 40 38 | deployment | trigger | what it does | 41 39 |---|---|---| 42 40 | `diagnostics` | cron `*/5 * * * *` | prints system info — canary for worker health | 43 - | `gh-notifications` | cron `0 * * * *` | github notifications + authored open issues/PRs → `raw_github_issues` | 44 - | `tangled-items` | cron `2 * * * *` | tangled.org issues/PRs/comments → `raw_tangled_items` | 45 - | `enrich` | on `tangled-items` completion | dbt build: staging → enrichment → mart. concurrency limit 1. runs under python 3.13 (dbt-core compat) | 41 + | `ingest` | cron `0 * * * *` | fetches github notifications + authored items and tangled.org items concurrently, persists both to DuckDB sequentially | 42 + | `enrich` | on `ingest` completion | dbt build: staging → enrichment → mart. concurrency limit 1. runs under python 3.13 (dbt-core compat) | 46 43 | `curate` | on `enrich` completion | loads top 200 scored items, sends to claude haiku 4.5 via pydantic-ai, writes `briefing.json`. cached by items content hash (skips LLM when data unchanged) | 47 44 | `cleanup` | cron `0 2 * * 0` | deletes old terminal flow runs (completed, failed, cancelled, crashed) older than 30 days | 48 45
+78 -31
flows/gh_notifications.py flows/ingest.py
··· 1 1 """ 2 - Fetch GitHub notifications and the issues/PRs behind them. 3 - Results are persisted as JSON to a shared PVC so DuckDB can query them later. 2 + Fetch GitHub notifications and tangled.org items, persist both to DuckDB. 4 3 5 - Cache policy: each issue is cached by repo+number for 24h — we don't re-fetch 6 - what we already have. 4 + Combines the two data sources into one flow so DuckDB's single-writer lock 5 + is never contested — both persists happen sequentially in the same process. 6 + 7 + Cache policy: each GitHub issue is cached by repo+number for 24h. 7 8 8 9 Requires: 9 10 - Secret block "github-token" (notifications scope) 10 11 - PREFECT_LOCAL_STORAGE_PATH env var pointing at the mounted PVC 11 - (set in the work pool base job template or via deploy/results-pvc.yaml) 12 12 """ 13 13 14 14 import datetime ··· 16 16 from dataclasses import dataclass 17 17 18 18 import httpx 19 - from prefect import flow, task, get_run_logger, unmapped 19 + from prefect import flow, get_run_logger, task, unmapped 20 20 from prefect.blocks.system import Secret 21 21 from prefect.cache_policies import CachePolicy 22 22 from prefect.context import TaskRunContext 23 23 24 - from mps.db import write_github_issues 24 + from mps.db import write_github_issues, write_tangled_items 25 25 from mps.github import IssueOrPR, IssueRef, gh_headers 26 + from mps.tangled import PDS_BASE, TangledItem, fetch_items, fetch_repo_at_uris 26 27 27 28 GITHUB_API = "https://api.github.com" 28 29 30 + TANGLED_COLLECTIONS = [ 31 + "sh.tangled.repo.issue", 32 + "sh.tangled.repo.pull", 33 + "sh.tangled.repo.issue.comment", 34 + "sh.tangled.repo.pull.comment", 35 + ] 36 + 29 37 # bump to invalidate all cached results (e.g. when fetch shape changes) 30 38 _CACHE_VERSION = "v2" 31 39 ··· 45 53 if ref is None: 46 54 return None 47 55 return f"gh/{_CACHE_VERSION}/{ref.repo}/{ref.number}" 56 + 57 + 58 + # --- github tasks --- 48 59 49 60 50 61 @task ··· 93 104 def fetch_issue_or_pr(ref: IssueRef, token: str) -> IssueOrPR | None: 94 105 """Fetch a single issue or PR. Cached by repo+number for 24h.""" 95 106 with httpx.Client(headers=gh_headers(token)) as client: 96 - # always use the issues endpoint — PRs are issues in GitHub's model and 97 - # only the issues endpoint returns the reactions field 98 107 resp = client.get(f"{GITHUB_API}/repos/{ref.repo}/issues/{ref.number}") 99 108 if resp.status_code == 404: 100 109 return None ··· 133 142 for item in resp.json().get("items", []): 134 143 html_url = item.get("html_url", "") 135 144 is_pr = "/pull/" in html_url 136 - # extract repo from html_url: https://github.com/{owner}/{repo}/issues/{n} 137 145 parts = html_url.split("/") 138 146 try: 139 147 repo = f"{parts[3]}/{parts[4]}" ··· 150 158 return refs 151 159 152 160 161 + # --- tangled tasks --- 162 + 163 + 153 164 @task 154 - def persist_to_duckdb(items: list[IssueOrPR]) -> int: 155 - db_path = os.environ.get( 165 + def fetch_all_tangled_items() -> list[TangledItem]: 166 + """Fetch issues, PRs, and comments from the tangled.org PDS.""" 167 + logger = get_run_logger() 168 + with httpx.Client(base_url=PDS_BASE, timeout=30) as client: 169 + repo_uris = fetch_repo_at_uris(client) 170 + logger.info(f"found {len(repo_uris)} target repos on PDS") 171 + 172 + items: list[TangledItem] = [] 173 + for collection in TANGLED_COLLECTIONS: 174 + batch = fetch_items(client, collection, repo_uris) 175 + logger.info(f"{collection}: {len(batch)} records") 176 + items.extend(batch) 177 + 178 + return items 179 + 180 + 181 + # --- persist tasks --- 182 + 183 + 184 + def _db_path() -> str: 185 + return os.environ.get( 156 186 "ANALYTICS_DB_PATH", 157 187 os.environ.get("PREFECT_LOCAL_STORAGE_PATH", "/tmp") + "/analytics.duckdb", 158 188 ) 159 - return write_github_issues(items, db_path) 189 + 160 190 191 + @task 192 + def persist_github(items: list[IssueOrPR]) -> int: 193 + return write_github_issues(items, _db_path()) 161 194 162 - def _gh_run_name(): 163 - import prefect.runtime 164 195 165 - unread = prefect.runtime.flow_run.parameters.get("only_unread", True) 166 - return "unread" if unread else "all" 196 + @task 197 + def persist_tangled(items: list[TangledItem]) -> int: 198 + return write_tangled_items(items, _db_path()) 199 + 200 + 201 + # --- flow --- 167 202 168 203 169 - @flow(name="gh-notifications", flow_run_name=_gh_run_name, log_prints=True) 170 - def gh_notifications(only_unread: bool = True) -> list[IssueOrPR]: 204 + @flow(name="ingest", log_prints=True) 205 + def ingest(only_unread: bool = True): 171 206 """ 172 - Fetch GitHub notifications and persist each issue/PR as a cached JSON result. 173 - Cache hit = skips the fetch entirely. Results on disk = DuckDB-queryable. 207 + Fetch GitHub and tangled.org data concurrently, then persist sequentially. 174 208 """ 175 209 logger = get_run_logger() 176 210 177 211 token = load_token() 212 + 213 + # kick off tangled fetch immediately (no deps) 214 + tangled_future = fetch_all_tangled_items.submit() 215 + 216 + # github fetches need the token 178 217 notif_refs = fetch_notifications(token, only_unread=only_unread) 179 218 authored_refs = fetch_authored_items(token) 180 219 ··· 187 226 seen.add(key) 188 227 refs.append(ref) 189 228 190 - if not refs: 191 - logger.info("no items") 192 - return [] 229 + # fetch full issue/PR details (cached) 230 + gh_items: list[IssueOrPR] = [] 231 + if refs: 232 + futures = fetch_issue_or_pr.map(refs, unmapped(token)) 233 + gh_items = [r for r in futures.result() if r is not None] 234 + logger.info(f"resolved {len(gh_items)} github issues/PRs") 193 235 194 - futures = fetch_issue_or_pr.map(refs, unmapped(token)) 195 - items = [r for r in futures.result() if r is not None] 196 - logger.info(f"resolved {len(items)} issues/PRs") 236 + # wait for tangled fetch 237 + tangled_items = tangled_future.result() 238 + logger.info(f"fetched {len(tangled_items)} tangled items") 239 + 240 + # sequential writes — same process, no DuckDB lock contention 241 + if gh_items: 242 + total = persist_github(gh_items) 243 + logger.info(f"upserted {len(gh_items)} github rows; {total} total in raw_github_issues") 197 244 198 - total = persist_to_duckdb(items) 199 - logger.info(f"upserted {len(items)} rows; {total} total in raw_github_issues") 200 - return items 245 + if tangled_items: 246 + total = persist_tangled(tangled_items) 247 + logger.info(f"persisted {len(tangled_items)} tangled rows; {total} total in raw_tangled_items") 201 248 202 249 203 250 if __name__ == "__main__": 204 - gh_notifications() 251 + ingest()
-68
flows/tangled_items.py
··· 1 - """ 2 - Fetch issues, PRs, and comments from the tangled.org PDS and persist to DuckDB. 3 - 4 - No auth needed — PDS records are public. Low volume, full resync each run. 5 - """ 6 - 7 - import os 8 - 9 - import httpx 10 - from prefect import flow, get_run_logger, task 11 - 12 - from mps.db import write_tangled_items 13 - from mps.tangled import ( 14 - PDS_BASE, 15 - TangledItem, 16 - fetch_items, 17 - fetch_repo_at_uris, 18 - ) 19 - 20 - COLLECTIONS = [ 21 - "sh.tangled.repo.issue", 22 - "sh.tangled.repo.pull", 23 - "sh.tangled.repo.issue.comment", 24 - "sh.tangled.repo.pull.comment", 25 - ] 26 - 27 - 28 - @task 29 - def fetch_all_items() -> list[TangledItem]: 30 - """Fetch issues, PRs, and comments from the PDS.""" 31 - logger = get_run_logger() 32 - with httpx.Client(base_url=PDS_BASE, timeout=30) as client: 33 - repo_uris = fetch_repo_at_uris(client) 34 - logger.info(f"found {len(repo_uris)} target repos on PDS") 35 - 36 - items: list[TangledItem] = [] 37 - for collection in COLLECTIONS: 38 - batch = fetch_items(client, collection, repo_uris) 39 - logger.info(f"{collection}: {len(batch)} records") 40 - items.extend(batch) 41 - 42 - return items 43 - 44 - 45 - @task 46 - def persist_to_duckdb(items: list[TangledItem]) -> int: 47 - db_path = os.environ.get( 48 - "ANALYTICS_DB_PATH", 49 - os.environ.get("PREFECT_LOCAL_STORAGE_PATH", "/tmp") + "/analytics.duckdb", 50 - ) 51 - return write_tangled_items(items, db_path) 52 - 53 - 54 - @flow(name="tangled-items", log_prints=True) 55 - def tangled_items(): 56 - logger = get_run_logger() 57 - 58 - items = fetch_all_items() 59 - if not items: 60 - logger.info("no tangled items found") 61 - return 62 - 63 - total = persist_to_duckdb(items) 64 - logger.info(f"persisted {len(items)} items; {total} total in raw_tangled_items") 65 - 66 - 67 - if __name__ == "__main__": 68 - tangled_items()
+3 -9
prefect.yaml
··· 24 24 schedules: 25 25 - cron: "*/5 * * * *" 26 26 27 - - name: gh-notifications 28 - entrypoint: flows/gh_notifications.py:gh_notifications 27 + - name: ingest 28 + entrypoint: flows/ingest.py:ingest 29 29 work_pool: *k8s 30 30 schedules: 31 31 - cron: "0 * * * *" # hourly 32 32 parameters: 33 33 only_unread: true 34 - 35 - - name: tangled-items 36 - entrypoint: flows/tangled_items.py:tangled_items 37 - work_pool: *k8s 38 - schedules: 39 - - cron: "2 * * * *" # after gh-notifications; DuckDB only allows one writer process 40 34 41 35 - name: enrich 42 36 entrypoint: flows/enrich.py:enrich ··· 53 47 expect: 54 48 - "prefect.flow-run.Completed" 55 49 match_related: 56 - prefect.resource.name: "tangled-items" 50 + prefect.resource.name: "ingest" 57 51 prefect.resource.role: "deployment" 58 52 59 53 - name: cleanup