Webhooks for the AT Protocol airglow.run
atproto atprotocol automation webhook
12
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: better subscriptions and datav fetching steps

Hugo c2747336 66fc590f

+996 -140
-7
app/islands/RecordFormBuilder.css.ts
··· 162 162 boxShadow: `0 0 0 3px ${vars.focus.ring}`, 163 163 }, 164 164 }); 165 - 166 - export const placeholderHelp = style({ 167 - fontSize: fontSize.xs, 168 - color: vars.color.textMuted, 169 - fontFamily: "monospace", 170 - lineHeight: 1.6, 171 - });
-5
app/islands/RecordFormBuilder.tsx
··· 574 574 placeholders={placeholders} 575 575 /> 576 576 ))} 577 - {placeholders.length > 0 && ( 578 - <div class={s.placeholderHelp}> 579 - Available placeholders: {placeholders.map((p) => `{{${p}}}`).join(", ")} 580 - </div> 581 - )} 582 577 </div> 583 578 ); 584 579 }
+61 -8
app/islands/SubscriptionForm.css.ts
··· 1 - import { style } from "@vanilla-extract/css"; 1 + import { globalStyle, style } from "@vanilla-extract/css"; 2 2 import { vars } from "../styles/theme.css.ts"; 3 3 import { space } from "../styles/tokens/spacing.ts"; 4 4 import { fontSize, fontWeight } from "../styles/tokens/typography.ts"; ··· 203 203 }, 204 204 }); 205 205 206 - export const placeholderHelp = style({ 207 - fontSize: fontSize.xs, 208 - color: vars.color.textMuted, 209 - fontFamily: "monospace", 210 - lineHeight: 1.6, 211 - }); 212 - 213 206 export const actionsSection = style({ 214 207 display: "flex", 215 208 flexDirection: "column", ··· 243 236 display: "flex", 244 237 gap: space[2], 245 238 }); 239 + 240 + export const fetchRow = style({ 241 + display: "flex", 242 + gap: space[2], 243 + alignItems: "flex-start", 244 + }); 245 + 246 + export const fetchName = style({ 247 + flex: 1, 248 + }); 249 + 250 + export const fetchUri = style({ 251 + flex: 2, 252 + }); 253 + 254 + export const collapsibleDetails = style({ 255 + border: `1px solid ${vars.color.border}`, 256 + borderRadius: radii.md, 257 + }); 258 + 259 + export const collapsibleSummary = style({ 260 + cursor: "pointer", 261 + display: "flex", 262 + alignItems: "center", 263 + gap: space[2], 264 + padding: `${space[2]} ${space[3]}`, 265 + fontSize: fontSize.sm, 266 + color: vars.color.text, 267 + userSelect: "none", 268 + listStyle: "none", 269 + "::marker": { 270 + display: "none", 271 + }, 272 + "::before": { 273 + content: '"\\25B8"', 274 + fontSize: fontSize.xs, 275 + transition: "transform 0.15s ease", 276 + }, 277 + selectors: { 278 + [`${collapsibleDetails}[open] > &`]: { 279 + borderBlockEnd: `1px solid ${vars.color.border}`, 280 + }, 281 + [`${collapsibleDetails}[open] > &::before`]: { 282 + transform: "rotate(90deg)", 283 + }, 284 + }, 285 + }); 286 + 287 + globalStyle(`${collapsibleSummary}::-webkit-details-marker`, { 288 + display: "none", 289 + }); 290 + 291 + export const collapsibleContent = style({ 292 + padding: `${space[2]} ${space[3]}`, 293 + fontSize: fontSize.xs, 294 + color: vars.color.textSecondary, 295 + fontFamily: "monospace", 296 + lineHeight: 1.6, 297 + wordBreak: "break-word", 298 + });
+146 -51
app/islands/SubscriptionForm.tsx
··· 1 - import { useState, useCallback, useRef } from "hono/jsx"; 1 + import { useState, useCallback, useRef, useMemo } from "hono/jsx"; 2 2 import type { RecordSchema } from "../../lib/lexicons/schema-tree.js"; 3 3 import RecordFormBuilder from "./RecordFormBuilder.js"; 4 4 import * as s from "./SubscriptionForm.css.ts"; ··· 15 15 value: string; 16 16 }; 17 17 18 + type FetchDraft = { name: string; uri: string }; 19 + 18 20 type WebhookDraft = { type: "webhook"; callbackUrl: string }; 19 21 type RecordDraft = { type: "record"; targetCollection: string; recordTemplate: string }; 20 22 type ActionDraft = WebhookDraft | RecordDraft; 21 23 22 - const NSID_RE = /^[a-z][a-z0-9-]*(\.[a-z][a-z0-9-]*){2,}$/; 24 + const NSID_RE = /^[a-z][a-z0-9-]*(\.[a-zA-Z][a-zA-Z0-9-]*){2,}$/; 25 + 26 + const BUILTIN_CONDITION_FIELDS: Field[] = [ 27 + { path: "event.did", type: "string", description: "DID of the repo that emitted the event" }, 28 + { 29 + path: "event.commit.operation", 30 + type: "string", 31 + description: "create, update, or delete", 32 + }, 33 + ]; 23 34 24 35 const BUILTIN_PLACEHOLDERS = [ 25 36 "event.did", ··· 202 213 } 203 214 required 204 215 /> 205 - {placeholders.length > 0 && ( 206 - <div class={s.placeholderHelp}> 207 - Available: {placeholders.map((p) => `{{${p}}}`).join(", ")} 208 - </div> 209 - )} 210 216 </div> 211 217 )} 212 218 </> ··· 224 230 const [fieldsLoading, setFieldsLoading] = useState(false); 225 231 const [fieldsError, setFieldsError] = useState(""); 226 232 const [conditions, setConditions] = useState<Condition[]>([]); 233 + const [fetches, setFetches] = useState<FetchDraft[]>([]); 227 234 const [actions, setActions] = useState<ActionDraft[]>([]); 228 235 const [nsidSuggestions, setNsidSuggestions] = useState<string[]>([]); 229 236 const [submitting, setSubmitting] = useState(false); ··· 317 324 [], 318 325 ); 319 326 327 + const addFetch = useCallback(() => { 328 + setFetches((prev) => [...prev, { name: "", uri: "" }]); 329 + }, []); 330 + 331 + const removeFetch = useCallback((index: number) => { 332 + setFetches((prev) => prev.filter((_, i) => i !== index)); 333 + }, []); 334 + 335 + const updateFetch = useCallback((index: number, key: "name" | "uri", val: string) => { 336 + setFetches((prev) => prev.map((f, i) => (i === index ? { ...f, [key]: val } : f))); 337 + }, []); 338 + 320 339 const addAction = useCallback((type: "webhook" | "record") => { 321 340 if (type === "webhook") { 322 341 setActions((prev) => [...prev, { type: "webhook", callbackUrl: "" }]); ··· 333 352 setActions((prev) => prev.map((a, i) => (i === index ? action : a))); 334 353 }, []); 335 354 355 + const previewPayload = useMemo(() => { 356 + const payload: Record<string, unknown> = { lexicon }; 357 + const filteredFetches = fetches.filter((f) => f.name && f.uri); 358 + if (filteredFetches.length > 0) { 359 + payload.fetches = filteredFetches.map((f) => ({ name: f.name, uri: f.uri })); 360 + } 361 + const filteredConditions = conditions.filter((c) => c.field && c.value); 362 + if (filteredConditions.length > 0) { 363 + payload.conditions = filteredConditions.map((c) => ({ 364 + field: c.field, 365 + operator: c.operator, 366 + value: c.value, 367 + })); 368 + } 369 + payload.actions = actions.map((a) => { 370 + if (a.type === "webhook") return { type: "webhook", callbackUrl: a.callbackUrl }; 371 + return { 372 + type: "record", 373 + targetCollection: a.targetCollection, 374 + recordTemplate: a.recordTemplate, 375 + }; 376 + }); 377 + return JSON.stringify(payload, null, 2); 378 + }, [lexicon, fetches, conditions, actions]); 379 + 336 380 const handleSubmit = useCallback( 337 381 async (e: Event) => { 338 382 e.preventDefault(); 339 383 setError(""); 340 384 setSubmitting(true); 341 385 try { 342 - const payload = { 343 - lexicon, 344 - conditions: conditions 345 - .filter((c) => c.field && c.value) 346 - .map((c) => ({ field: c.field, operator: c.operator, value: c.value })), 347 - actions: actions.map((a) => { 348 - if (a.type === "webhook") { 349 - return { type: "webhook" as const, callbackUrl: a.callbackUrl }; 350 - } 351 - return { 352 - type: "record" as const, 353 - targetCollection: a.targetCollection, 354 - recordTemplate: a.recordTemplate, 355 - }; 356 - }), 357 - }; 358 - 359 386 const res = await fetch("/api/subscriptions", { 360 387 method: "POST", 361 388 headers: { "Content-Type": "application/json" }, 362 - body: JSON.stringify(payload), 389 + body: previewPayload, 363 390 }); 364 391 const data = await res.json(); 365 392 if (!res.ok) { ··· 373 400 setSubmitting(false); 374 401 } 375 402 }, 376 - [lexicon, conditions, actions], 403 + [previewPayload], 377 404 ); 378 405 379 406 const allPlaceholders = [ 380 407 ...BUILTIN_PLACEHOLDERS, 381 408 ...fields.map((f) => `event.commit.record.${f.path}`), 409 + ...fetches 410 + .filter((f) => f.name) 411 + .flatMap((f) => [`${f.name}.uri`, `${f.name}.cid`, `${f.name}.record.*`]), 382 412 ]; 413 + 414 + const conditionFields = [...BUILTIN_CONDITION_FIELDS, ...fields]; 383 415 384 416 return ( 385 417 <form onSubmit={handleSubmit} class={s.form}> ··· 408 440 {fieldsError && <span class={s.errorText}>{fieldsError}</span>} 409 441 </div> 410 442 411 - {fields.length > 0 && ( 443 + {allPlaceholders.length > 0 && ( 444 + <details class={s.collapsibleDetails}> 445 + <summary class={s.collapsibleSummary}>Available placeholders</summary> 446 + <div class={s.collapsibleContent}> 447 + {allPlaceholders.map((p) => `{{${p}}}`).join(", ")} 448 + </div> 449 + </details> 450 + )} 451 + 452 + {NSID_RE.test(lexicon) && ( 412 453 <div class={s.conditionsSection}> 413 454 <div> 414 455 <h3>Conditions</h3> 415 456 <p class={s.hint}> 416 - Filter events by field values. All conditions must match (AND). 457 + Filter events by field values. All conditions must match (AND). Use{" "} 458 + <code>{"{{self}}"}</code> as a value to match your own DID. 417 459 </p> 418 460 </div> 419 461 {conditions.map((cond, i) => ( ··· 427 469 } 428 470 > 429 471 <option value="">Select field...</option> 430 - {fields.map((f) => ( 472 + {conditionFields.map((f) => ( 431 473 <option key={f.path} value={f.path}> 432 474 {f.path} 433 475 </option> 434 476 ))} 435 477 </select> 436 - {cond.field && fields.find((f) => f.path === cond.field)?.description && ( 437 - <span class={s.hint}> 438 - {fields.find((f) => f.path === cond.field)!.description} 439 - </span> 478 + {cond.field && 479 + conditionFields.find((f) => f.path === cond.field)?.description && ( 480 + <span class={s.hint}> 481 + {conditionFields.find((f) => f.path === cond.field)!.description} 482 + </span> 483 + )} 484 + </div> 485 + {cond.field && 486 + conditionFields.find((f) => f.path === cond.field)?.type !== "boolean" && ( 487 + <div class={s.conditionOperator}> 488 + <select 489 + class={s.select} 490 + value={cond.operator} 491 + onChange={(e: Event) => 492 + updateCondition(i, "operator", (e.target as HTMLSelectElement).value) 493 + } 494 + > 495 + <option value="eq">equals</option> 496 + <option value="startsWith">starts with</option> 497 + <option value="endsWith">ends with</option> 498 + <option value="contains">contains</option> 499 + </select> 500 + </div> 440 501 )} 441 - </div> 442 - {cond.field && fields.find((f) => f.path === cond.field)?.type !== "boolean" && ( 443 - <div class={s.conditionOperator}> 444 - <select 445 - class={s.select} 446 - value={cond.operator} 447 - onChange={(e: Event) => 448 - updateCondition(i, "operator", (e.target as HTMLSelectElement).value) 449 - } 450 - > 451 - <option value="eq">equals</option> 452 - <option value="startsWith">starts with</option> 453 - <option value="endsWith">ends with</option> 454 - <option value="contains">contains</option> 455 - </select> 456 - </div> 457 - )} 458 502 <div class={s.conditionValue}> 459 - {fields.find((f) => f.path === cond.field)?.type === "boolean" ? ( 503 + {conditionFields.find((f) => f.path === cond.field)?.type === "boolean" ? ( 460 504 <select 461 505 class={s.select} 462 506 value={cond.value} ··· 491 535 </div> 492 536 )} 493 537 538 + {NSID_RE.test(lexicon) && ( 539 + <div class={s.conditionsSection}> 540 + <div> 541 + <h3>Data Sources</h3> 542 + <p class={s.hint}> 543 + Fetch related PDS records before executing actions. Fetched data is available as 544 + named variables in action templates. 545 + </p> 546 + </div> 547 + {fetches.map((f, i) => ( 548 + <div key={i} class={s.fetchRow}> 549 + <div class={s.fetchName}> 550 + <input 551 + class={s.input} 552 + type="text" 553 + placeholder="Variable name" 554 + value={f.name} 555 + onInput={(e: Event) => 556 + updateFetch(i, "name", (e.target as HTMLInputElement).value) 557 + } 558 + /> 559 + </div> 560 + <div class={s.fetchUri}> 561 + <input 562 + class={s.input} 563 + type="text" 564 + placeholder="AT URI template, e.g. {{event.commit.record.subject}}" 565 + value={f.uri} 566 + onInput={(e: Event) => 567 + updateFetch(i, "uri", (e.target as HTMLInputElement).value) 568 + } 569 + /> 570 + </div> 571 + <button type="button" class={s.removeBtn} onClick={() => removeFetch(i)}> 572 + Remove 573 + </button> 574 + </div> 575 + ))} 576 + <button type="button" class={s.addBtn} onClick={addFetch}> 577 + + Add data source 578 + </button> 579 + </div> 580 + )} 581 + 494 582 <div class={s.actionsSection}> 495 583 <div> 496 584 <h3>Actions</h3> ··· 529 617 </button> 530 618 </div> 531 619 </div> 620 + 621 + {actions.length > 0 && ( 622 + <details class={s.collapsibleDetails}> 623 + <summary class={s.collapsibleSummary}>Preview JSON payload</summary> 624 + <pre class={s.collapsibleContent}>{previewPayload}</pre> 625 + </details> 626 + )} 532 627 533 628 {error && <div class={s.alertError}>{error}</div>} 534 629
+46 -4
app/routes/api/subscriptions/[rkey].ts
··· 8 8 type Action, 9 9 type WebhookAction, 10 10 type RecordAction, 11 + type FetchStep, 11 12 } from "@/db/schema.js"; 12 13 import { config } from "@/config.js"; 13 14 import { isValidNsid, isNsidAllowed } from "@/lexicons/resolver.js"; 14 - import { getRecord, putRecord, deleteRecord, type PdsAction } from "@/subscriptions/pds.js"; 15 + import { 16 + getRecord, 17 + putRecord, 18 + deleteRecord, 19 + type PdsAction, 20 + type PdsFetchStep, 21 + } from "@/subscriptions/pds.js"; 15 22 import { verifyCallback } from "@/subscriptions/verify.js"; 16 - import { validateTemplate } from "@/actions/template.js"; 23 + import { validateTemplate, validateFetchStep } from "@/actions/template.js"; 17 24 import { notifySubscriptionChange } from "@/jetstream/consumer.js"; 18 25 19 26 type ActionInput = ··· 46 53 rkey: sub.rkey, 47 54 lexicon: sub.lexicon, 48 55 actions: sub.actions, 56 + fetches: sub.fetches, 49 57 conditions: sub.conditions, 50 58 active: sub.active, 51 59 indexedAt: sub.indexedAt.getTime(), ··· 70 78 71 79 const body = await c.req.json<{ 72 80 actions?: ActionInput[]; 81 + fetches?: Array<{ name: string; uri: string }>; 73 82 conditions?: Array<{ field: string; operator?: string; value: string }>; 74 83 active?: boolean; 75 84 }>(); ··· 80 89 .map((cond) => ({ 81 90 field: cond.field, 82 91 operator: cond.operator ?? "eq", 83 - value: cond.value, 92 + value: cond.value === "{{self}}" ? user.did : cond.value, 84 93 })) 85 94 : sub.conditions; 86 95 if (conditions.length > 20) { ··· 93 102 } 94 103 const active = body.active ?? sub.active; 95 104 105 + // Resolve fetch steps — full replacement when provided 106 + let localFetches = sub.fetches; 107 + let pdsFetches: PdsFetchStep[] | null = null; 108 + 109 + if (body.fetches) { 110 + if (body.fetches.length > 5) { 111 + return c.json({ error: "Maximum 5 fetch steps allowed" }, 400); 112 + } 113 + const newLocalFetches: FetchStep[] = []; 114 + const newPdsFetches: PdsFetchStep[] = []; 115 + const seenNames = new Set<string>(); 116 + for (const f of body.fetches) { 117 + const stepValidation = validateFetchStep(f.name, f.uri, seenNames); 118 + if (!stepValidation.valid) { 119 + return c.json({ error: stepValidation.error }, 400); 120 + } 121 + seenNames.add(f.name); 122 + newLocalFetches.push({ name: f.name, uri: f.uri }); 123 + newPdsFetches.push({ $type: "app.rglw.subscription#fetchStep", name: f.name, uri: f.uri }); 124 + } 125 + localFetches = newLocalFetches; 126 + pdsFetches = newPdsFetches; 127 + } 128 + const fetchNames = localFetches.map((f) => f.name); 129 + 96 130 // Resolve actions — full replacement when provided 97 131 let localActions = sub.actions; 98 132 let pdsActions: PdsAction[] | null = null; ··· 153 187 if (!input.recordTemplate) { 154 188 return c.json({ error: "recordTemplate is required for record actions" }, 400); 155 189 } 156 - const templateValidation = validateTemplate(input.recordTemplate); 190 + const templateValidation = validateTemplate(input.recordTemplate, fetchNames); 157 191 if (!templateValidation.valid) { 158 192 return c.json({ error: templateValidation.error }, 400); 159 193 } ··· 215 249 await putRecord(user.did, rkey, { 216 250 lexicon: sub.lexicon, 217 251 actions: pdsActions, 252 + fetches: 253 + pdsFetches ?? 254 + localFetches.map((f) => ({ 255 + $type: "app.rglw.subscription#fetchStep" as const, 256 + name: f.name, 257 + uri: f.uri, 258 + })), 218 259 conditions, 219 260 active, 220 261 createdAt, ··· 230 271 .update(subscriptions) 231 272 .set({ 232 273 actions: localActions, 274 + fetches: localFetches, 233 275 conditions, 234 276 active, 235 277 indexedAt: now,
+42 -5
app/routes/api/subscriptions/index.ts
··· 2 2 import { eq } from "drizzle-orm"; 3 3 import { nanoid } from "nanoid"; 4 4 import { db } from "@/db/index.js"; 5 - import { subscriptions, type Action, type WebhookAction, type RecordAction } from "@/db/schema.js"; 5 + import { 6 + subscriptions, 7 + type Action, 8 + type WebhookAction, 9 + type RecordAction, 10 + type FetchStep, 11 + } from "@/db/schema.js"; 6 12 import { config } from "@/config.js"; 7 13 import { isValidNsid, isNsidAllowed } from "@/lexicons/resolver.js"; 8 14 import { verifyCallback } from "@/subscriptions/verify.js"; 9 - import { createRecord, deleteRecord, type PdsAction } from "@/subscriptions/pds.js"; 10 - import { validateTemplate } from "@/actions/template.js"; 15 + import { 16 + createRecord, 17 + deleteRecord, 18 + type PdsAction, 19 + type PdsFetchStep, 20 + } from "@/subscriptions/pds.js"; 21 + import { validateTemplate, validateFetchStep } from "@/actions/template.js"; 11 22 import { notifySubscriptionChange } from "@/jetstream/consumer.js"; 12 23 13 24 type ActionInput = ··· 27 38 rkey: r.rkey, 28 39 lexicon: r.lexicon, 29 40 actions: r.actions, 41 + fetches: r.fetches, 30 42 conditions: r.conditions, 31 43 active: r.active, 32 44 indexedAt: r.indexedAt.getTime(), ··· 39 51 const body = await c.req.json<{ 40 52 lexicon: string; 41 53 actions: ActionInput[]; 54 + fetches?: Array<{ name: string; uri: string }>; 42 55 conditions?: Array<{ field: string; operator?: string; value: string }>; 43 56 active?: boolean; 44 57 }>(); ··· 60 73 } 61 74 62 75 // Normalize and validate conditions 76 + // {{self}} in condition values is resolved to the authenticated user's DID 63 77 const conditions = (body.conditions ?? []) 64 78 .filter((cond) => cond.field && cond.value) 65 79 .map((cond) => ({ 66 80 field: cond.field, 67 81 operator: cond.operator ?? "eq", 68 - value: cond.value, 82 + value: cond.value === "{{self}}" ? user.did : cond.value, 69 83 })); 70 84 if (conditions.length > 20) { 71 85 return c.json({ error: "Maximum 20 conditions allowed" }, 400); ··· 76 90 } 77 91 } 78 92 93 + // Validate and normalize fetch steps 94 + const localFetches: FetchStep[] = []; 95 + const pdsFetches: PdsFetchStep[] = []; 96 + 97 + if (body.fetches && body.fetches.length > 0) { 98 + if (body.fetches.length > 5) { 99 + return c.json({ error: "Maximum 5 fetch steps allowed" }, 400); 100 + } 101 + const seenNames = new Set<string>(); 102 + for (const f of body.fetches) { 103 + const stepValidation = validateFetchStep(f.name, f.uri, seenNames); 104 + if (!stepValidation.valid) { 105 + return c.json({ error: stepValidation.error }, 400); 106 + } 107 + seenNames.add(f.name); 108 + localFetches.push({ name: f.name, uri: f.uri }); 109 + pdsFetches.push({ $type: "app.rglw.subscription#fetchStep", name: f.name, uri: f.uri }); 110 + } 111 + } 112 + const fetchNames = localFetches.map((f) => f.name); 113 + 79 114 // Validate each action and build local + PDS action arrays 80 115 const localActions: Action[] = []; 81 116 const pdsActions: PdsAction[] = []; ··· 119 154 if (!input.recordTemplate) { 120 155 return c.json({ error: "recordTemplate is required for record actions" }, 400); 121 156 } 122 - const templateValidation = validateTemplate(input.recordTemplate); 157 + const templateValidation = validateTemplate(input.recordTemplate, fetchNames); 123 158 if (!templateValidation.valid) { 124 159 return c.json({ error: templateValidation.error }, 400); 125 160 } ··· 149 184 const result = await createRecord(user.did, { 150 185 lexicon: body.lexicon, 151 186 actions: pdsActions, 187 + fetches: pdsFetches.length > 0 ? pdsFetches : undefined, 152 188 conditions, 153 189 active, 154 190 createdAt: now.toISOString(), ··· 168 204 rkey, 169 205 lexicon: body.lexicon, 170 206 actions: localActions, 207 + fetches: localFetches, 171 208 conditions, 172 209 active, 173 210 indexedAt: now,
+15
app/routes/dashboard/subscriptions/[rkey].tsx
··· 111 111 </Card> 112 112 )} 113 113 114 + {sub.fetches.length > 0 && ( 115 + <Card variant="flat"> 116 + <Stack gap={3}> 117 + <h3>Data Sources</h3> 118 + <ul class={plainList}> 119 + {sub.fetches.map((f, i) => ( 120 + <li key={i}> 121 + <InlineCode>{f.name}</InlineCode> &larr; <InlineCode>{f.uri}</InlineCode> 122 + </li> 123 + ))} 124 + </ul> 125 + </Stack> 126 + </Card> 127 + )} 128 + 114 129 <Stack gap={3}> 115 130 <h3>Actions ({sub.actions.length})</h3> 116 131 {sub.actions.map((action, i) => (
+14 -2
app/server.ts
··· 3 3 import { startJetstream } from "@/jetstream/consumer.js"; 4 4 import { dispatch } from "@/webhooks/dispatcher.js"; 5 5 import { executeAction } from "@/actions/executor.js"; 6 + import { resolveFetches } from "@/actions/fetcher.js"; 6 7 7 8 const app = createApp(); 8 9 9 10 // Start Jetstream consumer — routes matched events to action handlers 10 - startJetstream((match) => { 11 + startJetstream(async (match) => { 12 + // Resolve fetch steps once for all actions 13 + let fetchContext: Record<string, { uri: string; cid: string; record: Record<string, unknown> }> = 14 + {}; 15 + if (match.subscription.fetches.length > 0) { 16 + const result = await resolveFetches(match.subscription.fetches, match.event); 17 + fetchContext = result.context; 18 + for (const err of result.errors) { 19 + console.warn(`Fetch "${err.name}" failed for ${match.subscription.uri}: ${err.error}`); 20 + } 21 + } 22 + 11 23 for (let i = 0; i < match.subscription.actions.length; i++) { 12 24 const action = match.subscription.actions[i]!; 13 25 const handler = action.$type === "record" ? executeAction : dispatch; 14 - handler(match, i).catch((err) => { 26 + handler(match, i, fetchContext).catch((err) => { 15 27 console.error(`Action ${i} (${action.$type}) delivery error:`, err); 16 28 }); 17 29 }
+26
lexicons/app/rglw/subscription.json
··· 34 34 "ref": "#condition" 35 35 } 36 36 }, 37 + "fetches": { 38 + "type": "array", 39 + "description": "Records to fetch from PDS before executing actions. Fetched data is available as named variables in action templates.", 40 + "maxLength": 5, 41 + "items": { 42 + "type": "ref", 43 + "ref": "#fetchStep" 44 + } 45 + }, 37 46 "active": { 38 47 "type": "boolean", 39 48 "description": "Whether this subscription is currently active.", ··· 73 82 "type": "string", 74 83 "description": "JSON template with {{placeholder}} expressions resolved from event data.", 75 84 "maxLength": 10240 85 + } 86 + } 87 + }, 88 + "fetchStep": { 89 + "type": "object", 90 + "description": "Fetch a PDS record by resolving an AT URI template against event data.", 91 + "required": ["name", "uri"], 92 + "properties": { 93 + "name": { 94 + "type": "string", 95 + "description": "Variable name for use in templates, e.g. 'parentEntry'.", 96 + "maxLength": 64 97 + }, 98 + "uri": { 99 + "type": "string", 100 + "description": "AT URI template, e.g. '{{event.commit.record.subject}}'.", 101 + "maxLength": 2048 76 102 } 77 103 } 78 104 },
+18 -8
lib/actions/executor.ts
··· 1 1 import { db } from "../db/index.js"; 2 2 import { deliveryLogs, type RecordAction } from "../db/schema.js"; 3 3 import { createArbitraryRecord } from "../subscriptions/pds.js"; 4 - import { renderTemplate } from "./template.js"; 4 + import { renderTemplate, type FetchContext } from "./template.js"; 5 5 import type { MatchedEvent } from "../jetstream/consumer.js"; 6 6 7 7 const RETRY_DELAYS = [5_000, 30_000]; ··· 9 9 async function execute( 10 10 match: MatchedEvent, 11 11 action: RecordAction, 12 + fetchContext?: FetchContext, 12 13 ): Promise<{ statusCode: number; error?: string }> { 13 14 const { subscription, event } = match; 14 15 15 16 let record: Record<string, unknown>; 16 17 try { 17 - record = renderTemplate(action.recordTemplate, event); 18 + record = renderTemplate(action.recordTemplate, event, fetchContext); 18 19 } catch (err) { 19 20 return { 20 21 statusCode: 0, ··· 62 63 return code >= 500 || code === 0; 63 64 } 64 65 65 - function scheduleRetry(match: MatchedEvent, actionIndex: number, retryIndex: number) { 66 + function scheduleRetry( 67 + match: MatchedEvent, 68 + actionIndex: number, 69 + retryIndex: number, 70 + fetchContext?: FetchContext, 71 + ) { 66 72 if (retryIndex >= RETRY_DELAYS.length) return; 67 73 68 74 setTimeout(async () => { 69 75 try { 70 76 const action = match.subscription.actions[actionIndex] as RecordAction; 71 - const result = await execute(match, action); 77 + const result = await execute(match, action, fetchContext); 72 78 const body = JSON.stringify({ 73 79 targetCollection: action.targetCollection, 74 80 recordTemplate: action.recordTemplate, ··· 85 91 ); 86 92 87 93 if (!isSuccess(result.statusCode) && isRetryable(result.statusCode)) { 88 - scheduleRetry(match, actionIndex, retryIndex + 1); 94 + scheduleRetry(match, actionIndex, retryIndex + 1, fetchContext); 89 95 } 90 96 } catch (err) { 91 97 console.error("Action retry error:", err); ··· 94 100 } 95 101 96 102 /** Execute a record action for a matched event. */ 97 - export async function executeAction(match: MatchedEvent, actionIndex: number) { 103 + export async function executeAction( 104 + match: MatchedEvent, 105 + actionIndex: number, 106 + fetchContext?: FetchContext, 107 + ) { 98 108 const action = match.subscription.actions[actionIndex] as RecordAction; 99 - const result = await execute(match, action); 109 + const result = await execute(match, action, fetchContext); 100 110 101 111 const body = JSON.stringify({ 102 112 targetCollection: action.targetCollection, ··· 114 124 ); 115 125 116 126 if (!isSuccess(result.statusCode) && isRetryable(result.statusCode)) { 117 - scheduleRetry(match, actionIndex, 0); 127 + scheduleRetry(match, actionIndex, 0, fetchContext); 118 128 } 119 129 }
+47
lib/actions/fetcher.ts
··· 1 + import { fetchRecord } from "../pds/resolver.js"; 2 + import type { FetchStep } from "../db/schema.js"; 3 + import type { JetstreamEvent } from "../jetstream/matcher.js"; 4 + import { resolveEventPlaceholder, PLACEHOLDER_RE, type FetchContext } from "./template.js"; 5 + 6 + /** Resolve a fetch URI template against event data. Returns empty string on non-string values. */ 7 + function resolveUri(uriTemplate: string, event: JetstreamEvent): string { 8 + return uriTemplate.replace(PLACEHOLDER_RE, (_, path: string) => { 9 + const value = resolveEventPlaceholder(path.trim(), event); 10 + if (typeof value === "string") return value; 11 + // Non-string values (objects, arrays, undefined) can't form valid AT URIs 12 + return ""; 13 + }); 14 + } 15 + 16 + /** Resolve all fetch steps, returning available context and any errors. */ 17 + export async function resolveFetches( 18 + steps: FetchStep[], 19 + event: JetstreamEvent, 20 + ): Promise<{ context: FetchContext; errors: Array<{ name: string; error: string }> }> { 21 + const context: FetchContext = {}; 22 + const errors: Array<{ name: string; error: string }> = []; 23 + 24 + await Promise.all( 25 + steps.map(async (step) => { 26 + try { 27 + const resolvedUri = resolveUri(step.uri, event); 28 + if (!resolvedUri || !resolvedUri.startsWith("at://")) { 29 + errors.push({ 30 + name: step.name, 31 + error: `Resolved URI is not a valid AT URI: ${resolvedUri}`, 32 + }); 33 + return; 34 + } 35 + const result = await fetchRecord(resolvedUri); 36 + context[step.name] = result; 37 + } catch (err) { 38 + errors.push({ 39 + name: step.name, 40 + error: err instanceof Error ? err.message : String(err), 41 + }); 42 + } 43 + }), 44 + ); 45 + 46 + return { context, errors }; 47 + }
+114 -11
lib/actions/template.ts
··· 1 1 import type { JetstreamEvent } from "../jetstream/matcher.js"; 2 2 3 - const PLACEHOLDER_RE = /\{\{([^}]+)\}\}/g; 3 + export const PLACEHOLDER_RE = /\{\{([^}]+)\}\}/g; 4 + 5 + export type FetchContext = Record< 6 + string, 7 + { uri: string; cid: string; record: Record<string, unknown> } 8 + >; 4 9 5 10 /** 6 - * Resolve a placeholder path against a Jetstream event. 11 + * Resolve a placeholder path against a Jetstream event and optional fetch context. 7 12 * Supports: 8 13 * - "now" → current ISO datetime 9 14 * - "event.did", "event.time_us", "event.kind" 10 15 * - "event.commit.operation", "event.commit.collection", "event.commit.rkey", "event.commit.cid" 11 16 * - "event.commit.record.<dotted.path>" → nested record field 17 + * - "<fetchName>.<dotted.path>" → fetched record data 12 18 */ 13 - function resolvePlaceholder(path: string, event: JetstreamEvent): unknown { 19 + function resolvePlaceholder( 20 + path: string, 21 + event: JetstreamEvent, 22 + fetchContext?: FetchContext, 23 + ): unknown { 14 24 if (path === "now") return new Date().toISOString(); 15 25 26 + // Check fetch context: {{fetchName.record.field}} or {{fetchName.uri}} etc. 27 + if (fetchContext) { 28 + const dotIndex = path.indexOf("."); 29 + const rootKey = dotIndex > 0 ? path.slice(0, dotIndex) : path; 30 + if (rootKey in fetchContext) { 31 + const rest = dotIndex > 0 ? path.slice(dotIndex + 1) : undefined; 32 + let value: unknown = fetchContext[rootKey]; 33 + if (rest) { 34 + for (const key of rest.split(".")) { 35 + if (value == null || typeof value !== "object") return undefined; 36 + value = (value as Record<string, unknown>)[key]; 37 + } 38 + } 39 + return value; 40 + } 41 + } 42 + 16 43 if (!path.startsWith("event.")) return undefined; 17 44 const rest = path.slice("event.".length); 18 45 ··· 25 52 return value; 26 53 } 27 54 55 + /** 56 + * Resolve a placeholder path against only the event (for fetch URI templates). 57 + * Only supports "now" and "event.*" paths. 58 + */ 59 + export function resolveEventPlaceholder(path: string, event: JetstreamEvent): unknown { 60 + return resolvePlaceholder(path, event); 61 + } 62 + 28 63 /** Validate template syntax at creation time. */ 29 64 export function validateTemplate( 30 65 template: string, 66 + fetchNames?: string[], 31 67 ): { valid: true; placeholders: string[] } | { valid: false; error: string } { 32 - // Check that the template is valid JSON (ignoring placeholders) 68 + // Check that the template is valid JSON (ignoring placeholders). 69 + // Collect placeholder paths, then replace all {{...}} with a plain string 70 + // that works in any JSON context (both inside quoted strings and bare values). 33 71 const placeholders: string[] = []; 34 - const stripped = template.replace(PLACEHOLDER_RE, (_, path: string) => { 35 - placeholders.push(path.trim()); 36 - return '"__placeholder__"'; 37 - }); 72 + const stripped = template 73 + .replace(PLACEHOLDER_RE, (_, path: string) => { 74 + placeholders.push(path.trim()); 75 + return "__ph__"; 76 + }) 77 + // Clean up quoted placeholders: "__ph__" → just a valid JSON string value 78 + .replace(/"([^"]*__ph__[^"]*)"/g, '"__placeholder__"') 79 + // Bare placeholders (not inside quotes) need to become valid JSON values 80 + .replace(/__ph__/g, '"__placeholder__"'); 38 81 39 82 try { 40 83 const parsed = JSON.parse(stripped); ··· 49 92 return { valid: false, error: "Template must contain at least one {{placeholder}}" }; 50 93 } 51 94 95 + const fetchSet = new Set(fetchNames ?? []); 96 + for (const p of placeholders) { 97 + if (p === "now" || p.startsWith("event.")) continue; 98 + const root = p.split(".")[0]!; 99 + if (fetchSet.has(root)) continue; 100 + return { valid: false, error: `Invalid placeholder: {{${p}}}` }; 101 + } 102 + 103 + return { valid: true, placeholders }; 104 + } 105 + 106 + const FETCH_NAME_RE = /^[a-zA-Z_][a-zA-Z0-9_]*$/; 107 + const RESERVED_FETCH_NAMES = new Set(["event", "now"]); 108 + 109 + /** Validate a single fetch step name + URI. */ 110 + export function validateFetchStep( 111 + name: string, 112 + uri: string, 113 + seenNames: Set<string>, 114 + ): { valid: true } | { valid: false; error: string } { 115 + if (!name || !FETCH_NAME_RE.test(name)) { 116 + return { valid: false, error: `Invalid fetch name: "${name}". Must be a valid identifier.` }; 117 + } 118 + if (RESERVED_FETCH_NAMES.has(name)) { 119 + return { valid: false, error: `Fetch name "${name}" is reserved` }; 120 + } 121 + if (seenNames.has(name)) { 122 + return { valid: false, error: `Duplicate fetch name: "${name}"` }; 123 + } 124 + const uriResult = validateFetchUri(uri); 125 + if (!uriResult.valid) return uriResult; 126 + return { valid: true }; 127 + } 128 + 129 + /** Validate a fetch step URI template. */ 130 + export function validateFetchUri( 131 + uri: string, 132 + ): { valid: true; placeholders: string[] } | { valid: false; error: string } { 133 + if (!uri.trim()) { 134 + return { valid: false, error: "Fetch URI is required" }; 135 + } 136 + 137 + const placeholders: string[] = []; 138 + uri.replace(PLACEHOLDER_RE, (_, path: string) => { 139 + placeholders.push(path.trim()); 140 + return ""; 141 + }); 142 + 143 + // Must contain placeholders or be a literal AT URI 144 + if (placeholders.length === 0 && !uri.startsWith("at://")) { 145 + return { 146 + valid: false, 147 + error: "Fetch URI must contain {{placeholders}} or be a literal AT URI", 148 + }; 149 + } 150 + 52 151 for (const p of placeholders) { 53 152 if (p !== "now" && !p.startsWith("event.")) { 54 - return { valid: false, error: `Invalid placeholder: {{${p}}}` }; 153 + return { valid: false, error: `Invalid placeholder in fetch URI: {{${p}}}` }; 55 154 } 56 155 } 57 156 ··· 59 158 } 60 159 61 160 /** Render a template by resolving all {{placeholder}} expressions against event data. */ 62 - export function renderTemplate(template: string, event: JetstreamEvent): Record<string, unknown> { 161 + export function renderTemplate( 162 + template: string, 163 + event: JetstreamEvent, 164 + fetchContext?: FetchContext, 165 + ): Record<string, unknown> { 63 166 const rendered = template.replace(PLACEHOLDER_RE, (match, path: string) => { 64 - const value = resolvePlaceholder(path.trim(), event); 167 + const value = resolvePlaceholder(path.trim(), event, fetchContext); 65 168 if (value === undefined) return ""; 66 169 67 170 // If the placeholder is the entire JSON value (between quotes), return raw
+1
lib/db/migrations/0001_wide_solo.sql
··· 1 + ALTER TABLE `subscriptions` ADD `fetches` text DEFAULT '[]' NOT NULL;
+317
lib/db/migrations/meta/0001_snapshot.json
··· 1 + { 2 + "version": "6", 3 + "dialect": "sqlite", 4 + "id": "0bf27488-9d9d-4dba-a902-a94840770439", 5 + "prevId": "b7fc849c-bab7-48f6-8ff2-aff7575d0a7d", 6 + "tables": { 7 + "delivery_logs": { 8 + "name": "delivery_logs", 9 + "columns": { 10 + "id": { 11 + "name": "id", 12 + "type": "integer", 13 + "primaryKey": true, 14 + "notNull": true, 15 + "autoincrement": true 16 + }, 17 + "subscription_uri": { 18 + "name": "subscription_uri", 19 + "type": "text", 20 + "primaryKey": false, 21 + "notNull": true, 22 + "autoincrement": false 23 + }, 24 + "action_index": { 25 + "name": "action_index", 26 + "type": "integer", 27 + "primaryKey": false, 28 + "notNull": true, 29 + "autoincrement": false, 30 + "default": 0 31 + }, 32 + "event_time_us": { 33 + "name": "event_time_us", 34 + "type": "integer", 35 + "primaryKey": false, 36 + "notNull": true, 37 + "autoincrement": false 38 + }, 39 + "payload": { 40 + "name": "payload", 41 + "type": "text", 42 + "primaryKey": false, 43 + "notNull": false, 44 + "autoincrement": false 45 + }, 46 + "status_code": { 47 + "name": "status_code", 48 + "type": "integer", 49 + "primaryKey": false, 50 + "notNull": false, 51 + "autoincrement": false 52 + }, 53 + "error": { 54 + "name": "error", 55 + "type": "text", 56 + "primaryKey": false, 57 + "notNull": false, 58 + "autoincrement": false 59 + }, 60 + "attempt": { 61 + "name": "attempt", 62 + "type": "integer", 63 + "primaryKey": false, 64 + "notNull": true, 65 + "autoincrement": false, 66 + "default": 1 67 + }, 68 + "created_at": { 69 + "name": "created_at", 70 + "type": "integer", 71 + "primaryKey": false, 72 + "notNull": true, 73 + "autoincrement": false 74 + } 75 + }, 76 + "indexes": {}, 77 + "foreignKeys": { 78 + "delivery_logs_subscription_uri_subscriptions_uri_fk": { 79 + "name": "delivery_logs_subscription_uri_subscriptions_uri_fk", 80 + "tableFrom": "delivery_logs", 81 + "tableTo": "subscriptions", 82 + "columnsFrom": ["subscription_uri"], 83 + "columnsTo": ["uri"], 84 + "onDelete": "cascade", 85 + "onUpdate": "no action" 86 + } 87 + }, 88 + "compositePrimaryKeys": {}, 89 + "uniqueConstraints": {}, 90 + "checkConstraints": {} 91 + }, 92 + "lexicon_cache": { 93 + "name": "lexicon_cache", 94 + "columns": { 95 + "nsid": { 96 + "name": "nsid", 97 + "type": "text", 98 + "primaryKey": true, 99 + "notNull": true, 100 + "autoincrement": false 101 + }, 102 + "schema": { 103 + "name": "schema", 104 + "type": "text", 105 + "primaryKey": false, 106 + "notNull": true, 107 + "autoincrement": false 108 + }, 109 + "fetched_at": { 110 + "name": "fetched_at", 111 + "type": "integer", 112 + "primaryKey": false, 113 + "notNull": true, 114 + "autoincrement": false 115 + } 116 + }, 117 + "indexes": {}, 118 + "foreignKeys": {}, 119 + "compositePrimaryKeys": {}, 120 + "uniqueConstraints": {}, 121 + "checkConstraints": {} 122 + }, 123 + "oauth_sessions": { 124 + "name": "oauth_sessions", 125 + "columns": { 126 + "key": { 127 + "name": "key", 128 + "type": "text", 129 + "primaryKey": true, 130 + "notNull": true, 131 + "autoincrement": false 132 + }, 133 + "value": { 134 + "name": "value", 135 + "type": "text", 136 + "primaryKey": false, 137 + "notNull": true, 138 + "autoincrement": false 139 + }, 140 + "expires_at": { 141 + "name": "expires_at", 142 + "type": "integer", 143 + "primaryKey": false, 144 + "notNull": false, 145 + "autoincrement": false 146 + } 147 + }, 148 + "indexes": {}, 149 + "foreignKeys": {}, 150 + "compositePrimaryKeys": {}, 151 + "uniqueConstraints": {}, 152 + "checkConstraints": {} 153 + }, 154 + "oauth_states": { 155 + "name": "oauth_states", 156 + "columns": { 157 + "key": { 158 + "name": "key", 159 + "type": "text", 160 + "primaryKey": true, 161 + "notNull": true, 162 + "autoincrement": false 163 + }, 164 + "value": { 165 + "name": "value", 166 + "type": "text", 167 + "primaryKey": false, 168 + "notNull": true, 169 + "autoincrement": false 170 + }, 171 + "expires_at": { 172 + "name": "expires_at", 173 + "type": "integer", 174 + "primaryKey": false, 175 + "notNull": false, 176 + "autoincrement": false 177 + } 178 + }, 179 + "indexes": {}, 180 + "foreignKeys": {}, 181 + "compositePrimaryKeys": {}, 182 + "uniqueConstraints": {}, 183 + "checkConstraints": {} 184 + }, 185 + "subscriptions": { 186 + "name": "subscriptions", 187 + "columns": { 188 + "uri": { 189 + "name": "uri", 190 + "type": "text", 191 + "primaryKey": true, 192 + "notNull": true, 193 + "autoincrement": false 194 + }, 195 + "did": { 196 + "name": "did", 197 + "type": "text", 198 + "primaryKey": false, 199 + "notNull": true, 200 + "autoincrement": false 201 + }, 202 + "rkey": { 203 + "name": "rkey", 204 + "type": "text", 205 + "primaryKey": false, 206 + "notNull": true, 207 + "autoincrement": false 208 + }, 209 + "lexicon": { 210 + "name": "lexicon", 211 + "type": "text", 212 + "primaryKey": false, 213 + "notNull": true, 214 + "autoincrement": false 215 + }, 216 + "actions": { 217 + "name": "actions", 218 + "type": "text", 219 + "primaryKey": false, 220 + "notNull": true, 221 + "autoincrement": false, 222 + "default": "'[]'" 223 + }, 224 + "fetches": { 225 + "name": "fetches", 226 + "type": "text", 227 + "primaryKey": false, 228 + "notNull": true, 229 + "autoincrement": false, 230 + "default": "'[]'" 231 + }, 232 + "conditions": { 233 + "name": "conditions", 234 + "type": "text", 235 + "primaryKey": false, 236 + "notNull": true, 237 + "autoincrement": false, 238 + "default": "'[]'" 239 + }, 240 + "active": { 241 + "name": "active", 242 + "type": "integer", 243 + "primaryKey": false, 244 + "notNull": true, 245 + "autoincrement": false, 246 + "default": false 247 + }, 248 + "indexed_at": { 249 + "name": "indexed_at", 250 + "type": "integer", 251 + "primaryKey": false, 252 + "notNull": true, 253 + "autoincrement": false 254 + } 255 + }, 256 + "indexes": {}, 257 + "foreignKeys": {}, 258 + "compositePrimaryKeys": {}, 259 + "uniqueConstraints": {}, 260 + "checkConstraints": {} 261 + }, 262 + "users": { 263 + "name": "users", 264 + "columns": { 265 + "id": { 266 + "name": "id", 267 + "type": "integer", 268 + "primaryKey": true, 269 + "notNull": true, 270 + "autoincrement": true 271 + }, 272 + "did": { 273 + "name": "did", 274 + "type": "text", 275 + "primaryKey": false, 276 + "notNull": true, 277 + "autoincrement": false 278 + }, 279 + "handle": { 280 + "name": "handle", 281 + "type": "text", 282 + "primaryKey": false, 283 + "notNull": true, 284 + "autoincrement": false 285 + }, 286 + "created_at": { 287 + "name": "created_at", 288 + "type": "integer", 289 + "primaryKey": false, 290 + "notNull": true, 291 + "autoincrement": false 292 + } 293 + }, 294 + "indexes": { 295 + "users_did_unique": { 296 + "name": "users_did_unique", 297 + "columns": ["did"], 298 + "isUnique": true 299 + } 300 + }, 301 + "foreignKeys": {}, 302 + "compositePrimaryKeys": {}, 303 + "uniqueConstraints": {}, 304 + "checkConstraints": {} 305 + } 306 + }, 307 + "views": {}, 308 + "enums": {}, 309 + "_meta": { 310 + "schemas": {}, 311 + "tables": {}, 312 + "columns": {} 313 + }, 314 + "internal": { 315 + "indexes": {} 316 + } 317 + }
+7
lib/db/migrations/meta/_journal.json
··· 8 8 "when": 1775492878874, 9 9 "tag": "0000_tearful_tarot", 10 10 "breakpoints": true 11 + }, 12 + { 13 + "idx": 1, 14 + "version": "6", 15 + "when": 1775505023185, 16 + "tag": "0001_wide_solo", 17 + "breakpoints": true 11 18 } 12 19 ] 13 20 }
+6
lib/db/schema.ts
··· 22 22 23 23 export type Action = WebhookAction | RecordAction; 24 24 25 + export type FetchStep = { 26 + name: string; 27 + uri: string; // AT URI template 28 + }; 29 + 25 30 // Local index of app.rglw.subscription records living on user PDS. 26 31 // Source of truth is the PDS; this is a cache for fast Jetstream matching. 27 32 export const subscriptions = sqliteTable("subscriptions", { ··· 30 35 rkey: text("rkey").notNull(), 31 36 lexicon: text("lexicon").notNull(), // NSID being watched 32 37 actions: text("actions", { mode: "json" }).notNull().$type<Action[]>().default([]), 38 + fetches: text("fetches", { mode: "json" }).notNull().$type<FetchStep[]>().default([]), 33 39 conditions: text("conditions", { mode: "json" }) 34 40 .notNull() 35 41 .$type<Array<{ field: string; operator: string; value: string }>>()
+4 -2
lib/jetstream/consumer.ts
··· 13 13 event: JetstreamEvent; 14 14 }; 15 15 16 - type EventHandler = (match: MatchedEvent) => void; 16 + type EventHandler = (match: MatchedEvent) => void | Promise<void>; 17 17 18 18 const CURSOR_FLUSH_INTERVAL = 5_000; // persist cursor every 5s 19 19 ··· 154 154 155 155 for (const sub of subs) { 156 156 if (matchConditions(event, sub.conditions)) { 157 - this.handler({ subscription: sub, event }); 157 + void Promise.resolve(this.handler({ subscription: sub, event })).catch((err) => { 158 + console.error("Jetstream: handler error:", err); 159 + }); 158 160 } 159 161 } 160 162 }
+16 -14
lib/jetstream/matcher.ts
··· 22 22 23 23 /** 24 24 * Resolve a dotted field path against an event. 25 - * - "repo" → event.did 26 - * - "record.foo.bar" → event.commit.record.foo.bar 25 + * - "event.did" → event.did 26 + * - "event.commit.operation" → event.commit.operation 27 + * - "status" → event.commit.record.status 28 + * - "record.path" → event.commit.record.record.path 29 + * 30 + * Legacy: "repo" → event.did 27 31 */ 28 32 function resolveField(event: JetstreamEvent, field: string): string | undefined { 29 - if (field === "repo") return event.did; 33 + if (field === "event.did" || field === "repo") return event.did; 34 + if (field === "event.commit.operation") return event.commit?.operation; 30 35 31 - if (field.startsWith("record.") && event.commit?.record) { 32 - const path = field.slice("record.".length); 33 - let value: unknown = event.commit.record; 34 - for (const key of path.split(".")) { 35 - if (value == null || typeof value !== "object") return undefined; 36 - value = (value as Record<string, unknown>)[key]; 37 - } 38 - if (value == null) return undefined; 39 - return typeof value === "string" ? value : JSON.stringify(value); 36 + if (!event.commit?.record) return undefined; 37 + 38 + let value: unknown = event.commit.record; 39 + for (const key of field.split(".")) { 40 + if (value == null || typeof value !== "object") return undefined; 41 + value = (value as Record<string, unknown>)[key]; 40 42 } 41 - 42 - return undefined; 43 + if (value == null) return undefined; 44 + return typeof value === "string" ? value : JSON.stringify(value); 43 45 } 44 46 45 47 function evaluateCondition(event: JetstreamEvent, condition: Condition): boolean {
+32 -20
lib/lexicons/resolver.ts
··· 15 15 raw: Record<string, unknown>; 16 16 }; 17 17 18 - const NSID_RE = /^[a-z][a-z0-9-]*(\.[a-z][a-z0-9-]*){2,}$/; 18 + // Authority segments (first N-1) are DNS labels (lowercase), but the name segments 19 + // may use camelCase in practice (e.g. "site.exosphere.featureRequest.status"). 20 + const NSID_RE = /^[a-z][a-z0-9-]*(\.[a-zA-Z][a-zA-Z0-9-]*){2,}$/; 19 21 20 22 export function isValidNsid(nsid: string): boolean { 21 23 return NSID_RE.test(nsid); ··· 55 57 const fields: LexiconField[] = []; 56 58 57 59 for (const [name, prop] of Object.entries(properties)) { 58 - const path = `${prefix}.${name}`; 60 + const path = prefix ? `${prefix}.${name}` : name; 59 61 60 62 switch (prop.type) { 61 63 case "string": ··· 96 98 } 97 99 98 100 const record = defs.main.record; 99 - const fields = record?.properties ? extractFields(record.properties, defs, "record") : []; 101 + const fields = record?.properties ? extractFields(record.properties, defs, "") : []; 100 102 101 103 return { 102 104 nsid, ··· 121 123 } 122 124 123 125 async function fetchRemoteRaw(nsid: string): Promise<Record<string, unknown> | null> { 124 - const authority = nsidToAuthority(nsid); 125 126 const segments = nsid.split("."); 126 127 127 - const urls = [ 128 - `https://${authority}/lexicons/${segments.join("/")}.json`, 129 - `https://${authority}/.well-known/lexicons/${nsid}.json`, 130 - ]; 128 + // Try the full derived authority, then the base 2-segment authority as fallback. 129 + // For 4+ segment NSIDs like "site.exosphere.featureRequest.status", 130 + // the full authority "featureRequest.exosphere.site" may not exist as a real domain, 131 + // but the base "exosphere.site" does. 132 + const fullAuthority = nsidToAuthority(nsid); 133 + const baseAuthority = segments.slice(0, 2).reverse().join("."); 134 + const authorities = [fullAuthority]; 135 + if (baseAuthority !== fullAuthority) authorities.push(baseAuthority); 131 136 132 - for (const url of urls) { 133 - try { 134 - const res = await fetch(url, { 135 - headers: { Accept: "application/json" }, 136 - signal: AbortSignal.timeout(10_000), 137 - }); 138 - if (!res.ok) continue; 137 + for (const authority of authorities) { 138 + const urls = [ 139 + `https://${authority}/lexicons/${segments.join("/")}.json`, 140 + `https://${authority}/.well-known/lexicons/${nsid}.json`, 141 + ]; 139 142 140 - const json = (await res.json()) as Record<string, unknown>; 141 - if (json.lexicon !== 1 || json.id !== nsid) continue; 143 + for (const url of urls) { 144 + try { 145 + const res = await fetch(url, { 146 + headers: { Accept: "application/json" }, 147 + signal: AbortSignal.timeout(10_000), 148 + }); 149 + if (!res.ok) continue; 142 150 143 - return json; 144 - } catch { 145 - continue; 151 + const json = (await res.json()) as Record<string, unknown>; 152 + if (json.lexicon !== 1 || json.id !== nsid) continue; 153 + 154 + return json; 155 + } catch { 156 + continue; 157 + } 146 158 } 147 159 } 148 160
+67
lib/pds/resolver.ts
··· 1 + export type FetchedRecord = { 2 + uri: string; 3 + cid: string; 4 + record: Record<string, unknown>; 5 + }; 6 + 7 + // AT URI format: at://did/collection/rkey 8 + const AT_URI_RE = /^at:\/\/(did:[^/]+)\/([^/]+)\/([^/]+)$/; 9 + 10 + /** Parse an AT URI into its components. */ 11 + function parseAtUri(uri: string): { did: string; collection: string; rkey: string } { 12 + const match = uri.match(AT_URI_RE); 13 + if (!match) throw new Error(`Invalid AT URI: ${uri}`); 14 + return { did: match[1]!, collection: match[2]!, rkey: match[3]! }; 15 + } 16 + 17 + /** Resolve a DID to its PDS endpoint URL via plc.directory. */ 18 + export async function resolveDid(did: string): Promise<string | null> { 19 + try { 20 + const res = await fetch(`https://plc.directory/${encodeURIComponent(did)}`, { 21 + signal: AbortSignal.timeout(10_000), 22 + }); 23 + if (!res.ok) return null; 24 + const doc = (await res.json()) as { 25 + service?: Array<{ id: string; serviceEndpoint: string }>; 26 + }; 27 + return doc.service?.find((s) => s.id === "#atproto_pds")?.serviceEndpoint ?? null; 28 + } catch { 29 + return null; 30 + } 31 + } 32 + 33 + /** Fetch a public PDS record given an AT URI. */ 34 + export async function fetchRecord(atUri: string): Promise<FetchedRecord> { 35 + const { did, collection, rkey } = parseAtUri(atUri); 36 + 37 + const pdsEndpoint = await resolveDid(did); 38 + if (!pdsEndpoint) { 39 + throw new Error(`Could not resolve PDS for ${did}`); 40 + } 41 + 42 + const url = new URL(`${pdsEndpoint}/xrpc/com.atproto.repo.getRecord`); 43 + url.searchParams.set("repo", did); 44 + url.searchParams.set("collection", collection); 45 + url.searchParams.set("rkey", rkey); 46 + 47 + const res = await fetch(url, { 48 + headers: { Accept: "application/json" }, 49 + signal: AbortSignal.timeout(10_000), 50 + }); 51 + 52 + if (!res.ok) { 53 + throw new Error(`PDS getRecord failed (${res.status}) for ${atUri}`); 54 + } 55 + 56 + const data = (await res.json()) as { 57 + uri?: string; 58 + cid?: string; 59 + value?: Record<string, unknown>; 60 + }; 61 + 62 + return { 63 + uri: data.uri ?? atUri, 64 + cid: data.cid ?? "", 65 + record: data.value ?? {}, 66 + }; 67 + }
+7
lib/subscriptions/pds.ts
··· 29 29 30 30 export type PdsAction = PdsWebhookAction | PdsRecordAction; 31 31 32 + export type PdsFetchStep = { 33 + $type: "app.rglw.subscription#fetchStep"; 34 + name: string; 35 + uri: string; 36 + }; 37 + 32 38 type SubscriptionRecord = { 33 39 lexicon: string; 34 40 actions: PdsAction[]; 41 + fetches?: PdsFetchStep[]; 35 42 conditions: Array<{ field: string; operator: string; value: string }>; 36 43 active: boolean; 37 44 createdAt: string;
+10 -3
lib/webhooks/dispatcher.ts
··· 2 2 import { deliveryLogs, type WebhookAction } from "../db/schema.js"; 3 3 import { sign } from "./signer.js"; 4 4 import type { MatchedEvent } from "../jetstream/consumer.js"; 5 + import type { FetchContext } from "../actions/template.js"; 5 6 6 7 const RETRY_DELAYS = [5_000, 30_000]; // 1st retry after 5s, 2nd after 30s 7 8 ··· 21 22 cid?: string; 22 23 }; 23 24 }; 25 + fetches?: Record<string, { uri: string; cid: string; record: Record<string, unknown> }>; 24 26 }; 25 27 26 - function buildPayload(match: MatchedEvent): WebhookPayload { 28 + function buildPayload(match: MatchedEvent, fetchContext?: FetchContext): WebhookPayload { 27 29 const { subscription, event } = match; 28 30 return { 29 31 subscription: subscription.uri, ··· 43 45 } 44 46 : undefined, 45 47 }, 48 + fetches: fetchContext && Object.keys(fetchContext).length > 0 ? fetchContext : undefined, 46 49 }; 47 50 } 48 51 ··· 145 148 } 146 149 147 150 /** Deliver a matched event to the subscription's callback URL. */ 148 - export async function dispatch(match: MatchedEvent, actionIndex: number) { 151 + export async function dispatch( 152 + match: MatchedEvent, 153 + actionIndex: number, 154 + fetchContext?: FetchContext, 155 + ) { 149 156 const { subscription, event } = match; 150 157 const action = subscription.actions[actionIndex] as WebhookAction; 151 - const payload = buildPayload(match); 158 + const payload = buildPayload(match, fetchContext); 152 159 const body = JSON.stringify(payload); 153 160 154 161 const result = await deliver(action.callbackUrl, body, action.secret, subscription.uri);