Webhooks for the AT Protocol airglow.run
atproto atprotocol automation webhook
12
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: forEach in actions

Hugo 11c183b2 81e82144

+1968 -128
+311 -3
app/islands/AutomationForm.tsx
··· 30 30 unresolved: boolean; 31 31 error: string; 32 32 fields: Field[]; 33 + /** Recursive record schema, used to surface array-typed paths in the 34 + * forEach picker for fetch-rooted iterations. */ 35 + record?: RecordSchema | null; 33 36 }; 34 37 35 38 type Condition = { ··· 37 40 operator: string; 38 41 value: string; 39 42 comment: string; 43 + }; 44 + 45 + type ForEachDraft = { 46 + path: string; 47 + conditions: Condition[]; 40 48 }; 41 49 42 50 type FetchDraft = { ··· 57 65 callbackUrl: string; 58 66 headers: HeaderDraft[]; 59 67 comment: string; 68 + forEach?: ForEachDraft; 60 69 }; 61 70 type RecordDraft = { 62 71 type: "record"; 63 72 targetCollection: string; 64 73 recordTemplate: string; 65 74 comment: string; 75 + forEach?: ForEachDraft; 66 76 }; 67 77 type BskyPostDraft = { 68 78 type: "bsky-post"; ··· 70 80 langsText: string; 71 81 labels: string[]; 72 82 comment: string; 83 + forEach?: ForEachDraft; 73 84 }; 74 85 type PatchRecordDraft = { 75 86 type: "patch-record"; ··· 77 88 baseRecordUri: string; 78 89 recordTemplate: string; 79 90 comment: string; 91 + forEach?: ForEachDraft; 80 92 }; 81 93 type BookmarkDraft = { 82 94 type: "bookmark"; ··· 85 97 bodyValue: string; 86 98 tagsText: string; 87 99 comment: string; 100 + forEach?: ForEachDraft; 88 101 }; 89 102 type FollowDraft = { 90 103 type: "follow"; 91 104 target: FollowTarget; 92 105 subject: string; 93 106 comment: string; 107 + forEach?: ForEachDraft; 94 108 }; 95 109 type ActionDraft = 96 110 | WebhookDraft ··· 124 138 }; 125 139 126 140 const NSID_RE = /^[a-z][a-z0-9-]*(\.[a-zA-Z][a-zA-Z0-9-]*){2,}$/; 141 + 142 + /** Recursively walk a record schema and collect dotted paths whose effective 143 + * type is `array`. Each segment that crosses an array level is rendered as 144 + * `name[]` so the final path is directly usable as a forEach.path value. */ 145 + function collectArrayPaths( 146 + schema: import("../../lib/lexicons/schema-types.js").RecordSchema | null | undefined, 147 + prefix = "event.commit.record", 148 + ): string[] { 149 + if (!schema) return []; 150 + const out: string[] = []; 151 + walkSchemaForArrays(schema.properties, prefix, out, 0); 152 + return out; 153 + } 154 + 155 + function walkSchemaForArrays( 156 + props: Record<string, import("../../lib/lexicons/schema-types.js").SchemaNode> | undefined, 157 + prefix: string, 158 + out: string[], 159 + depth: number, 160 + ): void { 161 + if (!props || depth > 6) return; 162 + for (const [name, node] of Object.entries(props)) { 163 + const path = `${prefix}.${name}`; 164 + walkArrayLevels(node, path, out, depth); 165 + } 166 + } 167 + 168 + function walkArrayLevels( 169 + node: import("../../lib/lexicons/schema-types.js").SchemaNode, 170 + path: string, 171 + out: string[], 172 + depth: number, 173 + ): void { 174 + if (depth > 6) return; 175 + if (node.type === "array") { 176 + const arrayPath = `${path}[]`; 177 + out.push(arrayPath); 178 + // Drill into the element to expose nested arrays (e.g. facets[].features[]). 179 + walkArrayLevels(node.items, arrayPath, out, depth + 1); 180 + } else if (node.type === "object") { 181 + walkSchemaForArrays(node.properties, path, out, depth + 1); 182 + } else if (node.type === "union") { 183 + // Variants are typically objects; merge their properties for path discovery. 184 + for (const variant of node.variants) { 185 + if (variant.node.type === "object") { 186 + walkSchemaForArrays(variant.node.properties, path, out, depth + 1); 187 + } else if (variant.node.type === "array") { 188 + walkArrayLevels(variant.node, path, out, depth + 1); 189 + } 190 + } 191 + } 192 + } 127 193 128 194 const BUILTIN_CONDITION_FIELDS: Field[] = [ 129 195 { path: "event.did", type: "string", description: "DID of the repo that emitted the event" }, ··· 823 889 } 824 890 825 891 // --------------------------------------------------------------------------- 892 + // forEach (run per item) editor 893 + // --------------------------------------------------------------------------- 894 + 895 + const FOR_EACH_OPERATORS = ["eq", "startsWith", "endsWith", "contains", "exists", "not-exists"]; 896 + 897 + function ForEachConfigEditor({ 898 + index, 899 + draft, 900 + onChange, 901 + arrayPathSuggestions, 902 + }: { 903 + index: number; 904 + draft: ForEachDraft | undefined; 905 + onChange: (next: ForEachDraft | undefined) => void; 906 + arrayPathSuggestions: string[]; 907 + }) { 908 + const enabled = draft !== undefined; 909 + const pathId = `action-${index}-foreach-path`; 910 + const pathListId = `action-${index}-foreach-path-suggestions`; 911 + // Default-open the panel when forEach is set on first mount so users land on 912 + // the config they last saved. After mount, the user controls the toggle via 913 + // the native <details> element — we don't re-bind `open` so re-renders don't 914 + // fight their interaction. 915 + const [initialOpen] = useState(enabled); 916 + 917 + const updateCondition = (i: number, key: keyof Condition, val: string) => { 918 + if (!draft) return; 919 + onChange({ 920 + ...draft, 921 + conditions: draft.conditions.map((c, j) => (j === i ? { ...c, [key]: val } : c)), 922 + }); 923 + }; 924 + const addCondition = () => { 925 + if (!draft) return; 926 + onChange({ 927 + ...draft, 928 + conditions: [...draft.conditions, { field: "", operator: "eq", value: "", comment: "" }], 929 + }); 930 + }; 931 + const removeCondition = (i: number) => { 932 + if (!draft) return; 933 + onChange({ ...draft, conditions: draft.conditions.filter((_, j) => j !== i) }); 934 + }; 935 + 936 + return ( 937 + <details class={s.collapsibleDetails} open={initialOpen}> 938 + <summary class={s.collapsibleSummary}> 939 + Run per item{" "} 940 + <span class={s.hint}> 941 + (fan this action out across an array — each match counts toward the rate limit) 942 + </span> 943 + </summary> 944 + <div class={s.collapsibleContent}> 945 + <label class={s.checkboxLabel}> 946 + <input 947 + type="checkbox" 948 + checked={enabled} 949 + onChange={(e: Event) => { 950 + const checked = (e.target as HTMLInputElement).checked; 951 + if (checked) onChange({ path: "", conditions: [] }); 952 + else onChange(undefined); 953 + }} 954 + /> 955 + Enable forEach 956 + </label> 957 + 958 + {enabled && ( 959 + <> 960 + <div class={s.fieldGroup}> 961 + <label class={s.label} for={pathId}> 962 + Array path 963 + </label> 964 + <input 965 + id={pathId} 966 + class={s.input} 967 + type="text" 968 + list={pathListId} 969 + placeholder="e.g. event.commit.record.facets[]" 970 + value={draft.path} 971 + onInput={(e: Event) => 972 + onChange({ ...draft, path: (e.target as HTMLInputElement).value }) 973 + } 974 + autocomplete="off" 975 + /> 976 + <span class={s.hint}> 977 + Dotted path with <code>[]</code> segments to mark array levels. Roots:{" "} 978 + <code>event.*</code>, a fetch name, or an upstream <code>actionN</code>. Must end in{" "} 979 + <code>[]</code>. 980 + </span> 981 + {arrayPathSuggestions.length > 0 && ( 982 + <datalist id={pathListId}> 983 + {arrayPathSuggestions.map((p) => ( 984 + <option key={p} value={p} /> 985 + ))} 986 + </datalist> 987 + )} 988 + </div> 989 + 990 + <div class={s.fieldGroup}> 991 + <span class={s.label}> 992 + Per-item conditions <span class={s.hint}>(optional)</span> 993 + </span> 994 + <span class={s.hint}> 995 + Each item is filtered by these. Field paths are rooted at the item (e.g.{" "} 996 + <code>$type</code> or <code>features.0.uri</code>). The optional <code>item.</code>{" "} 997 + prefix is also accepted. 998 + </span> 999 + {draft.conditions.map((c, i) => ( 1000 + <div key={i} class={s.conditionRow}> 1001 + <div class={s.conditionField}> 1002 + <input 1003 + class={s.input} 1004 + type="text" 1005 + placeholder="e.g. $type" 1006 + value={c.field} 1007 + onInput={(e: Event) => 1008 + updateCondition(i, "field", (e.target as HTMLInputElement).value) 1009 + } 1010 + autocomplete="off" 1011 + aria-label="Item field" 1012 + /> 1013 + </div> 1014 + <select 1015 + class={s.input} 1016 + value={c.operator} 1017 + onChange={(e: Event) => 1018 + updateCondition(i, "operator", (e.target as HTMLSelectElement).value) 1019 + } 1020 + aria-label="Operator" 1021 + > 1022 + {FOR_EACH_OPERATORS.map((op) => ( 1023 + <option key={op} value={op}> 1024 + {op} 1025 + </option> 1026 + ))} 1027 + </select> 1028 + {!VALUE_LESS_CONDITION_OPS.has(c.operator) && ( 1029 + <div class={s.conditionValue}> 1030 + <input 1031 + class={s.input} 1032 + type="text" 1033 + placeholder="e.g. app.bsky.richtext.facet#link" 1034 + value={c.value} 1035 + onInput={(e: Event) => 1036 + updateCondition(i, "value", (e.target as HTMLInputElement).value) 1037 + } 1038 + autocomplete="off" 1039 + aria-label="Value" 1040 + /> 1041 + </div> 1042 + )} 1043 + <button type="button" class={s.removeBtn} onClick={() => removeCondition(i)}> 1044 + Remove 1045 + </button> 1046 + </div> 1047 + ))} 1048 + <button type="button" class={s.addBtn} onClick={addCondition}> 1049 + + Add item condition 1050 + </button> 1051 + </div> 1052 + 1053 + <p class={s.hint}> 1054 + Inside this action, templates can use <code>{`{{item.*}}`}</code> placeholders (e.g.{" "} 1055 + <code>{`{{item.uri}}`}</code>, <code>{`{{item.did}}`}</code>) to read fields off each 1056 + matching item. 1057 + </p> 1058 + </> 1059 + )} 1060 + </div> 1061 + </details> 1062 + ); 1063 + } 1064 + 1065 + // --------------------------------------------------------------------------- 826 1066 // Main form 827 1067 // --------------------------------------------------------------------------- 828 1068 1069 + function toForEachDraft(fe: Action["forEach"]): ForEachDraft | undefined { 1070 + if (!fe) return undefined; 1071 + return { 1072 + path: fe.path, 1073 + conditions: (fe.conditions ?? []).map((c) => ({ 1074 + field: c.field, 1075 + operator: c.operator, 1076 + value: c.value, 1077 + comment: c.comment ?? "", 1078 + })), 1079 + }; 1080 + } 1081 + 829 1082 function toActionDrafts(actions: Action[]): ActionDraft[] { 830 1083 return actions.map((a) => { 1084 + const forEach = toForEachDraft(a.forEach); 1085 + const forEachField = forEach ? { forEach } : {}; 831 1086 if (a.$type === "webhook") { 832 1087 const headers: HeaderDraft[] = a.headers 833 1088 ? Object.entries(a.headers).map(([key, value]) => ({ key, value })) 834 1089 : []; 835 - return { type: "webhook", callbackUrl: a.callbackUrl, headers, comment: a.comment ?? "" }; 1090 + return { 1091 + type: "webhook", 1092 + callbackUrl: a.callbackUrl, 1093 + headers, 1094 + comment: a.comment ?? "", 1095 + ...forEachField, 1096 + }; 836 1097 } 837 1098 if (a.$type === "bsky-post") { 838 1099 return { ··· 841 1102 langsText: (a.langs ?? []).join(", "), 842 1103 labels: a.labels ?? [], 843 1104 comment: a.comment ?? "", 1105 + ...forEachField, 844 1106 }; 845 1107 } 846 1108 if (a.$type === "patch-record") { ··· 850 1112 baseRecordUri: a.baseRecordUri, 851 1113 recordTemplate: a.recordTemplate, 852 1114 comment: a.comment ?? "", 1115 + ...forEachField, 853 1116 }; 854 1117 } 855 1118 if (a.$type === "bookmark") { ··· 860 1123 bodyValue: a.bodyValue ?? "", 861 1124 tagsText: (a.tags ?? []).join(", "), 862 1125 comment: a.comment ?? "", 1126 + ...forEachField, 863 1127 }; 864 1128 } 865 1129 if (a.$type === "follow") { ··· 868 1132 target: a.target, 869 1133 subject: a.subject, 870 1134 comment: a.comment ?? "", 1135 + ...forEachField, 871 1136 }; 872 1137 } 873 1138 return { ··· 875 1140 targetCollection: a.targetCollection, 876 1141 recordTemplate: a.recordTemplate, 877 1142 comment: a.comment ?? "", 1143 + ...forEachField, 878 1144 }; 879 1145 }); 880 1146 } 881 1147 1148 + function forEachToPayload(fe: ForEachDraft | undefined) { 1149 + if (!fe) return undefined; 1150 + const path = fe.path.trim(); 1151 + if (!path) return undefined; 1152 + const completed = fe.conditions.filter(conditionIsComplete).map(conditionToPayload); 1153 + return { 1154 + path, 1155 + ...(completed.length > 0 ? { conditions: completed } : {}), 1156 + }; 1157 + } 1158 + 882 1159 function toFetchDrafts(fetches: FetchStep[]): FetchDraft[] { 883 1160 return fetches.map((f) => { 884 1161 const conditions = (f.conditions ?? []).map((c) => ({ ··· 1009 1286 const [fieldsLoading, setFieldsLoading] = useState(false); 1010 1287 const [fieldsError, setFieldsError] = useState(""); 1011 1288 const [schemaUnresolved, setSchemaUnresolved] = useState(false); 1289 + const [triggerRecordSchema, setTriggerRecordSchema] = useState<RecordSchema | null>(null); 1012 1290 const [fetchSchemas, setFetchSchemas] = useState<Record<string, FetchSchemaState>>({}); 1013 1291 const fetchSchemaDebounceRef = useRef<Map<string, ReturnType<typeof setTimeout>>>(new Map()); 1014 1292 const [conditions, setConditions] = useState<Condition[]>( ··· 1070 1348 setFields([]); 1071 1349 setFieldsError(""); 1072 1350 setSchemaUnresolved(false); 1351 + setTriggerRecordSchema(null); 1073 1352 if (updateUrl) { 1074 1353 const url = new URL(window.location.href); 1075 1354 url.searchParams.delete("lexicon"); ··· 1091 1370 setFieldsLoading(true); 1092 1371 setFieldsError(""); 1093 1372 try { 1094 - const res = await fetch(`/api/lexicons/${encodeURIComponent(nsid)}`); 1373 + const res = await fetch(`/api/lexicons/${encodeURIComponent(nsid)}?schema=record`); 1095 1374 const data = await res.json(); 1096 1375 if (!res.ok) { 1097 1376 if (res.status === 404) { ··· 1102 1381 setFieldsError(data.error || "Failed to load fields"); 1103 1382 } 1104 1383 setFields([]); 1384 + setTriggerRecordSchema(null); 1105 1385 } else { 1106 1386 setSchemaUnresolved(false); 1107 1387 setFields(data.fields || []); 1388 + setTriggerRecordSchema(data.record ?? null); 1108 1389 } 1109 1390 } catch { 1110 1391 setSchemaUnresolved(false); 1111 1392 setFieldsError("Failed to fetch lexicon fields"); 1112 1393 setFields([]); 1394 + setTriggerRecordSchema(null); 1113 1395 } finally { 1114 1396 setFieldsLoading(false); 1115 1397 } ··· 1147 1429 if (existing) clearTimeout(existing); 1148 1430 const handle = setTimeout(async () => { 1149 1431 try { 1150 - const res = await fetch(`/api/lexicons/${encodeURIComponent(nsid)}`); 1432 + const res = await fetch(`/api/lexicons/${encodeURIComponent(nsid)}?schema=record`); 1151 1433 const data = await res.json(); 1152 1434 if (!res.ok) { 1153 1435 setFetchSchemas((prev) => ({ ··· 1167 1449 unresolved: false, 1168 1450 error: "", 1169 1451 fields: data.fields || [], 1452 + record: data.record ?? null, 1170 1453 }, 1171 1454 })); 1172 1455 } ··· 1419 1702 } 1420 1703 payload.actions = actions.map((a) => { 1421 1704 const comment = a.comment ? { comment: a.comment } : {}; 1705 + const forEach = forEachToPayload(a.forEach); 1706 + const forEachField = forEach ? { forEach } : {}; 1422 1707 if (a.type === "webhook") { 1423 1708 const filtered = a.headers.filter((h) => h.key.trim() && h.value.trim()); 1424 1709 const headers = ··· 1429 1714 type: "webhook", 1430 1715 callbackUrl: a.callbackUrl, 1431 1716 ...(headers ? { headers } : {}), 1717 + ...forEachField, 1432 1718 ...comment, 1433 1719 }; 1434 1720 } ··· 1442 1728 textTemplate: a.textTemplate, 1443 1729 ...(langs.length > 0 ? { langs } : {}), 1444 1730 ...(a.labels.length > 0 ? { labels: a.labels } : {}), 1731 + ...forEachField, 1445 1732 ...comment, 1446 1733 }; 1447 1734 } ··· 1451 1738 targetCollection: a.targetCollection, 1452 1739 baseRecordUri: a.baseRecordUri, 1453 1740 recordTemplate: a.recordTemplate, 1741 + ...forEachField, 1454 1742 ...comment, 1455 1743 }; 1456 1744 } ··· 1468 1756 ...(targetTitle ? { targetTitle } : {}), 1469 1757 ...(bodyValue ? { bodyValue } : {}), 1470 1758 ...(tags.length > 0 ? { tags } : {}), 1759 + ...forEachField, 1471 1760 ...comment, 1472 1761 }; 1473 1762 } ··· 1476 1765 type: "follow", 1477 1766 target: a.target, 1478 1767 subject: a.subject, 1768 + ...forEachField, 1479 1769 ...comment, 1480 1770 }; 1481 1771 } ··· 1483 1773 type: "record", 1484 1774 targetCollection: a.targetCollection, 1485 1775 recordTemplate: a.recordTemplate, 1776 + ...forEachField, 1486 1777 ...comment, 1487 1778 }; 1488 1779 }); ··· 1521 1812 }, 1522 1813 [previewPayload, isEdit], 1523 1814 ); 1815 + 1816 + const arrayPathSuggestions = useMemo(() => { 1817 + const out = collectArrayPaths(triggerRecordSchema); 1818 + for (const f of fetches) { 1819 + if (!f.name || !f.collection) continue; 1820 + const schema = fetchSchemas[f.collection]?.record; 1821 + if (!schema) continue; 1822 + out.push(...collectArrayPaths(schema, `${f.name}.record`)); 1823 + } 1824 + return out; 1825 + }, [triggerRecordSchema, fetches, fetchSchemas]); 1524 1826 1525 1827 // Catalogue-tile identity: follows split into follow-<target> so per-tile 1526 1828 // "#N" counters don't collapse the three tiles into one group. ··· 2564 2866 autocomplete="off" 2565 2867 /> 2566 2868 </div> 2869 + <ForEachConfigEditor 2870 + index={i} 2871 + draft={action.forEach} 2872 + onChange={(next) => updateAction(i, { ...action, forEach: next })} 2873 + arrayPathSuggestions={arrayPathSuggestions} 2874 + /> 2567 2875 {isRecordProducingAction(action.type) && ( 2568 2876 <details class={s.collapsibleDetails}> 2569 2877 <summary class={s.collapsibleSummary}>
+37 -2
app/routes/api/automations/[rkey].ts
··· 32 32 validateWebhookHeaders, 33 33 validateBookmarkInput, 34 34 validateFollowInput, 35 + validateForEachInput, 35 36 resolveWantedDids, 36 37 } from "@/actions/validation.js"; 37 38 import { ··· 106 107 ...(a.headers ? { headers: a.headers } : {}), 107 108 verified: a.verified ?? false, 108 109 comment: a.comment, 110 + ...(a.forEach ? { forEach: a.forEach } : {}), 109 111 } 110 112 : a, 111 113 ), ··· 230 232 const actionResultNames: string[] = []; 231 233 232 234 for (const [actionIndex, input] of body.actions.entries()) { 235 + const label = `action ${actionIndex + 1}`; 236 + const forEachResult = validateForEachInput( 237 + input.forEach, 238 + fetchNames, 239 + actionResultNames, 240 + label, 241 + ); 242 + if (!forEachResult.valid) return c.json({ error: forEachResult.error }, 400); 243 + const forEach = forEachResult.value; 244 + const hasItem = forEach !== undefined; 245 + const forEachField = forEach ? { forEach } : {}; 246 + 233 247 if (input.type === "webhook") { 234 248 if (!input.callbackUrl) { 235 249 return c.json({ error: "callbackUrl is required for webhook actions" }, 400); ··· 266 280 secret, 267 281 ...(headers ? { headers } : {}), 268 282 verified: verification.ok, 283 + ...forEachField, 269 284 ...(input.comment ? { comment: input.comment } : {}), 270 285 } satisfies WebhookAction); 271 286 newPdsActions.push({ 272 287 $type: "run.airglow.automation#webhookAction", 273 288 callbackUrl: input.callbackUrl, 289 + ...forEachField, 274 290 ...(input.comment ? { comment: input.comment } : {}), 275 291 }); 276 292 } else if (input.type === "record") { ··· 287 303 input.recordTemplate, 288 304 fetchNames, 289 305 actionResultNames, 306 + hasItem, 290 307 ); 291 308 if (!templateValidation.valid) { 292 309 return c.json({ error: templateValidation.error }, 400); ··· 296 313 $type: "record", 297 314 targetCollection: input.targetCollection, 298 315 recordTemplate: input.recordTemplate, 316 + ...forEachField, 299 317 ...(input.comment ? { comment: input.comment } : {}), 300 318 } satisfies RecordAction); 301 319 newPdsActions.push({ 302 320 $type: "run.airglow.automation#recordAction", 303 321 targetCollection: input.targetCollection, 304 322 recordTemplate: input.recordTemplate, 323 + ...forEachField, 305 324 ...(input.comment ? { comment: input.comment } : {}), 306 325 }); 307 326 actionResultNames.push(`action${actionIndex + 1}`); ··· 313 332 input.textTemplate, 314 333 fetchNames, 315 334 actionResultNames, 335 + hasItem, 316 336 ); 317 337 if (!textValidation.valid) { 318 338 return c.json({ error: textValidation.error }, 400); ··· 341 361 textTemplate: input.textTemplate, 342 362 ...(langs && langs.length > 0 ? { langs } : {}), 343 363 ...(labels && labels.length > 0 ? { labels } : {}), 364 + ...forEachField, 344 365 ...(input.comment ? { comment: input.comment } : {}), 345 366 } satisfies BskyPostAction); 346 367 newPdsActions.push({ ··· 348 369 textTemplate: input.textTemplate, 349 370 ...(langs && langs.length > 0 ? { langs } : {}), 350 371 ...(labels && labels.length > 0 ? { labels } : {}), 372 + ...forEachField, 351 373 ...(input.comment ? { comment: input.comment } : {}), 352 374 }); 353 375 actionResultNames.push(`action${actionIndex + 1}`); ··· 365 387 input.baseRecordUri, 366 388 fetchNames, 367 389 actionResultNames, 390 + hasItem, 368 391 ); 369 392 if (!uriValidation.valid) { 370 393 return c.json({ error: uriValidation.error }, 400); ··· 376 399 input.recordTemplate, 377 400 fetchNames, 378 401 actionResultNames, 402 + hasItem, 379 403 ); 380 404 if (!templateValidation.valid) { 381 405 return c.json({ error: templateValidation.error }, 400); ··· 386 410 targetCollection: input.targetCollection, 387 411 baseRecordUri: input.baseRecordUri, 388 412 recordTemplate: input.recordTemplate, 413 + ...forEachField, 389 414 ...(input.comment ? { comment: input.comment } : {}), 390 415 } satisfies PatchRecordAction); 391 416 newPdsActions.push({ ··· 393 418 targetCollection: input.targetCollection, 394 419 baseRecordUri: input.baseRecordUri, 395 420 recordTemplate: input.recordTemplate, 421 + ...forEachField, 396 422 ...(input.comment ? { comment: input.comment } : {}), 397 423 }); 398 424 actionResultNames.push(`action${actionIndex + 1}`); 399 425 } else if (input.type === "bookmark") { 400 - const bookmarkValidation = validateBookmarkInput(input, fetchNames, actionResultNames); 426 + const bookmarkValidation = validateBookmarkInput( 427 + input, 428 + fetchNames, 429 + actionResultNames, 430 + hasItem, 431 + ); 401 432 if (!bookmarkValidation.valid) { 402 433 return c.json({ error: bookmarkValidation.error }, 400); 403 434 } ··· 412 443 ...(targetTitle ? { targetTitle } : {}), 413 444 ...(bodyValue ? { bodyValue } : {}), 414 445 ...(tags ? { tags } : {}), 446 + ...forEachField, 415 447 ...(input.comment ? { comment: input.comment } : {}), 416 448 } satisfies BookmarkAction); 417 449 newPdsActions.push({ ··· 420 452 ...(targetTitle ? { targetTitle } : {}), 421 453 ...(bodyValue ? { bodyValue } : {}), 422 454 ...(tags ? { tags } : {}), 455 + ...forEachField, 423 456 ...(input.comment ? { comment: input.comment } : {}), 424 457 }); 425 458 actionResultNames.push(`action${actionIndex + 1}`); 426 459 } else if (input.type === "follow") { 427 - const followValidation = validateFollowInput(input, fetchNames, actionResultNames); 460 + const followValidation = validateFollowInput(input, fetchNames, actionResultNames, hasItem); 428 461 if (!followValidation.valid) { 429 462 return c.json({ error: followValidation.error }, 400); 430 463 } ··· 433 466 $type: "follow", 434 467 target: input.target, 435 468 subject: input.subject, 469 + ...forEachField, 436 470 ...(input.comment ? { comment: input.comment } : {}), 437 471 } satisfies FollowAction); 438 472 newPdsActions.push({ 439 473 $type: "run.airglow.automation#followAction", 440 474 target: input.target, 441 475 subject: input.subject, 476 + ...forEachField, 442 477 ...(input.comment ? { comment: input.comment } : {}), 443 478 }); 444 479 actionResultNames.push(`action${actionIndex + 1}`);
+32 -2
app/routes/api/automations/index.ts
··· 30 30 validateWebhookHeaders, 31 31 validateBookmarkInput, 32 32 validateFollowInput, 33 + validateForEachInput, 33 34 resolveWantedDids, 34 35 } from "@/actions/validation.js"; 35 36 import { ··· 64 65 ...(a.headers ? { headers: a.headers } : {}), 65 66 verified: a.verified ?? false, 66 67 comment: a.comment, 68 + ...(a.forEach ? { forEach: a.forEach } : {}), 67 69 } 68 70 : a, 69 71 ), ··· 158 160 const actionResultNames: string[] = []; 159 161 160 162 for (const [actionIndex, input] of body.actions.entries()) { 163 + const label = `action ${actionIndex + 1}`; 164 + const forEachResult = validateForEachInput(input.forEach, fetchNames, actionResultNames, label); 165 + if (!forEachResult.valid) return c.json({ error: forEachResult.error }, 400); 166 + const forEach = forEachResult.value; 167 + const hasItem = forEach !== undefined; 168 + const forEachField = forEach ? { forEach } : {}; 169 + 161 170 if (input.type === "webhook") { 162 171 if (!input.callbackUrl) { 163 172 return c.json({ error: "callbackUrl is required for webhook actions" }, 400); ··· 188 197 secret, 189 198 ...(headers ? { headers } : {}), 190 199 verified: verification.ok, 200 + ...forEachField, 191 201 ...(input.comment ? { comment: input.comment } : {}), 192 202 } satisfies WebhookAction); 193 203 pdsActions.push({ 194 204 $type: "run.airglow.automation#webhookAction", 195 205 callbackUrl: input.callbackUrl, 206 + ...forEachField, 196 207 ...(input.comment ? { comment: input.comment } : {}), 197 208 }); 198 209 } else if (input.type === "record") { ··· 209 220 input.recordTemplate, 210 221 fetchNames, 211 222 actionResultNames, 223 + hasItem, 212 224 ); 213 225 if (!templateValidation.valid) { 214 226 return c.json({ error: templateValidation.error }, 400); ··· 218 230 $type: "record", 219 231 targetCollection: input.targetCollection, 220 232 recordTemplate: input.recordTemplate, 233 + ...forEachField, 221 234 ...(input.comment ? { comment: input.comment } : {}), 222 235 } satisfies RecordAction); 223 236 pdsActions.push({ 224 237 $type: "run.airglow.automation#recordAction", 225 238 targetCollection: input.targetCollection, 226 239 recordTemplate: input.recordTemplate, 240 + ...forEachField, 227 241 ...(input.comment ? { comment: input.comment } : {}), 228 242 }); 229 243 actionResultNames.push(`action${actionIndex + 1}`); ··· 235 249 input.textTemplate, 236 250 fetchNames, 237 251 actionResultNames, 252 + hasItem, 238 253 ); 239 254 if (!textValidation.valid) { 240 255 return c.json({ error: textValidation.error }, 400); ··· 263 278 textTemplate: input.textTemplate, 264 279 ...(langs && langs.length > 0 ? { langs } : {}), 265 280 ...(labels && labels.length > 0 ? { labels } : {}), 281 + ...forEachField, 266 282 ...(input.comment ? { comment: input.comment } : {}), 267 283 } satisfies BskyPostAction); 268 284 pdsActions.push({ ··· 270 286 textTemplate: input.textTemplate, 271 287 ...(langs && langs.length > 0 ? { langs } : {}), 272 288 ...(labels && labels.length > 0 ? { labels } : {}), 289 + ...forEachField, 273 290 ...(input.comment ? { comment: input.comment } : {}), 274 291 }); 275 292 actionResultNames.push(`action${actionIndex + 1}`); ··· 287 304 input.baseRecordUri, 288 305 fetchNames, 289 306 actionResultNames, 307 + hasItem, 290 308 ); 291 309 if (!uriValidation.valid) { 292 310 return c.json({ error: uriValidation.error }, 400); ··· 298 316 input.recordTemplate, 299 317 fetchNames, 300 318 actionResultNames, 319 + hasItem, 301 320 ); 302 321 if (!templateValidation.valid) { 303 322 return c.json({ error: templateValidation.error }, 400); ··· 308 327 targetCollection: input.targetCollection, 309 328 baseRecordUri: input.baseRecordUri, 310 329 recordTemplate: input.recordTemplate, 330 + ...forEachField, 311 331 ...(input.comment ? { comment: input.comment } : {}), 312 332 } satisfies PatchRecordAction); 313 333 pdsActions.push({ ··· 315 335 targetCollection: input.targetCollection, 316 336 baseRecordUri: input.baseRecordUri, 317 337 recordTemplate: input.recordTemplate, 338 + ...forEachField, 318 339 ...(input.comment ? { comment: input.comment } : {}), 319 340 }); 320 341 actionResultNames.push(`action${actionIndex + 1}`); 321 342 } else if (input.type === "bookmark") { 322 - const bookmarkValidation = validateBookmarkInput(input, fetchNames, actionResultNames); 343 + const bookmarkValidation = validateBookmarkInput( 344 + input, 345 + fetchNames, 346 + actionResultNames, 347 + hasItem, 348 + ); 323 349 if (!bookmarkValidation.valid) { 324 350 return c.json({ error: bookmarkValidation.error }, 400); 325 351 } ··· 334 360 ...(targetTitle ? { targetTitle } : {}), 335 361 ...(bodyValue ? { bodyValue } : {}), 336 362 ...(tags ? { tags } : {}), 363 + ...forEachField, 337 364 ...(input.comment ? { comment: input.comment } : {}), 338 365 } satisfies BookmarkAction); 339 366 pdsActions.push({ ··· 342 369 ...(targetTitle ? { targetTitle } : {}), 343 370 ...(bodyValue ? { bodyValue } : {}), 344 371 ...(tags ? { tags } : {}), 372 + ...forEachField, 345 373 ...(input.comment ? { comment: input.comment } : {}), 346 374 }); 347 375 actionResultNames.push(`action${actionIndex + 1}`); 348 376 } else if (input.type === "follow") { 349 - const followValidation = validateFollowInput(input, fetchNames, actionResultNames); 377 + const followValidation = validateFollowInput(input, fetchNames, actionResultNames, hasItem); 350 378 if (!followValidation.valid) { 351 379 return c.json({ error: followValidation.error }, 400); 352 380 } ··· 355 383 $type: "follow", 356 384 target: input.target, 357 385 subject: input.subject, 386 + ...forEachField, 358 387 ...(input.comment ? { comment: input.comment } : {}), 359 388 } satisfies FollowAction); 360 389 pdsActions.push({ 361 390 $type: "run.airglow.automation#followAction", 362 391 target: input.target, 363 392 subject: input.subject, 393 + ...forEachField, 364 394 ...(input.comment ? { comment: input.comment } : {}), 365 395 }); 366 396 actionResultNames.push(`action${actionIndex + 1}`);
+45
lexicons/run/airglow/automation.json
··· 108 108 "description": "URL to receive webhook POST requests.", 109 109 "maxLength": 2048 110 110 }, 111 + "forEach": { 112 + "type": "ref", 113 + "ref": "#forEachConfig" 114 + }, 111 115 "comment": { 112 116 "type": "string", 113 117 "description": "Optional user note about this action.", ··· 129 133 "type": "string", 130 134 "description": "JSON template with {{placeholder}} expressions resolved from event data.", 131 135 "maxLength": 10240 136 + }, 137 + "forEach": { 138 + "type": "ref", 139 + "ref": "#forEachConfig" 132 140 }, 133 141 "comment": { 134 142 "type": "string", ··· 279 287 "maxLength": 32 280 288 } 281 289 }, 290 + "forEach": { 291 + "type": "ref", 292 + "ref": "#forEachConfig" 293 + }, 282 294 "comment": { 283 295 "type": "string", 284 296 "description": "Optional user note about this action.", ··· 305 317 "type": "string", 306 318 "description": "JSON template with {{placeholder}} expressions. Fields are shallow-merged on top of the fetched base record.", 307 319 "maxLength": 10240 320 + }, 321 + "forEach": { 322 + "type": "ref", 323 + "ref": "#forEachConfig" 308 324 }, 309 325 "comment": { 310 326 "type": "string", ··· 342 358 "maxLength": 64 343 359 } 344 360 }, 361 + "forEach": { 362 + "type": "ref", 363 + "ref": "#forEachConfig" 364 + }, 345 365 "comment": { 346 366 "type": "string", 347 367 "description": "Optional user note about this action.", ··· 365 385 "description": "DID of the account to follow. Supports {{placeholders}}; the runtime enforces the rendered value matches the DID format before writing the record.", 366 386 "maxLength": 512 367 387 }, 388 + "forEach": { 389 + "type": "ref", 390 + "ref": "#forEachConfig" 391 + }, 368 392 "comment": { 369 393 "type": "string", 370 394 "description": "Optional user note about this action.", 371 395 "maxLength": 512 396 + } 397 + } 398 + }, 399 + "forEachConfig": { 400 + "type": "object", 401 + "description": "Per-action fan-out: when set, the action runs once per matching item resolved from `path`. The path is a dotted expression where `[]` segments mark array levels to flat-map (e.g. 'event.commit.record.facets[].features[]'). Inside a forEach action, templates may reference `{{item.*}}` to access fields on the current item.", 402 + "required": ["path"], 403 + "properties": { 404 + "path": { 405 + "type": "string", 406 + "description": "Dotted path with `[]` segments to mark array levels. Must end in `[]`. Roots: `event.*`, a declared fetch name, or an upstream action result (action1, action2, …).", 407 + "maxLength": 512 408 + }, 409 + "conditions": { 410 + "type": "array", 411 + "description": "Conditions evaluated against each item. Items that fail any condition are skipped without firing the action.", 412 + "maxLength": 10, 413 + "items": { 414 + "type": "ref", 415 + "ref": "#condition" 416 + } 372 417 } 373 418 } 374 419 },
+16 -8
lib/actions/bookmark.ts
··· 26 26 match: MatchedEvent, 27 27 action: BookmarkAction, 28 28 fetchContext?: FetchContext, 29 + item?: unknown, 29 30 ): Promise<Record<string, unknown>> { 30 31 const { automation, event } = match; 31 32 ··· 36 37 event, 37 38 fetchContext, 38 39 automation, 40 + item, 39 41 ); 40 42 41 43 const targetSource = await renderTextTemplate( ··· 43 45 event, 44 46 fetchContext, 45 47 automation, 48 + item, 46 49 ); 47 50 if (!targetSource.trim()) { 48 51 throw new Error("targetSource rendered to an empty string"); ··· 51 54 const sourceHash = computeSourceHash(targetSource); 52 55 53 56 const title = action.targetTitle 54 - ? (await renderTextTemplate(action.targetTitle, event, fetchContext, automation)).trim() 57 + ? (await renderTextTemplate(action.targetTitle, event, fetchContext, automation, item)).trim() 55 58 : ""; 56 59 const body = action.bodyValue 57 - ? (await renderTextTemplate(action.bodyValue, event, fetchContext, automation)).trim() 60 + ? (await renderTextTemplate(action.bodyValue, event, fetchContext, automation, item)).trim() 58 61 : ""; 59 62 60 63 const tags: string[] = []; 61 64 if (action.tags) { 62 65 for (const tag of action.tags) { 63 - const rendered = (await renderTextTemplate(tag, event, fetchContext, automation)).trim(); 66 + const rendered = ( 67 + await renderTextTemplate(tag, event, fetchContext, automation, item) 68 + ).trim(); 64 69 if (rendered) tags.push(rendered); 65 70 } 66 71 } ··· 94 99 match: MatchedEvent, 95 100 action: BookmarkAction, 96 101 fetchContext?: FetchContext, 102 + item?: unknown, 97 103 ): Promise<ActionResult> { 98 104 const { automation } = match; 99 105 100 106 let record: Record<string, unknown>; 101 107 try { 102 - record = await buildRecord(match, action, fetchContext); 108 + record = await buildRecord(match, action, fetchContext, item); 103 109 } catch (err) { 104 110 return { 105 111 statusCode: 0, ··· 132 138 actionIndex: number, 133 139 retryIndex: number, 134 140 fetchContext?: FetchContext, 141 + item?: unknown, 135 142 ) { 136 143 if (retryIndex >= RETRY_DELAYS.length) return; 137 144 138 145 setTimeout(async () => { 139 146 try { 140 147 const action = match.automation.actions[actionIndex] as BookmarkAction; 141 - const result = await execute(match, action, fetchContext); 148 + const result = await execute(match, action, fetchContext, item); 142 149 const body = actionPayload(action); 143 150 144 151 await logDelivery( ··· 152 159 ); 153 160 154 161 if (!isSuccess(result.statusCode) && isRetryable(result.statusCode)) { 155 - scheduleRetry(match, actionIndex, retryIndex + 1, fetchContext); 162 + scheduleRetry(match, actionIndex, retryIndex + 1, fetchContext, item); 156 163 } 157 164 } catch (err) { 158 165 console.error("Bookmark retry error:", err); ··· 165 172 match: MatchedEvent, 166 173 actionIndex: number, 167 174 fetchContext?: FetchContext, 175 + item?: unknown, 168 176 ): Promise<ActionResult> { 169 177 const action = match.automation.actions[actionIndex] as BookmarkAction; 170 - const result = await execute(match, action, fetchContext); 178 + const result = await execute(match, action, fetchContext, item); 171 179 const body = actionPayload(action); 172 180 173 181 await logDelivery( ··· 181 189 ); 182 190 183 191 if (!isSuccess(result.statusCode) && isRetryable(result.statusCode)) { 184 - scheduleRetry(match, actionIndex, 0, fetchContext); 192 + scheduleRetry(match, actionIndex, 0, fetchContext, item); 185 193 } 186 194 187 195 return result;
+8 -5
lib/actions/bsky-post.ts
··· 12 12 match: MatchedEvent, 13 13 action: BskyPostAction, 14 14 fetchContext?: FetchContext, 15 + item?: unknown, 15 16 ): Promise<ActionResult> { 16 17 const { automation, event } = match; 17 18 18 19 let text: string; 19 20 try { 20 - text = await renderTextTemplate(action.textTemplate, event, fetchContext, automation); 21 + text = await renderTextTemplate(action.textTemplate, event, fetchContext, automation, item); 21 22 } catch (err) { 22 23 return { 23 24 statusCode: 0, ··· 69 70 actionIndex: number, 70 71 retryIndex: number, 71 72 fetchContext?: FetchContext, 73 + item?: unknown, 72 74 ) { 73 75 if (retryIndex >= RETRY_DELAYS.length) return; 74 76 75 77 setTimeout(async () => { 76 78 try { 77 79 const action = match.automation.actions[actionIndex] as BskyPostAction; 78 - const result = await execute(match, action, fetchContext); 80 + const result = await execute(match, action, fetchContext, item); 79 81 const body = JSON.stringify({ textTemplate: action.textTemplate }); 80 82 81 83 await logDelivery( ··· 89 91 ); 90 92 91 93 if (!isSuccess(result.statusCode) && isRetryable(result.statusCode)) { 92 - scheduleRetry(match, actionIndex, retryIndex + 1, fetchContext); 94 + scheduleRetry(match, actionIndex, retryIndex + 1, fetchContext, item); 93 95 } 94 96 } catch (err) { 95 97 console.error("Bsky-post retry error:", err); ··· 102 104 match: MatchedEvent, 103 105 actionIndex: number, 104 106 fetchContext?: FetchContext, 107 + item?: unknown, 105 108 ): Promise<ActionResult> { 106 109 const action = match.automation.actions[actionIndex] as BskyPostAction; 107 - const result = await execute(match, action, fetchContext); 110 + const result = await execute(match, action, fetchContext, item); 108 111 109 112 const body = JSON.stringify({ textTemplate: action.textTemplate }); 110 113 ··· 119 122 ); 120 123 121 124 if (!isSuccess(result.statusCode) && isRetryable(result.statusCode)) { 122 - scheduleRetry(match, actionIndex, 0, fetchContext); 125 + scheduleRetry(match, actionIndex, 0, fetchContext, item); 123 126 } 124 127 125 128 return result;
+8 -5
lib/actions/executor.ts
··· 22 22 match: MatchedEvent, 23 23 action: RecordAction, 24 24 fetchContext?: FetchContext, 25 + item?: unknown, 25 26 ): Promise<ActionResult> { 26 27 const { automation, event } = match; 27 28 28 29 let record: Record<string, unknown>; 29 30 try { 30 - record = await renderTemplate(action.recordTemplate, event, fetchContext, automation); 31 + record = await renderTemplate(action.recordTemplate, event, fetchContext, automation, item); 31 32 } catch (err) { 32 33 return { 33 34 statusCode: 0, ··· 51 52 actionIndex: number, 52 53 retryIndex: number, 53 54 fetchContext?: FetchContext, 55 + item?: unknown, 54 56 ) { 55 57 if (retryIndex >= RETRY_DELAYS.length) return; 56 58 57 59 setTimeout(async () => { 58 60 try { 59 61 const action = match.automation.actions[actionIndex] as RecordAction; 60 - const result = await execute(match, action, fetchContext); 62 + const result = await execute(match, action, fetchContext, item); 61 63 const body = JSON.stringify({ 62 64 targetCollection: action.targetCollection, 63 65 recordTemplate: action.recordTemplate, ··· 74 76 ); 75 77 76 78 if (!isSuccess(result.statusCode) && isRetryable(result.statusCode)) { 77 - scheduleRetry(match, actionIndex, retryIndex + 1, fetchContext); 79 + scheduleRetry(match, actionIndex, retryIndex + 1, fetchContext, item); 78 80 } 79 81 } catch (err) { 80 82 console.error("Action retry error:", err); ··· 87 89 match: MatchedEvent, 88 90 actionIndex: number, 89 91 fetchContext?: FetchContext, 92 + item?: unknown, 90 93 ): Promise<ActionResult> { 91 94 const action = match.automation.actions[actionIndex] as RecordAction; 92 - const result = await execute(match, action, fetchContext); 95 + const result = await execute(match, action, fetchContext, item); 93 96 94 97 const body = JSON.stringify({ 95 98 targetCollection: action.targetCollection, ··· 107 110 ); 108 111 109 112 if (!isSuccess(result.statusCode) && isRetryable(result.statusCode)) { 110 - scheduleRetry(match, actionIndex, 0, fetchContext); 113 + scheduleRetry(match, actionIndex, 0, fetchContext, item); 111 114 } 112 115 113 116 return result;
+10 -5
lib/actions/follow.ts
··· 90 90 match: MatchedEvent, 91 91 action: FollowAction, 92 92 fetchContext?: FetchContext, 93 + item?: unknown, 93 94 ): Promise<ActionResult> { 94 95 const { automation, event } = match; 95 96 96 97 let subject: string; 97 98 try { 98 - subject = (await renderTextTemplate(action.subject, event, fetchContext, automation)).trim(); 99 + subject = ( 100 + await renderTextTemplate(action.subject, event, fetchContext, automation, item) 101 + ).trim(); 99 102 } catch (err) { 100 103 // 400: deterministic failure. Retrying won't re-render differently. 101 104 return { ··· 150 153 actionIndex: number, 151 154 retryIndex: number, 152 155 fetchContext?: FetchContext, 156 + item?: unknown, 153 157 ) { 154 158 if (retryIndex >= RETRY_DELAYS.length) return; 155 159 156 160 setTimeout(async () => { 157 161 try { 158 162 const action = match.automation.actions[actionIndex] as FollowAction; 159 - const result = await execute(match, action, fetchContext); 163 + const result = await execute(match, action, fetchContext, item); 160 164 const body = actionPayload(action); 161 165 162 166 await logDelivery( ··· 171 175 ); 172 176 173 177 if (!isSuccess(result.statusCode) && isRetryable(result.statusCode)) { 174 - scheduleRetry(match, actionIndex, retryIndex + 1, fetchContext); 178 + scheduleRetry(match, actionIndex, retryIndex + 1, fetchContext, item); 175 179 } 176 180 } catch (err) { 177 181 console.error("Follow retry error:", err); ··· 184 188 match: MatchedEvent, 185 189 actionIndex: number, 186 190 fetchContext?: FetchContext, 191 + item?: unknown, 187 192 ): Promise<ActionResult> { 188 193 const action = match.automation.actions[actionIndex] as FollowAction; 189 - const result = await execute(match, action, fetchContext); 194 + const result = await execute(match, action, fetchContext, item); 190 195 const body = actionPayload(action); 191 196 192 197 await logDelivery( ··· 201 206 ); 202 207 203 208 if (!isSuccess(result.statusCode) && isRetryable(result.statusCode)) { 204 - scheduleRetry(match, actionIndex, 0, fetchContext); 209 + scheduleRetry(match, actionIndex, 0, fetchContext, item); 205 210 } 206 211 207 212 return result;
+106
lib/actions/patch-record.test.ts
··· 1 + import { describe, it, expect, vi, beforeEach, afterEach } from "vitest"; 2 + 3 + vi.mock("@/db/index.js", async () => { 4 + const { createTestDb } = await import("../test/db.js"); 5 + return { db: createTestDb() }; 6 + }); 7 + 8 + vi.mock("@/automations/pds.js", () => ({ 9 + patchArbitraryRecord: vi.fn(), 10 + })); 11 + 12 + vi.mock("@/pds/resolver.js", async (importOriginal) => { 13 + const original = await importOriginal<typeof import("@/pds/resolver.js")>(); 14 + return { 15 + ...original, 16 + fetchRecord: vi.fn(), 17 + }; 18 + }); 19 + 20 + import { executePatchRecord } from "./patch-record.js"; 21 + import { patchArbitraryRecord } from "../automations/pds.js"; 22 + import { fetchRecord } from "../pds/resolver.js"; 23 + import { db } from "../db/index.js"; 24 + import { automations, deliveryLogs } from "../db/schema.js"; 25 + import { makeMatch, makeAutomation, makePatchRecordAction } from "../test/fixtures.js"; 26 + 27 + const mockPatchRecord = vi.mocked(patchArbitraryRecord); 28 + const mockFetchRecord = vi.mocked(fetchRecord); 29 + 30 + describe("executePatchRecord — collection mismatch guard", () => { 31 + beforeEach(async () => { 32 + vi.useFakeTimers(); 33 + vi.setSystemTime(new Date("2024-06-15T12:00:00.000Z")); 34 + mockPatchRecord.mockReset(); 35 + mockFetchRecord.mockReset(); 36 + 37 + await db.delete(deliveryLogs); 38 + await db.delete(automations); 39 + await db.insert(automations).values(makeAutomation()); 40 + }); 41 + 42 + afterEach(() => { 43 + vi.useRealTimers(); 44 + }); 45 + 46 + it("rejects with 400 when the resolved URI's collection differs from targetCollection", async () => { 47 + // The action targets `run.airglow.automation` (its default in the fixture 48 + // is `site.standard.document`, but we override here). The resolved URI 49 + // points at `app.bsky.feed.post`, simulating an attacker-injected item 50 + // whose uri redirects the patch to a different collection. 51 + const action = makePatchRecordAction({ 52 + targetCollection: "run.airglow.automation", 53 + baseRecordUri: "at://{{event.did}}/app.bsky.feed.post/abc123", 54 + }); 55 + const match = makeMatch({ automation: { actions: [action] } }); 56 + const result = await executePatchRecord(match, 0); 57 + 58 + expect(result.statusCode).toBe(400); 59 + expect(result.error).toMatch(/does not match targetCollection/); 60 + // PDS write was never attempted — the attack is fully blocked. 61 + expect(mockPatchRecord).not.toHaveBeenCalled(); 62 + }); 63 + 64 + it("proceeds normally when the URI's collection matches", async () => { 65 + mockFetchRecord.mockResolvedValueOnce({ 66 + found: true, 67 + uri: "at://did:plc:testuser123/site.standard.document/abc", 68 + cid: "cid-base", 69 + did: "did:plc:testuser123", 70 + collection: "site.standard.document", 71 + rkey: "abc", 72 + record: { title: "old", $type: "site.standard.document" }, 73 + }); 74 + mockPatchRecord.mockResolvedValueOnce({ 75 + uri: "at://did:plc:testuser123/site.standard.document/abc", 76 + cid: "cid-new", 77 + }); 78 + 79 + // Default fixture: targetCollection is "site.standard.document" and 80 + // baseRecordUri uses `{{event.commit.collection}}`. We construct a match 81 + // whose event.commit.collection is also "site.standard.document" so the 82 + // resolved URI's collection matches. 83 + const action = makePatchRecordAction({ 84 + recordTemplate: '{"title":"new"}', 85 + }); 86 + const match = makeMatch({ 87 + automation: { actions: [action] }, 88 + event: { 89 + did: "did:plc:testuser123", 90 + time_us: 1700000000000000, 91 + kind: "commit", 92 + commit: { 93 + rev: "r", 94 + operation: "create", 95 + collection: "site.standard.document", 96 + rkey: "abc", 97 + record: { text: "renamed" }, 98 + }, 99 + }, 100 + }); 101 + 102 + const result = await executePatchRecord(match, 0); 103 + expect(result.statusCode).toBe(200); 104 + expect(mockPatchRecord).toHaveBeenCalledOnce(); 105 + }); 106 + });
+29 -6
lib/actions/patch-record.ts
··· 10 10 match: MatchedEvent, 11 11 action: PatchRecordAction, 12 12 fetchContext?: FetchContext, 13 + item?: unknown, 13 14 ): Promise<ActionResult> { 14 15 const { automation, event } = match; 15 16 16 - const resolvedUri = resolveUriTemplate(action.baseRecordUri, event, fetchContext, automation.did); 17 + const resolvedUri = resolveUriTemplate( 18 + action.baseRecordUri, 19 + event, 20 + fetchContext, 21 + automation.did, 22 + item, 23 + ); 17 24 let parsed: { did: string; collection: string; rkey: string }; 18 25 try { 19 26 parsed = parseAtUri(resolvedUri); ··· 21 28 return { statusCode: 0, error: `Invalid base record URI: ${resolvedUri}` }; 22 29 } 23 30 31 + // The action's PDS write always targets `targetCollection` on the owner's 32 + // repo, but the rkey comes from the resolved URI. If the URI's collection 33 + // doesn't match `targetCollection`, the user has wired the action against 34 + // data they didn't intend to patch — and an attacker who can inject AT URIs 35 + // into the iteration source (e.g. crafted facets in a watched post) would 36 + // otherwise be able to choose which of the owner's records get clobbered. 37 + // Reject as a deterministic 400 so retries don't repeat the same mistake. 38 + if (parsed.collection !== action.targetCollection) { 39 + return { 40 + statusCode: 400, 41 + error: `Resolved baseRecordUri collection "${parsed.collection}" does not match targetCollection "${action.targetCollection}"`, 42 + }; 43 + } 44 + 24 45 // Fetch base record and render patch template in parallel (they're independent) 25 46 const [fetchResult, templateResult] = await Promise.allSettled([ 26 47 fetchRecord(resolvedUri), 27 - renderTemplate(action.recordTemplate, event, fetchContext, automation), 48 + renderTemplate(action.recordTemplate, event, fetchContext, automation, item), 28 49 ]); 29 50 30 51 if (fetchResult.status === "rejected") { ··· 70 91 actionIndex: number, 71 92 retryIndex: number, 72 93 fetchContext?: FetchContext, 94 + item?: unknown, 73 95 ) { 74 96 if (retryIndex >= RETRY_DELAYS.length) return; 75 97 76 98 setTimeout(async () => { 77 99 try { 78 100 const action = match.automation.actions[actionIndex] as PatchRecordAction; 79 - const result = await execute(match, action, fetchContext); 101 + const result = await execute(match, action, fetchContext, item); 80 102 const body = JSON.stringify({ 81 103 targetCollection: action.targetCollection, 82 104 baseRecordUri: action.baseRecordUri, ··· 94 116 ); 95 117 96 118 if (!isSuccess(result.statusCode) && isRetryable(result.statusCode)) { 97 - scheduleRetry(match, actionIndex, retryIndex + 1, fetchContext); 119 + scheduleRetry(match, actionIndex, retryIndex + 1, fetchContext, item); 98 120 } 99 121 } catch (err) { 100 122 console.error("Patch-record retry error:", err); ··· 107 129 match: MatchedEvent, 108 130 actionIndex: number, 109 131 fetchContext?: FetchContext, 132 + item?: unknown, 110 133 ): Promise<ActionResult> { 111 134 const action = match.automation.actions[actionIndex] as PatchRecordAction; 112 - const result = await execute(match, action, fetchContext); 135 + const result = await execute(match, action, fetchContext, item); 113 136 114 137 const body = JSON.stringify({ 115 138 targetCollection: action.targetCollection, ··· 128 151 ); 129 152 130 153 if (!isSuccess(result.statusCode) && isRetryable(result.statusCode)) { 131 - scheduleRetry(match, actionIndex, 0, fetchContext); 154 + scheduleRetry(match, actionIndex, 0, fetchContext, item); 132 155 } 133 156 134 157 return result;
+105
lib/actions/template.test.ts
··· 6 6 validateFetchUri, 7 7 renderTemplate, 8 8 renderTextTemplate, 9 + resolvePlaceholder, 9 10 resolveEventPlaceholder, 10 11 } from "./template.js"; 11 12 import { makeEvent } from "../test/fixtures.js"; ··· 504 505 expect(mockResolve).toHaveBeenCalledTimes(1); 505 506 }); 506 507 }); 508 + 509 + describe("item.* placeholder", () => { 510 + const event = makeEvent(); 511 + 512 + describe("resolvePlaceholder", () => { 513 + it("returns the whole item for `item`", () => { 514 + const item = { uri: "https://x", did: "did:plc:bob" }; 515 + expect(resolvePlaceholder("item", event, undefined, undefined, item)).toBe(item); 516 + }); 517 + 518 + it("walks dotted paths into the item", () => { 519 + const item = { index: { byteStart: 14 }, uri: "https://example.com" }; 520 + expect(resolvePlaceholder("item.uri", event, undefined, undefined, item)).toBe( 521 + "https://example.com", 522 + ); 523 + expect(resolvePlaceholder("item.index.byteStart", event, undefined, undefined, item)).toBe( 524 + 14, 525 + ); 526 + }); 527 + 528 + it("returns undefined when the item is missing", () => { 529 + expect( 530 + resolvePlaceholder("item.uri", event, undefined, undefined, undefined), 531 + ).toBeUndefined(); 532 + }); 533 + 534 + it("returns undefined when the field doesn't exist on the item", () => { 535 + expect( 536 + resolvePlaceholder("item.missing", event, undefined, undefined, { uri: "x" }), 537 + ).toBeUndefined(); 538 + }); 539 + 540 + it("refuses to walk into __proto__ / constructor / prototype on the item", () => { 541 + const item = { uri: "https://x" }; 542 + expect( 543 + resolvePlaceholder("item.__proto__", event, undefined, undefined, item), 544 + ).toBeUndefined(); 545 + expect( 546 + resolvePlaceholder("item.constructor.name", event, undefined, undefined, item), 547 + ).toBeUndefined(); 548 + expect( 549 + resolvePlaceholder("item.prototype", event, undefined, undefined, item), 550 + ).toBeUndefined(); 551 + }); 552 + }); 553 + 554 + describe("validateTemplate", () => { 555 + it("rejects {{item.*}} when hasItem is false", () => { 556 + const r = validateTemplate('{"x":"{{item.uri}}"}', [], [], false); 557 + expect(r.valid).toBe(false); 558 + if (!r.valid) expect(r.error).toMatch(/item\.\*/); 559 + }); 560 + 561 + it("accepts {{item.*}} when hasItem is true", () => { 562 + const r = validateTemplate('{"x":"{{item.uri}}"}', [], [], true); 563 + expect(r.valid).toBe(true); 564 + }); 565 + 566 + it("accepts the bare {{item}} placeholder when hasItem is true", () => { 567 + const r = validateTemplate('{"x":"{{item}}"}', [], [], true); 568 + expect(r.valid).toBe(true); 569 + }); 570 + }); 571 + 572 + describe("validateTextTemplate", () => { 573 + it("rejects {{item.*}} without forEach", () => { 574 + const r = validateTextTemplate("hello {{item.uri}}", [], [], false); 575 + expect(r.valid).toBe(false); 576 + }); 577 + 578 + it("accepts {{item.*}} with forEach", () => { 579 + const r = validateTextTemplate("hello {{item.uri}}", [], [], true); 580 + expect(r.valid).toBe(true); 581 + }); 582 + }); 583 + 584 + describe("renderTextTemplate", () => { 585 + it("interpolates item fields", async () => { 586 + const text = await renderTextTemplate("link={{item.uri}}", event, undefined, undefined, { 587 + uri: "https://x", 588 + }); 589 + expect(text).toBe("link=https://x"); 590 + }); 591 + 592 + it("returns empty string when an item field is missing", async () => { 593 + const text = await renderTextTemplate("u={{item.uri}}", event, undefined, undefined, {}); 594 + expect(text).toBe("u="); 595 + }); 596 + }); 597 + 598 + describe("renderTemplate", () => { 599 + it("interpolates item fields into a JSON template", async () => { 600 + const json = await renderTemplate( 601 + '{"link":"{{item.uri}}","kind":"{{item.$type}}"}', 602 + event, 603 + undefined, 604 + undefined, 605 + { uri: "https://x", $type: "app.bsky.richtext.facet#link" }, 606 + ); 607 + expect(json.link).toBe("https://x"); 608 + expect(json.kind).toBe("app.bsky.richtext.facet#link"); 609 + }); 610 + }); 611 + });
+52 -8
lib/actions/template.ts
··· 1 1 import type { JetstreamEvent } from "../jetstream/matcher.js"; 2 + import { isUnsafeKey } from "../jetstream/matcher.js"; 2 3 import { resolveDidToHandle } from "../auth/client.js"; 3 4 import { config } from "../config.js"; 4 5 ··· 47 48 * - "event.commit.operation", "event.commit.collection", "event.commit.rkey", "event.commit.cid" 48 49 * - "event.commit.record.<dotted.path>" → nested record field 49 50 * - "<fetchName>.<dotted.path>" → fetched record data 51 + * - "item" → the current forEach item (whole object) 52 + * - "item.<dotted.path>" → field on the current forEach item 50 53 */ 51 54 export function resolvePlaceholder( 52 55 path: string, 53 56 event: JetstreamEvent, 54 57 fetchContext?: FetchContext, 55 58 automation?: AutomationContext, 59 + item?: unknown, 56 60 ): unknown { 57 61 if (path === "now") return new Date().toISOString(); 58 62 if (path === "self") return automation?.did; 59 63 64 + if (path === "item") return item; 65 + if (path.startsWith("item.")) { 66 + if (item == null || typeof item !== "object") return undefined; 67 + let value: unknown = item; 68 + for (const key of path.slice("item.".length).split(".")) { 69 + if (value == null || typeof value !== "object") return undefined; 70 + if (isUnsafeKey(key)) return undefined; 71 + value = (value as Record<string, unknown>)[key]; 72 + } 73 + return value; 74 + } 75 + 60 76 if (path.startsWith("automation.")) { 61 77 if (!automation) return undefined; 62 78 const field = path.slice("automation.".length); ··· 110 126 event: JetstreamEvent, 111 127 fetchContext?: FetchContext, 112 128 automation?: AutomationContext, 129 + item?: unknown, 113 130 ): Promise<Map<string, string>> { 114 131 const dids = new Set<string>(); 115 132 116 133 for (const [, raw] of template.matchAll(PLACEHOLDER_RE)) { 117 134 const call = parseFunctionCall(raw!.trim()); 118 135 if (call?.fn === "didToHandle") { 119 - const resolved = resolvePlaceholder(call.arg, event, fetchContext, automation); 136 + const resolved = resolvePlaceholder(call.arg, event, fetchContext, automation, item); 120 137 if (typeof resolved === "string" && resolved) { 121 138 dids.add(resolved); 122 139 } ··· 167 184 event: JetstreamEvent, 168 185 fetchContext?: FetchContext, 169 186 ownerDid?: string, 187 + item?: unknown, 170 188 ): string { 171 189 const automation = ownerDid ? { did: ownerDid, rkey: "", name: "" } : undefined; 172 190 return uriTemplate.replace(PLACEHOLDER_RE, (_, path: string) => { 173 - const value = resolvePlaceholder(path.trim(), event, fetchContext, automation); 191 + const value = resolvePlaceholder(path.trim(), event, fetchContext, automation, item); 174 192 if (typeof value === "string") return value; 175 193 return ""; 176 194 }); ··· 181 199 template: string, 182 200 fetchNames?: string[], 183 201 actionNames?: string[], 202 + hasItem?: boolean, 184 203 ): { valid: true; placeholders: string[] } | { valid: false; error: string } { 185 204 // Check that the template is valid JSON (ignoring placeholders). 186 205 // Collect placeholder paths, then replace all {{...}} with a plain string ··· 215 234 const call = parseFunctionCall(p); 216 235 const toValidate = call ? call.arg : p; 217 236 if (toValidate === "now" || toValidate === "self" || toValidate.startsWith("event.")) continue; 237 + if (toValidate === "item" || toValidate.startsWith("item.")) { 238 + if (hasItem) continue; 239 + return { 240 + valid: false, 241 + error: `Invalid placeholder: {{${p}}}. The "item.*" placeholder is only available on actions with a forEach config.`, 242 + }; 243 + } 218 244 if (toValidate.startsWith("automation.")) { 219 245 const field = toValidate.slice("automation.".length); 220 246 if (AUTOMATION_FIELDS.has(field)) continue; ··· 293 319 uri: string, 294 320 fetchNames?: string[], 295 321 actionNames?: string[], 322 + hasItem?: boolean, 296 323 ): { valid: true; placeholders: string[] } | { valid: false; error: string } { 297 324 if (!uri.trim()) { 298 325 return { valid: false, error: "Base record URI is required" }; ··· 318 345 return { valid: false, error: `Function calls not allowed in base record URI: {{${p}}}` }; 319 346 } 320 347 if (p === "now" || p === "self" || p.startsWith("event.")) continue; 348 + if (p === "item" || p.startsWith("item.")) { 349 + if (hasItem) continue; 350 + return { 351 + valid: false, 352 + error: `Invalid placeholder in base record URI: {{${p}}}. The "item.*" placeholder is only available on actions with a forEach config.`, 353 + }; 354 + } 321 355 const root = p.split(".")[0]!; 322 356 if (fetchSet.has(root)) continue; 323 357 if (actionSet.has(root)) continue; ··· 332 366 template: string, 333 367 fetchNames?: string[], 334 368 actionNames?: string[], 369 + hasItem?: boolean, 335 370 ): { valid: true; placeholders: string[] } | { valid: false; error: string } { 336 371 if (!template.trim()) { 337 372 return { valid: false, error: "Text template is required" }; ··· 349 384 const call = parseFunctionCall(p); 350 385 const toValidate = call ? call.arg : p; 351 386 if (toValidate === "now" || toValidate === "self" || toValidate.startsWith("event.")) continue; 387 + if (toValidate === "item" || toValidate.startsWith("item.")) { 388 + if (hasItem) continue; 389 + return { 390 + valid: false, 391 + error: `Invalid placeholder: {{${p}}}. The "item.*" placeholder is only available on actions with a forEach config.`, 392 + }; 393 + } 352 394 if (toValidate.startsWith("automation.")) { 353 395 const field = toValidate.slice("automation.".length); 354 396 if (AUTOMATION_FIELDS.has(field)) continue; ··· 369 411 event: JetstreamEvent, 370 412 fetchContext?: FetchContext, 371 413 automation?: AutomationContext, 414 + item?: unknown, 372 415 ): Promise<string> { 373 416 const [handleMap, resolvedAutomation] = await Promise.all([ 374 - resolveHandlePlaceholders(template, event, fetchContext, automation), 417 + resolveHandlePlaceholders(template, event, fetchContext, automation, item), 375 418 resolveAutomationUrl(template, automation), 376 419 ]); 377 420 ··· 379 422 const path = raw.trim(); 380 423 const call = parseFunctionCall(path); 381 424 if (call?.fn === "didToHandle") { 382 - const resolved = resolvePlaceholder(call.arg, event, fetchContext, resolvedAutomation); 425 + const resolved = resolvePlaceholder(call.arg, event, fetchContext, resolvedAutomation, item); 383 426 if (typeof resolved === "string" && resolved) { 384 427 return `@${handleMap.get(resolved) ?? resolved}`; 385 428 } 386 429 return ""; 387 430 } 388 - const value = resolvePlaceholder(path, event, fetchContext, resolvedAutomation); 431 + const value = resolvePlaceholder(path, event, fetchContext, resolvedAutomation, item); 389 432 if (value === undefined) return ""; 390 433 if (typeof value === "string") return value; 391 434 if (typeof value === "number" || typeof value === "boolean") return String(value); ··· 399 442 event: JetstreamEvent, 400 443 fetchContext?: FetchContext, 401 444 automation?: AutomationContext, 445 + item?: unknown, 402 446 ): Promise<Record<string, unknown>> { 403 447 const [handleMap, resolvedAutomation] = await Promise.all([ 404 - resolveHandlePlaceholders(template, event, fetchContext, automation), 448 + resolveHandlePlaceholders(template, event, fetchContext, automation, item), 405 449 resolveAutomationUrl(template, automation), 406 450 ]); 407 451 ··· 409 453 const path = raw.trim(); 410 454 const call = parseFunctionCall(path); 411 455 if (call?.fn === "didToHandle") { 412 - const resolved = resolvePlaceholder(call.arg, event, fetchContext, resolvedAutomation); 456 + const resolved = resolvePlaceholder(call.arg, event, fetchContext, resolvedAutomation, item); 413 457 if (typeof resolved === "string" && resolved) { 414 458 const handle = handleMap.get(resolved) ?? resolved; 415 459 return JSON.stringify(`@${handle}`).slice(1, -1); ··· 417 461 return ""; 418 462 } 419 463 420 - const value = resolvePlaceholder(path, event, fetchContext, resolvedAutomation); 464 + const value = resolvePlaceholder(path, event, fetchContext, resolvedAutomation, item); 421 465 if (value === undefined) return ""; 422 466 423 467 // If the placeholder is the entire JSON value (between quotes), return raw
+173 -17
lib/actions/validation.ts
··· 3 3 import { nsidRequiresWantedDids } from "../lexicons/match.js"; 4 4 import { isValidNsid } from "../lexicons/resolver.js"; 5 5 import { PLACEHOLDER_RE, validateTextTemplate } from "./template.js"; 6 - import type { Condition, FetchStepSearch } from "../db/schema.js"; 6 + import type { Condition, FetchStepSearch, ForEachConfig } from "../db/schema.js"; 7 7 import { 8 8 FOLLOW_TARGETS, 9 9 VALID_FOLLOW_TARGETS, 10 10 type FollowTarget, 11 11 } from "../automations/follow-targets.js"; 12 12 13 + export type ForEachInput = { 14 + path: string; 15 + conditions?: FetchConditionInput[]; 16 + }; 17 + 18 + type ActionBase = { 19 + forEach?: ForEachInput; 20 + }; 21 + 13 22 export type ActionInput = 14 - | { 23 + | (ActionBase & { 15 24 type: "webhook"; 16 25 callbackUrl: string; 17 26 headers?: Record<string, string>; 18 27 comment?: string; 19 - } 20 - | { type: "record"; targetCollection: string; recordTemplate: string; comment?: string } 21 - | { 28 + }) 29 + | (ActionBase & { 30 + type: "record"; 31 + targetCollection: string; 32 + recordTemplate: string; 33 + comment?: string; 34 + }) 35 + | (ActionBase & { 22 36 type: "bsky-post"; 23 37 textTemplate: string; 24 38 langs?: string[]; 25 39 labels?: string[]; 26 40 comment?: string; 27 - } 28 - | { 41 + }) 42 + | (ActionBase & { 29 43 type: "patch-record"; 30 44 targetCollection: string; 31 45 baseRecordUri: string; 32 46 recordTemplate: string; 33 47 comment?: string; 34 - } 35 - | { 48 + }) 49 + | (ActionBase & { 36 50 type: "bookmark"; 37 51 targetSource: string; 38 52 targetTitle?: string; 39 53 bodyValue?: string; 40 54 tags?: string[]; 41 55 comment?: string; 42 - } 43 - | { 56 + }) 57 + | (ActionBase & { 44 58 type: "follow"; 45 59 target: FollowTarget; 46 60 subject: string; 47 61 comment?: string; 48 - }; 62 + }); 49 63 50 64 export const VALID_OPERATORS = new Set([ 51 65 "eq", ··· 416 430 input: FollowInput, 417 431 fetchNames: string[], 418 432 actionNames: string[], 433 + hasItem?: boolean, 419 434 ): { valid: true } | { valid: false; error: string } { 420 435 if (!input.target || typeof input.target !== "string") { 421 436 return { valid: false, error: "target is required for follow actions" }; ··· 435 450 error: `subject must be ${FOLLOW_SUBJECT_MAX} characters or less`, 436 451 }; 437 452 } 438 - const templateCheck = validateTextTemplate(input.subject, fetchNames, actionNames); 453 + const templateCheck = validateTextTemplate(input.subject, fetchNames, actionNames, hasItem); 439 454 if (!templateCheck.valid) { 440 455 return { valid: false, error: `subject: ${templateCheck.error}` }; 441 456 } ··· 454 469 input: BookmarkInput, 455 470 fetchNames: string[], 456 471 actionNames: string[], 472 + hasItem?: boolean, 457 473 ): { valid: true; tags: string[] } | { valid: false; error: string } { 458 474 if (!input.targetSource || typeof input.targetSource !== "string" || !input.targetSource.trim()) { 459 475 return { valid: false, error: "targetSource is required for bookmark actions" }; ··· 464 480 error: `targetSource must be ${BOOKMARK_LIMITS.targetSource} characters or less`, 465 481 }; 466 482 } 467 - const sourceValidation = validateTextTemplate(input.targetSource, fetchNames, actionNames); 483 + const sourceValidation = validateTextTemplate( 484 + input.targetSource, 485 + fetchNames, 486 + actionNames, 487 + hasItem, 488 + ); 468 489 if (!sourceValidation.valid) { 469 490 return { valid: false, error: `targetSource: ${sourceValidation.error}` }; 470 491 } ··· 480 501 }; 481 502 } 482 503 if (input.targetTitle.trim()) { 483 - const titleValidation = validateTextTemplate(input.targetTitle, fetchNames, actionNames); 504 + const titleValidation = validateTextTemplate( 505 + input.targetTitle, 506 + fetchNames, 507 + actionNames, 508 + hasItem, 509 + ); 484 510 if (!titleValidation.valid) { 485 511 return { valid: false, error: `targetTitle: ${titleValidation.error}` }; 486 512 } ··· 498 524 }; 499 525 } 500 526 if (input.bodyValue.trim()) { 501 - const bodyValidation = validateTextTemplate(input.bodyValue, fetchNames, actionNames); 527 + const bodyValidation = validateTextTemplate( 528 + input.bodyValue, 529 + fetchNames, 530 + actionNames, 531 + hasItem, 532 + ); 502 533 if (!bodyValidation.valid) { 503 534 return { valid: false, error: `bodyValue: ${bodyValidation.error}` }; 504 535 } ··· 525 556 error: `Tag "${trimmed.slice(0, 32)}..." exceeds ${BOOKMARK_LIMITS.tag} characters`, 526 557 }; 527 558 } 528 - const tagValidation = validateTextTemplate(trimmed, fetchNames, actionNames); 559 + const tagValidation = validateTextTemplate(trimmed, fetchNames, actionNames, hasItem); 529 560 if (!tagValidation.valid) { 530 561 return { valid: false, error: `tag: ${tagValidation.error}` }; 531 562 } ··· 535 566 536 567 return { valid: true, tags }; 537 568 } 569 + 570 + const FOR_EACH_PATH_RE = /^[a-zA-Z_][a-zA-Z0-9_]*(\.[a-zA-Z_][a-zA-Z0-9_-]*|\[\])+$/; 571 + const MAX_FOR_EACH_PATH = 512; 572 + const MAX_FOR_EACH_CONDITIONS = 10; 573 + const FOR_EACH_FIELD_RE = /^(item\.)?[a-zA-Z_$][a-zA-Z0-9_$-]*(\.[a-zA-Z0-9_$][a-zA-Z0-9_$-]*)*$/; 574 + /** Path segments that walk into JS engine internals and must never appear in 575 + * user-supplied paths. Mirrors the runtime guard in matcher.ts. */ 576 + const FOR_EACH_UNSAFE_SEGMENTS = new Set(["__proto__", "constructor", "prototype"]); 577 + 578 + /** Validate a `forEach` input from a single action. The path must be rooted at 579 + * `event.*`, a known fetch name, or an upstream action result, must use `[]` 580 + * segments to mark array levels, and must end in `[]`. Item conditions are 581 + * validated as field paths rooted at the item (with optional `item.` prefix). */ 582 + export function validateForEachInput( 583 + input: ForEachInput | undefined, 584 + fetchNames: string[], 585 + actionResultNames: string[], 586 + label: string, 587 + ): { valid: true; value: ForEachConfig | undefined } | { valid: false; error: string } { 588 + if (input === undefined || input === null) return { valid: true, value: undefined }; 589 + 590 + if (typeof input !== "object" || Array.isArray(input)) { 591 + return { valid: false, error: `${label}: forEach must be an object` }; 592 + } 593 + const path = typeof input.path === "string" ? input.path.trim() : ""; 594 + if (!path) return { valid: false, error: `${label}: forEach.path is required` }; 595 + if (path.length > MAX_FOR_EACH_PATH) { 596 + return { 597 + valid: false, 598 + error: `${label}: forEach.path must be ${MAX_FOR_EACH_PATH} characters or less`, 599 + }; 600 + } 601 + if (!FOR_EACH_PATH_RE.test(path)) { 602 + return { 603 + valid: false, 604 + error: `${label}: forEach.path syntax is invalid. Use dotted segments and "[]" to mark array levels.`, 605 + }; 606 + } 607 + if (!path.endsWith("[]")) { 608 + return { 609 + valid: false, 610 + error: `${label}: forEach.path must end with "[]" (the array level being iterated)`, 611 + }; 612 + } 613 + 614 + const segments = path.split(".").map((s) => s.replace(/\[\]$/, "")); 615 + for (const seg of segments) { 616 + if (FOR_EACH_UNSAFE_SEGMENTS.has(seg)) { 617 + return { 618 + valid: false, 619 + error: `${label}: forEach.path may not include the reserved segment "${seg}"`, 620 + }; 621 + } 622 + } 623 + 624 + const root = segments[0]!; 625 + const knownRoots = new Set<string>(["event", ...fetchNames, ...actionResultNames]); 626 + if (!knownRoots.has(root)) { 627 + return { 628 + valid: false, 629 + error: `${label}: forEach.path root "${root}" must be "event", a declared fetch name, or an upstream action (action1, action2, …).`, 630 + }; 631 + } 632 + 633 + const conditions: Condition[] = []; 634 + if (input.conditions !== undefined) { 635 + if (!Array.isArray(input.conditions)) { 636 + return { valid: false, error: `${label}: forEach.conditions must be an array` }; 637 + } 638 + if (input.conditions.length > MAX_FOR_EACH_CONDITIONS) { 639 + return { 640 + valid: false, 641 + error: `${label}: maximum ${MAX_FOR_EACH_CONDITIONS} forEach conditions allowed`, 642 + }; 643 + } 644 + for (const cond of input.conditions) { 645 + if (!cond.field || typeof cond.field !== "string") { 646 + return { valid: false, error: `${label}: forEach condition field is required` }; 647 + } 648 + if (!FOR_EACH_FIELD_RE.test(cond.field)) { 649 + return { 650 + valid: false, 651 + error: `${label}: forEach condition field "${cond.field}" is invalid`, 652 + }; 653 + } 654 + const fieldBody = cond.field.startsWith("item.") ? cond.field.slice(5) : cond.field; 655 + for (const seg of fieldBody.split(".")) { 656 + if (FOR_EACH_UNSAFE_SEGMENTS.has(seg)) { 657 + return { 658 + valid: false, 659 + error: `${label}: forEach condition field may not include the reserved segment "${seg}"`, 660 + }; 661 + } 662 + } 663 + const operator = cond.operator ?? "eq"; 664 + if (!VALID_OPERATORS.has(operator)) { 665 + return { 666 + valid: false, 667 + error: `${label}: invalid forEach condition operator "${operator}"`, 668 + }; 669 + } 670 + const valueLess = VALUE_LESS_OPERATORS.has(operator); 671 + if (!valueLess && (cond.value === undefined || cond.value === null)) { 672 + return { 673 + valid: false, 674 + error: `${label}: forEach condition value is required for operator "${operator}"`, 675 + }; 676 + } 677 + conditions.push({ 678 + field: cond.field, 679 + operator, 680 + value: valueLess ? "" : String(cond.value), 681 + ...(cond.comment ? { comment: cond.comment } : {}), 682 + }); 683 + } 684 + } 685 + 686 + return { 687 + valid: true, 688 + value: { 689 + path, 690 + ...(conditions.length > 0 ? { conditions } : {}), 691 + }, 692 + }; 693 + }
+19 -2
lib/automations/pds-serialize.ts
··· 1 - import type { Action } from "../db/schema.js"; 2 - import type { PdsAction } from "./pds.js"; 1 + import type { Action, ForEachConfig } from "../db/schema.js"; 2 + import type { PdsAction, PdsForEachConfig } from "./pds.js"; 3 + 4 + function toPdsForEach(fe: ForEachConfig | undefined): PdsForEachConfig | undefined { 5 + if (!fe) return undefined; 6 + return { 7 + path: fe.path, 8 + ...(fe.conditions && fe.conditions.length > 0 ? { conditions: fe.conditions } : {}), 9 + }; 10 + } 3 11 4 12 /** Serialize a stored Action into its PDS-record shape. Split from pds.ts so 5 13 * tests that mock the OAuth-backed PDS client don't need to re-stub this. */ 6 14 export function toPdsAction(a: Action): PdsAction { 15 + const forEach = toPdsForEach(a.forEach); 16 + const forEachField = forEach ? { forEach } : {}; 17 + 7 18 if (a.$type === "webhook") { 8 19 return { 9 20 $type: "run.airglow.automation#webhookAction", 10 21 callbackUrl: a.callbackUrl, 22 + ...forEachField, 11 23 ...(a.comment ? { comment: a.comment } : {}), 12 24 }; 13 25 } ··· 17 29 textTemplate: a.textTemplate, 18 30 ...(a.langs && a.langs.length > 0 ? { langs: a.langs } : {}), 19 31 ...(a.labels && a.labels.length > 0 ? { labels: a.labels } : {}), 32 + ...forEachField, 20 33 ...(a.comment ? { comment: a.comment } : {}), 21 34 }; 22 35 } ··· 26 39 targetCollection: a.targetCollection, 27 40 baseRecordUri: a.baseRecordUri, 28 41 recordTemplate: a.recordTemplate, 42 + ...forEachField, 29 43 ...(a.comment ? { comment: a.comment } : {}), 30 44 }; 31 45 } ··· 36 50 ...(a.targetTitle ? { targetTitle: a.targetTitle } : {}), 37 51 ...(a.bodyValue ? { bodyValue: a.bodyValue } : {}), 38 52 ...(a.tags && a.tags.length > 0 ? { tags: a.tags } : {}), 53 + ...forEachField, 39 54 ...(a.comment ? { comment: a.comment } : {}), 40 55 }; 41 56 } ··· 44 59 $type: "run.airglow.automation#followAction", 45 60 target: a.target, 46 61 subject: a.subject, 62 + ...forEachField, 47 63 ...(a.comment ? { comment: a.comment } : {}), 48 64 }; 49 65 } ··· 51 67 $type: "run.airglow.automation#recordAction", 52 68 targetCollection: a.targetCollection, 53 69 recordTemplate: a.recordTemplate, 70 + ...forEachField, 54 71 ...(a.comment ? { comment: a.comment } : {}), 55 72 }; 56 73 }
+18 -7
lib/automations/pds.ts
··· 32 32 return s.padStart(13, "2"); 33 33 } 34 34 35 + type PdsCondition = { 36 + field: string; 37 + operator: string; 38 + value: string; 39 + comment?: string; 40 + }; 41 + 42 + export type PdsForEachConfig = { 43 + path: string; 44 + conditions?: PdsCondition[]; 45 + }; 46 + 35 47 type PdsWebhookAction = { 36 48 $type: "run.airglow.automation#webhookAction"; 37 49 callbackUrl: string; 50 + forEach?: PdsForEachConfig; 38 51 comment?: string; 39 52 }; 40 53 ··· 42 55 $type: "run.airglow.automation#recordAction"; 43 56 targetCollection: string; 44 57 recordTemplate: string; 58 + forEach?: PdsForEachConfig; 45 59 comment?: string; 46 60 }; 47 61 ··· 50 64 textTemplate: string; 51 65 langs?: string[]; 52 66 labels?: string[]; 67 + forEach?: PdsForEachConfig; 53 68 comment?: string; 54 69 }; 55 70 ··· 58 73 targetCollection: string; 59 74 baseRecordUri: string; 60 75 recordTemplate: string; 76 + forEach?: PdsForEachConfig; 61 77 comment?: string; 62 78 }; 63 79 ··· 67 83 targetTitle?: string; 68 84 bodyValue?: string; 69 85 tags?: string[]; 86 + forEach?: PdsForEachConfig; 70 87 comment?: string; 71 88 }; 72 89 ··· 74 91 $type: "run.airglow.automation#followAction"; 75 92 target: FollowTarget; 76 93 subject: string; 94 + forEach?: PdsForEachConfig; 77 95 comment?: string; 78 96 }; 79 97 ··· 84 102 | PdsPatchRecordAction 85 103 | PdsBookmarkAction 86 104 | PdsFollowAction; 87 - 88 - type PdsCondition = { 89 - field: string; 90 - operator: string; 91 - value: string; 92 - comment?: string; 93 - }; 94 105 95 106 export type PdsFetchStepRecord = { 96 107 $type: "run.airglow.automation#fetchStep";
+20 -12
lib/automations/sanitize.ts
··· 1 - import type { Action } from "../db/schema.ts"; 1 + import type { Action, ForEachConfig } from "../db/schema.ts"; 2 2 import type { FollowTarget } from "./follow-targets.js"; 3 + 4 + type WithForEach = { forEach?: ForEachConfig }; 3 5 4 6 export type PublicAction = 5 - | { 7 + | (WithForEach & { 6 8 $type: "webhook"; 7 9 callbackDomain: string; 8 10 headerNames?: string[]; 9 11 verified?: boolean; 10 12 comment?: string; 11 - } 12 - | { $type: "record"; targetCollection: string; recordTemplate: string; comment?: string } 13 - | { 13 + }) 14 + | (WithForEach & { 15 + $type: "record"; 16 + targetCollection: string; 17 + recordTemplate: string; 18 + comment?: string; 19 + }) 20 + | (WithForEach & { 14 21 $type: "bsky-post"; 15 22 textTemplate: string; 16 23 langs?: string[]; 17 24 labels?: string[]; 18 25 comment?: string; 19 - } 20 - | { 26 + }) 27 + | (WithForEach & { 21 28 $type: "patch-record"; 22 29 targetCollection: string; 23 30 baseRecordUri: string; 24 31 recordTemplate: string; 25 32 comment?: string; 26 - } 27 - | { 33 + }) 34 + | (WithForEach & { 28 35 $type: "bookmark"; 29 36 targetSource: string; 30 37 targetTitle?: string; 31 38 bodyValue?: string; 32 39 tags?: string[]; 33 40 comment?: string; 34 - } 35 - | { 41 + }) 42 + | (WithForEach & { 36 43 $type: "follow"; 37 44 target: FollowTarget; 38 45 subject: string; 39 46 comment?: string; 40 - }; 47 + }); 41 48 42 49 /** Strip instance-local secrets and truncate webhook URLs to domain-only. */ 43 50 export function sanitizeActions(actions: Action[]): PublicAction[] { ··· 56 63 ...(headerNames && headerNames.length > 0 ? { headerNames } : {}), 57 64 verified: a.verified, 58 65 comment: a.comment, 66 + ...(a.forEach ? { forEach: a.forEach } : {}), 59 67 }; 60 68 } 61 69 return a;
+14
lib/db/schema.ts
··· 12 12 }); 13 13 14 14 // Action types for automation actions (stored as JSON) 15 + /** Per-action fan-out: when set, the action runs once per matching item 16 + * resolved from `path` (a dotted path with `[]` segments meaning "flat-map this 17 + * array level"). Conditions are evaluated against each item, rooted at `item.*`. */ 18 + export type ForEachConfig = { 19 + path: string; 20 + conditions?: Condition[]; 21 + }; 22 + 15 23 export type WebhookAction = { 16 24 $type: "webhook"; 17 25 callbackUrl: string; ··· 19 27 headers?: Record<string, string>; // custom HTTP headers, values may reference {{secret:name}} 20 28 verified?: boolean; // true if /.well-known/airglow manifest matched 21 29 comment?: string; 30 + forEach?: ForEachConfig; 22 31 }; 23 32 24 33 export type RecordAction = { ··· 26 35 targetCollection: string; 27 36 recordTemplate: string; 28 37 comment?: string; 38 + forEach?: ForEachConfig; 29 39 }; 30 40 31 41 export type BskyPostAction = { ··· 34 44 langs?: string[]; 35 45 labels?: string[]; 36 46 comment?: string; 47 + forEach?: ForEachConfig; 37 48 }; 38 49 39 50 export type PatchRecordAction = { ··· 42 53 baseRecordUri: string; // AT URI template for the record to update 43 54 recordTemplate: string; // JSON template — fields merged on top of base 44 55 comment?: string; 56 + forEach?: ForEachConfig; 45 57 }; 46 58 47 59 export type BookmarkAction = { ··· 51 63 bodyValue?: string; 52 64 tags?: string[]; 53 65 comment?: string; 66 + forEach?: ForEachConfig; 54 67 }; 55 68 56 69 export type FollowAction = { ··· 58 71 target: FollowTarget; 59 72 subject: string; 60 73 comment?: string; 74 + forEach?: ForEachConfig; 61 75 }; 62 76 63 77 export type Action =
+260
lib/jetstream/handler.test.ts
··· 483 483 expect(mockDispatch).toHaveBeenCalledOnce(); 484 484 }); 485 485 }); 486 + 487 + describe("forEach", () => { 488 + // A bsky-post event with two link facets and one mention facet — the 489 + // canonical "iterate over facets" use case that drove this feature. 490 + function postWithFacets() { 491 + return makeMatch({ 492 + event: { 493 + did: "did:plc:author", 494 + time_us: 1700000000000000, 495 + kind: "commit", 496 + commit: { 497 + rev: "r", 498 + operation: "create", 499 + collection: "app.bsky.feed.post", 500 + rkey: "rk1", 501 + record: { 502 + text: "x", 503 + facets: [ 504 + { 505 + index: { byteStart: 0, byteEnd: 5 }, 506 + features: [{ $type: "app.bsky.richtext.facet#link", uri: "https://a.test" }], 507 + }, 508 + { 509 + index: { byteStart: 6, byteEnd: 12 }, 510 + features: [{ $type: "app.bsky.richtext.facet#mention", did: "did:plc:bob" }], 511 + }, 512 + { 513 + index: { byteStart: 14, byteEnd: 20 }, 514 + features: [{ $type: "app.bsky.richtext.facet#link", uri: "https://b.test" }], 515 + }, 516 + ], 517 + }, 518 + }, 519 + }, 520 + }); 521 + } 522 + 523 + it("invokes the action once per matching item", async () => { 524 + const match = postWithFacets(); 525 + match.automation.actions = [ 526 + makeWebhookAction({ 527 + forEach: { 528 + path: "event.commit.record.facets[].features[]", 529 + conditions: [{ field: "$type", operator: "eq", value: "app.bsky.richtext.facet#link" }], 530 + }, 531 + }), 532 + ]; 533 + 534 + await handleMatchedEvent(match); 535 + 536 + // Two link facets → two dispatch calls. Mention facet filtered out. 537 + expect(mockDispatch).toHaveBeenCalledTimes(2); 538 + const itemArgs = mockDispatch.mock.calls.map((c) => c[3]); 539 + expect((itemArgs[0] as { uri: string }).uri).toBe("https://a.test"); 540 + expect((itemArgs[1] as { uri: string }).uri).toBe("https://b.test"); 541 + }); 542 + 543 + it("skips the action entirely when no item matches", async () => { 544 + const match = postWithFacets(); 545 + match.automation.actions = [ 546 + makeWebhookAction({ 547 + forEach: { 548 + path: "event.commit.record.facets[].features[]", 549 + conditions: [{ field: "$type", operator: "eq", value: "nonexistent" }], 550 + }, 551 + }), 552 + ]; 553 + 554 + await handleMatchedEvent(match); 555 + expect(mockDispatch).not.toHaveBeenCalled(); 556 + }); 557 + 558 + it("returns an empty list (no calls) when the path resolves to nothing", async () => { 559 + const match = postWithFacets(); 560 + match.automation.actions = [ 561 + makeWebhookAction({ 562 + forEach: { path: "event.commit.record.tags[]" }, 563 + }), 564 + ]; 565 + 566 + await handleMatchedEvent(match); 567 + expect(mockDispatch).not.toHaveBeenCalled(); 568 + }); 569 + 570 + it("chains the last successful iteration's result into actionN", async () => { 571 + mockExecuteAction 572 + .mockReset() 573 + .mockResolvedValueOnce({ 574 + statusCode: 200, 575 + uri: "at://did:plc:test/col/first", 576 + cid: "cidA", 577 + }) 578 + .mockResolvedValueOnce({ 579 + statusCode: 200, 580 + uri: "at://did:plc:test/col/second", 581 + cid: "cidB", 582 + }) 583 + .mockResolvedValue(okWithUri); 584 + 585 + const match = postWithFacets(); 586 + match.automation.actions = [ 587 + makeRecordAction({ 588 + forEach: { path: "event.commit.record.facets[].features[]" }, 589 + }), 590 + makeRecordAction({ recordTemplate: '{"x":"{{action1.uri}}"}' }), 591 + ]; 592 + 593 + await handleMatchedEvent(match); 594 + 595 + // 3 forEach iterations + 1 trailing action = 4 calls (only 2 mock results 596 + // were queued for the iterations; the rest fall through to the default 597 + // `okWithUri`). The 4th call is the trailing action — its fetchContext 598 + // should expose actionN populated from the *last successful* iteration. 599 + expect(mockExecuteAction).toHaveBeenCalledTimes(4); 600 + const trailingCall = mockExecuteAction.mock.calls[3]!; 601 + expect(trailingCall[1]).toBe(1); // action index 602 + const trailingFetchContext = trailingCall[2] as Record<string, { uri: string }>; 603 + expect(trailingFetchContext.action1!.uri).toBe("at://did:plc:test/app.bsky.feed.post/abc123"); 604 + }); 605 + 606 + it("dry-run logs one row per matching item, plus actionN is synthesized", async () => { 607 + const match = postWithFacets(); 608 + match.automation.dryRun = true; 609 + match.automation.actions = [ 610 + makeWebhookAction({ 611 + forEach: { 612 + path: "event.commit.record.facets[].features[]", 613 + conditions: [{ field: "$type", operator: "eq", value: "app.bsky.richtext.facet#link" }], 614 + }, 615 + }), 616 + ]; 617 + 618 + await handleMatchedEvent(match); 619 + expect(mockDispatch).not.toHaveBeenCalled(); 620 + // 2 dry-run rows (one per matching link) 621 + expect(mockInsertValues).toHaveBeenCalledTimes(2); 622 + }); 623 + 624 + it("dry-run with empty forEach result writes one explanatory row", async () => { 625 + const match = postWithFacets(); 626 + match.automation.dryRun = true; 627 + match.automation.actions = [ 628 + makeWebhookAction({ 629 + forEach: { 630 + path: "event.commit.record.facets[].features[]", 631 + conditions: [{ field: "$type", operator: "eq", value: "nonexistent" }], 632 + }, 633 + }), 634 + ]; 635 + 636 + await handleMatchedEvent(match); 637 + expect(mockInsertValues).toHaveBeenCalledTimes(1); 638 + const row = mockInsertValues.mock.calls[0]![0] as { message: string | null }; 639 + expect(row.message).toMatch(/none matched the per-item conditions/); 640 + }); 641 + 642 + it("caps iterations at 64 per trigger and writes a truncation log row", async () => { 643 + // 80-element array — past the cap of 64. Each item is a link facet that 644 + // matches the condition. 645 + const features = Array.from({ length: 80 }, (_, i) => ({ 646 + $type: "app.bsky.richtext.facet#link", 647 + uri: `https://link${i}.test`, 648 + })); 649 + const match = makeMatch({ 650 + event: { 651 + did: "did:plc:author", 652 + time_us: 1700000000000000, 653 + kind: "commit", 654 + commit: { 655 + rev: "r", 656 + operation: "create", 657 + collection: "app.bsky.feed.post", 658 + rkey: "rk", 659 + record: { 660 + text: "x", 661 + facets: features.map((f) => ({ 662 + index: { byteStart: 0, byteEnd: 1 }, 663 + features: [f], 664 + })), 665 + }, 666 + }, 667 + }, 668 + }); 669 + match.automation.actions = [ 670 + makeWebhookAction({ 671 + forEach: { path: "event.commit.record.facets[].features[]" }, 672 + }), 673 + ]; 674 + 675 + mockInsertValues.mockClear(); 676 + await handleMatchedEvent(match); 677 + // Action ran exactly 64 times, not 80. 678 + expect(mockDispatch).toHaveBeenCalledTimes(64); 679 + // Truncation row was written. The handler doesn't write per-call 680 + // delivery logs (executors do that themselves and they're mocked here), 681 + // so this is the only insert we expect. 682 + expect(mockInsertValues).toHaveBeenCalledTimes(1); 683 + const row = mockInsertValues.mock.calls[0]![0] as { error: string | null }; 684 + expect(row.error).toMatch(/forEach matched 80 items, capped at 64/); 685 + }); 686 + 687 + it("dry-run also caps at 64 and emits one truncation row alongside per-item rows", async () => { 688 + const features = Array.from({ length: 100 }, (_, i) => ({ 689 + $type: "app.bsky.richtext.facet#link", 690 + uri: `https://l${i}.test`, 691 + })); 692 + const match = makeMatch({ 693 + event: { 694 + did: "did:plc:author", 695 + time_us: 1700000000000000, 696 + kind: "commit", 697 + commit: { 698 + rev: "r", 699 + operation: "create", 700 + collection: "app.bsky.feed.post", 701 + rkey: "rk", 702 + record: { 703 + text: "x", 704 + facets: features.map((f) => ({ 705 + index: { byteStart: 0, byteEnd: 1 }, 706 + features: [f], 707 + })), 708 + }, 709 + }, 710 + }, 711 + }); 712 + match.automation.dryRun = true; 713 + match.automation.actions = [ 714 + makeWebhookAction({ 715 + forEach: { path: "event.commit.record.facets[].features[]" }, 716 + }), 717 + ]; 718 + 719 + mockInsertValues.mockClear(); 720 + await handleMatchedEvent(match); 721 + // 64 per-item dry-run rows + 1 truncation row. 722 + expect(mockInsertValues).toHaveBeenCalledTimes(65); 723 + }); 724 + 725 + it("stops the chain (fail-fast) on the first failed iteration", async () => { 726 + mockDispatch 727 + .mockReset() 728 + .mockResolvedValueOnce({ statusCode: 200 }) 729 + .mockResolvedValueOnce({ statusCode: 500, error: "boom" }); 730 + 731 + const match = postWithFacets(); 732 + match.automation.actions = [ 733 + makeWebhookAction({ 734 + forEach: { path: "event.commit.record.facets[].features[]" }, 735 + }), 736 + makeWebhookAction({ callbackUrl: "https://other.test/hook" }), 737 + ]; 738 + 739 + await handleMatchedEvent(match); 740 + 741 + // 2 iterations into action 0 (first ok, second 500), then action 1 must 742 + // not run because the chain broke. 743 + expect(mockDispatch).toHaveBeenCalledTimes(2); 744 + }); 745 + }); 486 746 });
+230 -41
lib/jetstream/handler.ts
··· 10 10 import { resolveFetches } from "../actions/fetcher.js"; 11 11 import { renderTemplate, renderTextTemplate, type FetchContext } from "../actions/template.js"; 12 12 import { parseAtUri } from "../pds/resolver.js"; 13 + import { collectItems, matchItemConditions } from "./matcher.js"; 13 14 import { notifyAutomationChange, type MatchedEvent } from "./consumer.js"; 14 15 import { checkRateLimit, disableForRateLimit, type RateLimitBreach } from "./rate-limit.js"; 15 16 17 + type ActionHandler = ( 18 + match: MatchedEvent, 19 + actionIndex: number, 20 + fetchContext?: FetchContext, 21 + item?: unknown, 22 + ) => Promise<ActionResult>; 23 + 24 + function handlerFor(action: Action): ActionHandler { 25 + switch (action.$type) { 26 + case "bsky-post": 27 + return executeBskyPost; 28 + case "record": 29 + return executeAction; 30 + case "patch-record": 31 + return executePatchRecord; 32 + case "bookmark": 33 + return executeBookmark; 34 + case "follow": 35 + return executeFollow; 36 + default: 37 + return dispatch; 38 + } 39 + } 40 + 41 + /** 42 + * Hard cap on items processed per forEach action. Bounds blast radius so a 43 + * single adversarial event (e.g. a Bluesky post crafted with thousands of 44 + * facets) can't cause a runaway fan-out of webhook deliveries / PDS writes / 45 + * delivery_log rows for one action. Items beyond the cap are silently dropped 46 + * and a single delivery_log entry records the truncation. 47 + * 48 + * Note: this cap is per-action, not per-trigger. With the per-automation 49 + * action limit (`AUTOMATION_LIMITS.actions = 10`), the absolute upper bound 50 + * on deliveries from one trigger is 10 × cap. The work cap inside 51 + * `collectItems` (matcher.ts) bounds the *processing* fan-out independently. 52 + */ 53 + const MAX_FOR_EACH_ITEMS_PER_ACTION = 64; 54 + 55 + async function logForEachTruncation( 56 + automationUri: string, 57 + actionIndex: number, 58 + eventTimeUs: number, 59 + matchedCount: number, 60 + dryRun: boolean, 61 + ) { 62 + await db.insert(deliveryLogs).values({ 63 + automationUri, 64 + actionIndex, 65 + eventTimeUs, 66 + payload: null, 67 + statusCode: null, 68 + message: null, 69 + error: `forEach matched ${matchedCount} items, capped at ${MAX_FOR_EACH_ITEMS_PER_ACTION}; remaining items skipped for this trigger.`, 70 + dryRun, 71 + attempt: 1, 72 + createdAt: new Date(), 73 + }); 74 + } 75 + 16 76 /** Handle a matched Jetstream event: resolve fetches, then dispatch all actions. */ 17 77 export async function handleMatchedEvent(match: MatchedEvent) { 18 78 // Rate-limit gate. Runs before fetches so a breached automation stops ··· 58 118 : []; 59 119 for (let i = 0; i < match.automation.actions.length; i++) { 60 120 const action = match.automation.actions[i]!; 61 - await logDryRun(match, i, action, fetchContext, fetchErrors); 121 + 122 + if (action.forEach) { 123 + const items = collectItems( 124 + action.forEach.path, 125 + match.event, 126 + fetchContext, 127 + match.automation.uri, 128 + ); 129 + const matched = items.filter((item) => 130 + matchItemConditions(item, action.forEach!.conditions, match.automation.did), 131 + ); 132 + if (matched.length === 0) { 133 + await logDryRun(match, i, action, fetchContext, fetchErrors, { 134 + forEachEmpty: true, 135 + totalItems: items.length, 136 + }); 137 + } else { 138 + const truncated = matched.length > MAX_FOR_EACH_ITEMS_PER_ACTION; 139 + const toRun = truncated ? matched.slice(0, MAX_FOR_EACH_ITEMS_PER_ACTION) : matched; 140 + for (const item of toRun) { 141 + await logDryRun(match, i, action, fetchContext, fetchErrors, { item }); 142 + } 143 + if (truncated) { 144 + await logForEachTruncation( 145 + match.automation.uri, 146 + i, 147 + match.event.time_us, 148 + matched.length, 149 + true, 150 + ); 151 + } 152 + } 153 + } else { 154 + await logDryRun(match, i, action, fetchContext, fetchErrors); 155 + } 156 + 62 157 // Inject synthetic action result so subsequent dry-run actions can reference {{actionN.*}} 63 158 if (isRecordProducingAction(action.$type)) { 64 159 fetchContext[`action${i + 1}`] = { ··· 77 172 78 173 for (let i = 0; i < match.automation.actions.length; i++) { 79 174 const action = match.automation.actions[i]!; 80 - const handler = 81 - action.$type === "bsky-post" 82 - ? executeBskyPost 83 - : action.$type === "record" 84 - ? executeAction 85 - : action.$type === "patch-record" 86 - ? executePatchRecord 87 - : action.$type === "bookmark" 88 - ? executeBookmark 89 - : action.$type === "follow" 90 - ? executeFollow 91 - : dispatch; 175 + const handler = handlerFor(action); 92 176 93 177 try { 94 - const result: ActionResult = await handler(match, i, fetchContext); 178 + let lastSuccess: ActionResult | null = null; 179 + let chainBreak = false; 180 + 181 + if (action.forEach) { 182 + const items = collectItems( 183 + action.forEach.path, 184 + match.event, 185 + fetchContext, 186 + match.automation.uri, 187 + ); 188 + const matched = items.filter((item) => 189 + matchItemConditions(item, action.forEach!.conditions, match.automation.did), 190 + ); 191 + const truncated = matched.length > MAX_FOR_EACH_ITEMS_PER_ACTION; 192 + const toRun = truncated ? matched.slice(0, MAX_FOR_EACH_ITEMS_PER_ACTION) : matched; 193 + for (const item of toRun) { 194 + const result: ActionResult = await handler(match, i, fetchContext, item); 195 + if (isActionSuccess(result.statusCode)) { 196 + lastSuccess = result; 197 + } else { 198 + console.error( 199 + `Action ${i} (${action.$type}) failed (${result.statusCode}) on forEach item, stopping chain: ${result.error ?? ""}`, 200 + ); 201 + chainBreak = true; 202 + break; 203 + } 204 + } 205 + if (truncated && !chainBreak) { 206 + await logForEachTruncation( 207 + match.automation.uri, 208 + i, 209 + match.event.time_us, 210 + matched.length, 211 + false, 212 + ); 213 + } 214 + } else { 215 + const result: ActionResult = await handler(match, i, fetchContext); 216 + if (isActionSuccess(result.statusCode)) { 217 + lastSuccess = result; 218 + } else { 219 + console.error( 220 + `Action ${i} (${action.$type}) failed (${result.statusCode}), stopping chain: ${result.error ?? ""}`, 221 + ); 222 + chainBreak = true; 223 + } 224 + } 95 225 96 - // Accumulate result into fetchContext for downstream actions 97 - if (result.uri && result.cid) { 98 - const { did, collection, rkey } = parseAtUri(result.uri); 226 + // Accumulate the most recent successful result into fetchContext for 227 + // downstream actions. For forEach, this is the last successful iteration. 228 + if (lastSuccess?.uri && lastSuccess.cid) { 229 + const { did, collection, rkey } = parseAtUri(lastSuccess.uri); 99 230 fetchContext[`action${i + 1}`] = { 100 231 found: true, 101 - uri: result.uri, 102 - cid: result.cid, 232 + uri: lastSuccess.uri, 233 + cid: lastSuccess.cid, 103 234 did, 104 235 collection, 105 236 rkey, ··· 107 238 }; 108 239 } 109 240 110 - // Fail-fast: stop chain on error 111 - if (!isActionSuccess(result.statusCode)) { 112 - console.error( 113 - `Action ${i} (${action.$type}) failed (${result.statusCode}), stopping chain: ${result.error ?? ""}`, 114 - ); 115 - break; 116 - } 241 + if (chainBreak) break; 117 242 } catch (err) { 118 243 console.error(`Action ${i} (${action.$type}) threw, stopping chain:`, err); 119 244 break; ··· 131 256 action: Action, 132 257 fetchContext: FetchContext, 133 258 failedFetches: string[], 259 + options?: { item?: unknown; forEachEmpty?: boolean; totalItems?: number }, 134 260 ) { 135 261 let message: string | null = null; 136 262 let error: string | null = null; 137 263 let payload: string | null = null; 138 264 265 + // forEach with no matching item: emit a single explanatory row instead of 266 + // staying silent — otherwise the user sees nothing and can't tell whether 267 + // the path or the conditions filtered everything out. 268 + if (options?.forEachEmpty) { 269 + const total = options.totalItems ?? 0; 270 + message = 271 + total === 0 272 + ? `Would skip: forEach path "${action.forEach?.path}" resolved to no items` 273 + : `Would skip: ${total} item(s) found at "${action.forEach?.path}" but none matched the per-item conditions`; 274 + await db.insert(deliveryLogs).values({ 275 + automationUri: match.automation.uri, 276 + actionIndex, 277 + eventTimeUs: match.event.time_us, 278 + payload: null, 279 + statusCode: null, 280 + message, 281 + error: null, 282 + dryRun: true, 283 + attempt: 1, 284 + createdAt: new Date(), 285 + }); 286 + return; 287 + } 288 + 289 + const item = options?.item; 290 + const itemSuffix = item !== undefined ? ` (item: ${truncateForLog(JSON.stringify(item))})` : ""; 291 + 139 292 if (failedFetches.length > 0) { 140 293 error = `Fetch failed: ${failedFetches.join(", ")}`; 141 294 } else if (action.$type === "webhook") { 142 295 const headerCount = action.headers ? Object.keys(action.headers).length : 0; 143 296 const headerNote = headerCount > 0 ? ` with ${headerCount} custom header(s)` : ""; 144 - message = `Would POST to ${action.callbackUrl}${headerNote}`; 145 - payload = JSON.stringify(buildPayload(match, fetchContext)); 297 + message = `Would POST to ${action.callbackUrl}${headerNote}${itemSuffix}`; 298 + payload = JSON.stringify(buildPayload(match, fetchContext, item)); 146 299 } else if (action.$type === "bsky-post") { 147 300 try { 148 301 const text = await renderTextTemplate( ··· 150 303 match.event, 151 304 fetchContext, 152 305 match.automation, 306 + item, 153 307 ); 154 - message = `Would post to Bluesky`; 155 - payload = JSON.stringify({ text, langs: action.langs, labels: action.labels }); 308 + message = `Would post to Bluesky${itemSuffix}`; 309 + payload = JSON.stringify({ text, langs: action.langs, labels: action.labels, item }); 156 310 } catch (err) { 157 311 error = `Template error: ${err instanceof Error ? err.message : String(err)}`; 158 312 } 159 313 } else if (action.$type === "follow") { 160 314 try { 161 315 const subject = ( 162 - await renderTextTemplate(action.subject, match.event, fetchContext, match.automation) 316 + await renderTextTemplate(action.subject, match.event, fetchContext, match.automation, item) 163 317 ).trim(); 164 318 const target = FOLLOW_TARGETS[action.target]; 165 319 const collection = target.collection; ··· 167 321 // The built-in safety checks live inside executeFollow and aren't run in 168 322 // dry-run (keeps the preview cheap). Advertise their presence in the 169 323 // message so authors know the real run will skip cleanly on both edges. 170 - message = `Would follow ${subject || "(empty)"} on ${action.target} (will skip if no ${appName} profile exists or already following)`; 171 - payload = JSON.stringify({ collection, subject }); 324 + message = `Would follow ${subject || "(empty)"} on ${action.target} (will skip if no ${appName} profile exists or already following)${itemSuffix}`; 325 + payload = JSON.stringify({ collection, subject, item }); 172 326 } catch (err) { 173 327 error = `Template error: ${err instanceof Error ? err.message : String(err)}`; 174 328 } ··· 179 333 match.event, 180 334 fetchContext, 181 335 match.automation, 336 + item, 182 337 ); 183 338 const title = action.targetTitle 184 - ? await renderTextTemplate(action.targetTitle, match.event, fetchContext, match.automation) 339 + ? await renderTextTemplate( 340 + action.targetTitle, 341 + match.event, 342 + fetchContext, 343 + match.automation, 344 + item, 345 + ) 185 346 : undefined; 186 347 const body = action.bodyValue 187 - ? await renderTextTemplate(action.bodyValue, match.event, fetchContext, match.automation) 348 + ? await renderTextTemplate( 349 + action.bodyValue, 350 + match.event, 351 + fetchContext, 352 + match.automation, 353 + item, 354 + ) 188 355 : undefined; 189 356 const tags: string[] = []; 190 357 if (action.tags) { ··· 194 361 match.event, 195 362 fetchContext, 196 363 match.automation, 364 + item, 197 365 ); 198 366 if (rendered.trim()) tags.push(rendered.trim()); 199 367 } 200 368 } 201 - message = `Would bookmark ${source}`; 202 - payload = JSON.stringify({ source, title, body, tags }); 369 + message = `Would bookmark ${source}${itemSuffix}`; 370 + payload = JSON.stringify({ source, title, body, tags, item }); 203 371 } catch (err) { 204 372 error = `Template error: ${err instanceof Error ? err.message : String(err)}`; 205 373 } ··· 210 378 match.event, 211 379 fetchContext, 212 380 match.automation, 381 + item, 213 382 ); 214 383 message = 215 384 action.$type === "patch-record" 216 - ? `Would patch record in ${action.targetCollection} via ${action.baseRecordUri}` 217 - : `Would create record in ${action.targetCollection}`; 218 - payload = JSON.stringify(rendered); 385 + ? `Would patch record in ${action.targetCollection} via ${action.baseRecordUri}${itemSuffix}` 386 + : `Would create record in ${action.targetCollection}${itemSuffix}`; 387 + payload = JSON.stringify(item !== undefined ? { rendered, item } : rendered); 219 388 } catch (err) { 220 389 error = `Template error: ${err instanceof Error ? err.message : String(err)}`; 221 390 } ··· 225 394 automationUri: match.automation.uri, 226 395 actionIndex, 227 396 eventTimeUs: match.event.time_us, 228 - payload, 397 + payload: capPayload(payload), 229 398 statusCode: null, 230 399 message, 231 400 error, ··· 233 402 attempt: 1, 234 403 createdAt: new Date(), 235 404 }); 405 + } 406 + 407 + function truncateForLog(s: string, max = 120): string { 408 + return s.length <= max ? s : s.slice(0, max) + "..."; 409 + } 410 + 411 + /** 412 + * Cap the size of a payload string before persisting it to delivery_logs. 413 + * Item content (and other PDS-derived data) flows into dry-run payloads, and 414 + * a single record can be tens of KB; multiplied by 64 forEach iterations this 415 + * could swell the local SQLite quickly. The cap keeps any individual log row 416 + * bounded while still leaving enough room to debug a template. The marker 417 + * suffix breaks JSON-validity intentionally so consumers don't try to parse a 418 + * truncated blob as a complete object. 419 + */ 420 + const MAX_LOG_PAYLOAD_BYTES = 8 * 1024; 421 + function capPayload(payload: string | null): string | null { 422 + if (payload == null) return null; 423 + if (payload.length <= MAX_LOG_PAYLOAD_BYTES) return payload; 424 + return payload.slice(0, MAX_LOG_PAYLOAD_BYTES) + `…(truncated, ${payload.length} bytes total)`; 236 425 } 237 426 238 427 /**
+238 -2
lib/jetstream/matcher.test.ts
··· 1 - import { describe, it, expect } from "vitest"; 2 - import { matchConditions, evaluateFetchConditions } from "./matcher.js"; 1 + import { describe, it, expect, vi } from "vitest"; 2 + import { 3 + matchConditions, 4 + evaluateFetchConditions, 5 + collectItems, 6 + matchItemConditions, 7 + MAX_FOR_EACH_COLLECT_ITEMS, 8 + } from "./matcher.js"; 3 9 import { makeEvent } from "../test/fixtures.js"; 4 10 import type { FetchContextEntry } from "../actions/template.js"; 5 11 ··· 478 484 }); 479 485 }); 480 486 }); 487 + 488 + describe("collectItems", () => { 489 + // Bluesky post with a mention facet and a link facet — mirrors the example 490 + // in the user's request so a regression here means the headline use case 491 + // would break. 492 + function postEvent() { 493 + return makeEvent({ 494 + commit: { 495 + rev: "r", 496 + operation: "create", 497 + collection: "app.bsky.feed.post", 498 + rkey: "rkey1", 499 + record: { 500 + text: "hi", 501 + facets: [ 502 + { 503 + index: { byteStart: 0, byteEnd: 5 }, 504 + features: [{ $type: "app.bsky.richtext.facet#mention", did: "did:plc:alice" }], 505 + }, 506 + { 507 + index: { byteStart: 6, byteEnd: 12 }, 508 + features: [{ $type: "app.bsky.richtext.facet#link", uri: "https://example.com" }], 509 + }, 510 + ], 511 + }, 512 + }, 513 + }); 514 + } 515 + 516 + it("flattens a single-level array path (facets[])", () => { 517 + const items = collectItems("event.commit.record.facets[]", postEvent(), undefined); 518 + expect(items).toHaveLength(2); 519 + expect((items[0] as { index: { byteStart: number } }).index.byteStart).toBe(0); 520 + }); 521 + 522 + it("flat-maps a nested array path (facets[].features[])", () => { 523 + const items = collectItems("event.commit.record.facets[].features[]", postEvent(), undefined); 524 + expect(items).toHaveLength(2); 525 + expect((items[0] as { $type: string }).$type).toBe("app.bsky.richtext.facet#mention"); 526 + expect((items[1] as { uri: string }).uri).toBe("https://example.com"); 527 + }); 528 + 529 + it("returns an empty list when the path resolves to nothing", () => { 530 + const items = collectItems("event.commit.record.missing[]", postEvent(), undefined); 531 + expect(items).toEqual([]); 532 + }); 533 + 534 + it("returns an empty list when the leaf isn't an array", () => { 535 + // text is a string, not an array — `text[]` should yield nothing 536 + const items = collectItems("event.commit.record.text[]", postEvent(), undefined); 537 + expect(items).toEqual([]); 538 + }); 539 + 540 + it("walks a path rooted at a fetch context entry", () => { 541 + const fetchContext = { 542 + likedPost: { 543 + found: true, 544 + uri: "at://did:plc:author/app.bsky.feed.post/x", 545 + cid: "cid", 546 + record: { 547 + facets: [ 548 + { 549 + features: [{ $type: "app.bsky.richtext.facet#link", uri: "https://b.test" }], 550 + }, 551 + ], 552 + }, 553 + }, 554 + }; 555 + const items = collectItems("likedPost.record.facets[].features[]", makeEvent(), fetchContext); 556 + expect(items).toHaveLength(1); 557 + expect((items[0] as { uri: string }).uri).toBe("https://b.test"); 558 + }); 559 + 560 + it("returns an empty list when the root is unknown", () => { 561 + expect(collectItems("nope.x[]", makeEvent(), undefined)).toEqual([]); 562 + }); 563 + 564 + it("caps the number of leaves materialized regardless of array size", () => { 565 + // Build a record with a single array of MAX + 100 items. The cap should 566 + // bound the result length even when no conditions are filtering. 567 + const huge = Array.from({ length: MAX_FOR_EACH_COLLECT_ITEMS + 100 }, (_, i) => ({ 568 + idx: i, 569 + })); 570 + const event = makeEvent({ 571 + commit: { 572 + rev: "r", 573 + operation: "create", 574 + collection: "x", 575 + rkey: "k", 576 + record: { items: huge }, 577 + }, 578 + }); 579 + const warn = vi.spyOn(console, "warn").mockImplementation(() => {}); 580 + try { 581 + const result = collectItems( 582 + "event.commit.record.items[]", 583 + event, 584 + undefined, 585 + "at://owner/auto/x", 586 + ); 587 + expect(result.length).toBe(MAX_FOR_EACH_COLLECT_ITEMS); 588 + expect((result[0] as { idx: number }).idx).toBe(0); 589 + expect((result.at(-1) as { idx: number }).idx).toBe(MAX_FOR_EACH_COLLECT_ITEMS - 1); 590 + // Operator-visible warning fired with the path and automation URI for 591 + // diagnostics. This is the only signal of the work cap; it doesn't 592 + // surface in delivery_logs. 593 + expect(warn).toHaveBeenCalledOnce(); 594 + expect(warn.mock.calls[0]![0]).toMatch(/work cap/); 595 + expect(warn.mock.calls[0]![0]).toMatch(/at:\/\/owner\/auto\/x/); 596 + } finally { 597 + warn.mockRestore(); 598 + } 599 + }); 600 + 601 + it("does not log a work-cap warning under the cap", () => { 602 + const small = Array.from({ length: 8 }, (_, i) => ({ idx: i })); 603 + const event = makeEvent({ 604 + commit: { 605 + rev: "r", 606 + operation: "create", 607 + collection: "x", 608 + rkey: "k", 609 + record: { items: small }, 610 + }, 611 + }); 612 + const warn = vi.spyOn(console, "warn").mockImplementation(() => {}); 613 + try { 614 + const result = collectItems("event.commit.record.items[]", event, undefined); 615 + expect(result).toHaveLength(8); 616 + expect(warn).not.toHaveBeenCalled(); 617 + } finally { 618 + warn.mockRestore(); 619 + } 620 + }); 621 + 622 + it("caps across nested array levels", () => { 623 + // A 2-deep nested array whose total leaf count exceeds the cap. Each outer 624 + // entry contributes 100 leaves; we expect the cap to short-circuit 625 + // mid-iteration. 626 + const outer = Array.from({ length: 50 }, () => ({ 627 + inner: Array.from({ length: 100 }, (_, j) => ({ j })), 628 + })); 629 + const event = makeEvent({ 630 + commit: { 631 + rev: "r", 632 + operation: "create", 633 + collection: "x", 634 + rkey: "k", 635 + record: { outer }, 636 + }, 637 + }); 638 + const warn = vi.spyOn(console, "warn").mockImplementation(() => {}); 639 + try { 640 + const result = collectItems("event.commit.record.outer[].inner[]", event, undefined); 641 + expect(result.length).toBe(MAX_FOR_EACH_COLLECT_ITEMS); 642 + expect(warn).toHaveBeenCalledOnce(); 643 + } finally { 644 + warn.mockRestore(); 645 + } 646 + }); 647 + 648 + it("refuses to walk into __proto__ / constructor / prototype", () => { 649 + // Even though the validator should reject these segments at save time, 650 + // the runtime guard prevents leaking JS engine internals into payloads 651 + // if a path with these segments somehow reaches collectItems. 652 + const event = makeEvent(); 653 + expect(collectItems("event.__proto__[]", event, undefined)).toEqual([]); 654 + expect(collectItems("event.commit.record.constructor[]", event, undefined)).toEqual([]); 655 + expect(collectItems("event.commit.record.prototype[]", event, undefined)).toEqual([]); 656 + }); 657 + }); 658 + 659 + describe("matchItemConditions", () => { 660 + const ownerDid = "did:plc:owner"; 661 + 662 + it("passes when an empty/undefined condition list is given", () => { 663 + expect(matchItemConditions({ a: 1 }, undefined, ownerDid)).toBe(true); 664 + expect(matchItemConditions({ a: 1 }, [], ownerDid)).toBe(true); 665 + }); 666 + 667 + it("filters facet features by $type", () => { 668 + const link = { $type: "app.bsky.richtext.facet#link", uri: "https://x" }; 669 + const mention = { $type: "app.bsky.richtext.facet#mention", did: "did:plc:bob" }; 670 + const conditions = [{ field: "$type", operator: "eq", value: "app.bsky.richtext.facet#link" }]; 671 + expect(matchItemConditions(link, conditions, ownerDid)).toBe(true); 672 + expect(matchItemConditions(mention, conditions, ownerDid)).toBe(false); 673 + }); 674 + 675 + it("accepts the optional `item.` prefix on field paths", () => { 676 + const item = { uri: "https://x" }; 677 + expect( 678 + matchItemConditions( 679 + item, 680 + [{ field: "item.uri", operator: "startsWith", value: "https://" }], 681 + ownerDid, 682 + ), 683 + ).toBe(true); 684 + }); 685 + 686 + it("supports exists / not-exists on item fields", () => { 687 + const item = { uri: "https://x" }; 688 + expect( 689 + matchItemConditions(item, [{ field: "uri", operator: "exists", value: "" }], ownerDid), 690 + ).toBe(true); 691 + expect( 692 + matchItemConditions(item, [{ field: "did", operator: "not-exists", value: "" }], ownerDid), 693 + ).toBe(true); 694 + }); 695 + 696 + it("treats __proto__ / constructor / prototype as missing", () => { 697 + const item = { uri: "https://x" }; 698 + expect( 699 + matchItemConditions(item, [{ field: "__proto__", operator: "exists", value: "" }], ownerDid), 700 + ).toBe(false); 701 + expect( 702 + matchItemConditions( 703 + item, 704 + [{ field: "constructor", operator: "exists", value: "" }], 705 + ownerDid, 706 + ), 707 + ).toBe(false); 708 + expect( 709 + matchItemConditions( 710 + item, 711 + [{ field: "constructor.name", operator: "eq", value: "Object" }], 712 + ownerDid, 713 + ), 714 + ).toBe(false); 715 + }); 716 + });
+200 -1
lib/jetstream/matcher.ts
··· 1 1 import type { Condition } from "../db/schema.js"; 2 - import type { FetchContextEntry } from "../actions/template.js"; 2 + import type { FetchContext, FetchContextEntry } from "../actions/template.js"; 3 3 4 4 export type JetstreamCommit = { 5 5 rev: string; ··· 147 147 if (!conditions || conditions.length === 0) return true; 148 148 return conditions.every((cond) => evaluateFetchCondition(entry, cond, ownerDid)); 149 149 } 150 + 151 + // --------------------------------------------------------------------------- 152 + // forEach: array iteration over event/fetch data 153 + // --------------------------------------------------------------------------- 154 + 155 + /** 156 + * Path segments that walk into JS engine internals are rejected runtime-side. 157 + * Templates / conditions can never reach `Object.prototype`, the constructor 158 + * function, etc., even if validators upstream let one slip through. This is 159 + * defense-in-depth — it only matters if a future regex change opens a hole. 160 + */ 161 + const UNSAFE_KEYS = new Set(["__proto__", "constructor", "prototype"]); 162 + function isUnsafeKey(k: string): boolean { 163 + return UNSAFE_KEYS.has(k); 164 + } 165 + export { isUnsafeKey }; 166 + 167 + /** 168 + * Resolve the root object that a forEach path is rooted at. 169 + * - `event.*` → the event itself 170 + * - `<fetchName>.*` → that fetch's context entry 171 + * 172 + * Returns `undefined` when the root cannot be resolved (e.g. unknown fetch 173 + * name, or the path doesn't start with a known root). 174 + */ 175 + function rootForPath( 176 + path: string, 177 + event: JetstreamEvent, 178 + fetchContext: FetchContext | undefined, 179 + ): { root: unknown; rest: string } | undefined { 180 + if (path.startsWith("event.")) { 181 + return { root: event, rest: path.slice("event.".length) }; 182 + } 183 + if (fetchContext) { 184 + const dotIndex = path.indexOf("."); 185 + const head = dotIndex > 0 ? path.slice(0, dotIndex) : path; 186 + if (head in fetchContext) { 187 + return { 188 + root: fetchContext[head], 189 + rest: dotIndex > 0 ? path.slice(dotIndex + 1) : "", 190 + }; 191 + } 192 + } 193 + return undefined; 194 + } 195 + 196 + /** 197 + * Hard cap on the number of leaves `collectItems` will materialize from a 198 + * single path. Bounds the *work* done by `walk` (memory + CPU) regardless of 199 + * how the user filters with conditions. Without this, a single adversarial 200 + * record with a million-element array would force a million-iteration filter 201 + * pass and held references until the handler returned, even if the downstream 202 + * delivery cap only fires 64 actions. 203 + * 204 + * Set to 4× the delivery cap (`MAX_FOR_EACH_ITEMS_PER_ACTION`). This is 205 + * generous-enough headroom for legitimate filtering scenarios (e.g. 200 206 + * candidate facets filtered down to 50 deliveries) while keeping the 207 + * worst-case memory footprint trivial — at 256 references, the array spine 208 + * is ~2KB regardless of what the user puts in their record. Real-world 209 + * usage is well under this: typical Bluesky posts have <50 facets total. 210 + */ 211 + const MAX_FOR_EACH_COLLECT_ITEMS = 256; 212 + export { MAX_FOR_EACH_COLLECT_ITEMS }; 213 + 214 + type WalkBudget = { remaining: number; truncated: boolean }; 215 + 216 + /** 217 + * Walk `value` along `segments`. Each segment is either a key or `[]`. A `[]` 218 + * segment flat-maps the current array level — every entry is walked through 219 + * the remaining segments and its leaves are appended. The result is always a 220 + * flat array of leaves (objects, primitives, whatever the path resolves to). 221 + * 222 + * `budget` carries a shared remaining-leaves counter across the recursion; 223 + * when it hits zero, further iteration short-circuits and the truncated flag 224 + * is set so the caller can decide how to surface the cap. 225 + */ 226 + function walk(value: unknown, segments: string[], budget: WalkBudget): unknown[] { 227 + if (budget.remaining <= 0) return []; 228 + 229 + if (segments.length === 0) { 230 + if (value === undefined) return []; 231 + budget.remaining -= 1; 232 + return [value]; 233 + } 234 + 235 + const [head, ...rest] = segments; 236 + 237 + if (head === "[]") { 238 + if (!Array.isArray(value)) return []; 239 + const out: unknown[] = []; 240 + for (const item of value) { 241 + if (budget.remaining <= 0) { 242 + budget.truncated = true; 243 + break; 244 + } 245 + out.push(...walk(item, rest, budget)); 246 + } 247 + return out; 248 + } 249 + 250 + if (value == null || typeof value !== "object") return []; 251 + if (isUnsafeKey(head!)) return []; 252 + return walk((value as Record<string, unknown>)[head!], rest, budget); 253 + } 254 + 255 + /** 256 + * Resolve a forEach `path` against the event + fetch context and return a flat 257 + * list of items. Paths must use `[]` segments to mark array levels (e.g. 258 + * `event.commit.record.facets[].features[]`). Anything that doesn't ultimately 259 + * resolve to an array of leaves returns an empty list. 260 + * 261 + * Capped at `MAX_FOR_EACH_COLLECT_ITEMS` leaves regardless of the underlying 262 + * array size. When the cap is hit, the caller's `automationUri` (used only 263 + * for the warning) is logged so operators can spot a runaway path. The user 264 + * downstream will still see the regular per-action delivery cap kick in if 265 + * matches exceed it; the operator-visible warning is purely diagnostic. 266 + */ 267 + export function collectItems( 268 + path: string, 269 + event: JetstreamEvent, 270 + fetchContext: FetchContext | undefined, 271 + automationUri?: string, 272 + ): unknown[] { 273 + const rooted = rootForPath(path, event, fetchContext); 274 + if (!rooted) return []; 275 + // Trailing `[]` is required, so the rest path must contain at least one `[]` 276 + // segment for items to be returned. 277 + const segments = rooted.rest === "" ? [] : rooted.rest.split("."); 278 + // Split each segment that ends in `[]` into a key + `[]` pair. e.g. "facets[]" 279 + // becomes ["facets", "[]"] so the walker can iterate properly. 280 + const expanded: string[] = []; 281 + for (const seg of segments) { 282 + if (seg.endsWith("[]")) { 283 + const head = seg.slice(0, -2); 284 + if (head) expanded.push(head); 285 + expanded.push("[]"); 286 + } else { 287 + expanded.push(seg); 288 + } 289 + } 290 + const budget: WalkBudget = { remaining: MAX_FOR_EACH_COLLECT_ITEMS, truncated: false }; 291 + const out = walk(rooted.root, expanded, budget); 292 + if (budget.truncated) { 293 + console.warn( 294 + `[forEach] collectItems hit the ${MAX_FOR_EACH_COLLECT_ITEMS}-item work cap on path "${path}"` + 295 + (automationUri ? ` for ${automationUri}` : "") + 296 + "; remaining leaves were not materialized.", 297 + ); 298 + } 299 + return out; 300 + } 301 + 302 + /** 303 + * Resolve a dotted path against a single item. Used by item conditions and by 304 + * `{{item.*}}` placeholder resolution. Returns the resolved string form of the 305 + * leaf value, or `undefined` if missing. 306 + */ 307 + function resolveItemField(item: unknown, field: string): string | undefined { 308 + if (item == null || typeof item !== "object") return undefined; 309 + let value: unknown = item; 310 + for (const key of field.split(".")) { 311 + if (value == null || typeof value !== "object") return undefined; 312 + if (isUnsafeKey(key)) return undefined; 313 + value = (value as Record<string, unknown>)[key]; 314 + } 315 + if (value == null) return undefined; 316 + return typeof value === "string" ? value : JSON.stringify(value); 317 + } 318 + 319 + function evaluateItemCondition(item: unknown, condition: Condition, ownerDid: string): boolean { 320 + // Item conditions reference the item directly; the optional `item.` prefix 321 + // is accepted for consistency with template placeholders. 322 + const field = condition.field.startsWith("item.") 323 + ? condition.field.slice("item.".length) 324 + : condition.field; 325 + 326 + if (condition.operator === "exists" || condition.operator === "not-exists") { 327 + const actual = resolveItemField(item, field); 328 + const exists = actual !== undefined && actual !== ""; 329 + return condition.operator === "exists" ? exists : !exists; 330 + } 331 + 332 + const actual = resolveItemField(item, field); 333 + if (actual === undefined) return false; 334 + const expected = resolveConditionValue(condition.value, ownerDid); 335 + return applyStringOperator(condition.operator, actual, expected); 336 + } 337 + 338 + /** 339 + * AND-evaluate a list of conditions against a single item. Empty list passes. 340 + */ 341 + export function matchItemConditions( 342 + item: unknown, 343 + conditions: Condition[] | undefined, 344 + ownerDid: string, 345 + ): boolean { 346 + if (!conditions || conditions.length === 0) return true; 347 + return conditions.every((cond) => evaluateItemCondition(item, cond, ownerDid)); 348 + }
+17
lib/lexicons/schema-tree.ts
··· 7 7 BooleanNode, 8 8 ObjectNode, 9 9 ArrayNode, 10 + UnionNode, 10 11 UnknownNode, 11 12 SchemaNode, 12 13 RecordSchema, ··· 18 19 BooleanNode, 19 20 ObjectNode, 20 21 ArrayNode, 22 + UnionNode, 21 23 SchemaNode, 22 24 RecordSchema, 23 25 } from "./schema-types.js"; ··· 135 137 136 138 // Use the external lexicon's defs for local ref resolution within it 137 139 return buildNode(def, externalDefs, visited, depth + 1); 140 + } 141 + 142 + case "union": { 143 + const refs = (prop.refs as string[] | undefined) ?? []; 144 + const variants: UnionNode["variants"] = []; 145 + for (const ref of refs) { 146 + // Resolve each variant as if it were a `ref`. Use a per-variant visited 147 + // set so siblings can share refs without short-circuiting each other. 148 + const variantVisited = new Set(visited); 149 + const node = await buildNode({ type: "ref", ref }, localDefs, variantVisited, depth + 1); 150 + variants.push({ ref, node }); 151 + } 152 + const node: UnionNode = { type: "union", variants }; 153 + if (prop.description) node.description = prop.description; 154 + return node; 138 155 } 139 156 140 157 default:
+9
lib/lexicons/schema-types.ts
··· 36 36 maxLength?: number; 37 37 }; 38 38 39 + /** Union of refs (lexicon `union` type). Each variant carries its source ref 40 + * so callers can label fields by `$type` without re-resolving the schema. */ 41 + export type UnionNode = { 42 + type: "union"; 43 + description?: string; 44 + variants: Array<{ ref: string; node: SchemaNode }>; 45 + }; 46 + 39 47 export type UnknownNode = { 40 48 type: "unknown"; 41 49 description?: string; ··· 47 55 | BooleanNode 48 56 | ObjectNode 49 57 | ArrayNode 58 + | UnionNode 50 59 | UnknownNode; 51 60 52 61 export type RecordSchema = {
+11 -2
lib/webhooks/dispatcher.ts
··· 25 25 }; 26 26 }; 27 27 fetches?: Record<string, { uri: string; cid: string; record: Record<string, unknown> }>; 28 + /** Present only when the action is configured with a forEach modifier — this 29 + * is the array element that triggered the current delivery. */ 30 + item?: unknown; 28 31 }; 29 32 30 - export function buildPayload(match: MatchedEvent, fetchContext?: FetchContext): WebhookPayload { 33 + export function buildPayload( 34 + match: MatchedEvent, 35 + fetchContext?: FetchContext, 36 + item?: unknown, 37 + ): WebhookPayload { 31 38 const { automation, event } = match; 32 39 return { 33 40 automation: automation.uri, ··· 48 55 : undefined, 49 56 }, 50 57 fetches: fetchContext && Object.keys(fetchContext).length > 0 ? fetchContext : undefined, 58 + ...(item !== undefined ? { item } : {}), 51 59 }; 52 60 } 53 61 ··· 184 192 match: MatchedEvent, 185 193 actionIndex: number, 186 194 fetchContext?: FetchContext, 195 + item?: unknown, 187 196 ): Promise<ActionResult> { 188 197 const { automation, event } = match; 189 198 const action = automation.actions[actionIndex] as WebhookAction; 190 - const payload = buildPayload(match, fetchContext); 199 + const payload = buildPayload(match, fetchContext, item); 191 200 const body = JSON.stringify(payload); 192 201 193 202 // Resolve {{secret:name}} references in custom headers (once, reused for retries)