Webhooks for the AT Protocol airglow.run
atproto atprotocol automation webhook
12
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: mandatory condition on type

Hugo 5b7adc05 c99c0e8a

+518 -77
+23 -8
app/islands/AutomationForm.tsx
··· 33 33 name: string; 34 34 description: string | null; 35 35 lexicon: string; 36 + operation: string; 36 37 actions: Action[]; 37 38 fetches: FetchStep[]; 38 39 conditions: Array<{ field: string; operator: string; value: string; comment?: string }>; ··· 43 44 44 45 const BUILTIN_CONDITION_FIELDS: Field[] = [ 45 46 { path: "event.did", type: "string", description: "DID of the repo that emitted the event" }, 46 - { 47 - path: "event.commit.operation", 48 - type: "string", 49 - description: "create, update, or delete", 50 - }, 51 47 ]; 52 48 53 49 const BUILTIN_PLACEHOLDERS = [ ··· 303 299 const [name, setName] = useState(initial?.name ?? ""); 304 300 const [description, setDescription] = useState(initial?.description ?? ""); 305 301 const [lexicon, setLexicon] = useState(initialLexicon); 302 + const [operation, setOperation] = useState(initial?.operation ?? "create"); 306 303 const [fields, setFields] = useState<Field[]>([]); 307 304 const [fieldsLoading, setFieldsLoading] = useState(false); 308 305 const [fieldsError, setFieldsError] = useState(""); ··· 328 325 if (isEdit) { 329 326 if (name !== (initial.name ?? "")) return true; 330 327 if (description !== (initial.description ?? "")) return true; 328 + if (operation !== (initial.operation ?? "create")) return true; 331 329 if (JSON.stringify(conditions) !== JSON.stringify(toConditionDrafts(initial.conditions))) 332 330 return true; 333 331 if (JSON.stringify(fetches) !== JSON.stringify(toFetchDrafts(initial.fetches))) return true; ··· 342 340 fetches.length || 343 341 actions.length 344 342 ); 345 - }, [name, description, lexicon, conditions, fetches, actions, isEdit]); 343 + }, [name, description, lexicon, operation, conditions, fetches, actions, isEdit]); 346 344 347 345 useEffect(() => { 348 346 if (!isDirty) return; ··· 469 467 }, []); 470 468 471 469 const previewPayload = useMemo(() => { 472 - const payload: Record<string, unknown> = { name, lexicon }; 470 + const payload: Record<string, unknown> = { name, lexicon, operation }; 473 471 if (description.trim()) payload.description = description.trim(); 474 472 const filteredFetches = fetches.filter((f) => f.name && f.uri); 475 473 if (filteredFetches.length > 0 || isEdit) { ··· 499 497 }; 500 498 }); 501 499 return JSON.stringify(payload, null, 2); 502 - }, [name, description, lexicon, fetches, conditions, actions]); 500 + }, [name, description, lexicon, operation, fetches, conditions, actions]); 503 501 504 502 const handleSubmit = useCallback( 505 503 async (e: Event) => { ··· 601 599 {isEdit && <span class={s.hint}>Lexicon cannot be changed after creation</span>} 602 600 {fieldsLoading && <span class={s.hint}>Loading fields...</span>} 603 601 {fieldsError && <span class={s.errorText}>{fieldsError}</span>} 602 + </div> 603 + 604 + <div class={s.fieldGroup}> 605 + <label class={s.label} for="operation"> 606 + Operation 607 + </label> 608 + <select 609 + id="operation" 610 + class={s.select} 611 + value={operation} 612 + onChange={(e: Event) => setOperation((e.target as HTMLSelectElement).value)} 613 + > 614 + <option value="create">create</option> 615 + <option value="update">update</option> 616 + <option value="delete">delete</option> 617 + </select> 618 + <span class={s.hint}>Type of commit event this automation responds to</span> 604 619 </div> 605 620 606 621 {allPlaceholders.length > 0 && (
+1
app/routes/api/automations/[rkey].test.ts
··· 57 57 name: "Test Auto", 58 58 description: null, 59 59 lexicon: "app.bsky.feed.like", 60 + operation: "create", 60 61 actions: [ 61 62 { $type: "webhook" as const, callbackUrl: "https://example.com/hook", secret: "old-secret" }, 62 63 ],
+11 -1
app/routes/api/automations/[rkey].ts
··· 28 28 | { type: "record"; targetCollection: string; recordTemplate: string; comment?: string }; 29 29 30 30 const VALID_OPERATORS = new Set(["eq", "startsWith", "endsWith", "contains"]); 31 + const VALID_OPERATIONS = new Set(["create", "update", "delete"]); 31 32 32 33 function findAutomation(did: string, rkey: string) { 33 34 return db.query.automations.findFirst({ ··· 56 57 name: auto.name, 57 58 description: auto.description, 58 59 lexicon: auto.lexicon, 60 + operation: auto.operation, 59 61 actions: auto.actions.map((a) => 60 62 a.$type === "webhook" 61 63 ? { ··· 93 95 const body = await c.req.json<{ 94 96 name?: string; 95 97 description?: string | null; 98 + operation?: string; 96 99 actions?: ActionInput[]; 97 100 fetches?: Array<{ name: string; uri: string; comment?: string }>; 98 101 conditions?: Array<{ field: string; operator?: string; value: string; comment?: string }>; ··· 112 115 return c.json({ error: "Description must be 1024 characters or less" }, 400); 113 116 } 114 117 118 + const operation = body.operation ?? auto.operation; 119 + if (!VALID_OPERATIONS.has(operation)) { 120 + return c.json({ error: "Operation must be one of: create, update, delete" }, 400); 121 + } 122 + 115 123 const conditions = body.conditions 116 124 ? body.conditions 117 - .filter((cond) => cond.field && cond.value) 125 + .filter((cond) => cond.field && cond.value && cond.field !== "event.commit.operation") 118 126 .map((cond) => ({ 119 127 field: cond.field, 120 128 operator: cond.operator ?? "eq", ··· 295 303 name: name.trim(), 296 304 description: description?.trim() || undefined, 297 305 lexicon: auto.lexicon, 306 + operation, 298 307 actions: pdsActions, 299 308 fetches: 300 309 pdsFetches ?? ··· 320 329 .set({ 321 330 name: name.trim(), 322 331 description: description?.trim() || null, 332 + operation, 323 333 actions: localActions, 324 334 fetches: localFetches, 325 335 conditions,
+1
app/routes/api/automations/[rkey]/logs.test.ts
··· 30 30 name: "Test Auto", 31 31 description: null, 32 32 lexicon: "app.bsky.feed.like", 33 + operation: "create", 33 34 actions: [{ $type: "webhook" as const, callbackUrl: "https://example.com/hook", secret: "sec" }], 34 35 fetches: [] as any[], 35 36 conditions: [] as any[],
+44
app/routes/api/automations/index.test.ts
··· 87 87 }); 88 88 }); 89 89 90 + it("returns 400 for missing operation", async () => { 91 + const res = await app.request( 92 + jsonReq("/api/automations", { 93 + name: "Test", 94 + lexicon: "app.bsky.feed.like", 95 + actions: [{ type: "webhook", callbackUrl: "https://example.com/hook" }], 96 + }), 97 + ); 98 + expect(res.status).toBe(400); 99 + const body = await res.json(); 100 + expect(body.error).toContain("Operation"); 101 + }); 102 + 103 + it("returns 400 for invalid operation", async () => { 104 + const res = await app.request( 105 + jsonReq("/api/automations", { 106 + name: "Test", 107 + lexicon: "app.bsky.feed.like", 108 + operation: "invalid", 109 + actions: [{ type: "webhook", callbackUrl: "https://example.com/hook" }], 110 + }), 111 + ); 112 + expect(res.status).toBe(400); 113 + }); 114 + 90 115 it("creates a webhook automation and returns 201", async () => { 91 116 const res = await app.request( 92 117 jsonReq("/api/automations", { 93 118 name: "My Auto", 94 119 lexicon: "app.bsky.feed.like", 120 + operation: "create", 95 121 actions: [{ type: "webhook", callbackUrl: "https://example.com/hook" }], 96 122 }), 97 123 ); ··· 109 135 jsonReq("/api/automations", { 110 136 name: "Record Auto", 111 137 lexicon: "app.bsky.feed.like", 138 + operation: "create", 112 139 actions: [ 113 140 { 114 141 type: "record", ··· 127 154 jsonReq("/api/automations", { 128 155 name: "", 129 156 lexicon: "app.bsky.feed.like", 157 + operation: "create", 130 158 actions: [{ type: "webhook", callbackUrl: "https://example.com/hook" }], 131 159 }), 132 160 ); ··· 138 166 jsonReq("/api/automations", { 139 167 name: "a".repeat(129), 140 168 lexicon: "app.bsky.feed.like", 169 + operation: "create", 141 170 actions: [{ type: "webhook", callbackUrl: "https://example.com/hook" }], 142 171 }), 143 172 ); ··· 150 179 name: "Test", 151 180 description: "x".repeat(1025), 152 181 lexicon: "app.bsky.feed.like", 182 + operation: "create", 153 183 actions: [{ type: "webhook", callbackUrl: "https://example.com/hook" }], 154 184 }), 155 185 ); ··· 161 191 jsonReq("/api/automations", { 162 192 name: "Test", 163 193 lexicon: "not-valid", 194 + operation: "create", 164 195 actions: [{ type: "webhook", callbackUrl: "https://example.com/hook" }], 165 196 }), 166 197 ); ··· 172 203 jsonReq("/api/automations", { 173 204 name: "Test", 174 205 lexicon: "blocked.nsid.something", 206 + operation: "create", 175 207 actions: [{ type: "webhook", callbackUrl: "https://example.com/hook" }], 176 208 }), 177 209 ); ··· 183 215 jsonReq("/api/automations", { 184 216 name: "Test", 185 217 lexicon: "app.bsky.feed.like", 218 + operation: "create", 186 219 actions: [], 187 220 }), 188 221 ); ··· 194 227 jsonReq("/api/automations", { 195 228 name: "Test", 196 229 lexicon: "app.bsky.feed.like", 230 + operation: "create", 197 231 actions: Array.from({ length: 11 }, () => ({ 198 232 type: "webhook", 199 233 callbackUrl: "https://example.com/hook", ··· 210 244 jsonReq("/api/automations", { 211 245 name: "Test", 212 246 lexicon: "app.bsky.feed.like", 247 + operation: "create", 213 248 actions: [{ type: "webhook", callbackUrl: "not-a-url" }], 214 249 }), 215 250 ); ··· 226 261 jsonReq("/api/automations", { 227 262 name: "Test", 228 263 lexicon: "app.bsky.feed.like", 264 + operation: "create", 229 265 actions: [{ type: "webhook", callbackUrl: "https://127.0.0.1/hook" }], 230 266 }), 231 267 ); ··· 241 277 jsonReq("/api/automations", { 242 278 name: "Test", 243 279 lexicon: "app.bsky.feed.like", 280 + operation: "create", 244 281 actions: [{ type: "webhook", callbackUrl: "https://example.com/hook" }], 245 282 }), 246 283 ); ··· 256 293 jsonReq("/api/automations", { 257 294 name: "Test", 258 295 lexicon: "app.bsky.feed.like", 296 + operation: "create", 259 297 actions: [{ type: "webhook", callbackUrl: "https://example.com/hook" }], 260 298 }), 261 299 ); ··· 271 309 jsonReq("/api/automations", { 272 310 name: "Test", 273 311 lexicon: "app.bsky.feed.like", 312 + operation: "create", 274 313 actions: [ 275 314 { type: "record", targetCollection: "app.bsky.feed.post", recordTemplate: "not json" }, 276 315 ], ··· 284 323 jsonReq("/api/automations", { 285 324 name: "Test", 286 325 lexicon: "app.bsky.feed.like", 326 + operation: "create", 287 327 actions: [{ type: "webhook", callbackUrl: "https://example.com/hook" }], 288 328 conditions: [{ field: "event.did", operator: "invalid", value: "x" }], 289 329 }), ··· 296 336 jsonReq("/api/automations", { 297 337 name: "Test", 298 338 lexicon: "app.bsky.feed.like", 339 + operation: "create", 299 340 actions: [{ type: "webhook", callbackUrl: "https://example.com/hook" }], 300 341 conditions: Array.from({ length: 21 }, () => ({ 301 342 field: "event.did", ··· 312 353 jsonReq("/api/automations", { 313 354 name: "Test", 314 355 lexicon: "app.bsky.feed.like", 356 + operation: "create", 315 357 actions: [{ type: "webhook", callbackUrl: "https://example.com/hook" }], 316 358 fetches: Array.from({ length: 6 }, (_, i) => ({ 317 359 name: `step${i}`, ··· 329 371 jsonReq("/api/automations", { 330 372 name: "Test", 331 373 lexicon: "app.bsky.feed.like", 374 + operation: "create", 332 375 actions: [{ type: "webhook", callbackUrl: "https://example.com/hook" }], 333 376 }), 334 377 ); ··· 340 383 jsonReq("/api/automations", { 341 384 name: "Test", 342 385 lexicon: "app.bsky.feed.like", 386 + operation: "create", 343 387 actions: [{ type: "invalid" }], 344 388 }), 345 389 );
+11 -1
app/routes/api/automations/index.ts
··· 27 27 | { type: "record"; targetCollection: string; recordTemplate: string; comment?: string }; 28 28 29 29 const VALID_OPERATORS = new Set(["eq", "startsWith", "endsWith", "contains"]); 30 + const VALID_OPERATIONS = new Set(["create", "update", "delete"]); 30 31 31 32 export const GET = createRoute(async (c) => { 32 33 const user = c.get("user"); ··· 40 41 name: r.name, 41 42 description: r.description, 42 43 lexicon: r.lexicon, 44 + operation: r.operation, 43 45 actions: r.actions.map((a) => 44 46 a.$type === "webhook" 45 47 ? { ··· 64 66 name: string; 65 67 description?: string; 66 68 lexicon: string; 69 + operation: string; 67 70 actions: ActionInput[]; 68 71 fetches?: Array<{ name: string; uri: string; comment?: string }>; 69 72 conditions?: Array<{ field: string; operator?: string; value: string; comment?: string }>; ··· 89 92 return c.json({ error: "This lexicon is not allowed on this instance" }, 403); 90 93 } 91 94 95 + // Validate operation 96 + if (!body.operation || !VALID_OPERATIONS.has(body.operation)) { 97 + return c.json({ error: "Operation must be one of: create, update, delete" }, 400); 98 + } 99 + 92 100 // Validate actions 93 101 if (!body.actions || body.actions.length === 0) { 94 102 return c.json({ error: "At least one action is required" }, 400); ··· 100 108 // Normalize and validate conditions 101 109 // {{self}} is kept as-is — resolved at match time to the automation owner's DID 102 110 const conditions = (body.conditions ?? []) 103 - .filter((cond) => cond.field && cond.value) 111 + .filter((cond) => cond.field && cond.value && cond.field !== "event.commit.operation") 104 112 .map((cond) => ({ 105 113 field: cond.field, 106 114 operator: cond.operator ?? "eq", ··· 216 224 name: body.name.trim(), 217 225 description: body.description?.trim() || undefined, 218 226 lexicon: body.lexicon, 227 + operation: body.operation, 219 228 actions: pdsActions, 220 229 fetches: pdsFetches.length > 0 ? pdsFetches : undefined, 221 230 conditions, ··· 238 247 name: body.name.trim(), 239 248 description: body.description?.trim() || null, 240 249 lexicon: body.lexicon, 250 + operation: body.operation, 241 251 actions: localActions, 242 252 fetches: localFetches, 243 253 conditions,
+4
app/routes/dashboard/automations/[rkey].tsx
··· 81 81 <dd> 82 82 <InlineCode>{auto.lexicon}</InlineCode> 83 83 </dd> 84 + <dt>Operation</dt> 85 + <dd> 86 + <InlineCode>{auto.operation}</InlineCode> 87 + </dd> 84 88 <dt>Status</dt> 85 89 <dd> 86 90 <span data-automation-status>
+1
app/routes/dashboard/automations/[rkey]/edit.tsx
··· 57 57 name: auto.name, 58 58 description: auto.description, 59 59 lexicon: auto.lexicon, 60 + operation: auto.operation, 60 61 actions: auto.actions, 61 62 fetches: auto.fetches, 62 63 conditions: auto.conditions,
+4
app/routes/dashboard/index.tsx
··· 50 50 <tr> 51 51 <th>Name</th> 52 52 <th>Lexicon</th> 53 + <th>Operation</th> 53 54 <th>Actions</th> 54 55 <th>Status</th> 55 56 <th></th> ··· 63 64 </td> 64 65 <td> 65 66 <InlineCode>{auto.lexicon}</InlineCode> 67 + </td> 68 + <td> 69 + <InlineCode>{auto.operation}</InlineCode> 66 70 </td> 67 71 <td> 68 72 {auto.actions.length} action{auto.actions.length !== 1 ? "s" : ""}
+7 -1
lexicons/run/airglow/automation.json
··· 8 8 "key": "tid", 9 9 "record": { 10 10 "type": "object", 11 - "required": ["name", "lexicon", "actions", "createdAt"], 11 + "required": ["name", "lexicon", "operation", "actions", "createdAt"], 12 12 "properties": { 13 13 "name": { 14 14 "type": "string", ··· 24 24 "type": "string", 25 25 "description": "NSID of the collection to listen to.", 26 26 "maxLength": 256 27 + }, 28 + "operation": { 29 + "type": "string", 30 + "description": "The commit operation this automation responds to.", 31 + "knownValues": ["create", "update", "delete"], 32 + "maxLength": 16 27 33 }, 28 34 "actions": { 29 35 "type": "array",
+1
lib/automations/pds.ts
··· 57 57 name: string; 58 58 description?: string; 59 59 lexicon: string; 60 + operation: string; 60 61 actions: PdsAction[]; 61 62 fetches?: PdsFetchStep[]; 62 63 conditions: Array<{ field: string; operator: string; value: string; comment?: string }>;
+1
lib/db/migrations/0001_brave_war_machine.sql
··· 1 + ALTER TABLE `automations` ADD `operation` text DEFAULT 'create' NOT NULL;
+339
lib/db/migrations/meta/0001_snapshot.json
··· 1 + { 2 + "version": "6", 3 + "dialect": "sqlite", 4 + "id": "63ec3970-d9f2-4540-918e-56c550f7f5b9", 5 + "prevId": "053c31b3-dfd6-439c-af03-dd65e031d610", 6 + "tables": { 7 + "automations": { 8 + "name": "automations", 9 + "columns": { 10 + "uri": { 11 + "name": "uri", 12 + "type": "text", 13 + "primaryKey": true, 14 + "notNull": true, 15 + "autoincrement": false 16 + }, 17 + "did": { 18 + "name": "did", 19 + "type": "text", 20 + "primaryKey": false, 21 + "notNull": true, 22 + "autoincrement": false 23 + }, 24 + "rkey": { 25 + "name": "rkey", 26 + "type": "text", 27 + "primaryKey": false, 28 + "notNull": true, 29 + "autoincrement": false 30 + }, 31 + "name": { 32 + "name": "name", 33 + "type": "text", 34 + "primaryKey": false, 35 + "notNull": true, 36 + "autoincrement": false 37 + }, 38 + "description": { 39 + "name": "description", 40 + "type": "text", 41 + "primaryKey": false, 42 + "notNull": false, 43 + "autoincrement": false 44 + }, 45 + "lexicon": { 46 + "name": "lexicon", 47 + "type": "text", 48 + "primaryKey": false, 49 + "notNull": true, 50 + "autoincrement": false 51 + }, 52 + "operation": { 53 + "name": "operation", 54 + "type": "text", 55 + "primaryKey": false, 56 + "notNull": true, 57 + "autoincrement": false, 58 + "default": "'create'" 59 + }, 60 + "actions": { 61 + "name": "actions", 62 + "type": "text", 63 + "primaryKey": false, 64 + "notNull": true, 65 + "autoincrement": false, 66 + "default": "'[]'" 67 + }, 68 + "fetches": { 69 + "name": "fetches", 70 + "type": "text", 71 + "primaryKey": false, 72 + "notNull": true, 73 + "autoincrement": false, 74 + "default": "'[]'" 75 + }, 76 + "conditions": { 77 + "name": "conditions", 78 + "type": "text", 79 + "primaryKey": false, 80 + "notNull": true, 81 + "autoincrement": false, 82 + "default": "'[]'" 83 + }, 84 + "active": { 85 + "name": "active", 86 + "type": "integer", 87 + "primaryKey": false, 88 + "notNull": true, 89 + "autoincrement": false, 90 + "default": false 91 + }, 92 + "indexed_at": { 93 + "name": "indexed_at", 94 + "type": "integer", 95 + "primaryKey": false, 96 + "notNull": true, 97 + "autoincrement": false 98 + } 99 + }, 100 + "indexes": {}, 101 + "foreignKeys": {}, 102 + "compositePrimaryKeys": {}, 103 + "uniqueConstraints": {}, 104 + "checkConstraints": {} 105 + }, 106 + "delivery_logs": { 107 + "name": "delivery_logs", 108 + "columns": { 109 + "id": { 110 + "name": "id", 111 + "type": "integer", 112 + "primaryKey": true, 113 + "notNull": true, 114 + "autoincrement": true 115 + }, 116 + "automation_uri": { 117 + "name": "automation_uri", 118 + "type": "text", 119 + "primaryKey": false, 120 + "notNull": true, 121 + "autoincrement": false 122 + }, 123 + "action_index": { 124 + "name": "action_index", 125 + "type": "integer", 126 + "primaryKey": false, 127 + "notNull": true, 128 + "autoincrement": false, 129 + "default": 0 130 + }, 131 + "event_time_us": { 132 + "name": "event_time_us", 133 + "type": "integer", 134 + "primaryKey": false, 135 + "notNull": true, 136 + "autoincrement": false 137 + }, 138 + "payload": { 139 + "name": "payload", 140 + "type": "text", 141 + "primaryKey": false, 142 + "notNull": false, 143 + "autoincrement": false 144 + }, 145 + "status_code": { 146 + "name": "status_code", 147 + "type": "integer", 148 + "primaryKey": false, 149 + "notNull": false, 150 + "autoincrement": false 151 + }, 152 + "error": { 153 + "name": "error", 154 + "type": "text", 155 + "primaryKey": false, 156 + "notNull": false, 157 + "autoincrement": false 158 + }, 159 + "attempt": { 160 + "name": "attempt", 161 + "type": "integer", 162 + "primaryKey": false, 163 + "notNull": true, 164 + "autoincrement": false, 165 + "default": 1 166 + }, 167 + "created_at": { 168 + "name": "created_at", 169 + "type": "integer", 170 + "primaryKey": false, 171 + "notNull": true, 172 + "autoincrement": false 173 + } 174 + }, 175 + "indexes": {}, 176 + "foreignKeys": { 177 + "delivery_logs_automation_uri_automations_uri_fk": { 178 + "name": "delivery_logs_automation_uri_automations_uri_fk", 179 + "tableFrom": "delivery_logs", 180 + "tableTo": "automations", 181 + "columnsFrom": ["automation_uri"], 182 + "columnsTo": ["uri"], 183 + "onDelete": "cascade", 184 + "onUpdate": "no action" 185 + } 186 + }, 187 + "compositePrimaryKeys": {}, 188 + "uniqueConstraints": {}, 189 + "checkConstraints": {} 190 + }, 191 + "lexicon_cache": { 192 + "name": "lexicon_cache", 193 + "columns": { 194 + "nsid": { 195 + "name": "nsid", 196 + "type": "text", 197 + "primaryKey": true, 198 + "notNull": true, 199 + "autoincrement": false 200 + }, 201 + "schema": { 202 + "name": "schema", 203 + "type": "text", 204 + "primaryKey": false, 205 + "notNull": true, 206 + "autoincrement": false 207 + }, 208 + "fetched_at": { 209 + "name": "fetched_at", 210 + "type": "integer", 211 + "primaryKey": false, 212 + "notNull": true, 213 + "autoincrement": false 214 + } 215 + }, 216 + "indexes": {}, 217 + "foreignKeys": {}, 218 + "compositePrimaryKeys": {}, 219 + "uniqueConstraints": {}, 220 + "checkConstraints": {} 221 + }, 222 + "oauth_sessions": { 223 + "name": "oauth_sessions", 224 + "columns": { 225 + "key": { 226 + "name": "key", 227 + "type": "text", 228 + "primaryKey": true, 229 + "notNull": true, 230 + "autoincrement": false 231 + }, 232 + "value": { 233 + "name": "value", 234 + "type": "text", 235 + "primaryKey": false, 236 + "notNull": true, 237 + "autoincrement": false 238 + }, 239 + "expires_at": { 240 + "name": "expires_at", 241 + "type": "integer", 242 + "primaryKey": false, 243 + "notNull": false, 244 + "autoincrement": false 245 + } 246 + }, 247 + "indexes": {}, 248 + "foreignKeys": {}, 249 + "compositePrimaryKeys": {}, 250 + "uniqueConstraints": {}, 251 + "checkConstraints": {} 252 + }, 253 + "oauth_states": { 254 + "name": "oauth_states", 255 + "columns": { 256 + "key": { 257 + "name": "key", 258 + "type": "text", 259 + "primaryKey": true, 260 + "notNull": true, 261 + "autoincrement": false 262 + }, 263 + "value": { 264 + "name": "value", 265 + "type": "text", 266 + "primaryKey": false, 267 + "notNull": true, 268 + "autoincrement": false 269 + }, 270 + "expires_at": { 271 + "name": "expires_at", 272 + "type": "integer", 273 + "primaryKey": false, 274 + "notNull": false, 275 + "autoincrement": false 276 + } 277 + }, 278 + "indexes": {}, 279 + "foreignKeys": {}, 280 + "compositePrimaryKeys": {}, 281 + "uniqueConstraints": {}, 282 + "checkConstraints": {} 283 + }, 284 + "users": { 285 + "name": "users", 286 + "columns": { 287 + "id": { 288 + "name": "id", 289 + "type": "integer", 290 + "primaryKey": true, 291 + "notNull": true, 292 + "autoincrement": true 293 + }, 294 + "did": { 295 + "name": "did", 296 + "type": "text", 297 + "primaryKey": false, 298 + "notNull": true, 299 + "autoincrement": false 300 + }, 301 + "handle": { 302 + "name": "handle", 303 + "type": "text", 304 + "primaryKey": false, 305 + "notNull": true, 306 + "autoincrement": false 307 + }, 308 + "created_at": { 309 + "name": "created_at", 310 + "type": "integer", 311 + "primaryKey": false, 312 + "notNull": true, 313 + "autoincrement": false 314 + } 315 + }, 316 + "indexes": { 317 + "users_did_unique": { 318 + "name": "users_did_unique", 319 + "columns": ["did"], 320 + "isUnique": true 321 + } 322 + }, 323 + "foreignKeys": {}, 324 + "compositePrimaryKeys": {}, 325 + "uniqueConstraints": {}, 326 + "checkConstraints": {} 327 + } 328 + }, 329 + "views": {}, 330 + "enums": {}, 331 + "_meta": { 332 + "schemas": {}, 333 + "tables": {}, 334 + "columns": {} 335 + }, 336 + "internal": { 337 + "indexes": {} 338 + } 339 + }
+7
lib/db/migrations/meta/_journal.json
··· 8 8 "when": 1775679586984, 9 9 "tag": "0000_chunky_sersi", 10 10 "breakpoints": true 11 + }, 12 + { 13 + "idx": 1, 14 + "version": "6", 15 + "when": 1775725820160, 16 + "tag": "0001_brave_war_machine", 17 + "breakpoints": true 11 18 } 12 19 ] 13 20 }
+1
lib/db/schema.ts
··· 40 40 name: text("name").notNull(), 41 41 description: text("description"), 42 42 lexicon: text("lexicon").notNull(), // NSID being watched 43 + operation: text("operation").notNull().default("create"), // "create" | "update" | "delete" 43 44 actions: text("actions", { mode: "json" }).notNull().$type<Action[]>().default([]), 44 45 fetches: text("fetches", { mode: "json" }).notNull().$type<FetchStep[]>().default([]), 45 46 conditions: text("conditions", { mode: "json" })
+25
lib/jetstream/consumer.test.ts
··· 207 207 expect(handler).not.toHaveBeenCalled(); 208 208 }); 209 209 210 + it("does not call handler for non-matching operation", async () => { 211 + await db.insert(automations).values( 212 + makeAutomation({ 213 + uri: "at://u/s/1", 214 + rkey: "1", 215 + lexicon: "app.bsky.feed.like", 216 + operation: "create", 217 + }), 218 + ); 219 + await consumer.refreshAutomations(); 220 + 221 + const event = makeEvent({ 222 + commit: { 223 + rev: "r", 224 + operation: "delete", 225 + collection: "app.bsky.feed.like", 226 + rkey: "rk", 227 + record: {}, 228 + }, 229 + }); 230 + (consumer as any).processEvent(event); 231 + 232 + expect(handler).not.toHaveBeenCalled(); 233 + }); 234 + 210 235 it("does not call handler when conditions do not match", async () => { 211 236 await db.insert(automations).values( 212 237 makeAutomation({
+21 -51
lib/jetstream/consumer.ts
··· 4 4 import { config } from "../config.js"; 5 5 import { isNsidAllowed } from "../lexicons/resolver.js"; 6 6 import { matchConditions, type JetstreamEvent } from "./matcher.js"; 7 - import { readFileSync, writeFileSync, mkdirSync } from "node:fs"; 8 - import { dirname, join } from "node:path"; 9 7 10 8 type Automation = typeof automations.$inferSelect; 11 9 ··· 16 14 17 15 type EventHandler = (match: MatchedEvent) => void | Promise<void>; 18 16 19 - const CURSOR_FLUSH_INTERVAL = 5_000; // persist cursor every 5s 20 - 21 - const cursorPath = join(dirname(config.databasePath), "jetstream-cursor"); 22 - 23 17 export class JetstreamConsumer { 24 18 private ws: WebSocket | null = null; 25 - private autosByCollection = new Map<string, Automation[]>(); 26 - private cursor: number | null = null; 27 - private cursorDirty = false; 28 - private cursorTimer: ReturnType<typeof setInterval> | null = null; 19 + private autosByCollectionOp = new Map<string, Automation[]>(); 29 20 private reconnectTimer: ReturnType<typeof setTimeout> | null = null; 30 21 private reconnectDelay = 1000; 31 22 private running = false; ··· 37 28 38 29 async start() { 39 30 this.running = true; 40 - this.cursor = this.loadCursor(); 41 31 await this.refreshAutomations(); 42 - this.cursorTimer = setInterval(() => this.flushCursor(), CURSOR_FLUSH_INTERVAL); 43 32 } 44 33 45 34 stop() { 46 35 this.running = false; 47 - if (this.cursorTimer) clearInterval(this.cursorTimer); 48 36 if (this.reconnectTimer) clearTimeout(this.reconnectTimer); 49 - this.flushCursor(); 50 37 this.ws?.close(); 51 38 this.ws = null; 52 39 } ··· 56 43 where: eq(automations.active, true), 57 44 }); 58 45 59 - const byCollection = new Map<string, Automation[]>(); 46 + const byCollectionOp = new Map<string, Automation[]>(); 60 47 for (const row of rows) { 61 48 if (!isNsidAllowed(row.lexicon, config.nsidAllowlist, config.nsidBlocklist)) continue; 62 - const list = byCollection.get(row.lexicon) || []; 49 + const key = `${row.lexicon}\0${row.operation}`; 50 + const list = byCollectionOp.get(key) || []; 63 51 list.push(row); 64 - byCollection.set(row.lexicon, list); 52 + byCollectionOp.set(key, list); 65 53 } 66 54 67 - const oldCollections = new Set(this.autosByCollection.keys()); 68 - const newCollections = new Set(byCollection.keys()); 69 - this.autosByCollection = byCollection; 55 + const deriveCollections = (map: Map<string, Automation[]>) => { 56 + const cols = new Set<string>(); 57 + for (const key of map.keys()) cols.add(key.slice(0, key.indexOf("\0"))); 58 + return cols; 59 + }; 60 + 61 + const oldCollections = deriveCollections(this.autosByCollectionOp); 62 + const newCollections = deriveCollections(byCollectionOp); 63 + this.autosByCollectionOp = byCollectionOp; 70 64 71 65 const collectionsChanged = 72 66 oldCollections.size !== newCollections.size || ··· 94 88 this.reconnectTimer = null; 95 89 } 96 90 97 - const collections = [...this.autosByCollection.keys()]; 91 + const collections = [ 92 + ...new Set([...this.autosByCollectionOp.keys()].map((k) => k.slice(0, k.indexOf("\0")))), 93 + ]; 98 94 if (collections.length === 0) { 99 95 console.log("Jetstream: no active automations, waiting"); 100 96 return; ··· 104 100 for (const col of collections) { 105 101 url.searchParams.append("wantedCollections", col); 106 102 } 107 - if (this.cursor != null) { 108 - url.searchParams.set("cursor", String(this.cursor)); 109 - } 110 - 111 103 console.log( 112 104 `Jetstream: connecting (${collections.length} collection${collections.length === 1 ? "" : "s"})`, 113 105 ); ··· 121 113 this.ws.addEventListener("message", (msg) => { 122 114 try { 123 115 const event = JSON.parse(msg.data as string) as JetstreamEvent; 124 - this.cursor = event.time_us; 125 - this.cursorDirty = true; 126 - 127 116 if (event.kind === "commit" && event.commit) { 128 117 this.processEvent(event); 129 118 } ··· 134 123 135 124 this.ws.addEventListener("close", () => { 136 125 this.ws = null; 137 - if (this.running && this.autosByCollection.size > 0) { 126 + if (this.running && this.autosByCollectionOp.size > 0) { 138 127 console.log(`Jetstream: reconnecting in ${this.reconnectDelay}ms`); 139 128 this.reconnectTimer = setTimeout(() => { 140 129 this.reconnectTimer = null; ··· 151 140 152 141 private processEvent(event: JetstreamEvent) { 153 142 const collection = event.commit!.collection; 154 - const autos = this.autosByCollection.get(collection); 143 + const operation = event.commit!.operation; 144 + const key = `${collection}\0${operation}`; 145 + const autos = this.autosByCollectionOp.get(key); 155 146 if (!autos) return; 156 147 157 148 for (const auto of autos) { ··· 160 151 console.error("Jetstream: handler error:", err); 161 152 }); 162 153 } 163 - } 164 - } 165 - 166 - private loadCursor(): number | null { 167 - try { 168 - const data = readFileSync(cursorPath, "utf-8").trim(); 169 - const cursor = Number(data); 170 - return Number.isFinite(cursor) ? cursor : null; 171 - } catch { 172 - return null; 173 - } 174 - } 175 - 176 - private flushCursor() { 177 - if (!this.cursorDirty || this.cursor == null) return; 178 - try { 179 - mkdirSync(dirname(cursorPath), { recursive: true }); 180 - writeFileSync(cursorPath, String(this.cursor)); 181 - this.cursorDirty = false; 182 - } catch (err) { 183 - console.error("Jetstream: failed to persist cursor:", err); 184 154 } 185 155 } 186 156 }
-5
lib/jetstream/matcher.test.ts
··· 157 157 expect(matchConditions(event, conditions, ownerDid)).toBe(true); 158 158 }); 159 159 160 - it("resolves event.commit.operation", () => { 161 - const conditions = [{ field: "event.commit.operation", operator: "eq", value: "create" }]; 162 - expect(matchConditions(makeEvent(), conditions, ownerDid)).toBe(true); 163 - }); 164 - 165 160 it("resolves dotted record path (single level)", () => { 166 161 const event = makeEvent({ 167 162 commit: {
+3 -2
lib/jetstream/matcher.ts
··· 23 23 /** 24 24 * Resolve a dotted field path against an event. 25 25 * - "event.did" → event.did 26 - * - "event.commit.operation" → event.commit.operation 27 26 * - "status" → event.commit.record.status 28 27 * - "record.path" → event.commit.record.record.path 29 28 * 30 29 * Legacy: "repo" → event.did 30 + * 31 + * Note: "event.commit.operation" is no longer supported as a condition field. 32 + * Operation filtering is handled at the consumer level via the top-level `operation` field. 31 33 */ 32 34 function resolveField(event: JetstreamEvent, field: string): string | undefined { 33 35 if (field === "event.did" || field === "repo") return event.did; 34 - if (field === "event.commit.operation") return event.commit?.operation; 35 36 36 37 if (!event.commit?.record) return undefined; 37 38
+11 -8
lib/test/db.ts
··· 1 - import { readFileSync } from "node:fs"; 1 + import { readFileSync, readdirSync } from "node:fs"; 2 2 import { resolve } from "node:path"; 3 3 import Database from "better-sqlite3"; 4 4 import { drizzle } from "drizzle-orm/better-sqlite3"; 5 5 import * as schema from "../db/schema.js"; 6 6 7 - const migrationSql = readFileSync( 8 - resolve(__dirname, "../db/migrations/0000_chunky_sersi.sql"), 9 - "utf-8", 10 - ); 7 + const migrationsDir = resolve(__dirname, "../db/migrations"); 8 + const migrationFiles = readdirSync(migrationsDir) 9 + .filter((f) => f.endsWith(".sql")) 10 + .sort(); 11 11 12 12 export function createTestDb() { 13 13 const sqlite = new Database(":memory:"); 14 14 sqlite.pragma("foreign_keys = ON"); 15 15 16 - for (const statement of migrationSql.split("--> statement-breakpoint")) { 17 - const sql = statement.trim(); 18 - if (sql) sqlite.exec(sql); 16 + for (const file of migrationFiles) { 17 + const sql = readFileSync(resolve(migrationsDir, file), "utf-8"); 18 + for (const statement of sql.split("--> statement-breakpoint")) { 19 + const s = statement.trim(); 20 + if (s) sqlite.exec(s); 21 + } 19 22 } 20 23 21 24 return drizzle(sqlite, { schema });
+2
lib/test/fixtures.ts
··· 9 9 name: string; 10 10 description: string | null; 11 11 lexicon: string; 12 + operation: string; 12 13 actions: Action[]; 13 14 fetches: FetchStep[]; 14 15 conditions: Array<{ field: string; operator: string; value: string; comment?: string }>; ··· 73 74 name: "Test Automation", 74 75 description: null, 75 76 lexicon: "app.bsky.feed.like", 77 + operation: "create", 76 78 actions: [makeWebhookAction()], 77 79 fetches: [], 78 80 conditions: [],