my prefect server setup prefect-metrics.waow.tech
python orchestration
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: add gh-notifications flow

fetches github notifications, resolves issues/PRs, stores results as
table + markdown artifacts in prefect (postgres). runs hourly.

requires Secret block "github-token" with notifications scope.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

zzstoatzz ef837083 c02dfdc5

+130
+121
flows/gh_notifications.py
··· 1 + """ 2 + Fetch GitHub notifications and the issues/PRs behind them, 3 + storing results as Prefect table artifacts for later indexing. 4 + 5 + Requires a Prefect Secret block named "github-token" containing 6 + a GitHub personal access token with `notifications` scope (read:org optional). 7 + 8 + To create the secret: 9 + prefect python -c " 10 + from prefect.blocks.system import Secret 11 + Secret(value='ghp_...').save('github-token', overwrite=True) 12 + " 13 + """ 14 + 15 + import httpx 16 + from datetime import datetime, timezone 17 + from prefect import flow, task, get_run_logger 18 + from prefect.artifacts import create_table_artifact, create_markdown_artifact 19 + from prefect.blocks.system import Secret 20 + 21 + 22 + GITHUB_API = "https://api.github.com" 23 + 24 + 25 + def _headers(token: str) -> dict: 26 + return { 27 + "Authorization": f"Bearer {token}", 28 + "Accept": "application/vnd.github+json", 29 + "X-GitHub-Api-Version": "2022-11-28", 30 + } 31 + 32 + 33 + @task 34 + def load_token() -> str: 35 + return Secret.load("github-token").get() 36 + 37 + 38 + @task 39 + def fetch_notifications(token: str, only_unread: bool = True) -> list[dict]: 40 + logger = get_run_logger() 41 + params = {"all": str(not only_unread).lower(), "per_page": 50} 42 + with httpx.Client(headers=_headers(token)) as client: 43 + resp = client.get(f"{GITHUB_API}/notifications", params=params) 44 + resp.raise_for_status() 45 + notifications = resp.json() 46 + logger.info(f"fetched {len(notifications)} notifications") 47 + return notifications 48 + 49 + 50 + @task 51 + def fetch_issue_or_pr(token: str, notification: dict) -> dict | None: 52 + """Resolve the subject URL to get the actual issue/PR body.""" 53 + subject = notification.get("subject", {}) 54 + url = subject.get("url") 55 + if not url or subject.get("type") not in ("Issue", "PullRequest"): 56 + return None 57 + with httpx.Client(headers=_headers(token)) as client: 58 + resp = client.get(url) 59 + if resp.status_code == 404: 60 + return None 61 + resp.raise_for_status() 62 + data = resp.json() 63 + return { 64 + "repo": notification["repository"]["full_name"], 65 + "type": subject["type"], 66 + "number": data.get("number"), 67 + "title": data.get("title"), 68 + "state": data.get("state"), 69 + "body": (data.get("body") or "")[:500], # truncate for artifact display 70 + "url": data.get("html_url"), 71 + "updated_at": notification.get("updated_at"), 72 + "reason": notification.get("reason"), 73 + } 74 + 75 + 76 + @flow(name="gh-notifications", log_prints=True) 77 + def gh_notifications(only_unread: bool = True) -> list[dict]: 78 + """Fetch GitHub notifications and store them as Prefect artifacts.""" 79 + logger = get_run_logger() 80 + 81 + token = load_token() 82 + notifications = fetch_notifications(token, only_unread=only_unread) 83 + 84 + if not notifications: 85 + logger.info("no notifications") 86 + return [] 87 + 88 + items = [fetch_issue_or_pr(token, n) for n in notifications] 89 + items = [i for i in items if i] 90 + logger.info(f"resolved {len(items)} issues/PRs") 91 + 92 + if not items: 93 + return [] 94 + 95 + # table artifact — one row per notification 96 + now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC") 97 + create_table_artifact( 98 + key="github-notifications", 99 + table=items, 100 + description=f"## GitHub notifications\n\nFetched {len(items)} items at {now}", 101 + ) 102 + 103 + # markdown artifact — readable summary 104 + lines = [f"## GitHub notifications — {now}\n"] 105 + for item in items: 106 + icon = "🔀" if item["type"] == "PullRequest" else "🐛" 107 + lines.append( 108 + f"- {icon} **[{item['repo']}#{item['number']}]({item['url']})** " 109 + f"`{item['state']}` — {item['title']} \n" 110 + f" _reason: {item['reason']}_" 111 + ) 112 + create_markdown_artifact( 113 + key="github-notifications-summary", 114 + markdown="\n".join(lines), 115 + ) 116 + 117 + return items 118 + 119 + 120 + if __name__ == "__main__": 121 + gh_notifications()
+9
prefect.yaml
··· 15 15 schedules: 16 16 - cron: "*/5 * * * *" 17 17 18 + - name: gh-notifications 19 + entrypoint: flows/gh_notifications.py:gh_notifications 20 + work_pool: 21 + name: kubernetes-pool 22 + schedules: 23 + - cron: "0 * * * *" # hourly 24 + parameters: 25 + only_unread: true 26 + 18 27 - name: cleanup 19 28 entrypoint: flows/cleanup.py:cleanup 20 29 work_pool: