Webhooks for the AT Protocol airglow.run
atproto atprotocol automation webhook
12
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: jetstream consumer

Hugo d65ec701 c1b45fda

+474
+3
app/routes/api/subscriptions/[rkey].ts
··· 4 4 import { subscriptions, deliveryLogs } from "@/db/schema.js"; 5 5 import { getRecord, putRecord, deleteRecord } from "@/subscriptions/pds.js"; 6 6 import { verifyCallback } from "@/subscriptions/verify.js"; 7 + import { notifySubscriptionChange } from "@/jetstream/consumer.js"; 7 8 8 9 function findSubscription(did: string, rkey: string) { 9 10 return db.query.subscriptions.findFirst({ ··· 109 110 .set({ callbackUrl, conditions, active, indexedAt: now }) 110 111 .where(eq(subscriptions.uri, sub.uri)); 111 112 113 + notifySubscriptionChange(); 112 114 return c.json({ ok: true }); 113 115 }); 114 116 ··· 130 132 // Remove from local index (cascade deletes delivery logs) 131 133 await db.delete(subscriptions).where(eq(subscriptions.uri, sub.uri)); 132 134 135 + notifySubscriptionChange(); 133 136 return c.json({ ok: true }); 134 137 });
+2
app/routes/api/subscriptions/index.ts
··· 7 7 import { isValidNsid, isNsidAllowed } from "@/lexicons/resolver.js"; 8 8 import { verifyCallback } from "@/subscriptions/verify.js"; 9 9 import { createRecord, deleteRecord } from "@/subscriptions/pds.js"; 10 + import { notifySubscriptionChange } from "@/jetstream/consumer.js"; 10 11 11 12 export const GET = createRoute(async (c) => { 12 13 const user = c.get("user"); ··· 112 113 return c.json({ error: "Failed to save subscription" }, 500); 113 114 } 114 115 116 + notifySubscriptionChange(); 115 117 return c.json({ uri, rkey, secret }, 201); 116 118 });
+9
app/server.ts
··· 1 1 import { createApp } from "honox/server"; 2 2 import { getOAuthClient } from "@/auth/client.js"; 3 + import { startJetstream } from "@/jetstream/consumer.js"; 4 + import { dispatch } from "@/webhooks/dispatcher.js"; 3 5 4 6 const app = createApp(); 7 + 8 + // Start Jetstream consumer — delivers matched events to callback URLs 9 + startJetstream((match) => { 10 + dispatch(match).catch((err) => { 11 + console.error("Webhook delivery error:", err); 12 + }); 13 + }); 5 14 6 15 // OAuth discovery endpoints (production — loopback clients don't need these) 7 16 app.get("/oauth/client-metadata.json", async (c) => {
+206
lib/jetstream/consumer.ts
··· 1 + import { eq } from "drizzle-orm"; 2 + import { db } from "../db/index.js"; 3 + import { subscriptions } from "../db/schema.js"; 4 + import { config } from "../config.js"; 5 + import { matchConditions, type JetstreamEvent } from "./matcher.js"; 6 + import { readFileSync, writeFileSync, mkdirSync } from "node:fs"; 7 + import { dirname, join } from "node:path"; 8 + 9 + type Subscription = typeof subscriptions.$inferSelect; 10 + 11 + export type MatchedEvent = { 12 + subscription: Subscription; 13 + event: JetstreamEvent; 14 + }; 15 + 16 + type EventHandler = (match: MatchedEvent) => void; 17 + 18 + const CURSOR_FLUSH_INTERVAL = 5_000; // persist cursor every 5s 19 + 20 + const cursorPath = join(dirname(config.databasePath), "jetstream-cursor"); 21 + 22 + export class JetstreamConsumer { 23 + private ws: WebSocket | null = null; 24 + private subsByCollection = new Map<string, Subscription[]>(); 25 + private cursor: number | null = null; 26 + private cursorDirty = false; 27 + private cursorTimer: ReturnType<typeof setInterval> | null = null; 28 + private reconnectTimer: ReturnType<typeof setTimeout> | null = null; 29 + private reconnectDelay = 1000; 30 + private running = false; 31 + private handler: EventHandler; 32 + 33 + constructor(handler: EventHandler) { 34 + this.handler = handler; 35 + } 36 + 37 + async start() { 38 + this.running = true; 39 + this.cursor = this.loadCursor(); 40 + await this.refreshSubscriptions(); 41 + this.cursorTimer = setInterval(() => this.flushCursor(), CURSOR_FLUSH_INTERVAL); 42 + } 43 + 44 + stop() { 45 + this.running = false; 46 + if (this.cursorTimer) clearInterval(this.cursorTimer); 47 + if (this.reconnectTimer) clearTimeout(this.reconnectTimer); 48 + this.flushCursor(); 49 + this.ws?.close(); 50 + this.ws = null; 51 + } 52 + 53 + async refreshSubscriptions() { 54 + const rows = await db.query.subscriptions.findMany({ 55 + where: eq(subscriptions.active, true), 56 + }); 57 + 58 + const byCollection = new Map<string, Subscription[]>(); 59 + for (const row of rows) { 60 + const list = byCollection.get(row.lexicon) || []; 61 + list.push(row); 62 + byCollection.set(row.lexicon, list); 63 + } 64 + 65 + const oldCollections = new Set(this.subsByCollection.keys()); 66 + const newCollections = new Set(byCollection.keys()); 67 + this.subsByCollection = byCollection; 68 + 69 + const collectionsChanged = 70 + oldCollections.size !== newCollections.size || 71 + [...newCollections].some((c) => !oldCollections.has(c)) || 72 + [...oldCollections].some((c) => !newCollections.has(c)); 73 + 74 + if (!collectionsChanged) return; 75 + 76 + const hadCollections = oldCollections.size > 0; 77 + const hasCollections = newCollections.size > 0; 78 + 79 + if (!hadCollections && hasCollections) { 80 + this.connect(); 81 + } else if (hadCollections && !hasCollections) { 82 + this.ws?.close(); 83 + } else if (this.ws && this.ws.readyState <= WebSocket.OPEN) { 84 + this.ws.close(); // onclose reconnects with updated params 85 + } 86 + } 87 + 88 + private connect() { 89 + if (!this.running) return; 90 + if (this.reconnectTimer) { 91 + clearTimeout(this.reconnectTimer); 92 + this.reconnectTimer = null; 93 + } 94 + 95 + const collections = [...this.subsByCollection.keys()]; 96 + if (collections.length === 0) { 97 + console.log("Jetstream: no active subscriptions, waiting"); 98 + return; 99 + } 100 + 101 + const url = new URL(config.jetstreamUrl); 102 + for (const col of collections) { 103 + url.searchParams.append("wantedCollections", col); 104 + } 105 + if (this.cursor != null) { 106 + url.searchParams.set("cursor", String(this.cursor)); 107 + } 108 + 109 + console.log( 110 + `Jetstream: connecting (${collections.length} collection${collections.length === 1 ? "" : "s"})`, 111 + ); 112 + this.ws = new WebSocket(url.toString()); 113 + 114 + this.ws.addEventListener("open", () => { 115 + console.log("Jetstream: connected"); 116 + this.reconnectDelay = 1000; 117 + }); 118 + 119 + this.ws.addEventListener("message", (msg) => { 120 + try { 121 + const event = JSON.parse(msg.data as string) as JetstreamEvent; 122 + this.cursor = event.time_us; 123 + this.cursorDirty = true; 124 + 125 + if (event.kind === "commit" && event.commit) { 126 + this.processEvent(event); 127 + } 128 + } catch (err) { 129 + console.error("Jetstream: failed to process message:", err); 130 + } 131 + }); 132 + 133 + this.ws.addEventListener("close", () => { 134 + this.ws = null; 135 + if (this.running && this.subsByCollection.size > 0) { 136 + console.log(`Jetstream: reconnecting in ${this.reconnectDelay}ms`); 137 + this.reconnectTimer = setTimeout(() => { 138 + this.reconnectTimer = null; 139 + this.connect(); 140 + }, this.reconnectDelay); 141 + this.reconnectDelay = Math.min(this.reconnectDelay * 2, 60_000); 142 + } 143 + }); 144 + 145 + this.ws.addEventListener("error", (err) => { 146 + console.error("Jetstream: WebSocket error:", err); 147 + }); 148 + } 149 + 150 + private processEvent(event: JetstreamEvent) { 151 + const collection = event.commit!.collection; 152 + const subs = this.subsByCollection.get(collection); 153 + if (!subs) return; 154 + 155 + for (const sub of subs) { 156 + if (matchConditions(event, sub.conditions)) { 157 + this.handler({ subscription: sub, event }); 158 + } 159 + } 160 + } 161 + 162 + private loadCursor(): number | null { 163 + try { 164 + const data = readFileSync(cursorPath, "utf-8").trim(); 165 + const cursor = Number(data); 166 + return Number.isFinite(cursor) ? cursor : null; 167 + } catch { 168 + return null; 169 + } 170 + } 171 + 172 + private flushCursor() { 173 + if (!this.cursorDirty || this.cursor == null) return; 174 + try { 175 + mkdirSync(dirname(cursorPath), { recursive: true }); 176 + writeFileSync(cursorPath, String(this.cursor)); 177 + this.cursorDirty = false; 178 + } catch (err) { 179 + console.error("Jetstream: failed to persist cursor:", err); 180 + } 181 + } 182 + } 183 + 184 + // — Singleton management — 185 + 186 + let consumer: JetstreamConsumer | null = null; 187 + 188 + export function startJetstream(handler: EventHandler): JetstreamConsumer { 189 + if (consumer) return consumer; 190 + consumer = new JetstreamConsumer(handler); 191 + consumer.start().catch((err) => { 192 + console.error("Jetstream: failed to start:", err); 193 + }); 194 + return consumer; 195 + } 196 + 197 + export function getJetstream(): JetstreamConsumer | null { 198 + return consumer; 199 + } 200 + 201 + /** Call after creating/updating/deleting subscriptions. */ 202 + export function notifySubscriptionChange() { 203 + consumer?.refreshSubscriptions().catch((err) => { 204 + console.error("Jetstream: failed to refresh subscriptions:", err); 205 + }); 206 + }
+73
lib/jetstream/matcher.ts
··· 1 + export type JetstreamCommit = { 2 + rev: string; 3 + operation: "create" | "update" | "delete"; 4 + collection: string; 5 + rkey: string; 6 + record?: Record<string, unknown>; 7 + cid?: string; 8 + }; 9 + 10 + export type JetstreamEvent = { 11 + did: string; 12 + time_us: number; 13 + kind: "commit" | "identity" | "account"; 14 + commit?: JetstreamCommit; 15 + }; 16 + 17 + type Condition = { 18 + field: string; 19 + operator: string; 20 + value: string; 21 + }; 22 + 23 + /** 24 + * Resolve a dotted field path against an event. 25 + * - "repo" → event.did 26 + * - "record.foo.bar" → event.commit.record.foo.bar 27 + */ 28 + function resolveField( 29 + event: JetstreamEvent, 30 + field: string, 31 + ): string | undefined { 32 + if (field === "repo") return event.did; 33 + 34 + if (field.startsWith("record.") && event.commit?.record) { 35 + const path = field.slice("record.".length); 36 + let value: unknown = event.commit.record; 37 + for (const key of path.split(".")) { 38 + if (value == null || typeof value !== "object") return undefined; 39 + value = (value as Record<string, unknown>)[key]; 40 + } 41 + if (value == null) return undefined; 42 + return typeof value === "string" ? value : String(value); 43 + } 44 + 45 + return undefined; 46 + } 47 + 48 + function evaluateCondition( 49 + event: JetstreamEvent, 50 + condition: Condition, 51 + ): boolean { 52 + const actual = resolveField(event, condition.field); 53 + if (actual === undefined) return false; 54 + 55 + switch (condition.operator) { 56 + case "eq": 57 + return actual === condition.value; 58 + default: 59 + return false; 60 + } 61 + } 62 + 63 + /** 64 + * Check if all conditions match the event. 65 + * Empty conditions = match all events for that collection. 66 + */ 67 + export function matchConditions( 68 + event: JetstreamEvent, 69 + conditions: Condition[], 70 + ): boolean { 71 + if (conditions.length === 0) return true; 72 + return conditions.every((cond) => evaluateCondition(event, cond)); 73 + }
+175
lib/webhooks/dispatcher.ts
··· 1 + import { db } from "../db/index.js"; 2 + import { deliveryLogs } from "../db/schema.js"; 3 + import { sign } from "./signer.js"; 4 + import type { MatchedEvent } from "../jetstream/consumer.js"; 5 + 6 + const RETRY_DELAYS = [5_000, 30_000]; // 1st retry after 5s, 2nd after 30s 7 + 8 + type WebhookPayload = { 9 + subscription: string; 10 + lexicon: string; 11 + conditions: Array<{ field: string; operator: string; value: string }>; 12 + event: { 13 + did: string; 14 + time_us: number; 15 + kind: string; 16 + commit?: { 17 + operation: string; 18 + collection: string; 19 + rkey: string; 20 + record?: Record<string, unknown>; 21 + cid?: string; 22 + }; 23 + }; 24 + }; 25 + 26 + function buildPayload(match: MatchedEvent): WebhookPayload { 27 + const { subscription, event } = match; 28 + return { 29 + subscription: subscription.uri, 30 + lexicon: subscription.lexicon, 31 + conditions: subscription.conditions, 32 + event: { 33 + did: event.did, 34 + time_us: event.time_us, 35 + kind: event.kind, 36 + commit: event.commit 37 + ? { 38 + operation: event.commit.operation, 39 + collection: event.commit.collection, 40 + rkey: event.commit.rkey, 41 + record: event.commit.record, 42 + cid: event.commit.cid, 43 + } 44 + : undefined, 45 + }, 46 + }; 47 + } 48 + 49 + async function deliver( 50 + callbackUrl: string, 51 + body: string, 52 + secret: string, 53 + subscriptionUri: string, 54 + ): Promise<{ statusCode: number; error?: string }> { 55 + const timestamp = Date.now(); 56 + const signature = sign(body, secret); 57 + 58 + try { 59 + const res = await fetch(callbackUrl, { 60 + method: "POST", 61 + headers: { 62 + "Content-Type": "application/json", 63 + "X-Airglow-Signature": `sha256=${signature}`, 64 + "X-Airglow-Subscription": subscriptionUri, 65 + "X-Airglow-Timestamp": String(timestamp), 66 + }, 67 + body, 68 + signal: AbortSignal.timeout(10_000), 69 + }); 70 + return { statusCode: res.status }; 71 + } catch (err) { 72 + return { statusCode: 0, error: String(err) }; 73 + } 74 + } 75 + 76 + async function logDelivery( 77 + subscriptionUri: string, 78 + eventTimeUs: number, 79 + payload: string | null, 80 + statusCode: number, 81 + error: string | null, 82 + attempt: number, 83 + ) { 84 + await db.insert(deliveryLogs).values({ 85 + subscriptionUri, 86 + eventTimeUs, 87 + payload, 88 + statusCode, 89 + error, 90 + attempt, 91 + createdAt: new Date(), 92 + }); 93 + } 94 + 95 + function isSuccess(code: number): boolean { 96 + return code >= 200 && code < 300; 97 + } 98 + 99 + function isRetryable(code: number): boolean { 100 + return code >= 500 || code === 0; 101 + } 102 + 103 + function scheduleRetry( 104 + subscriptionUri: string, 105 + callbackUrl: string, 106 + secret: string, 107 + eventTimeUs: number, 108 + body: string, 109 + retryIndex: number, 110 + ) { 111 + if (retryIndex >= RETRY_DELAYS.length) return; 112 + 113 + setTimeout(async () => { 114 + try { 115 + const result = await deliver(callbackUrl, body, secret, subscriptionUri); 116 + 117 + await logDelivery( 118 + subscriptionUri, 119 + eventTimeUs, 120 + isSuccess(result.statusCode) ? null : body, 121 + result.statusCode, 122 + result.error ?? null, 123 + retryIndex + 2, // attempt 2 or 3 124 + ); 125 + 126 + if (!isSuccess(result.statusCode) && isRetryable(result.statusCode)) { 127 + scheduleRetry( 128 + subscriptionUri, 129 + callbackUrl, 130 + secret, 131 + eventTimeUs, 132 + body, 133 + retryIndex + 1, 134 + ); 135 + } 136 + } catch (err) { 137 + console.error("Webhook retry error:", err); 138 + } 139 + }, RETRY_DELAYS[retryIndex]); 140 + } 141 + 142 + /** Deliver a matched event to the subscription's callback URL. */ 143 + export async function dispatch(match: MatchedEvent) { 144 + const { subscription, event } = match; 145 + const payload = buildPayload(match); 146 + const body = JSON.stringify(payload); 147 + 148 + const result = await deliver( 149 + subscription.callbackUrl, 150 + body, 151 + subscription.secret, 152 + subscription.uri, 153 + ); 154 + 155 + await logDelivery( 156 + subscription.uri, 157 + event.time_us, 158 + isSuccess(result.statusCode) ? null : body, 159 + result.statusCode, 160 + result.error ?? null, 161 + 1, 162 + ); 163 + 164 + // Schedule retries for server errors / network failures 165 + if (!isSuccess(result.statusCode) && isRetryable(result.statusCode)) { 166 + scheduleRetry( 167 + subscription.uri, 168 + subscription.callbackUrl, 169 + subscription.secret, 170 + event.time_us, 171 + body, 172 + 0, 173 + ); 174 + } 175 + }
+6
lib/webhooks/signer.ts
··· 1 + import { createHmac } from "node:crypto"; 2 + 3 + /** Compute HMAC-SHA256 hex digest of `body` using `secret`. */ 4 + export function sign(body: string, secret: string): string { 5 + return createHmac("sha256", secret).update(body).digest("hex"); 6 + }