Webhooks for the AT Protocol airglow.run
atproto atprotocol automation webhook
12
fork

Configure Feed

Select the types of activity you want to include in your feed.

docs: udpate readme

Hugo 15673e99 ddbfb229

+566 -571
+1 -1
CLAUDE.md
··· 1 1 # Airglow 2 2 3 - Webhooks for the AT Protocol. See README.md for the product spec. 3 + Automations for the AT Protocol. See README.md for the product spec. 4 4 5 5 ## Stack 6 6
+12 -12
README.md
··· 1 1 # Airglow 2 2 3 - Webhooks for the AT Protocol — subscribe to events, filter them, and forward them to HTTP endpoints. 3 + Automations for the AT Protocol — listen to events, filter them, and trigger actions like webhook deliveries or PDS record creation. 4 4 5 - Airglow connects to [Jetstream](https://atproto.com/guides/streaming-data#jetstream) (AT Protocol's event streaming service), matches incoming records against user-defined subscriptions, and delivers them to callback URLs. Think IFTTT or Zapier, but the trigger side is always "something happened on the AT Protocol." 5 + Airglow connects to [Jetstream](https://atproto.com/guides/streaming-data#jetstream) (AT Protocol's event streaming service), matches incoming records against user-defined automations, and executes actions automatically. Think IFTTT or Zapier, but the trigger side is always "something happened on the AT Protocol." 6 6 7 - Website: [rglw.app](https://rglw.app) 7 + Website: [airglow.run](https://airglow.run) 8 8 9 9 ## How it works 10 10 11 11 ### For end users 12 12 13 13 1. Sign in to Airglow using [AT Protocol OAuth](https://atproto.com/specs/oauth). 14 - 2. Create a subscription by choosing a lexicon to listen to (e.g. `sh.tangled.feed.star`, `site.standard.document`). 14 + 2. Create an automation by choosing a lexicon to listen to (e.g. `sh.tangled.feed.star`, `site.standard.document`). 15 15 3. Add conditions to filter events — simple equality checks on record fields like the record owner or specific values in the data. The schema is known from the lexicon, so Airglow can present the available fields. 16 - 4. Provide a callback URL to receive matching events. 16 + 4. Add actions — deliver a webhook to a callback URL, or create a record on your PDS using a template. 17 17 18 - Airglow verifies that the callback URL actually supports the selected lexicon before activating the subscription (see [Callback endpoints](#callback-endpoints) below). 18 + Airglow verifies that webhook callback URLs actually support the selected lexicon before activating the automation (see [Callback endpoints](#callback-endpoints) below). 19 19 20 20 #### Data ownership 21 21 22 - Subscriptions are stored on the user's PDS as [`app.rglw.subscription`](lexicons/app/rglw/subscription.json) records. The user's PDS is the source of truth — Airglow instances maintain a local index for fast event matching, but the data belongs to the user. This means subscriptions are portable across Airglow instances and visible to any AT Protocol client. 22 + Automations are stored on the user's PDS as [`run.airglow.automation`](lexicons/run/airglow/automation.json) records. The user's PDS is the source of truth — Airglow instances maintain a local index for fast event matching, but the data belongs to the user. This means automations are portable across Airglow instances and visible to any AT Protocol client. 23 23 24 24 ### For developers 25 25 ··· 27 27 28 28 #### Callback endpoints 29 29 30 - A callback server must expose a metadata route so Airglow can discover its endpoints and verify which lexicons each one accepts: 30 + A callback server can optionally expose a metadata route so Airglow can discover its endpoints and verify which lexicons each one accepts: 31 31 32 32 ``` 33 33 GET <server-base-url>/.well-known/airglow ··· 44 44 } 45 45 ``` 46 46 47 - When a user registers a callback URL (e.g. `https://example.com/hooks/stars`), Airglow fetches the manifest from `https://example.com/.well-known/airglow` and confirms the path `/hooks/stars` is listed and accepts the requested lexicon. 47 + When a user registers a callback URL (e.g. `https://example.com/hooks/stars`), Airglow fetches the manifest from `https://example.com/.well-known/airglow`. If the manifest is present and the path is listed with the requested lexicon, the webhook is marked as **verified**. If the manifest is missing or doesn't match, the webhook is still created but shown as **unverified**. Verification is re-checked when an automation is reactivated. 48 48 49 49 #### Webhook payload 50 50 51 - When a matching event occurs, Airglow sends a POST request to the callback URL. The payload contains the Jetstream event (commit operation, record data, repo DID, timestamp) wrapped in a Airglow envelope with metadata such as the subscription ID and matched condition. 51 + When a matching event occurs, Airglow sends a POST request to the callback URL. The payload contains the Jetstream event (commit operation, record data, repo DID, timestamp) wrapped in an Airglow envelope with metadata such as the automation ID and matched condition. 52 52 53 53 #### Request signing 54 54 ··· 62 62 63 63 ### Future: protocol-native discovery 64 64 65 - Today, users provide callback URLs manually. In the future, developers will be able to publish an `app.rglw.callback` record on their PDS, declaring their endpoint URL and supported lexicons. Airglow instances could then subscribe to this collection and index available callbacks, letting users browse and pick from discovered endpoints instead of entering URLs by hand. 65 + Today, users provide callback URLs manually. In the future, developers will be able to publish a `run.airglow.callback` record on their PDS, declaring their endpoint URL and supported lexicons. Airglow instances could then subscribe to this collection and index available callbacks, letting users browse and pick from discovered endpoints instead of entering URLs by hand. 66 66 67 67 ## Development 68 68 ··· 102 102 103 103 ```sh 104 104 goat lex lint lexicons/ # validate schemas 105 - goat lex new record app.rglw.<name> # create a new lexicon 105 + goat lex new record run.airglow.<name> # create a new lexicon 106 106 ``` 107 107 108 108 ## Self-hosting
+4 -4
app/islands/DeliveryLog.tsx
··· 27 27 setLoading(true); 28 28 setError(""); 29 29 try { 30 - const res = await fetch(`/api/subscriptions/${rkey}`, { 30 + const res = await fetch(`/api/automations/${rkey}`, { 31 31 method: "PATCH", 32 32 headers: { "Content-Type": "application/json" }, 33 33 body: JSON.stringify({ active: !isActive }), ··· 46 46 }, [rkey, isActive]); 47 47 48 48 const handleDelete = useCallback(async () => { 49 - if (!confirm("Delete this subscription? This cannot be undone.")) return; 49 + if (!confirm("Delete this automation? This cannot be undone.")) return; 50 50 setLoading(true); 51 51 setError(""); 52 52 try { 53 - const res = await fetch(`/api/subscriptions/${rkey}`, { 53 + const res = await fetch(`/api/automations/${rkey}`, { 54 54 method: "DELETE", 55 55 }); 56 56 if (!res.ok) { ··· 68 68 69 69 const refreshLogs = useCallback(async () => { 70 70 try { 71 - const res = await fetch(`/api/subscriptions/${rkey}`); 71 + const res = await fetch(`/api/automations/${rkey}`); 72 72 if (res.ok) { 73 73 const data = await res.json(); 74 74 setLogs(data.deliveryLogs || []);
app/islands/SubscriptionForm.css.ts app/islands/AutomationForm.css.ts
+9 -9
app/islands/SubscriptionForm.tsx app/islands/AutomationForm.tsx
··· 2 2 import type { RecordSchema } from "../../lib/lexicons/schema-tree.js"; 3 3 import type { Action, FetchStep } from "../../lib/db/schema.js"; 4 4 import RecordFormBuilder from "./RecordFormBuilder.js"; 5 - import * as s from "./SubscriptionForm.css.ts"; 5 + import * as s from "./AutomationForm.css.ts"; 6 6 7 7 type Field = { 8 8 path: string; ··· 28 28 }; 29 29 type ActionDraft = WebhookDraft | RecordDraft; 30 30 31 - export type SubscriptionInitial = { 31 + export type AutomationInitial = { 32 32 rkey: string; 33 33 name: string; 34 34 description: string | null; ··· 297 297 return conditions.map((c) => ({ ...c, comment: c.comment ?? "" })); 298 298 } 299 299 300 - export default function SubscriptionForm({ initial }: { initial?: SubscriptionInitial }) { 300 + export default function AutomationForm({ initial }: { initial?: AutomationInitial }) { 301 301 const isEdit = !!initial; 302 302 const initialLexicon = initial?.lexicon ?? getInitialParam("lexicon"); 303 303 const [name, setName] = useState(initial?.name ?? ""); ··· 476 476 setError(""); 477 477 setSubmitting(true); 478 478 try { 479 - const url = isEdit ? `/api/subscriptions/${initial.rkey}` : "/api/subscriptions"; 479 + const url = isEdit ? `/api/automations/${initial.rkey}` : "/api/automations"; 480 480 const res = await fetch(url, { 481 481 method: isEdit ? "PATCH" : "POST", 482 482 headers: { "Content-Type": "application/json" }, ··· 484 484 }); 485 485 const data = await res.json(); 486 486 if (!res.ok) { 487 - setError(data.error || `Failed to ${isEdit ? "update" : "create"} subscription`); 487 + setError(data.error || `Failed to ${isEdit ? "update" : "create"} automation`); 488 488 } else { 489 489 const rkey = isEdit ? initial.rkey : data.rkey; 490 - window.location.href = `/dashboard/subscriptions/${rkey}`; 490 + window.location.href = `/dashboard/automations/${rkey}`; 491 491 } 492 492 } catch { 493 493 setError("Request failed"); ··· 534 534 <textarea 535 535 id="sub-description" 536 536 class={s.textarea} 537 - placeholder="What does this subscription do?" 537 + placeholder="What does this automation do?" 538 538 value={description} 539 539 onInput={(e: Event) => setDescription((e.target as HTMLTextAreaElement).value)} 540 540 rows={2} ··· 854 854 ? "Updating..." 855 855 : "Creating..." 856 856 : isEdit 857 - ? "Update subscription" 858 - : "Create subscription"} 857 + ? "Update automation" 858 + : "Create automation"} 859 859 </button> 860 860 861 861 <datalist id="nsid-suggestions">
+1 -1
app/routes/api/lexicons/suggest.ts
··· 61 61 if (entry.isDirectory()) { 62 62 walkLexiconDir(resolvePath(dir, entry.name), results); 63 63 } else if (entry.name.endsWith(".json")) { 64 - // Convert path back to NSID: lexicons/app/rglw/subscription.json → app.rglw.subscription 64 + // Convert path back to NSID: lexicons/run/airglow/automation.json → run.airglow.automation 65 65 const full = resolvePath(dir, entry.name); 66 66 const rel = full.slice(full.indexOf("lexicons/") + "lexicons/".length); 67 67 const nsid = rel.replace(/\.json$/, "").replace(/\//g, ".");
+59 -65
app/routes/api/subscriptions/[rkey].test.ts app/routes/api/automations/[rkey].test.ts
··· 18 18 }, 19 19 })); 20 20 21 - vi.mock("@/subscriptions/verify.js", () => ({ 21 + vi.mock("@/automations/verify.js", () => ({ 22 22 verifyCallback: vi.fn(), 23 23 })); 24 24 ··· 27 27 UrlGuardError: class extends Error {}, 28 28 })); 29 29 30 - vi.mock("@/subscriptions/pds.js", () => ({ 30 + vi.mock("@/automations/pds.js", () => ({ 31 31 getRecord: vi.fn(), 32 32 putRecord: vi.fn(), 33 33 deleteRecord: vi.fn(), 34 34 })); 35 35 36 36 vi.mock("@/jetstream/consumer.js", () => ({ 37 - notifySubscriptionChange: vi.fn(), 37 + notifyAutomationChange: vi.fn(), 38 38 })); 39 39 40 40 import { GET, PATCH, DELETE } from "./[rkey].js"; 41 - import { verifyCallback } from "@/subscriptions/verify.js"; 42 - import { getRecord, putRecord, deleteRecord } from "@/subscriptions/pds.js"; 41 + import { verifyCallback } from "@/automations/verify.js"; 42 + import { getRecord, putRecord, deleteRecord } from "@/automations/pds.js"; 43 43 import { eq } from "drizzle-orm"; 44 44 import { db } from "@/db/index.js"; 45 - import { subscriptions, deliveryLogs } from "@/db/schema.js"; 45 + import { automations, deliveryLogs } from "@/db/schema.js"; 46 46 47 47 const mockVerify = vi.mocked(verifyCallback); 48 48 const mockGetRecord = vi.mocked(getRecord); ··· 50 50 const mockDeleteRecord = vi.mocked(deleteRecord); 51 51 52 52 const TEST_USER = { did: "did:plc:testuser", handle: "test.bsky.social" }; 53 - const TEST_SUB = { 54 - uri: "at://did:plc:testuser/app.rglw.subscription/rk1", 53 + const TEST_AUTO = { 54 + uri: "at://did:plc:testuser/run.airglow.automation/rk1", 55 55 did: "did:plc:testuser", 56 56 rkey: "rk1", 57 - name: "Test Sub", 57 + name: "Test Auto", 58 58 description: null, 59 59 lexicon: "app.bsky.feed.like", 60 60 actions: [ ··· 72 72 c.set("user", TEST_USER); 73 73 return next(); 74 74 }); 75 - app.get("/api/subscriptions/:rkey", ...GET); 76 - app.patch("/api/subscriptions/:rkey", ...PATCH); 77 - app.delete("/api/subscriptions/:rkey", ...DELETE); 75 + app.get("/api/automations/:rkey", ...GET); 76 + app.patch("/api/automations/:rkey", ...PATCH); 77 + app.delete("/api/automations/:rkey", ...DELETE); 78 78 return app; 79 79 } 80 80 81 81 function patchReq(rkey: string, body: Record<string, unknown>) { 82 - return new Request(`http://localhost/api/subscriptions/${rkey}`, { 82 + return new Request(`http://localhost/api/automations/${rkey}`, { 83 83 method: "PATCH", 84 84 headers: { "Content-Type": "application/json" }, 85 85 body: JSON.stringify(body), 86 86 }); 87 87 } 88 88 89 - describe("GET /api/subscriptions/:rkey", () => { 89 + describe("GET /api/automations/:rkey", () => { 90 90 let app: ReturnType<typeof createTestApp>; 91 91 92 92 beforeEach(async () => { 93 93 app = createTestApp(); 94 94 await db.delete(deliveryLogs); 95 - await db.delete(subscriptions); 95 + await db.delete(automations); 96 96 }); 97 97 98 - it("returns subscription with delivery logs", async () => { 99 - await db.insert(subscriptions).values(TEST_SUB); 98 + it("returns automation with delivery logs", async () => { 99 + await db.insert(automations).values(TEST_AUTO); 100 100 await db.insert(deliveryLogs).values({ 101 - subscriptionUri: TEST_SUB.uri, 101 + automationUri: TEST_AUTO.uri, 102 102 actionIndex: 0, 103 103 eventTimeUs: 1700000000000000, 104 104 payload: null, ··· 108 108 createdAt: new Date(), 109 109 }); 110 110 111 - const res = await app.request("http://localhost/api/subscriptions/rk1"); 111 + const res = await app.request("http://localhost/api/automations/rk1"); 112 112 expect(res.status).toBe(200); 113 113 const body = await res.json(); 114 - expect(body.name).toBe("Test Sub"); 114 + expect(body.name).toBe("Test Auto"); 115 115 expect(body.deliveryLogs).toHaveLength(1); 116 116 expect(body.deliveryLogs[0].statusCode).toBe(200); 117 117 }); 118 118 119 119 it("strips webhook secrets from response", async () => { 120 - await db.insert(subscriptions).values(TEST_SUB); 120 + await db.insert(automations).values(TEST_AUTO); 121 121 122 - const res = await app.request("http://localhost/api/subscriptions/rk1"); 122 + const res = await app.request("http://localhost/api/automations/rk1"); 123 123 const body = await res.json(); 124 124 const action = body.actions[0]; 125 125 expect(action.callbackUrl).toBe("https://example.com/hook"); 126 126 expect(action.secret).toBeUndefined(); 127 127 }); 128 128 129 - it("returns 404 for non-existent subscription", async () => { 130 - const res = await app.request("http://localhost/api/subscriptions/nonexistent"); 129 + it("returns 404 for non-existent automation", async () => { 130 + const res = await app.request("http://localhost/api/automations/nonexistent"); 131 131 expect(res.status).toBe(404); 132 132 }); 133 133 134 - it("returns 404 for another user's subscription", async () => { 135 - await db.insert(subscriptions).values({ ...TEST_SUB, did: "did:plc:other" }); 134 + it("returns 404 for another user's automation", async () => { 135 + await db.insert(automations).values({ ...TEST_AUTO, did: "did:plc:other" }); 136 136 137 - const res = await app.request("http://localhost/api/subscriptions/rk1"); 137 + const res = await app.request("http://localhost/api/automations/rk1"); 138 138 expect(res.status).toBe(404); 139 139 }); 140 140 }); 141 141 142 - describe("PATCH /api/subscriptions/:rkey", () => { 142 + describe("PATCH /api/automations/:rkey", () => { 143 143 let app: ReturnType<typeof createTestApp>; 144 144 145 145 beforeEach(async () => { ··· 149 149 mockPutRecord.mockReset().mockResolvedValue(undefined); 150 150 151 151 await db.delete(deliveryLogs); 152 - await db.delete(subscriptions); 153 - await db.insert(subscriptions).values(TEST_SUB); 152 + await db.delete(automations); 153 + await db.insert(automations).values(TEST_AUTO); 154 154 }); 155 155 156 156 it("updates name only", async () => { 157 157 const res = await app.request(patchReq("rk1", { name: "Updated Name" })); 158 158 expect(res.status).toBe(200); 159 159 160 - const sub = await db.query.subscriptions.findFirst(); 161 - expect(sub!.name).toBe("Updated Name"); 162 - expect(sub!.lexicon).toBe("app.bsky.feed.like"); // unchanged 160 + const auto = await db.query.automations.findFirst(); 161 + expect(auto!.name).toBe("Updated Name"); 162 + expect(auto!.lexicon).toBe("app.bsky.feed.like"); // unchanged 163 163 }); 164 164 165 165 it("toggles active to false", async () => { 166 166 const res = await app.request(patchReq("rk1", { active: false })); 167 167 expect(res.status).toBe(200); 168 168 169 - const sub = await db.query.subscriptions.findFirst(); 170 - expect(sub!.active).toBe(false); 169 + const auto = await db.query.automations.findFirst(); 170 + expect(auto!.active).toBe(false); 171 171 }); 172 172 173 173 it("re-verifies callbacks when reactivating and updates verified status", async () => { 174 174 // First deactivate 175 - await db 176 - .update(subscriptions) 177 - .set({ active: false }) 178 - .where(eq(subscriptions.uri, TEST_SUB.uri)); 175 + await db.update(automations).set({ active: false }).where(eq(automations.uri, TEST_AUTO.uri)); 179 176 180 177 const res = await app.request(patchReq("rk1", { active: true })); 181 178 expect(res.status).toBe(200); 182 179 expect(mockVerify).toHaveBeenCalledWith("https://example.com/hook", "app.bsky.feed.like"); 183 180 184 - const sub = await db.query.subscriptions.findFirst(); 185 - const action = sub!.actions[0]! as { verified: boolean }; 181 + const auto = await db.query.automations.findFirst(); 182 + const action = auto!.actions[0]! as { verified: boolean }; 186 183 expect(action.verified).toBe(true); 187 184 }); 188 185 189 186 it("reactivation succeeds even when verification fails", async () => { 190 - await db 191 - .update(subscriptions) 192 - .set({ active: false }) 193 - .where(eq(subscriptions.uri, TEST_SUB.uri)); 187 + await db.update(automations).set({ active: false }).where(eq(automations.uri, TEST_AUTO.uri)); 194 188 195 189 mockVerify.mockResolvedValueOnce({ ok: false, error: "Manifest not found" }); 196 190 197 191 const res = await app.request(patchReq("rk1", { active: true })); 198 192 expect(res.status).toBe(200); 199 193 200 - const sub = await db.query.subscriptions.findFirst(); 201 - expect(sub!.active).toBe(true); 202 - const action = sub!.actions[0]! as { verified: boolean }; 194 + const auto = await db.query.automations.findFirst(); 195 + expect(auto!.active).toBe(true); 196 + const action = auto!.actions[0]! as { verified: boolean }; 203 197 expect(action.verified).toBe(false); 204 198 }); 205 199 ··· 211 205 ); 212 206 expect(res.status).toBe(200); 213 207 214 - const sub = await db.query.subscriptions.findFirst(); 215 - const action = sub!.actions[0]! as { secret: string }; 208 + const auto = await db.query.automations.findFirst(); 209 + const action = auto!.actions[0]! as { secret: string }; 216 210 expect(action.secret).toBe("old-secret"); 217 211 }); 218 212 ··· 224 218 ); 225 219 expect(res.status).toBe(200); 226 220 227 - const sub = await db.query.subscriptions.findFirst(); 228 - const action = sub!.actions[0]! as { secret: string }; 221 + const auto = await db.query.automations.findFirst(); 222 + const action = auto!.actions[0]! as { secret: string }; 229 223 expect(action.secret).not.toBe("old-secret"); 230 224 expect(action.secret).toHaveLength(32); 231 225 }); ··· 239 233 expect(res.status).toBe(400); 240 234 }); 241 235 242 - it("returns 404 for non-existent subscription", async () => { 236 + it("returns 404 for non-existent automation", async () => { 243 237 const res = await app.request(patchReq("nonexistent", { name: "New" })); 244 238 expect(res.status).toBe(404); 245 239 }); ··· 257 251 }); 258 252 }); 259 253 260 - describe("DELETE /api/subscriptions/:rkey", () => { 254 + describe("DELETE /api/automations/:rkey", () => { 261 255 let app: ReturnType<typeof createTestApp>; 262 256 263 257 beforeEach(async () => { ··· 265 259 mockDeleteRecord.mockReset().mockResolvedValue(undefined); 266 260 267 261 await db.delete(deliveryLogs); 268 - await db.delete(subscriptions); 269 - await db.insert(subscriptions).values(TEST_SUB); 262 + await db.delete(automations); 263 + await db.insert(automations).values(TEST_AUTO); 270 264 }); 271 265 272 - it("deletes subscription from DB", async () => { 266 + it("deletes automation from DB", async () => { 273 267 const res = await app.request( 274 - new Request("http://localhost/api/subscriptions/rk1", { method: "DELETE" }), 268 + new Request("http://localhost/api/automations/rk1", { method: "DELETE" }), 275 269 ); 276 270 expect(res.status).toBe(200); 277 271 278 - const subs = await db.query.subscriptions.findMany(); 279 - expect(subs).toHaveLength(0); 272 + const autos = await db.query.automations.findMany(); 273 + expect(autos).toHaveLength(0); 280 274 }); 281 275 282 276 it("cascade-deletes delivery logs", async () => { 283 277 await db.insert(deliveryLogs).values({ 284 - subscriptionUri: TEST_SUB.uri, 278 + automationUri: TEST_AUTO.uri, 285 279 actionIndex: 0, 286 280 eventTimeUs: 1700000000000000, 287 281 payload: null, ··· 291 285 createdAt: new Date(), 292 286 }); 293 287 294 - await app.request(new Request("http://localhost/api/subscriptions/rk1", { method: "DELETE" })); 288 + await app.request(new Request("http://localhost/api/automations/rk1", { method: "DELETE" })); 295 289 296 290 const logs = await db.query.deliveryLogs.findMany(); 297 291 expect(logs).toHaveLength(0); 298 292 }); 299 293 300 - it("returns 404 for non-existent subscription", async () => { 294 + it("returns 404 for non-existent automation", async () => { 301 295 const res = await app.request( 302 - new Request("http://localhost/api/subscriptions/nonexistent", { method: "DELETE" }), 296 + new Request("http://localhost/api/automations/nonexistent", { method: "DELETE" }), 303 297 ); 304 298 expect(res.status).toBe(404); 305 299 }); ··· 308 302 mockDeleteRecord.mockRejectedValueOnce(new Error("PDS down")); 309 303 310 304 const res = await app.request( 311 - new Request("http://localhost/api/subscriptions/rk1", { method: "DELETE" }), 305 + new Request("http://localhost/api/automations/rk1", { method: "DELETE" }), 312 306 ); 313 307 expect(res.status).toBe(502); 314 308 });
+49 -49
app/routes/api/subscriptions/[rkey].ts app/routes/api/automations/[rkey].ts
··· 3 3 import { nanoid } from "nanoid"; 4 4 import { db } from "@/db/index.js"; 5 5 import { 6 - subscriptions, 6 + automations, 7 7 deliveryLogs, 8 8 type Action, 9 9 type WebhookAction, ··· 17 17 deleteRecord, 18 18 type PdsAction, 19 19 type PdsFetchStep, 20 - } from "@/subscriptions/pds.js"; 21 - import { verifyCallback } from "@/subscriptions/verify.js"; 20 + } from "@/automations/pds.js"; 21 + import { verifyCallback } from "@/automations/verify.js"; 22 22 import { assertPublicUrl, UrlGuardError } from "@/url-guard.js"; 23 23 import { validateTemplate, validateFetchStep } from "@/actions/template.js"; 24 - import { notifySubscriptionChange } from "@/jetstream/consumer.js"; 24 + import { notifyAutomationChange } from "@/jetstream/consumer.js"; 25 25 26 26 type ActionInput = 27 27 | { type: "webhook"; callbackUrl: string; comment?: string } ··· 29 29 30 30 const VALID_OPERATORS = new Set(["eq", "startsWith", "endsWith", "contains"]); 31 31 32 - function findSubscription(did: string, rkey: string) { 33 - return db.query.subscriptions.findFirst({ 34 - where: and(eq(subscriptions.did, did), eq(subscriptions.rkey, rkey)), 32 + function findAutomation(did: string, rkey: string) { 33 + return db.query.automations.findFirst({ 34 + where: and(eq(automations.did, did), eq(automations.rkey, rkey)), 35 35 }); 36 36 } 37 37 ··· 39 39 const user = c.get("user"); 40 40 const rkey = c.req.param("rkey")!; 41 41 42 - const sub = await findSubscription(user.did, rkey); 43 - if (!sub) return c.json({ error: "Subscription not found" }, 404); 42 + const auto = await findAutomation(user.did, rkey); 43 + if (!auto) return c.json({ error: "Automation not found" }, 404); 44 44 45 45 const logs = await db.query.deliveryLogs.findMany({ 46 - where: eq(deliveryLogs.subscriptionUri, sub.uri), 46 + where: eq(deliveryLogs.automationUri, auto.uri), 47 47 orderBy: desc(deliveryLogs.createdAt), 48 48 limit: 50, 49 49 }); 50 50 51 51 return c.json({ 52 - uri: sub.uri, 53 - rkey: sub.rkey, 54 - name: sub.name, 55 - description: sub.description, 56 - lexicon: sub.lexicon, 57 - actions: sub.actions.map((a) => 52 + uri: auto.uri, 53 + rkey: auto.rkey, 54 + name: auto.name, 55 + description: auto.description, 56 + lexicon: auto.lexicon, 57 + actions: auto.actions.map((a) => 58 58 a.$type === "webhook" 59 59 ? { 60 60 $type: a.$type, ··· 64 64 } 65 65 : a, 66 66 ), 67 - fetches: sub.fetches, 68 - conditions: sub.conditions, 69 - active: sub.active, 70 - indexedAt: sub.indexedAt.getTime(), 67 + fetches: auto.fetches, 68 + conditions: auto.conditions, 69 + active: auto.active, 70 + indexedAt: auto.indexedAt.getTime(), 71 71 deliveryLogs: logs.map((l) => ({ 72 72 id: l.id, 73 73 actionIndex: l.actionIndex, ··· 84 84 const user = c.get("user"); 85 85 const rkey = c.req.param("rkey")!; 86 86 87 - const sub = await findSubscription(user.did, rkey); 88 - if (!sub) return c.json({ error: "Subscription not found" }, 404); 87 + const auto = await findAutomation(user.did, rkey); 88 + if (!auto) return c.json({ error: "Automation not found" }, 404); 89 89 90 90 const body = await c.req.json<{ 91 91 name?: string; ··· 97 97 }>(); 98 98 99 99 // Validate name/description if provided 100 - const name = body.name !== undefined ? body.name : sub.name; 100 + const name = body.name !== undefined ? body.name : auto.name; 101 101 if (!name || typeof name !== "string" || !name.trim()) { 102 102 return c.json({ error: "Name is required" }, 400); 103 103 } 104 104 if (name.length > 128) { 105 105 return c.json({ error: "Name must be 128 characters or less" }, 400); 106 106 } 107 - const description = body.description !== undefined ? body.description : sub.description; 107 + const description = body.description !== undefined ? body.description : auto.description; 108 108 if (description && description.length > 1024) { 109 109 return c.json({ error: "Description must be 1024 characters or less" }, 400); 110 110 } ··· 118 118 value: cond.value, 119 119 ...(cond.comment ? { comment: cond.comment } : {}), 120 120 })) 121 - : sub.conditions; 121 + : auto.conditions; 122 122 if (conditions.length > 20) { 123 123 return c.json({ error: "Maximum 20 conditions allowed" }, 400); 124 124 } ··· 127 127 return c.json({ error: `Invalid condition operator: ${cond.operator}` }, 400); 128 128 } 129 129 } 130 - const active = body.active ?? sub.active; 130 + const active = body.active ?? auto.active; 131 131 132 132 // Resolve fetch steps — full replacement when provided 133 - let localFetches = sub.fetches; 133 + let localFetches = auto.fetches; 134 134 let pdsFetches: PdsFetchStep[] | null = null; 135 135 136 136 if (body.fetches) { ··· 152 152 ...(f.comment ? { comment: f.comment } : {}), 153 153 }); 154 154 newPdsFetches.push({ 155 - $type: "app.rglw.subscription#fetchStep", 155 + $type: "run.airglow.automation#fetchStep", 156 156 name: f.name, 157 157 uri: f.uri, 158 158 ...(f.comment ? { comment: f.comment } : {}), ··· 164 164 const fetchNames = localFetches.map((f) => f.name); 165 165 166 166 // Resolve actions — full replacement when provided 167 - let localActions = sub.actions; 167 + let localActions = auto.actions; 168 168 let pdsActions: PdsAction[] | null = null; 169 169 170 170 if (body.actions) { ··· 191 191 } 192 192 193 193 // Verify callback (non-blocking — stores verified status) 194 - const verification = await verifyCallback(input.callbackUrl, sub.lexicon); 194 + const verification = await verifyCallback(input.callbackUrl, auto.lexicon); 195 195 196 196 // Preserve existing secret if callbackUrl unchanged 197 - const existing = sub.actions.find( 197 + const existing = auto.actions.find( 198 198 (a): a is WebhookAction => a.$type === "webhook" && a.callbackUrl === input.callbackUrl, 199 199 ); 200 200 const secret = existing?.secret ?? nanoid(32); ··· 207 207 ...(input.comment ? { comment: input.comment } : {}), 208 208 } satisfies WebhookAction); 209 209 newPdsActions.push({ 210 - $type: "app.rglw.subscription#webhookAction", 210 + $type: "run.airglow.automation#webhookAction", 211 211 callbackUrl: input.callbackUrl, 212 212 ...(input.comment ? { comment: input.comment } : {}), 213 213 }); ··· 233 233 ...(input.comment ? { comment: input.comment } : {}), 234 234 } satisfies RecordAction); 235 235 newPdsActions.push({ 236 - $type: "app.rglw.subscription#recordAction", 236 + $type: "run.airglow.automation#recordAction", 237 237 targetCollection: input.targetCollection, 238 238 recordTemplate: input.recordTemplate, 239 239 ...(input.comment ? { comment: input.comment } : {}), ··· 248 248 } 249 249 250 250 // Re-verify webhook callbacks when reactivating (updates verified status) 251 - if (body.active === true && !sub.active && !body.actions) { 251 + if (body.active === true && !auto.active && !body.actions) { 252 252 localActions = await Promise.all( 253 253 localActions.map(async (action) => { 254 254 if (action.$type !== "webhook") return action; ··· 257 257 } catch { 258 258 return { ...action, verified: false }; 259 259 } 260 - const verification = await verifyCallback(action.callbackUrl, sub.lexicon); 260 + const verification = await verifyCallback(action.callbackUrl, auto.lexicon); 261 261 return { ...action, verified: verification.ok }; 262 262 }), 263 263 ); ··· 265 265 266 266 // Read existing PDS record to preserve createdAt 267 267 const existing = await getRecord(user.did, rkey); 268 - const createdAt = (existing?.createdAt as string) || sub.indexedAt.toISOString(); 268 + const createdAt = (existing?.createdAt as string) || auto.indexedAt.toISOString(); 269 269 270 270 // Build PDS actions from local actions if not already built 271 271 if (!pdsActions) { 272 272 pdsActions = localActions.map((a): PdsAction => { 273 273 if (a.$type === "webhook") { 274 274 return { 275 - $type: "app.rglw.subscription#webhookAction", 275 + $type: "run.airglow.automation#webhookAction", 276 276 callbackUrl: a.callbackUrl, 277 277 ...(a.comment ? { comment: a.comment } : {}), 278 278 }; 279 279 } 280 280 return { 281 - $type: "app.rglw.subscription#recordAction", 281 + $type: "run.airglow.automation#recordAction", 282 282 targetCollection: a.targetCollection, 283 283 recordTemplate: a.recordTemplate, 284 284 ...(a.comment ? { comment: a.comment } : {}), ··· 291 291 await putRecord(user.did, rkey, { 292 292 name: name.trim(), 293 293 description: description?.trim() || undefined, 294 - lexicon: sub.lexicon, 294 + lexicon: auto.lexicon, 295 295 actions: pdsActions, 296 296 fetches: 297 297 pdsFetches ?? 298 298 localFetches.map((f) => ({ 299 - $type: "app.rglw.subscription#fetchStep" as const, 299 + $type: "run.airglow.automation#fetchStep" as const, 300 300 name: f.name, 301 301 uri: f.uri, 302 302 ...(f.comment ? { comment: f.comment } : {}), ··· 307 307 }); 308 308 } catch (err) { 309 309 console.error("Failed to update PDS record:", err); 310 - return c.json({ error: "Failed to update subscription on PDS" }, 502); 310 + return c.json({ error: "Failed to update automation on PDS" }, 502); 311 311 } 312 312 313 313 // Update local index 314 314 const now = new Date(); 315 315 await db 316 - .update(subscriptions) 316 + .update(automations) 317 317 .set({ 318 318 name: name.trim(), 319 319 description: description?.trim() || null, ··· 323 323 active, 324 324 indexedAt: now, 325 325 }) 326 - .where(eq(subscriptions.uri, sub.uri)); 326 + .where(eq(automations.uri, auto.uri)); 327 327 328 - notifySubscriptionChange(); 328 + notifyAutomationChange(); 329 329 return c.json({ ok: true }); 330 330 }); 331 331 ··· 333 333 const user = c.get("user"); 334 334 const rkey = c.req.param("rkey")!; 335 335 336 - const sub = await findSubscription(user.did, rkey); 337 - if (!sub) return c.json({ error: "Subscription not found" }, 404); 336 + const auto = await findAutomation(user.did, rkey); 337 + if (!auto) return c.json({ error: "Automation not found" }, 404); 338 338 339 339 // Delete from PDS 340 340 try { 341 341 await deleteRecord(user.did, rkey); 342 342 } catch (err) { 343 343 console.error("Failed to delete PDS record:", err); 344 - return c.json({ error: "Failed to delete subscription from PDS" }, 502); 344 + return c.json({ error: "Failed to delete automation from PDS" }, 502); 345 345 } 346 346 347 347 // Remove from local index (cascade deletes delivery logs) 348 - await db.delete(subscriptions).where(eq(subscriptions.uri, sub.uri)); 348 + await db.delete(automations).where(eq(automations.uri, auto.uri)); 349 349 350 - notifySubscriptionChange(); 350 + notifyAutomationChange(); 351 351 return c.json({ ok: true }); 352 352 });
app/routes/api/subscriptions/_middleware.ts app/routes/api/automations/_middleware.ts
+56 -56
app/routes/api/subscriptions/index.test.ts app/routes/api/automations/index.test.ts
··· 18 18 }, 19 19 })); 20 20 21 - vi.mock("@/subscriptions/verify.js", () => ({ 21 + vi.mock("@/automations/verify.js", () => ({ 22 22 verifyCallback: vi.fn(), 23 23 })); 24 24 ··· 27 27 UrlGuardError: class extends Error {}, 28 28 })); 29 29 30 - vi.mock("@/subscriptions/pds.js", () => ({ 30 + vi.mock("@/automations/pds.js", () => ({ 31 31 createRecord: vi.fn(), 32 32 deleteRecord: vi.fn(), 33 33 })); 34 34 35 35 vi.mock("@/jetstream/consumer.js", () => ({ 36 - notifySubscriptionChange: vi.fn(), 36 + notifyAutomationChange: vi.fn(), 37 37 })); 38 38 39 39 import { GET, POST } from "./index.js"; 40 - import { verifyCallback } from "@/subscriptions/verify.js"; 40 + import { verifyCallback } from "@/automations/verify.js"; 41 41 import { assertPublicUrl } from "@/url-guard.js"; 42 - import { createRecord, deleteRecord } from "@/subscriptions/pds.js"; 42 + import { createRecord, deleteRecord } from "@/automations/pds.js"; 43 43 import { db } from "@/db/index.js"; 44 - import { subscriptions } from "@/db/schema.js"; 44 + import { automations } from "@/db/schema.js"; 45 45 46 46 const mockVerify = vi.mocked(verifyCallback); 47 47 const mockAssertPublicUrl = vi.mocked(assertPublicUrl); ··· 56 56 c.set("user", TEST_USER); 57 57 return next(); 58 58 }); 59 - app.get("/api/subscriptions", ...GET); 60 - app.post("/api/subscriptions", ...POST); 59 + app.get("/api/automations", ...GET); 60 + app.post("/api/automations", ...POST); 61 61 return app; 62 62 } 63 63 ··· 69 69 }); 70 70 } 71 71 72 - describe("POST /api/subscriptions", () => { 72 + describe("POST /api/automations", () => { 73 73 let app: ReturnType<typeof createTestApp>; 74 74 75 75 beforeEach(async () => { ··· 78 78 mockCreateRecord.mockReset(); 79 79 mockDeleteRecord.mockReset(); 80 80 81 - await db.delete(subscriptions); 81 + await db.delete(automations); 82 82 83 83 mockVerify.mockResolvedValue({ ok: true }); 84 84 mockCreateRecord.mockResolvedValue({ 85 - uri: "at://did:plc:testuser/app.rglw.subscription/tid1", 85 + uri: "at://did:plc:testuser/run.airglow.automation/tid1", 86 86 rkey: "tid1", 87 87 }); 88 88 }); 89 89 90 - it("creates a webhook subscription and returns 201", async () => { 90 + it("creates a webhook automation and returns 201", async () => { 91 91 const res = await app.request( 92 - jsonReq("/api/subscriptions", { 93 - name: "My Sub", 92 + jsonReq("/api/automations", { 93 + name: "My Auto", 94 94 lexicon: "app.bsky.feed.like", 95 95 actions: [{ type: "webhook", callbackUrl: "https://example.com/hook" }], 96 96 }), ··· 98 98 99 99 expect(res.status).toBe(201); 100 100 const body = await res.json(); 101 - expect(body.uri).toContain("app.rglw.subscription"); 101 + expect(body.uri).toContain("run.airglow.automation"); 102 102 expect(body.rkey).toBe("tid1"); 103 103 expect(body.actions[0].type).toBe("webhook"); 104 104 expect(body.actions[0].secret).toBeDefined(); 105 105 }); 106 106 107 - it("creates a record action subscription", async () => { 107 + it("creates a record action automation", async () => { 108 108 const res = await app.request( 109 - jsonReq("/api/subscriptions", { 110 - name: "Record Sub", 109 + jsonReq("/api/automations", { 110 + name: "Record Auto", 111 111 lexicon: "app.bsky.feed.like", 112 112 actions: [ 113 113 { ··· 124 124 125 125 it("returns 400 for missing name", async () => { 126 126 const res = await app.request( 127 - jsonReq("/api/subscriptions", { 127 + jsonReq("/api/automations", { 128 128 name: "", 129 129 lexicon: "app.bsky.feed.like", 130 130 actions: [{ type: "webhook", callbackUrl: "https://example.com/hook" }], ··· 135 135 136 136 it("returns 400 for name too long", async () => { 137 137 const res = await app.request( 138 - jsonReq("/api/subscriptions", { 138 + jsonReq("/api/automations", { 139 139 name: "a".repeat(129), 140 140 lexicon: "app.bsky.feed.like", 141 141 actions: [{ type: "webhook", callbackUrl: "https://example.com/hook" }], ··· 146 146 147 147 it("returns 400 for description too long", async () => { 148 148 const res = await app.request( 149 - jsonReq("/api/subscriptions", { 149 + jsonReq("/api/automations", { 150 150 name: "Test", 151 151 description: "x".repeat(1025), 152 152 lexicon: "app.bsky.feed.like", ··· 158 158 159 159 it("returns 400 for invalid lexicon NSID", async () => { 160 160 const res = await app.request( 161 - jsonReq("/api/subscriptions", { 161 + jsonReq("/api/automations", { 162 162 name: "Test", 163 163 lexicon: "not-valid", 164 164 actions: [{ type: "webhook", callbackUrl: "https://example.com/hook" }], ··· 169 169 170 170 it("returns 403 for blocked lexicon", async () => { 171 171 const res = await app.request( 172 - jsonReq("/api/subscriptions", { 172 + jsonReq("/api/automations", { 173 173 name: "Test", 174 174 lexicon: "blocked.nsid.something", 175 175 actions: [{ type: "webhook", callbackUrl: "https://example.com/hook" }], ··· 180 180 181 181 it("returns 400 for no actions", async () => { 182 182 const res = await app.request( 183 - jsonReq("/api/subscriptions", { 183 + jsonReq("/api/automations", { 184 184 name: "Test", 185 185 lexicon: "app.bsky.feed.like", 186 186 actions: [], ··· 191 191 192 192 it("returns 400 for too many actions", async () => { 193 193 const res = await app.request( 194 - jsonReq("/api/subscriptions", { 194 + jsonReq("/api/automations", { 195 195 name: "Test", 196 196 lexicon: "app.bsky.feed.like", 197 197 actions: Array.from({ length: 11 }, () => ({ ··· 207 207 mockAssertPublicUrl.mockRejectedValueOnce(new Error("Invalid URL")); 208 208 209 209 const res = await app.request( 210 - jsonReq("/api/subscriptions", { 210 + jsonReq("/api/automations", { 211 211 name: "Test", 212 212 lexicon: "app.bsky.feed.like", 213 213 actions: [{ type: "webhook", callbackUrl: "not-a-url" }], ··· 223 223 ); 224 224 225 225 const res = await app.request( 226 - jsonReq("/api/subscriptions", { 226 + jsonReq("/api/automations", { 227 227 name: "Test", 228 228 lexicon: "app.bsky.feed.like", 229 229 actions: [{ type: "webhook", callbackUrl: "https://127.0.0.1/hook" }], ··· 234 234 expect(body.error).toContain("private network"); 235 235 }); 236 236 237 - it("creates subscription with verified=false when verification fails", async () => { 237 + it("creates automation with verified=false when verification fails", async () => { 238 238 mockVerify.mockResolvedValueOnce({ ok: false, error: "Callback failed" }); 239 239 240 240 const res = await app.request( 241 - jsonReq("/api/subscriptions", { 241 + jsonReq("/api/automations", { 242 242 name: "Test", 243 243 lexicon: "app.bsky.feed.like", 244 244 actions: [{ type: "webhook", callbackUrl: "https://example.com/hook" }], ··· 246 246 ); 247 247 expect(res.status).toBe(201); 248 248 249 - const sub = await db.query.subscriptions.findFirst(); 250 - const action = sub!.actions[0]! as { verified: boolean }; 249 + const auto = await db.query.automations.findFirst(); 250 + const action = auto!.actions[0]! as { verified: boolean }; 251 251 expect(action.verified).toBe(false); 252 252 }); 253 253 254 - it("creates subscription with verified=true when verification passes", async () => { 254 + it("creates automation with verified=true when verification passes", async () => { 255 255 const res = await app.request( 256 - jsonReq("/api/subscriptions", { 256 + jsonReq("/api/automations", { 257 257 name: "Test", 258 258 lexicon: "app.bsky.feed.like", 259 259 actions: [{ type: "webhook", callbackUrl: "https://example.com/hook" }], ··· 261 261 ); 262 262 expect(res.status).toBe(201); 263 263 264 - const sub = await db.query.subscriptions.findFirst(); 265 - const action = sub!.actions[0]! as { verified: boolean }; 264 + const auto = await db.query.automations.findFirst(); 265 + const action = auto!.actions[0]! as { verified: boolean }; 266 266 expect(action.verified).toBe(true); 267 267 }); 268 268 269 269 it("returns 400 for invalid record template", async () => { 270 270 const res = await app.request( 271 - jsonReq("/api/subscriptions", { 271 + jsonReq("/api/automations", { 272 272 name: "Test", 273 273 lexicon: "app.bsky.feed.like", 274 274 actions: [ ··· 281 281 282 282 it("returns 400 for invalid condition operator", async () => { 283 283 const res = await app.request( 284 - jsonReq("/api/subscriptions", { 284 + jsonReq("/api/automations", { 285 285 name: "Test", 286 286 lexicon: "app.bsky.feed.like", 287 287 actions: [{ type: "webhook", callbackUrl: "https://example.com/hook" }], ··· 293 293 294 294 it("returns 400 for too many conditions", async () => { 295 295 const res = await app.request( 296 - jsonReq("/api/subscriptions", { 296 + jsonReq("/api/automations", { 297 297 name: "Test", 298 298 lexicon: "app.bsky.feed.like", 299 299 actions: [{ type: "webhook", callbackUrl: "https://example.com/hook" }], ··· 309 309 310 310 it("returns 400 for too many fetch steps", async () => { 311 311 const res = await app.request( 312 - jsonReq("/api/subscriptions", { 312 + jsonReq("/api/automations", { 313 313 name: "Test", 314 314 lexicon: "app.bsky.feed.like", 315 315 actions: [{ type: "webhook", callbackUrl: "https://example.com/hook" }], ··· 326 326 mockCreateRecord.mockRejectedValueOnce(new Error("PDS down")); 327 327 328 328 const res = await app.request( 329 - jsonReq("/api/subscriptions", { 329 + jsonReq("/api/automations", { 330 330 name: "Test", 331 331 lexicon: "app.bsky.feed.like", 332 332 actions: [{ type: "webhook", callbackUrl: "https://example.com/hook" }], ··· 337 337 338 338 it("returns 400 for invalid action type", async () => { 339 339 const res = await app.request( 340 - jsonReq("/api/subscriptions", { 340 + jsonReq("/api/automations", { 341 341 name: "Test", 342 342 lexicon: "app.bsky.feed.like", 343 343 actions: [{ type: "invalid" }], ··· 347 347 }); 348 348 }); 349 349 350 - describe("GET /api/subscriptions", () => { 350 + describe("GET /api/automations", () => { 351 351 let app: ReturnType<typeof createTestApp>; 352 352 353 353 beforeEach(async () => { 354 354 app = createTestApp(); 355 - await db.delete(subscriptions); 355 + await db.delete(automations); 356 356 }); 357 357 358 - it("returns user subscriptions", async () => { 359 - await db.insert(subscriptions).values({ 360 - uri: "at://did:plc:testuser/app.rglw.subscription/rk1", 358 + it("returns user automations", async () => { 359 + await db.insert(automations).values({ 360 + uri: "at://did:plc:testuser/run.airglow.automation/rk1", 361 361 did: "did:plc:testuser", 362 362 rkey: "rk1", 363 - name: "Sub 1", 363 + name: "Auto 1", 364 364 description: null, 365 365 lexicon: "app.bsky.feed.like", 366 366 actions: [{ $type: "webhook", callbackUrl: "https://x.com/h", secret: "s" }], ··· 370 370 indexedAt: new Date(), 371 371 }); 372 372 373 - const res = await app.request("http://localhost/api/subscriptions"); 373 + const res = await app.request("http://localhost/api/automations"); 374 374 expect(res.status).toBe(200); 375 375 const body = await res.json(); 376 376 expect(body).toHaveLength(1); 377 - expect(body[0].name).toBe("Sub 1"); 377 + expect(body[0].name).toBe("Auto 1"); 378 378 expect(body[0].rkey).toBe("rk1"); 379 379 }); 380 380 381 - it("returns empty array when no subscriptions", async () => { 382 - const res = await app.request("http://localhost/api/subscriptions"); 381 + it("returns empty array when no automations", async () => { 382 + const res = await app.request("http://localhost/api/automations"); 383 383 expect(res.status).toBe(200); 384 384 const body = await res.json(); 385 385 expect(body).toEqual([]); 386 386 }); 387 387 388 - it("does not return other users' subscriptions", async () => { 389 - await db.insert(subscriptions).values({ 390 - uri: "at://did:plc:other/app.rglw.subscription/rk1", 388 + it("does not return other users' automations", async () => { 389 + await db.insert(automations).values({ 390 + uri: "at://did:plc:other/run.airglow.automation/rk1", 391 391 did: "did:plc:other", 392 392 rkey: "rk1", 393 - name: "Other Sub", 393 + name: "Other Auto", 394 394 description: null, 395 395 lexicon: "app.bsky.feed.like", 396 396 actions: [], ··· 400 400 indexedAt: new Date(), 401 401 }); 402 402 403 - const res = await app.request("http://localhost/api/subscriptions"); 403 + const res = await app.request("http://localhost/api/automations"); 404 404 const body = await res.json(); 405 405 expect(body).toEqual([]); 406 406 });
+15 -15
app/routes/api/subscriptions/index.ts app/routes/api/automations/index.ts
··· 3 3 import { nanoid } from "nanoid"; 4 4 import { db } from "@/db/index.js"; 5 5 import { 6 - subscriptions, 6 + automations, 7 7 type Action, 8 8 type WebhookAction, 9 9 type RecordAction, ··· 11 11 } from "@/db/schema.js"; 12 12 import { config } from "@/config.js"; 13 13 import { isValidNsid, isNsidAllowed } from "@/lexicons/resolver.js"; 14 - import { verifyCallback } from "@/subscriptions/verify.js"; 14 + import { verifyCallback } from "@/automations/verify.js"; 15 15 import { assertPublicUrl, UrlGuardError } from "@/url-guard.js"; 16 16 import { 17 17 createRecord, 18 18 deleteRecord, 19 19 type PdsAction, 20 20 type PdsFetchStep, 21 - } from "@/subscriptions/pds.js"; 21 + } from "@/automations/pds.js"; 22 22 import { validateTemplate, validateFetchStep } from "@/actions/template.js"; 23 - import { notifySubscriptionChange } from "@/jetstream/consumer.js"; 23 + import { notifyAutomationChange } from "@/jetstream/consumer.js"; 24 24 25 25 type ActionInput = 26 26 | { type: "webhook"; callbackUrl: string; comment?: string } ··· 30 30 31 31 export const GET = createRoute(async (c) => { 32 32 const user = c.get("user"); 33 - const rows = await db.query.subscriptions.findMany({ 34 - where: eq(subscriptions.did, user.did), 33 + const rows = await db.query.automations.findMany({ 34 + where: eq(automations.did, user.did), 35 35 }); 36 36 return c.json( 37 37 rows.map((r) => ({ ··· 98 98 } 99 99 100 100 // Normalize and validate conditions 101 - // {{self}} is kept as-is — resolved at match time to the subscription owner's DID 101 + // {{self}} is kept as-is — resolved at match time to the automation owner's DID 102 102 const conditions = (body.conditions ?? []) 103 103 .filter((cond) => cond.field && cond.value) 104 104 .map((cond) => ({ ··· 133 133 seenNames.add(f.name); 134 134 localFetches.push({ name: f.name, uri: f.uri, ...(f.comment ? { comment: f.comment } : {}) }); 135 135 pdsFetches.push({ 136 - $type: "app.rglw.subscription#fetchStep", 136 + $type: "run.airglow.automation#fetchStep", 137 137 name: f.name, 138 138 uri: f.uri, 139 139 ...(f.comment ? { comment: f.comment } : {}), ··· 169 169 ...(input.comment ? { comment: input.comment } : {}), 170 170 } satisfies WebhookAction); 171 171 pdsActions.push({ 172 - $type: "app.rglw.subscription#webhookAction", 172 + $type: "run.airglow.automation#webhookAction", 173 173 callbackUrl: input.callbackUrl, 174 174 ...(input.comment ? { comment: input.comment } : {}), 175 175 }); ··· 195 195 ...(input.comment ? { comment: input.comment } : {}), 196 196 } satisfies RecordAction); 197 197 pdsActions.push({ 198 - $type: "app.rglw.subscription#recordAction", 198 + $type: "run.airglow.automation#recordAction", 199 199 targetCollection: input.targetCollection, 200 200 recordTemplate: input.recordTemplate, 201 201 ...(input.comment ? { comment: input.comment } : {}), ··· 226 226 rkey = result.rkey; 227 227 } catch (err) { 228 228 console.error("Failed to create PDS record:", err); 229 - return c.json({ error: "Failed to write subscription to PDS" }, 502); 229 + return c.json({ error: "Failed to write automation to PDS" }, 502); 230 230 } 231 231 232 232 // Index locally 233 233 try { 234 - await db.insert(subscriptions).values({ 234 + await db.insert(automations).values({ 235 235 uri, 236 236 did: user.did, 237 237 rkey, ··· 251 251 } catch { 252 252 /* best-effort */ 253 253 } 254 - console.error("Failed to index subscription locally:", err); 255 - return c.json({ error: "Failed to save subscription" }, 500); 254 + console.error("Failed to index automation locally:", err); 255 + return c.json({ error: "Failed to save automation" }, 500); 256 256 } 257 257 258 - notifySubscriptionChange(); 258 + notifyAutomationChange(); 259 259 260 260 // Return secrets for webhook actions so the user can copy them 261 261 const actionSecrets = localActions.map((a, i) => ({
+19 -19
app/routes/dashboard/index.tsx
··· 1 1 import { createRoute } from "honox/factory"; 2 2 import { eq } from "drizzle-orm"; 3 3 import { db } from "@/db/index.js"; 4 - import { subscriptions } from "@/db/schema.js"; 4 + import { automations } from "@/db/schema.js"; 5 5 import { AppShell } from "../../components/Layout/AppShell/index.js"; 6 6 import { Header } from "../../components/Layout/Header/index.js"; 7 7 import { Container } from "../../components/Layout/Container/index.js"; ··· 16 16 17 17 export default createRoute(async (c) => { 18 18 const user = c.get("user"); 19 - const subs = await db.query.subscriptions.findMany({ 20 - where: eq(subscriptions.did, user.did), 19 + const autos = await db.query.automations.findMany({ 20 + where: eq(automations.did, user.did), 21 21 }); 22 22 23 23 return c.render( 24 24 <AppShell header={<Header user={user} actions={<ThemeToggle />} />}> 25 25 <Container> 26 26 <PageHeader 27 - title="Subscriptions" 28 - description={`${subs.length} subscription${subs.length !== 1 ? "s" : ""}`} 27 + title="Automations" 28 + description={`${autos.length} automation${autos.length !== 1 ? "s" : ""}`} 29 29 actions={ 30 - <Button href="/dashboard/subscriptions/new" size="sm"> 31 - New Subscription 30 + <Button href="/dashboard/automations/new" size="sm"> 31 + New Automation 32 32 </Button> 33 33 } 34 34 /> 35 35 36 - {subs.length === 0 ? ( 36 + {autos.length === 0 ? ( 37 37 <Card variant="flat"> 38 38 <div class={centerTextSm}> 39 - <p>No subscriptions yet.</p> 39 + <p>No automations yet.</p> 40 40 <p> 41 - <Button href="/dashboard/subscriptions/new" variant="secondary" size="sm"> 42 - Create your first subscription 41 + <Button href="/dashboard/automations/new" variant="secondary" size="sm"> 42 + Create your first automation 43 43 </Button> 44 44 </p> 45 45 </div> ··· 56 56 </tr> 57 57 </thead> 58 58 <tbody> 59 - {subs.map((sub) => ( 60 - <tr key={sub.uri}> 59 + {autos.map((auto) => ( 60 + <tr key={auto.uri}> 61 61 <td> 62 - <a href={`/dashboard/subscriptions/${sub.rkey}`}>{sub.name}</a> 62 + <a href={`/dashboard/automations/${auto.rkey}`}>{auto.name}</a> 63 63 </td> 64 64 <td> 65 - <InlineCode>{sub.lexicon}</InlineCode> 65 + <InlineCode>{auto.lexicon}</InlineCode> 66 66 </td> 67 67 <td> 68 - {sub.actions.length} action{sub.actions.length !== 1 ? "s" : ""} 68 + {auto.actions.length} action{auto.actions.length !== 1 ? "s" : ""} 69 69 </td> 70 70 <td> 71 - <Badge variant={sub.active ? "success" : "neutral"}> 72 - {sub.active ? "Active" : "Inactive"} 71 + <Badge variant={auto.active ? "success" : "neutral"}> 72 + {auto.active ? "Active" : "Inactive"} 73 73 </Badge> 74 74 </td> 75 75 <td> 76 - <Button href={`/dashboard/subscriptions/${sub.rkey}`} variant="ghost" size="sm"> 76 + <Button href={`/dashboard/automations/${auto.rkey}`} variant="ghost" size="sm"> 77 77 View 78 78 </Button> 79 79 </td>
+24 -24
app/routes/dashboard/subscriptions/[rkey].tsx app/routes/dashboard/automations/[rkey].tsx
··· 1 1 import { createRoute } from "honox/factory"; 2 2 import { eq, and, desc } from "drizzle-orm"; 3 3 import { db } from "@/db/index.js"; 4 - import { subscriptions, deliveryLogs } from "@/db/schema.js"; 4 + import { automations, deliveryLogs } from "@/db/schema.js"; 5 5 import { AppShell } from "../../../components/Layout/AppShell/index.js"; 6 6 import { Header } from "../../../components/Layout/Header/index.js"; 7 7 import { Container } from "../../../components/Layout/Container/index.js"; ··· 20 20 const user = c.get("user"); 21 21 const rkey = c.req.param("rkey")!; 22 22 23 - const sub = await db.query.subscriptions.findFirst({ 24 - where: and(eq(subscriptions.did, user.did), eq(subscriptions.rkey, rkey)), 23 + const auto = await db.query.automations.findFirst({ 24 + where: and(eq(automations.did, user.did), eq(automations.rkey, rkey)), 25 25 }); 26 - if (!sub) { 26 + if (!auto) { 27 27 c.status(404); 28 28 return c.render( 29 29 <AppShell header={<Header user={user} actions={<ThemeToggle />} />}> ··· 36 36 </Button> 37 37 } 38 38 /> 39 - <p>This subscription does not exist.</p> 39 + <p>This automation does not exist.</p> 40 40 </Container> 41 41 </AppShell>, 42 42 { title: "Not Found — Airglow" }, ··· 44 44 } 45 45 46 46 const logs = await db.query.deliveryLogs.findMany({ 47 - where: eq(deliveryLogs.subscriptionUri, sub.uri), 47 + where: eq(deliveryLogs.automationUri, auto.uri), 48 48 orderBy: desc(deliveryLogs.createdAt), 49 49 limit: 50, 50 50 }); ··· 53 53 <AppShell header={<Header user={user} actions={<ThemeToggle />} />}> 54 54 <Container> 55 55 <PageHeader 56 - title={sub.name} 57 - description={sub.description ?? undefined} 56 + title={auto.name} 57 + description={auto.description ?? undefined} 58 58 actions={ 59 59 <div class={inlineCluster}> 60 - <Badge variant={sub.active ? "success" : "neutral"}> 61 - {sub.active ? "Active" : "Inactive"} 60 + <Badge variant={auto.active ? "success" : "neutral"}> 61 + {auto.active ? "Active" : "Inactive"} 62 62 </Badge> 63 - <Button href={`/dashboard/subscriptions/${rkey}/edit`} variant="secondary" size="sm"> 63 + <Button href={`/dashboard/automations/${rkey}/edit`} variant="secondary" size="sm"> 64 64 Edit 65 65 </Button> 66 66 <Button href="/dashboard" variant="ghost" size="sm"> ··· 75 75 <DescriptionList> 76 76 <dt>Lexicon</dt> 77 77 <dd> 78 - <InlineCode>{sub.lexicon}</InlineCode> 78 + <InlineCode>{auto.lexicon}</InlineCode> 79 79 </dd> 80 80 <dt>Status</dt> 81 81 <dd> 82 - <Badge variant={sub.active ? "success" : "neutral"}> 83 - {sub.active ? "Active" : "Inactive"} 82 + <Badge variant={auto.active ? "success" : "neutral"}> 83 + {auto.active ? "Active" : "Inactive"} 84 84 </Badge> 85 85 </dd> 86 86 <dt>AT URI</dt> 87 87 <dd> 88 - <InlineCode>{sub.uri}</InlineCode> 88 + <InlineCode>{auto.uri}</InlineCode> 89 89 </dd> 90 90 </DescriptionList> 91 91 </Card> 92 92 93 - {sub.conditions.length > 0 && ( 93 + {auto.conditions.length > 0 && ( 94 94 <Card variant="flat"> 95 95 <Stack gap={3}> 96 96 <h3>Conditions</h3> 97 97 <ul class={plainList}> 98 - {sub.conditions.map((cond, i) => { 98 + {auto.conditions.map((cond, i) => { 99 99 const opLabels: Record<string, string> = { 100 100 eq: "equals", 101 101 startsWith: "starts with", ··· 116 116 </Card> 117 117 )} 118 118 119 - {sub.fetches.length > 0 && ( 119 + {auto.fetches.length > 0 && ( 120 120 <Card variant="flat"> 121 121 <Stack gap={3}> 122 122 <h3>Data Sources</h3> 123 123 <ul class={plainList}> 124 - {sub.fetches.map((f, i) => ( 124 + {auto.fetches.map((f, i) => ( 125 125 <li key={i}> 126 126 <InlineCode>{f.name}</InlineCode> &larr; <InlineCode>{f.uri}</InlineCode> 127 127 {f.comment && <span class={textMuted}> — {f.comment}</span>} ··· 133 133 )} 134 134 135 135 <Stack gap={3}> 136 - <h3>Actions ({sub.actions.length})</h3> 137 - {sub.actions.map((action, i) => ( 136 + <h3>Actions ({auto.actions.length})</h3> 137 + {auto.actions.map((action, i) => ( 138 138 <Card key={i} variant="flat"> 139 139 <Stack gap={2}> 140 140 <h4> ··· 180 180 </Stack> 181 181 182 182 <DeliveryLog 183 - rkey={sub.rkey} 184 - active={sub.active} 183 + rkey={auto.rkey} 184 + active={auto.active} 185 185 initialLogs={logs.map((l) => ({ 186 186 id: l.id, 187 187 actionIndex: l.actionIndex, ··· 195 195 </Stack> 196 196 </Container> 197 197 </AppShell>, 198 - { title: `${sub.name} — Airglow` }, 198 + { title: `${auto.name} — Airglow` }, 199 199 ); 200 200 });
+18 -18
app/routes/dashboard/subscriptions/[rkey]/edit.tsx app/routes/dashboard/automations/[rkey]/edit.tsx
··· 1 1 import { createRoute } from "honox/factory"; 2 2 import { eq, and } from "drizzle-orm"; 3 3 import { db } from "@/db/index.js"; 4 - import { subscriptions } from "@/db/schema.js"; 4 + import { automations } from "@/db/schema.js"; 5 5 import { AppShell } from "../../../../components/Layout/AppShell/index.js"; 6 6 import { Header } from "../../../../components/Layout/Header/index.js"; 7 7 import { Container } from "../../../../components/Layout/Container/index.js"; ··· 9 9 import { Card } from "../../../../components/Card/index.js"; 10 10 import { Button } from "../../../../components/Button/index.js"; 11 11 import ThemeToggle from "../../../../islands/ThemeToggle.js"; 12 - import SubscriptionForm from "../../../../islands/SubscriptionForm.js"; 12 + import AutomationForm from "../../../../islands/AutomationForm.js"; 13 13 14 14 export default createRoute(async (c) => { 15 15 const user = c.get("user"); 16 16 const rkey = c.req.param("rkey")!; 17 17 18 - const sub = await db.query.subscriptions.findFirst({ 19 - where: and(eq(subscriptions.did, user.did), eq(subscriptions.rkey, rkey)), 18 + const auto = await db.query.automations.findFirst({ 19 + where: and(eq(automations.did, user.did), eq(automations.rkey, rkey)), 20 20 }); 21 21 22 - if (!sub) { 22 + if (!auto) { 23 23 c.status(404); 24 24 return c.render( 25 25 <AppShell header={<Header user={user} actions={<ThemeToggle />} />}> ··· 32 32 </Button> 33 33 } 34 34 /> 35 - <p>This subscription does not exist.</p> 35 + <p>This automation does not exist.</p> 36 36 </Container> 37 37 </AppShell>, 38 38 { title: "Not Found — Airglow" }, ··· 43 43 <AppShell header={<Header user={user} actions={<ThemeToggle />} />}> 44 44 <Container> 45 45 <PageHeader 46 - title={`Edit: ${sub.name}`} 46 + title={`Edit: ${auto.name}`} 47 47 actions={ 48 - <Button href={`/dashboard/subscriptions/${rkey}`} variant="ghost" size="sm"> 48 + <Button href={`/dashboard/automations/${rkey}`} variant="ghost" size="sm"> 49 49 &larr; Back 50 50 </Button> 51 51 } 52 52 /> 53 53 <Card variant="flat"> 54 - <SubscriptionForm 54 + <AutomationForm 55 55 initial={{ 56 - rkey: sub.rkey, 57 - name: sub.name, 58 - description: sub.description, 59 - lexicon: sub.lexicon, 60 - actions: sub.actions, 61 - fetches: sub.fetches, 62 - conditions: sub.conditions, 63 - active: sub.active, 56 + rkey: auto.rkey, 57 + name: auto.name, 58 + description: auto.description, 59 + lexicon: auto.lexicon, 60 + actions: auto.actions, 61 + fetches: auto.fetches, 62 + conditions: auto.conditions, 63 + active: auto.active, 64 64 }} 65 65 /> 66 66 </Card> 67 67 </Container> 68 68 </AppShell>, 69 - { title: `Edit ${sub.name} — Airglow` }, 69 + { title: `Edit ${auto.name} — Airglow` }, 70 70 ); 71 71 });
+4 -4
app/routes/dashboard/subscriptions/new.tsx app/routes/dashboard/automations/new.tsx
··· 6 6 import { Card } from "../../../components/Card/index.js"; 7 7 import { Button } from "../../../components/Button/index.js"; 8 8 import ThemeToggle from "../../../islands/ThemeToggle.js"; 9 - import SubscriptionForm from "../../../islands/SubscriptionForm.js"; 9 + import AutomationForm from "../../../islands/AutomationForm.js"; 10 10 11 11 export default createRoute((c) => { 12 12 const user = c.get("user"); ··· 15 15 <AppShell header={<Header user={user} actions={<ThemeToggle />} />}> 16 16 <Container> 17 17 <PageHeader 18 - title="New Subscription" 18 + title="New Automation" 19 19 actions={ 20 20 <Button href="/dashboard" variant="ghost" size="sm"> 21 21 &larr; Back ··· 23 23 } 24 24 /> 25 25 <Card variant="flat"> 26 - <SubscriptionForm /> 26 + <AutomationForm /> 27 27 </Card> 28 28 </Container> 29 29 </AppShell>, 30 - { title: "New Subscription — Airglow" }, 30 + { title: "New Automation — Airglow" }, 31 31 ); 32 32 });
+18 -17
app/routes/index.tsx
··· 14 14 <AppShell header={<Header user={user} actions={<ThemeToggle />} />}> 15 15 <Container> 16 16 <section class={s.hero}> 17 - <h1 class={s.heroTitle}>Webhooks for the AT Protocol</h1> 17 + <h1 class={s.heroTitle}>Automations for the AT Protocol</h1> 18 18 <p class={s.heroSubtitle}> 19 - Subscribe to events across the AT Protocol network and receive real-time webhook 20 - deliveries. Filter by lexicon, match conditions, and track every delivery. 19 + Listen to events across the AT Protocol network and trigger actions automatically. 20 + Filter by lexicon, deliver webhooks, create records on your PDS, and track every run. 21 21 </p> 22 22 {user ? ( 23 23 <Button href="/dashboard" size="lg"> ··· 32 32 33 33 <section class={s.features}> 34 34 <div class={s.featureCard}> 35 - <h3 class={s.featureTitle}>Real-time Webhooks</h3> 35 + <h3 class={s.featureTitle}>Webhook Delivery</h3> 36 36 <p class={s.featureDesc}> 37 37 Receive HTTP POST callbacks instantly when matching events occur on the AT Protocol 38 38 network via Jetstream. 39 39 </p> 40 40 </div> 41 41 <div class={s.featureCard}> 42 - <h3 class={s.featureTitle}>Lexicon Filtering</h3> 42 + <h3 class={s.featureTitle}>Record Creation</h3> 43 43 <p class={s.featureDesc}> 44 - Subscribe to specific record types by NSID. Add field-level conditions to match 45 - exactly the events you need. 44 + Automatically create records on your PDS when events match. Use templates with 45 + placeholders to build records from event data. 46 46 </p> 47 47 </div> 48 48 <div class={s.featureCard}> 49 - <h3 class={s.featureTitle}>Delivery Tracking</h3> 49 + <h3 class={s.featureTitle}>Smart Filtering</h3> 50 50 <p class={s.featureDesc}> 51 - Full delivery log with status codes, retry attempts, and error details. Know exactly 52 - what happened with every event. 51 + Listen to specific record types by NSID. Add field-level conditions with operators 52 + like equals, starts with, or contains. 53 53 </p> 54 54 </div> 55 55 <div class={s.featureCard}> 56 - <h3 class={s.featureTitle}>HMAC Signing</h3> 56 + <h3 class={s.featureTitle}>Delivery Tracking</h3> 57 57 <p class={s.featureDesc}> 58 - Every webhook is signed with a per-subscription HMAC secret so your callback can 59 - verify authenticity. 58 + Full delivery log with status codes, retry attempts, and error details. Know exactly 59 + what happened with every event. 60 60 </p> 61 61 </div> 62 62 </section> ··· 71 71 </li> 72 72 <li class={s.step}> 73 73 <div class={s.stepNumber}>2</div> 74 - <h3 class={s.stepTitle}>Subscribe</h3> 74 + <h3 class={s.stepTitle}>Automate</h3> 75 75 <p class={s.stepDesc}> 76 - Choose a lexicon, set conditions, and provide your callback URL. 76 + Choose a lexicon, set conditions, and add actions like webhooks or record creation. 77 77 </p> 78 78 </li> 79 79 <li class={s.step}> 80 80 <div class={s.stepNumber}>3</div> 81 81 <h3 class={s.stepTitle}>Receive</h3> 82 82 <p class={s.stepDesc}> 83 - Get signed webhook deliveries in real time with automatic retries. 83 + Get signed webhook deliveries and automatic record creation in real time with 84 + retries. 84 85 </p> 85 86 </li> 86 87 </ol> 87 88 </section> 88 89 </Container> 89 90 </AppShell>, 90 - { title: "Airglow — Webhooks for the AT Protocol" }, 91 + { title: "Airglow — Automations for the AT Protocol" }, 91 92 ); 92 93 });
+2 -2
app/server.ts
··· 26 26 }), 27 27 ); 28 28 29 - // Rate limiting on auth and subscription-creation endpoints 29 + // Rate limiting on auth and automation-creation endpoints 30 30 const authLimiter = rateLimit({ windowMs: 60_000, max: 10 }); 31 31 const apiLimiter = rateLimit({ windowMs: 60_000, max: 30 }); 32 32 33 33 app.use("/auth/*", authLimiter); 34 - app.use("/api/subscriptions", apiLimiter); 34 + app.use("/api/automations", apiLimiter); 35 35 36 36 // Start Jetstream consumer — routes matched events to action handlers 37 37 startJetstream(handleMatchedEvent);
+7 -7
lexicons/app/rglw/subscription.json lexicons/run/airglow/automation.json
··· 1 1 { 2 2 "lexicon": 1, 3 - "id": "app.rglw.subscription", 3 + "id": "run.airglow.automation", 4 4 "defs": { 5 5 "main": { 6 6 "type": "record", 7 - "description": "A subscription that triggers actions when matching AT Protocol events occur.", 7 + "description": "An automation that triggers actions when matching AT Protocol events occur.", 8 8 "key": "tid", 9 9 "record": { 10 10 "type": "object", ··· 12 12 "properties": { 13 13 "name": { 14 14 "type": "string", 15 - "description": "User-defined name for this subscription.", 15 + "description": "User-defined name for this automation.", 16 16 "maxLength": 128 17 17 }, 18 18 "description": { 19 19 "type": "string", 20 - "description": "Optional description of what this subscription does.", 20 + "description": "Optional description of what this automation does.", 21 21 "maxLength": 1024 22 22 }, 23 23 "lexicon": { 24 24 "type": "string", 25 - "description": "NSID of the collection to subscribe to.", 25 + "description": "NSID of the collection to listen to.", 26 26 "maxLength": 256 27 27 }, 28 28 "actions": { ··· 55 55 }, 56 56 "active": { 57 57 "type": "boolean", 58 - "description": "Whether this subscription is currently active.", 58 + "description": "Whether this automation is currently active.", 59 59 "default": true 60 60 }, 61 61 "createdAt": { ··· 85 85 }, 86 86 "recordAction": { 87 87 "type": "object", 88 - "description": "Create a record on the subscriber's PDS when a matching event occurs.", 88 + "description": "Create a record on the user's PDS when a matching event occurs.", 89 89 "required": ["targetCollection", "recordTemplate"], 90 90 "properties": { 91 91 "targetCollection": {
+15 -15
lib/actions/executor.test.ts
··· 5 5 return { db: createTestDb() }; 6 6 }); 7 7 8 - vi.mock("@/subscriptions/pds.js", () => ({ 8 + vi.mock("@/automations/pds.js", () => ({ 9 9 createArbitraryRecord: vi.fn(), 10 10 })); 11 11 12 12 import { executeAction } from "./executor.js"; 13 - import { createArbitraryRecord } from "../subscriptions/pds.js"; 13 + import { createArbitraryRecord } from "../automations/pds.js"; 14 14 import { db } from "../db/index.js"; 15 - import { subscriptions, deliveryLogs } from "../db/schema.js"; 16 - import { makeMatch, makeRecordAction, makeSubscription } from "../test/fixtures.js"; 15 + import { automations, deliveryLogs } from "../db/schema.js"; 16 + import { makeMatch, makeRecordAction, makeAutomation } from "../test/fixtures.js"; 17 17 18 18 const mockCreateRecord = vi.mocked(createArbitraryRecord); 19 19 ··· 24 24 mockCreateRecord.mockReset(); 25 25 26 26 await db.delete(deliveryLogs); 27 - await db.delete(subscriptions); 28 - await db.insert(subscriptions).values(makeSubscription()); 27 + await db.delete(automations); 28 + await db.insert(automations).values(makeAutomation()); 29 29 }); 30 30 31 31 afterEach(() => { ··· 39 39 targetCollection: "app.bsky.feed.post", 40 40 recordTemplate: '{"text":"Post by {{event.did}}","createdAt":"{{now}}"}', 41 41 }); 42 - const match = makeMatch({ subscription: { actions: [action] } }); 42 + const match = makeMatch({ automation: { actions: [action] } }); 43 43 await executeAction(match, 0); 44 44 45 - expect(mockCreateRecord).toHaveBeenCalledWith(match.subscription.did, "app.bsky.feed.post", { 45 + expect(mockCreateRecord).toHaveBeenCalledWith(match.automation.did, "app.bsky.feed.post", { 46 46 text: "Post by did:plc:testuser123", 47 47 createdAt: "2024-06-15T12:00:00.000Z", 48 48 }); ··· 56 56 const action = makeRecordAction({ 57 57 recordTemplate: '{"broken":{{event.commit.record.missing}}}', 58 58 }); 59 - const match = makeMatch({ subscription: { actions: [action] } }); 59 + const match = makeMatch({ automation: { actions: [action] } }); 60 60 await executeAction(match, 0); 61 61 62 62 expect(mockCreateRecord).not.toHaveBeenCalled(); ··· 73 73 ); 74 74 75 75 const action = makeRecordAction(); 76 - const match = makeMatch({ subscription: { actions: [action] } }); 76 + const match = makeMatch({ automation: { actions: [action] } }); 77 77 await executeAction(match, 0); 78 78 79 79 const logs = await db.query.deliveryLogs.findMany(); ··· 85 85 mockCreateRecord.mockRejectedValueOnce(new Error("Network timeout")); 86 86 87 87 const action = makeRecordAction(); 88 - const match = makeMatch({ subscription: { actions: [action] } }); 88 + const match = makeMatch({ automation: { actions: [action] } }); 89 89 await executeAction(match, 0); 90 90 91 91 const logs = await db.query.deliveryLogs.findMany(); ··· 99 99 .mockResolvedValueOnce({ uri: "at://x/col/rk", cid: "c" }); 100 100 101 101 const action = makeRecordAction(); 102 - const match = makeMatch({ subscription: { actions: [action] } }); 102 + const match = makeMatch({ automation: { actions: [action] } }); 103 103 await executeAction(match, 0); 104 104 105 105 expect(mockCreateRecord).toHaveBeenCalledTimes(1); ··· 115 115 mockCreateRecord.mockRejectedValueOnce(new Error("PDS failed (400): bad request")); 116 116 117 117 const action = makeRecordAction(); 118 - const match = makeMatch({ subscription: { actions: [action] } }); 118 + const match = makeMatch({ automation: { actions: [action] } }); 119 119 await executeAction(match, 0); 120 120 121 121 await vi.advanceTimersByTimeAsync(60_000); ··· 128 128 const action = makeRecordAction({ 129 129 recordTemplate: '{"name":"{{profile.record.displayName}}"}', 130 130 }); 131 - const match = makeMatch({ subscription: { actions: [action] } }); 131 + const match = makeMatch({ automation: { actions: [action] } }); 132 132 const fetchContext = { 133 133 profile: { uri: "at://x/col/rk", cid: "c", record: { displayName: "Alice" } }, 134 134 }; 135 135 136 136 await executeAction(match, 0, fetchContext); 137 137 138 - expect(mockCreateRecord).toHaveBeenCalledWith(match.subscription.did, "app.bsky.feed.post", { 138 + expect(mockCreateRecord).toHaveBeenCalledWith(match.automation.did, "app.bsky.feed.post", { 139 139 name: "Alice", 140 140 }); 141 141 });
+10 -10
lib/actions/executor.ts
··· 1 1 import { db } from "../db/index.js"; 2 2 import { deliveryLogs, type RecordAction } from "../db/schema.js"; 3 - import { createArbitraryRecord } from "../subscriptions/pds.js"; 3 + import { createArbitraryRecord } from "../automations/pds.js"; 4 4 import { renderTemplate, type FetchContext } from "./template.js"; 5 5 import type { MatchedEvent } from "../jetstream/consumer.js"; 6 6 ··· 11 11 action: RecordAction, 12 12 fetchContext?: FetchContext, 13 13 ): Promise<{ statusCode: number; error?: string }> { 14 - const { subscription, event } = match; 14 + const { automation, event } = match; 15 15 16 16 let record: Record<string, unknown>; 17 17 try { 18 - record = renderTemplate(action.recordTemplate, event, fetchContext, subscription.did); 18 + record = renderTemplate(action.recordTemplate, event, fetchContext, automation.did); 19 19 } catch (err) { 20 20 return { 21 21 statusCode: 0, ··· 24 24 } 25 25 26 26 try { 27 - await createArbitraryRecord(subscription.did, action.targetCollection, record); 27 + await createArbitraryRecord(automation.did, action.targetCollection, record); 28 28 return { statusCode: 200 }; 29 29 } catch (err) { 30 30 const message = err instanceof Error ? err.message : String(err); ··· 35 35 } 36 36 37 37 async function logDelivery( 38 - subscriptionUri: string, 38 + automationUri: string, 39 39 actionIndex: number, 40 40 eventTimeUs: number, 41 41 payload: string | null, ··· 44 44 attempt: number, 45 45 ) { 46 46 await db.insert(deliveryLogs).values({ 47 - subscriptionUri, 47 + automationUri, 48 48 actionIndex, 49 49 eventTimeUs, 50 50 payload, ··· 73 73 74 74 setTimeout(async () => { 75 75 try { 76 - const action = match.subscription.actions[actionIndex] as RecordAction; 76 + const action = match.automation.actions[actionIndex] as RecordAction; 77 77 const result = await execute(match, action, fetchContext); 78 78 const body = JSON.stringify({ 79 79 targetCollection: action.targetCollection, ··· 81 81 }); 82 82 83 83 await logDelivery( 84 - match.subscription.uri, 84 + match.automation.uri, 85 85 actionIndex, 86 86 match.event.time_us, 87 87 isSuccess(result.statusCode) ? null : body, ··· 105 105 actionIndex: number, 106 106 fetchContext?: FetchContext, 107 107 ) { 108 - const action = match.subscription.actions[actionIndex] as RecordAction; 108 + const action = match.automation.actions[actionIndex] as RecordAction; 109 109 const result = await execute(match, action, fetchContext); 110 110 111 111 const body = JSON.stringify({ ··· 114 114 }); 115 115 116 116 await logDelivery( 117 - match.subscription.uri, 117 + match.automation.uri, 118 118 actionIndex, 119 119 match.event.time_us, 120 120 isSuccess(result.statusCode) ? null : body,
+16 -16
lib/db/migrations/0000_yellow_silvermane.sql lib/db/migrations/0000_chunky_sersi.sql
··· 1 + CREATE TABLE `automations` ( 2 + `uri` text PRIMARY KEY NOT NULL, 3 + `did` text NOT NULL, 4 + `rkey` text NOT NULL, 5 + `name` text NOT NULL, 6 + `description` text, 7 + `lexicon` text NOT NULL, 8 + `actions` text DEFAULT '[]' NOT NULL, 9 + `fetches` text DEFAULT '[]' NOT NULL, 10 + `conditions` text DEFAULT '[]' NOT NULL, 11 + `active` integer DEFAULT false NOT NULL, 12 + `indexed_at` integer NOT NULL 13 + ); 14 + --> statement-breakpoint 1 15 CREATE TABLE `delivery_logs` ( 2 16 `id` integer PRIMARY KEY AUTOINCREMENT NOT NULL, 3 - `subscription_uri` text NOT NULL, 17 + `automation_uri` text NOT NULL, 4 18 `action_index` integer DEFAULT 0 NOT NULL, 5 19 `event_time_us` integer NOT NULL, 6 20 `payload` text, ··· 8 22 `error` text, 9 23 `attempt` integer DEFAULT 1 NOT NULL, 10 24 `created_at` integer NOT NULL, 11 - FOREIGN KEY (`subscription_uri`) REFERENCES `subscriptions`(`uri`) ON UPDATE no action ON DELETE cascade 25 + FOREIGN KEY (`automation_uri`) REFERENCES `automations`(`uri`) ON UPDATE no action ON DELETE cascade 12 26 ); 13 27 --> statement-breakpoint 14 28 CREATE TABLE `lexicon_cache` ( ··· 27 41 `key` text PRIMARY KEY NOT NULL, 28 42 `value` text NOT NULL, 29 43 `expires_at` integer 30 - ); 31 - --> statement-breakpoint 32 - CREATE TABLE `subscriptions` ( 33 - `uri` text PRIMARY KEY NOT NULL, 34 - `did` text NOT NULL, 35 - `rkey` text NOT NULL, 36 - `name` text NOT NULL, 37 - `description` text, 38 - `lexicon` text NOT NULL, 39 - `actions` text DEFAULT '[]' NOT NULL, 40 - `fetches` text DEFAULT '[]' NOT NULL, 41 - `conditions` text DEFAULT '[]' NOT NULL, 42 - `active` integer DEFAULT false NOT NULL, 43 - `indexed_at` integer NOT NULL 44 44 ); 45 45 --> statement-breakpoint 46 46 CREATE TABLE `users` (
+98 -98
lib/db/migrations/meta/0000_snapshot.json
··· 1 1 { 2 2 "version": "6", 3 3 "dialect": "sqlite", 4 - "id": "9f071f8b-3361-461b-940c-b8577b94e01d", 4 + "id": "053c31b3-dfd6-439c-af03-dd65e031d610", 5 5 "prevId": "00000000-0000-0000-0000-000000000000", 6 6 "tables": { 7 + "automations": { 8 + "name": "automations", 9 + "columns": { 10 + "uri": { 11 + "name": "uri", 12 + "type": "text", 13 + "primaryKey": true, 14 + "notNull": true, 15 + "autoincrement": false 16 + }, 17 + "did": { 18 + "name": "did", 19 + "type": "text", 20 + "primaryKey": false, 21 + "notNull": true, 22 + "autoincrement": false 23 + }, 24 + "rkey": { 25 + "name": "rkey", 26 + "type": "text", 27 + "primaryKey": false, 28 + "notNull": true, 29 + "autoincrement": false 30 + }, 31 + "name": { 32 + "name": "name", 33 + "type": "text", 34 + "primaryKey": false, 35 + "notNull": true, 36 + "autoincrement": false 37 + }, 38 + "description": { 39 + "name": "description", 40 + "type": "text", 41 + "primaryKey": false, 42 + "notNull": false, 43 + "autoincrement": false 44 + }, 45 + "lexicon": { 46 + "name": "lexicon", 47 + "type": "text", 48 + "primaryKey": false, 49 + "notNull": true, 50 + "autoincrement": false 51 + }, 52 + "actions": { 53 + "name": "actions", 54 + "type": "text", 55 + "primaryKey": false, 56 + "notNull": true, 57 + "autoincrement": false, 58 + "default": "'[]'" 59 + }, 60 + "fetches": { 61 + "name": "fetches", 62 + "type": "text", 63 + "primaryKey": false, 64 + "notNull": true, 65 + "autoincrement": false, 66 + "default": "'[]'" 67 + }, 68 + "conditions": { 69 + "name": "conditions", 70 + "type": "text", 71 + "primaryKey": false, 72 + "notNull": true, 73 + "autoincrement": false, 74 + "default": "'[]'" 75 + }, 76 + "active": { 77 + "name": "active", 78 + "type": "integer", 79 + "primaryKey": false, 80 + "notNull": true, 81 + "autoincrement": false, 82 + "default": false 83 + }, 84 + "indexed_at": { 85 + "name": "indexed_at", 86 + "type": "integer", 87 + "primaryKey": false, 88 + "notNull": true, 89 + "autoincrement": false 90 + } 91 + }, 92 + "indexes": {}, 93 + "foreignKeys": {}, 94 + "compositePrimaryKeys": {}, 95 + "uniqueConstraints": {}, 96 + "checkConstraints": {} 97 + }, 7 98 "delivery_logs": { 8 99 "name": "delivery_logs", 9 100 "columns": { ··· 14 105 "notNull": true, 15 106 "autoincrement": true 16 107 }, 17 - "subscription_uri": { 18 - "name": "subscription_uri", 108 + "automation_uri": { 109 + "name": "automation_uri", 19 110 "type": "text", 20 111 "primaryKey": false, 21 112 "notNull": true, ··· 75 166 }, 76 167 "indexes": {}, 77 168 "foreignKeys": { 78 - "delivery_logs_subscription_uri_subscriptions_uri_fk": { 79 - "name": "delivery_logs_subscription_uri_subscriptions_uri_fk", 169 + "delivery_logs_automation_uri_automations_uri_fk": { 170 + "name": "delivery_logs_automation_uri_automations_uri_fk", 80 171 "tableFrom": "delivery_logs", 81 - "tableTo": "subscriptions", 82 - "columnsFrom": ["subscription_uri"], 172 + "tableTo": "automations", 173 + "columnsFrom": ["automation_uri"], 83 174 "columnsTo": ["uri"], 84 175 "onDelete": "cascade", 85 176 "onUpdate": "no action" ··· 173 264 "type": "integer", 174 265 "primaryKey": false, 175 266 "notNull": false, 176 - "autoincrement": false 177 - } 178 - }, 179 - "indexes": {}, 180 - "foreignKeys": {}, 181 - "compositePrimaryKeys": {}, 182 - "uniqueConstraints": {}, 183 - "checkConstraints": {} 184 - }, 185 - "subscriptions": { 186 - "name": "subscriptions", 187 - "columns": { 188 - "uri": { 189 - "name": "uri", 190 - "type": "text", 191 - "primaryKey": true, 192 - "notNull": true, 193 - "autoincrement": false 194 - }, 195 - "did": { 196 - "name": "did", 197 - "type": "text", 198 - "primaryKey": false, 199 - "notNull": true, 200 - "autoincrement": false 201 - }, 202 - "rkey": { 203 - "name": "rkey", 204 - "type": "text", 205 - "primaryKey": false, 206 - "notNull": true, 207 - "autoincrement": false 208 - }, 209 - "name": { 210 - "name": "name", 211 - "type": "text", 212 - "primaryKey": false, 213 - "notNull": true, 214 - "autoincrement": false 215 - }, 216 - "description": { 217 - "name": "description", 218 - "type": "text", 219 - "primaryKey": false, 220 - "notNull": false, 221 - "autoincrement": false 222 - }, 223 - "lexicon": { 224 - "name": "lexicon", 225 - "type": "text", 226 - "primaryKey": false, 227 - "notNull": true, 228 - "autoincrement": false 229 - }, 230 - "actions": { 231 - "name": "actions", 232 - "type": "text", 233 - "primaryKey": false, 234 - "notNull": true, 235 - "autoincrement": false, 236 - "default": "'[]'" 237 - }, 238 - "fetches": { 239 - "name": "fetches", 240 - "type": "text", 241 - "primaryKey": false, 242 - "notNull": true, 243 - "autoincrement": false, 244 - "default": "'[]'" 245 - }, 246 - "conditions": { 247 - "name": "conditions", 248 - "type": "text", 249 - "primaryKey": false, 250 - "notNull": true, 251 - "autoincrement": false, 252 - "default": "'[]'" 253 - }, 254 - "active": { 255 - "name": "active", 256 - "type": "integer", 257 - "primaryKey": false, 258 - "notNull": true, 259 - "autoincrement": false, 260 - "default": false 261 - }, 262 - "indexed_at": { 263 - "name": "indexed_at", 264 - "type": "integer", 265 - "primaryKey": false, 266 - "notNull": true, 267 267 "autoincrement": false 268 268 } 269 269 },
+2 -2
lib/db/migrations/meta/_journal.json
··· 5 5 { 6 6 "idx": 0, 7 7 "version": "6", 8 - "when": 1775647145739, 9 - "tag": "0000_yellow_silvermane", 8 + "when": 1775679586984, 9 + "tag": "0000_chunky_sersi", 10 10 "breakpoints": true 11 11 } 12 12 ]
+6 -6
lib/db/schema.ts
··· 7 7 createdAt: integer("created_at", { mode: "timestamp_ms" }).notNull(), 8 8 }); 9 9 10 - // Action types for subscription actions (stored as JSON) 10 + // Action types for automation actions (stored as JSON) 11 11 export type WebhookAction = { 12 12 $type: "webhook"; 13 13 callbackUrl: string; ··· 31 31 comment?: string; 32 32 }; 33 33 34 - // Local index of app.rglw.subscription records living on user PDS. 34 + // Local index of run.airglow.automation records living on user PDS. 35 35 // Source of truth is the PDS; this is a cache for fast Jetstream matching. 36 - export const subscriptions = sqliteTable("subscriptions", { 37 - uri: text("uri").primaryKey(), // at://did/app.rglw.subscription/rkey 36 + export const automations = sqliteTable("automations", { 37 + uri: text("uri").primaryKey(), // at://did/run.airglow.automation/rkey 38 38 did: text("did").notNull(), 39 39 rkey: text("rkey").notNull(), 40 40 name: text("name").notNull(), ··· 52 52 53 53 export const deliveryLogs = sqliteTable("delivery_logs", { 54 54 id: integer("id").primaryKey({ autoIncrement: true }), 55 - subscriptionUri: text("subscription_uri") 55 + automationUri: text("automation_uri") 56 56 .notNull() 57 - .references(() => subscriptions.uri, { onDelete: "cascade" }), 57 + .references(() => automations.uri, { onDelete: "cascade" }), 58 58 actionIndex: integer("action_index").notNull().default(0), 59 59 eventTimeUs: integer("event_time_us").notNull(), 60 60 payload: text("payload"), // JSON, stored for failed deliveries
+40 -40
lib/jetstream/consumer.test.ts
··· 16 16 17 17 import { JetstreamConsumer, type MatchedEvent } from "./consumer.js"; 18 18 import { db } from "../db/index.js"; 19 - import { subscriptions } from "../db/schema.js"; 20 - import { makeSubscription, makeEvent } from "../test/fixtures.js"; 19 + import { automations } from "../db/schema.js"; 20 + import { makeAutomation, makeEvent } from "../test/fixtures.js"; 21 21 22 22 // Mock WebSocket to prevent real connections 23 23 class MockWebSocket { ··· 37 37 handler = vi.fn(); 38 38 consumer = new JetstreamConsumer(handler); 39 39 40 - await db.delete(subscriptions); 40 + await db.delete(automations); 41 41 }); 42 42 43 43 afterEach(() => { ··· 45 45 vi.useRealTimers(); 46 46 }); 47 47 48 - describe("refreshSubscriptions", () => { 49 - it("loads active subscriptions grouped by collection", async () => { 50 - await db.insert(subscriptions).values([ 51 - makeSubscription({ 48 + describe("refreshAutomations", () => { 49 + it("loads active automations grouped by collection", async () => { 50 + await db.insert(automations).values([ 51 + makeAutomation({ 52 52 uri: "at://u/s/1", 53 53 rkey: "1", 54 54 lexicon: "app.bsky.feed.like", 55 55 active: true, 56 56 }), 57 - makeSubscription({ 57 + makeAutomation({ 58 58 uri: "at://u/s/2", 59 59 rkey: "2", 60 60 lexicon: "app.bsky.feed.like", 61 61 active: true, 62 62 }), 63 - makeSubscription({ 63 + makeAutomation({ 64 64 uri: "at://u/s/3", 65 65 rkey: "3", 66 66 lexicon: "app.bsky.feed.post", ··· 68 68 }), 69 69 ]); 70 70 71 - await consumer.refreshSubscriptions(); 71 + await consumer.refreshAutomations(); 72 72 73 73 // The consumer is internal, but we can test it indirectly via processEvent 74 74 const event = makeEvent({ ··· 83 83 // Access processEvent via prototype 84 84 (consumer as any).processEvent(event); 85 85 86 - // Both like subscriptions should match (empty conditions = match all) 86 + // Both like automations should match (empty conditions = match all) 87 87 expect(handler).toHaveBeenCalledTimes(2); 88 88 }); 89 89 90 - it("excludes subscriptions whose lexicon is on the blocklist", async () => { 90 + it("excludes automations whose lexicon is on the blocklist", async () => { 91 91 const { config } = await import("../config.js"); 92 92 (config as any).nsidBlocklist = ["app.bsky.feed.like"]; 93 93 94 - await db.insert(subscriptions).values([ 95 - makeSubscription({ 94 + await db.insert(automations).values([ 95 + makeAutomation({ 96 96 uri: "at://u/s/1", 97 97 rkey: "1", 98 98 lexicon: "app.bsky.feed.like", 99 99 active: true, 100 100 }), 101 - makeSubscription({ 101 + makeAutomation({ 102 102 uri: "at://u/s/2", 103 103 rkey: "2", 104 104 lexicon: "app.bsky.feed.post", ··· 106 106 }), 107 107 ]); 108 108 109 - await consumer.refreshSubscriptions(); 109 + await consumer.refreshAutomations(); 110 110 111 111 // Blocked lexicon should not trigger handler 112 112 (consumer as any).processEvent( ··· 140 140 (config as any).nsidBlocklist = []; 141 141 }); 142 142 143 - it("does not load inactive subscriptions", async () => { 143 + it("does not load inactive automations", async () => { 144 144 await db 145 - .insert(subscriptions) 146 - .values(makeSubscription({ uri: "at://u/s/1", rkey: "1", active: false })); 145 + .insert(automations) 146 + .values(makeAutomation({ uri: "at://u/s/1", rkey: "1", active: false })); 147 147 148 - await consumer.refreshSubscriptions(); 148 + await consumer.refreshAutomations(); 149 149 150 150 const event = makeEvent(); 151 151 (consumer as any).processEvent(event); ··· 155 155 }); 156 156 157 157 describe("processEvent", () => { 158 - it("calls handler for matching subscriptions", async () => { 159 - await db.insert(subscriptions).values( 160 - makeSubscription({ 158 + it("calls handler for matching automations", async () => { 159 + await db.insert(automations).values( 160 + makeAutomation({ 161 161 uri: "at://u/s/1", 162 162 rkey: "1", 163 163 lexicon: "app.bsky.feed.like", 164 164 conditions: [], 165 165 }), 166 166 ); 167 - await consumer.refreshSubscriptions(); 167 + await consumer.refreshAutomations(); 168 168 169 169 const event = makeEvent({ 170 170 commit: { ··· 180 180 expect(handler).toHaveBeenCalledOnce(); 181 181 const match = handler.mock.calls[0]![0]; 182 182 expect(match.event).toBe(event); 183 - expect(match.subscription.rkey).toBe("1"); 183 + expect(match.automation.rkey).toBe("1"); 184 184 }); 185 185 186 186 it("does not call handler for non-matching collection", async () => { 187 - await db.insert(subscriptions).values( 188 - makeSubscription({ 187 + await db.insert(automations).values( 188 + makeAutomation({ 189 189 uri: "at://u/s/1", 190 190 rkey: "1", 191 191 lexicon: "app.bsky.feed.post", 192 192 }), 193 193 ); 194 - await consumer.refreshSubscriptions(); 194 + await consumer.refreshAutomations(); 195 195 196 196 const event = makeEvent({ 197 197 commit: { ··· 208 208 }); 209 209 210 210 it("does not call handler when conditions do not match", async () => { 211 - await db.insert(subscriptions).values( 212 - makeSubscription({ 211 + await db.insert(automations).values( 212 + makeAutomation({ 213 213 uri: "at://u/s/1", 214 214 rkey: "1", 215 215 lexicon: "app.bsky.feed.like", 216 216 conditions: [{ field: "event.did", operator: "eq", value: "did:plc:specific" }], 217 217 }), 218 218 ); 219 - await consumer.refreshSubscriptions(); 219 + await consumer.refreshAutomations(); 220 220 221 221 const event = makeEvent({ 222 222 did: "did:plc:other", ··· 233 233 expect(handler).not.toHaveBeenCalled(); 234 234 }); 235 235 236 - it("calls handler for multiple matching subscriptions on same collection", async () => { 236 + it("calls handler for multiple matching automations on same collection", async () => { 237 237 await db 238 - .insert(subscriptions) 238 + .insert(automations) 239 239 .values([ 240 - makeSubscription({ uri: "at://u/s/1", rkey: "1", lexicon: "app.bsky.feed.like" }), 241 - makeSubscription({ uri: "at://u/s/2", rkey: "2", lexicon: "app.bsky.feed.like" }), 240 + makeAutomation({ uri: "at://u/s/1", rkey: "1", lexicon: "app.bsky.feed.like" }), 241 + makeAutomation({ uri: "at://u/s/2", rkey: "2", lexicon: "app.bsky.feed.like" }), 242 242 ]); 243 - await consumer.refreshSubscriptions(); 243 + await consumer.refreshAutomations(); 244 244 245 245 const event = makeEvent({ 246 246 commit: { ··· 256 256 expect(handler).toHaveBeenCalledTimes(2); 257 257 }); 258 258 259 - it("evaluates {{self}} condition against subscription owner DID", async () => { 260 - await db.insert(subscriptions).values( 261 - makeSubscription({ 259 + it("evaluates {{self}} condition against automation owner DID", async () => { 260 + await db.insert(automations).values( 261 + makeAutomation({ 262 262 uri: "at://u/s/1", 263 263 rkey: "1", 264 264 did: "did:plc:owner", ··· 266 266 conditions: [{ field: "event.did", operator: "eq", value: "{{self}}" }], 267 267 }), 268 268 ); 269 - await consumer.refreshSubscriptions(); 269 + await consumer.refreshAutomations(); 270 270 271 271 // Event from the owner 272 272 const ownerEvent = makeEvent({
+23 -23
lib/jetstream/consumer.ts
··· 1 1 import { eq } from "drizzle-orm"; 2 2 import { db } from "../db/index.js"; 3 - import { subscriptions } from "../db/schema.js"; 3 + import { automations } from "../db/schema.js"; 4 4 import { config } from "../config.js"; 5 5 import { isNsidAllowed } from "../lexicons/resolver.js"; 6 6 import { matchConditions, type JetstreamEvent } from "./matcher.js"; 7 7 import { readFileSync, writeFileSync, mkdirSync } from "node:fs"; 8 8 import { dirname, join } from "node:path"; 9 9 10 - type Subscription = typeof subscriptions.$inferSelect; 10 + type Automation = typeof automations.$inferSelect; 11 11 12 12 export type MatchedEvent = { 13 - subscription: Subscription; 13 + automation: Automation; 14 14 event: JetstreamEvent; 15 15 }; 16 16 ··· 22 22 23 23 export class JetstreamConsumer { 24 24 private ws: WebSocket | null = null; 25 - private subsByCollection = new Map<string, Subscription[]>(); 25 + private autosByCollection = new Map<string, Automation[]>(); 26 26 private cursor: number | null = null; 27 27 private cursorDirty = false; 28 28 private cursorTimer: ReturnType<typeof setInterval> | null = null; ··· 38 38 async start() { 39 39 this.running = true; 40 40 this.cursor = this.loadCursor(); 41 - await this.refreshSubscriptions(); 41 + await this.refreshAutomations(); 42 42 this.cursorTimer = setInterval(() => this.flushCursor(), CURSOR_FLUSH_INTERVAL); 43 43 } 44 44 ··· 51 51 this.ws = null; 52 52 } 53 53 54 - async refreshSubscriptions() { 55 - const rows = await db.query.subscriptions.findMany({ 56 - where: eq(subscriptions.active, true), 54 + async refreshAutomations() { 55 + const rows = await db.query.automations.findMany({ 56 + where: eq(automations.active, true), 57 57 }); 58 58 59 - const byCollection = new Map<string, Subscription[]>(); 59 + const byCollection = new Map<string, Automation[]>(); 60 60 for (const row of rows) { 61 61 if (!isNsidAllowed(row.lexicon, config.nsidAllowlist, config.nsidBlocklist)) continue; 62 62 const list = byCollection.get(row.lexicon) || []; ··· 64 64 byCollection.set(row.lexicon, list); 65 65 } 66 66 67 - const oldCollections = new Set(this.subsByCollection.keys()); 67 + const oldCollections = new Set(this.autosByCollection.keys()); 68 68 const newCollections = new Set(byCollection.keys()); 69 - this.subsByCollection = byCollection; 69 + this.autosByCollection = byCollection; 70 70 71 71 const collectionsChanged = 72 72 oldCollections.size !== newCollections.size || ··· 94 94 this.reconnectTimer = null; 95 95 } 96 96 97 - const collections = [...this.subsByCollection.keys()]; 97 + const collections = [...this.autosByCollection.keys()]; 98 98 if (collections.length === 0) { 99 - console.log("Jetstream: no active subscriptions, waiting"); 99 + console.log("Jetstream: no active automations, waiting"); 100 100 return; 101 101 } 102 102 ··· 134 134 135 135 this.ws.addEventListener("close", () => { 136 136 this.ws = null; 137 - if (this.running && this.subsByCollection.size > 0) { 137 + if (this.running && this.autosByCollection.size > 0) { 138 138 console.log(`Jetstream: reconnecting in ${this.reconnectDelay}ms`); 139 139 this.reconnectTimer = setTimeout(() => { 140 140 this.reconnectTimer = null; ··· 151 151 152 152 private processEvent(event: JetstreamEvent) { 153 153 const collection = event.commit!.collection; 154 - const subs = this.subsByCollection.get(collection); 155 - if (!subs) return; 154 + const autos = this.autosByCollection.get(collection); 155 + if (!autos) return; 156 156 157 - for (const sub of subs) { 158 - if (matchConditions(event, sub.conditions, sub.did)) { 159 - void Promise.resolve(this.handler({ subscription: sub, event })).catch((err) => { 157 + for (const auto of autos) { 158 + if (matchConditions(event, auto.conditions, auto.did)) { 159 + void Promise.resolve(this.handler({ automation: auto, event })).catch((err) => { 160 160 console.error("Jetstream: handler error:", err); 161 161 }); 162 162 } ··· 202 202 return consumer; 203 203 } 204 204 205 - /** Call after creating/updating/deleting subscriptions. */ 206 - export function notifySubscriptionChange() { 207 - consumer?.refreshSubscriptions().catch((err) => { 208 - console.error("Jetstream: failed to refresh subscriptions:", err); 205 + /** Call after creating/updating/deleting automations. */ 206 + export function notifyAutomationChange() { 207 + consumer?.refreshAutomations().catch((err) => { 208 + console.error("Jetstream: failed to refresh automations:", err); 209 209 }); 210 210 }
+9 -9
lib/jetstream/handler.test.ts
··· 31 31 32 32 it("dispatches a webhook action", async () => { 33 33 const match = makeMatch({ 34 - subscription: { actions: [makeWebhookAction()], fetches: [] }, 34 + automation: { actions: [makeWebhookAction()], fetches: [] }, 35 35 }); 36 36 37 37 await handleMatchedEvent(match); ··· 43 43 44 44 it("executes a record action", async () => { 45 45 const match = makeMatch({ 46 - subscription: { actions: [makeRecordAction()], fetches: [] }, 46 + automation: { actions: [makeRecordAction()], fetches: [] }, 47 47 }); 48 48 49 49 await handleMatchedEvent(match); ··· 54 54 55 55 it("handles mixed action types with correct routing", async () => { 56 56 const match = makeMatch({ 57 - subscription: { 57 + automation: { 58 58 actions: [ 59 59 makeWebhookAction(), 60 60 makeRecordAction(), ··· 80 80 mockResolveFetches.mockResolvedValueOnce({ context: fetchContext, errors: [] }); 81 81 82 82 const match = makeMatch({ 83 - subscription: { 83 + automation: { 84 84 actions: [makeWebhookAction(), makeRecordAction()], 85 85 fetches: [makeFetchStep()], 86 86 }, ··· 89 89 await handleMatchedEvent(match); 90 90 91 91 expect(mockResolveFetches).toHaveBeenCalledWith( 92 - match.subscription.fetches, 92 + match.automation.fetches, 93 93 match.event, 94 - match.subscription.did, 94 + match.automation.did, 95 95 ); 96 96 expect(mockDispatch).toHaveBeenCalledWith(match, 0, fetchContext); 97 97 expect(mockExecuteAction).toHaveBeenCalledWith(match, 1, fetchContext); ··· 99 99 100 100 it("skips fetch resolution when no fetch steps", async () => { 101 101 const match = makeMatch({ 102 - subscription: { actions: [makeWebhookAction()], fetches: [] }, 102 + automation: { actions: [makeWebhookAction()], fetches: [] }, 103 103 }); 104 104 105 105 await handleMatchedEvent(match); ··· 117 117 }); 118 118 119 119 const match = makeMatch({ 120 - subscription: { 120 + automation: { 121 121 actions: [makeWebhookAction()], 122 122 fetches: [makeFetchStep(), makeFetchStep({ name: "extra", uri: "at://x/col/rk2" })], 123 123 }, ··· 133 133 mockDispatch.mockRejectedValueOnce(new Error("webhook failed")); 134 134 135 135 const match = makeMatch({ 136 - subscription: { 136 + automation: { 137 137 actions: [makeWebhookAction(), makeRecordAction()], 138 138 fetches: [], 139 139 },
+6 -6
lib/jetstream/handler.ts
··· 7 7 export async function handleMatchedEvent(match: MatchedEvent) { 8 8 let fetchContext: Record<string, { uri: string; cid: string; record: Record<string, unknown> }> = 9 9 {}; 10 - if (match.subscription.fetches.length > 0) { 10 + if (match.automation.fetches.length > 0) { 11 11 const result = await resolveFetches( 12 - match.subscription.fetches, 12 + match.automation.fetches, 13 13 match.event, 14 - match.subscription.did, 14 + match.automation.did, 15 15 ); 16 16 fetchContext = result.context; 17 17 for (const err of result.errors) { 18 - console.warn(`Fetch "${err.name}" failed for ${match.subscription.uri}: ${err.error}`); 18 + console.warn(`Fetch "${err.name}" failed for ${match.automation.uri}: ${err.error}`); 19 19 } 20 20 } 21 21 22 - for (let i = 0; i < match.subscription.actions.length; i++) { 23 - const action = match.subscription.actions[i]!; 22 + for (let i = 0; i < match.automation.actions.length; i++) { 23 + const action = match.automation.actions[i]!; 24 24 const handler = action.$type === "record" ? executeAction : dispatch; 25 25 handler(match, i, fetchContext).catch((err) => { 26 26 console.error(`Action ${i} (${action.$type}) delivery error:`, err);
lib/subscriptions/pds.test.ts lib/automations/pds.test.ts
+8 -8
lib/subscriptions/pds.ts lib/automations/pds.ts
··· 1 1 import { getOAuthClient } from "../auth/client.js"; 2 2 import { config } from "../config.js"; 3 3 4 - const COLLECTION = "app.rglw.subscription"; 4 + const COLLECTION = "run.airglow.automation"; 5 5 6 6 const isLocalDev = 7 7 Boolean(config.pdsUrl) || ··· 32 32 } 33 33 34 34 type PdsWebhookAction = { 35 - $type: "app.rglw.subscription#webhookAction"; 35 + $type: "run.airglow.automation#webhookAction"; 36 36 callbackUrl: string; 37 37 comment?: string; 38 38 }; 39 39 40 40 type PdsRecordAction = { 41 - $type: "app.rglw.subscription#recordAction"; 41 + $type: "run.airglow.automation#recordAction"; 42 42 targetCollection: string; 43 43 recordTemplate: string; 44 44 comment?: string; ··· 47 47 export type PdsAction = PdsWebhookAction | PdsRecordAction; 48 48 49 49 export type PdsFetchStep = { 50 - $type: "app.rglw.subscription#fetchStep"; 50 + $type: "run.airglow.automation#fetchStep"; 51 51 name: string; 52 52 uri: string; 53 53 comment?: string; 54 54 }; 55 55 56 - type SubscriptionRecord = { 56 + type AutomationRecord = { 57 57 name: string; 58 58 description?: string; 59 59 lexicon: string; ··· 85 85 86 86 export async function createRecord( 87 87 did: string, 88 - record: SubscriptionRecord, 88 + record: AutomationRecord, 89 89 ): Promise<{ uri: string; rkey: string }> { 90 90 const rkey = generateTid(); 91 91 if (isLocalDev) { ··· 123 123 export async function putRecord( 124 124 did: string, 125 125 rkey: string, 126 - record: SubscriptionRecord, 126 + record: AutomationRecord, 127 127 ): Promise<void> { 128 128 if (isLocalDev) return; 129 129 await pdsCall(did, "com.atproto.repo.putRecord", { ··· 143 143 }); 144 144 } 145 145 146 - /** Create a record in any collection on the user's PDS (for action subscriptions). */ 146 + /** Create a record in any collection on the user's PDS (for record actions). */ 147 147 export async function createArbitraryRecord( 148 148 did: string, 149 149 collection: string,
lib/subscriptions/verify.test.ts lib/automations/verify.test.ts
lib/subscriptions/verify.ts lib/automations/verify.ts
+1 -1
lib/test/db.ts
··· 5 5 import * as schema from "../db/schema.js"; 6 6 7 7 const migrationSql = readFileSync( 8 - resolve(__dirname, "../db/migrations/0000_yellow_silvermane.sql"), 8 + resolve(__dirname, "../db/migrations/0000_chunky_sersi.sql"), 9 9 "utf-8", 10 10 ); 11 11
+6 -6
lib/test/fixtures.ts
··· 2 2 import type { Action, WebhookAction, RecordAction, FetchStep } from "../db/schema.js"; 3 3 import type { MatchedEvent } from "../jetstream/consumer.js"; 4 4 5 - type Subscription = { 5 + type Automation = { 6 6 uri: string; 7 7 did: string; 8 8 rkey: string; ··· 65 65 }; 66 66 } 67 67 68 - export function makeSubscription(overrides?: Partial<Subscription>): Subscription { 68 + export function makeAutomation(overrides?: Partial<Automation>): Automation { 69 69 return { 70 - uri: "at://did:plc:testuser123/app.rglw.subscription/abc123", 70 + uri: "at://did:plc:testuser123/run.airglow.automation/abc123", 71 71 did: "did:plc:testuser123", 72 72 rkey: "abc123", 73 - name: "Test Subscription", 73 + name: "Test Automation", 74 74 description: null, 75 75 lexicon: "app.bsky.feed.like", 76 76 actions: [makeWebhookAction()], ··· 83 83 } 84 84 85 85 export function makeMatch(overrides?: { 86 - subscription?: Partial<Subscription>; 86 + automation?: Partial<Automation>; 87 87 event?: Partial<JetstreamEvent>; 88 88 }): MatchedEvent { 89 89 return { 90 - subscription: makeSubscription(overrides?.subscription) as any, 90 + automation: makeAutomation(overrides?.automation) as any, 91 91 event: makeEvent(overrides?.event), 92 92 }; 93 93 }
+9 -9
lib/webhooks/dispatcher.test.ts
··· 13 13 import { dispatch } from "./dispatcher.js"; 14 14 import { sign } from "./signer.js"; 15 15 import { db } from "../db/index.js"; 16 - import { subscriptions, deliveryLogs } from "../db/schema.js"; 17 - import { makeMatch, makeWebhookAction, makeSubscription } from "../test/fixtures.js"; 16 + import { automations, deliveryLogs } from "../db/schema.js"; 17 + import { makeMatch, makeWebhookAction, makeAutomation } from "../test/fixtures.js"; 18 18 19 19 describe("dispatch", () => { 20 20 const mockFetch = vi.fn(); ··· 26 26 27 27 // Clean up and re-seed so FK is satisfied 28 28 await db.delete(deliveryLogs); 29 - await db.delete(subscriptions); 30 - const sub = makeSubscription(); 31 - await db.insert(subscriptions).values(sub); 29 + await db.delete(automations); 30 + const auto = makeAutomation(); 31 + await db.insert(automations).values(auto); 32 32 }); 33 33 34 34 afterEach(() => { ··· 40 40 mockFetch.mockResolvedValueOnce({ status: 200 }); 41 41 42 42 const action = makeWebhookAction({ callbackUrl: "https://example.com/hook", secret: "s3cret" }); 43 - const match = makeMatch({ subscription: { actions: [action] } }); 43 + const match = makeMatch({ automation: { actions: [action] } }); 44 44 45 45 await dispatch(match, 0); 46 46 ··· 51 51 52 52 const headers = options.headers; 53 53 expect(headers["Content-Type"]).toBe("application/json"); 54 - expect(headers["X-Airglow-Subscription"]).toBe(match.subscription.uri); 54 + expect(headers["X-Airglow-Automation"]).toBe(match.automation.uri); 55 55 expect(headers["X-Airglow-Timestamp"]).toBeDefined(); 56 56 57 57 // Verify HMAC signature ··· 61 61 62 62 // Verify payload structure 63 63 const payload = JSON.parse(body); 64 - expect(payload.subscription).toBe(match.subscription.uri); 65 - expect(payload.lexicon).toBe(match.subscription.lexicon); 64 + expect(payload.automation).toBe(match.automation.uri); 65 + expect(payload.lexicon).toBe(match.automation.lexicon); 66 66 expect(payload.event.did).toBe(match.event.did); 67 67 expect(payload.event.commit.operation).toBe("create"); 68 68 });
+19 -19
lib/webhooks/dispatcher.ts
··· 8 8 const RETRY_DELAYS = [5_000, 30_000]; // 1st retry after 5s, 2nd after 30s 9 9 10 10 type WebhookPayload = { 11 - subscription: string; 11 + automation: string; 12 12 lexicon: string; 13 13 conditions: Array<{ field: string; operator: string; value: string }>; 14 14 event: { ··· 27 27 }; 28 28 29 29 function buildPayload(match: MatchedEvent, fetchContext?: FetchContext): WebhookPayload { 30 - const { subscription, event } = match; 30 + const { automation, event } = match; 31 31 return { 32 - subscription: subscription.uri, 33 - lexicon: subscription.lexicon, 34 - conditions: subscription.conditions, 32 + automation: automation.uri, 33 + lexicon: automation.lexicon, 34 + conditions: automation.conditions, 35 35 event: { 36 36 did: event.did, 37 37 time_us: event.time_us, ··· 54 54 callbackUrl: string, 55 55 body: string, 56 56 secret: string, 57 - subscriptionUri: string, 57 + automationUri: string, 58 58 ): Promise<{ statusCode: number; error?: string }> { 59 59 const timestamp = Date.now(); 60 60 const signature = sign(body, secret); ··· 68 68 headers: { 69 69 "Content-Type": "application/json", 70 70 "X-Airglow-Signature": `sha256=${signature}`, 71 - "X-Airglow-Subscription": subscriptionUri, 71 + "X-Airglow-Automation": automationUri, 72 72 "X-Airglow-Timestamp": String(timestamp), 73 73 }, 74 74 body, ··· 81 81 } 82 82 83 83 async function logDelivery( 84 - subscriptionUri: string, 84 + automationUri: string, 85 85 actionIndex: number, 86 86 eventTimeUs: number, 87 87 payload: string | null, ··· 90 90 attempt: number, 91 91 ) { 92 92 await db.insert(deliveryLogs).values({ 93 - subscriptionUri, 93 + automationUri, 94 94 actionIndex, 95 95 eventTimeUs, 96 96 payload, ··· 110 110 } 111 111 112 112 function scheduleRetry( 113 - subscriptionUri: string, 113 + automationUri: string, 114 114 actionIndex: number, 115 115 callbackUrl: string, 116 116 secret: string, ··· 122 122 123 123 setTimeout(async () => { 124 124 try { 125 - const result = await deliver(callbackUrl, body, secret, subscriptionUri); 125 + const result = await deliver(callbackUrl, body, secret, automationUri); 126 126 127 127 await logDelivery( 128 - subscriptionUri, 128 + automationUri, 129 129 actionIndex, 130 130 eventTimeUs, 131 131 isSuccess(result.statusCode) ? null : body, ··· 136 136 137 137 if (!isSuccess(result.statusCode) && isRetryable(result.statusCode)) { 138 138 scheduleRetry( 139 - subscriptionUri, 139 + automationUri, 140 140 actionIndex, 141 141 callbackUrl, 142 142 secret, ··· 151 151 }, RETRY_DELAYS[retryIndex]); 152 152 } 153 153 154 - /** Deliver a matched event to the subscription's callback URL. */ 154 + /** Deliver a matched event to the automation's callback URL. */ 155 155 export async function dispatch( 156 156 match: MatchedEvent, 157 157 actionIndex: number, 158 158 fetchContext?: FetchContext, 159 159 ) { 160 - const { subscription, event } = match; 161 - const action = subscription.actions[actionIndex] as WebhookAction; 160 + const { automation, event } = match; 161 + const action = automation.actions[actionIndex] as WebhookAction; 162 162 const payload = buildPayload(match, fetchContext); 163 163 const body = JSON.stringify(payload); 164 164 165 - const result = await deliver(action.callbackUrl, body, action.secret, subscription.uri); 165 + const result = await deliver(action.callbackUrl, body, action.secret, automation.uri); 166 166 167 167 await logDelivery( 168 - subscription.uri, 168 + automation.uri, 169 169 actionIndex, 170 170 event.time_us, 171 171 isSuccess(result.statusCode) ? null : body, ··· 177 177 // Schedule retries for server errors / network failures 178 178 if (!isSuccess(result.statusCode) && isRetryable(result.statusCode)) { 179 179 scheduleRetry( 180 - subscription.uri, 180 + automation.uri, 181 181 actionIndex, 182 182 action.callbackUrl, 183 183 action.secret,