Webhooks for the AT Protocol airglow.run
atproto atprotocol automation webhook
12
fork

Configure Feed

Select the types of activity you want to include in your feed.

refactor: various improvements

Hugo 14077da8 97b1788b

+318 -446
+13 -75
lib/actions/bookmark.ts
··· 2 2 import { type BookmarkAction } from "../db/schema.js"; 3 3 import { createArbitraryRecord } from "../automations/pds.js"; 4 4 import { renderTextTemplate, type FetchContext } from "./template.js"; 5 - import { RETRY_DELAYS, isSuccess, isRetryable, logDelivery } from "./delivery.js"; 6 - import type { ActionResult } from "./executor.js"; 5 + import { parsePdsError, wrapWithDelivery, type ActionResult } from "./delivery.js"; 7 6 import type { MatchedEvent } from "../jetstream/consumer.js"; 8 7 import { config } from "../config.js"; 9 8 ··· 117 116 const created = await createArbitraryRecord(automation.did, TARGET_COLLECTION, record); 118 117 return { statusCode: 200, uri: created.uri, cid: created.cid }; 119 118 } catch (err) { 120 - const message = err instanceof Error ? err.message : String(err); 121 - const statusMatch = message.match(/\((\d{3})\)/); 122 - const statusCode = statusMatch ? Number(statusMatch[1]) : 0; 123 - return { statusCode, error: message }; 119 + return parsePdsError(err); 124 120 } 125 121 } 126 122 127 - function actionPayload(action: BookmarkAction): string { 128 - return JSON.stringify({ 129 - targetSource: action.targetSource, 130 - targetTitle: action.targetTitle, 131 - bodyValue: action.bodyValue, 132 - tags: action.tags, 133 - }); 134 - } 135 - 136 - function scheduleRetry( 137 - match: MatchedEvent, 138 - actionIndex: number, 139 - retryIndex: number, 140 - fetchContext?: FetchContext, 141 - item?: unknown, 142 - ) { 143 - if (retryIndex >= RETRY_DELAYS.length) return; 144 - 145 - setTimeout(async () => { 146 - try { 147 - const action = match.automation.actions[actionIndex] as BookmarkAction; 148 - const result = await execute(match, action, fetchContext, item); 149 - const body = actionPayload(action); 150 - 151 - await logDelivery( 152 - match.automation.uri, 153 - actionIndex, 154 - match.event.time_us, 155 - isSuccess(result.statusCode) ? null : body, 156 - result.statusCode, 157 - result.error ?? null, 158 - retryIndex + 2, 159 - ); 160 - 161 - if (!isSuccess(result.statusCode) && isRetryable(result.statusCode)) { 162 - scheduleRetry(match, actionIndex, retryIndex + 1, fetchContext, item); 163 - } 164 - } catch (err) { 165 - console.error("Bookmark retry error:", err); 166 - } 167 - }, RETRY_DELAYS[retryIndex]); 168 - } 169 - 170 123 /** Execute a bookmark action for a matched event. */ 171 - export async function executeBookmark( 172 - match: MatchedEvent, 173 - actionIndex: number, 174 - fetchContext?: FetchContext, 175 - item?: unknown, 176 - ): Promise<ActionResult> { 177 - const action = match.automation.actions[actionIndex] as BookmarkAction; 178 - const result = await execute(match, action, fetchContext, item); 179 - const body = actionPayload(action); 180 - 181 - await logDelivery( 182 - match.automation.uri, 183 - actionIndex, 184 - match.event.time_us, 185 - isSuccess(result.statusCode) ? null : body, 186 - result.statusCode, 187 - result.error ?? null, 188 - 1, 189 - ); 190 - 191 - if (!isSuccess(result.statusCode) && isRetryable(result.statusCode)) { 192 - scheduleRetry(match, actionIndex, 0, fetchContext, item); 193 - } 194 - 195 - return result; 196 - } 124 + export const executeBookmark = wrapWithDelivery( 125 + (match, i) => match.automation.actions[i] as BookmarkAction, 126 + execute, 127 + (action) => 128 + JSON.stringify({ 129 + targetSource: action.targetSource, 130 + targetTitle: action.targetTitle, 131 + bodyValue: action.bodyValue, 132 + tags: action.tags, 133 + }), 134 + );
+7 -67
lib/actions/bsky-post.ts
··· 2 2 import { createArbitraryRecord } from "../automations/pds.js"; 3 3 import { renderTextTemplate, type FetchContext } from "./template.js"; 4 4 import { detectFacets } from "./richtext.js"; 5 - import { RETRY_DELAYS, isSuccess, isRetryable, logDelivery } from "./delivery.js"; 6 - import type { ActionResult } from "./executor.js"; 5 + import { parsePdsError, wrapWithDelivery, type ActionResult } from "./delivery.js"; 7 6 import type { MatchedEvent } from "../jetstream/consumer.js"; 8 7 9 8 const TARGET_COLLECTION = "app.bsky.feed.post"; ··· 58 57 const created = await createArbitraryRecord(automation.did, TARGET_COLLECTION, record); 59 58 return { statusCode: 200, uri: created.uri, cid: created.cid }; 60 59 } catch (err) { 61 - const message = err instanceof Error ? err.message : String(err); 62 - const statusMatch = message.match(/\((\d{3})\)/); 63 - const statusCode = statusMatch ? Number(statusMatch[1]) : 0; 64 - return { statusCode, error: message }; 60 + return parsePdsError(err); 65 61 } 66 62 } 67 63 68 - function scheduleRetry( 69 - match: MatchedEvent, 70 - actionIndex: number, 71 - retryIndex: number, 72 - fetchContext?: FetchContext, 73 - item?: unknown, 74 - ) { 75 - if (retryIndex >= RETRY_DELAYS.length) return; 76 - 77 - setTimeout(async () => { 78 - try { 79 - const action = match.automation.actions[actionIndex] as BskyPostAction; 80 - const result = await execute(match, action, fetchContext, item); 81 - const body = JSON.stringify({ textTemplate: action.textTemplate }); 82 - 83 - await logDelivery( 84 - match.automation.uri, 85 - actionIndex, 86 - match.event.time_us, 87 - isSuccess(result.statusCode) ? null : body, 88 - result.statusCode, 89 - result.error ?? null, 90 - retryIndex + 2, 91 - ); 92 - 93 - if (!isSuccess(result.statusCode) && isRetryable(result.statusCode)) { 94 - scheduleRetry(match, actionIndex, retryIndex + 1, fetchContext, item); 95 - } 96 - } catch (err) { 97 - console.error("Bsky-post retry error:", err); 98 - } 99 - }, RETRY_DELAYS[retryIndex]); 100 - } 101 - 102 64 /** Execute a bsky-post action for a matched event. */ 103 - export async function executeBskyPost( 104 - match: MatchedEvent, 105 - actionIndex: number, 106 - fetchContext?: FetchContext, 107 - item?: unknown, 108 - ): Promise<ActionResult> { 109 - const action = match.automation.actions[actionIndex] as BskyPostAction; 110 - const result = await execute(match, action, fetchContext, item); 111 - 112 - const body = JSON.stringify({ textTemplate: action.textTemplate }); 113 - 114 - await logDelivery( 115 - match.automation.uri, 116 - actionIndex, 117 - match.event.time_us, 118 - isSuccess(result.statusCode) ? null : body, 119 - result.statusCode, 120 - result.error ?? null, 121 - 1, 122 - ); 123 - 124 - if (!isSuccess(result.statusCode) && isRetryable(result.statusCode)) { 125 - scheduleRetry(match, actionIndex, 0, fetchContext, item); 126 - } 127 - 128 - return result; 129 - } 65 + export const executeBskyPost = wrapWithDelivery( 66 + (match, i) => match.automation.actions[i] as BskyPostAction, 67 + execute, 68 + (action) => JSON.stringify({ textTemplate: action.textTemplate }), 69 + );
+105
lib/actions/delivery.ts
··· 1 1 import { db } from "../db/index.js"; 2 2 import { deliveryLogs } from "../db/schema.js"; 3 + import type { MatchedEvent } from "../jetstream/consumer.js"; 4 + import type { FetchContext } from "./template.js"; 5 + 6 + /** Result returned by every action executor for chaining. */ 7 + export type ActionResult = { 8 + statusCode: number; 9 + error?: string; 10 + /** Human-readable, non-error summary. Used by skip-style results (e.g. the 11 + * follow action's built-in safety checks) that want a readable trace in the 12 + * delivery log without marking the fire as a failure. */ 13 + message?: string; 14 + /** AT URI of the created/updated record (record-producing actions only). */ 15 + uri?: string; 16 + /** CID of the created/updated record (record-producing actions only). */ 17 + cid?: string; 18 + }; 3 19 4 20 export const RETRY_DELAYS = [5_000, 30_000]; 5 21 ··· 41 57 createdAt: new Date(), 42 58 }); 43 59 } 60 + 61 + /** Parse a PDS/XRPC error into an ActionResult-compatible shape. 62 + * Extracts the HTTP status code from the "(NNN)" pattern in the message. */ 63 + export function parsePdsError(err: unknown): { statusCode: number; error: string } { 64 + const message = err instanceof Error ? err.message : String(err); 65 + const statusMatch = message.match(/\((\d{3})\)/); 66 + const statusCode = statusMatch ? Number(statusMatch[1]) : 0; 67 + return { statusCode, error: message }; 68 + } 69 + 70 + type ExecuteFn<A> = ( 71 + match: MatchedEvent, 72 + action: A, 73 + fetchContext?: FetchContext, 74 + item?: unknown, 75 + ) => Promise<ActionResult>; 76 + 77 + type PayloadFn<A> = (action: A) => string; 78 + 79 + /** Wrap an action's `execute` function with delivery logging and retry logic. 80 + * Returns a function matching the `ActionHandler` signature in handler.ts. */ 81 + export function wrapWithDelivery<A>( 82 + getAction: (match: MatchedEvent, actionIndex: number) => A, 83 + execute: ExecuteFn<A>, 84 + actionPayload: PayloadFn<A>, 85 + ) { 86 + function scheduleRetry( 87 + match: MatchedEvent, 88 + action: A, 89 + actionIndex: number, 90 + retryIndex: number, 91 + fetchContext?: FetchContext, 92 + item?: unknown, 93 + ) { 94 + if (retryIndex >= RETRY_DELAYS.length) return; 95 + 96 + setTimeout(async () => { 97 + try { 98 + const result = await execute(match, action, fetchContext, item); 99 + const body = actionPayload(action); 100 + 101 + await logDelivery( 102 + match.automation.uri, 103 + actionIndex, 104 + match.event.time_us, 105 + isSuccess(result.statusCode) ? null : body, 106 + result.statusCode, 107 + result.error ?? null, 108 + retryIndex + 2, 109 + result.message ?? null, 110 + ); 111 + 112 + if (!isSuccess(result.statusCode) && isRetryable(result.statusCode)) { 113 + scheduleRetry(match, action, actionIndex, retryIndex + 1, fetchContext, item); 114 + } 115 + } catch (err) { 116 + console.error("Action retry error:", err); 117 + } 118 + }, RETRY_DELAYS[retryIndex]); 119 + } 120 + 121 + return async function executeWrapped( 122 + match: MatchedEvent, 123 + actionIndex: number, 124 + fetchContext?: FetchContext, 125 + item?: unknown, 126 + ): Promise<ActionResult> { 127 + const action = getAction(match, actionIndex); 128 + const result = await execute(match, action, fetchContext, item); 129 + const body = actionPayload(action); 130 + 131 + await logDelivery( 132 + match.automation.uri, 133 + actionIndex, 134 + match.event.time_us, 135 + isSuccess(result.statusCode) ? null : body, 136 + result.statusCode, 137 + result.error ?? null, 138 + 1, 139 + result.message ?? null, 140 + ); 141 + 142 + if (!isSuccess(result.statusCode) && isRetryable(result.statusCode)) { 143 + scheduleRetry(match, action, actionIndex, 0, fetchContext, item); 144 + } 145 + 146 + return result; 147 + }; 148 + }
+12 -85
lib/actions/executor.ts
··· 1 1 import { type RecordAction } from "../db/schema.js"; 2 2 import { createArbitraryRecord } from "../automations/pds.js"; 3 3 import { renderTemplate, type FetchContext } from "./template.js"; 4 - import { RETRY_DELAYS, isSuccess, isRetryable, logDelivery } from "./delivery.js"; 4 + import { parsePdsError, wrapWithDelivery, type ActionResult } from "./delivery.js"; 5 5 import type { MatchedEvent } from "../jetstream/consumer.js"; 6 6 7 - /** Result returned by every action executor for chaining. */ 8 - export type ActionResult = { 9 - statusCode: number; 10 - error?: string; 11 - /** Human-readable, non-error summary. Used by skip-style results (e.g. the 12 - * follow action's built-in safety checks) that want a readable trace in the 13 - * delivery log without marking the fire as a failure. */ 14 - message?: string; 15 - /** AT URI of the created/updated record (record-producing actions only). */ 16 - uri?: string; 17 - /** CID of the created/updated record (record-producing actions only). */ 18 - cid?: string; 19 - }; 7 + export type { ActionResult }; 20 8 21 9 async function execute( 22 10 match: MatchedEvent, ··· 40 28 const created = await createArbitraryRecord(automation.did, action.targetCollection, record); 41 29 return { statusCode: 200, uri: created.uri, cid: created.cid }; 42 30 } catch (err) { 43 - const message = err instanceof Error ? err.message : String(err); 44 - const statusMatch = message.match(/\((\d{3})\)/); 45 - const statusCode = statusMatch ? Number(statusMatch[1]) : 0; 46 - return { statusCode, error: message }; 31 + return parsePdsError(err); 47 32 } 48 33 } 49 34 50 - function scheduleRetry( 51 - match: MatchedEvent, 52 - actionIndex: number, 53 - retryIndex: number, 54 - fetchContext?: FetchContext, 55 - item?: unknown, 56 - ) { 57 - if (retryIndex >= RETRY_DELAYS.length) return; 58 - 59 - setTimeout(async () => { 60 - try { 61 - const action = match.automation.actions[actionIndex] as RecordAction; 62 - const result = await execute(match, action, fetchContext, item); 63 - const body = JSON.stringify({ 64 - targetCollection: action.targetCollection, 65 - recordTemplate: action.recordTemplate, 66 - }); 67 - 68 - await logDelivery( 69 - match.automation.uri, 70 - actionIndex, 71 - match.event.time_us, 72 - isSuccess(result.statusCode) ? null : body, 73 - result.statusCode, 74 - result.error ?? null, 75 - retryIndex + 2, 76 - ); 77 - 78 - if (!isSuccess(result.statusCode) && isRetryable(result.statusCode)) { 79 - scheduleRetry(match, actionIndex, retryIndex + 1, fetchContext, item); 80 - } 81 - } catch (err) { 82 - console.error("Action retry error:", err); 83 - } 84 - }, RETRY_DELAYS[retryIndex]); 85 - } 86 - 87 35 /** Execute a record action for a matched event. */ 88 - export async function executeAction( 89 - match: MatchedEvent, 90 - actionIndex: number, 91 - fetchContext?: FetchContext, 92 - item?: unknown, 93 - ): Promise<ActionResult> { 94 - const action = match.automation.actions[actionIndex] as RecordAction; 95 - const result = await execute(match, action, fetchContext, item); 96 - 97 - const body = JSON.stringify({ 98 - targetCollection: action.targetCollection, 99 - recordTemplate: action.recordTemplate, 100 - }); 101 - 102 - await logDelivery( 103 - match.automation.uri, 104 - actionIndex, 105 - match.event.time_us, 106 - isSuccess(result.statusCode) ? null : body, 107 - result.statusCode, 108 - result.error ?? null, 109 - 1, 110 - ); 111 - 112 - if (!isSuccess(result.statusCode) && isRetryable(result.statusCode)) { 113 - scheduleRetry(match, actionIndex, 0, fetchContext, item); 114 - } 115 - 116 - return result; 117 - } 36 + export const executeAction = wrapWithDelivery( 37 + (match, i) => match.automation.actions[i] as RecordAction, 38 + execute, 39 + (action) => 40 + JSON.stringify({ 41 + targetCollection: action.targetCollection, 42 + recordTemplate: action.recordTemplate, 43 + }), 44 + );
+14 -78
lib/actions/follow.ts
··· 3 3 import { fetchRecord } from "../pds/resolver.js"; 4 4 import { executeSearch } from "./searcher.js"; 5 5 import { renderTextTemplate, type FetchContext } from "./template.js"; 6 - import { RETRY_DELAYS, SKIP_STATUS, isSuccess, isRetryable, logDelivery } from "./delivery.js"; 6 + import { SKIP_STATUS, parsePdsError, wrapWithDelivery, type ActionResult } from "./delivery.js"; 7 7 import { DID_RE } from "./validation.js"; 8 8 import { FOLLOW_TARGETS } from "../automations/follow-targets.js"; 9 - import type { ActionResult } from "./executor.js"; 10 9 import type { MatchedEvent } from "../jetstream/consumer.js"; 11 10 12 11 async function checkProfileExists( ··· 77 76 } catch (err) { 78 77 // Same rationale as the profile pre-check: a transient search failure 79 78 // shouldn't block the follow. Worst case is a duplicate record, which 80 - // the Bluesky appview collapses anyway — storage carries two rows until 79 + // the Bluesky appview collapses anyway -- storage carries two rows until 81 80 // one is cleaned up, but the public graph is still correct. 82 81 console.warn( 83 82 `follow: already-follows pre-check failed for ${subject} on ${action.target}: ${err instanceof Error ? err.message : String(err)}`, ··· 114 113 return { statusCode: 400, error: `subject is not a valid DID: "${subject}"` }; 115 114 } 116 115 117 - // Built-in safety checks — run sequentially so users don't have to add them 116 + // Built-in safety checks -- run sequentially so users don't have to add them 118 117 // manually as fetches + conditions. Either check resolving to "skip" writes 119 118 // a 204 log entry and short-circuits before the PDS write. 120 119 const profileSkip = await checkProfileExists(action, subject); ··· 133 132 const created = await createArbitraryRecord(automation.did, collection, record); 134 133 return { statusCode: 200, uri: created.uri, cid: created.cid }; 135 134 } catch (err) { 136 - const message = err instanceof Error ? err.message : String(err); 137 - const statusMatch = message.match(/\((\d{3})\)/); 138 - const statusCode = statusMatch ? Number(statusMatch[1]) : 0; 139 - return { statusCode, error: message }; 135 + return parsePdsError(err); 140 136 } 141 137 } 142 138 143 - function actionPayload(action: FollowAction): string { 144 - return JSON.stringify({ 145 - target: action.target, 146 - collection: FOLLOW_TARGETS[action.target].collection, 147 - subject: action.subject, 148 - }); 149 - } 150 - 151 - function scheduleRetry( 152 - match: MatchedEvent, 153 - actionIndex: number, 154 - retryIndex: number, 155 - fetchContext?: FetchContext, 156 - item?: unknown, 157 - ) { 158 - if (retryIndex >= RETRY_DELAYS.length) return; 159 - 160 - setTimeout(async () => { 161 - try { 162 - const action = match.automation.actions[actionIndex] as FollowAction; 163 - const result = await execute(match, action, fetchContext, item); 164 - const body = actionPayload(action); 165 - 166 - await logDelivery( 167 - match.automation.uri, 168 - actionIndex, 169 - match.event.time_us, 170 - isSuccess(result.statusCode) ? null : body, 171 - result.statusCode, 172 - result.error ?? null, 173 - retryIndex + 2, 174 - result.message ?? null, 175 - ); 176 - 177 - if (!isSuccess(result.statusCode) && isRetryable(result.statusCode)) { 178 - scheduleRetry(match, actionIndex, retryIndex + 1, fetchContext, item); 179 - } 180 - } catch (err) { 181 - console.error("Follow retry error:", err); 182 - } 183 - }, RETRY_DELAYS[retryIndex]); 184 - } 185 - 186 139 /** Execute a follow action for a matched event. */ 187 - export async function executeFollow( 188 - match: MatchedEvent, 189 - actionIndex: number, 190 - fetchContext?: FetchContext, 191 - item?: unknown, 192 - ): Promise<ActionResult> { 193 - const action = match.automation.actions[actionIndex] as FollowAction; 194 - const result = await execute(match, action, fetchContext, item); 195 - const body = actionPayload(action); 196 - 197 - await logDelivery( 198 - match.automation.uri, 199 - actionIndex, 200 - match.event.time_us, 201 - isSuccess(result.statusCode) ? null : body, 202 - result.statusCode, 203 - result.error ?? null, 204 - 1, 205 - result.message ?? null, 206 - ); 207 - 208 - if (!isSuccess(result.statusCode) && isRetryable(result.statusCode)) { 209 - scheduleRetry(match, actionIndex, 0, fetchContext, item); 210 - } 211 - 212 - return result; 213 - } 140 + export const executeFollow = wrapWithDelivery( 141 + (match, i) => match.automation.actions[i] as FollowAction, 142 + execute, 143 + (action) => 144 + JSON.stringify({ 145 + target: action.target, 146 + collection: FOLLOW_TARGETS[action.target].collection, 147 + subject: action.subject, 148 + }), 149 + );
+12 -75
lib/actions/patch-record.ts
··· 2 2 import { patchArbitraryRecord } from "../automations/pds.js"; 3 3 import { fetchRecord, parseAtUri } from "../pds/resolver.js"; 4 4 import { renderTemplate, resolveUriTemplate, type FetchContext } from "./template.js"; 5 - import { RETRY_DELAYS, isSuccess, isRetryable, logDelivery } from "./delivery.js"; 6 - import type { ActionResult } from "./executor.js"; 5 + import { parsePdsError, wrapWithDelivery, type ActionResult } from "./delivery.js"; 7 6 import type { MatchedEvent } from "../jetstream/consumer.js"; 8 7 9 8 async function execute( ··· 79 78 ); 80 79 return { statusCode: 200, uri: result.uri, cid: result.cid }; 81 80 } catch (err) { 82 - const message = err instanceof Error ? err.message : String(err); 83 - const statusMatch = message.match(/\((\d{3})\)/); 84 - const statusCode = statusMatch ? Number(statusMatch[1]) : 0; 85 - return { statusCode, error: message }; 81 + return parsePdsError(err); 86 82 } 87 83 } 88 84 89 - function scheduleRetry( 90 - match: MatchedEvent, 91 - actionIndex: number, 92 - retryIndex: number, 93 - fetchContext?: FetchContext, 94 - item?: unknown, 95 - ) { 96 - if (retryIndex >= RETRY_DELAYS.length) return; 97 - 98 - setTimeout(async () => { 99 - try { 100 - const action = match.automation.actions[actionIndex] as PatchRecordAction; 101 - const result = await execute(match, action, fetchContext, item); 102 - const body = JSON.stringify({ 103 - targetCollection: action.targetCollection, 104 - baseRecordUri: action.baseRecordUri, 105 - recordTemplate: action.recordTemplate, 106 - }); 107 - 108 - await logDelivery( 109 - match.automation.uri, 110 - actionIndex, 111 - match.event.time_us, 112 - isSuccess(result.statusCode) ? null : body, 113 - result.statusCode, 114 - result.error ?? null, 115 - retryIndex + 2, 116 - ); 117 - 118 - if (!isSuccess(result.statusCode) && isRetryable(result.statusCode)) { 119 - scheduleRetry(match, actionIndex, retryIndex + 1, fetchContext, item); 120 - } 121 - } catch (err) { 122 - console.error("Patch-record retry error:", err); 123 - } 124 - }, RETRY_DELAYS[retryIndex]); 125 - } 126 - 127 85 /** Execute a patch-record action for a matched event. */ 128 - export async function executePatchRecord( 129 - match: MatchedEvent, 130 - actionIndex: number, 131 - fetchContext?: FetchContext, 132 - item?: unknown, 133 - ): Promise<ActionResult> { 134 - const action = match.automation.actions[actionIndex] as PatchRecordAction; 135 - const result = await execute(match, action, fetchContext, item); 136 - 137 - const body = JSON.stringify({ 138 - targetCollection: action.targetCollection, 139 - baseRecordUri: action.baseRecordUri, 140 - recordTemplate: action.recordTemplate, 141 - }); 142 - 143 - await logDelivery( 144 - match.automation.uri, 145 - actionIndex, 146 - match.event.time_us, 147 - isSuccess(result.statusCode) ? null : body, 148 - result.statusCode, 149 - result.error ?? null, 150 - 1, 151 - ); 152 - 153 - if (!isSuccess(result.statusCode) && isRetryable(result.statusCode)) { 154 - scheduleRetry(match, actionIndex, 0, fetchContext, item); 155 - } 156 - 157 - return result; 158 - } 86 + export const executePatchRecord = wrapWithDelivery( 87 + (match, i) => match.automation.actions[i] as PatchRecordAction, 88 + execute, 89 + (action) => 90 + JSON.stringify({ 91 + targetCollection: action.targetCollection, 92 + baseRecordUri: action.baseRecordUri, 93 + recordTemplate: action.recordTemplate, 94 + }), 95 + );
+20
lib/actions/template.test.ts
··· 549 549 resolvePlaceholder("item.prototype", event, undefined, undefined, item), 550 550 ).toBeUndefined(); 551 551 }); 552 + 553 + it("refuses to walk into __proto__ / constructor / prototype on event paths", () => { 554 + expect(resolvePlaceholder("event.__proto__", event)).toBeUndefined(); 555 + expect(resolvePlaceholder("event.commit.record.__proto__", event)).toBeUndefined(); 556 + expect(resolvePlaceholder("event.constructor.name", event)).toBeUndefined(); 557 + }); 558 + 559 + it("refuses to walk into __proto__ / constructor / prototype on fetch context paths", () => { 560 + const fetchContext = { 561 + post: { 562 + found: true, 563 + uri: "at://did:plc:x/app.bsky.feed.post/abc", 564 + cid: "cid1", 565 + record: { text: "hello" }, 566 + }, 567 + }; 568 + expect(resolvePlaceholder("post.__proto__", event, fetchContext)).toBeUndefined(); 569 + expect(resolvePlaceholder("post.record.__proto__", event, fetchContext)).toBeUndefined(); 570 + expect(resolvePlaceholder("post.constructor.name", event, fetchContext)).toBeUndefined(); 571 + }); 552 572 }); 553 573 554 574 describe("validateTemplate", () => {
+76 -59
lib/actions/template.ts
··· 92 92 if (rest) { 93 93 for (const key of rest.split(".")) { 94 94 if (value == null || typeof value !== "object") return undefined; 95 + if (isUnsafeKey(key)) return undefined; 95 96 value = (value as Record<string, unknown>)[key]; 96 97 } 97 98 } ··· 106 107 let value: unknown = event; 107 108 for (const key of rest.split(".")) { 108 109 if (value == null || typeof value !== "object") return undefined; 110 + if (isUnsafeKey(key)) return undefined; 109 111 value = (value as Record<string, unknown>)[key]; 110 112 } 111 113 return value; ··· 194 196 }); 195 197 } 196 198 199 + /** 200 + * Validate a single placeholder path against the available context. Returns an 201 + * error string when invalid, or `null` when the placeholder is valid. 202 + */ 203 + function validatePlaceholderPath( 204 + rawPlaceholder: string, 205 + opts: { 206 + fetchSet: Set<string>; 207 + actionSet: Set<string>; 208 + hasItem?: boolean; 209 + allowFunctions?: boolean; 210 + allowAutomation?: boolean; 211 + errorPrefix?: string; 212 + }, 213 + ): string | null { 214 + const call = opts.allowFunctions ? parseFunctionCall(rawPlaceholder) : null; 215 + 216 + if (!opts.allowFunctions && parseFunctionCall(rawPlaceholder)) { 217 + return `Function calls not allowed in ${opts.errorPrefix ?? "placeholder"}: {{${rawPlaceholder}}}`; 218 + } 219 + 220 + const toValidate = call ? call.arg : rawPlaceholder; 221 + 222 + if (toValidate === "now" || toValidate === "self" || toValidate.startsWith("event.")) return null; 223 + 224 + if (toValidate === "item" || toValidate.startsWith("item.")) { 225 + if (opts.hasItem) return null; 226 + const prefix = opts.errorPrefix 227 + ? `Invalid placeholder in ${opts.errorPrefix}` 228 + : "Invalid placeholder"; 229 + return `${prefix}: {{${rawPlaceholder}}}. The "item.*" placeholder is only available on actions with a forEach config.`; 230 + } 231 + 232 + if (opts.allowAutomation && toValidate.startsWith("automation.")) { 233 + const field = toValidate.slice("automation.".length); 234 + if (AUTOMATION_FIELDS.has(field)) return null; 235 + return `${opts.errorPrefix ? `Invalid placeholder in ${opts.errorPrefix}` : "Invalid placeholder"}: {{${rawPlaceholder}}}`; 236 + } 237 + 238 + const root = toValidate.split(".")[0]!; 239 + if (opts.fetchSet.has(root)) return null; 240 + if (opts.actionSet.has(root)) return null; 241 + 242 + return `${opts.errorPrefix ? `Invalid placeholder in ${opts.errorPrefix}` : "Invalid placeholder"}: {{${rawPlaceholder}}}`; 243 + } 244 + 197 245 /** Validate template syntax at creation time. */ 198 246 export function validateTemplate( 199 247 template: string, ··· 228 276 return { valid: false, error: "Template must contain at least one {{placeholder}}" }; 229 277 } 230 278 231 - const fetchSet = new Set(fetchNames ?? []); 232 - const actionSet = new Set(actionNames ?? []); 279 + const opts = { 280 + fetchSet: new Set(fetchNames ?? []), 281 + actionSet: new Set(actionNames ?? []), 282 + hasItem, 283 + allowFunctions: true, 284 + allowAutomation: true, 285 + }; 233 286 for (const p of placeholders) { 234 - const call = parseFunctionCall(p); 235 - const toValidate = call ? call.arg : p; 236 - if (toValidate === "now" || toValidate === "self" || toValidate.startsWith("event.")) continue; 237 - if (toValidate === "item" || toValidate.startsWith("item.")) { 238 - if (hasItem) continue; 239 - return { 240 - valid: false, 241 - error: `Invalid placeholder: {{${p}}}. The "item.*" placeholder is only available on actions with a forEach config.`, 242 - }; 243 - } 244 - if (toValidate.startsWith("automation.")) { 245 - const field = toValidate.slice("automation.".length); 246 - if (AUTOMATION_FIELDS.has(field)) continue; 247 - return { valid: false, error: `Invalid placeholder: {{${p}}}` }; 248 - } 249 - const root = toValidate.split(".")[0]!; 250 - if (fetchSet.has(root)) continue; 251 - if (actionSet.has(root)) continue; 252 - return { valid: false, error: `Invalid placeholder: {{${p}}}` }; 287 + const error = validatePlaceholderPath(p, opts); 288 + if (error) return { valid: false, error }; 253 289 } 254 290 255 291 return { valid: true, placeholders }; ··· 338 374 }; 339 375 } 340 376 341 - const fetchSet = new Set(fetchNames ?? []); 342 - const actionSet = new Set(actionNames ?? []); 377 + const opts = { 378 + fetchSet: new Set(fetchNames ?? []), 379 + actionSet: new Set(actionNames ?? []), 380 + hasItem, 381 + allowFunctions: false, 382 + allowAutomation: false, 383 + errorPrefix: "base record URI", 384 + }; 343 385 for (const p of placeholders) { 344 - if (parseFunctionCall(p)) { 345 - return { valid: false, error: `Function calls not allowed in base record URI: {{${p}}}` }; 346 - } 347 - if (p === "now" || p === "self" || p.startsWith("event.")) continue; 348 - if (p === "item" || p.startsWith("item.")) { 349 - if (hasItem) continue; 350 - return { 351 - valid: false, 352 - error: `Invalid placeholder in base record URI: {{${p}}}. The "item.*" placeholder is only available on actions with a forEach config.`, 353 - }; 354 - } 355 - const root = p.split(".")[0]!; 356 - if (fetchSet.has(root)) continue; 357 - if (actionSet.has(root)) continue; 358 - return { valid: false, error: `Invalid placeholder in base record URI: {{${p}}}` }; 386 + const error = validatePlaceholderPath(p, opts); 387 + if (error) return { valid: false, error }; 359 388 } 360 389 361 390 return { valid: true, placeholders }; ··· 378 407 return ""; 379 408 }); 380 409 381 - const fetchSet = new Set(fetchNames ?? []); 382 - const actionSet = new Set(actionNames ?? []); 410 + const opts = { 411 + fetchSet: new Set(fetchNames ?? []), 412 + actionSet: new Set(actionNames ?? []), 413 + hasItem, 414 + allowFunctions: true, 415 + allowAutomation: true, 416 + }; 383 417 for (const p of placeholders) { 384 - const call = parseFunctionCall(p); 385 - const toValidate = call ? call.arg : p; 386 - if (toValidate === "now" || toValidate === "self" || toValidate.startsWith("event.")) continue; 387 - if (toValidate === "item" || toValidate.startsWith("item.")) { 388 - if (hasItem) continue; 389 - return { 390 - valid: false, 391 - error: `Invalid placeholder: {{${p}}}. The "item.*" placeholder is only available on actions with a forEach config.`, 392 - }; 393 - } 394 - if (toValidate.startsWith("automation.")) { 395 - const field = toValidate.slice("automation.".length); 396 - if (AUTOMATION_FIELDS.has(field)) continue; 397 - return { valid: false, error: `Invalid placeholder: {{${p}}}` }; 398 - } 399 - const root = toValidate.split(".")[0]!; 400 - if (fetchSet.has(root)) continue; 401 - if (actionSet.has(root)) continue; 402 - return { valid: false, error: `Invalid placeholder: {{${p}}}` }; 418 + const error = validatePlaceholderPath(p, opts); 419 + if (error) return { valid: false, error }; 403 420 } 404 421 405 422 return { valid: true, placeholders };
+1 -1
lib/jetstream/handler.test.ts
··· 274 274 it("continues the action chain after an action skips with 204", async () => { 275 275 // A 204 skip (e.g. a follow action's "already following" pre-flight) 276 276 // must not stop the chain: it's success, not failure. Lock it in so 277 - // future refactors of `isActionSuccess` don't accidentally drop 204. 277 + // future refactors of `isSuccess` don't accidentally drop 204. 278 278 mockDispatch.mockResolvedValueOnce({ statusCode: 204, message: "Skipped: something" }); 279 279 280 280 const match = makeMatch({
+3 -6
lib/jetstream/handler.ts
··· 8 8 import { executeFollow } from "../actions/follow.js"; 9 9 import { FOLLOW_TARGETS } from "../automations/follow-targets.js"; 10 10 import { resolveFetches } from "../actions/fetcher.js"; 11 + import { isSuccess } from "../actions/delivery.js"; 11 12 import { renderTemplate, renderTextTemplate, type FetchContext } from "../actions/template.js"; 12 13 import { parseAtUri } from "../pds/resolver.js"; 13 14 import { collectItems, matchItemConditions } from "./matcher.js"; ··· 192 193 const toRun = truncated ? matched.slice(0, MAX_FOR_EACH_ITEMS_PER_ACTION) : matched; 193 194 for (const item of toRun) { 194 195 const result: ActionResult = await handler(match, i, fetchContext, item); 195 - if (isActionSuccess(result.statusCode)) { 196 + if (isSuccess(result.statusCode)) { 196 197 lastSuccess = result; 197 198 } else { 198 199 console.error( ··· 213 214 } 214 215 } else { 215 216 const result: ActionResult = await handler(match, i, fetchContext); 216 - if (isActionSuccess(result.statusCode)) { 217 + if (isSuccess(result.statusCode)) { 217 218 lastSuccess = result; 218 219 } else { 219 220 console.error( ··· 244 245 break; 245 246 } 246 247 } 247 - } 248 - 249 - function isActionSuccess(code: number): boolean { 250 - return code >= 200 && code < 400; 251 248 } 252 249 253 250 async function logDryRun(
+53
lib/jetstream/matcher.test.ts
··· 284 284 expect(matchConditions(event, conditions, ownerDid)).toBe(true); 285 285 }); 286 286 }); 287 + 288 + describe("unsafe key guards", () => { 289 + it("treats __proto__ / constructor / prototype as missing in record paths", () => { 290 + const event = makeEvent({ 291 + commit: { 292 + rev: "r1", 293 + operation: "create", 294 + collection: "test", 295 + rkey: "k", 296 + record: { status: "active" }, 297 + }, 298 + }); 299 + expect( 300 + matchConditions(event, [{ field: "__proto__", operator: "exists", value: "" }], ownerDid), 301 + ).toBe(false); 302 + expect( 303 + matchConditions(event, [{ field: "constructor", operator: "exists", value: "" }], ownerDid), 304 + ).toBe(false); 305 + expect( 306 + matchConditions( 307 + event, 308 + [{ field: "constructor.name", operator: "eq", value: "Object" }], 309 + ownerDid, 310 + ), 311 + ).toBe(false); 312 + }); 313 + }); 287 314 }); 288 315 289 316 describe("evaluateFetchConditions", () => { ··· 481 508 ownerDid, 482 509 ), 483 510 ).toBe(true); 511 + }); 512 + }); 513 + 514 + describe("unsafe key guards", () => { 515 + it("treats __proto__ / constructor / prototype as missing in entry paths", () => { 516 + expect( 517 + evaluateFetchConditions( 518 + foundEntry, 519 + [{ field: "__proto__", operator: "exists", value: "" }], 520 + ownerDid, 521 + ), 522 + ).toBe(false); 523 + expect( 524 + evaluateFetchConditions( 525 + foundEntry, 526 + [{ field: "record.__proto__", operator: "exists", value: "" }], 527 + ownerDid, 528 + ), 529 + ).toBe(false); 530 + expect( 531 + evaluateFetchConditions( 532 + foundEntry, 533 + [{ field: "record.constructor.name", operator: "eq", value: "Object" }], 534 + ownerDid, 535 + ), 536 + ).toBe(false); 484 537 }); 485 538 }); 486 539 });
+2
lib/jetstream/matcher.ts
··· 36 36 let value: unknown = event.commit.record; 37 37 for (const key of field.split(".")) { 38 38 if (value == null || typeof value !== "object") return undefined; 39 + if (isUnsafeKey(key)) return undefined; 39 40 value = (value as Record<string, unknown>)[key]; 40 41 } 41 42 if (value == null) return undefined; ··· 100 101 let value: unknown = entry; 101 102 for (const key of path.split(".")) { 102 103 if (value == null || typeof value !== "object") return undefined; 104 + if (isUnsafeKey(key)) return undefined; 103 105 value = (value as Record<string, unknown>)[key]; 104 106 } 105 107 if (value == null) return undefined;