my prefect server setup prefect-metrics.waow.tech
python orchestration
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

rename enrich → transform, curate → brief

ingest → transform → brief reads as a clear pipeline.
the old names sounded like synonyms.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

zzstoatzz 57cc2f87 9bcca77b

+21 -21
+3 -3
README.md
··· 7 7 ├──► ingest ──► raw_github_issues ──┐ 8 8 tangled PDS ─┘ (hourly) raw_tangled_items ──┤ 9 9 10 - enrich (dbt) 10 + transform (dbt) 11 11 [on ingest ✓] 12 12 13 13 ··· 16 16 17 17 ┌──────────────┼──────────┐ 18 18 ▼ ▼ ▼ 19 - curate /api/cards hub UI 20 - [on enrich ✓] 19 + brief /api/cards hub UI 20 + [on transform ✓] 21 21 22 22 23 23 briefing.json
+7 -7
docs/hub.md
··· 4 4 5 5 ## data sources 6 6 7 - a single `ingest` flow runs hourly on cron and fetches both data sources concurrently, then writes to DuckDB sequentially (same process = no single-writer lock contention). downstream flows (enrich, curate) are event-driven via deployment triggers — they only run when upstream completes. 7 + a single `ingest` flow runs hourly on cron and fetches both data sources concurrently, then writes to DuckDB sequentially (same process = no single-writer lock contention). downstream flows (transform, brief) are event-driven via deployment triggers — they only run when upstream completes. 8 8 9 9 **github** — fetches notifications (issues + PRs) and open items authored by `zzstoatzz` via the search API. each issue is cached by repo+number for 24h. persists to `raw_github_issues`. 10 10 ··· 17 17 ├──► ingest ──► raw_github_issues ──┐ 18 18 tangled PDS ─┘ (hourly) raw_tangled_items ──┤ 19 19 20 - enrich (dbt) 20 + transform (dbt) 21 21 [on ingest ✓] 22 22 23 23 ··· 26 26 27 27 ┌──────────────┼──────────────┐ 28 28 ▼ ▼ ▼ 29 - curate /api/cards.json +page.svelte 30 - [on enrich ✓] (SSR loader) 29 + brief /api/cards.json +page.svelte 30 + [on transform ✓] (SSR loader) 31 31 32 32 33 33 briefing.json ──► /api/briefing.json ··· 39 39 |---|---|---| 40 40 | `diagnostics` | cron `*/5 * * * *` | prints system info — canary for worker health | 41 41 | `ingest` | cron `0 * * * *` | fetches github notifications + authored items and tangled.org items concurrently, persists both to DuckDB sequentially | 42 - | `enrich` | on `ingest` completion | dbt build: staging → enrichment → mart. concurrency limit 1. runs under python 3.13 (dbt-core compat) | 43 - | `curate` | on `enrich` completion | loads top 200 scored items, sends to claude haiku 4.5 via pydantic-ai, writes `briefing.json`. cached by items content hash (skips LLM when data unchanged) | 42 + | `transform` | on `ingest` completion | dbt build: staging → scoring → mart. concurrency limit 1. runs under python 3.13 (dbt-core compat) | 43 + | `brief` | on `transform` completion | loads top 200 scored items, sends to claude haiku 4.5 via pydantic-ai, writes `briefing.json`. cached by items content hash (skips LLM when data unchanged) | 44 44 | `cleanup` | cron `0 2 * * 0` | deletes old terminal flow runs (completed, failed, cancelled, crashed) older than 30 days | 45 45 46 46 all flows run in the `kubernetes-pool` work pool. code is pulled at runtime via `git clone` from tangled.sh (github fallback). deps install via `uv run --with 'my-prefect-server @ git+...'`. deployments are registered by CI on every push to main. ··· 61 61 62 62 ## curation 63 63 64 - the `curate` flow fires automatically when `enrich` completes (via deployment trigger). it: 64 + the `brief` flow fires automatically when `transform` completes (via deployment trigger). it: 65 65 66 66 1. snapshots DuckDB to `/tmp` (bypass exclusive flock) 67 67 2. loads top 200 items from `hub_action_items`
+3 -3
flows/curate.py flows/brief.py
··· 109 109 Path(path).write_text(briefing.model_dump_json(indent=2)) 110 110 111 111 112 - @flow(name="curate", log_prints=True) 113 - async def curate(): 112 + @flow(name="brief", log_prints=True) 113 + async def brief(): 114 114 logger = get_run_logger() 115 115 db_path = os.environ.get( 116 116 "ANALYTICS_DB_PATH", ··· 136 136 if __name__ == "__main__": 137 137 import asyncio 138 138 139 - asyncio.run(curate()) 139 + asyncio.run(brief())
+3 -3
flows/enrich.py flows/transform.py
··· 6 6 ANALYTICS_DIR = Path(__file__).parent.parent / "analytics" 7 7 8 8 9 - @flow(name="enrich", log_prints=True) 10 - def enrich(): 9 + @flow(name="transform", log_prints=True) 10 + def transform(): 11 11 # lazy imports: dbt-common -> mashumaro has Python 3.14 compat issues at 12 12 # module load time; importing inside the function defers until flow runs 13 13 from datetime import timedelta ··· 49 49 50 50 51 51 if __name__ == "__main__": 52 - enrich() 52 + transform()
+5 -5
prefect.yaml
··· 32 32 parameters: 33 33 only_unread: true 34 34 35 - - name: enrich 36 - entrypoint: flows/enrich.py:enrich 35 + - name: transform 36 + entrypoint: flows/transform.py:transform 37 37 work_pool: 38 38 name: kubernetes-pool 39 39 job_variables: ··· 63 63 rate_limit_delay: 0.5 64 64 dry_run: false 65 65 66 - - name: curate 67 - entrypoint: flows/curate.py:curate 66 + - name: brief 67 + entrypoint: flows/brief.py:brief 68 68 work_pool: *k8s 69 69 triggers: 70 70 - type: event 71 71 expect: 72 72 - "prefect.flow-run.Completed" 73 73 match_related: 74 - prefect.resource.name: "enrich" 74 + prefect.resource.name: "transform" 75 75 prefect.resource.role: "deployment"