Webhooks for the AT Protocol airglow.run
atproto atprotocol automation webhook
12
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: improve block list handling

Hugo f42b1076 657d7fad

+184 -44
+1 -5
app/routes/api/lexicons/[nsid].ts
··· 1 1 import { createRoute } from "honox/factory"; 2 2 import { getCached, setCache } from "@/lexicons/cache.js"; 3 - import { isValidNsid, isNsidAllowed, resolve } from "@/lexicons/resolver.js"; 3 + import { isValidNsid, resolve } from "@/lexicons/resolver.js"; 4 4 import { buildRecordSchema, type RecordSchema } from "@/lexicons/schema-tree.js"; 5 - import { config } from "@/config.js"; 6 5 7 6 export const GET = createRoute(async (c) => { 8 7 const nsid = c.req.param("nsid")!; 9 8 10 9 if (!isValidNsid(nsid)) { 11 10 return c.json({ error: "Invalid NSID format" }, 400); 12 - } 13 - if (!isNsidAllowed(nsid, config.nsidAllowlist, config.nsidBlocklist)) { 14 - return c.json({ error: "This NSID is not allowed on this instance" }, 403); 15 11 } 16 12 17 13 let schema = await getCached(nsid);
+27 -1
app/routes/api/subscriptions/[rkey].test.ts
··· 22 22 verifyCallback: vi.fn(), 23 23 })); 24 24 25 + vi.mock("@/url-guard.js", () => ({ 26 + assertPublicUrl: vi.fn(async (url: string) => new URL(url)), 27 + UrlGuardError: class extends Error {}, 28 + })); 29 + 25 30 vi.mock("@/subscriptions/pds.js", () => ({ 26 31 getRecord: vi.fn(), 27 32 putRecord: vi.fn(), ··· 165 170 expect(sub!.active).toBe(false); 166 171 }); 167 172 168 - it("re-verifies callbacks when reactivating", async () => { 173 + it("re-verifies callbacks when reactivating and updates verified status", async () => { 169 174 // First deactivate 170 175 await db 171 176 .update(subscriptions) ··· 175 180 const res = await app.request(patchReq("rk1", { active: true })); 176 181 expect(res.status).toBe(200); 177 182 expect(mockVerify).toHaveBeenCalledWith("https://example.com/hook", "app.bsky.feed.like"); 183 + 184 + const sub = await db.query.subscriptions.findFirst(); 185 + const action = sub!.actions[0]! as { verified: boolean }; 186 + expect(action.verified).toBe(true); 187 + }); 188 + 189 + it("reactivation succeeds even when verification fails", async () => { 190 + await db 191 + .update(subscriptions) 192 + .set({ active: false }) 193 + .where(eq(subscriptions.uri, TEST_SUB.uri)); 194 + 195 + mockVerify.mockResolvedValueOnce({ ok: false, error: "Manifest not found" }); 196 + 197 + const res = await app.request(patchReq("rk1", { active: true })); 198 + expect(res.status).toBe(200); 199 + 200 + const sub = await db.query.subscriptions.findFirst(); 201 + expect(sub!.active).toBe(true); 202 + const action = sub!.actions[0]! as { verified: boolean }; 203 + expect(action.verified).toBe(false); 178 204 }); 179 205 180 206 it("preserves webhook secret when URL unchanged", async () => {
+26 -21
app/routes/api/subscriptions/[rkey].ts
··· 10 10 type RecordAction, 11 11 type FetchStep, 12 12 } from "@/db/schema.js"; 13 - import { config } from "@/config.js"; 14 - import { isValidNsid, isNsidAllowed } from "@/lexicons/resolver.js"; 13 + import { isValidNsid } from "@/lexicons/resolver.js"; 15 14 import { 16 15 getRecord, 17 16 putRecord, ··· 20 19 type PdsFetchStep, 21 20 } from "@/subscriptions/pds.js"; 22 21 import { verifyCallback } from "@/subscriptions/verify.js"; 22 + import { assertPublicUrl, UrlGuardError } from "@/url-guard.js"; 23 23 import { validateTemplate, validateFetchStep } from "@/actions/template.js"; 24 24 import { notifySubscriptionChange } from "@/jetstream/consumer.js"; 25 25 ··· 56 56 lexicon: sub.lexicon, 57 57 actions: sub.actions.map((a) => 58 58 a.$type === "webhook" 59 - ? { $type: a.$type, callbackUrl: a.callbackUrl, comment: a.comment } 59 + ? { 60 + $type: a.$type, 61 + callbackUrl: a.callbackUrl, 62 + verified: a.verified ?? false, 63 + comment: a.comment, 64 + } 60 65 : a, 61 66 ), 62 67 fetches: sub.fetches, ··· 179 184 return c.json({ error: "callbackUrl is required for webhook actions" }, 400); 180 185 } 181 186 try { 182 - new URL(input.callbackUrl); 183 - } catch { 184 - return c.json({ error: "Invalid callback URL" }, 400); 187 + await assertPublicUrl(input.callbackUrl); 188 + } catch (err) { 189 + const message = err instanceof UrlGuardError ? err.message : "Invalid callback URL"; 190 + return c.json({ error: message }, 400); 185 191 } 186 192 187 - // Re-verify callback 193 + // Verify callback (non-blocking — stores verified status) 188 194 const verification = await verifyCallback(input.callbackUrl, sub.lexicon); 189 - if (!verification.ok) { 190 - return c.json({ error: verification.error }, 422); 191 - } 192 195 193 196 // Preserve existing secret if callbackUrl unchanged 194 197 const existing = sub.actions.find( ··· 200 203 $type: "webhook", 201 204 callbackUrl: input.callbackUrl, 202 205 secret, 206 + verified: verification.ok, 203 207 ...(input.comment ? { comment: input.comment } : {}), 204 208 } satisfies WebhookAction); 205 209 newPdsActions.push({ ··· 214 218 if (!isValidNsid(input.targetCollection)) { 215 219 return c.json({ error: "Invalid target collection NSID" }, 400); 216 220 } 217 - if (!isNsidAllowed(input.targetCollection, config.nsidAllowlist, config.nsidBlocklist)) { 218 - return c.json({ error: "This target collection is not allowed on this instance" }, 403); 219 - } 220 221 if (!input.recordTemplate) { 221 222 return c.json({ error: "recordTemplate is required for record actions" }, 400); 222 223 } ··· 246 247 pdsActions = newPdsActions; 247 248 } 248 249 249 - // Re-verify webhook callbacks when reactivating 250 + // Re-verify webhook callbacks when reactivating (updates verified status) 250 251 if (body.active === true && !sub.active && !body.actions) { 251 - for (const action of localActions) { 252 - if (action.$type === "webhook") { 253 - const verification = await verifyCallback(action.callbackUrl, sub.lexicon); 254 - if (!verification.ok) { 255 - return c.json({ error: verification.error }, 422); 252 + localActions = await Promise.all( 253 + localActions.map(async (action) => { 254 + if (action.$type !== "webhook") return action; 255 + try { 256 + await assertPublicUrl(action.callbackUrl); 257 + } catch { 258 + return { ...action, verified: false }; 256 259 } 257 - } 258 - } 260 + const verification = await verifyCallback(action.callbackUrl, sub.lexicon); 261 + return { ...action, verified: verification.ok }; 262 + }), 263 + ); 259 264 } 260 265 261 266 // Read existing PDS record to preserve createdAt
+48 -2
app/routes/api/subscriptions/index.test.ts
··· 22 22 verifyCallback: vi.fn(), 23 23 })); 24 24 25 + vi.mock("@/url-guard.js", () => ({ 26 + assertPublicUrl: vi.fn(async (url: string) => new URL(url)), 27 + UrlGuardError: class extends Error {}, 28 + })); 29 + 25 30 vi.mock("@/subscriptions/pds.js", () => ({ 26 31 createRecord: vi.fn(), 27 32 deleteRecord: vi.fn(), ··· 33 38 34 39 import { GET, POST } from "./index.js"; 35 40 import { verifyCallback } from "@/subscriptions/verify.js"; 41 + import { assertPublicUrl } from "@/url-guard.js"; 36 42 import { createRecord, deleteRecord } from "@/subscriptions/pds.js"; 37 43 import { db } from "@/db/index.js"; 38 44 import { subscriptions } from "@/db/schema.js"; 39 45 40 46 const mockVerify = vi.mocked(verifyCallback); 47 + const mockAssertPublicUrl = vi.mocked(assertPublicUrl); 41 48 const mockCreateRecord = vi.mocked(createRecord); 42 49 const mockDeleteRecord = vi.mocked(deleteRecord); 43 50 ··· 197 204 }); 198 205 199 206 it("returns 400 for invalid callback URL", async () => { 207 + mockAssertPublicUrl.mockRejectedValueOnce(new Error("Invalid URL")); 208 + 200 209 const res = await app.request( 201 210 jsonReq("/api/subscriptions", { 202 211 name: "Test", ··· 207 216 expect(res.status).toBe(400); 208 217 }); 209 218 210 - it("returns 422 when callback verification fails", async () => { 219 + it("returns 400 for private callback URL", async () => { 220 + const { UrlGuardError } = await import("@/url-guard.js"); 221 + mockAssertPublicUrl.mockRejectedValueOnce( 222 + new UrlGuardError("Callback URL must not target a private network"), 223 + ); 224 + 225 + const res = await app.request( 226 + jsonReq("/api/subscriptions", { 227 + name: "Test", 228 + lexicon: "app.bsky.feed.like", 229 + actions: [{ type: "webhook", callbackUrl: "https://127.0.0.1/hook" }], 230 + }), 231 + ); 232 + expect(res.status).toBe(400); 233 + const body = await res.json(); 234 + expect(body.error).toContain("private network"); 235 + }); 236 + 237 + it("creates subscription with verified=false when verification fails", async () => { 211 238 mockVerify.mockResolvedValueOnce({ ok: false, error: "Callback failed" }); 212 239 213 240 const res = await app.request( ··· 217 244 actions: [{ type: "webhook", callbackUrl: "https://example.com/hook" }], 218 245 }), 219 246 ); 220 - expect(res.status).toBe(422); 247 + expect(res.status).toBe(201); 248 + 249 + const sub = await db.query.subscriptions.findFirst(); 250 + const action = sub!.actions[0]! as { verified: boolean }; 251 + expect(action.verified).toBe(false); 252 + }); 253 + 254 + it("creates subscription with verified=true when verification passes", async () => { 255 + const res = await app.request( 256 + jsonReq("/api/subscriptions", { 257 + name: "Test", 258 + lexicon: "app.bsky.feed.like", 259 + actions: [{ type: "webhook", callbackUrl: "https://example.com/hook" }], 260 + }), 261 + ); 262 + expect(res.status).toBe(201); 263 + 264 + const sub = await db.query.subscriptions.findFirst(); 265 + const action = sub!.actions[0]! as { verified: boolean }; 266 + expect(action.verified).toBe(true); 221 267 }); 222 268 223 269 it("returns 400 for invalid record template", async () => {
+12 -10
app/routes/api/subscriptions/index.ts
··· 12 12 import { config } from "@/config.js"; 13 13 import { isValidNsid, isNsidAllowed } from "@/lexicons/resolver.js"; 14 14 import { verifyCallback } from "@/subscriptions/verify.js"; 15 + import { assertPublicUrl, UrlGuardError } from "@/url-guard.js"; 15 16 import { 16 17 createRecord, 17 18 deleteRecord, ··· 41 42 lexicon: r.lexicon, 42 43 actions: r.actions.map((a) => 43 44 a.$type === "webhook" 44 - ? { $type: a.$type, callbackUrl: a.callbackUrl, comment: a.comment } 45 + ? { 46 + $type: a.$type, 47 + callbackUrl: a.callbackUrl, 48 + verified: a.verified ?? false, 49 + comment: a.comment, 50 + } 45 51 : a, 46 52 ), 47 53 fetches: r.fetches, ··· 146 152 return c.json({ error: "callbackUrl is required for webhook actions" }, 400); 147 153 } 148 154 try { 149 - new URL(input.callbackUrl); 150 - } catch { 151 - return c.json({ error: "Invalid callback URL" }, 400); 155 + await assertPublicUrl(input.callbackUrl); 156 + } catch (err) { 157 + const message = err instanceof UrlGuardError ? err.message : "Invalid callback URL"; 158 + return c.json({ error: message }, 400); 152 159 } 153 160 154 161 const verification = await verifyCallback(input.callbackUrl, body.lexicon); 155 - if (!verification.ok) { 156 - return c.json({ error: verification.error }, 422); 157 - } 158 162 159 163 const secret = nanoid(32); 160 164 localActions.push({ 161 165 $type: "webhook", 162 166 callbackUrl: input.callbackUrl, 163 167 secret, 168 + verified: verification.ok, 164 169 ...(input.comment ? { comment: input.comment } : {}), 165 170 } satisfies WebhookAction); 166 171 pdsActions.push({ ··· 174 179 } 175 180 if (!isValidNsid(input.targetCollection)) { 176 181 return c.json({ error: "Invalid target collection NSID" }, 400); 177 - } 178 - if (!isNsidAllowed(input.targetCollection, config.nsidAllowlist, config.nsidBlocklist)) { 179 - return c.json({ error: "This target collection is not allowed on this instance" }, 403); 180 182 } 181 183 if (!input.recordTemplate) { 182 184 return c.json({ error: "recordTemplate is required for record actions" }, 400);
+8
app/routes/dashboard/subscriptions/[rkey].tsx
··· 139 139 <Stack gap={2}> 140 140 <h4> 141 141 {action.$type === "webhook" ? "Webhook" : "Record"} {i + 1} 142 + {action.$type === "webhook" && ( 143 + <> 144 + {" "} 145 + <Badge variant={action.verified ? "success" : "neutral"}> 146 + {action.verified ? "Verified" : "Unverified"} 147 + </Badge> 148 + </> 149 + )} 142 150 {action.comment && <span class={textMuted}> — {action.comment}</span>} 143 151 </h4> 144 152 <DescriptionList>
+1
lib/db/schema.ts
··· 12 12 $type: "webhook"; 13 13 callbackUrl: string; 14 14 secret: string; // instance-local HMAC secret, not stored on PDS 15 + verified?: boolean; // true if /.well-known/airglow manifest matched 15 16 comment?: string; 16 17 }; 17 18
+55
lib/jetstream/consumer.test.ts
··· 4 4 config: { 5 5 databasePath: ":memory:", 6 6 jetstreamUrl: "wss://jetstream.test/subscribe", 7 + nsidAllowlist: [], 8 + nsidBlocklist: [], 7 9 }, 8 10 })); 9 11 ··· 83 85 84 86 // Both like subscriptions should match (empty conditions = match all) 85 87 expect(handler).toHaveBeenCalledTimes(2); 88 + }); 89 + 90 + it("excludes subscriptions whose lexicon is on the blocklist", async () => { 91 + const { config } = await import("../config.js"); 92 + (config as any).nsidBlocklist = ["app.bsky.feed.like"]; 93 + 94 + await db.insert(subscriptions).values([ 95 + makeSubscription({ 96 + uri: "at://u/s/1", 97 + rkey: "1", 98 + lexicon: "app.bsky.feed.like", 99 + active: true, 100 + }), 101 + makeSubscription({ 102 + uri: "at://u/s/2", 103 + rkey: "2", 104 + lexicon: "app.bsky.feed.post", 105 + active: true, 106 + }), 107 + ]); 108 + 109 + await consumer.refreshSubscriptions(); 110 + 111 + // Blocked lexicon should not trigger handler 112 + (consumer as any).processEvent( 113 + makeEvent({ 114 + commit: { 115 + rev: "r", 116 + operation: "create", 117 + collection: "app.bsky.feed.like", 118 + rkey: "rk", 119 + record: {}, 120 + }, 121 + }), 122 + ); 123 + expect(handler).not.toHaveBeenCalled(); 124 + 125 + // Allowed lexicon should still work 126 + (consumer as any).processEvent( 127 + makeEvent({ 128 + commit: { 129 + rev: "r", 130 + operation: "create", 131 + collection: "app.bsky.feed.post", 132 + rkey: "rk", 133 + record: {}, 134 + }, 135 + }), 136 + ); 137 + expect(handler).toHaveBeenCalledOnce(); 138 + 139 + // Reset 140 + (config as any).nsidBlocklist = []; 86 141 }); 87 142 88 143 it("does not load inactive subscriptions", async () => {
+2
lib/jetstream/consumer.ts
··· 2 2 import { db } from "../db/index.js"; 3 3 import { subscriptions } from "../db/schema.js"; 4 4 import { config } from "../config.js"; 5 + import { isNsidAllowed } from "../lexicons/resolver.js"; 5 6 import { matchConditions, type JetstreamEvent } from "./matcher.js"; 6 7 import { readFileSync, writeFileSync, mkdirSync } from "node:fs"; 7 8 import { dirname, join } from "node:path"; ··· 57 58 58 59 const byCollection = new Map<string, Subscription[]>(); 59 60 for (const row of rows) { 61 + if (!isNsidAllowed(row.lexicon, config.nsidAllowlist, config.nsidBlocklist)) continue; 60 62 const list = byCollection.get(row.lexicon) || []; 61 63 list.push(row); 62 64 byCollection.set(row.lexicon, list);
+4 -5
lib/subscriptions/verify.ts
··· 1 - import { assertPublicUrl, UrlGuardError } from "../url-guard.js"; 2 - 3 1 type AirglowManifest = { 4 2 callbacks: Array<{ 5 3 path: string; ··· 11 9 * Verify that a callback URL accepts the given lexicon. 12 10 * Fetches the .well-known/airglow manifest from the callback's origin 13 11 * and checks that the path is listed with the correct lexicon. 12 + * 13 + * Callers must validate the URL (assertPublicUrl) before calling this. 14 14 */ 15 15 export async function verifyCallback( 16 16 callbackUrl: string, ··· 18 18 ): Promise<{ ok: true } | { ok: false; error: string }> { 19 19 let url: URL; 20 20 try { 21 - url = await assertPublicUrl(callbackUrl); 22 - } catch (err) { 23 - if (err instanceof UrlGuardError) return { ok: false, error: err.message }; 21 + url = new URL(callbackUrl); 22 + } catch { 24 23 return { ok: false, error: "Invalid callback URL" }; 25 24 } 26 25