my prefect server setup prefect-metrics.waow.tech
python orchestration

chain enrich and curate via deployment triggers

replace cron schedules on enrich and curate with event-driven
triggers. enrich fires when tangled-items completes (both data
sources are in DuckDB by then). curate fires when enrich completes.
no pods spin up unless upstream actually finished.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

zzstoatzz 3924f982 7390814a

+61 -23
+6 -4
README.md
···
 github API ──► gh-notifications ──► raw_github_issues ──┐
-(hourly :00) │
+(hourly) │

-enrich (dbt, :05)
+enrich (dbt)
+[on complete ✓]

 tangled PDS ──► tangled-items ───► raw_tangled_items ────┘
-(hourly :02) │
+(hourly) │

 hub_action_items
 (top 200)

 ┌──────────────┼──────────┐
 ▼ ▼ ▼
-curate (:10) /api/cards hub UI
+curate /api/cards hub UI
+[on complete ✓]


 briefing.json
+14 -13
docs/hub.md
···

 ## data sources

-two ingestion flows run hourly, staggered because DuckDB only allows one writer process at a time:
+two ingestion flows run hourly on cron, staggered because DuckDB only allows one writer process at a time. downstream flows (enrich, curate) are event-driven via deployment triggers — they only run when upstream completes:

 **gh-notifications** (`flows/gh_notifications.py`) — fetches github notifications (issues + PRs) and open items authored by `zzstoatzz` via the search API. each issue is cached by repo+number for 24h. persists to `raw_github_issues`.

···
 (hourly :00) │

 ┌─── enrich (dbt) ───┐
-│ (hourly :05) │
+│ [on tangled-items ✓] │
 └────────────────────┘

 tangled PDS ──► tangled-items ───► raw_tangled_items ────┘
···
 ┌──────────────┼──────────────┐
 ▼ ▼ ▼
 curate /api/cards.json +page.svelte
-(hourly :10) (SSR loader)
+[on enrich ✓] (SSR loader)


 briefing.json ──► /api/briefing.json
···

 ## flows

-| deployment | schedule | what it does |
+| deployment | trigger | what it does |
 |---|---|---|
-| `diagnostics` | `*/5 * * * *` | prints system info — canary for worker health |
-| `gh-notifications` | `0 * * * *` | github notifications + authored open issues/PRs → `raw_github_issues` |
-| `tangled-items` | `2 * * * *` | tangled.org issues/PRs/comments → `raw_tangled_items` |
-| `enrich` | `5 * * * *` | dbt build: staging → enrichment → mart. concurrency limit 1. runs under python 3.13 (dbt-core compat) |
-| `curate` | `10 * * * *` | loads top 200 scored items, sends to claude haiku 4.5 via pydantic-ai, writes `briefing.json` |
-| `cleanup` | `0 2 * * 0` | deletes old terminal flow runs (completed, failed, cancelled, crashed) older than 30 days |
+| `diagnostics` | cron `*/5 * * * *` | prints system info — canary for worker health |
+| `gh-notifications` | cron `0 * * * *` | github notifications + authored open issues/PRs → `raw_github_issues` |
+| `tangled-items` | cron `2 * * * *` | tangled.org issues/PRs/comments → `raw_tangled_items` |
+| `enrich` | on `tangled-items` completion | dbt build: staging → enrichment → mart. concurrency limit 1. runs under python 3.13 (dbt-core compat) |
+| `curate` | on `enrich` completion | loads top 200 scored items, sends to claude haiku 4.5 via pydantic-ai, writes `briefing.json`. cached by items content hash (skips LLM when data unchanged) |
+| `cleanup` | cron `0 2 * * 0` | deletes old terminal flow runs (completed, failed, cancelled, crashed) older than 30 days |

 all flows run in the `kubernetes-pool` work pool. code is pulled at runtime via `git clone` from tangled.sh (github fallback). deps install via `uv run --with 'my-prefect-server @ git+...'`. deployments are registered by CI on every push to main.

···

 ## curation

-the `curate` flow runs at :10 each hour after `enrich` refreshes the mart. it:
+the `curate` flow fires automatically when `enrich` completes (via deployment trigger). it:

 1. snapshots DuckDB to `/tmp` (bypass exclusive flock)
 2. loads top 200 items from `hub_action_items`
-3. sends them to claude haiku 4.5 with a system prompt that groups by actionability
-4. writes a structured `Briefing` (headline, 2-5 themed sections with accent colors, icons, priority) to `briefing.json`
+3. checks cache — the `generate_briefing` task uses a `ByItemsContent` cache policy that hashes the items text + system prompt. if the data hasn't changed since the last run, the cached briefing is returned without calling the LLM (4h expiration)
+4. on cache miss, sends items to claude haiku 4.5 with a system prompt that groups by actionability
+5. writes a structured `Briefing` (headline, 4 themed sections with accent colors, icons, priority) to `briefing.json`

 briefing model is defined in `packages/mps/src/mps/briefing.py`.

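The cache check described in step 3 of the curation list can be sketched with the standard library alone. This is an illustration under assumptions, not the repository's code: the real `ByItemsContent` policy is not shown in this diff, and `items_cache_key`, `BriefingCache`, `generate_briefing` (as written here), and the `call_llm` callable are hypothetical names. The sketch hashes the items text together with the system prompt and treats entries older than 4 hours as expired.

```python
import hashlib
import time
from typing import Callable, Optional

FOUR_HOURS = 4 * 60 * 60  # the 4h expiration from the docs, in seconds


def items_cache_key(items_text: str, system_prompt: str) -> str:
    """Hash both inputs so a change to either one produces a new key."""
    digest = hashlib.sha256()
    for part in (items_text, system_prompt):
        data = part.encode("utf-8")
        # A length prefix keeps ("ab", "c") distinct from ("a", "bc").
        digest.update(len(data).to_bytes(8, "big"))
        digest.update(data)
    return digest.hexdigest()


class BriefingCache:
    """Hypothetical in-memory stand-in for a result store with expiration."""

    def __init__(self, ttl_seconds: float = FOUR_HOURS) -> None:
        self.ttl_seconds = ttl_seconds
        self._entries = {}  # key -> (stored_at, briefing)

    def get(self, key: str, now: float) -> Optional[str]:
        entry = self._entries.get(key)
        if entry is None:
            return None
        stored_at, briefing = entry
        if now - stored_at > self.ttl_seconds:
            del self._entries[key]  # expired: treat as a miss
            return None
        return briefing

    def put(self, key: str, briefing: str, now: float) -> None:
        self._entries[key] = (now, briefing)


def generate_briefing(
    items_text: str,
    system_prompt: str,
    cache: BriefingCache,
    call_llm: Callable[[str, str], str],
    now: Optional[float] = None,
) -> str:
    """Return the cached briefing when inputs are unchanged, else call the LLM."""
    now = time.time() if now is None else now
    key = items_cache_key(items_text, system_prompt)
    cached = cache.get(key, now)
    if cached is not None:
        return cached  # cache hit: the LLM is never called
    briefing = call_llm(items_text, system_prompt)
    cache.put(key, briefing, now)
    return briefing
```

Including the system prompt in the key means a prompt edit invalidates old briefings even when the scored items are identical, which matches the "items text + system prompt" wording above.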
+27 -2
notes/journey.md
···
 - labels are tracked via `sh.tangled.label.op` records (add/delete operations on AT URI subjects)
 - activity volume is very low currently — full resync each run is fine

+## step 12: event-driven pipeline with deployment triggers
+
+replaced the staggered cron schedules on `enrich` and `curate` with deployment triggers (prefect automations). the data flows (`gh-notifications` at :00, `tangled-items` at :02) stay on cron. downstream flows fire reactively:
+
+- `enrich` triggers on `tangled-items` completion — by that point both data sources have written to DuckDB
+- `curate` triggers on `enrich` completion — the mart is fresh
+
+this eliminates wasted pods: no enrich/curate run unless upstream actually finished. previously, cron offsets were a guess (`:05`, `:10`) — if enrich took longer than 5 minutes, curate would run on stale data.
+
+the `curate` flow also gained a `ByItemsContent` cache policy that hashes the items text + system prompt. if the scored items haven't changed, the LLM call is skipped entirely (4h cache expiration). combined with event-driven triggering, curate only spins up a pod when there's new data, and only calls haiku when the data actually changed.
+
+defined in `prefect.yaml` as deployment triggers:
+```yaml
+triggers:
+  - type: event
+    expect:
+      - "prefect.flow-run.Completed"
+    match_related:
+      prefect.resource.name: "tangled-items"
+      prefect.resource.role: "deployment"
+```
+
+these are syntactic sugar — `prefect deploy` converts each trigger into a full `Automation` with a `RunDeployment` action targeting the owning deployment.
+
+**gotcha**: `prefect deploy` creates the automation but does NOT remove old cron schedules. must manually pause them via `prefect deployment schedule pause`.
+
 ## what's next

-- CI for flow registration on tangled (.tangled CI, not github actions)
-- more interesting deployments with automations
 - file upstream issue on prefecthq/prefect-helm for docket URL
 - contribute guide back to prefect docs
 - upstream: `job_variables` placement should be validated or warned about (silent ignore is surprising)
 - upstream: consider a prefect recipe/docs page for the `uv run --with` pattern on kubernetes work pools
+- upstream: `prefect deploy` should remove schedules when switching to triggers-only
+14 -4
prefect.yaml
···
           --with 'my-prefect-server @ git+https://github.com/zzstoatzz/my-prefect-server.git'
           prefect flow-run execute
     concurrency_limit: 1
-    schedules:
-      - cron: "5 * * * *" # 5 minutes after gh-notifications
+    triggers:
+      - type: event
+        expect:
+          - "prefect.flow-run.Completed"
+        match_related:
+          prefect.resource.name: "tangled-items"
+          prefect.resource.role: "deployment"

   - name: cleanup
     entrypoint: flows/cleanup.py:cleanup
···
   - name: curate
     entrypoint: flows/curate.py:curate
     work_pool: *k8s
-    schedules:
-      - cron: "10 * * * *" # after enrich refreshes the mart
+    triggers:
+      - type: event
+        expect:
+          - "prefect.flow-run.Completed"
+        match_related:
+          prefect.resource.name: "enrich"
+          prefect.resource.role: "deployment"
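The matching rule the two `triggers:` blocks rely on can be modeled in a few lines of plain Python. This is a simplified sketch for illustration, not Prefect's implementation: `Trigger`, `matches`, `fired_by`, and the event dictionaries are hypothetical, and real Prefect events carry more fields than shown. The assumption is that a trigger fires when the event name is listed in `expect` and at least one related resource carries every label in `match_related`.

```python
from dataclasses import dataclass, field

COMPLETED = "prefect.flow-run.Completed"
FAILED = "prefect.flow-run.Failed"


@dataclass
class Trigger:
    """Hypothetical stand-in for one `triggers:` entry in prefect.yaml."""

    deployment: str  # the owning deployment, which runs when the trigger fires
    expect: set
    match_related: dict = field(default_factory=dict)


def matches(trigger: Trigger, event: dict) -> bool:
    """True when the event is expected and one related resource has all labels."""
    if event["event"] not in trigger.expect:
        return False
    return any(
        all(related.get(k) == v for k, v in trigger.match_related.items())
        for related in event.get("related", [])
    )


def deployment_labels(name: str) -> dict:
    return {"prefect.resource.name": name, "prefect.resource.role": "deployment"}


# The two triggers from this commit: enrich follows tangled-items,
# curate follows enrich.
TRIGGERS = [
    Trigger("enrich", {COMPLETED}, deployment_labels("tangled-items")),
    Trigger("curate", {COMPLETED}, deployment_labels("enrich")),
]


def flow_run_event(name: str, state_event: str) -> dict:
    """Build a minimal event for a flow run that belongs to deployment `name`."""
    return {"event": state_event, "related": [deployment_labels(name)]}


def fired_by(event: dict) -> list:
    """Names of the deployments that would be run in response to `event`."""
    return [t.deployment for t in TRIGGERS if matches(t, event)]


print(fired_by(flow_run_event("tangled-items", COMPLETED)))
print(fired_by(flow_run_event("enrich", COMPLETED)))
print(fired_by(flow_run_event("tangled-items", FAILED)))
```

Under this model a failed `tangled-items` run fires nothing, which is the "no pods spin up unless upstream actually finished" behavior the commit message describes, and a `gh-notifications` completion is ignored because neither trigger names it.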