Webhooks for the AT Protocol airglow.run
atproto atprotocol automation webhook
12
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: search on pds and add conditions on data sources

Hugo f93c4a2a bdceb8e8

+2918 -303
+115
app/components/FetchCard/index.tsx
··· 1 + import type { FetchStep, Condition } from "@/db/schema.js"; 2 + import { FileText, Search } from "../../icons.js"; 3 + import { opLabels } from "@/automations/labels.js"; 4 + import { DescriptionList } from "../DescriptionList/index.js"; 5 + import { InlineCode } from "../CodeBlock/index.js"; 6 + import * as s from "./styles.css.ts"; 7 + 8 + type Kind = "record" | "search"; 9 + 10 + function resolveKind(f: FetchStep): Kind { 11 + return f.kind === "search" ? "search" : "record"; 12 + } 13 + 14 + function KindIcon({ kind }: { kind: Kind }) { 15 + const Icon = kind === "search" ? Search : FileText; 16 + return ( 17 + <span class={s.kindIcon} aria-hidden="true"> 18 + <Icon size={18} /> 19 + </span> 20 + ); 21 + } 22 + 23 + // Natural-language sentence for a per-fetch condition. 24 + // `found + exists/not-exists` is the duplicate-prevention idiom and gets 25 + // a phrasing that fits the fetch kind; other fields fall back to the 26 + // generic "<field> <op> <value>" template. 27 + function ConditionSentence({ cond, kind }: { cond: Condition; kind: Kind }) { 28 + if (cond.field === "found") { 29 + if (cond.operator === "exists") { 30 + return <>{kind === "search" ? "a matching record is found" : "the record is found"}</>; 31 + } 32 + if (cond.operator === "not-exists") { 33 + return <>{kind === "search" ? "no matching record is found" : "the record is not found"}</>; 34 + } 35 + } 36 + const op = opLabels[cond.operator] ?? cond.operator; 37 + const valueless = cond.operator === "exists" || cond.operator === "not-exists"; 38 + return ( 39 + <> 40 + <InlineCode>{cond.field}</InlineCode> {op} 41 + {!valueless && cond.value && ( 42 + <> 43 + {" "} 44 + <InlineCode>{cond.value}</InlineCode> 45 + </> 46 + )} 47 + </> 48 + ); 49 + } 50 + 51 + export function FetchCard({ fetch: f }: { fetch: FetchStep }) { 52 + const kind = resolveKind(f); 53 + const kindLabel = kind === "search" ? "Search" : "Record lookup"; 54 + 55 + return ( 56 + <div class={s.card}> 57 + <header class={s.headerRow}> 58 + <KindIcon kind={kind} /> 59 + <span class={s.eyebrow}>{kindLabel}</span> 60 + <span class={s.name}> 61 + <InlineCode>{f.name}</InlineCode> 62 + </span> 63 + {f.comment && <span class={s.subtitle}>— {f.comment}</span>} 64 + </header> 65 + 66 + <DescriptionList> 67 + {f.kind === "search" ? ( 68 + <> 69 + <dt>Collection</dt> 70 + <dd> 71 + <InlineCode>{f.collection}</InlineCode> 72 + </dd> 73 + <dt>In repo</dt> 74 + <dd> 75 + <InlineCode>{f.repo}</InlineCode> 76 + </dd> 77 + {f.where.length > 0 && ( 78 + <> 79 + <dt>Where</dt> 80 + <dd> 81 + {f.where.map((w, i) => ( 82 + <span key={i}> 83 + {i > 0 && <> and </>} 84 + <InlineCode>{w.field}</InlineCode> {opLabels[w.operator] ?? w.operator}{" "} 85 + <InlineCode>{w.value}</InlineCode> 86 + </span> 87 + ))} 88 + </dd> 89 + </> 90 + )} 91 + </> 92 + ) : ( 93 + <> 94 + <dt>Lookup</dt> 95 + <dd> 96 + <InlineCode>{f.uri}</InlineCode> 97 + </dd> 98 + </> 99 + )} 100 + </DescriptionList> 101 + 102 + {f.conditions && f.conditions.length > 0 && ( 103 + <ul class={s.gateList}> 104 + {f.conditions.map((cond, i) => ( 105 + <li key={i} class={s.gateRow}> 106 + <span class={s.gateLead}>Continue only if</span>{" "} 107 + <ConditionSentence cond={cond} kind={kind} /> 108 + {cond.comment && <span class={s.subtitle}> — {cond.comment}</span>} 109 + </li> 110 + ))} 111 + </ul> 112 + )} 113 + </div> 114 + ); 115 + }
+77
app/components/FetchCard/styles.css.ts
··· 1 + import { style } from "@vanilla-extract/css"; 2 + import { vars } from "../../styles/theme.css.ts"; 3 + import { radii } from "../../styles/tokens/radii.ts"; 4 + import { space } from "../../styles/tokens/spacing.ts"; 5 + import { fontSize, fontWeight } from "../../styles/tokens/typography.ts"; 6 + 7 + export const card = style({ 8 + display: "flex", 9 + flexDirection: "column", 10 + gap: space[3], 11 + paddingBlock: space[4], 12 + paddingInline: space[4], 13 + borderRadius: radii.lg, 14 + border: `1px solid ${vars.color.border}`, 15 + backgroundColor: vars.color.surface, 16 + boxShadow: vars.shadow.highlight, 17 + }); 18 + 19 + export const headerRow = style({ 20 + display: "flex", 21 + alignItems: "center", 22 + flexWrap: "wrap", 23 + gap: space[2], 24 + minWidth: 0, 25 + }); 26 + 27 + export const kindIcon = style({ 28 + display: "inline-flex", 29 + alignItems: "center", 30 + justifyContent: "center", 31 + width: "32px", 32 + height: "32px", 33 + borderRadius: radii.md, 34 + flexShrink: 0, 35 + backgroundColor: vars.color.accentSubtle, 36 + color: vars.color.accentActive, 37 + }); 38 + 39 + export const eyebrow = style({ 40 + fontSize: fontSize.xs, 41 + fontWeight: fontWeight.semibold, 42 + textTransform: "uppercase", 43 + letterSpacing: "0.07em", 44 + color: vars.color.textMuted, 45 + }); 46 + 47 + export const name = style({ 48 + fontSize: fontSize.base, 49 + fontWeight: fontWeight.medium, 50 + color: vars.color.text, 51 + }); 52 + 53 + export const subtitle = style({ 54 + fontSize: fontSize.sm, 55 + color: vars.color.textMuted, 56 + }); 57 + 58 + export const gateList = style({ 59 + listStyle: "none", 60 + display: "flex", 61 + flexDirection: "column", 62 + gap: space[2], 63 + margin: 0, 64 + padding: 0, 65 + paddingBlockStart: space[3], 66 + borderBlockStart: `1px solid ${vars.color.borderSubtle}`, 67 + }); 68 + 69 + export const gateRow = style({ 70 + fontSize: fontSize.sm, 71 + color: vars.color.textSecondary, 72 + }); 73 + 74 + export const gateLead = style({ 75 + fontWeight: fontWeight.semibold, 76 + color: vars.color.text, 77 + });
+4
app/icons.ts
··· 57 57 import CodeData from "lucide/icons/code"; 58 58 import RepeatData from "lucide/icons/repeat"; 59 59 import SettingsData from "lucide/icons/settings"; 60 + import FileTextData from "lucide/icons/file-text"; 61 + import SearchData from "lucide/icons/search"; 60 62 61 63 export const Activity = icon(ActivityData); 62 64 export const ArrowLeft = icon(ArrowLeftData); ··· 88 90 export const Code = icon(CodeData); 89 91 export const Repeat = icon(RepeatData); 90 92 export const Settings = icon(SettingsData); 93 + export const FileText = icon(FileTextData); 94 + export const Search = icon(SearchData);
+107
app/islands/AutomationForm.css.ts
··· 456 456 flex: "2 1 200px", 457 457 }); 458 458 459 + /** Card wrapper for a single data source. Visually groups name, type, 460 + * source-specific fields, per-source conditions, and the note. */ 461 + export const fetchCard = style({ 462 + display: "flex", 463 + flexDirection: "column", 464 + gap: space[4], 465 + paddingBlock: space[4], 466 + paddingInline: space[4], 467 + borderRadius: radii.md, 468 + border: `1px solid ${vars.color.borderSubtle}`, 469 + backgroundColor: vars.color.bg, 470 + }); 471 + 472 + /** Top row: Variable name + Source type + Remove on one line. */ 473 + export const fetchTopRow = style({ 474 + display: "flex", 475 + flexWrap: "wrap", 476 + gap: space[3], 477 + alignItems: "flex-start", 478 + }); 479 + 480 + /** Sub-section inside a fetch card — a group of fields with its own label. */ 481 + export const fetchSubsection = style({ 482 + display: "flex", 483 + flexDirection: "column", 484 + gap: space[2], 485 + paddingBlockStart: space[3], 486 + borderBlockStart: `1px solid ${vars.color.borderSubtle}`, 487 + }); 488 + 489 + export const fetchSubsectionTitle = style({ 490 + fontSize: fontSize.xs, 491 + fontWeight: fontWeight.semibold, 492 + textTransform: "uppercase", 493 + letterSpacing: "0.07em", 494 + color: vars.color.textMuted, 495 + }); 496 + 497 + /** Two-column grid for related fields (Repo/Collection, etc.). */ 498 + export const twoColRow = style({ 499 + display: "grid", 500 + gridTemplateColumns: "1fr 1fr", 501 + gap: space[3], 502 + "@media": { 503 + "(max-width: 480px)": { 504 + gridTemplateColumns: "1fr", 505 + }, 506 + }, 507 + }); 508 + 509 + /** Match-records-where row: Field | equals | Value. */ 510 + export const fetchMatchRow = style({ 511 + display: "flex", 512 + flexWrap: "wrap", 513 + gap: space[2], 514 + alignItems: "flex-start", 515 + }); 516 + 517 + /** Invisible spacer that takes the same block-size as a label. Used to push 518 + * a button or inline text down so it aligns with an input on an adjacent 519 + * column whose first row is a label. */ 520 + export const labelSpacer = style({ 521 + visibility: "hidden", 522 + userSelect: "none", 523 + pointerEvents: "none", 524 + }); 525 + 526 + /** Card wrapping a single per-fetch condition: the preset dropdown row, 527 + * plus the optional custom field/op/value row when "Custom" is picked. */ 528 + export const fetchConditionCard = style({ 529 + display: "flex", 530 + flexDirection: "column", 531 + gap: space[2], 532 + paddingBlock: space[3], 533 + paddingInline: space[3], 534 + borderRadius: radii.md, 535 + border: `1px solid ${vars.color.borderSubtle}`, 536 + backgroundColor: vars.color.surface, 537 + }); 538 + 539 + /** Preset row: the semantic dropdown + remove button. */ 540 + export const fetchConditionPresetRow = style({ 541 + display: "flex", 542 + alignItems: "flex-start", 543 + gap: space[2], 544 + }); 545 + 546 + /** Small inline "equals" operator in the match row. Sits in a fieldGroup 547 + * with a labelSpacer above so it aligns with the adjacent input. */ 548 + export const fetchMatchOperator = style({ 549 + fontSize: fontSize.sm, 550 + color: vars.color.textMuted, 551 + paddingBlockStart: space[2], 552 + flex: "0 0 auto", 553 + }); 554 + 555 + /** Inline code-style hint for field paths, placeholders, etc. */ 556 + export const inlineCode = style({ 557 + fontFamily: "monospace", 558 + fontSize: "0.9em", 559 + paddingBlock: "1px", 560 + paddingInline: "4px", 561 + borderRadius: radii.sm, 562 + backgroundColor: vars.color.code, 563 + color: vars.color.text, 564 + }); 565 + 459 566 export const collapsibleDetails = style({ 460 567 border: `1px solid ${vars.color.border}`, 461 568 borderRadius: radii.md,
+519 -79
app/islands/AutomationForm.tsx
··· 22 22 comment: string; 23 23 }; 24 24 25 - type FetchDraft = { name: string; uri: string; comment: string }; 25 + type FetchDraft = { 26 + kind: "record" | "search"; 27 + name: string; 28 + uri: string; 29 + repo: string; 30 + collection: string; 31 + whereField: string; 32 + whereValue: string; 33 + conditions: Condition[]; 34 + comment: string; 35 + }; 26 36 27 37 type HeaderDraft = { key: string; value: string }; 28 38 type WebhookDraft = { ··· 69 79 operations: string[]; 70 80 actions: Action[]; 71 81 fetches: FetchStep[]; 72 - conditions: Array<{ field: string; operator: string; value: string; comment?: string }>; 82 + conditions: Array<{ 83 + field: string; 84 + operator: string; 85 + value: string; 86 + comment?: string; 87 + }>; 73 88 wantedDids: string[]; 74 89 active: boolean; 75 90 }; ··· 685 700 } 686 701 687 702 function toFetchDrafts(fetches: FetchStep[]): FetchDraft[] { 688 - return fetches.map((f) => ({ name: f.name, uri: f.uri, comment: f.comment ?? "" })); 703 + return fetches.map((f) => { 704 + const conditions = (f.conditions ?? []).map((c) => ({ 705 + field: c.field, 706 + operator: c.operator, 707 + value: c.value, 708 + comment: c.comment ?? "", 709 + })); 710 + if (f.kind === "search") { 711 + const first = f.where[0]; 712 + return { 713 + kind: "search" as const, 714 + name: f.name, 715 + uri: "", 716 + repo: f.repo, 717 + collection: f.collection, 718 + whereField: first?.field ?? "subject", 719 + whereValue: first?.value ?? "", 720 + conditions, 721 + comment: f.comment ?? "", 722 + }; 723 + } 724 + return { 725 + kind: "record" as const, 726 + name: f.name, 727 + uri: f.uri, 728 + repo: "", 729 + collection: "", 730 + whereField: "", 731 + whereValue: "", 732 + conditions, 733 + comment: f.comment ?? "", 734 + }; 735 + }); 736 + } 737 + 738 + const VALUE_LESS_CONDITION_OPS = new Set(["exists", "not-exists"]); 739 + 740 + type FetchConditionPreset = "found" | "not-found" | "custom"; 741 + 742 + function fetchConditionPreset(c: Condition): FetchConditionPreset { 743 + if (c.field === "found") { 744 + if (c.operator === "exists") return "found"; 745 + if (c.operator === "not-exists") return "not-found"; 746 + } 747 + return "custom"; 748 + } 749 + 750 + function conditionIsComplete(c: Condition): boolean { 751 + if (!c.field) return false; 752 + if (VALUE_LESS_CONDITION_OPS.has(c.operator)) return true; 753 + return !!c.value; 754 + } 755 + 756 + function conditionToPayload(c: Condition) { 757 + const valueLess = VALUE_LESS_CONDITION_OPS.has(c.operator); 758 + return { 759 + field: c.field, 760 + operator: c.operator, 761 + value: valueLess ? "" : c.value, 762 + ...(c.comment ? { comment: c.comment } : {}), 763 + }; 764 + } 765 + 766 + function fetchIsComplete(f: FetchDraft): boolean { 767 + if (!f.name) return false; 768 + if (f.kind === "search") return !!(f.repo && f.collection && f.whereField && f.whereValue); 769 + return !!f.uri; 770 + } 771 + 772 + function fetchToPayload(f: FetchDraft) { 773 + const comment = f.comment ? { comment: f.comment } : {}; 774 + const completedConditions = f.conditions.filter(conditionIsComplete).map(conditionToPayload); 775 + const conditions = completedConditions.length > 0 ? { conditions: completedConditions } : {}; 776 + if (f.kind === "search") { 777 + return { 778 + kind: "search" as const, 779 + name: f.name, 780 + repo: f.repo, 781 + collection: f.collection, 782 + where: [{ field: f.whereField, operator: "eq", value: f.whereValue }], 783 + ...conditions, 784 + ...comment, 785 + }; 786 + } 787 + return { kind: "record" as const, name: f.name, uri: f.uri, ...conditions, ...comment }; 689 788 } 690 789 691 790 function toConditionDrafts( 692 - conditions: Array<{ field: string; operator: string; value: string; comment?: string }>, 791 + conditions: Array<{ 792 + field: string; 793 + operator: string; 794 + value: string; 795 + comment?: string; 796 + }>, 693 797 ): Condition[] { 694 - return conditions.map((c) => ({ ...c, comment: c.comment ?? "" })); 798 + return conditions.map((c) => ({ 799 + field: c.field, 800 + operator: c.operator, 801 + value: c.value, 802 + comment: c.comment ?? "", 803 + })); 695 804 } 696 805 697 806 export default function AutomationForm({ ··· 874 983 ); 875 984 876 985 const addFetch = useCallback(() => { 877 - setFetches((prev) => [...prev, { name: "", uri: "", comment: "" }]); 986 + setFetches((prev) => [ 987 + ...prev, 988 + { 989 + kind: "record", 990 + name: "", 991 + uri: "", 992 + repo: "", 993 + collection: "", 994 + whereField: "subject", 995 + whereValue: "", 996 + conditions: [], 997 + comment: "", 998 + }, 999 + ]); 878 1000 }, []); 879 1001 880 1002 const removeFetch = useCallback((index: number) => { 881 1003 setFetches((prev) => prev.filter((_, i) => i !== index)); 882 1004 }, []); 883 1005 884 - const updateFetch = useCallback((index: number, key: "name" | "uri" | "comment", val: string) => { 885 - setFetches((prev) => prev.map((f, i) => (i === index ? { ...f, [key]: val } : f))); 1006 + const updateFetch = useCallback( 1007 + ( 1008 + index: number, 1009 + key: 1010 + | "kind" 1011 + | "name" 1012 + | "uri" 1013 + | "repo" 1014 + | "collection" 1015 + | "whereField" 1016 + | "whereValue" 1017 + | "comment", 1018 + val: string, 1019 + ) => { 1020 + setFetches((prev) => 1021 + prev.map((f, i) => { 1022 + if (i !== index) return f; 1023 + if (key === "kind") return { ...f, kind: val === "search" ? "search" : "record" }; 1024 + return { ...f, [key]: val }; 1025 + }), 1026 + ); 1027 + }, 1028 + [], 1029 + ); 1030 + 1031 + const addFetchCondition = useCallback((fetchIndex: number) => { 1032 + setFetches((prev) => 1033 + prev.map((f, i) => 1034 + i !== fetchIndex 1035 + ? f 1036 + : { 1037 + ...f, 1038 + conditions: [ 1039 + ...f.conditions, 1040 + { field: "found", operator: "exists", value: "", comment: "" }, 1041 + ], 1042 + }, 1043 + ), 1044 + ); 886 1045 }, []); 887 1046 1047 + const removeFetchCondition = useCallback((fetchIndex: number, condIndex: number) => { 1048 + setFetches((prev) => 1049 + prev.map((f, i) => 1050 + i !== fetchIndex ? f : { ...f, conditions: f.conditions.filter((_, j) => j !== condIndex) }, 1051 + ), 1052 + ); 1053 + }, []); 1054 + 1055 + const updateFetchCondition = useCallback( 1056 + ( 1057 + fetchIndex: number, 1058 + condIndex: number, 1059 + key: "field" | "operator" | "value" | "comment", 1060 + val: string, 1061 + ) => { 1062 + setFetches((prev) => 1063 + prev.map((f, i) => 1064 + i !== fetchIndex 1065 + ? f 1066 + : { 1067 + ...f, 1068 + conditions: f.conditions.map((c, j) => 1069 + j !== condIndex ? c : { ...c, [key]: val }, 1070 + ), 1071 + }, 1072 + ), 1073 + ); 1074 + }, 1075 + [], 1076 + ); 1077 + 1078 + const setFetchConditionPreset = useCallback( 1079 + (fetchIndex: number, condIndex: number, preset: FetchConditionPreset) => { 1080 + setFetches((prev) => 1081 + prev.map((f, i) => 1082 + i !== fetchIndex 1083 + ? f 1084 + : { 1085 + ...f, 1086 + conditions: f.conditions.map((c, j) => { 1087 + if (j !== condIndex) return c; 1088 + if (preset === "found") 1089 + return { ...c, field: "found", operator: "exists", value: "" }; 1090 + if (preset === "not-found") 1091 + return { ...c, field: "found", operator: "not-exists", value: "" }; 1092 + return { ...c, field: "", operator: "eq", value: "" }; 1093 + }), 1094 + }, 1095 + ), 1096 + ); 1097 + }, 1098 + [], 1099 + ); 1100 + 888 1101 const addAction = useCallback((type: AddableActionId) => { 889 1102 if (type === "webhook") { 890 1103 setActions((prev) => [ ··· 938 1151 const previewPayload = useMemo(() => { 939 1152 const payload: Record<string, unknown> = { name, lexicon, operations }; 940 1153 if (description.trim()) payload.description = description.trim(); 941 - const filteredFetches = fetches.filter((f) => f.name && f.uri); 1154 + const filteredFetches = fetches.filter((f) => fetchIsComplete(f)); 942 1155 if (filteredFetches.length > 0 || isEdit) { 943 - payload.fetches = filteredFetches.map((f) => ({ 944 - name: f.name, 945 - uri: f.uri, 946 - ...(f.comment ? { comment: f.comment } : {}), 947 - })); 1156 + payload.fetches = filteredFetches.map((f) => fetchToPayload(f)); 948 1157 } 949 - const filteredConditions = conditions.filter((c) => c.field && c.value); 1158 + const filteredConditions = conditions.filter((c) => conditionIsComplete(c)); 950 1159 if (filteredConditions.length > 0 || isEdit) { 951 - payload.conditions = filteredConditions.map((c) => ({ 952 - field: c.field, 953 - operator: c.operator, 954 - value: c.value, 955 - ...(c.comment ? { comment: c.comment } : {}), 956 - })); 1160 + payload.conditions = filteredConditions.map((c) => conditionToPayload(c)); 957 1161 } 958 1162 const trimmedWantedDids = wantedDids.map((d) => d.trim()).filter(Boolean); 959 1163 if (trimmedWantedDids.length > 0 || isEdit) { ··· 1450 1654 <option value="startsWith">starts with</option> 1451 1655 <option value="endsWith">ends with</option> 1452 1656 <option value="contains">contains</option> 1657 + <option value="exists">exists</option> 1658 + <option value="not-exists">does not exist</option> 1453 1659 </select> 1454 1660 </div> 1455 1661 )} 1456 - <div class={s.conditionValue}> 1457 - {conditionFields.find((f) => f.path === cond.field)?.type === "boolean" ? ( 1458 - <select 1459 - class={s.select} 1460 - value={cond.value} 1461 - onChange={(e: Event) => 1462 - updateCondition(i, "value", (e.target as HTMLSelectElement).value) 1463 - } 1464 - > 1465 - <option value="">Select...</option> 1466 - <option value="true">true</option> 1467 - <option value="false">false</option> 1468 - </select> 1469 - ) : ( 1470 - <input 1471 - class={s.input} 1472 - type="text" 1473 - placeholder="Value" 1474 - value={cond.value} 1475 - onInput={(e: Event) => 1476 - updateCondition(i, "value", (e.target as HTMLInputElement).value) 1477 - } 1478 - /> 1479 - )} 1480 - </div> 1662 + {!VALUE_LESS_CONDITION_OPS.has(cond.operator) && ( 1663 + <div class={s.conditionValue}> 1664 + {conditionFields.find((f) => f.path === cond.field)?.type === "boolean" ? ( 1665 + <select 1666 + class={s.select} 1667 + value={cond.value} 1668 + onChange={(e: Event) => 1669 + updateCondition(i, "value", (e.target as HTMLSelectElement).value) 1670 + } 1671 + > 1672 + <option value="">Select...</option> 1673 + <option value="true">true</option> 1674 + <option value="false">false</option> 1675 + </select> 1676 + ) : ( 1677 + <input 1678 + class={s.input} 1679 + type="text" 1680 + placeholder="Value" 1681 + value={cond.value} 1682 + onInput={(e: Event) => 1683 + updateCondition(i, "value", (e.target as HTMLInputElement).value) 1684 + } 1685 + /> 1686 + )} 1687 + </div> 1688 + )} 1481 1689 <button type="button" class={s.removeBtn} onClick={() => removeCondition(i)}> 1482 1690 Remove 1483 1691 </button> ··· 1512 1720 </p> 1513 1721 </div> 1514 1722 {fetches.map((f, i) => ( 1515 - <div key={i} class={s.conditionBlock}> 1516 - <div class={s.fetchRow}> 1517 - <div class={s.fetchName}> 1518 - <input 1519 - class={s.input} 1520 - type="text" 1521 - placeholder="Variable name" 1522 - value={f.name} 1523 - onInput={(e: Event) => 1524 - updateFetch(i, "name", (e.target as HTMLInputElement).value) 1525 - } 1526 - /> 1723 + <div key={i} class={s.fetchCard}> 1724 + {/* Top row: Variable name + Source type + Remove, all aligned on inputs */} 1725 + <div class={s.fieldGroup}> 1726 + <div class={s.fetchTopRow}> 1727 + <div class={s.fieldGroup} style={{ flex: "1 1 160px", minWidth: 0 }}> 1728 + <label class={s.label}>Variable name</label> 1729 + <input 1730 + class={s.input} 1731 + type="text" 1732 + placeholder="e.g. alreadyFollow" 1733 + value={f.name} 1734 + onInput={(e: Event) => 1735 + updateFetch(i, "name", (e.target as HTMLInputElement).value) 1736 + } 1737 + /> 1738 + </div> 1739 + <div class={s.fieldGroup} style={{ flex: "1 1 220px", minWidth: 0 }}> 1740 + <label class={s.label}>Source type</label> 1741 + <select 1742 + class={s.select} 1743 + value={f.kind} 1744 + onChange={(e: Event) => 1745 + updateFetch(i, "kind", (e.target as HTMLSelectElement).value) 1746 + } 1747 + > 1748 + <option value="record">Fetch a single record (by AT URI)</option> 1749 + <option value="search">Search a collection (by field)</option> 1750 + </select> 1751 + </div> 1752 + <div class={s.fieldGroup} style={{ flex: "0 0 auto" }}> 1753 + <span class={`${s.label} ${s.labelSpacer}`} aria-hidden="true"> 1754 + · 1755 + </span> 1756 + <button type="button" class={s.removeBtn} onClick={() => removeFetch(i)}> 1757 + Remove 1758 + </button> 1759 + </div> 1527 1760 </div> 1528 - <div class={s.fetchUri}> 1529 - <input 1530 - class={s.input} 1531 - type="text" 1532 - placeholder="AT URI template, e.g. {{event.commit.record.subject}}" 1533 - value={f.uri} 1534 - onInput={(e: Event) => 1535 - updateFetch(i, "uri", (e.target as HTMLInputElement).value) 1536 - } 1537 - /> 1761 + <span class={s.hint}> 1762 + Used in templates as{" "} 1763 + <code class={s.inlineCode}>{`{{${f.name || "name"}.record.…}}`}</code>. 1764 + </span> 1765 + </div> 1766 + 1767 + {/* Source-specific config */} 1768 + {f.kind === "record" ? ( 1769 + <div class={s.fetchSubsection}> 1770 + <div class={s.fetchSubsectionTitle}>Record to fetch</div> 1771 + <div class={s.fieldGroup}> 1772 + <label class={s.label}>AT URI</label> 1773 + <input 1774 + class={s.input} 1775 + type="text" 1776 + placeholder="at://{{event.commit.record.subject}}" 1777 + value={f.uri} 1778 + onInput={(e: Event) => 1779 + updateFetch(i, "uri", (e.target as HTMLInputElement).value) 1780 + } 1781 + /> 1782 + <span class={s.hint}> 1783 + Supports <code class={s.inlineCode}>{"{{event.*}}"}</code>,{" "} 1784 + <code class={s.inlineCode}>{"{{self}}"}</code>, and{" "} 1785 + <code class={s.inlineCode}>{"{{now}}"}</code>. 1786 + </span> 1787 + </div> 1538 1788 </div> 1539 - <button type="button" class={s.removeBtn} onClick={() => removeFetch(i)}> 1540 - Remove 1789 + ) : ( 1790 + <> 1791 + <div class={s.fetchSubsection}> 1792 + <div class={s.fetchSubsectionTitle}>Where to search</div> 1793 + <div class={s.twoColRow}> 1794 + <div class={s.fieldGroup}> 1795 + <label class={s.label}>Repo</label> 1796 + <input 1797 + class={s.input} 1798 + type="text" 1799 + placeholder="did:plc:... or {{self}}" 1800 + value={f.repo} 1801 + onInput={(e: Event) => 1802 + updateFetch(i, "repo", (e.target as HTMLInputElement).value) 1803 + } 1804 + /> 1805 + <span class={s.hint}> 1806 + DID of the repo. Use <code class={s.inlineCode}>{"{{self}}"}</code>{" "} 1807 + for yours. 1808 + </span> 1809 + </div> 1810 + <div class={s.fieldGroup}> 1811 + <label class={s.label}>Collection</label> 1812 + <input 1813 + class={s.input} 1814 + type="text" 1815 + placeholder="app.bsky.graph.follow" 1816 + value={f.collection} 1817 + onInput={(e: Event) => 1818 + updateFetch(i, "collection", (e.target as HTMLInputElement).value) 1819 + } 1820 + /> 1821 + <span class={s.hint}>NSID of the collection.</span> 1822 + </div> 1823 + </div> 1824 + </div> 1825 + 1826 + <div class={s.fetchSubsection}> 1827 + <div class={s.fetchSubsectionTitle}>Match records where</div> 1828 + <div class={s.fetchMatchRow}> 1829 + <div class={s.fieldGroup} style={{ flex: "1 1 140px" }}> 1830 + <label class={s.label}>Field</label> 1831 + <input 1832 + class={s.input} 1833 + type="text" 1834 + placeholder="subject" 1835 + value={f.whereField} 1836 + onInput={(e: Event) => 1837 + updateFetch(i, "whereField", (e.target as HTMLInputElement).value) 1838 + } 1839 + /> 1840 + </div> 1841 + <div class={s.fieldGroup} style={{ flex: "0 0 auto" }}> 1842 + <span class={`${s.label} ${s.labelSpacer}`} aria-hidden="true"> 1843 + · 1844 + </span> 1845 + <span class={s.fetchMatchOperator}>equals</span> 1846 + </div> 1847 + <div class={s.fieldGroup} style={{ flex: "2 1 200px" }}> 1848 + <label class={s.label}>Value</label> 1849 + <input 1850 + class={s.input} 1851 + type="text" 1852 + placeholder="{{event.commit.record.subject}}" 1853 + value={f.whereValue} 1854 + onInput={(e: Event) => 1855 + updateFetch(i, "whereValue", (e.target as HTMLInputElement).value) 1856 + } 1857 + /> 1858 + </div> 1859 + </div> 1860 + <span class={s.hint}> 1861 + First matching record wins. No match resolves to "not found". 1862 + </span> 1863 + </div> 1864 + </> 1865 + )} 1866 + 1867 + {/* Per-source conditions */} 1868 + <div class={s.fetchSubsection}> 1869 + <div class={s.fetchSubsectionTitle}>Continue only if…</div> 1870 + <span class={s.hint}> 1871 + Runs after this fetch. If any check fails, the automation skips. 1872 + </span> 1873 + {f.conditions.map((cond, ci) => { 1874 + const preset = fetchConditionPreset(cond); 1875 + return ( 1876 + <div key={ci} class={s.fetchConditionCard}> 1877 + <div class={s.fetchConditionPresetRow}> 1878 + <select 1879 + class={s.select} 1880 + value={preset} 1881 + onChange={(e: Event) => 1882 + setFetchConditionPreset( 1883 + i, 1884 + ci, 1885 + (e.target as HTMLSelectElement).value as FetchConditionPreset, 1886 + ) 1887 + } 1888 + style={{ flex: "1 1 auto", minWidth: 0 }} 1889 + > 1890 + <option value="found"> 1891 + {f.kind === "search" 1892 + ? "A matching record is found" 1893 + : "The record is found"} 1894 + </option> 1895 + <option value="not-found"> 1896 + {f.kind === "search" 1897 + ? "No matching record is found" 1898 + : "The record is not found"} 1899 + </option> 1900 + <option value="custom">Custom field check…</option> 1901 + </select> 1902 + <button 1903 + type="button" 1904 + class={s.removeBtn} 1905 + onClick={() => removeFetchCondition(i, ci)} 1906 + > 1907 + Remove 1908 + </button> 1909 + </div> 1910 + {preset === "custom" && ( 1911 + <div class={s.conditionRow}> 1912 + <div class={s.conditionField}> 1913 + <input 1914 + class={s.input} 1915 + type="text" 1916 + placeholder="record.subject" 1917 + value={cond.field} 1918 + onInput={(e: Event) => 1919 + updateFetchCondition( 1920 + i, 1921 + ci, 1922 + "field", 1923 + (e.target as HTMLInputElement).value, 1924 + ) 1925 + } 1926 + /> 1927 + </div> 1928 + <div class={s.conditionOperator}> 1929 + <select 1930 + class={s.select} 1931 + value={cond.operator} 1932 + onChange={(e: Event) => 1933 + updateFetchCondition( 1934 + i, 1935 + ci, 1936 + "operator", 1937 + (e.target as HTMLSelectElement).value, 1938 + ) 1939 + } 1940 + > 1941 + <option value="eq">equals</option> 1942 + <option value="startsWith">starts with</option> 1943 + <option value="endsWith">ends with</option> 1944 + <option value="contains">contains</option> 1945 + <option value="exists">is present</option> 1946 + <option value="not-exists">is missing</option> 1947 + </select> 1948 + </div> 1949 + {!VALUE_LESS_CONDITION_OPS.has(cond.operator) && ( 1950 + <div class={s.conditionValue}> 1951 + <input 1952 + class={s.input} 1953 + type="text" 1954 + placeholder="Value to compare" 1955 + value={cond.value} 1956 + onInput={(e: Event) => 1957 + updateFetchCondition( 1958 + i, 1959 + ci, 1960 + "value", 1961 + (e.target as HTMLInputElement).value, 1962 + ) 1963 + } 1964 + /> 1965 + </div> 1966 + )} 1967 + </div> 1968 + )} 1969 + </div> 1970 + ); 1971 + })} 1972 + <button type="button" class={s.addBtn} onClick={() => addFetchCondition(i)}> 1973 + + Add check 1541 1974 </button> 1542 1975 </div> 1543 - <input 1544 - class={s.input} 1545 - type="text" 1546 - placeholder="Note (optional)" 1547 - value={f.comment} 1548 - onInput={(e: Event) => 1549 - updateFetch(i, "comment", (e.target as HTMLInputElement).value) 1550 - } 1551 - /> 1976 + 1977 + {/* Note */} 1978 + <div class={s.fieldGroup}> 1979 + <label class={s.label}> 1980 + Note <span class={s.hint}>(optional)</span> 1981 + </label> 1982 + <input 1983 + class={s.input} 1984 + type="text" 1985 + placeholder="A reminder for future-you about why this source exists" 1986 + value={f.comment} 1987 + onInput={(e: Event) => 1988 + updateFetch(i, "comment", (e.target as HTMLInputElement).value) 1989 + } 1990 + /> 1991 + </div> 1552 1992 </div> 1553 1993 ))} 1554 1994 <button type="button" class={s.addBtn} onClick={addFetch}>
+46 -66
app/routes/api/automations/[rkey].ts
··· 11 11 type BskyPostAction, 12 12 type PatchRecordAction, 13 13 type BookmarkAction, 14 - type FetchStep, 15 14 } from "@/db/schema.js"; 16 15 import { config } from "@/config.js"; 17 16 import { isValidNsid } from "@/lexicons/resolver.js"; 18 - import { 19 - getRecord, 20 - putRecord, 21 - deleteRecord, 22 - type PdsAction, 23 - type PdsFetchStep, 24 - } from "@/automations/pds.js"; 17 + import { getRecord, putRecord, deleteRecord, type PdsAction } from "@/automations/pds.js"; 25 18 import { verifyCallback } from "@/automations/verify.js"; 26 19 import { assertPublicUrl, UrlGuardError } from "@/url-guard.js"; 27 20 import { 28 21 validateTemplate, 29 22 validateTextTemplate, 30 23 validateBaseRecordUri, 31 - validateFetchStep, 32 24 } from "@/actions/template.js"; 33 25 import { 34 26 type ActionInput, 35 - VALID_OPERATORS, 36 27 VALID_OPERATIONS, 37 28 VALID_BSKY_LABELS, 38 29 BCP47_RE, ··· 40 31 validateBookmarkInput, 41 32 resolveWantedDids, 42 33 } from "@/actions/validation.js"; 34 + import { 35 + normalizeFetches, 36 + normalizeConditions, 37 + type FetchInput, 38 + type ConditionInput, 39 + } from "@/actions/api-normalize.js"; 43 40 import { AUTOMATION_LIMITS } from "@/automations/limits.js"; 44 41 import { notifyAutomationChange } from "@/jetstream/consumer.js"; 45 42 import { scopeCoversActions, computeRequiredScope } from "@/auth/client.js"; 46 43 import { SCOPE_INSUFFICIENT } from "@/auth/scope-errors.js"; 47 44 45 + function toPdsFetch( 46 + f: import("@/db/schema.js").FetchStep, 47 + ): import("@/automations/pds.js").PdsFetchStep { 48 + if (f.kind === "search") { 49 + return { 50 + $type: "run.airglow.automation#fetchSearchStep", 51 + name: f.name, 52 + repo: f.repo, 53 + collection: f.collection, 54 + where: f.where, 55 + ...(f.limit !== undefined ? { limit: f.limit } : {}), 56 + ...(f.conditions ? { conditions: f.conditions } : {}), 57 + ...(f.comment ? { comment: f.comment } : {}), 58 + }; 59 + } 60 + return { 61 + $type: "run.airglow.automation#fetchStep", 62 + name: f.name, 63 + uri: f.uri, 64 + ...(f.conditions ? { conditions: f.conditions } : {}), 65 + ...(f.comment ? { comment: f.comment } : {}), 66 + }; 67 + } 68 + 48 69 function findAutomation(did: string, rkey: string) { 49 70 return db.query.automations.findFirst({ 50 71 where: and(eq(automations.did, did), eq(automations.rkey, rkey)), ··· 117 138 description?: string | null; 118 139 operations?: string[]; 119 140 actions?: ActionInput[]; 120 - fetches?: Array<{ name: string; uri: string; comment?: string }>; 121 - conditions?: Array<{ field: string; operator?: string; value: string; comment?: string }>; 141 + fetches?: FetchInput[]; 142 + conditions?: ConditionInput[]; 122 143 wantedDids?: string[]; 123 144 active?: boolean; 124 145 dryRun?: boolean; ··· 157 178 } 158 179 } 159 180 160 - const conditions = body.conditions 161 - ? body.conditions 162 - .filter((cond) => cond.field && cond.value && cond.field !== "event.commit.operation") 163 - .map((cond) => ({ 164 - field: cond.field, 165 - operator: cond.operator ?? "eq", 166 - value: cond.value, 167 - ...(cond.comment ? { comment: cond.comment } : {}), 168 - })) 169 - : auto.conditions; 170 - if (conditions.length > AUTOMATION_LIMITS.conditions) { 171 - return c.json({ error: `Maximum ${AUTOMATION_LIMITS.conditions} conditions allowed` }, 400); 172 - } 173 - for (const cond of conditions) { 174 - if (!VALID_OPERATORS.has(cond.operator)) { 175 - return c.json({ error: `Invalid condition operator: ${cond.operator}` }, 400); 176 - } 177 - } 178 - 179 181 const wd = resolveWantedDids( 180 182 body.wantedDids, 181 183 auto.lexicon, ··· 190 192 191 193 // Resolve fetch steps — full replacement when provided 192 194 let localFetches = auto.fetches; 193 - let pdsFetches: PdsFetchStep[] | null = null; 195 + let pdsFetches: import("@/automations/pds.js").PdsFetchStep[] | null = null; 194 196 195 197 if (body.fetches) { 196 - if (body.fetches.length > AUTOMATION_LIMITS.fetches) { 197 - return c.json({ error: `Maximum ${AUTOMATION_LIMITS.fetches} fetch steps allowed` }, 400); 198 - } 199 - const newLocalFetches: FetchStep[] = []; 200 - const newPdsFetches: PdsFetchStep[] = []; 201 - const seenNames = new Set<string>(); 202 - for (const f of body.fetches) { 203 - const stepValidation = validateFetchStep(f.name, f.uri, seenNames); 204 - if (!stepValidation.valid) { 205 - return c.json({ error: stepValidation.error }, 400); 206 - } 207 - seenNames.add(f.name); 208 - newLocalFetches.push({ 209 - name: f.name, 210 - uri: f.uri, 211 - ...(f.comment ? { comment: f.comment } : {}), 212 - }); 213 - newPdsFetches.push({ 214 - $type: "run.airglow.automation#fetchStep", 215 - name: f.name, 216 - uri: f.uri, 217 - ...(f.comment ? { comment: f.comment } : {}), 218 - }); 219 - } 220 - localFetches = newLocalFetches; 221 - pdsFetches = newPdsFetches; 198 + const fetchResult = normalizeFetches(body.fetches); 199 + if (!fetchResult.ok) return c.json({ error: fetchResult.error }, 400); 200 + localFetches = fetchResult.local; 201 + pdsFetches = fetchResult.pds; 222 202 } 223 203 const fetchNames = localFetches.map((f) => f.name); 204 + 205 + let conditions = auto.conditions; 206 + if (body.conditions) { 207 + const result = normalizeConditions(body.conditions); 208 + if (!result.ok) return c.json({ error: result.error }, 400); 209 + conditions = result.value; 210 + } 224 211 225 212 // Resolve actions — full replacement when provided 226 213 let localActions = auto.actions; ··· 524 511 lexicon: auto.lexicon, 525 512 operations, 526 513 actions: pdsActions, 527 - fetches: 528 - pdsFetches ?? 529 - localFetches.map((f) => ({ 530 - $type: "run.airglow.automation#fetchStep" as const, 531 - name: f.name, 532 - uri: f.uri, 533 - ...(f.comment ? { comment: f.comment } : {}), 534 - })), 514 + fetches: pdsFetches ?? localFetches.map(toPdsFetch), 535 515 conditions, 536 516 ...(wantedDids.length > 0 ? { wantedDids } : {}), 537 517 active,
+60
app/routes/api/automations/index.test.ts
··· 406 406 ); 407 407 expect(res.status).toBe(400); 408 408 }); 409 + 410 + it("accepts a search fetch with a per-fetch 'found not-exists' condition", async () => { 411 + const res = await app.request( 412 + jsonReq("/api/automations", { 413 + name: "Mirror follows", 414 + lexicon: "app.bsky.graph.follow", 415 + operations: ["create"], 416 + wantedDids: ["{{self}}"], 417 + fetches: [ 418 + { 419 + kind: "search", 420 + name: "existingMirror", 421 + repo: "{{self}}", 422 + collection: "id.sifa.graph.follow", 423 + where: [{ field: "subject", operator: "eq", value: "{{event.commit.record.subject}}" }], 424 + conditions: [{ field: "found", operator: "not-exists" }], 425 + }, 426 + ], 427 + actions: [ 428 + { 429 + type: "record", 430 + targetCollection: "id.sifa.graph.follow", 431 + recordTemplate: '{"subject":"{{event.commit.record.subject}}","createdAt":"{{now}}"}', 432 + }, 433 + ], 434 + }), 435 + ); 436 + expect(res.status).toBe(201); 437 + 438 + const stored = await db.query.automations.findFirst(); 439 + expect(stored).toBeDefined(); 440 + expect(stored!.fetches[0]).toMatchObject({ 441 + kind: "search", 442 + name: "existingMirror", 443 + conditions: [{ field: "found", operator: "not-exists", value: "" }], 444 + }); 445 + }); 446 + 447 + it("returns 400 when a fetch condition references an event path", async () => { 448 + const res = await app.request( 449 + jsonReq("/api/automations", { 450 + name: "Invalid", 451 + lexicon: "app.bsky.graph.follow", 452 + operations: ["create"], 453 + wantedDids: ["{{self}}"], 454 + fetches: [ 455 + { 456 + kind: "search", 457 + name: "existingMirror", 458 + repo: "{{self}}", 459 + collection: "id.sifa.graph.follow", 460 + where: [{ field: "subject", operator: "eq", value: "{{event.commit.record.subject}}" }], 461 + conditions: [{ field: "event.did", operator: "eq", value: "x" }], 462 + }, 463 + ], 464 + actions: [{ type: "webhook", callbackUrl: "https://example.com/hook" }], 465 + }), 466 + ); 467 + expect(res.status).toBe(400); 468 + }); 409 469 }); 410 470 411 471 describe("GET /api/automations", () => {
+17 -54
app/routes/api/automations/index.ts
··· 10 10 type BskyPostAction, 11 11 type PatchRecordAction, 12 12 type BookmarkAction, 13 - type FetchStep, 14 13 } from "@/db/schema.js"; 15 14 import { config } from "@/config.js"; 16 15 import { isValidNsid, isNsidAllowed } from "@/lexicons/resolver.js"; 17 16 import { verifyCallback } from "@/automations/verify.js"; 18 17 import { assertPublicUrl, UrlGuardError } from "@/url-guard.js"; 19 - import { 20 - createRecord, 21 - deleteRecord, 22 - type PdsAction, 23 - type PdsFetchStep, 24 - } from "@/automations/pds.js"; 18 + import { createRecord, deleteRecord, type PdsAction } from "@/automations/pds.js"; 25 19 import { 26 20 validateTemplate, 27 21 validateTextTemplate, 28 22 validateBaseRecordUri, 29 - validateFetchStep, 30 23 } from "@/actions/template.js"; 31 24 import { 32 25 type ActionInput, 33 - VALID_OPERATORS, 34 26 VALID_OPERATIONS, 35 27 VALID_BSKY_LABELS, 36 28 BCP47_RE, ··· 38 30 validateBookmarkInput, 39 31 resolveWantedDids, 40 32 } from "@/actions/validation.js"; 33 + import { 34 + normalizeFetches, 35 + normalizeConditions, 36 + type FetchInput, 37 + type ConditionInput, 38 + } from "@/actions/api-normalize.js"; 41 39 import { AUTOMATION_LIMITS } from "@/automations/limits.js"; 42 40 import { computeRequiredScope, scopeCoversActions } from "@/auth/client.js"; 43 41 import { SCOPE_INSUFFICIENT } from "@/auth/scope-errors.js"; ··· 84 82 lexicon: string; 85 83 operations: string[]; 86 84 actions: ActionInput[]; 87 - fetches?: Array<{ name: string; uri: string; comment?: string }>; 88 - conditions?: Array<{ field: string; operator?: string; value: string; comment?: string }>; 85 + fetches?: FetchInput[]; 86 + conditions?: ConditionInput[]; 89 87 wantedDids?: string[]; 90 88 active?: boolean; 91 89 dryRun?: boolean; ··· 138 136 return c.json({ error: `Maximum ${AUTOMATION_LIMITS.actions} actions allowed` }, 400); 139 137 } 140 138 141 - // Normalize and validate conditions 142 - // {{self}} is kept as-is — resolved at match time to the automation owner's DID 143 - const conditions = (body.conditions ?? []) 144 - .filter((cond) => cond.field && cond.value && cond.field !== "event.commit.operation") 145 - .map((cond) => ({ 146 - field: cond.field, 147 - operator: cond.operator ?? "eq", 148 - value: cond.value, 149 - ...(cond.comment ? { comment: cond.comment } : {}), 150 - })); 151 - if (conditions.length > AUTOMATION_LIMITS.conditions) { 152 - return c.json({ error: `Maximum ${AUTOMATION_LIMITS.conditions} conditions allowed` }, 400); 153 - } 154 - for (const cond of conditions) { 155 - if (!VALID_OPERATORS.has(cond.operator)) { 156 - return c.json({ error: `Invalid condition operator: ${cond.operator}` }, 400); 157 - } 158 - } 159 - 160 139 const wd = resolveWantedDids(body.wantedDids, body.lexicon, config.nsidRequireDids); 161 140 if (!wd.valid) return c.json({ error: wd.error }, 400); 162 141 const wantedDids = wd.value; 163 142 164 - // Validate and normalize fetch steps 165 - const localFetches: FetchStep[] = []; 166 - const pdsFetches: PdsFetchStep[] = []; 143 + const fetchResult = normalizeFetches(body.fetches); 144 + if (!fetchResult.ok) return c.json({ error: fetchResult.error }, 400); 145 + const localFetches = fetchResult.local; 146 + const pdsFetches = fetchResult.pds; 147 + const fetchNames = fetchResult.names; 167 148 168 - if (body.fetches && body.fetches.length > 0) { 169 - if (body.fetches.length > AUTOMATION_LIMITS.fetches) { 170 - return c.json({ error: `Maximum ${AUTOMATION_LIMITS.fetches} fetch steps allowed` }, 400); 171 - } 172 - const seenNames = new Set<string>(); 173 - for (const f of body.fetches) { 174 - const stepValidation = validateFetchStep(f.name, f.uri, seenNames); 175 - if (!stepValidation.valid) { 176 - return c.json({ error: stepValidation.error }, 400); 177 - } 178 - seenNames.add(f.name); 179 - localFetches.push({ name: f.name, uri: f.uri, ...(f.comment ? { comment: f.comment } : {}) }); 180 - pdsFetches.push({ 181 - $type: "run.airglow.automation#fetchStep", 182 - name: f.name, 183 - uri: f.uri, 184 - ...(f.comment ? { comment: f.comment } : {}), 185 - }); 186 - } 187 - } 188 - const fetchNames = localFetches.map((f) => f.name); 149 + const conditionsResult = normalizeConditions(body.conditions); 150 + if (!conditionsResult.ok) return c.json({ error: conditionsResult.error }, 400); 151 + const conditions = conditionsResult.value; 189 152 190 153 // Validate each action and build local + PDS action arrays 191 154 const localActions: Action[] = [];
+9 -15
app/routes/dashboard/automations/[rkey].tsx
··· 15 15 import { Alert } from "../../../components/Alert/index.js"; 16 16 import { DescriptionList } from "../../../components/DescriptionList/index.js"; 17 17 import { CodeBlock, InlineCode } from "../../../components/CodeBlock/index.js"; 18 + import { FetchCard } from "../../../components/FetchCard/index.js"; 18 19 import { NsidCode } from "../../../components/NsidCode/index.js"; 19 20 import { Stack } from "../../../components/Layout/Stack/index.js"; 20 21 import ThemeToggle from "../../../islands/ThemeToggle.js"; ··· 177 178 )} 178 179 179 180 {auto.fetches.length > 0 && ( 180 - <Card variant="flat"> 181 - <Stack gap={3}> 182 - <h3 class={inlineCluster}> 183 - <Database size={18} /> Data Sources 184 - </h3> 185 - <ul class={plainList}> 186 - {auto.fetches.map((f, i) => ( 187 - <li key={i}> 188 - <InlineCode>{f.name}</InlineCode> &larr; <InlineCode>{f.uri}</InlineCode> 189 - {f.comment && <span class={textMuted}> - {f.comment}</span>} 190 - </li> 191 - ))} 192 - </ul> 193 - </Stack> 194 - </Card> 181 + <Stack gap={3}> 182 + <h3 class={inlineCluster}> 183 + <Database size={18} /> Data Sources ({auto.fetches.length}) 184 + </h3> 185 + {auto.fetches.map((f, i) => ( 186 + <FetchCard key={i} fetch={f} /> 187 + ))} 188 + </Stack> 195 189 )} 196 190 197 191 <Stack gap={3}>
+9 -15
app/routes/u/[handle]/[rkey].tsx
··· 16 16 import { Button } from "../../../components/Button/index.js"; 17 17 import { DescriptionList } from "../../../components/DescriptionList/index.js"; 18 18 import { CodeBlock, InlineCode } from "../../../components/CodeBlock/index.js"; 19 + import { FetchCard } from "../../../components/FetchCard/index.js"; 19 20 import { NsidCode } from "../../../components/NsidCode/index.js"; 20 21 import { Stack } from "../../../components/Layout/Stack/index.js"; 21 22 import ThemeToggle from "../../../islands/ThemeToggle.js"; ··· 174 175 )} 175 176 176 177 {auto.fetches.length > 0 && ( 177 - <Card variant="flat"> 178 - <Stack gap={3}> 179 - <h3 class={inlineCluster}> 180 - <Database size={18} /> Data Sources 181 - </h3> 182 - <ul class={plainList}> 183 - {auto.fetches.map((f, i) => ( 184 - <li key={i}> 185 - <InlineCode>{f.name}</InlineCode> &larr; <InlineCode>{f.uri}</InlineCode> 186 - {f.comment && <span class={textMuted}> - {f.comment}</span>} 187 - </li> 188 - ))} 189 - </ul> 190 - </Stack> 191 - </Card> 178 + <Stack gap={3}> 179 + <h3 class={inlineCluster}> 180 + <Database size={18} /> Data Sources ({auto.fetches.length}) 181 + </h3> 182 + {auto.fetches.map((f, i) => ( 183 + <FetchCard key={i} fetch={f} /> 184 + ))} 185 + </Stack> 192 186 )} 193 187 194 188 <Stack gap={3}>
+94 -8
lexicons/run/airglow/automation.json
··· 74 74 "description": "Records to fetch from PDS before executing actions. Fetched data is available as named variables in action templates.", 75 75 "maxLength": 5, 76 76 "items": { 77 - "type": "ref", 78 - "ref": "#fetchStep" 77 + "type": "union", 78 + "refs": ["#fetchStep", "#fetchSearchStep"] 79 79 } 80 80 }, 81 81 "active": { ··· 150 150 "description": "AT URI template, e.g. '{{event.commit.record.subject}}'.", 151 151 "maxLength": 2048 152 152 }, 153 + "conditions": { 154 + "type": "array", 155 + "description": "Conditions evaluated against this fetch's result after it resolves. All must pass or the automation is skipped. Field paths are resolved relative to the fetch's entry (e.g. 'found', 'record.subject').", 156 + "maxLength": 10, 157 + "items": { 158 + "type": "ref", 159 + "ref": "#condition" 160 + } 161 + }, 153 162 "comment": { 154 163 "type": "string", 155 164 "description": "Optional user note about this fetch step.", ··· 157 166 } 158 167 } 159 168 }, 169 + "fetchSearchStep": { 170 + "type": "object", 171 + "description": "Search for a record on a PDS by matching fields. Useful for checking if a record already exists (e.g. duplicate-prevention before creating).", 172 + "required": ["name", "repo", "collection", "where"], 173 + "properties": { 174 + "name": { 175 + "type": "string", 176 + "description": "Variable name for use in templates, e.g. 'existingMirror'. Check `.found` or use the `exists`/`not-exists` condition operator to test whether the search matched.", 177 + "maxLength": 64 178 + }, 179 + "repo": { 180 + "type": "string", 181 + "description": "DID template for the repo to search. Supports {{self}}, {{event.*}}, and upstream fetch references.", 182 + "maxLength": 256 183 + }, 184 + "collection": { 185 + "type": "string", 186 + "description": "NSID of the collection to search.", 187 + "maxLength": 256 188 + }, 189 + "where": { 190 + "type": "array", 191 + "description": "Field filters applied to each record. This version accepts a single clause; multi-clause support is planned.", 192 + "minLength": 1, 193 + "maxLength": 1, 194 + "items": { 195 + "type": "ref", 196 + "ref": "#fetchSearchWhere" 197 + } 198 + }, 199 + "limit": { 200 + "type": "integer", 201 + "description": "Maximum number of matches to return. This version accepts 1 only; multi-match support is planned.", 202 + "minimum": 1, 203 + "maximum": 1, 204 + "default": 1 205 + }, 206 + "conditions": { 207 + "type": "array", 208 + "description": "Conditions evaluated against this fetch's result after it resolves. All must pass or the automation is skipped. Field paths are resolved relative to the fetch's entry (e.g. 'found', 'record.subject').", 209 + "maxLength": 10, 210 + "items": { 211 + "type": "ref", 212 + "ref": "#condition" 213 + } 214 + }, 215 + "comment": { 216 + "type": "string", 217 + "description": "Optional user note about this fetch step.", 218 + "maxLength": 512 219 + } 220 + } 221 + }, 222 + "fetchSearchWhere": { 223 + "type": "object", 224 + "description": "A single field filter for a search fetch step.", 225 + "required": ["field", "value"], 226 + "properties": { 227 + "field": { 228 + "type": "string", 229 + "description": "Dot-path into each record, e.g. 'subject'.", 230 + "maxLength": 256 231 + }, 232 + "operator": { 233 + "type": "string", 234 + "description": "Comparison operator. Only 'eq' is currently supported.", 235 + "knownValues": ["eq"], 236 + "default": "eq", 237 + "maxLength": 32 238 + }, 239 + "value": { 240 + "type": "string", 241 + "description": "Value template to compare against. Supports {{self}}, {{event.*}}, and upstream fetch references.", 242 + "maxLength": 1024 243 + } 244 + } 245 + }, 160 246 "bskyPostAction": { 161 247 "type": "object", 162 248 "description": "Create a Bluesky post when a matching event occurs.", ··· 257 343 }, 258 344 "condition": { 259 345 "type": "object", 260 - "description": "A single filter condition on an event field.", 261 - "required": ["field", "value"], 346 + "description": "A single filter condition. At the automation's top level, conditions run against the incoming event before any fetches. Attached to a fetch step, conditions run after that fetch resolves and are evaluated against the fetch's own result entry.", 347 + "required": ["field"], 262 348 "properties": { 263 349 "field": { 264 350 "type": "string", 265 - "description": "Dot-path to the event field, e.g. 'repo' or 'record.subject'.", 351 + "description": "Dot-path to the field being checked. For top-level conditions: 'event.did' or a record field like 'subject'. For fetch-attached conditions: a path into the fetch's result, e.g. 'found', 'uri', or 'record.subject'.", 266 352 "maxLength": 256 267 353 }, 268 354 "operator": { 269 355 "type": "string", 270 - "description": "Comparison operator.", 271 - "knownValues": ["eq", "startsWith", "endsWith", "contains"], 356 + "description": "Comparison operator. 'exists' and 'not-exists' ignore `value`. When used on the bare 'found' field of a fetch-attached condition, they test the fetch's found flag directly.", 357 + "knownValues": ["eq", "startsWith", "endsWith", "contains", "exists", "not-exists"], 272 358 "default": "eq", 273 359 "maxLength": 32 274 360 }, 275 361 "value": { 276 362 "type": "string", 277 - "description": "Value to compare against.", 363 + "description": "Value to compare against. Ignored by the 'exists' and 'not-exists' operators.", 278 364 "maxLength": 1024 279 365 }, 280 366 "comment": {
+169
lib/actions/api-normalize.test.ts
··· 1 + import { describe, it, expect } from "vitest"; 2 + import { normalizeFetches, normalizeConditions, type FetchInput } from "./api-normalize.js"; 3 + 4 + describe("normalizeFetches", () => { 5 + it("returns empty lists when input is undefined", () => { 6 + const res = normalizeFetches(undefined); 7 + expect(res).toEqual({ ok: true, local: [], pds: [], names: [] }); 8 + }); 9 + 10 + it("returns empty lists when input is an empty array", () => { 11 + const res = normalizeFetches([]); 12 + expect(res).toEqual({ ok: true, local: [], pds: [], names: [] }); 13 + }); 14 + 15 + it("normalizes a record fetch (legacy shape without kind)", () => { 16 + const res = normalizeFetches([ 17 + { name: "parent", uri: "at://{{event.did}}/c/r" }, 18 + ] as FetchInput[]); 19 + expect(res.ok).toBe(true); 20 + if (res.ok) { 21 + expect(res.local[0]).toMatchObject({ kind: "record", name: "parent" }); 22 + expect(res.pds[0]).toMatchObject({ 23 + $type: "run.airglow.automation#fetchStep", 24 + name: "parent", 25 + }); 26 + expect(res.names).toEqual(["parent"]); 27 + } 28 + }); 29 + 30 + it("normalizes a search fetch with conditions end-to-end", () => { 31 + const res = normalizeFetches([ 32 + { 33 + kind: "search", 34 + name: "existingMirror", 35 + repo: "{{self}}", 36 + collection: "id.sifa.graph.follow", 37 + where: [{ field: "subject", value: "{{event.commit.record.subject}}" }], 38 + conditions: [{ field: "found", operator: "not-exists" }], 39 + }, 40 + ]); 41 + expect(res.ok).toBe(true); 42 + if (res.ok) { 43 + expect(res.local[0]).toMatchObject({ 44 + kind: "search", 45 + name: "existingMirror", 46 + conditions: [{ field: "found", operator: "not-exists", value: "" }], 47 + }); 48 + expect(res.pds[0]).toMatchObject({ 49 + $type: "run.airglow.automation#fetchSearchStep", 50 + conditions: [{ field: "found", operator: "not-exists", value: "" }], 51 + }); 52 + } 53 + }); 54 + 55 + it("passes record-fetch conditions through to both local and PDS shapes", () => { 56 + const res = normalizeFetches([ 57 + { 58 + kind: "record", 59 + name: "parent", 60 + uri: "at://{{event.did}}/c/r", 61 + conditions: [{ field: "record.status", operator: "eq", value: "published" }], 62 + }, 63 + ]); 64 + expect(res.ok).toBe(true); 65 + if (res.ok) { 66 + expect(res.local[0]).toMatchObject({ 67 + conditions: [{ field: "record.status", operator: "eq", value: "published" }], 68 + }); 69 + expect(res.pds[0]).toMatchObject({ 70 + conditions: [{ field: "record.status", operator: "eq", value: "published" }], 71 + }); 72 + } 73 + }); 74 + 75 + it("rejects when any fetch has invalid conditions", () => { 76 + const res = normalizeFetches([ 77 + { 78 + kind: "record", 79 + name: "parent", 80 + uri: "at://{{event.did}}/c/r", 81 + conditions: [{ field: "event.did", operator: "eq", value: "x" }], 82 + }, 83 + ]); 84 + expect(res.ok).toBe(false); 85 + }); 86 + 87 + it("enforces the global fetches cap", () => { 88 + const many: FetchInput[] = Array.from({ length: 6 }, (_, i) => ({ 89 + name: `f${i}`, 90 + uri: `at://did:plc:x/c/r${i}`, 91 + })); 92 + const res = normalizeFetches(many); 93 + expect(res.ok).toBe(false); 94 + }); 95 + 96 + it("enforces unique fetch names across record and search entries", () => { 97 + const res = normalizeFetches([ 98 + { name: "x", uri: "at://did:plc:a/c/r" }, 99 + { 100 + kind: "search", 101 + name: "x", 102 + repo: "{{self}}", 103 + collection: "app.bsky.graph.follow", 104 + where: [{ field: "subject", value: "did:plc:z" }], 105 + }, 106 + ]); 107 + expect(res.ok).toBe(false); 108 + }); 109 + }); 110 + 111 + describe("normalizeConditions", () => { 112 + it("returns empty array when input is undefined", () => { 113 + expect(normalizeConditions(undefined)).toEqual({ ok: true, value: [] }); 114 + }); 115 + 116 + it("drops entries with missing field", () => { 117 + const res = normalizeConditions([{ field: "", operator: "eq", value: "x" }]); 118 + expect(res).toEqual({ ok: true, value: [] }); 119 + }); 120 + 121 + it("drops the legacy event.commit.operation alias (handled at consumer layer)", () => { 122 + const res = normalizeConditions([ 123 + { field: "event.commit.operation", operator: "eq", value: "create" }, 124 + ]); 125 + expect(res).toEqual({ ok: true, value: [] }); 126 + }); 127 + 128 + it("drops entries that have no value for comparison operators", () => { 129 + const res = normalizeConditions([{ field: "event.did", operator: "eq" }]); 130 + expect(res).toEqual({ ok: true, value: [] }); 131 + }); 132 + 133 + it("accepts exists/not-exists without a value", () => { 134 + const res = normalizeConditions([{ field: "subject", operator: "exists" }]); 135 + expect(res.ok).toBe(true); 136 + if (res.ok) expect(res.value).toEqual([{ field: "subject", operator: "exists", value: "" }]); 137 + }); 138 + 139 + it("rejects unknown operators", () => { 140 + const res = normalizeConditions([{ field: "event.did", operator: "matches", value: "x" }]); 141 + expect(res.ok).toBe(false); 142 + }); 143 + 144 + it("enforces the global conditions cap", () => { 145 + const many = Array.from({ length: 21 }, () => ({ 146 + field: "event.did", 147 + operator: "eq", 148 + value: "did:plc:x", 149 + })); 150 + const res = normalizeConditions(many); 151 + expect(res.ok).toBe(false); 152 + }); 153 + 154 + it("silently drops the legacy `scope` field from input (post-refactor compatibility)", () => { 155 + const res = normalizeConditions([ 156 + { 157 + field: "event.did", 158 + operator: "eq", 159 + value: "did:plc:x", 160 + // @ts-expect-error — legacy field no longer in ConditionInput 161 + scope: "data", 162 + }, 163 + ]); 164 + expect(res.ok).toBe(true); 165 + if (res.ok) { 166 + expect(res.value[0]).not.toHaveProperty("scope"); 167 + } 168 + }); 169 + });
+126
lib/actions/api-normalize.ts
··· 1 + import type { FetchStep, Condition } from "../db/schema.js"; 2 + import type { PdsFetchStep } from "../automations/pds.js"; 3 + import { 4 + validateFetchSearchStep, 5 + validateFetchConditionInputs, 6 + VALID_OPERATORS, 7 + VALUE_LESS_OPERATORS, 8 + type FetchSearchInput, 9 + type FetchConditionInput, 10 + } from "./validation.js"; 11 + import { validateFetchStep } from "./template.js"; 12 + import { AUTOMATION_LIMITS } from "../automations/limits.js"; 13 + 14 + export type FetchInput = 15 + | { 16 + kind?: "record"; 17 + name: string; 18 + uri: string; 19 + conditions?: FetchConditionInput[]; 20 + comment?: string; 21 + } 22 + | ({ kind: "search" } & FetchSearchInput); 23 + 24 + export type ConditionInput = { 25 + field: string; 26 + operator?: string; 27 + value?: string; 28 + comment?: string; 29 + }; 30 + 31 + type OkFetches = { ok: true; local: FetchStep[]; pds: PdsFetchStep[]; names: string[] }; 32 + type Err = { ok: false; error: string }; 33 + 34 + /** Validate + normalize fetch inputs into local (DB) + PDS shapes. */ 35 + export function normalizeFetches(input: FetchInput[] | undefined): OkFetches | Err { 36 + const local: FetchStep[] = []; 37 + const pds: PdsFetchStep[] = []; 38 + const names: string[] = []; 39 + 40 + if (!input || input.length === 0) return { ok: true, local, pds, names }; 41 + if (input.length > AUTOMATION_LIMITS.fetches) { 42 + return { ok: false, error: `Maximum ${AUTOMATION_LIMITS.fetches} fetch steps allowed` }; 43 + } 44 + 45 + const seenNames = new Set<string>(); 46 + 47 + for (const step of input) { 48 + if (step.kind === "search") { 49 + const upstream = new Set(names); 50 + const result = validateFetchSearchStep(step, seenNames, upstream); 51 + if (!result.valid) return { ok: false, error: result.error }; 52 + seenNames.add(step.name); 53 + names.push(step.name); 54 + local.push(result.value); 55 + pds.push({ 56 + $type: "run.airglow.automation#fetchSearchStep", 57 + name: result.value.name, 58 + repo: result.value.repo, 59 + collection: result.value.collection, 60 + where: result.value.where, 61 + ...(result.value.limit !== undefined ? { limit: result.value.limit } : {}), 62 + ...(result.value.conditions ? { conditions: result.value.conditions } : {}), 63 + ...(result.value.comment ? { comment: result.value.comment } : {}), 64 + }); 65 + } else { 66 + const check = validateFetchStep(step.name, step.uri, seenNames); 67 + if (!check.valid) return { ok: false, error: check.error }; 68 + const condResult = validateFetchConditionInputs(step.conditions, step.name); 69 + if (!condResult.valid) return { ok: false, error: condResult.error }; 70 + seenNames.add(step.name); 71 + names.push(step.name); 72 + const entry: FetchStep = { 73 + kind: "record", 74 + name: step.name, 75 + uri: step.uri, 76 + ...(condResult.value.length > 0 ? { conditions: condResult.value } : {}), 77 + ...(step.comment ? { comment: step.comment } : {}), 78 + }; 79 + local.push(entry); 80 + pds.push({ 81 + $type: "run.airglow.automation#fetchStep", 82 + name: step.name, 83 + uri: step.uri, 84 + ...(condResult.value.length > 0 ? { conditions: condResult.value } : {}), 85 + ...(step.comment ? { comment: step.comment } : {}), 86 + }); 87 + } 88 + } 89 + 90 + return { ok: true, local, pds, names }; 91 + } 92 + 93 + /** Validate + normalize top-level conditions. Empty/invalid entries are dropped. 94 + * These always run on the trigger event before any fetches. */ 95 + export function normalizeConditions( 96 + input: ConditionInput[] | undefined, 97 + ): { ok: true; value: Condition[] } | Err { 98 + const conditions: Condition[] = []; 99 + if (!input) return { ok: true, value: conditions }; 100 + 101 + for (const cond of input) { 102 + if (!cond.field) continue; 103 + if (cond.field === "event.commit.operation") continue; 104 + 105 + const operator = cond.operator ?? "eq"; 106 + const valueLess = VALUE_LESS_OPERATORS.has(operator); 107 + if (!valueLess && !cond.value) continue; 108 + 109 + if (!VALID_OPERATORS.has(operator)) { 110 + return { ok: false, error: `Invalid condition operator: ${operator}` }; 111 + } 112 + 113 + conditions.push({ 114 + field: cond.field, 115 + operator, 116 + value: valueLess ? "" : cond.value!, 117 + ...(cond.comment ? { comment: cond.comment } : {}), 118 + }); 119 + } 120 + 121 + if (conditions.length > AUTOMATION_LIMITS.conditions) { 122 + return { ok: false, error: `Maximum ${AUTOMATION_LIMITS.conditions} conditions allowed` }; 123 + } 124 + 125 + return { ok: true, value: conditions }; 126 + }
+1 -1
lib/actions/executor.test.ts
··· 130 130 }); 131 131 const match = makeMatch({ automation: { actions: [action] } }); 132 132 const fetchContext = { 133 - profile: { uri: "at://x/col/rk", cid: "c", record: { displayName: "Alice" } }, 133 + profile: { found: true, uri: "at://x/col/rk", cid: "c", record: { displayName: "Alice" } }, 134 134 }; 135 135 136 136 await executeAction(match, 0, fetchContext);
+119
lib/actions/fetcher.test.ts
··· 19 19 20 20 it("resolves a single fetch step", async () => { 21 21 mockFetchRecord.mockResolvedValueOnce({ 22 + found: true, 22 23 uri: "at://did:plc:eventuser/app.bsky.actor.profile/self", 23 24 cid: "bafycid", 24 25 did: "did:plc:eventuser", ··· 34 35 ); 35 36 36 37 expect(result.context.profile).toEqual({ 38 + found: true, 37 39 uri: "at://did:plc:eventuser/app.bsky.actor.profile/self", 38 40 cid: "bafycid", 39 41 did: "did:plc:eventuser", ··· 50 52 it("resolves multiple fetch steps in parallel", async () => { 51 53 mockFetchRecord 52 54 .mockResolvedValueOnce({ 55 + found: true, 53 56 uri: "at://did1/col/rk1", 54 57 cid: "c1", 55 58 did: "did1", ··· 58 61 record: { a: 1 }, 59 62 }) 60 63 .mockResolvedValueOnce({ 64 + found: true, 61 65 uri: "at://did2/col/rk2", 62 66 cid: "c2", 63 67 did: "did2", ··· 83 87 84 88 it("resolves {{self}} in URI template", async () => { 85 89 mockFetchRecord.mockResolvedValueOnce({ 90 + found: true, 86 91 uri: "at://did:plc:owner/col/rk", 87 92 cid: "c", 88 93 did: "did:plc:owner", ··· 128 133 129 134 it("catches fetchRecord errors per step without blocking others", async () => { 130 135 mockFetchRecord.mockRejectedValueOnce(new Error("PDS unreachable")).mockResolvedValueOnce({ 136 + found: true, 131 137 uri: "at://ok/col/rk", 132 138 cid: "c", 133 139 did: "ok", ··· 150 156 expect(result.errors).toHaveLength(1); 151 157 expect(result.errors[0]!.name).toBe("failing"); 152 158 expect(result.errors[0]!.error).toContain("PDS unreachable"); 159 + expect(result.skip).toBe(false); 160 + }); 161 + 162 + describe("per-fetch conditions", () => { 163 + it("passes skip=false when all fetch conditions match", async () => { 164 + mockFetchRecord.mockResolvedValueOnce({ 165 + found: true, 166 + uri: "at://ok/col/rk", 167 + cid: "c", 168 + did: "ok", 169 + collection: "col", 170 + rkey: "rk", 171 + record: { subject: "did:plc:target" }, 172 + }); 173 + 174 + const result = await resolveFetches( 175 + [ 176 + { 177 + name: "profile", 178 + uri: "at://ok/col/rk", 179 + conditions: [{ field: "found", operator: "exists", value: "" }], 180 + }, 181 + ], 182 + event, 183 + ownerDid, 184 + ); 185 + 186 + expect(result.skip).toBe(false); 187 + expect(result.context.profile).toBeDefined(); 188 + }); 189 + 190 + it("returns skip=true when a record fetch's condition fails (duplicate-prevention pattern)", async () => { 191 + mockFetchRecord.mockResolvedValueOnce({ 192 + found: true, 193 + uri: "at://ok/col/rk", 194 + cid: "c", 195 + did: "ok", 196 + collection: "col", 197 + rkey: "rk", 198 + record: {}, 199 + }); 200 + 201 + const result = await resolveFetches( 202 + [ 203 + { 204 + name: "existingMirror", 205 + uri: "at://ok/col/rk", 206 + // "only continue if this fetch did NOT find a record" — but the fetch found one 207 + conditions: [{ field: "found", operator: "not-exists", value: "" }], 208 + }, 209 + ], 210 + event, 211 + ownerDid, 212 + ); 213 + 214 + expect(result.skip).toBe(true); 215 + }); 216 + 217 + it("skip short-circuits before running search fetches", async () => { 218 + mockFetchRecord.mockResolvedValueOnce({ 219 + found: true, 220 + uri: "at://ok/col/rk", 221 + cid: "c", 222 + did: "ok", 223 + collection: "col", 224 + rkey: "rk", 225 + record: {}, 226 + }); 227 + 228 + const result = await resolveFetches( 229 + [ 230 + { 231 + name: "profile", 232 + uri: "at://ok/col/rk", 233 + conditions: [{ field: "found", operator: "not-exists", value: "" }], 234 + }, 235 + // This search must NOT run because the record step gated to skip. 236 + { 237 + kind: "search", 238 + name: "search1", 239 + repo: "{{self}}", 240 + collection: "app.bsky.graph.follow", 241 + where: [{ field: "subject", operator: "eq", value: "x" }], 242 + }, 243 + ], 244 + event, 245 + ownerDid, 246 + ); 247 + 248 + expect(result.skip).toBe(true); 249 + expect(result.context.search1).toBeUndefined(); 250 + }); 251 + 252 + it("does not evaluate conditions on a record step that errored", async () => { 253 + mockFetchRecord.mockRejectedValueOnce(new Error("PDS unreachable")); 254 + 255 + const result = await resolveFetches( 256 + [ 257 + { 258 + name: "failing", 259 + uri: "at://did1/col/rk1", 260 + // Even with a condition that would fail on a missing entry, 261 + // the error path takes precedence — no skip is emitted. 262 + conditions: [{ field: "found", operator: "exists", value: "" }], 263 + }, 264 + ], 265 + event, 266 + ownerDid, 267 + ); 268 + 269 + expect(result.skip).toBe(false); 270 + expect(result.errors).toHaveLength(1); 271 + }); 153 272 }); 154 273 });
+74 -16
lib/actions/fetcher.ts
··· 1 1 import { fetchRecord } from "../pds/resolver.js"; 2 - import type { FetchStep } from "../db/schema.js"; 2 + import type { FetchStep, FetchStepRecord, FetchStepSearch } from "../db/schema.js"; 3 3 import type { JetstreamEvent } from "../jetstream/matcher.js"; 4 - import { resolveUriTemplate, type FetchContext } from "./template.js"; 4 + import { evaluateFetchConditions } from "../jetstream/matcher.js"; 5 + import { resolveUriTemplate, type FetchContext, type FetchContextEntry } from "./template.js"; 6 + import { executeSearch } from "./searcher.js"; 5 7 6 8 // AT URI format: at://did/collection/rkey 7 9 const AT_URI_RE = /^at:\/\/[^/]+\/[^/]+\/[^/]+$/; 8 10 9 - /** Resolve all fetch steps, returning available context and any errors. */ 11 + function isSearch(step: FetchStep): step is FetchStepSearch { 12 + return (step as FetchStepSearch).kind === "search"; 13 + } 14 + 15 + async function resolveRecord( 16 + step: FetchStepRecord, 17 + event: JetstreamEvent, 18 + ownerDid: string, 19 + ): Promise<{ ok: true; value: FetchContextEntry } | { ok: false; error: string }> { 20 + const resolvedUri = resolveUriTemplate(step.uri, event, undefined, ownerDid); 21 + if (!resolvedUri || !AT_URI_RE.test(resolvedUri)) { 22 + return { ok: false, error: `Resolved URI is not a valid AT URI: ${resolvedUri}` }; 23 + } 24 + const result = await fetchRecord(resolvedUri); 25 + return { ok: true, value: result }; 26 + } 27 + 28 + export type FetchResolution = { 29 + context: FetchContext; 30 + errors: Array<{ name: string; error: string }>; 31 + /** True when a per-fetch condition failed. Handler bails before actions. */ 32 + skip: boolean; 33 + /** Name of the fetch whose conditions triggered the skip, set alongside 34 + * `skip: true`. Used by dry-run to write an informative delivery log. */ 35 + skippedBy?: string; 36 + }; 37 + 38 + /** Resolve all fetch steps, returning available context, any errors, and 39 + * a `skip` flag if any fetch's post-resolution conditions rejected. */ 10 40 export async function resolveFetches( 11 41 steps: FetchStep[], 12 42 event: JetstreamEvent, 13 43 ownerDid: string, 14 - ): Promise<{ context: FetchContext; errors: Array<{ name: string; error: string }> }> { 44 + ): Promise<FetchResolution> { 15 45 const context: FetchContext = {}; 16 46 const errors: Array<{ name: string; error: string }> = []; 17 47 48 + // Record-kind fetches are independent of each other — run in parallel. 49 + // Searches may reference upstream fetches via templates, so we run them 50 + // sequentially after records for the MVP. 51 + const recordSteps: FetchStepRecord[] = []; 52 + const searchSteps: FetchStepSearch[] = []; 53 + for (const step of steps) { 54 + if (isSearch(step)) searchSteps.push(step); 55 + else recordSteps.push(step as FetchStepRecord); 56 + } 57 + 18 58 await Promise.all( 19 - steps.map(async (step) => { 59 + recordSteps.map(async (step) => { 20 60 try { 21 - const resolvedUri = resolveUriTemplate(step.uri, event, undefined, ownerDid); 22 - if (!resolvedUri || !AT_URI_RE.test(resolvedUri)) { 23 - errors.push({ 24 - name: step.name, 25 - error: `Resolved URI is not a valid AT URI: ${resolvedUri}`, 26 - }); 27 - return; 28 - } 29 - const result = await fetchRecord(resolvedUri); 30 - context[step.name] = result; 61 + const result = await resolveRecord(step, event, ownerDid); 62 + if (result.ok) context[step.name] = result.value; 63 + else errors.push({ name: step.name, error: result.error }); 31 64 } catch (err) { 32 65 errors.push({ 33 66 name: step.name, ··· 37 70 }), 38 71 ); 39 72 40 - return { context, errors }; 73 + // Evaluate record-step conditions after all records have resolved. 74 + // If any fail, short-circuit before running search steps. 75 + for (const step of recordSteps) { 76 + const entry = context[step.name]; 77 + if (!entry) continue; // error path — don't attempt condition gate 78 + if (!evaluateFetchConditions(entry, step.conditions, ownerDid)) { 79 + return { context, errors, skip: true, skippedBy: step.name }; 80 + } 81 + } 82 + 83 + for (const step of searchSteps) { 84 + try { 85 + const entry = await executeSearch(step, event, ownerDid, context); 86 + context[step.name] = entry; 87 + if (!evaluateFetchConditions(entry, step.conditions, ownerDid)) { 88 + return { context, errors, skip: true, skippedBy: step.name }; 89 + } 90 + } catch (err) { 91 + errors.push({ 92 + name: step.name, 93 + error: err instanceof Error ? err.message : String(err), 94 + }); 95 + } 96 + } 97 + 98 + return { context, errors, skip: false }; 41 99 }
+221
lib/actions/searcher.test.ts
··· 1 + import { describe, it, expect, vi, beforeEach } from "vitest"; 2 + 3 + vi.mock("@/config.js", () => ({ 4 + config: { 5 + databasePath: ":memory:", 6 + publicUrl: "https://airglow.test", 7 + }, 8 + })); 9 + 10 + vi.mock("@/db/index.js", async () => { 11 + const { createTestDb } = await import("../test/db.js"); 12 + return { db: createTestDb() }; 13 + }); 14 + 15 + vi.mock("../pds/resolver.js", () => ({ 16 + resolveDid: vi.fn(), 17 + })); 18 + 19 + import { executeSearch } from "./searcher.js"; 20 + import { resolveDid } from "../pds/resolver.js"; 21 + import { makeEvent } from "../test/fixtures.js"; 22 + import type { FetchStepSearch } from "../db/schema.js"; 23 + 24 + const mockResolveDid = vi.mocked(resolveDid); 25 + const mockFetch = vi.fn(); 26 + vi.stubGlobal("fetch", mockFetch); 27 + 28 + const ownerDid = "did:plc:owner"; 29 + 30 + function makeSearchStep(overrides?: Partial<FetchStepSearch>): FetchStepSearch { 31 + return { 32 + kind: "search", 33 + name: "existingMirror", 34 + repo: "{{self}}", 35 + collection: "id.sifa.graph.follow", 36 + where: [{ field: "subject", operator: "eq", value: "{{event.commit.record.subject}}" }], 37 + limit: 1, 38 + ...overrides, 39 + }; 40 + } 41 + 42 + describe("executeSearch", () => { 43 + beforeEach(() => { 44 + mockResolveDid.mockReset(); 45 + mockFetch.mockReset(); 46 + }); 47 + 48 + describe("listRecords strategy (non-bsky-follow)", () => { 49 + const event = makeEvent({ 50 + commit: { 51 + rev: "r1", 52 + operation: "create", 53 + collection: "app.bsky.graph.follow", 54 + rkey: "3kabc", 55 + record: { subject: "did:plc:subj" }, 56 + }, 57 + }); 58 + 59 + it("finds a matching record on the first page and returns found:true", async () => { 60 + mockResolveDid.mockResolvedValueOnce("https://pds.example.com"); 61 + mockFetch.mockResolvedValueOnce({ 62 + ok: true, 63 + json: async () => ({ 64 + records: [ 65 + { 66 + uri: "at://did:plc:owner/id.sifa.graph.follow/rkey1", 67 + cid: "c1", 68 + value: { subject: "did:plc:subj" }, 69 + }, 70 + ], 71 + }), 72 + }); 73 + 74 + const result = await executeSearch(makeSearchStep(), event, ownerDid, {}); 75 + 76 + expect(result.found).toBe(true); 77 + expect(result.uri).toBe("at://did:plc:owner/id.sifa.graph.follow/rkey1"); 78 + expect(result.rkey).toBe("rkey1"); 79 + }); 80 + 81 + it("returns found:false when no records match on any page", async () => { 82 + mockResolveDid.mockResolvedValueOnce("https://pds.example.com"); 83 + mockFetch.mockResolvedValueOnce({ 84 + ok: true, 85 + json: async () => ({ 86 + records: [ 87 + { 88 + uri: "at://did:plc:owner/id.sifa.graph.follow/other", 89 + cid: "c", 90 + value: { subject: "did:plc:someoneelse" }, 91 + }, 92 + ], 93 + }), 94 + }); 95 + 96 + const result = await executeSearch(makeSearchStep(), event, ownerDid, {}); 97 + 98 + expect(result.found).toBe(false); 99 + expect(result.uri).toBe(""); 100 + }); 101 + 102 + it("short-circuits when the first match is found before pagination ends", async () => { 103 + mockResolveDid.mockResolvedValueOnce("https://pds.example.com"); 104 + mockFetch.mockResolvedValueOnce({ 105 + ok: true, 106 + json: async () => ({ 107 + records: [ 108 + { 109 + uri: "at://did:plc:owner/id.sifa.graph.follow/rk1", 110 + cid: "c1", 111 + value: { subject: "did:plc:subj" }, 112 + }, 113 + ], 114 + cursor: "page2", 115 + }), 116 + }); 117 + 118 + await executeSearch(makeSearchStep(), event, ownerDid, {}); 119 + // limit=1 + match on page 1 → only one HTTP call despite a cursor being present 120 + expect(mockFetch).toHaveBeenCalledTimes(1); 121 + }); 122 + 123 + it("stops at a safety cap and returns found:false on an exhausted scan", async () => { 124 + mockResolveDid.mockResolvedValueOnce("https://pds.example.com"); 125 + // Always return a non-matching page with a cursor → trigger the 20-page cap. 126 + mockFetch.mockResolvedValue({ 127 + ok: true, 128 + json: async () => ({ 129 + records: [ 130 + { 131 + uri: "at://did:plc:owner/col/x", 132 + cid: "c", 133 + value: { subject: "did:plc:other" }, 134 + }, 135 + ], 136 + cursor: "next", 137 + }), 138 + }); 139 + 140 + const result = await executeSearch(makeSearchStep(), event, ownerDid, {}); 141 + 142 + expect(result.found).toBe(false); 143 + expect(mockFetch).toHaveBeenCalledTimes(20); 144 + }); 145 + 146 + it("renders templates in repo and where values", async () => { 147 + mockResolveDid.mockResolvedValueOnce("https://pds.example.com"); 148 + mockFetch.mockResolvedValueOnce({ ok: true, json: async () => ({ records: [] }) }); 149 + 150 + await executeSearch(makeSearchStep(), event, ownerDid, {}); 151 + 152 + // {{self}} resolved to ownerDid in the URL 153 + const url = new URL(mockFetch.mock.calls[0]![0] as string); 154 + expect(url.searchParams.get("repo")).toBe("did:plc:owner"); 155 + expect(url.searchParams.get("collection")).toBe("id.sifa.graph.follow"); 156 + }); 157 + }); 158 + 159 + describe("Bluesky appview strategy (app.bsky.graph.follow + subject eq)", () => { 160 + const event = makeEvent({ 161 + commit: { 162 + rev: "r1", 163 + operation: "create", 164 + collection: "id.sifa.graph.follow", 165 + rkey: "3kabc", 166 + record: { subject: "did:plc:subj" }, 167 + }, 168 + }); 169 + 170 + const step = makeSearchStep({ 171 + collection: "app.bsky.graph.follow", 172 + where: [{ field: "subject", operator: "eq", value: "{{event.commit.record.subject}}" }], 173 + }); 174 + 175 + it("uses getRelationships and returns found:true when following", async () => { 176 + mockFetch.mockResolvedValueOnce({ 177 + ok: true, 178 + json: async () => ({ 179 + relationships: [ 180 + { 181 + $type: "app.bsky.graph.defs#relationship", 182 + did: "did:plc:subj", 183 + following: "at://did:plc:owner/app.bsky.graph.follow/followRkey", 184 + }, 185 + ], 186 + }), 187 + }); 188 + 189 + const result = await executeSearch(step, event, ownerDid, {}); 190 + 191 + expect(result.found).toBe(true); 192 + expect(result.uri).toBe("at://did:plc:owner/app.bsky.graph.follow/followRkey"); 193 + expect(result.rkey).toBe("followRkey"); 194 + // Appview path should NOT call resolveDid (no PDS lookup) 195 + expect(mockResolveDid).not.toHaveBeenCalled(); 196 + // Only one call — the appview endpoint 197 + expect(mockFetch).toHaveBeenCalledTimes(1); 198 + const url = new URL(mockFetch.mock.calls[0]![0] as string); 199 + expect(url.origin).toBe("https://api.bsky.app"); 200 + expect(url.pathname).toBe("/xrpc/app.bsky.graph.getRelationships"); 201 + }); 202 + 203 + it("returns found:false when the appview says no following relationship", async () => { 204 + mockFetch.mockResolvedValueOnce({ 205 + ok: true, 206 + json: async () => ({ 207 + relationships: [ 208 + { 209 + $type: "app.bsky.graph.defs#relationship", 210 + did: "did:plc:subj", 211 + // no `following` → not followed 212 + }, 213 + ], 214 + }), 215 + }); 216 + 217 + const result = await executeSearch(step, event, ownerDid, {}); 218 + expect(result.found).toBe(false); 219 + }); 220 + }); 221 + });
+180
lib/actions/searcher.ts
··· 1 + import type { FetchStepSearch } from "../db/schema.js"; 2 + import type { JetstreamEvent } from "../jetstream/matcher.js"; 3 + import { resolveDid } from "../pds/resolver.js"; 4 + import { 5 + resolvePlaceholder, 6 + type FetchContext, 7 + type FetchContextEntry, 8 + notFoundEntry, 9 + } from "./template.js"; 10 + 11 + const BSKY_APPVIEW = "https://api.bsky.app"; 12 + const LIST_RECORDS_PAGE_SIZE = 100; 13 + const MAX_LIST_RECORDS_PAGES = 20; 14 + const HTTP_TIMEOUT_MS = 10_000; 15 + 16 + function render(template: string, event: JetstreamEvent, upstream: FetchContext, ownerDid: string) { 17 + const automation = { did: ownerDid, rkey: "", name: "" }; 18 + return template.replace(/\{\{([^}]+)\}\}/g, (_, raw: string) => { 19 + const value = resolvePlaceholder(raw.trim(), event, upstream, automation); 20 + return typeof value === "string" ? value : ""; 21 + }); 22 + } 23 + 24 + function hasOnlyClause( 25 + step: FetchStepSearch, 26 + field: string, 27 + operator: "eq", 28 + ): { value: string } | null { 29 + if (step.where.length !== 1) return null; 30 + const clause = step.where[0]!; 31 + if (clause.field !== field || clause.operator !== operator) return null; 32 + return { value: clause.value }; 33 + } 34 + 35 + async function appviewFollowLookup( 36 + actor: string, 37 + subject: string, 38 + ): Promise<FetchContextEntry | null> { 39 + const url = new URL(`${BSKY_APPVIEW}/xrpc/app.bsky.graph.getRelationships`); 40 + url.searchParams.set("actor", actor); 41 + url.searchParams.append("others", subject); 42 + 43 + let res: Response; 44 + try { 45 + res = await fetch(url, { 46 + headers: { Accept: "application/json" }, 47 + signal: AbortSignal.timeout(HTTP_TIMEOUT_MS), 48 + }); 49 + } catch { 50 + return null; // fall through to listRecords on transport failure 51 + } 52 + if (!res.ok) return null; 53 + 54 + const data = (await res.json()) as { 55 + relationships?: Array<{ $type?: string; did?: string; following?: string }>; 56 + }; 57 + const rel = data.relationships?.[0]; 58 + if (!rel?.following) return notFoundEntry(); 59 + // `following` is the AT URI of the actor's follow record on subject. 60 + const uri = rel.following; 61 + const match = uri.match(/^at:\/\/(did:[^/]+)\/([^/]+)\/([^/]+)$/); 62 + if (!match) return { found: true, uri, cid: "", record: { subject } }; 63 + return { 64 + found: true, 65 + uri, 66 + cid: "", 67 + did: match[1]!, 68 + collection: match[2]!, 69 + rkey: match[3]!, 70 + record: { subject }, 71 + }; 72 + } 73 + 74 + async function listRecordsSearch( 75 + repo: string, 76 + collection: string, 77 + where: Array<{ field: string; value: string }>, 78 + limit: number, 79 + ): Promise<FetchContextEntry> { 80 + const pdsEndpoint = await resolveDid(repo); 81 + if (!pdsEndpoint) throw new Error(`Could not resolve PDS for ${repo}`); 82 + 83 + let cursor: string | undefined; 84 + const matches: FetchContextEntry[] = []; 85 + let pagesScanned = 0; 86 + 87 + for (let page = 0; page < MAX_LIST_RECORDS_PAGES; page++) { 88 + const url = new URL(`${pdsEndpoint}/xrpc/com.atproto.repo.listRecords`); 89 + url.searchParams.set("repo", repo); 90 + url.searchParams.set("collection", collection); 91 + url.searchParams.set("limit", String(LIST_RECORDS_PAGE_SIZE)); 92 + if (cursor) url.searchParams.set("cursor", cursor); 93 + 94 + const res = await fetch(url, { 95 + headers: { Accept: "application/json" }, 96 + signal: AbortSignal.timeout(HTTP_TIMEOUT_MS), 97 + }); 98 + if (!res.ok) throw new Error(`listRecords failed (${res.status}) for ${repo}/${collection}`); 99 + 100 + const data = (await res.json()) as { 101 + records?: Array<{ uri: string; cid?: string; value?: Record<string, unknown> }>; 102 + cursor?: string; 103 + }; 104 + 105 + pagesScanned++; 106 + 107 + for (const rec of data.records ?? []) { 108 + const record = rec.value ?? {}; 109 + if (where.every((w) => readPath(record, w.field) === w.value)) { 110 + const parsed = parseAtUri(rec.uri); 111 + matches.push({ 112 + found: true, 113 + uri: rec.uri, 114 + cid: rec.cid ?? "", 115 + ...parsed, 116 + record, 117 + }); 118 + if (matches.length >= limit) return matches[0]!; 119 + } 120 + } 121 + 122 + cursor = data.cursor; 123 + if (!cursor) break; 124 + } 125 + 126 + if (matches.length > 0) return matches[0]!; 127 + if (pagesScanned >= MAX_LIST_RECORDS_PAGES && cursor) { 128 + console.warn( 129 + `searcher: ${repo}/${collection} scan hit the ${MAX_LIST_RECORDS_PAGES}-page safety cap without a match; returning not-found`, 130 + ); 131 + } 132 + return notFoundEntry(); 133 + } 134 + 135 + function parseAtUri(uri: string): { did: string; collection: string; rkey: string } | undefined { 136 + const match = uri.match(/^at:\/\/(did:[^/]+)\/([^/]+)\/([^/]+)$/); 137 + if (!match) return undefined; 138 + return { did: match[1]!, collection: match[2]!, rkey: match[3]! }; 139 + } 140 + 141 + function readPath(record: Record<string, unknown>, path: string): string | undefined { 142 + let value: unknown = record; 143 + for (const key of path.split(".")) { 144 + if (value == null || typeof value !== "object") return undefined; 145 + value = (value as Record<string, unknown>)[key]; 146 + } 147 + if (value == null) return undefined; 148 + return typeof value === "string" ? value : JSON.stringify(value); 149 + } 150 + 151 + /** Execute a search fetch step against the target PDS (or Bluesky appview). */ 152 + export async function executeSearch( 153 + step: FetchStepSearch, 154 + event: JetstreamEvent, 155 + ownerDid: string, 156 + upstream: FetchContext, 157 + ): Promise<FetchContextEntry> { 158 + const repo = render(step.repo, event, upstream, ownerDid); 159 + if (!repo.startsWith("did:")) 160 + throw new Error(`Search repo must resolve to a DID, got: "${repo}"`); 161 + 162 + const resolvedWhere = step.where.map((w) => ({ 163 + field: w.field, 164 + value: render(w.value, event, upstream, ownerDid), 165 + })); 166 + 167 + // Appview optimization: Bluesky follows are queryable in O(1) via getRelationships. 168 + if (step.collection === "app.bsky.graph.follow") { 169 + const subjectClause = hasOnlyClause(step, "subject", "eq"); 170 + if (subjectClause) { 171 + const subject = render(subjectClause.value, event, upstream, ownerDid); 172 + if (subject) { 173 + const result = await appviewFollowLookup(repo, subject); 174 + if (result) return result; 175 + } 176 + } 177 + } 178 + 179 + return listRecordsSearch(repo, step.collection, resolvedWhere, step.limit ?? 1); 180 + }
+5 -2
lib/actions/template.test.ts
··· 239 239 it("renders fetch context placeholders", async () => { 240 240 const fetchContext = { 241 241 profile: { 242 + found: true, 242 243 uri: "at://did/col/rkey", 243 244 cid: "bafycid", 244 245 record: { displayName: "Alice" }, ··· 255 256 it("exposes did/collection/rkey from fetch context", async () => { 256 257 const fetchContext = { 257 258 repo: { 259 + found: true, 258 260 uri: "at://did:plc:owner/sh.tangled.repo/abc123", 259 261 cid: "bafycid", 260 262 did: "did:plc:owner", ··· 283 285 it("JSON-stringifies objects in placeholders", async () => { 284 286 const fetchContext = { 285 287 data: { 288 + found: true, 286 289 uri: "at://x", 287 290 cid: "c", 288 291 record: { nested: { a: 1 } }, ··· 445 448 446 449 it("does not JSON-escape text (no backslash or quote escaping)", async () => { 447 450 const fetchContext = { 448 - data: { uri: "at://x", cid: "c", record: { name: 'He said "hi"' } }, 451 + data: { found: true, uri: "at://x", cid: "c", record: { name: 'He said "hi"' } }, 449 452 }; 450 453 const result = await renderTextTemplate("{{data.record.name}}", event, fetchContext); 451 454 expect(result).toBe('He said "hi"'); ··· 472 475 473 476 it("resolves didToHandle with fetch context", async () => { 474 477 const fetchContext = { 475 - profile: { uri: "at://x", cid: "c", record: { authorDid: "did:plc:author" } }, 478 + profile: { found: true, uri: "at://x", cid: "c", record: { authorDid: "did:plc:author" } }, 476 479 }; 477 480 const result = await renderTextTemplate( 478 481 "Author: {{didToHandle(profile.record.authorDid)}}",
+21 -12
lib/actions/template.ts
··· 7 7 const KNOWN_FUNCTIONS = new Set(["didToHandle"]); 8 8 const AUTOMATION_FIELDS = new Set(["id", "name", "url"]); 9 9 10 - export type FetchContext = Record< 11 - string, 12 - { 13 - uri: string; 14 - cid: string; 15 - did?: string; 16 - collection?: string; 17 - rkey?: string; 18 - record: Record<string, unknown>; 19 - } 20 - >; 10 + export type FetchContextEntry = { 11 + /** true when the fetch resolved to an actual record; false when the target 12 + * didn't exist (404 / search returned nothing). Conditions observe this via 13 + * the `exists` / `not-exists` operators. */ 14 + found: boolean; 15 + uri: string; 16 + cid: string; 17 + did?: string; 18 + collection?: string; 19 + rkey?: string; 20 + record: Record<string, unknown>; 21 + }; 22 + 23 + export type FetchContext = Record<string, FetchContextEntry>; 24 + 25 + /** Sentinel entry for fetches that didn't find a record. Kept in the context 26 + * so conditions and templates can still address the fetch name. */ 27 + export function notFoundEntry(): FetchContextEntry { 28 + return { found: false, uri: "", cid: "", record: {} }; 29 + } 21 30 22 31 /** Subset of an automation row needed for template resolution. */ 23 32 export type AutomationContext = { ··· 39 48 * - "event.commit.record.<dotted.path>" → nested record field 40 49 * - "<fetchName>.<dotted.path>" → fetched record data 41 50 */ 42 - function resolvePlaceholder( 51 + export function resolvePlaceholder( 43 52 path: string, 44 53 event: JetstreamEvent, 45 54 fetchContext?: FetchContext,
+214 -1
lib/actions/validation.test.ts
··· 1 1 import { describe, it, expect } from "vitest"; 2 - import { validateWantedDids } from "./validation.js"; 2 + import { 3 + validateWantedDids, 4 + validateFetchConditionInputs, 5 + validateFetchSearchStep, 6 + type FetchConditionInput, 7 + type FetchSearchInput, 8 + } from "./validation.js"; 3 9 4 10 describe("validateWantedDids", () => { 5 11 it("accepts undefined/null as empty", () => { ··· 56 62 expect(res.valid).toBe(false); 57 63 }); 58 64 }); 65 + 66 + describe("validateFetchConditionInputs", () => { 67 + const fetchName = "mirror"; 68 + 69 + it("accepts undefined as empty", () => { 70 + expect(validateFetchConditionInputs(undefined, fetchName)).toEqual({ 71 + valid: true, 72 + value: [], 73 + }); 74 + }); 75 + 76 + it("accepts the 'found exists' duplicate-prevention pattern", () => { 77 + const res = validateFetchConditionInputs([{ field: "found", operator: "exists" }], fetchName); 78 + expect(res.valid).toBe(true); 79 + if (res.valid) expect(res.value).toEqual([{ field: "found", operator: "exists", value: "" }]); 80 + }); 81 + 82 + it("drops `value` for value-less operators", () => { 83 + const res = validateFetchConditionInputs( 84 + [{ field: "found", operator: "not-exists", value: "ignored" }], 85 + fetchName, 86 + ); 87 + expect(res.valid).toBe(true); 88 + if (res.valid) expect(res.value[0]!.value).toBe(""); 89 + }); 90 + 91 + it("requires `value` for comparison operators", () => { 92 + const res = validateFetchConditionInputs( 93 + [{ field: "record.subject", operator: "eq" }], 94 + fetchName, 95 + ); 96 + expect(res.valid).toBe(false); 97 + }); 98 + 99 + it("rejects unknown operators", () => { 100 + const res = validateFetchConditionInputs( 101 + [{ field: "found", operator: "matches", value: "x" }], 102 + fetchName, 103 + ); 104 + expect(res.valid).toBe(false); 105 + }); 106 + 107 + it("rejects event paths (belong on top-level conditions)", () => { 108 + const res = validateFetchConditionInputs( 109 + [{ field: "event.did", operator: "eq", value: "x" }], 110 + fetchName, 111 + ); 112 + expect(res.valid).toBe(false); 113 + if (!res.valid) expect(res.error).toContain("event paths"); 114 + }); 115 + 116 + it("accepts numeric path segments (array indexing)", () => { 117 + const res = validateFetchConditionInputs( 118 + [{ field: "record.tags.0", operator: "eq", value: "foo" }], 119 + fetchName, 120 + ); 121 + expect(res.valid).toBe(true); 122 + }); 123 + 124 + it("accepts hyphens in path segments (lexicon fields sometimes use them)", () => { 125 + const res = validateFetchConditionInputs( 126 + [{ field: "record.foo-bar", operator: "eq", value: "x" }], 127 + fetchName, 128 + ); 129 + expect(res.valid).toBe(true); 130 + }); 131 + 132 + it("rejects missing field", () => { 133 + const res = validateFetchConditionInputs( 134 + [{ operator: "exists" } as FetchConditionInput], 135 + fetchName, 136 + ); 137 + expect(res.valid).toBe(false); 138 + }); 139 + 140 + it("defaults operator to 'eq' when omitted", () => { 141 + const res = validateFetchConditionInputs( 142 + [{ field: "record.subject", value: "did:plc:x" }], 143 + fetchName, 144 + ); 145 + expect(res.valid).toBe(true); 146 + if (res.valid) expect(res.value[0]!.operator).toBe("eq"); 147 + }); 148 + 149 + it("enforces the per-fetch condition cap", () => { 150 + const many = Array.from({ length: 11 }, () => ({ field: "found", operator: "exists" })); 151 + const res = validateFetchConditionInputs(many, fetchName); 152 + expect(res.valid).toBe(false); 153 + }); 154 + }); 155 + 156 + describe("validateFetchSearchStep", () => { 157 + function baseStep(overrides: Partial<FetchSearchInput> = {}): FetchSearchInput { 158 + return { 159 + name: "existingMirror", 160 + repo: "{{self}}", 161 + collection: "app.bsky.graph.follow", 162 + where: [{ field: "subject", value: "{{event.commit.record.subject}}" }], 163 + ...overrides, 164 + }; 165 + } 166 + 167 + it("accepts a valid follow-mirror search step", () => { 168 + const res = validateFetchSearchStep(baseStep(), new Set(), new Set()); 169 + expect(res.valid).toBe(true); 170 + if (res.valid) { 171 + expect(res.value.kind).toBe("search"); 172 + expect(res.value.where).toHaveLength(1); 173 + expect(res.value.where[0]!.operator).toBe("eq"); 174 + } 175 + }); 176 + 177 + it("passes through per-fetch conditions on success", () => { 178 + const res = validateFetchSearchStep( 179 + baseStep({ conditions: [{ field: "found", operator: "not-exists" }] }), 180 + new Set(), 181 + new Set(), 182 + ); 183 + expect(res.valid).toBe(true); 184 + if (res.valid) expect(res.value.conditions).toHaveLength(1); 185 + }); 186 + 187 + it("rejects invalid name", () => { 188 + const res = validateFetchSearchStep(baseStep({ name: "1bad" }), new Set(), new Set()); 189 + expect(res.valid).toBe(false); 190 + }); 191 + 192 + it("rejects reserved names", () => { 193 + const res = validateFetchSearchStep(baseStep({ name: "event" }), new Set(), new Set()); 194 + expect(res.valid).toBe(false); 195 + }); 196 + 197 + it("rejects action-style names (`actionN`)", () => { 198 + const res = validateFetchSearchStep(baseStep({ name: "action2" }), new Set(), new Set()); 199 + expect(res.valid).toBe(false); 200 + }); 201 + 202 + it("rejects duplicate names", () => { 203 + const seen = new Set(["existingMirror"]); 204 + const res = validateFetchSearchStep(baseStep(), seen, new Set()); 205 + expect(res.valid).toBe(false); 206 + }); 207 + 208 + it("rejects invalid collection NSID", () => { 209 + const res = validateFetchSearchStep( 210 + baseStep({ collection: "not a nsid" }), 211 + new Set(), 212 + new Set(), 213 + ); 214 + expect(res.valid).toBe(false); 215 + }); 216 + 217 + it("rejects multi-clause where (phase-1 MVP cap)", () => { 218 + const res = validateFetchSearchStep( 219 + baseStep({ 220 + where: [ 221 + { field: "subject", value: "a" }, 222 + { field: "createdAt", value: "b" }, 223 + ], 224 + }), 225 + new Set(), 226 + new Set(), 227 + ); 228 + expect(res.valid).toBe(false); 229 + }); 230 + 231 + it("rejects limit !== 1 (phase-1 MVP cap)", () => { 232 + const res = validateFetchSearchStep(baseStep({ limit: 5 }), new Set(), new Set()); 233 + expect(res.valid).toBe(false); 234 + }); 235 + 236 + it("rejects non-'eq' where operators", () => { 237 + const res = validateFetchSearchStep( 238 + baseStep({ where: [{ field: "subject", operator: "contains", value: "x" }] }), 239 + new Set(), 240 + new Set(), 241 + ); 242 + expect(res.valid).toBe(false); 243 + }); 244 + 245 + it("rejects placeholders that reference unknown upstream fetches", () => { 246 + const res = validateFetchSearchStep( 247 + baseStep({ where: [{ field: "subject", value: "{{unknown.record.x}}" }] }), 248 + new Set(), 249 + new Set(), 250 + ); 251 + expect(res.valid).toBe(false); 252 + }); 253 + 254 + it("accepts upstream fetch references when the name is declared", () => { 255 + const res = validateFetchSearchStep( 256 + baseStep({ where: [{ field: "subject", value: "{{upstream.record.subject}}" }] }), 257 + new Set(), 258 + new Set(["upstream"]), 259 + ); 260 + expect(res.valid).toBe(true); 261 + }); 262 + 263 + it("propagates fetch-condition validation errors", () => { 264 + const res = validateFetchSearchStep( 265 + baseStep({ conditions: [{ field: "event.did", operator: "eq", value: "x" }] }), 266 + new Set(), 267 + new Set(), 268 + ); 269 + expect(res.valid).toBe(false); 270 + }); 271 + });
+215 -2
lib/actions/validation.ts
··· 1 1 import { SECRET_NAME_RE, SECRET_REF_RE } from "../secrets/store.js"; 2 2 import { AUTOMATION_LIMITS, BOOKMARK_LIMITS } from "../automations/limits.js"; 3 3 import { nsidRequiresWantedDids } from "../lexicons/match.js"; 4 - import { validateTextTemplate } from "./template.js"; 4 + import { isValidNsid } from "../lexicons/resolver.js"; 5 + import { PLACEHOLDER_RE, validateTextTemplate } from "./template.js"; 6 + import type { Condition, FetchStepSearch } from "../db/schema.js"; 5 7 6 8 export type ActionInput = 7 9 | { ··· 34 36 comment?: string; 35 37 }; 36 38 37 - export const VALID_OPERATORS = new Set(["eq", "startsWith", "endsWith", "contains"]); 39 + export const VALID_OPERATORS = new Set([ 40 + "eq", 41 + "startsWith", 42 + "endsWith", 43 + "contains", 44 + "exists", 45 + "not-exists", 46 + ]); 47 + 48 + /** Operators that compare against the `value` field. `exists`/`not-exists` ignore it. */ 49 + export const VALUE_LESS_OPERATORS = new Set(["exists", "not-exists"]); 50 + 38 51 export const VALID_OPERATIONS = new Set(["create", "update", "delete"]); 39 52 export const VALID_BSKY_LABELS = new Set(["sexual", "nudity", "porn", "graphic-media"]); 40 53 export const BCP47_RE = /^[a-z]{2,3}(-[A-Za-z0-9]{1,8})*$/; ··· 175 188 } 176 189 177 190 return { valid: true }; 191 + } 192 + 193 + export type FetchConditionInput = { 194 + field?: string; 195 + operator?: string; 196 + value?: string; 197 + comment?: string; 198 + }; 199 + 200 + export type FetchSearchInput = { 201 + name: string; 202 + repo: string; 203 + collection: string; 204 + where: Array<{ field: string; operator?: string; value: string }>; 205 + limit?: number; 206 + conditions?: FetchConditionInput[]; 207 + comment?: string; 208 + }; 209 + 210 + /** Valid field paths for a fetch-attached condition. The field is resolved 211 + * relative to the fetch's own result entry, not against the event. 212 + * We allow the bare "found" flag plus dotted paths into the entry — later 213 + * segments may start with a digit (array indices like `tags.0`) and may 214 + * contain hyphens (lexicon fields occasionally use them). */ 215 + const FETCH_CONDITION_FIELD_RE = /^[a-zA-Z_][a-zA-Z0-9_]*(\.[a-zA-Z0-9_][a-zA-Z0-9_-]*)*$/; 216 + 217 + const MAX_FETCH_CONDITIONS = 10; 218 + 219 + /** Validate the condition list attached to a single fetch step. Renamed 220 + * from `validateFetchConditions` to disambiguate from the runtime-layer 221 + * `evaluateFetchConditions` in matcher.ts. */ 222 + export function validateFetchConditionInputs( 223 + input: FetchConditionInput[] | undefined, 224 + fetchName: string, 225 + ): { valid: true; value: Condition[] } | { valid: false; error: string } { 226 + if (!input) return { valid: true, value: [] }; 227 + if (!Array.isArray(input)) { 228 + return { valid: false, error: `Fetch "${fetchName}": conditions must be an array` }; 229 + } 230 + if (input.length > MAX_FETCH_CONDITIONS) { 231 + return { 232 + valid: false, 233 + error: `Fetch "${fetchName}": maximum ${MAX_FETCH_CONDITIONS} conditions allowed`, 234 + }; 235 + } 236 + 237 + const out: Condition[] = []; 238 + for (const cond of input) { 239 + if (!cond.field || typeof cond.field !== "string") { 240 + return { valid: false, error: `Fetch "${fetchName}": condition field is required` }; 241 + } 242 + if (!FETCH_CONDITION_FIELD_RE.test(cond.field)) { 243 + return { 244 + valid: false, 245 + error: `Fetch "${fetchName}": invalid condition field "${cond.field}"`, 246 + }; 247 + } 248 + // Event paths belong on top-level conditions, not on a fetch. 249 + if (cond.field === "event" || cond.field.startsWith("event.")) { 250 + return { 251 + valid: false, 252 + error: `Fetch "${fetchName}": fetch conditions cannot reference event paths; use a top-level condition instead`, 253 + }; 254 + } 255 + 256 + const operator = cond.operator ?? "eq"; 257 + if (!VALID_OPERATORS.has(operator)) { 258 + return { 259 + valid: false, 260 + error: `Fetch "${fetchName}": invalid condition operator "${operator}"`, 261 + }; 262 + } 263 + const valueLess = VALUE_LESS_OPERATORS.has(operator); 264 + if (!valueLess && (cond.value === undefined || cond.value === null)) { 265 + return { 266 + valid: false, 267 + error: `Fetch "${fetchName}": condition value is required for operator "${operator}"`, 268 + }; 269 + } 270 + 271 + out.push({ 272 + field: cond.field, 273 + operator, 274 + value: valueLess ? "" : String(cond.value), 275 + ...(cond.comment ? { comment: cond.comment } : {}), 276 + }); 277 + } 278 + 279 + return { valid: true, value: out }; 280 + } 281 + 282 + const FETCH_SEARCH_NAME_RE = /^[a-zA-Z_][a-zA-Z0-9_]*$/; 283 + const ACTION_NAME_RE = /^action\d+$/; 284 + const SEARCH_RESERVED_NAMES = new Set(["event", "now", "self", "secret", "automation"]); 285 + 286 + function placeholdersOf(template: string): string[] { 287 + const out: string[] = []; 288 + template.replace(PLACEHOLDER_RE, (_, path: string) => { 289 + out.push(path.trim()); 290 + return ""; 291 + }); 292 + return out; 293 + } 294 + 295 + function validateSearchTemplate( 296 + template: string, 297 + upstreamFetchNames: Set<string>, 298 + label: string, 299 + ): { valid: true } | { valid: false; error: string } { 300 + for (const p of placeholdersOf(template)) { 301 + if (p === "now" || p === "self" || p.startsWith("event.")) continue; 302 + const root = p.split(".")[0]!; 303 + if (upstreamFetchNames.has(root)) continue; 304 + return { valid: false, error: `Invalid placeholder in search ${label}: {{${p}}}` }; 305 + } 306 + return { valid: true }; 307 + } 308 + 309 + /** Validate a "search" fetch step. `upstreamFetchNames` are names declared 310 + * earlier in the fetches list — the search can reference them in templates. */ 311 + export function validateFetchSearchStep( 312 + step: FetchSearchInput, 313 + seenNames: Set<string>, 314 + upstreamFetchNames: Set<string>, 315 + ): { valid: true; value: FetchStepSearch } | { valid: false; error: string } { 316 + if (!step.name || !FETCH_SEARCH_NAME_RE.test(step.name)) { 317 + return { valid: false, error: `Invalid fetch name: "${step.name}"` }; 318 + } 319 + if (SEARCH_RESERVED_NAMES.has(step.name) || ACTION_NAME_RE.test(step.name)) { 320 + return { valid: false, error: `Fetch name "${step.name}" is reserved` }; 321 + } 322 + if (seenNames.has(step.name)) { 323 + return { valid: false, error: `Duplicate fetch name: "${step.name}"` }; 324 + } 325 + 326 + if (!step.repo || typeof step.repo !== "string" || !step.repo.trim()) { 327 + return { valid: false, error: `Fetch "${step.name}": repo is required` }; 328 + } 329 + const repoCheck = validateSearchTemplate(step.repo, upstreamFetchNames, "repo"); 330 + if (!repoCheck.valid) return repoCheck; 331 + 332 + if (!step.collection || !isValidNsid(step.collection)) { 333 + return { valid: false, error: `Fetch "${step.name}": invalid collection NSID` }; 334 + } 335 + 336 + if (!Array.isArray(step.where) || step.where.length === 0) { 337 + return { valid: false, error: `Fetch "${step.name}": at least one where clause is required` }; 338 + } 339 + // Phase 1: MVP caps where-clauses + limit to 1. The form only edits a single 340 + // clause; widening the validator would allow multi-clause configs to be 341 + // silently truncated on round-trip. Lift when the form grows multi-clause UI. 342 + if (step.where.length > 1) { 343 + return { 344 + valid: false, 345 + error: `Fetch "${step.name}": only a single where clause is supported in this version`, 346 + }; 347 + } 348 + 349 + const where: FetchStepSearch["where"] = []; 350 + for (const clause of step.where) { 351 + if (!clause.field || typeof clause.field !== "string") { 352 + return { valid: false, error: `Fetch "${step.name}": where field is required` }; 353 + } 354 + const operator = clause.operator ?? "eq"; 355 + if (operator !== "eq") { 356 + return { 357 + valid: false, 358 + error: `Fetch "${step.name}": only "eq" is supported in search where clauses`, 359 + }; 360 + } 361 + if (typeof clause.value !== "string") { 362 + return { valid: false, error: `Fetch "${step.name}": where value must be a string` }; 363 + } 364 + const valueCheck = validateSearchTemplate(clause.value, upstreamFetchNames, "where value"); 365 + if (!valueCheck.valid) return valueCheck; 366 + where.push({ field: clause.field, operator: "eq", value: clause.value }); 367 + } 368 + 369 + // Phase 1: only the first match is exposed to templates, so accept only limit=1. 370 + const limit = step.limit ?? 1; 371 + if (!Number.isInteger(limit) || limit !== 1) { 372 + return { valid: false, error: `Fetch "${step.name}": limit must be 1 in this version` }; 373 + } 374 + 375 + const conditionsResult = validateFetchConditionInputs(step.conditions, step.name); 376 + if (!conditionsResult.valid) return { valid: false, error: conditionsResult.error }; 377 + 378 + return { 379 + valid: true, 380 + value: { 381 + kind: "search", 382 + name: step.name, 383 + repo: step.repo, 384 + collection: step.collection, 385 + where, 386 + ...(step.limit !== undefined ? { limit } : {}), 387 + ...(conditionsResult.value.length > 0 ? { conditions: conditionsResult.value } : {}), 388 + ...(step.comment ? { comment: step.comment } : {}), 389 + }, 390 + }; 178 391 } 179 392 180 393 type BookmarkInput = {
+2
lib/automations/labels.ts
··· 3 3 startsWith: "starts with", 4 4 endsWith: "ends with", 5 5 contains: "contains", 6 + exists: "is present", 7 + "not-exists": "is missing", 6 8 }; 7 9 8 10 export const actionTypeLabels: Record<string, string> = {
+22 -1
lib/automations/pds.ts
··· 76 76 | PdsPatchRecordAction 77 77 | PdsBookmarkAction; 78 78 79 - export type PdsFetchStep = { 79 + type PdsCondition = { 80 + field: string; 81 + operator: string; 82 + value: string; 83 + comment?: string; 84 + }; 85 + 86 + export type PdsFetchStepRecord = { 80 87 $type: "run.airglow.automation#fetchStep"; 81 88 name: string; 82 89 uri: string; 90 + conditions?: PdsCondition[]; 83 91 comment?: string; 84 92 }; 93 + 94 + export type PdsFetchStepSearch = { 95 + $type: "run.airglow.automation#fetchSearchStep"; 96 + name: string; 97 + repo: string; 98 + collection: string; 99 + where: Array<{ field: string; operator: "eq"; value: string }>; 100 + limit?: number; 101 + conditions?: PdsCondition[]; 102 + comment?: string; 103 + }; 104 + 105 + export type PdsFetchStep = PdsFetchStepRecord | PdsFetchStepSearch; 85 106 86 107 type AutomationRecord = { 87 108 name: string;
+26 -5
lib/db/schema.ts
··· 63 63 return RECORD_PRODUCING_TYPES.has(type); 64 64 } 65 65 66 - export type FetchStep = { 66 + export type Condition = { 67 + field: string; 68 + operator: string; 69 + value: string; 70 + comment?: string; 71 + }; 72 + 73 + export type FetchStepRecord = { 74 + kind?: "record"; // absent on legacy rows — treated as "record" 67 75 name: string; 68 76 uri: string; // AT URI template 77 + /** Conditions evaluated against this fetch's result entry after it resolves. 78 + * If any fail, the automation is skipped silently. */ 79 + conditions?: Condition[]; 69 80 comment?: string; 70 81 }; 71 82 83 + export type FetchStepSearch = { 84 + kind: "search"; 85 + name: string; 86 + repo: string; // DID template — {{self}}, {{event.*}}, or upstream fetch ref 87 + collection: string; // literal NSID 88 + where: Array<{ field: string; operator: "eq"; value: string }>; 89 + limit?: number; // default 1 90 + conditions?: Condition[]; 91 + comment?: string; 92 + }; 93 + 94 + export type FetchStep = FetchStepRecord | FetchStepSearch; 95 + 72 96 // Local index of run.airglow.automation records living on user PDS. 73 97 // Source of truth is the PDS; this is a cache for fast Jetstream matching. 74 98 export const automations = sqliteTable( ··· 83 107 operations: text("operation", { mode: "json" }).notNull().$type<string[]>().default(["create"]), 84 108 actions: text("actions", { mode: "json" }).notNull().$type<Action[]>().default([]), 85 109 fetches: text("fetches", { mode: "json" }).notNull().$type<FetchStep[]>().default([]), 86 - conditions: text("conditions", { mode: "json" }) 87 - .notNull() 88 - .$type<Array<{ field: string; operator: string; value: string; comment?: string }>>() 89 - .default([]), 110 + conditions: text("conditions", { mode: "json" }).notNull().$type<Condition[]>().default([]), 90 111 wantedDids: text("wanted_dids", { mode: "json" }).notNull().$type<string[]>().default([]), 91 112 active: integer("active", { mode: "boolean" }).notNull().default(false), 92 113 dryRun: integer("dry_run", { mode: "boolean" }).notNull().default(false),
+1
lib/jetstream/consumer.test.ts
··· 4 4 config: { 5 5 databasePath: ":memory:", 6 6 jetstreamUrl: "wss://jetstream.test/subscribe", 7 + publicUrl: "https://airglow.test", 7 8 nsidAllowlist: [], 8 9 nsidBlocklist: [], 9 10 nsidRequireDids: [],
+73 -3
lib/jetstream/handler.test.ts
··· 1 1 import { describe, it, expect, vi, beforeEach } from "vitest"; 2 2 3 + const { mockInsert, mockInsertValues } = vi.hoisted(() => { 4 + const insertValues = vi.fn().mockResolvedValue(undefined); 5 + const insert = vi.fn(() => ({ values: insertValues })); 6 + return { mockInsert: insert, mockInsertValues: insertValues }; 7 + }); 8 + 9 + vi.mock("@/db/index.js", () => ({ 10 + db: { insert: mockInsert }, 11 + })); 12 + 3 13 vi.mock("@/webhooks/dispatcher.js", () => ({ 4 14 dispatch: vi.fn(), 5 15 })); ··· 105 115 106 116 it("resolves fetches and passes context to all actions", async () => { 107 117 const fetchContext = { 108 - profile: { uri: "at://x/col/rk", cid: "c", record: { name: "Alice" } }, 118 + profile: { found: true, uri: "at://x/col/rk", cid: "c", record: { name: "Alice" } }, 109 119 }; 110 - mockResolveFetches.mockResolvedValueOnce({ context: fetchContext, errors: [] }); 120 + mockResolveFetches.mockResolvedValueOnce({ context: fetchContext, errors: [], skip: false }); 111 121 112 122 const match = makeMatch({ 113 123 automation: { ··· 127 137 expect(mockExecuteAction).toHaveBeenCalledWith(match, 1, fetchContext); 128 138 }); 129 139 140 + it("bails silently when a fetch's post-resolution condition skips", async () => { 141 + mockResolveFetches.mockResolvedValueOnce({ 142 + context: { 143 + existingMirror: { found: true, uri: "at://x/col/rk", cid: "c", record: {} }, 144 + }, 145 + errors: [], 146 + skip: true, 147 + }); 148 + mockInsert.mockClear(); 149 + 150 + const match = makeMatch({ 151 + automation: { 152 + actions: [makeWebhookAction(), makeRecordAction()], 153 + fetches: [makeFetchStep({ name: "existingMirror" })], 154 + }, 155 + }); 156 + 157 + await handleMatchedEvent(match); 158 + 159 + // No actions should have been dispatched. 160 + expect(mockDispatch).not.toHaveBeenCalled(); 161 + expect(mockExecuteAction).not.toHaveBeenCalled(); 162 + // Non-dry-run skip: no delivery log entry written. 163 + expect(mockInsert).not.toHaveBeenCalled(); 164 + }); 165 + 166 + it("writes a dry-run delivery log when a per-fetch condition skips", async () => { 167 + mockResolveFetches.mockResolvedValueOnce({ 168 + context: { 169 + existingMirror: { found: true, uri: "at://x/col/rk", cid: "c", record: {} }, 170 + }, 171 + errors: [], 172 + skip: true, 173 + skippedBy: "existingMirror", 174 + }); 175 + mockInsert.mockClear(); 176 + mockInsertValues.mockClear(); 177 + 178 + const match = makeMatch({ 179 + automation: { 180 + actions: [makeWebhookAction()], 181 + fetches: [makeFetchStep({ name: "existingMirror" })], 182 + dryRun: true, 183 + }, 184 + }); 185 + 186 + await handleMatchedEvent(match); 187 + 188 + expect(mockDispatch).not.toHaveBeenCalled(); 189 + expect(mockInsertValues).toHaveBeenCalledTimes(1); 190 + expect(mockInsertValues).toHaveBeenCalledWith( 191 + expect.objectContaining({ 192 + dryRun: true, 193 + message: expect.stringContaining("existingMirror"), 194 + }), 195 + ); 196 + }); 197 + 130 198 it("skips fetch resolution when no fetch steps", async () => { 131 199 const match = makeMatch({ 132 200 automation: { actions: [makeWebhookAction()], fetches: [] }, ··· 139 207 140 208 it("passes partial fetch context when some fetches fail", async () => { 141 209 const partialContext = { 142 - profile: { uri: "at://x/col/rk", cid: "c", record: { name: "Alice" } }, 210 + profile: { found: true, uri: "at://x/col/rk", cid: "c", record: { name: "Alice" } }, 143 211 }; 144 212 mockResolveFetches.mockResolvedValueOnce({ 145 213 context: partialContext, 146 214 errors: [{ name: "extra", error: "PDS unreachable" }], 215 + skip: false, 147 216 }); 148 217 149 218 const match = makeMatch({ ··· 208 277 1, 209 278 expect.objectContaining({ 210 279 action1: { 280 + found: true, 211 281 uri: okWithUri.uri, 212 282 cid: okWithUri.cid, 213 283 did: "did:plc:test",
+30
lib/jetstream/handler.ts
··· 23 23 for (const err of result.errors) { 24 24 console.warn(`Fetch "${err.name}" failed for ${match.automation.uri}: ${err.error}`); 25 25 } 26 + // A per-fetch condition returned false: skip the automation silently. 27 + // This is normal filtering (e.g. "skip when the mirror already exists"), 28 + // not an error. In dry-run we still write a log entry so the user can 29 + // debug why their automation isn't firing. 30 + if (result.skip) { 31 + if (match.automation.dryRun) { 32 + await logDrySkip(match, result.skippedBy); 33 + } 34 + return; 35 + } 26 36 } 27 37 28 38 if (match.automation.dryRun) { ··· 36 46 // Inject synthetic action result so subsequent dry-run actions can reference {{actionN.*}} 37 47 if (isRecordProducingAction(action.$type)) { 38 48 fetchContext[`action${i + 1}`] = { 49 + found: true, 39 50 uri: `at://did:dry:run/${action.$type}/placeholder`, 40 51 cid: "dry-run-cid", 41 52 did: "did:dry:run", ··· 68 79 if (result.uri && result.cid) { 69 80 const { did, collection, rkey } = parseAtUri(result.uri); 70 81 fetchContext[`action${i + 1}`] = { 82 + found: true, 71 83 uri: result.uri, 72 84 cid: result.cid, 73 85 did, ··· 188 200 createdAt: new Date(), 189 201 }); 190 202 } 203 + 204 + async function logDrySkip(match: MatchedEvent, skippedBy: string | undefined) { 205 + const message = skippedBy 206 + ? `Skipped: conditions on data source "${skippedBy}" did not match` 207 + : "Skipped: per-source conditions did not match"; 208 + await db.insert(deliveryLogs).values({ 209 + automationUri: match.automation.uri, 210 + actionIndex: 0, 211 + eventTimeUs: match.event.time_us, 212 + payload: null, 213 + statusCode: null, 214 + message, 215 + error: null, 216 + dryRun: true, 217 + attempt: 1, 218 + createdAt: new Date(), 219 + }); 220 + }
+245 -1
lib/jetstream/matcher.test.ts
··· 1 1 import { describe, it, expect } from "vitest"; 2 - import { matchConditions } from "./matcher.js"; 2 + import { matchConditions, evaluateFetchConditions } from "./matcher.js"; 3 3 import { makeEvent } from "../test/fixtures.js"; 4 + import type { FetchContextEntry } from "../actions/template.js"; 4 5 5 6 describe("matchConditions", () => { 6 7 const ownerDid = "did:plc:owner123"; ··· 231 232 }); 232 233 const conditions = [{ field: "count", operator: "eq", value: "42" }]; 233 234 expect(matchConditions(event, conditions, ownerDid)).toBe(true); 235 + }); 236 + }); 237 + 238 + describe("exists / not-exists operator", () => { 239 + it("matches 'exists' when the record field is present", () => { 240 + const event = makeEvent({ 241 + commit: { 242 + rev: "r1", 243 + operation: "create", 244 + collection: "test", 245 + rkey: "k", 246 + record: { subject: "did:plc:foo" }, 247 + }, 248 + }); 249 + const conditions = [{ field: "subject", operator: "exists", value: "" }]; 250 + expect(matchConditions(event, conditions, ownerDid)).toBe(true); 251 + }); 252 + 253 + it("fails 'exists' when the record field is missing", () => { 254 + const event = makeEvent({ 255 + commit: { 256 + rev: "r1", 257 + operation: "create", 258 + collection: "test", 259 + rkey: "k", 260 + record: {}, 261 + }, 262 + }); 263 + const conditions = [{ field: "subject", operator: "exists", value: "" }]; 264 + expect(matchConditions(event, conditions, ownerDid)).toBe(false); 265 + }); 266 + 267 + it("'not-exists' is the inverse of 'exists'", () => { 268 + const event = makeEvent({ 269 + commit: { 270 + rev: "r1", 271 + operation: "create", 272 + collection: "test", 273 + rkey: "k", 274 + record: {}, 275 + }, 276 + }); 277 + const conditions = [{ field: "subject", operator: "not-exists", value: "" }]; 278 + expect(matchConditions(event, conditions, ownerDid)).toBe(true); 279 + }); 280 + }); 281 + }); 282 + 283 + describe("evaluateFetchConditions", () => { 284 + const ownerDid = "did:plc:owner123"; 285 + const foundEntry: FetchContextEntry = { 286 + found: true, 287 + uri: "at://did:plc:x/collection/rkey", 288 + cid: "cid1", 289 + record: { subject: "did:plc:target", nested: { displayName: "Alice" } }, 290 + }; 291 + const notFoundEntry: FetchContextEntry = { found: false, uri: "", cid: "", record: {} }; 292 + 293 + it("returns true with no conditions", () => { 294 + expect(evaluateFetchConditions(foundEntry, undefined, ownerDid)).toBe(true); 295 + expect(evaluateFetchConditions(foundEntry, [], ownerDid)).toBe(true); 296 + }); 297 + 298 + describe("'found' + exists/not-exists special case", () => { 299 + it("'found exists' on a found entry is true", () => { 300 + expect( 301 + evaluateFetchConditions( 302 + foundEntry, 303 + [{ field: "found", operator: "exists", value: "" }], 304 + ownerDid, 305 + ), 306 + ).toBe(true); 307 + }); 308 + 309 + it("'found exists' on a not-found entry is false", () => { 310 + expect( 311 + evaluateFetchConditions( 312 + notFoundEntry, 313 + [{ field: "found", operator: "exists", value: "" }], 314 + ownerDid, 315 + ), 316 + ).toBe(false); 317 + }); 318 + 319 + it("'found not-exists' on a not-found entry is true (duplicate-prevention pattern)", () => { 320 + expect( 321 + evaluateFetchConditions( 322 + notFoundEntry, 323 + [{ field: "found", operator: "not-exists", value: "" }], 324 + ownerDid, 325 + ), 326 + ).toBe(true); 327 + }); 328 + 329 + it("'found not-exists' on a found entry is false", () => { 330 + expect( 331 + evaluateFetchConditions( 332 + foundEntry, 333 + [{ field: "found", operator: "not-exists", value: "" }], 334 + ownerDid, 335 + ), 336 + ).toBe(false); 337 + }); 338 + }); 339 + 340 + describe("dotted paths into the record", () => { 341 + it("eq on a top-level record field", () => { 342 + expect( 343 + evaluateFetchConditions( 344 + foundEntry, 345 + [{ field: "record.subject", operator: "eq", value: "did:plc:target" }], 346 + ownerDid, 347 + ), 348 + ).toBe(true); 349 + }); 350 + 351 + it("eq on a nested record field", () => { 352 + expect( 353 + evaluateFetchConditions( 354 + foundEntry, 355 + [{ field: "record.nested.displayName", operator: "eq", value: "Alice" }], 356 + ownerDid, 357 + ), 358 + ).toBe(true); 359 + }); 360 + 361 + it("eq on a missing field is false", () => { 362 + expect( 363 + evaluateFetchConditions( 364 + foundEntry, 365 + [{ field: "record.missing", operator: "eq", value: "x" }], 366 + ownerDid, 367 + ), 368 + ).toBe(false); 369 + }); 370 + 371 + it("exists on a present nested field", () => { 372 + expect( 373 + evaluateFetchConditions( 374 + foundEntry, 375 + [{ field: "record.nested.displayName", operator: "exists", value: "" }], 376 + ownerDid, 377 + ), 378 + ).toBe(true); 379 + }); 380 + 381 + it("exists on a missing nested field", () => { 382 + expect( 383 + evaluateFetchConditions( 384 + foundEntry, 385 + [{ field: "record.missing", operator: "exists", value: "" }], 386 + ownerDid, 387 + ), 388 + ).toBe(false); 389 + }); 390 + 391 + it("not-found entries always return undefined for dotted paths", () => { 392 + expect( 393 + evaluateFetchConditions( 394 + notFoundEntry, 395 + [{ field: "record.subject", operator: "exists", value: "" }], 396 + ownerDid, 397 + ), 398 + ).toBe(false); 399 + expect( 400 + evaluateFetchConditions( 401 + notFoundEntry, 402 + [{ field: "record.subject", operator: "not-exists", value: "" }], 403 + ownerDid, 404 + ), 405 + ).toBe(true); 406 + }); 407 + }); 408 + 409 + describe("AND logic", () => { 410 + it("passes only when every condition matches", () => { 411 + expect( 412 + evaluateFetchConditions( 413 + foundEntry, 414 + [ 415 + { field: "found", operator: "exists", value: "" }, 416 + { field: "record.subject", operator: "startsWith", value: "did:plc:" }, 417 + ], 418 + ownerDid, 419 + ), 420 + ).toBe(true); 421 + }); 422 + 423 + it("short-circuits on first failure", () => { 424 + expect( 425 + evaluateFetchConditions( 426 + foundEntry, 427 + [ 428 + { field: "found", operator: "exists", value: "" }, 429 + { field: "record.subject", operator: "eq", value: "did:plc:nope" }, 430 + ], 431 + ownerDid, 432 + ), 433 + ).toBe(false); 434 + }); 435 + }); 436 + 437 + describe("{{self}} placeholder resolution", () => { 438 + it("resolves {{self}} in value before comparing (eq)", () => { 439 + const selfEntry: FetchContextEntry = { 440 + found: true, 441 + uri: "at://did:plc:x/collection/rkey", 442 + cid: "cid1", 443 + record: { subject: ownerDid }, 444 + }; 445 + expect( 446 + evaluateFetchConditions( 447 + selfEntry, 448 + [{ field: "record.subject", operator: "eq", value: "{{self}}" }], 449 + ownerDid, 450 + ), 451 + ).toBe(true); 452 + }); 453 + 454 + it("does not match when the resolved {{self}} differs from the field value", () => { 455 + expect( 456 + evaluateFetchConditions( 457 + foundEntry, 458 + [{ field: "record.subject", operator: "eq", value: "{{self}}" }], 459 + ownerDid, 460 + ), 461 + ).toBe(false); 462 + }); 463 + 464 + it("resolves {{self}} for other string operators (startsWith)", () => { 465 + const selfEntry: FetchContextEntry = { 466 + found: true, 467 + uri: "at://did:plc:x/collection/rkey", 468 + cid: "cid1", 469 + record: { subject: `${ownerDid}/extra` }, 470 + }; 471 + expect( 472 + evaluateFetchConditions( 473 + selfEntry, 474 + [{ field: "record.subject", operator: "startsWith", value: "{{self}}" }], 475 + ownerDid, 476 + ), 477 + ).toBe(true); 234 478 }); 235 479 }); 236 480 });
+83 -18
lib/jetstream/matcher.ts
··· 1 + import type { Condition } from "../db/schema.js"; 2 + import type { FetchContextEntry } from "../actions/template.js"; 3 + 1 4 export type JetstreamCommit = { 2 5 rev: string; 3 6 operation: "create" | "update" | "delete"; ··· 14 17 commit?: JetstreamCommit; 15 18 }; 16 19 17 - type Condition = { 18 - field: string; 19 - operator: string; 20 - value: string; 21 - }; 22 - 23 20 /** 24 21 * Resolve a dotted field path against an event. 25 22 * - "event.did" → event.did ··· 31 28 * Note: "event.commit.operation" is no longer supported as a condition field. 32 29 * Operation filtering is handled at the consumer level via the top-level `operation` field. 33 30 */ 34 - function resolveField(event: JetstreamEvent, field: string): string | undefined { 31 + function resolveEventField(event: JetstreamEvent, field: string): string | undefined { 35 32 if (field === "event.did" || field === "repo") return event.did; 36 33 37 34 if (!event.commit?.record) return undefined; ··· 49 46 return value === "{{self}}" ? ownerDid : value; 50 47 } 51 48 52 - function evaluateCondition(event: JetstreamEvent, condition: Condition, ownerDid: string): boolean { 53 - const actual = resolveField(event, condition.field); 54 - if (actual === undefined) return false; 55 - 56 - const expected = resolveConditionValue(condition.value, ownerDid); 57 - 58 - switch (condition.operator) { 49 + function applyStringOperator(operator: string, actual: string, expected: string): boolean { 50 + switch (operator) { 59 51 case "eq": 60 52 return actual === expected; 61 53 case "startsWith": ··· 69 61 } 70 62 } 71 63 64 + function evaluateTriggerCondition( 65 + event: JetstreamEvent, 66 + condition: Condition, 67 + ownerDid: string, 68 + ): boolean { 69 + if (condition.operator === "exists" || condition.operator === "not-exists") { 70 + const actual = resolveEventField(event, condition.field); 71 + const exists = actual !== undefined && actual !== ""; 72 + return condition.operator === "exists" ? exists : !exists; 73 + } 74 + 75 + const actual = resolveEventField(event, condition.field); 76 + if (actual === undefined) return false; 77 + const expected = resolveConditionValue(condition.value, ownerDid); 78 + return applyStringOperator(condition.operator, actual, expected); 79 + } 80 + 72 81 /** 73 - * Check if all conditions match the event. 74 - * Empty conditions = match all events for that collection. 75 - * `ownerDid` is the automation owner's DID, used to resolve {{self}} in condition values. 82 + * Check top-level conditions against the event only. These run in the consumer 83 + * on the Jetstream hot path, before any fetches. Empty conditions list matches. 76 84 */ 77 85 export function matchConditions( 78 86 event: JetstreamEvent, ··· 80 88 ownerDid: string, 81 89 ): boolean { 82 90 if (conditions.length === 0) return true; 83 - return conditions.every((cond) => evaluateCondition(event, cond, ownerDid)); 91 + return conditions.every((cond) => evaluateTriggerCondition(event, cond, ownerDid)); 92 + } 93 + 94 + /** 95 + * Read a dotted path from a fetch context entry. Returns the resolved string 96 + * value, or `undefined` if missing / entry is not-found. 97 + */ 98 + function readEntryPath(entry: FetchContextEntry, path: string): string | undefined { 99 + if (!entry.found) return undefined; 100 + let value: unknown = entry; 101 + for (const key of path.split(".")) { 102 + if (value == null || typeof value !== "object") return undefined; 103 + value = (value as Record<string, unknown>)[key]; 104 + } 105 + if (value == null) return undefined; 106 + return typeof value === "string" ? value : JSON.stringify(value); 107 + } 108 + 109 + function evaluateFetchCondition( 110 + entry: FetchContextEntry, 111 + condition: Condition, 112 + ownerDid: string, 113 + ): boolean { 114 + // Special-case: the bare `found` field combined with exists / not-exists 115 + // tests the boolean flag directly (otherwise both would be trivially true 116 + // because `readEntryPath` would always return "true" or "false" strings). 117 + if ( 118 + condition.field === "found" && 119 + (condition.operator === "exists" || condition.operator === "not-exists") 120 + ) { 121 + return condition.operator === "exists" ? entry.found : !entry.found; 122 + } 123 + 124 + if (condition.operator === "exists" || condition.operator === "not-exists") { 125 + const actual = readEntryPath(entry, condition.field); 126 + const exists = actual !== undefined && actual !== ""; 127 + return condition.operator === "exists" ? exists : !exists; 128 + } 129 + 130 + const actual = readEntryPath(entry, condition.field); 131 + if (actual === undefined) return false; 132 + const expected = resolveConditionValue(condition.value, ownerDid); 133 + return applyStringOperator(condition.operator, actual, expected); 134 + } 135 + 136 + /** 137 + * Evaluate every fetch-attached condition against its owning fetch entry. 138 + * Returns false as soon as any condition fails (short-circuit). An empty 139 + * list always passes. `ownerDid` is used to resolve `{{self}}` placeholders 140 + * in the condition value, mirroring the trigger-condition path. 141 + */ 142 + export function evaluateFetchConditions( 143 + entry: FetchContextEntry, 144 + conditions: Condition[] | undefined, 145 + ownerDid: string, 146 + ): boolean { 147 + if (!conditions || conditions.length === 0) return true; 148 + return conditions.every((cond) => evaluateFetchCondition(entry, cond, ownerDid)); 84 149 }
+22 -2
lib/pds/resolver.test.ts
··· 142 142 143 143 const result = await fetchRecord("at://did:plc:abc/app.bsky.feed.post/rk1"); 144 144 expect(result).toEqual({ 145 + found: true, 145 146 uri: "at://did:plc:abc/app.bsky.feed.post/rk1", 146 147 cid: "bafycid123", 147 148 did: "did:plc:abc", ··· 172 173 ); 173 174 }); 174 175 175 - it("throws when PDS getRecord fails", async () => { 176 + it("returns found:false when PDS getRecord returns 404", async () => { 176 177 mockResolve.mockResolvedValueOnce({ 177 178 id: "did:plc:abc", 178 179 service: [ ··· 185 186 }); 186 187 mockFetch.mockResolvedValueOnce({ ok: false, status: 404 }); 187 188 188 - await expect(fetchRecord("at://did:plc:abc/col/rk")).rejects.toThrow("getRecord failed (404)"); 189 + const result = await fetchRecord("at://did:plc:abc/col/rk"); 190 + expect(result.found).toBe(false); 191 + expect(result.uri).toBe("at://did:plc:abc/col/rk"); 192 + expect(result.record).toEqual({}); 193 + }); 194 + 195 + it("throws when PDS getRecord fails with non-404 error", async () => { 196 + mockResolve.mockResolvedValueOnce({ 197 + id: "did:plc:abc", 198 + service: [ 199 + { 200 + id: "#atproto_pds", 201 + type: "AtprotoPersonalDataServer", 202 + serviceEndpoint: "https://pds.example.com", 203 + }, 204 + ], 205 + }); 206 + mockFetch.mockResolvedValueOnce({ ok: false, status: 500 }); 207 + 208 + await expect(fetchRecord("at://did:plc:abc/col/rk")).rejects.toThrow("getRecord failed (500)"); 189 209 }); 190 210 191 211 it("returns defaults for missing cid/value in response", async () => {
+7
lib/pds/resolver.ts
··· 17 17 export const handleResolver = new HandleResolver({ timeout: 10_000 }); 18 18 19 19 export type FetchedRecord = { 20 + /** True when the PDS returned the record; false when it returned 404. */ 21 + found: boolean; 20 22 uri: string; 21 23 cid: string; 22 24 did: string; ··· 93 95 ); 94 96 } 95 97 98 + if (res.status === 404) { 99 + return { found: false, uri: atUri, cid: "", did, collection, rkey, record: {} }; 100 + } 101 + 96 102 if (!res.ok) { 97 103 throw new Error(`PDS getRecord failed (${res.status}) for ${atUri}`); 98 104 } ··· 104 110 }; 105 111 106 112 return { 113 + found: true, 107 114 uri: data.uri ?? atUri, 108 115 cid: data.cid ?? "", 109 116 did,
+4 -1
lib/test/fixtures.ts
··· 23 23 conditions: Array<{ field: string; operator: string; value: string; comment?: string }>; 24 24 wantedDids: string[]; 25 25 active: boolean; 26 + dryRun: boolean; 26 27 indexedAt: Date; 27 28 }; 28 29 ··· 95 96 96 97 export function makeFetchStep(overrides?: Partial<FetchStep>): FetchStep { 97 98 return { 99 + kind: "record", 98 100 name: "profile", 99 101 uri: "at://{{event.did}}/app.bsky.actor.profile/self", 100 102 ...overrides, 101 - }; 103 + } as FetchStep; 102 104 } 103 105 104 106 export function makeAutomation(overrides?: Partial<Automation>): Automation { ··· 115 117 conditions: [], 116 118 wantedDids: [], 117 119 active: true, 120 + dryRun: false, 118 121 indexedAt: new Date("2024-01-01T00:00:00.000Z"), 119 122 ...overrides, 120 123 };
+1 -1
lib/webhooks/dispatcher.test.ts
··· 169 169 mockFetch.mockResolvedValueOnce({ status: 200 }); 170 170 171 171 const fetchContext = { 172 - profile: { uri: "at://x/col/rk", cid: "c", record: { name: "Alice" } }, 172 + profile: { found: true, uri: "at://x/col/rk", cid: "c", record: { name: "Alice" } }, 173 173 }; 174 174 const match = makeMatch(); 175 175 await dispatch(match, 0, fetchContext);