My Prefect server setup (prefect-metrics.waow.tech)

refactor: gh-notifications uses results + cache policy, not artifacts

- ByRepoAndNumber cache policy: caches fetch_issue_or_pr by repo+number,
ignoring token arg. 24h TTL via cache_expiration.
- result_serializer="json" + persist_result=True: results are readable
JSON files on disk, queryable by DuckDB later
- removed all artifact creation — those were display-only, not indexable
- deploy/results-pvc.yaml: 5Gi PVC for persistent result storage on node
- scripts/patch_work_pool.py: patches kubernetes-pool base job template
to mount the PVC at /prefect-results and set PREFECT_LOCAL_STORAGE_PATH
- justfile: results-storage recipe wraps PVC creation + pool patch

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

+162 -63
+17
deploy/results-pvc.yaml
```yaml
---
# Persistent storage for Prefect task results.
# k3s local-path provisioner stores data at /var/lib/rancher/k3s/storage/.
# Flow job pods mount this at /prefect-results and write JSON results there.
# DuckDB can query the files directly: read_json('/prefect-results/**/*.json')
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: prefect-results
  namespace: prefect
spec:
  accessModes:
    - ReadWriteOnce
  storageClassName: local-path
  resources:
    requests:
      storage: 5Gi
```
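The DuckDB query hinted at in the comment above would look roughly like this. A minimal sketch, assuming each persisted file is a plain JSON document whose top-level keys are the task's return dict; depending on the Prefect version, results may be wrapped in an envelope, so inspect one file before trusting the column names:

```python
# Sketch: query persisted results straight off the PVC with DuckDB.
# Run on the node or in any pod that mounts /prefect-results.
# Column names assume the task's return dict is the top-level JSON object;
# if Prefect wraps results in an envelope, adjust the path expressions.
import duckdb

open_items = duckdb.sql(
    """
    SELECT repo, number, state, title
    FROM read_json('/prefect-results/**/*.json')
    WHERE state = 'open'
    ORDER BY repo, number
    """
).fetchall()

for repo, number, state, title in open_items:
    print(f"{repo}#{number} [{state}] {title}")
```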
+97 -63
flows/gh_notifications.py
··· 1 1 """ 2 - Fetch GitHub notifications and the issues/PRs behind them, 3 - storing results as Prefect table artifacts for later indexing. 2 + Fetch GitHub notifications and the issues/PRs behind them. 3 + Results are persisted as JSON to a shared PVC so DuckDB can query them later. 4 4 5 - Requires a Prefect Secret block named "github-token" containing 6 - a GitHub personal access token with `notifications` scope (read:org optional). 5 + Cache policy: each issue is cached by repo+number for 24h — we don't re-fetch 6 + what we already have. 7 7 8 - To create the secret: 9 - prefect python -c " 10 - from prefect.blocks.system import Secret 11 - Secret(value='ghp_...').save('github-token', overwrite=True) 12 - " 8 + Requires: 9 + - Secret block "github-token" (notifications scope) 10 + - PREFECT_LOCAL_STORAGE_PATH env var pointing at the mounted PVC 11 + (set in the work pool base job template or via deploy/results-pvc.yaml) 13 12 """ 13 + 14 + import datetime 15 + from dataclasses import dataclass 14 16 15 17 import httpx 16 - from datetime import datetime, timezone 17 18 from prefect import flow, task, get_run_logger 18 - from prefect.artifacts import create_table_artifact, create_markdown_artifact 19 19 from prefect.blocks.system import Secret 20 + from prefect.cache_policies import CachePolicy 21 + from prefect.context import TaskRunContext 20 22 21 23 22 24 GITHUB_API = "https://api.github.com" 23 25 24 26 25 - def _headers(token: str) -> dict: 26 - return { 27 - "Authorization": f"Bearer {token}", 28 - "Accept": "application/vnd.github+json", 29 - "X-GitHub-Api-Version": "2022-11-28", 30 - } 27 + # --- cache policy --- 31 28 29 + @dataclass 30 + class ByRepoAndNumber(CachePolicy): 31 + """Cache key is repo + number only — ignores token/client/other args.""" 32 + 33 + def compute_key( 34 + self, 35 + task_ctx: TaskRunContext, 36 + inputs: dict, 37 + flow_parameters: dict, 38 + **kwargs, 39 + ) -> str | None: 40 + repo = inputs.get("repo") 41 + number = inputs.get("number") 42 + if not repo or not number: 43 + return None 44 + return f"gh/{repo}/{number}" 45 + 46 + 47 + # --- tasks --- 32 48 33 49 @task 34 50 def load_token() -> str: ··· 38 54 @task 39 55 def fetch_notifications(token: str, only_unread: bool = True) -> list[dict]: 40 56 logger = get_run_logger() 41 - params = {"all": str(not only_unread).lower(), "per_page": 50} 42 - with httpx.Client(headers=_headers(token)) as client: 43 - resp = client.get(f"{GITHUB_API}/notifications", params=params) 57 + with httpx.Client(headers=_gh_headers(token)) as client: 58 + resp = client.get( 59 + f"{GITHUB_API}/notifications", 60 + params={"all": str(not only_unread).lower(), "per_page": 50}, 61 + ) 44 62 resp.raise_for_status() 45 - notifications = resp.json() 63 + notifications = resp.json() 46 64 logger.info(f"fetched {len(notifications)} notifications") 47 65 return notifications 48 66 49 67 50 - @task 51 - def fetch_issue_or_pr(token: str, notification: dict) -> dict | None: 52 - """Resolve the subject URL to get the actual issue/PR body.""" 53 - subject = notification.get("subject", {}) 54 - url = subject.get("url") 55 - if not url or subject.get("type") not in ("Issue", "PullRequest"): 56 - return None 57 - with httpx.Client(headers=_headers(token)) as client: 58 - resp = client.get(url) 68 + @task( 69 + cache_policy=ByRepoAndNumber(), 70 + cache_expiration=datetime.timedelta(hours=24), 71 + persist_result=True, 72 + result_serializer="json", 73 + ) 74 + def fetch_issue_or_pr(token: str, repo: str, number: int, subject_type: str) 
-> dict: 75 + """Fetch a single issue or PR. Cached by repo+number for 24h.""" 76 + with httpx.Client(headers=_gh_headers(token)) as client: 77 + kind = "pulls" if subject_type == "PullRequest" else "issues" 78 + resp = client.get(f"{GITHUB_API}/repos/{repo}/{kind}/{number}") 59 79 if resp.status_code == 404: 60 - return None 80 + return {} 61 81 resp.raise_for_status() 62 82 data = resp.json() 63 83 return { 64 - "repo": notification["repository"]["full_name"], 65 - "type": subject["type"], 66 - "number": data.get("number"), 84 + "repo": repo, 85 + "number": number, 86 + "type": subject_type, 67 87 "title": data.get("title"), 68 88 "state": data.get("state"), 69 - "body": (data.get("body") or "")[:500], # truncate for artifact display 89 + "body": data.get("body") or "", 70 90 "url": data.get("html_url"), 71 - "updated_at": notification.get("updated_at"), 72 - "reason": notification.get("reason"), 91 + "labels": [la["name"] for la in data.get("labels", [])], 92 + "created_at": data.get("created_at"), 93 + "updated_at": data.get("updated_at"), 94 + "user": (data.get("user") or {}).get("login"), 95 + "comments": data.get("comments", 0), 73 96 } 74 97 75 98 99 + # --- flow --- 100 + 76 101 @flow(name="gh-notifications", log_prints=True) 77 102 def gh_notifications(only_unread: bool = True) -> list[dict]: 78 - """Fetch GitHub notifications and store them as Prefect artifacts.""" 103 + """ 104 + Fetch GitHub notifications and persist each issue/PR as a cached JSON result. 105 + Cache hit = skips the fetch entirely. Results on disk = DuckDB-queryable. 106 + """ 79 107 logger = get_run_logger() 80 108 81 109 token = load_token() ··· 85 113 logger.info("no notifications") 86 114 return [] 87 115 88 - items = [fetch_issue_or_pr(token, n) for n in notifications] 89 - items = [i for i in items if i] 90 - logger.info(f"resolved {len(items)} issues/PRs") 91 - 92 - if not items: 93 - return [] 116 + items = [] 117 + for n in notifications: 118 + subject = n.get("subject", {}) 119 + subject_type = subject.get("type") 120 + if subject_type not in ("Issue", "PullRequest"): 121 + continue 94 122 95 - # table artifact — one row per notification 96 - now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC") 97 - create_table_artifact( 98 - key="github-notifications", 99 - table=items, 100 - description=f"## GitHub notifications\n\nFetched {len(items)} items at {now}", 101 - ) 123 + # parse number from subject URL e.g. 
.../issues/42 124 + url = subject.get("url", "") 125 + try: 126 + number = int(url.rstrip("/").split("/")[-1]) 127 + except (ValueError, IndexError): 128 + continue 102 129 103 - # markdown artifact — readable summary 104 - lines = [f"## GitHub notifications — {now}\n"] 105 - for item in items: 106 - icon = "🔀" if item["type"] == "PullRequest" else "🐛" 107 - lines.append( 108 - f"- {icon} **[{item['repo']}#{item['number']}]({item['url']})** " 109 - f"`{item['state']}` — {item['title']} \n" 110 - f" _reason: {item['reason']}_" 130 + repo = n["repository"]["full_name"] 131 + result = fetch_issue_or_pr( 132 + token=token, 133 + repo=repo, 134 + number=number, 135 + subject_type=subject_type, 111 136 ) 112 - create_markdown_artifact( 113 - key="github-notifications-summary", 114 - markdown="\n".join(lines), 115 - ) 137 + if result: 138 + items.append(result) 116 139 140 + logger.info(f"resolved {len(items)} issues/PRs (cache hits skip fetches)") 117 141 return items 142 + 143 + 144 + # --- helpers --- 145 + 146 + def _gh_headers(token: str) -> dict: 147 + return { 148 + "Authorization": f"Bearer {token}", 149 + "Accept": "application/vnd.github+json", 150 + "X-GitHub-Api-Version": "2022-11-28", 151 + } 118 152 119 153 120 154 if __name__ == "__main__":
+12
justfile
```diff
···
 worker:
     kubectl apply -f deploy/worker.yaml

+# create the results PVC and patch the kubernetes-pool base job template to mount it
+results-storage:
+    #!/usr/bin/env bash
+    set -euo pipefail
+    : "${DOMAIN:?set DOMAIN}"
+    : "${AUTH_STRING:?set AUTH_STRING}"
+    echo "==> creating results PVC"
+    kubectl apply -f deploy/results-pvc.yaml
+    echo "==> patching kubernetes-pool base job template"
+    PREFECT_API_URL="https://$DOMAIN/api" PREFECT_API_AUTH_STRING="$AUTH_STRING" \
+        uv run --with prefect python scripts/patch_work_pool.py
+
 # register flow deployments (run locally with PREFECT_API_URL + PREFECT_API_AUTH_STRING)
 register-flows:
     PREFECT_API_URL="https://$DOMAIN/api" PREFECT_API_AUTH_STRING="$AUTH_STRING" \
```
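The recipe only wires up infrastructure; the behavioral change comes from `PREFECT_LOCAL_STORAGE_PATH`. A local sketch of that same mechanism, with a throwaway directory standing in for the PVC mount (the flow, task, and paths here are hypothetical, not part of the repo):

```python
# Sketch: the mechanism the recipe configures, run locally.
# /tmp/prefect-results stands in for the /prefect-results PVC mount.
import os

os.environ["PREFECT_LOCAL_STORAGE_PATH"] = "/tmp/prefect-results"  # set before importing prefect

from prefect import flow, task

@task(persist_result=True, result_serializer="json")
def double(x: int) -> int:
    return x * 2

@flow
def demo() -> int:
    return double(21)

if __name__ == "__main__":
    demo()
    print(os.listdir("/tmp/prefect-results"))  # one JSON file per persisted result
```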
+36
scripts/patch_work_pool.py
··· 1 + """Patch the kubernetes-pool base job template to mount the results PVC.""" 2 + import asyncio 3 + from prefect.client.orchestration import get_client 4 + from prefect.client.schemas.actions import WorkPoolUpdate 5 + 6 + 7 + async def main(): 8 + async with get_client() as c: 9 + pool = await c.read_work_pool("kubernetes-pool") 10 + t = pool.base_job_template 11 + props = t.setdefault("variables", {}).setdefault("properties", {}) 12 + 13 + vols = props.setdefault("volumes", {}).setdefault("default", []) 14 + pvc_vol = { 15 + "name": "prefect-results", 16 + "persistentVolumeClaim": {"claimName": "prefect-results"}, 17 + } 18 + if pvc_vol not in vols: 19 + vols.append(pvc_vol) 20 + 21 + mounts = props.setdefault("volume_mounts", {}).setdefault("default", []) 22 + pvc_mount = {"name": "prefect-results", "mountPath": "/prefect-results"} 23 + if pvc_mount not in mounts: 24 + mounts.append(pvc_mount) 25 + 26 + envs = props.setdefault("env", {}).setdefault("default", {}) 27 + envs["PREFECT_LOCAL_STORAGE_PATH"] = "/prefect-results" 28 + 29 + await c.update_work_pool( 30 + "kubernetes-pool", 31 + WorkPoolUpdate(base_job_template=t), 32 + ) 33 + print("done — flow job pods will now mount /prefect-results from PVC") 34 + 35 + 36 + asyncio.run(main())