Webhooks for the AT Protocol airglow.run
atproto atprotocol automation webhook
12
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: publication can now trigger many actions

Hugo bd736561 fa861233

+692 -863
+3
app/islands/DeliveryLog.tsx
··· 3 3 4 4 type LogEntry = { 5 5 id: number; 6 + actionIndex: number; 6 7 eventTimeUs: number; 7 8 statusCode: number | null; 8 9 error: string | null; ··· 105 106 <thead> 106 107 <tr> 107 108 <th class={s.th}>Time</th> 109 + <th class={s.th}>Action</th> 108 110 <th class={s.th}>Status</th> 109 111 <th class={s.th}>Attempt</th> 110 112 <th class={s.th}>Error</th> ··· 114 116 {logs.map((log) => ( 115 117 <tr key={log.id}> 116 118 <td class={s.td}>{new Date(log.createdAt).toLocaleString()}</td> 119 + <td class={s.td}>{log.actionIndex + 1}</td> 117 120 <td class={s.td}>{log.statusCode ?? "\u2014"}</td> 118 121 <td class={s.td}>{log.attempt}</td> 119 122 <td class={s.td}>{log.error || "\u2014"}</td>
+34
app/islands/SubscriptionForm.css.ts
··· 205 205 fontFamily: "monospace", 206 206 lineHeight: 1.6, 207 207 }); 208 + 209 + export const actionsSection = style({ 210 + display: "flex", 211 + flexDirection: "column", 212 + gap: space[3], 213 + }); 214 + 215 + export const actionCard = style({ 216 + display: "flex", 217 + flexDirection: "column", 218 + gap: space[3], 219 + paddingBlock: space[4], 220 + paddingInline: space[4], 221 + borderRadius: radii.md, 222 + border: `1px solid ${vars.color.borderSubtle}`, 223 + backgroundColor: vars.color.bg, 224 + }); 225 + 226 + export const actionHeader = style({ 227 + display: "flex", 228 + justifyContent: "space-between", 229 + alignItems: "center", 230 + }); 231 + 232 + export const actionTitle = style({ 233 + fontSize: fontSize.sm, 234 + fontWeight: fontWeight.medium, 235 + color: vars.color.text, 236 + }); 237 + 238 + export const addActionsRow = style({ 239 + display: "flex", 240 + gap: space[2], 241 + });
+253 -156
app/islands/SubscriptionForm.tsx
··· 14 14 value: string; 15 15 }; 16 16 17 - type SubType = "webhook" | "record"; 17 + type WebhookDraft = { type: "webhook"; callbackUrl: string }; 18 + type RecordDraft = { type: "record"; targetCollection: string; recordTemplate: string }; 19 + type ActionDraft = WebhookDraft | RecordDraft; 18 20 19 21 const NSID_RE = /^[a-z][a-z0-9-]*(\.[a-z][a-z0-9-]*){2,}$/; 20 22 ··· 32 34 return new URLSearchParams(window.location.search).get(key) ?? ""; 33 35 } 34 36 35 - export default function SubscriptionForm({ type }: { type: SubType }) { 37 + // --------------------------------------------------------------------------- 38 + // Webhook action editor 39 + // --------------------------------------------------------------------------- 40 + 41 + function WebhookActionEditor({ 42 + action, 43 + onChange, 44 + }: { 45 + action: WebhookDraft; 46 + onChange: (a: WebhookDraft) => void; 47 + }) { 48 + return ( 49 + <div class={s.fieldGroup}> 50 + <label class={s.label}>Callback URL</label> 51 + <input 52 + class={s.input} 53 + type="url" 54 + placeholder="https://example.com/hooks/events" 55 + value={action.callbackUrl} 56 + onInput={(e: Event) => 57 + onChange({ ...action, callbackUrl: (e.target as HTMLInputElement).value }) 58 + } 59 + required 60 + /> 61 + </div> 62 + ); 63 + } 64 + 65 + // --------------------------------------------------------------------------- 66 + // Record action editor 67 + // --------------------------------------------------------------------------- 68 + 69 + function RecordActionEditor({ 70 + action, 71 + onChange, 72 + placeholders, 73 + }: { 74 + action: RecordDraft; 75 + onChange: (a: RecordDraft) => void; 76 + placeholders: string[]; 77 + }) { 78 + const [targetSchema, setTargetSchema] = useState<RecordSchema | null>(null); 79 + const [targetSchemaLoading, setTargetSchemaLoading] = useState(false); 80 + const [targetSchemaError, setTargetSchemaError] = useState(""); 81 + const [nsidSuggestions, setNsidSuggestions] = useState<string[]>([]); 82 + const debounceRef = useRef<ReturnType<typeof setTimeout> | null>(null); 83 + const suggestDebounceRef = useRef<ReturnType<typeof setTimeout> | null>(null); 84 + const lastSuggestPrefix = useRef(""); 85 + const [datalistId] = useState(() => `nsid-target-${Math.random().toString(36).slice(2, 8)}`); 86 + const initialFetched = useRef(false); 87 + 88 + const fetchTargetSchema = useCallback((nsid: string) => { 89 + if (debounceRef.current) clearTimeout(debounceRef.current); 90 + if (!nsid) { 91 + setTargetSchema(null); 92 + setTargetSchemaError(""); 93 + return; 94 + } 95 + if (!NSID_RE.test(nsid)) { 96 + setTargetSchema(null); 97 + setTargetSchemaError(""); 98 + return; 99 + } 100 + debounceRef.current = setTimeout(async () => { 101 + setTargetSchemaLoading(true); 102 + setTargetSchemaError(""); 103 + try { 104 + const res = await fetch(`/api/lexicons/${encodeURIComponent(nsid)}?schema=record`); 105 + const data = await res.json(); 106 + if (!res.ok) { 107 + setTargetSchemaError(data.error || "Failed to load schema"); 108 + setTargetSchema(null); 109 + } else { 110 + setTargetSchema(data.record ?? null); 111 + if (!data.record) { 112 + setTargetSchemaError("No record schema found for this collection"); 113 + } 114 + } 115 + } catch { 116 + setTargetSchemaError("Failed to fetch target collection schema"); 117 + setTargetSchema(null); 118 + } finally { 119 + setTargetSchemaLoading(false); 120 + } 121 + }, 400); 122 + }, []); 123 + 124 + const fetchSuggestions = useCallback((value: string) => { 125 + if (suggestDebounceRef.current) clearTimeout(suggestDebounceRef.current); 126 + const dotIndex = value.lastIndexOf("."); 127 + const prefix = dotIndex > 0 ? value.slice(0, dotIndex + 1) : ""; 128 + if (!prefix || prefix.split(".").filter(Boolean).length < 2) return; 129 + if (prefix === lastSuggestPrefix.current) return; 130 + suggestDebounceRef.current = setTimeout(async () => { 131 + lastSuggestPrefix.current = prefix; 132 + try { 133 + const res = await fetch(`/api/lexicons/suggest?prefix=${encodeURIComponent(prefix)}`); 134 + if (res.ok) { 135 + const data = await res.json(); 136 + setNsidSuggestions(data.suggestions ?? []); 137 + } 138 + } catch { 139 + // ignore 140 + } 141 + }, 300); 142 + }, []); 143 + 144 + // Fetch schema for pre-filled targetCollection 145 + if (!initialFetched.current && action.targetCollection) { 146 + initialFetched.current = true; 147 + fetchTargetSchema(action.targetCollection); 148 + } 149 + 150 + return ( 151 + <> 152 + <div class={s.fieldGroup}> 153 + <label class={s.label}>Target Collection</label> 154 + <input 155 + class={s.input} 156 + type="text" 157 + list={datalistId} 158 + placeholder="e.g. app.bsky.feed.like" 159 + value={action.targetCollection} 160 + onInput={(e: Event) => { 161 + const val = (e.target as HTMLInputElement).value; 162 + onChange({ ...action, targetCollection: val }); 163 + fetchTargetSchema(val); 164 + fetchSuggestions(val); 165 + }} 166 + required 167 + /> 168 + <span class={s.hint}>NSID of the collection to create a record in</span> 169 + <datalist id={datalistId}> 170 + {nsidSuggestions.map((nsid) => ( 171 + <option key={nsid} value={nsid} /> 172 + ))} 173 + </datalist> 174 + </div> 175 + 176 + {(targetSchema || targetSchemaLoading) && ( 177 + <RecordFormBuilder 178 + schema={targetSchema} 179 + loading={targetSchemaLoading} 180 + error={targetSchemaError} 181 + placeholders={placeholders} 182 + onChange={(tpl) => onChange({ ...action, recordTemplate: tpl })} 183 + /> 184 + )} 185 + 186 + {!targetSchema && !targetSchemaLoading && action.targetCollection && ( 187 + <div class={s.fieldGroup}> 188 + <label class={s.label}>Record Template</label> 189 + {targetSchemaError && <span class={s.errorText}>{targetSchemaError}</span>} 190 + <textarea 191 + class={s.textarea} 192 + placeholder={ 193 + '{\n "subject": {\n "uri": "at://{{event.did}}/{{event.commit.collection}}/{{event.commit.rkey}}",\n "cid": "{{event.commit.cid}}"\n },\n "createdAt": "{{now}}"\n}' 194 + } 195 + value={action.recordTemplate} 196 + onInput={(e: Event) => 197 + onChange({ 198 + ...action, 199 + recordTemplate: (e.target as HTMLTextAreaElement).value, 200 + }) 201 + } 202 + required 203 + /> 204 + {placeholders.length > 0 && ( 205 + <div class={s.placeholderHelp}> 206 + Available: {placeholders.map((p) => `{{${p}}}`).join(", ")} 207 + </div> 208 + )} 209 + </div> 210 + )} 211 + </> 212 + ); 213 + } 214 + 215 + // --------------------------------------------------------------------------- 216 + // Main form 217 + // --------------------------------------------------------------------------- 218 + 219 + export default function SubscriptionForm() { 36 220 const initial = getInitialParam("lexicon"); 37 - const initialTarget = getInitialParam("targetCollection"); 38 221 const [lexicon, setLexicon] = useState(initial); 39 222 const [fields, setFields] = useState<Field[]>([]); 40 223 const [fieldsLoading, setFieldsLoading] = useState(false); 41 224 const [fieldsError, setFieldsError] = useState(""); 42 225 const [conditions, setConditions] = useState<Condition[]>([]); 43 - const [callbackUrl, setCallbackUrl] = useState(""); 44 - const [targetCollection, setTargetCollection] = useState(initialTarget); 45 - const [recordTemplate, setRecordTemplate] = useState(""); 46 - const [targetSchema, setTargetSchema] = useState<RecordSchema | null>(null); 47 - const [targetSchemaLoading, setTargetSchemaLoading] = useState(false); 48 - const [targetSchemaError, setTargetSchemaError] = useState(""); 226 + const [actions, setActions] = useState<ActionDraft[]>([]); 49 227 const [nsidSuggestions, setNsidSuggestions] = useState<string[]>([]); 50 228 const [submitting, setSubmitting] = useState(false); 51 229 const [error, setError] = useState(""); 52 230 const debounceRef = useRef<ReturnType<typeof setTimeout> | null>(null); 53 - const targetDebounceRef = useRef<ReturnType<typeof setTimeout> | null>(null); 54 231 const suggestDebounceRef = useRef<ReturnType<typeof setTimeout> | null>(null); 55 232 const lastSuggestPrefix = useRef(""); 56 233 const initialFetched = useRef(false); ··· 99 276 }, 400); 100 277 }, []); 101 278 102 - const fetchTargetSchema = useCallback((nsid: string, updateUrl = true) => { 103 - if (targetDebounceRef.current) clearTimeout(targetDebounceRef.current); 104 - if (!nsid) { 105 - setTargetSchema(null); 106 - setTargetSchemaError(""); 107 - setRecordTemplate(""); 108 - if (updateUrl) { 109 - const url = new URL(window.location.href); 110 - url.searchParams.delete("targetCollection"); 111 - history.replaceState(null, "", url); 112 - } 113 - return; 114 - } 115 - if (updateUrl) { 116 - const url = new URL(window.location.href); 117 - url.searchParams.set("targetCollection", nsid); 118 - history.replaceState(null, "", url); 119 - } 120 - if (!NSID_RE.test(nsid)) { 121 - setTargetSchema(null); 122 - setTargetSchemaError(""); 123 - setRecordTemplate(""); 124 - return; 125 - } 126 - targetDebounceRef.current = setTimeout(async () => { 127 - setTargetSchemaLoading(true); 128 - setTargetSchemaError(""); 129 - try { 130 - const res = await fetch(`/api/lexicons/${encodeURIComponent(nsid)}?schema=record`); 131 - const data = await res.json(); 132 - if (!res.ok) { 133 - setTargetSchemaError(data.error || "Failed to load schema"); 134 - setTargetSchema(null); 135 - } else { 136 - setTargetSchema(data.record ?? null); 137 - if (!data.record) { 138 - setTargetSchemaError("No record schema found for this collection"); 139 - } 140 - } 141 - } catch { 142 - setTargetSchemaError("Failed to fetch target collection schema"); 143 - setTargetSchema(null); 144 - } finally { 145 - setTargetSchemaLoading(false); 146 - } 147 - }, 400); 148 - }, []); 149 - 150 279 const fetchSuggestions = useCallback((value: string) => { 151 280 if (suggestDebounceRef.current) clearTimeout(suggestDebounceRef.current); 152 - 153 - // Extract prefix: keep everything up to and including the last dot 154 281 const dotIndex = value.lastIndexOf("."); 155 282 const prefix = dotIndex > 0 ? value.slice(0, dotIndex + 1) : ""; 156 - 157 - // Need at least 2 segments (e.g., "app.bsky.") 158 - if (!prefix || prefix.split(".").filter(Boolean).length < 2) { 159 - return; 160 - } 161 - 162 - // Don't re-fetch for the same prefix 283 + if (!prefix || prefix.split(".").filter(Boolean).length < 2) return; 163 284 if (prefix === lastSuggestPrefix.current) return; 164 - 165 285 suggestDebounceRef.current = setTimeout(async () => { 166 286 lastSuggestPrefix.current = prefix; 167 287 try { ··· 176 296 }, 300); 177 297 }, []); 178 298 179 - if (!initialFetched.current && (initial || initialTarget)) { 299 + if (!initialFetched.current && initial) { 180 300 initialFetched.current = true; 181 - if (initial) fetchFields(initial, false); 182 - if (initialTarget) fetchTargetSchema(initialTarget, false); 301 + fetchFields(initial, false); 183 302 } 184 303 185 304 const addCondition = useCallback(() => { ··· 194 313 setConditions((prev) => prev.map((c, i) => (i === index ? { ...c, [key]: val } : c))); 195 314 }, []); 196 315 316 + const addAction = useCallback((type: "webhook" | "record") => { 317 + if (type === "webhook") { 318 + setActions((prev) => [...prev, { type: "webhook", callbackUrl: "" }]); 319 + } else { 320 + setActions((prev) => [...prev, { type: "record", targetCollection: "", recordTemplate: "" }]); 321 + } 322 + }, []); 323 + 324 + const removeAction = useCallback((index: number) => { 325 + setActions((prev) => prev.filter((_, i) => i !== index)); 326 + }, []); 327 + 328 + const updateAction = useCallback((index: number, action: ActionDraft) => { 329 + setActions((prev) => prev.map((a, i) => (i === index ? action : a))); 330 + }, []); 331 + 197 332 const handleSubmit = useCallback( 198 333 async (e: Event) => { 199 334 e.preventDefault(); 200 335 setError(""); 201 336 setSubmitting(true); 202 337 try { 203 - const payload: Record<string, unknown> = { 204 - type, 338 + const payload = { 205 339 lexicon, 206 340 conditions: conditions 207 341 .filter((c) => c.field && c.value) 208 342 .map((c) => ({ field: c.field, operator: "eq", value: c.value })), 343 + actions: actions.map((a) => { 344 + if (a.type === "webhook") { 345 + return { type: "webhook" as const, callbackUrl: a.callbackUrl }; 346 + } 347 + return { 348 + type: "record" as const, 349 + targetCollection: a.targetCollection, 350 + recordTemplate: a.recordTemplate, 351 + }; 352 + }), 209 353 }; 210 - 211 - if (type === "webhook") { 212 - payload.callbackUrl = callbackUrl; 213 - } else { 214 - payload.targetCollection = targetCollection; 215 - payload.recordTemplate = recordTemplate; 216 - } 217 354 218 355 const res = await fetch("/api/subscriptions", { 219 356 method: "POST", ··· 232 369 setSubmitting(false); 233 370 } 234 371 }, 235 - [type, lexicon, callbackUrl, targetCollection, recordTemplate, conditions], 372 + [lexicon, conditions, actions], 236 373 ); 237 374 238 375 const allPlaceholders = [ ··· 334 471 </div> 335 472 )} 336 473 337 - {type === "webhook" && ( 338 - <div class={s.fieldGroup}> 339 - <label class={s.label} for="callbackUrl"> 340 - Callback URL 341 - </label> 342 - <input 343 - id="callbackUrl" 344 - class={s.input} 345 - type="url" 346 - placeholder="https://example.com/hooks/events" 347 - value={callbackUrl} 348 - onInput={(e: Event) => setCallbackUrl((e.target as HTMLInputElement).value)} 349 - required 350 - /> 474 + <div class={s.actionsSection}> 475 + <div> 476 + <h3>Actions</h3> 477 + <p class={s.hint}>Add one or more actions to execute when a matching event occurs.</p> 351 478 </div> 352 - )} 353 479 354 - {type === "record" && ( 355 - <> 356 - <div class={s.fieldGroup}> 357 - <label class={s.label} for="targetCollection"> 358 - Target Collection 359 - </label> 360 - <input 361 - id="targetCollection" 362 - class={s.input} 363 - type="text" 364 - list="nsid-suggestions" 365 - placeholder="e.g. app.bsky.feed.like" 366 - value={targetCollection} 367 - onInput={(e: Event) => { 368 - const val = (e.target as HTMLInputElement).value; 369 - setTargetCollection(val); 370 - fetchTargetSchema(val); 371 - fetchSuggestions(val); 372 - }} 373 - required 374 - /> 375 - <span class={s.hint}>NSID of the collection to create a record in</span> 480 + {actions.map((action, i) => ( 481 + <div key={i} class={s.actionCard}> 482 + <div class={s.actionHeader}> 483 + <span class={s.actionTitle}> 484 + {action.type === "webhook" ? "Webhook" : "Record"}{" "} 485 + {actions.filter((a, j) => a.type === action.type && j <= i).length} 486 + </span> 487 + <button type="button" class={s.removeBtn} onClick={() => removeAction(i)}> 488 + Remove 489 + </button> 490 + </div> 491 + {action.type === "webhook" ? ( 492 + <WebhookActionEditor action={action} onChange={(a) => updateAction(i, a)} /> 493 + ) : ( 494 + <RecordActionEditor 495 + action={action} 496 + onChange={(a) => updateAction(i, a)} 497 + placeholders={allPlaceholders} 498 + /> 499 + )} 376 500 </div> 501 + ))} 377 502 378 - {(targetSchema || targetSchemaLoading) && ( 379 - <RecordFormBuilder 380 - schema={targetSchema} 381 - loading={targetSchemaLoading} 382 - error={targetSchemaError} 383 - placeholders={allPlaceholders} 384 - onChange={setRecordTemplate} 385 - /> 386 - )} 387 - 388 - {!targetSchema && !targetSchemaLoading && targetCollection && ( 389 - <div class={s.fieldGroup}> 390 - <label class={s.label} for="recordTemplate"> 391 - Record Template 392 - </label> 393 - {targetSchemaError && <span class={s.errorText}>{targetSchemaError}</span>} 394 - <textarea 395 - id="recordTemplate" 396 - class={s.textarea} 397 - placeholder={ 398 - '{\n "subject": {\n "uri": "at://{{event.did}}/{{event.commit.collection}}/{{event.commit.rkey}}",\n "cid": "{{event.commit.cid}}"\n },\n "createdAt": "{{now}}"\n}' 399 - } 400 - value={recordTemplate} 401 - onInput={(e: Event) => 402 - setRecordTemplate((e.target as HTMLTextAreaElement).value) 403 - } 404 - required 405 - /> 406 - {allPlaceholders.length > 0 && ( 407 - <div class={s.placeholderHelp}> 408 - Available: {allPlaceholders.map((p) => `{{${p}}}`).join(", ")} 409 - </div> 410 - )} 411 - </div> 412 - )} 413 - </> 414 - )} 503 + <div class={s.addActionsRow}> 504 + <button type="button" class={s.addBtn} onClick={() => addAction("webhook")}> 505 + + Add webhook 506 + </button> 507 + <button type="button" class={s.addBtn} onClick={() => addAction("record")}> 508 + + Add record action 509 + </button> 510 + </div> 511 + </div> 415 512 416 513 {error && <div class={s.alertError}>{error}</div>} 417 514 418 - <button type="submit" class={s.submitBtn}> 515 + <button type="submit" class={s.submitBtn} disabled={actions.length === 0}> 419 516 {submitting ? "Creating..." : "Create subscription"} 420 517 </button> 421 518
+127 -67
app/routes/api/subscriptions/[rkey].ts
··· 1 1 import { createRoute } from "honox/factory"; 2 2 import { eq, and, desc } from "drizzle-orm"; 3 + import { nanoid } from "nanoid"; 3 4 import { db } from "@/db/index.js"; 4 - import { subscriptions, deliveryLogs } from "@/db/schema.js"; 5 + import { 6 + subscriptions, 7 + deliveryLogs, 8 + type Action, 9 + type WebhookAction, 10 + type RecordAction, 11 + } from "@/db/schema.js"; 5 12 import { config } from "@/config.js"; 6 13 import { isValidNsid, isNsidAllowed } from "@/lexicons/resolver.js"; 7 - import { getRecord, putRecord, deleteRecord } from "@/subscriptions/pds.js"; 14 + import { getRecord, putRecord, deleteRecord, type PdsAction } from "@/subscriptions/pds.js"; 8 15 import { verifyCallback } from "@/subscriptions/verify.js"; 9 16 import { validateTemplate } from "@/actions/template.js"; 10 17 import { notifySubscriptionChange } from "@/jetstream/consumer.js"; 11 18 19 + type ActionInput = 20 + | { type: "webhook"; callbackUrl: string } 21 + | { type: "record"; targetCollection: string; recordTemplate: string }; 22 + 12 23 function findSubscription(did: string, rkey: string) { 13 24 return db.query.subscriptions.findFirst({ 14 25 where: and(eq(subscriptions.did, did), eq(subscriptions.rkey, rkey)), ··· 31 42 return c.json({ 32 43 uri: sub.uri, 33 44 rkey: sub.rkey, 34 - type: sub.type, 35 45 lexicon: sub.lexicon, 36 - callbackUrl: sub.callbackUrl, 37 - targetCollection: sub.targetCollection, 38 - recordTemplate: sub.recordTemplate, 46 + actions: sub.actions, 39 47 conditions: sub.conditions, 40 48 active: sub.active, 41 - secret: sub.secret, 42 49 indexedAt: sub.indexedAt.getTime(), 43 50 deliveryLogs: logs.map((l) => ({ 44 51 id: l.id, 52 + actionIndex: l.actionIndex, 45 53 eventTimeUs: l.eventTimeUs, 46 54 statusCode: l.statusCode, 47 55 error: l.error, ··· 59 67 if (!sub) return c.json({ error: "Subscription not found" }, 404); 60 68 61 69 const body = await c.req.json<{ 62 - callbackUrl?: string; 63 - targetCollection?: string; 64 - recordTemplate?: string; 70 + actions?: ActionInput[]; 65 71 conditions?: Array<{ field: string; operator?: string; value: string }>; 66 72 active?: boolean; 67 73 }>(); ··· 80 86 } 81 87 const active = body.active ?? sub.active; 82 88 83 - // Type-specific validation and field resolution 84 - let callbackUrl = sub.callbackUrl; 85 - let targetCollection = sub.targetCollection; 86 - let recordTemplate = sub.recordTemplate; 89 + // Resolve actions — full replacement when provided 90 + let localActions = sub.actions; 91 + let pdsActions: PdsAction[] | null = null; 87 92 88 - if (sub.type === "webhook") { 89 - if (body.callbackUrl !== undefined && !body.callbackUrl) { 90 - return c.json({ error: "callbackUrl cannot be empty" }, 400); 93 + if (body.actions) { 94 + if (body.actions.length === 0) { 95 + return c.json({ error: "At least one action is required" }, 400); 96 + } 97 + if (body.actions.length > 10) { 98 + return c.json({ error: "Maximum 10 actions allowed" }, 400); 91 99 } 92 - callbackUrl = body.callbackUrl ?? sub.callbackUrl; 100 + 101 + const newLocalActions: Action[] = []; 102 + const newPdsActions: PdsAction[] = []; 103 + 104 + for (const input of body.actions) { 105 + if (input.type === "webhook") { 106 + if (!input.callbackUrl) { 107 + return c.json({ error: "callbackUrl is required for webhook actions" }, 400); 108 + } 109 + try { 110 + new URL(input.callbackUrl); 111 + } catch { 112 + return c.json({ error: "Invalid callback URL" }, 400); 113 + } 93 114 94 - // Re-verify callback if URL changed or reactivating 95 - if (body.callbackUrl || (body.active === true && !sub.active)) { 96 - const verification = await verifyCallback(callbackUrl!, sub.lexicon); 97 - if (!verification.ok) { 98 - return c.json({ error: verification.error }, 422); 99 - } 100 - } 101 - } else if (sub.type === "record") { 102 - if (body.targetCollection !== undefined) { 103 - if (!body.targetCollection) { 104 - return c.json({ error: "targetCollection cannot be empty" }, 400); 105 - } 106 - if (!isValidNsid(body.targetCollection)) { 107 - return c.json({ error: "Invalid target collection NSID" }, 400); 108 - } 109 - if (!isNsidAllowed(body.targetCollection, config.nsidAllowlist, config.nsidBlocklist)) { 110 - return c.json({ error: "This target collection is not allowed on this instance" }, 403); 115 + // Re-verify callback 116 + const verification = await verifyCallback(input.callbackUrl, sub.lexicon); 117 + if (!verification.ok) { 118 + return c.json({ error: verification.error }, 422); 119 + } 120 + 121 + // Preserve existing secret if callbackUrl unchanged 122 + const existing = sub.actions.find( 123 + (a): a is WebhookAction => a.$type === "webhook" && a.callbackUrl === input.callbackUrl, 124 + ); 125 + const secret = existing?.secret ?? nanoid(32); 126 + 127 + newLocalActions.push({ 128 + $type: "webhook", 129 + callbackUrl: input.callbackUrl, 130 + secret, 131 + } satisfies WebhookAction); 132 + newPdsActions.push({ 133 + $type: "app.rglw.subscription#webhookAction", 134 + callbackUrl: input.callbackUrl, 135 + }); 136 + } else if (input.type === "record") { 137 + if (!input.targetCollection) { 138 + return c.json({ error: "targetCollection is required for record actions" }, 400); 139 + } 140 + if (!isValidNsid(input.targetCollection)) { 141 + return c.json({ error: "Invalid target collection NSID" }, 400); 142 + } 143 + if (!isNsidAllowed(input.targetCollection, config.nsidAllowlist, config.nsidBlocklist)) { 144 + return c.json({ error: "This target collection is not allowed on this instance" }, 403); 145 + } 146 + if (!input.recordTemplate) { 147 + return c.json({ error: "recordTemplate is required for record actions" }, 400); 148 + } 149 + const templateValidation = validateTemplate(input.recordTemplate); 150 + if (!templateValidation.valid) { 151 + return c.json({ error: templateValidation.error }, 400); 152 + } 153 + 154 + newLocalActions.push({ 155 + $type: "record", 156 + targetCollection: input.targetCollection, 157 + recordTemplate: input.recordTemplate, 158 + } satisfies RecordAction); 159 + newPdsActions.push({ 160 + $type: "app.rglw.subscription#recordAction", 161 + targetCollection: input.targetCollection, 162 + recordTemplate: input.recordTemplate, 163 + }); 164 + } else { 165 + return c.json({ error: "Invalid action type" }, 400); 111 166 } 112 - targetCollection = body.targetCollection; 113 167 } 114 - if (body.recordTemplate !== undefined) { 115 - if (!body.recordTemplate) { 116 - return c.json({ error: "recordTemplate cannot be empty" }, 400); 117 - } 118 - const templateValidation = validateTemplate(body.recordTemplate); 119 - if (!templateValidation.valid) { 120 - return c.json({ error: templateValidation.error }, 400); 168 + 169 + localActions = newLocalActions; 170 + pdsActions = newPdsActions; 171 + } 172 + 173 + // Re-verify webhook callbacks when reactivating 174 + if (body.active === true && !sub.active && !body.actions) { 175 + for (const action of localActions) { 176 + if (action.$type === "webhook") { 177 + const verification = await verifyCallback(action.callbackUrl, sub.lexicon); 178 + if (!verification.ok) { 179 + return c.json({ error: verification.error }, 422); 180 + } 121 181 } 122 - recordTemplate = body.recordTemplate; 123 182 } 124 183 } 125 184 ··· 127 186 const existing = await getRecord(user.did, rkey); 128 187 const createdAt = (existing?.createdAt as string) || sub.indexedAt.toISOString(); 129 188 130 - // Update on PDS 131 - const pdsRecord = 132 - sub.type === "webhook" 133 - ? { 134 - type: "webhook" as const, 135 - lexicon: sub.lexicon, 136 - callbackUrl: callbackUrl!, 137 - conditions, 138 - active, 139 - createdAt, 140 - } 141 - : { 142 - type: "record" as const, 143 - lexicon: sub.lexicon, 144 - targetCollection: targetCollection!, 145 - recordTemplate: recordTemplate!, 146 - conditions, 147 - active, 148 - createdAt, 189 + // Build PDS actions from local actions if not already built 190 + if (!pdsActions) { 191 + pdsActions = localActions.map((a): PdsAction => { 192 + if (a.$type === "webhook") { 193 + return { 194 + $type: "app.rglw.subscription#webhookAction", 195 + callbackUrl: a.callbackUrl, 149 196 }; 197 + } 198 + return { 199 + $type: "app.rglw.subscription#recordAction", 200 + targetCollection: a.targetCollection, 201 + recordTemplate: a.recordTemplate, 202 + }; 203 + }); 204 + } 150 205 206 + // Update on PDS 151 207 try { 152 - await putRecord(user.did, rkey, pdsRecord); 208 + await putRecord(user.did, rkey, { 209 + lexicon: sub.lexicon, 210 + actions: pdsActions, 211 + conditions, 212 + active, 213 + createdAt, 214 + }); 153 215 } catch (err) { 154 216 console.error("Failed to update PDS record:", err); 155 217 return c.json({ error: "Failed to update subscription on PDS" }, 502); ··· 160 222 await db 161 223 .update(subscriptions) 162 224 .set({ 163 - callbackUrl, 164 - targetCollection, 165 - recordTemplate, 225 + actions: localActions, 166 226 conditions, 167 227 active, 168 228 indexedAt: now,
+90 -79
app/routes/api/subscriptions/index.ts
··· 2 2 import { eq } from "drizzle-orm"; 3 3 import { nanoid } from "nanoid"; 4 4 import { db } from "@/db/index.js"; 5 - import { subscriptions } from "@/db/schema.js"; 5 + import { subscriptions, type Action, type WebhookAction, type RecordAction } from "@/db/schema.js"; 6 6 import { config } from "@/config.js"; 7 7 import { isValidNsid, isNsidAllowed } from "@/lexicons/resolver.js"; 8 8 import { verifyCallback } from "@/subscriptions/verify.js"; 9 - import { createRecord, deleteRecord } from "@/subscriptions/pds.js"; 9 + import { createRecord, deleteRecord, type PdsAction } from "@/subscriptions/pds.js"; 10 10 import { validateTemplate } from "@/actions/template.js"; 11 11 import { notifySubscriptionChange } from "@/jetstream/consumer.js"; 12 12 13 + type ActionInput = 14 + | { type: "webhook"; callbackUrl: string } 15 + | { type: "record"; targetCollection: string; recordTemplate: string }; 16 + 13 17 export const GET = createRoute(async (c) => { 14 18 const user = c.get("user"); 15 19 const rows = await db.query.subscriptions.findMany({ ··· 19 23 rows.map((r) => ({ 20 24 uri: r.uri, 21 25 rkey: r.rkey, 22 - type: r.type, 23 26 lexicon: r.lexicon, 24 - callbackUrl: r.callbackUrl, 25 - targetCollection: r.targetCollection, 27 + actions: r.actions, 26 28 conditions: r.conditions, 27 29 active: r.active, 28 30 indexedAt: r.indexedAt.getTime(), ··· 33 35 export const POST = createRoute(async (c) => { 34 36 const user = c.get("user"); 35 37 const body = await c.req.json<{ 36 - type?: "webhook" | "record"; 37 38 lexicon: string; 38 - callbackUrl?: string; 39 - targetCollection?: string; 40 - recordTemplate?: string; 39 + actions: ActionInput[]; 41 40 conditions?: Array<{ field: string; operator?: string; value: string }>; 42 41 active?: boolean; 43 42 }>(); 44 43 45 - const type = body.type ?? "webhook"; 46 - 47 44 // Validate lexicon NSID 48 45 if (!body.lexicon || !isValidNsid(body.lexicon)) { 49 46 return c.json({ error: "Invalid lexicon NSID" }, 400); ··· 52 49 return c.json({ error: "This lexicon is not allowed on this instance" }, 403); 53 50 } 54 51 52 + // Validate actions 53 + if (!body.actions || body.actions.length === 0) { 54 + return c.json({ error: "At least one action is required" }, 400); 55 + } 56 + if (body.actions.length > 10) { 57 + return c.json({ error: "Maximum 10 actions allowed" }, 400); 58 + } 59 + 55 60 // Normalize and validate conditions 56 61 const conditions = (body.conditions ?? []) 57 62 .filter((cond) => cond.field && cond.value) ··· 64 69 return c.json({ error: "Maximum 20 conditions allowed" }, 400); 65 70 } 66 71 67 - // Type-specific validation 68 - let callbackUrl: string | null = null; 69 - let secret: string | null = null; 70 - let targetCollection: string | null = null; 71 - let recordTemplate: string | null = null; 72 + // Validate each action and build local + PDS action arrays 73 + const localActions: Action[] = []; 74 + const pdsActions: PdsAction[] = []; 75 + 76 + for (const input of body.actions) { 77 + if (input.type === "webhook") { 78 + if (!input.callbackUrl) { 79 + return c.json({ error: "callbackUrl is required for webhook actions" }, 400); 80 + } 81 + try { 82 + new URL(input.callbackUrl); 83 + } catch { 84 + return c.json({ error: "Invalid callback URL" }, 400); 85 + } 72 86 73 - if (type === "webhook") { 74 - if (!body.callbackUrl) { 75 - return c.json({ error: "callbackUrl is required for webhook subscriptions" }, 400); 76 - } 77 - try { 78 - new URL(body.callbackUrl); 79 - } catch { 80 - return c.json({ error: "Invalid callback URL" }, 400); 81 - } 87 + const verification = await verifyCallback(input.callbackUrl, body.lexicon); 88 + if (!verification.ok) { 89 + return c.json({ error: verification.error }, 422); 90 + } 82 91 83 - const verification = await verifyCallback(body.callbackUrl, body.lexicon); 84 - if (!verification.ok) { 85 - return c.json({ error: verification.error }, 422); 86 - } 92 + const secret = nanoid(32); 93 + localActions.push({ 94 + $type: "webhook", 95 + callbackUrl: input.callbackUrl, 96 + secret, 97 + } satisfies WebhookAction); 98 + pdsActions.push({ 99 + $type: "app.rglw.subscription#webhookAction", 100 + callbackUrl: input.callbackUrl, 101 + }); 102 + } else if (input.type === "record") { 103 + if (!input.targetCollection) { 104 + return c.json({ error: "targetCollection is required for record actions" }, 400); 105 + } 106 + if (!isValidNsid(input.targetCollection)) { 107 + return c.json({ error: "Invalid target collection NSID" }, 400); 108 + } 109 + if (!isNsidAllowed(input.targetCollection, config.nsidAllowlist, config.nsidBlocklist)) { 110 + return c.json({ error: "This target collection is not allowed on this instance" }, 403); 111 + } 112 + if (!input.recordTemplate) { 113 + return c.json({ error: "recordTemplate is required for record actions" }, 400); 114 + } 115 + const templateValidation = validateTemplate(input.recordTemplate); 116 + if (!templateValidation.valid) { 117 + return c.json({ error: templateValidation.error }, 400); 118 + } 87 119 88 - callbackUrl = body.callbackUrl; 89 - secret = nanoid(32); 90 - } else if (type === "record") { 91 - if (!body.targetCollection) { 92 - return c.json({ error: "targetCollection is required for record subscriptions" }, 400); 93 - } 94 - if (!isValidNsid(body.targetCollection)) { 95 - return c.json({ error: "Invalid target collection NSID" }, 400); 96 - } 97 - if (!isNsidAllowed(body.targetCollection, config.nsidAllowlist, config.nsidBlocklist)) { 98 - return c.json({ error: "This target collection is not allowed on this instance" }, 403); 99 - } 100 - if (!body.recordTemplate) { 101 - return c.json({ error: "recordTemplate is required for record subscriptions" }, 400); 102 - } 103 - const templateValidation = validateTemplate(body.recordTemplate); 104 - if (!templateValidation.valid) { 105 - return c.json({ error: templateValidation.error }, 400); 120 + localActions.push({ 121 + $type: "record", 122 + targetCollection: input.targetCollection, 123 + recordTemplate: input.recordTemplate, 124 + } satisfies RecordAction); 125 + pdsActions.push({ 126 + $type: "app.rglw.subscription#recordAction", 127 + targetCollection: input.targetCollection, 128 + recordTemplate: input.recordTemplate, 129 + }); 130 + } else { 131 + return c.json({ error: "Invalid action type" }, 400); 106 132 } 107 - 108 - targetCollection = body.targetCollection; 109 - recordTemplate = body.recordTemplate; 110 - } else { 111 - return c.json({ error: "Invalid subscription type" }, 400); 112 133 } 113 134 114 135 // Write record to PDS ··· 117 138 let uri: string; 118 139 let rkey: string; 119 140 120 - const pdsRecord = 121 - type === "webhook" 122 - ? { 123 - type: "webhook" as const, 124 - lexicon: body.lexicon, 125 - callbackUrl: callbackUrl!, 126 - conditions, 127 - active, 128 - createdAt: now.toISOString(), 129 - } 130 - : { 131 - type: "record" as const, 132 - lexicon: body.lexicon, 133 - targetCollection: targetCollection!, 134 - recordTemplate: recordTemplate!, 135 - conditions, 136 - active, 137 - createdAt: now.toISOString(), 138 - }; 139 - 140 141 try { 141 - const result = await createRecord(user.did, pdsRecord); 142 + const result = await createRecord(user.did, { 143 + lexicon: body.lexicon, 144 + actions: pdsActions, 145 + conditions, 146 + active, 147 + createdAt: now.toISOString(), 148 + }); 142 149 uri = result.uri; 143 150 rkey = result.rkey; 144 151 } catch (err) { ··· 152 159 uri, 153 160 did: user.did, 154 161 rkey, 155 - type, 156 162 lexicon: body.lexicon, 157 - callbackUrl, 163 + actions: localActions, 158 164 conditions, 159 - secret, 160 - targetCollection, 161 - recordTemplate, 162 165 active, 163 166 indexedAt: now, 164 167 }); ··· 174 177 } 175 178 176 179 notifySubscriptionChange(); 177 - return c.json({ uri, rkey, ...(secret ? { secret } : {}) }, 201); 180 + 181 + // Return secrets for webhook actions so the user can copy them 182 + const actionSecrets = localActions.map((a, i) => ({ 183 + index: i, 184 + type: a.$type, 185 + ...(a.$type === "webhook" ? { secret: a.secret } : {}), 186 + })); 187 + 188 + return c.json({ uri, rkey, actions: actionSecrets }, 201); 178 189 });
+2 -8
app/routes/dashboard/index.tsx
··· 49 49 <thead> 50 50 <tr> 51 51 <th>Lexicon</th> 52 - <th>Action</th> 52 + <th>Actions</th> 53 53 <th>Conditions</th> 54 54 <th>Status</th> 55 55 <th></th> ··· 64 64 </a> 65 65 </td> 66 66 <td> 67 - {sub.type === "record" ? ( 68 - <> 69 - Create <InlineCode>{sub.targetCollection}</InlineCode> 70 - </> 71 - ) : ( 72 - <InlineCode>{sub.callbackUrl}</InlineCode> 73 - )} 67 + {sub.actions.length} action{sub.actions.length !== 1 ? "s" : ""} 74 68 </td> 75 69 <td>{sub.conditions.length}</td> 76 70 <td>
+39 -23
app/routes/dashboard/subscriptions/[rkey].tsx
··· 73 73 <dd> 74 74 <InlineCode>{sub.lexicon}</InlineCode> 75 75 </dd> 76 - {sub.type === "record" ? ( 77 - <> 78 - <dt>Target Collection</dt> 79 - <dd> 80 - <InlineCode>{sub.targetCollection}</InlineCode> 81 - </dd> 82 - <dt>Record Template</dt> 83 - <dd> 84 - <CodeBlock>{sub.recordTemplate}</CodeBlock> 85 - </dd> 86 - </> 87 - ) : ( 88 - <> 89 - <dt>Callback URL</dt> 90 - <dd> 91 - <InlineCode>{sub.callbackUrl}</InlineCode> 92 - </dd> 93 - <dt>HMAC Secret</dt> 94 - <dd> 95 - <InlineCode>{sub.secret}</InlineCode> 96 - </dd> 97 - </> 98 - )} 99 76 <dt>Status</dt> 100 77 <dd> 101 78 <Badge variant={sub.active ? "success" : "neutral"}> ··· 125 102 </Card> 126 103 )} 127 104 105 + <Stack gap={3}> 106 + <h3>Actions ({sub.actions.length})</h3> 107 + {sub.actions.map((action, i) => ( 108 + <Card key={i} variant="flat"> 109 + <Stack gap={2}> 110 + <h4> 111 + {action.$type === "webhook" ? "Webhook" : "Record"} {i + 1} 112 + </h4> 113 + <DescriptionList> 114 + {action.$type === "webhook" ? ( 115 + <> 116 + <dt>Callback URL</dt> 117 + <dd> 118 + <InlineCode>{action.callbackUrl}</InlineCode> 119 + </dd> 120 + <dt>HMAC Secret</dt> 121 + <dd> 122 + <InlineCode>{action.secret}</InlineCode> 123 + </dd> 124 + </> 125 + ) : ( 126 + <> 127 + <dt>Target Collection</dt> 128 + <dd> 129 + <InlineCode>{action.targetCollection}</InlineCode> 130 + </dd> 131 + <dt>Record Template</dt> 132 + <dd> 133 + <CodeBlock>{action.recordTemplate}</CodeBlock> 134 + </dd> 135 + </> 136 + )} 137 + </DescriptionList> 138 + </Stack> 139 + </Card> 140 + ))} 141 + </Stack> 142 + 128 143 <DeliveryLog 129 144 rkey={sub.rkey} 130 145 active={sub.active} 131 146 initialLogs={logs.map((l) => ({ 132 147 id: l.id, 148 + actionIndex: l.actionIndex, 133 149 eventTimeUs: l.eventTimeUs, 134 150 statusCode: l.statusCode, 135 151 error: l.error,
+4 -21
app/routes/dashboard/subscriptions/new.tsx
··· 5 5 import { PageHeader } from "../../../components/Layout/PageHeader/index.js"; 6 6 import { Card } from "../../../components/Card/index.js"; 7 7 import { Button } from "../../../components/Button/index.js"; 8 - import { Stack } from "../../../components/Layout/Stack/index.js"; 9 8 import ThemeToggle from "../../../islands/ThemeToggle.js"; 9 + import SubscriptionForm from "../../../islands/SubscriptionForm.js"; 10 10 11 11 export default createRoute((c) => { 12 12 const user = c.get("user"); ··· 22 22 </Button> 23 23 } 24 24 /> 25 - <Stack gap={4}> 26 - <Card variant="flat"> 27 - <Stack gap={2}> 28 - <h3>Webhook</h3> 29 - <p>Forward matching events to a callback URL via HTTP POST.</p> 30 - <Button href="/dashboard/subscriptions/new/webhook" size="sm"> 31 - Create webhook 32 - </Button> 33 - </Stack> 34 - </Card> 35 - <Card variant="flat"> 36 - <Stack gap={2}> 37 - <h3>Record</h3> 38 - <p>Create a record on your PDS when a matching event occurs.</p> 39 - <Button href="/dashboard/subscriptions/new/record" size="sm"> 40 - Create record action 41 - </Button> 42 - </Stack> 43 - </Card> 44 - </Stack> 25 + <Card variant="flat"> 26 + <SubscriptionForm /> 27 + </Card> 45 28 </Container> 46 29 </AppShell>, 47 30 { title: "New Subscription — Airglow" },
-32
app/routes/dashboard/subscriptions/new/record.tsx
··· 1 - import { createRoute } from "honox/factory"; 2 - import { AppShell } from "../../../../components/Layout/AppShell/index.js"; 3 - import { Header } from "../../../../components/Layout/Header/index.js"; 4 - import { Container } from "../../../../components/Layout/Container/index.js"; 5 - import { PageHeader } from "../../../../components/Layout/PageHeader/index.js"; 6 - import { Card } from "../../../../components/Card/index.js"; 7 - import { Button } from "../../../../components/Button/index.js"; 8 - import ThemeToggle from "../../../../islands/ThemeToggle.js"; 9 - import SubscriptionForm from "../../../../islands/SubscriptionForm.js"; 10 - 11 - export default createRoute((c) => { 12 - const user = c.get("user"); 13 - 14 - return c.render( 15 - <AppShell header={<Header user={user} actions={<ThemeToggle />} />}> 16 - <Container> 17 - <PageHeader 18 - title="New Record Subscription" 19 - actions={ 20 - <Button href="/dashboard/subscriptions/new" variant="ghost" size="sm"> 21 - &larr; Back 22 - </Button> 23 - } 24 - /> 25 - <Card variant="flat"> 26 - <SubscriptionForm type="record" /> 27 - </Card> 28 - </Container> 29 - </AppShell>, 30 - { title: "New Record Action — Airglow" }, 31 - ); 32 - });
-32
app/routes/dashboard/subscriptions/new/webhook.tsx
··· 1 - import { createRoute } from "honox/factory"; 2 - import { AppShell } from "../../../../components/Layout/AppShell/index.js"; 3 - import { Header } from "../../../../components/Layout/Header/index.js"; 4 - import { Container } from "../../../../components/Layout/Container/index.js"; 5 - import { PageHeader } from "../../../../components/Layout/PageHeader/index.js"; 6 - import { Card } from "../../../../components/Card/index.js"; 7 - import { Button } from "../../../../components/Button/index.js"; 8 - import ThemeToggle from "../../../../islands/ThemeToggle.js"; 9 - import SubscriptionForm from "../../../../islands/SubscriptionForm.js"; 10 - 11 - export default createRoute((c) => { 12 - const user = c.get("user"); 13 - 14 - return c.render( 15 - <AppShell header={<Header user={user} actions={<ThemeToggle />} />}> 16 - <Container> 17 - <PageHeader 18 - title="New Webhook Subscription" 19 - actions={ 20 - <Button href="/dashboard/subscriptions/new" variant="ghost" size="sm"> 21 - &larr; Back 22 - </Button> 23 - } 24 - /> 25 - <Card variant="flat"> 26 - <SubscriptionForm type="webhook" /> 27 - </Card> 28 - </Container> 29 - </AppShell>, 30 - { title: "New Webhook — Airglow" }, 31 - ); 32 - });
+8 -5
app/server.ts
··· 6 6 7 7 const app = createApp(); 8 8 9 - // Start Jetstream consumer — routes matched events to the appropriate handler 9 + // Start Jetstream consumer — routes matched events to action handlers 10 10 startJetstream((match) => { 11 - const handler = match.subscription.type === "record" ? executeAction : dispatch; 12 - handler(match).catch((err) => { 13 - console.error(`${match.subscription.type} delivery error:`, err); 14 - }); 11 + for (let i = 0; i < match.subscription.actions.length; i++) { 12 + const action = match.subscription.actions[i]!; 13 + const handler = action.$type === "record" ? executeAction : dispatch; 14 + handler(match, i).catch((err) => { 15 + console.error(`Action ${i} (${action.$type}) delivery error:`, err); 16 + }); 17 + } 15 18 }); 16 19 17 20 // OAuth discovery endpoints (production — loopback clients don't need these)
+41 -24
lexicons/app/rglw/subscription.json
··· 4 4 "defs": { 5 5 "main": { 6 6 "type": "record", 7 - "description": "A webhook subscription that forwards matching AT Protocol events to a callback URL.", 7 + "description": "A subscription that triggers actions when matching AT Protocol events occur.", 8 8 "key": "tid", 9 9 "record": { 10 10 "type": "object", 11 - "required": ["lexicon", "createdAt"], 11 + "required": ["lexicon", "actions", "createdAt"], 12 12 "properties": { 13 13 "lexicon": { 14 14 "type": "string", 15 15 "description": "NSID of the collection to subscribe to.", 16 16 "maxLength": 256 17 17 }, 18 - "type": { 19 - "type": "string", 20 - "description": "Subscription type: 'webhook' delivers via HTTP POST, 'record' creates a record on the subscriber's PDS.", 21 - "knownValues": ["webhook", "record"], 22 - "default": "webhook", 23 - "maxLength": 32 24 - }, 25 - "callbackUrl": { 26 - "type": "string", 27 - "format": "uri", 28 - "description": "For webhook subscriptions: URL to receive webhook POST requests.", 29 - "maxLength": 2048 30 - }, 31 - "targetCollection": { 32 - "type": "string", 33 - "description": "For record subscriptions: NSID of the collection to create the record in.", 34 - "maxLength": 256 35 - }, 36 - "recordTemplate": { 37 - "type": "string", 38 - "description": "For record subscriptions: JSON template with {{placeholder}} expressions resolved from event data.", 39 - "maxLength": 10240 18 + "actions": { 19 + "type": "array", 20 + "description": "Actions to execute when a matching event occurs.", 21 + "minLength": 1, 22 + "maxLength": 10, 23 + "items": { 24 + "type": "union", 25 + "refs": ["#webhookAction", "#recordAction"] 26 + } 40 27 }, 41 28 "conditions": { 42 29 "type": "array", ··· 56 43 "type": "string", 57 44 "format": "datetime" 58 45 } 46 + } 47 + } 48 + }, 49 + "webhookAction": { 50 + "type": "object", 51 + "description": "Forward matching events to a callback URL via HTTP POST.", 52 + "required": ["callbackUrl"], 53 + "properties": { 54 + "callbackUrl": { 55 + "type": "string", 56 + "format": "uri", 57 + "description": "URL to receive webhook POST requests.", 58 + "maxLength": 2048 59 + } 60 + } 61 + }, 62 + "recordAction": { 63 + "type": "object", 64 + "description": "Create a record on the subscriber's PDS when a matching event occurs.", 65 + "required": ["targetCollection", "recordTemplate"], 66 + "properties": { 67 + "targetCollection": { 68 + "type": "string", 69 + "description": "NSID of the collection to create the record in.", 70 + "maxLength": 256 71 + }, 72 + "recordTemplate": { 73 + "type": "string", 74 + "description": "JSON template with {{placeholder}} expressions resolved from event data.", 75 + "maxLength": 10240 59 76 } 60 77 } 61 78 },
+23 -17
lib/actions/executor.ts
··· 1 1 import { db } from "../db/index.js"; 2 - import { deliveryLogs } from "../db/schema.js"; 2 + import { deliveryLogs, type RecordAction } from "../db/schema.js"; 3 3 import { createArbitraryRecord } from "../subscriptions/pds.js"; 4 4 import { renderTemplate } from "./template.js"; 5 5 import type { MatchedEvent } from "../jetstream/consumer.js"; 6 6 7 7 const RETRY_DELAYS = [5_000, 30_000]; 8 8 9 - async function execute(match: MatchedEvent): Promise<{ statusCode: number; error?: string }> { 9 + async function execute( 10 + match: MatchedEvent, 11 + action: RecordAction, 12 + ): Promise<{ statusCode: number; error?: string }> { 10 13 const { subscription, event } = match; 11 14 12 15 let record: Record<string, unknown>; 13 16 try { 14 - record = renderTemplate(subscription.recordTemplate!, event); 17 + record = renderTemplate(action.recordTemplate, event); 15 18 } catch (err) { 16 19 return { 17 20 statusCode: 0, ··· 20 23 } 21 24 22 25 try { 23 - await createArbitraryRecord(subscription.did, subscription.targetCollection!, record); 26 + await createArbitraryRecord(subscription.did, action.targetCollection, record); 24 27 return { statusCode: 200 }; 25 28 } catch (err) { 26 29 const message = err instanceof Error ? err.message : String(err); 27 - // Extract HTTP status from PDS error message if available 28 30 const statusMatch = message.match(/\((\d{3})\)/); 29 31 const statusCode = statusMatch ? Number(statusMatch[1]) : 0; 30 32 return { statusCode, error: message }; ··· 33 35 34 36 async function logDelivery( 35 37 subscriptionUri: string, 38 + actionIndex: number, 36 39 eventTimeUs: number, 37 40 payload: string | null, 38 41 statusCode: number, ··· 41 44 ) { 42 45 await db.insert(deliveryLogs).values({ 43 46 subscriptionUri, 47 + actionIndex, 44 48 eventTimeUs, 45 49 payload, 46 50 statusCode, ··· 55 59 } 56 60 57 61 function isRetryable(code: number): boolean { 58 - // Retry on server errors or network failures 59 - // Do NOT retry on 401/403 (auth) or template errors (deterministic) 60 62 return code >= 500 || code === 0; 61 63 } 62 64 63 - function scheduleRetry(match: MatchedEvent, retryIndex: number) { 65 + function scheduleRetry(match: MatchedEvent, actionIndex: number, retryIndex: number) { 64 66 if (retryIndex >= RETRY_DELAYS.length) return; 65 67 66 68 setTimeout(async () => { 67 69 try { 68 - const result = await execute(match); 70 + const action = match.subscription.actions[actionIndex] as RecordAction; 71 + const result = await execute(match, action); 69 72 const body = JSON.stringify({ 70 - targetCollection: match.subscription.targetCollection, 71 - recordTemplate: match.subscription.recordTemplate, 73 + targetCollection: action.targetCollection, 74 + recordTemplate: action.recordTemplate, 72 75 }); 73 76 74 77 await logDelivery( 75 78 match.subscription.uri, 79 + actionIndex, 76 80 match.event.time_us, 77 81 isSuccess(result.statusCode) ? null : body, 78 82 result.statusCode, ··· 81 85 ); 82 86 83 87 if (!isSuccess(result.statusCode) && isRetryable(result.statusCode)) { 84 - scheduleRetry(match, retryIndex + 1); 88 + scheduleRetry(match, actionIndex, retryIndex + 1); 85 89 } 86 90 } catch (err) { 87 91 console.error("Action retry error:", err); ··· 90 94 } 91 95 92 96 /** Execute a record action for a matched event. */ 93 - export async function executeAction(match: MatchedEvent) { 94 - const result = await execute(match); 97 + export async function executeAction(match: MatchedEvent, actionIndex: number) { 98 + const action = match.subscription.actions[actionIndex] as RecordAction; 99 + const result = await execute(match, action); 95 100 96 101 const body = JSON.stringify({ 97 - targetCollection: match.subscription.targetCollection, 98 - recordTemplate: match.subscription.recordTemplate, 102 + targetCollection: action.targetCollection, 103 + recordTemplate: action.recordTemplate, 99 104 }); 100 105 101 106 await logDelivery( 102 107 match.subscription.uri, 108 + actionIndex, 103 109 match.event.time_us, 104 110 isSuccess(result.statusCode) ? null : body, 105 111 result.statusCode, ··· 108 114 ); 109 115 110 116 if (!isSuccess(result.statusCode) && isRetryable(result.statusCode)) { 111 - scheduleRetry(match, 0); 117 + scheduleRetry(match, actionIndex, 0); 112 118 } 113 119 }
+2 -2
lib/db/migrations/0000_lethal_titania.sql lib/db/migrations/0000_tearful_tarot.sql
··· 1 1 CREATE TABLE `delivery_logs` ( 2 2 `id` integer PRIMARY KEY AUTOINCREMENT NOT NULL, 3 3 `subscription_uri` text NOT NULL, 4 + `action_index` integer DEFAULT 0 NOT NULL, 4 5 `event_time_us` integer NOT NULL, 5 6 `payload` text, 6 7 `status_code` integer, ··· 33 34 `did` text NOT NULL, 34 35 `rkey` text NOT NULL, 35 36 `lexicon` text NOT NULL, 36 - `callback_url` text NOT NULL, 37 + `actions` text DEFAULT '[]' NOT NULL, 37 38 `conditions` text DEFAULT '[]' NOT NULL, 38 - `secret` text NOT NULL, 39 39 `active` integer DEFAULT false NOT NULL, 40 40 `indexed_at` integer NOT NULL 41 41 );
-20
lib/db/migrations/0001_early_kronos.sql
··· 1 - PRAGMA foreign_keys=OFF;--> statement-breakpoint 2 - CREATE TABLE `__new_subscriptions` ( 3 - `uri` text PRIMARY KEY NOT NULL, 4 - `did` text NOT NULL, 5 - `rkey` text NOT NULL, 6 - `type` text DEFAULT 'webhook' NOT NULL, 7 - `lexicon` text NOT NULL, 8 - `callback_url` text, 9 - `conditions` text DEFAULT '[]' NOT NULL, 10 - `secret` text, 11 - `target_collection` text, 12 - `record_template` text, 13 - `active` integer DEFAULT false NOT NULL, 14 - `indexed_at` integer NOT NULL 15 - ); 16 - --> statement-breakpoint 17 - INSERT INTO `__new_subscriptions`("uri", "did", "rkey", "type", "lexicon", "callback_url", "conditions", "secret", "target_collection", "record_template", "active", "indexed_at") SELECT "uri", "did", "rkey", "type", "lexicon", "callback_url", "conditions", "secret", "target_collection", "record_template", "active", "indexed_at" FROM `subscriptions`;--> statement-breakpoint 18 - DROP TABLE `subscriptions`;--> statement-breakpoint 19 - ALTER TABLE `__new_subscriptions` RENAME TO `subscriptions`;--> statement-breakpoint 20 - PRAGMA foreign_keys=ON;
+13 -11
lib/db/migrations/meta/0000_snapshot.json
··· 1 1 { 2 2 "version": "6", 3 3 "dialect": "sqlite", 4 - "id": "4cb923a7-52a2-4515-861e-3ec2feefbeb3", 4 + "id": "b7fc849c-bab7-48f6-8ff2-aff7575d0a7d", 5 5 "prevId": "00000000-0000-0000-0000-000000000000", 6 6 "tables": { 7 7 "delivery_logs": { ··· 20 20 "primaryKey": false, 21 21 "notNull": true, 22 22 "autoincrement": false 23 + }, 24 + "action_index": { 25 + "name": "action_index", 26 + "type": "integer", 27 + "primaryKey": false, 28 + "notNull": true, 29 + "autoincrement": false, 30 + "default": 0 23 31 }, 24 32 "event_time_us": { 25 33 "name": "event_time_us", ··· 205 213 "notNull": true, 206 214 "autoincrement": false 207 215 }, 208 - "callback_url": { 209 - "name": "callback_url", 216 + "actions": { 217 + "name": "actions", 210 218 "type": "text", 211 219 "primaryKey": false, 212 220 "notNull": true, 213 - "autoincrement": false 221 + "autoincrement": false, 222 + "default": "'[]'" 214 223 }, 215 224 "conditions": { 216 225 "name": "conditions", ··· 219 228 "notNull": true, 220 229 "autoincrement": false, 221 230 "default": "'[]'" 222 - }, 223 - "secret": { 224 - "name": "secret", 225 - "type": "text", 226 - "primaryKey": false, 227 - "notNull": true, 228 - "autoincrement": false 229 231 }, 230 232 "active": { 231 233 "name": "active",
-329
lib/db/migrations/meta/0001_snapshot.json
··· 1 - { 2 - "version": "6", 3 - "dialect": "sqlite", 4 - "id": "9fe2d7e1-920a-4929-981d-99c94549c02e", 5 - "prevId": "4cb923a7-52a2-4515-861e-3ec2feefbeb3", 6 - "tables": { 7 - "delivery_logs": { 8 - "name": "delivery_logs", 9 - "columns": { 10 - "id": { 11 - "name": "id", 12 - "type": "integer", 13 - "primaryKey": true, 14 - "notNull": true, 15 - "autoincrement": true 16 - }, 17 - "subscription_uri": { 18 - "name": "subscription_uri", 19 - "type": "text", 20 - "primaryKey": false, 21 - "notNull": true, 22 - "autoincrement": false 23 - }, 24 - "event_time_us": { 25 - "name": "event_time_us", 26 - "type": "integer", 27 - "primaryKey": false, 28 - "notNull": true, 29 - "autoincrement": false 30 - }, 31 - "payload": { 32 - "name": "payload", 33 - "type": "text", 34 - "primaryKey": false, 35 - "notNull": false, 36 - "autoincrement": false 37 - }, 38 - "status_code": { 39 - "name": "status_code", 40 - "type": "integer", 41 - "primaryKey": false, 42 - "notNull": false, 43 - "autoincrement": false 44 - }, 45 - "error": { 46 - "name": "error", 47 - "type": "text", 48 - "primaryKey": false, 49 - "notNull": false, 50 - "autoincrement": false 51 - }, 52 - "attempt": { 53 - "name": "attempt", 54 - "type": "integer", 55 - "primaryKey": false, 56 - "notNull": true, 57 - "autoincrement": false, 58 - "default": 1 59 - }, 60 - "created_at": { 61 - "name": "created_at", 62 - "type": "integer", 63 - "primaryKey": false, 64 - "notNull": true, 65 - "autoincrement": false 66 - } 67 - }, 68 - "indexes": {}, 69 - "foreignKeys": { 70 - "delivery_logs_subscription_uri_subscriptions_uri_fk": { 71 - "name": "delivery_logs_subscription_uri_subscriptions_uri_fk", 72 - "tableFrom": "delivery_logs", 73 - "tableTo": "subscriptions", 74 - "columnsFrom": ["subscription_uri"], 75 - "columnsTo": ["uri"], 76 - "onDelete": "cascade", 77 - "onUpdate": "no action" 78 - } 79 - }, 80 - "compositePrimaryKeys": {}, 81 - "uniqueConstraints": {}, 82 - "checkConstraints": {} 83 - }, 84 - "lexicon_cache": { 85 - "name": "lexicon_cache", 86 - "columns": { 87 - "nsid": { 88 - "name": "nsid", 89 - "type": "text", 90 - "primaryKey": true, 91 - "notNull": true, 92 - "autoincrement": false 93 - }, 94 - "schema": { 95 - "name": "schema", 96 - "type": "text", 97 - "primaryKey": false, 98 - "notNull": true, 99 - "autoincrement": false 100 - }, 101 - "fetched_at": { 102 - "name": "fetched_at", 103 - "type": "integer", 104 - "primaryKey": false, 105 - "notNull": true, 106 - "autoincrement": false 107 - } 108 - }, 109 - "indexes": {}, 110 - "foreignKeys": {}, 111 - "compositePrimaryKeys": {}, 112 - "uniqueConstraints": {}, 113 - "checkConstraints": {} 114 - }, 115 - "oauth_sessions": { 116 - "name": "oauth_sessions", 117 - "columns": { 118 - "key": { 119 - "name": "key", 120 - "type": "text", 121 - "primaryKey": true, 122 - "notNull": true, 123 - "autoincrement": false 124 - }, 125 - "value": { 126 - "name": "value", 127 - "type": "text", 128 - "primaryKey": false, 129 - "notNull": true, 130 - "autoincrement": false 131 - }, 132 - "expires_at": { 133 - "name": "expires_at", 134 - "type": "integer", 135 - "primaryKey": false, 136 - "notNull": false, 137 - "autoincrement": false 138 - } 139 - }, 140 - "indexes": {}, 141 - "foreignKeys": {}, 142 - "compositePrimaryKeys": {}, 143 - "uniqueConstraints": {}, 144 - "checkConstraints": {} 145 - }, 146 - "oauth_states": { 147 - "name": "oauth_states", 148 - "columns": { 149 - "key": { 150 - "name": "key", 151 - "type": "text", 152 - "primaryKey": true, 153 - "notNull": true, 154 - "autoincrement": false 155 - }, 156 - "value": { 157 - "name": "value", 158 - "type": "text", 159 - "primaryKey": false, 160 - "notNull": true, 161 - "autoincrement": false 162 - }, 163 - "expires_at": { 164 - "name": "expires_at", 165 - "type": "integer", 166 - "primaryKey": false, 167 - "notNull": false, 168 - "autoincrement": false 169 - } 170 - }, 171 - "indexes": {}, 172 - "foreignKeys": {}, 173 - "compositePrimaryKeys": {}, 174 - "uniqueConstraints": {}, 175 - "checkConstraints": {} 176 - }, 177 - "subscriptions": { 178 - "name": "subscriptions", 179 - "columns": { 180 - "uri": { 181 - "name": "uri", 182 - "type": "text", 183 - "primaryKey": true, 184 - "notNull": true, 185 - "autoincrement": false 186 - }, 187 - "did": { 188 - "name": "did", 189 - "type": "text", 190 - "primaryKey": false, 191 - "notNull": true, 192 - "autoincrement": false 193 - }, 194 - "rkey": { 195 - "name": "rkey", 196 - "type": "text", 197 - "primaryKey": false, 198 - "notNull": true, 199 - "autoincrement": false 200 - }, 201 - "type": { 202 - "name": "type", 203 - "type": "text", 204 - "primaryKey": false, 205 - "notNull": true, 206 - "autoincrement": false, 207 - "default": "'webhook'" 208 - }, 209 - "lexicon": { 210 - "name": "lexicon", 211 - "type": "text", 212 - "primaryKey": false, 213 - "notNull": true, 214 - "autoincrement": false 215 - }, 216 - "callback_url": { 217 - "name": "callback_url", 218 - "type": "text", 219 - "primaryKey": false, 220 - "notNull": false, 221 - "autoincrement": false 222 - }, 223 - "conditions": { 224 - "name": "conditions", 225 - "type": "text", 226 - "primaryKey": false, 227 - "notNull": true, 228 - "autoincrement": false, 229 - "default": "'[]'" 230 - }, 231 - "secret": { 232 - "name": "secret", 233 - "type": "text", 234 - "primaryKey": false, 235 - "notNull": false, 236 - "autoincrement": false 237 - }, 238 - "target_collection": { 239 - "name": "target_collection", 240 - "type": "text", 241 - "primaryKey": false, 242 - "notNull": false, 243 - "autoincrement": false 244 - }, 245 - "record_template": { 246 - "name": "record_template", 247 - "type": "text", 248 - "primaryKey": false, 249 - "notNull": false, 250 - "autoincrement": false 251 - }, 252 - "active": { 253 - "name": "active", 254 - "type": "integer", 255 - "primaryKey": false, 256 - "notNull": true, 257 - "autoincrement": false, 258 - "default": false 259 - }, 260 - "indexed_at": { 261 - "name": "indexed_at", 262 - "type": "integer", 263 - "primaryKey": false, 264 - "notNull": true, 265 - "autoincrement": false 266 - } 267 - }, 268 - "indexes": {}, 269 - "foreignKeys": {}, 270 - "compositePrimaryKeys": {}, 271 - "uniqueConstraints": {}, 272 - "checkConstraints": {} 273 - }, 274 - "users": { 275 - "name": "users", 276 - "columns": { 277 - "id": { 278 - "name": "id", 279 - "type": "integer", 280 - "primaryKey": true, 281 - "notNull": true, 282 - "autoincrement": true 283 - }, 284 - "did": { 285 - "name": "did", 286 - "type": "text", 287 - "primaryKey": false, 288 - "notNull": true, 289 - "autoincrement": false 290 - }, 291 - "handle": { 292 - "name": "handle", 293 - "type": "text", 294 - "primaryKey": false, 295 - "notNull": true, 296 - "autoincrement": false 297 - }, 298 - "created_at": { 299 - "name": "created_at", 300 - "type": "integer", 301 - "primaryKey": false, 302 - "notNull": true, 303 - "autoincrement": false 304 - } 305 - }, 306 - "indexes": { 307 - "users_did_unique": { 308 - "name": "users_did_unique", 309 - "columns": ["did"], 310 - "isUnique": true 311 - } 312 - }, 313 - "foreignKeys": {}, 314 - "compositePrimaryKeys": {}, 315 - "uniqueConstraints": {}, 316 - "checkConstraints": {} 317 - } 318 - }, 319 - "views": {}, 320 - "enums": {}, 321 - "_meta": { 322 - "schemas": {}, 323 - "tables": {}, 324 - "columns": {} 325 - }, 326 - "internal": { 327 - "indexes": {} 328 - } 329 - }
+2 -9
lib/db/migrations/meta/_journal.json
··· 5 5 { 6 6 "idx": 0, 7 7 "version": "6", 8 - "when": 1775226502108, 9 - "tag": "0000_lethal_titania", 10 - "breakpoints": true 11 - }, 12 - { 13 - "idx": 1, 14 - "version": "6", 15 - "when": 1775479222795, 16 - "tag": "0001_early_kronos", 8 + "when": 1775492878874, 9 + "tag": "0000_tearful_tarot", 17 10 "breakpoints": true 18 11 } 19 12 ]
+17 -5
lib/db/schema.ts
··· 7 7 createdAt: integer("created_at", { mode: "timestamp_ms" }).notNull(), 8 8 }); 9 9 10 + // Action types for subscription actions (stored as JSON) 11 + export type WebhookAction = { 12 + $type: "webhook"; 13 + callbackUrl: string; 14 + secret: string; // instance-local HMAC secret, not stored on PDS 15 + }; 16 + 17 + export type RecordAction = { 18 + $type: "record"; 19 + targetCollection: string; 20 + recordTemplate: string; 21 + }; 22 + 23 + export type Action = WebhookAction | RecordAction; 24 + 10 25 // Local index of app.rglw.subscription records living on user PDS. 11 26 // Source of truth is the PDS; this is a cache for fast Jetstream matching. 12 27 export const subscriptions = sqliteTable("subscriptions", { 13 28 uri: text("uri").primaryKey(), // at://did/app.rglw.subscription/rkey 14 29 did: text("did").notNull(), 15 30 rkey: text("rkey").notNull(), 16 - type: text("type").notNull().default("webhook"), // "webhook" | "record" 17 31 lexicon: text("lexicon").notNull(), // NSID being watched 18 - callbackUrl: text("callback_url"), // webhook only 32 + actions: text("actions", { mode: "json" }).notNull().$type<Action[]>().default([]), 19 33 conditions: text("conditions", { mode: "json" }) 20 34 .notNull() 21 35 .$type<Array<{ field: string; operator: string; value: string }>>() 22 36 .default([]), 23 - secret: text("secret"), // HMAC secret, instance-local, webhook only 24 - targetCollection: text("target_collection"), // record only — NSID to create 25 - recordTemplate: text("record_template"), // record only — JSON template 26 37 active: integer("active", { mode: "boolean" }).notNull().default(false), 27 38 indexedAt: integer("indexed_at", { mode: "timestamp_ms" }).notNull(), 28 39 }); ··· 32 43 subscriptionUri: text("subscription_uri") 33 44 .notNull() 34 45 .references(() => subscriptions.uri, { onDelete: "cascade" }), 46 + actionIndex: integer("action_index").notNull().default(0), 35 47 eventTimeUs: integer("event_time_us").notNull(), 36 48 payload: text("payload"), // JSON, stored for failed deliveries 37 49 statusCode: integer("status_code"),
+13 -12
lib/subscriptions/pds.ts
··· 16 16 return s.padStart(13, "2"); 17 17 } 18 18 19 - type BaseSubscriptionRecord = { 20 - lexicon: string; 21 - conditions: Array<{ field: string; operator: string; value: string }>; 22 - active: boolean; 23 - createdAt: string; 24 - }; 25 - 26 - type WebhookSubscriptionRecord = BaseSubscriptionRecord & { 27 - type?: "webhook"; 19 + type PdsWebhookAction = { 20 + $type: "app.rglw.subscription#webhookAction"; 28 21 callbackUrl: string; 29 22 }; 30 23 31 - type RecordSubscriptionRecord = BaseSubscriptionRecord & { 32 - type: "record"; 24 + type PdsRecordAction = { 25 + $type: "app.rglw.subscription#recordAction"; 33 26 targetCollection: string; 34 27 recordTemplate: string; 35 28 }; 36 29 37 - type SubscriptionRecord = WebhookSubscriptionRecord | RecordSubscriptionRecord; 30 + export type PdsAction = PdsWebhookAction | PdsRecordAction; 31 + 32 + type SubscriptionRecord = { 33 + lexicon: string; 34 + actions: PdsAction[]; 35 + conditions: Array<{ field: string; operator: string; value: string }>; 36 + active: boolean; 37 + createdAt: string; 38 + }; 38 39 39 40 async function pdsCall( 40 41 did: string,
+21 -11
lib/webhooks/dispatcher.ts
··· 1 1 import { db } from "../db/index.js"; 2 - import { deliveryLogs } from "../db/schema.js"; 2 + import { deliveryLogs, type WebhookAction } from "../db/schema.js"; 3 3 import { sign } from "./signer.js"; 4 4 import type { MatchedEvent } from "../jetstream/consumer.js"; 5 5 ··· 75 75 76 76 async function logDelivery( 77 77 subscriptionUri: string, 78 + actionIndex: number, 78 79 eventTimeUs: number, 79 80 payload: string | null, 80 81 statusCode: number, ··· 83 84 ) { 84 85 await db.insert(deliveryLogs).values({ 85 86 subscriptionUri, 87 + actionIndex, 86 88 eventTimeUs, 87 89 payload, 88 90 statusCode, ··· 102 104 103 105 function scheduleRetry( 104 106 subscriptionUri: string, 107 + actionIndex: number, 105 108 callbackUrl: string, 106 109 secret: string, 107 110 eventTimeUs: number, ··· 116 119 117 120 await logDelivery( 118 121 subscriptionUri, 122 + actionIndex, 119 123 eventTimeUs, 120 124 isSuccess(result.statusCode) ? null : body, 121 125 result.statusCode, ··· 124 128 ); 125 129 126 130 if (!isSuccess(result.statusCode) && isRetryable(result.statusCode)) { 127 - scheduleRetry(subscriptionUri, callbackUrl, secret, eventTimeUs, body, retryIndex + 1); 131 + scheduleRetry( 132 + subscriptionUri, 133 + actionIndex, 134 + callbackUrl, 135 + secret, 136 + eventTimeUs, 137 + body, 138 + retryIndex + 1, 139 + ); 128 140 } 129 141 } catch (err) { 130 142 console.error("Webhook retry error:", err); ··· 133 145 } 134 146 135 147 /** Deliver a matched event to the subscription's callback URL. */ 136 - export async function dispatch(match: MatchedEvent) { 148 + export async function dispatch(match: MatchedEvent, actionIndex: number) { 137 149 const { subscription, event } = match; 150 + const action = subscription.actions[actionIndex] as WebhookAction; 138 151 const payload = buildPayload(match); 139 152 const body = JSON.stringify(payload); 140 153 141 - const result = await deliver( 142 - subscription.callbackUrl!, 143 - body, 144 - subscription.secret!, 145 - subscription.uri, 146 - ); 154 + const result = await deliver(action.callbackUrl, body, action.secret, subscription.uri); 147 155 148 156 await logDelivery( 149 157 subscription.uri, 158 + actionIndex, 150 159 event.time_us, 151 160 isSuccess(result.statusCode) ? null : body, 152 161 result.statusCode, ··· 158 167 if (!isSuccess(result.statusCode) && isRetryable(result.statusCode)) { 159 168 scheduleRetry( 160 169 subscription.uri, 161 - subscription.callbackUrl!, 162 - subscription.secret!, 170 + actionIndex, 171 + action.callbackUrl, 172 + action.secret, 163 173 event.time_us, 164 174 body, 165 175 0,