···134134 wantedDids: text("wanted_dids", { mode: "json" }).notNull().$type<string[]>().default([]),
135135 active: integer("active", { mode: "boolean" }).notNull().default(false),
136136 dryRun: integer("dry_run", { mode: "boolean" }).notNull().default(false),
137137+ // Populated when the engine auto-disables the automation (e.g. rate-limit
138138+ // breach). Cleared when the user re-enables. Format: "rate_limit:<window>".
139139+ disabledReason: text("disabled_reason"),
140140+ disabledAt: integer("disabled_at", { mode: "timestamp_ms" }),
141141+ // Cut-off for rate-limit counting. Delivery log rows before this timestamp
142142+ // are ignored by the rolling-window check. Set to the re-enable moment so
143143+ // a manual re-enable after a rate-limit trip doesn't instantly re-trip on
144144+ // the same logs. Null on automations that have never been rate-limited.
145145+ rateLimitResetAt: integer("rate_limit_reset_at", { mode: "timestamp_ms" }),
137146 indexedAt: integer("indexed_at", { mode: "timestamp_ms" }).notNull(),
138147 },
139148 (table) => [
+97
lib/jetstream/handler.test.ts
···12121313vi.mock("@/webhooks/dispatcher.js", () => ({
1414 dispatch: vi.fn(),
1515+ buildPayload: vi.fn(() => ({ event: "dry-run" })),
1516}));
16171718vi.mock("@/actions/executor.js", () => ({
···28292930vi.mock("@/actions/fetcher.js", () => ({
3031 resolveFetches: vi.fn(),
3232+}));
3333+3434+vi.mock("./rate-limit.js", () => ({
3535+ checkRateLimit: vi.fn().mockResolvedValue(null),
3636+ disableForRateLimit: vi.fn().mockResolvedValue(true),
3737+}));
3838+3939+vi.mock("./consumer.js", () => ({
4040+ notifyAutomationChange: vi.fn(),
3141}));
32423343import { handleMatchedEvent } from "./handler.js";
···3646import { executeBskyPost } from "../actions/bsky-post.js";
3747import { executePatchRecord } from "../actions/patch-record.js";
3848import { resolveFetches } from "../actions/fetcher.js";
4949+import { checkRateLimit, disableForRateLimit } from "./rate-limit.js";
5050+import { notifyAutomationChange } from "./consumer.js";
3951import {
4052 makeMatch,
4153 makeWebhookAction,
···5769const mockExecuteBskyPost = vi.mocked(executeBskyPost);
5870const mockExecutePatchRecord = vi.mocked(executePatchRecord);
5971const mockResolveFetches = vi.mocked(resolveFetches);
7272+const mockCheckRateLimit = vi.mocked(checkRateLimit);
7373+const mockDisableForRateLimit = vi.mocked(disableForRateLimit);
7474+const mockNotifyAutomationChange = vi.mocked(notifyAutomationChange);
60756176describe("handleMatchedEvent", () => {
6277 beforeEach(() => {
···6580 mockExecuteBskyPost.mockReset().mockResolvedValue(okWithUri);
6681 mockExecutePatchRecord.mockReset().mockResolvedValue(okWithUri);
6782 mockResolveFetches.mockReset();
8383+ mockCheckRateLimit.mockReset().mockResolvedValue(null);
8484+ mockDisableForRateLimit.mockReset().mockResolvedValue(true);
8585+ mockNotifyAutomationChange.mockReset();
8686+ mockInsertValues.mockClear();
6887 });
69887089 it("dispatches a webhook action", async () => {
···306325 // So we verify the record action result is present but webhook result is not
307326 const finalCtx = mockExecuteAction.mock.calls[0]![2]!;
308327 expect(finalCtx).not.toHaveProperty("action1");
328328+ });
329329+330330+ describe("rate limiting", () => {
331331+ it("disables the automation and skips actions when a window is breached", async () => {
332332+ mockCheckRateLimit.mockResolvedValueOnce({ window: "second", count: 10, limit: 10 });
333333+334334+ const match = makeMatch({
335335+ automation: { actions: [makeWebhookAction()], fetches: [] },
336336+ });
337337+338338+ await handleMatchedEvent(match);
339339+340340+ expect(mockCheckRateLimit).toHaveBeenCalledWith(match.automation);
341341+ expect(mockDisableForRateLimit).toHaveBeenCalledWith(match.automation.uri, {
342342+ window: "second",
343343+ count: 10,
344344+ limit: 10,
345345+ });
346346+ expect(mockNotifyAutomationChange).toHaveBeenCalledOnce();
347347+ expect(mockDispatch).not.toHaveBeenCalled();
348348+ expect(mockResolveFetches).not.toHaveBeenCalled();
349349+350350+ // A delivery log entry was written explaining why the automation stopped.
351351+ expect(mockInsertValues).toHaveBeenCalledOnce();
352352+ const logged = mockInsertValues.mock.calls[0]![0] as { error: string; dryRun: boolean };
353353+ expect(logged.error).toContain("Rate limit exceeded");
354354+ expect(logged.error).toContain("second");
355355+ expect(logged.dryRun).toBe(false);
356356+ });
357357+358358+ it("does not log or notify when another handler already disabled the automation", async () => {
359359+ // Concurrent path: checkRateLimit sees a breach (stale read from before
360360+ // the re-partition), but disableForRateLimit returns false because the
361361+ // first handler already flipped `active` to false. We must not spam the
362362+ // delivery log or re-notify.
363363+ mockCheckRateLimit.mockResolvedValueOnce({ window: "second", count: 10, limit: 10 });
364364+ mockDisableForRateLimit.mockResolvedValueOnce(false);
365365+366366+ const match = makeMatch({
367367+ automation: { actions: [makeWebhookAction()], fetches: [] },
368368+ });
369369+370370+ await handleMatchedEvent(match);
371371+372372+ expect(mockDisableForRateLimit).toHaveBeenCalledOnce();
373373+ expect(mockNotifyAutomationChange).not.toHaveBeenCalled();
374374+ expect(mockInsertValues).not.toHaveBeenCalled();
375375+ expect(mockDispatch).not.toHaveBeenCalled();
376376+ });
377377+378378+ it("does not rate-limit dry-run automations", async () => {
379379+ // The rate-limit mock would return a breach if consulted — but for
380380+ // dry-run it should never be called in the first place.
381381+ mockCheckRateLimit.mockResolvedValue({ window: "minute", count: 100, limit: 100 });
382382+383383+ const match = makeMatch({
384384+ automation: { actions: [makeWebhookAction()], fetches: [], dryRun: true },
385385+ });
386386+387387+ await handleMatchedEvent(match);
388388+389389+ expect(mockCheckRateLimit).not.toHaveBeenCalled();
390390+ expect(mockDisableForRateLimit).not.toHaveBeenCalled();
391391+ });
392392+393393+ it("runs actions normally when no window is breached", async () => {
394394+ mockCheckRateLimit.mockResolvedValueOnce(null);
395395+396396+ const match = makeMatch({
397397+ automation: { actions: [makeWebhookAction()], fetches: [] },
398398+ });
399399+400400+ await handleMatchedEvent(match);
401401+402402+ expect(mockCheckRateLimit).toHaveBeenCalledOnce();
403403+ expect(mockDisableForRateLimit).not.toHaveBeenCalled();
404404+ expect(mockDispatch).toHaveBeenCalledOnce();
405405+ });
309406 });
310407});
+46-1
lib/jetstream/handler.ts
···1010import { resolveFetches } from "../actions/fetcher.js";
1111import { renderTemplate, renderTextTemplate, type FetchContext } from "../actions/template.js";
1212import { parseAtUri } from "../pds/resolver.js";
1313-import type { MatchedEvent } from "./consumer.js";
1313+import { notifyAutomationChange, type MatchedEvent } from "./consumer.js";
1414+import { checkRateLimit, disableForRateLimit, type RateLimitBreach } from "./rate-limit.js";
14151516/** Handle a matched Jetstream event: resolve fetches, then dispatch all actions. */
1617export async function handleMatchedEvent(match: MatchedEvent) {
1818+ // Rate-limit gate. Runs before fetches so a breached automation stops
1919+ // spending PDS resources entirely. Dry-run fires never count toward limits
2020+ // (checkRateLimit filters them out), so dry-run automations pass freely.
2121+ // Skip the check when the automation is already in dry-run — no point
2222+ // disabling something that can't cause damage.
2323+ if (!match.automation.dryRun) {
2424+ const breach = await checkRateLimit(match.automation);
2525+ if (breach) {
2626+ await handleRateLimitBreach(match, breach);
2727+ return;
2828+ }
2929+ }
3030+1731 let fetchContext: FetchContext = {};
1832 if (match.automation.fetches.length > 0) {
1933 const result = await resolveFetches(
···214228 attempt: 1,
215229 createdAt: new Date(),
216230 });
231231+}
/**
 * React to a rate-limit breach: disable the automation, record why in the
 * delivery log, and tell the Jetstream manager that the automation set changed.
 *
 * `disableForRateLimit` only reports `true` for the caller that actually
 * flipped the row, so when several in-flight events hit the breach at once,
 * exactly one of them logs and notifies; the rest return silently.
 *
 * The notification is sent even if writing the log entry fails. At that point
 * the row is already inactive, and every later breach handler will see
 * `disableForRateLimit` return `false` and no-op, so this is the only chance
 * to announce the change. The insert error still propagates to the caller.
 *
 * @param match  The matched event whose automation breached a limit.
 * @param breach The window that tripped, as returned by `checkRateLimit`.
 */
async function handleRateLimitBreach(match: MatchedEvent, breach: RateLimitBreach) {
  const msg = `Rate limit exceeded: ${breach.count} actions in last ${breach.window}. Automation disabled.`;
  const justDisabled = await disableForRateLimit(match.automation.uri, breach);
  if (!justDisabled) return;
  console.warn(`[rate-limit] ${match.automation.uri}: ${msg}`);
  try {
    // `dryRun: false` so the entry shows up in the real delivery log.
    await db.insert(deliveryLogs).values({
      automationUri: match.automation.uri,
      actionIndex: 0,
      eventTimeUs: match.event.time_us,
      payload: null,
      statusCode: null,
      message: null,
      error: msg,
      dryRun: false,
      attempt: 1,
      createdAt: new Date(),
    });
  } finally {
    // Must run regardless of the log write outcome (see doc comment above).
    notifyAutomationChange();
  }
}
218263219264async function logDrySkip(match: MatchedEvent, skippedBy: string | undefined) {
+162
lib/jetstream/rate-limit.test.ts
···11+import { describe, it, expect, vi, beforeEach } from "vitest";
22+import { eq } from "drizzle-orm";
33+44+vi.mock("@/config.js", () => ({
55+ config: {
66+ databasePath: ":memory:",
77+ jetstreamUrl: "wss://jetstream.test/subscribe",
88+ publicUrl: "https://airglow.test",
99+ nsidAllowlist: [],
1010+ nsidBlocklist: [],
1111+ nsidRequireDids: [],
1212+ },
1313+}));
1414+1515+vi.mock("@/db/index.js", async () => {
1616+ const { createTestDb } = await import("../test/db.js");
1717+ return { db: createTestDb() };
1818+});
1919+2020+import {
2121+ checkRateLimit,
2222+ disableForRateLimit,
2323+ getRateLimitCounts,
2424+ RATE_LIMIT_WINDOWS,
2525+} from "./rate-limit.js";
2626+import { db } from "../db/index.js";
2727+import { automations, deliveryLogs } from "../db/schema.js";
2828+import { makeAutomation } from "../test/fixtures.js";
2929+3030+const URI = "at://did:plc:test/run.airglow.automation/rl";
3131+const NOW = 1_800_000_000_000; // fixed reference "now"
/** Insert the single fixture automation (identified by `URI`) that every test operates on. */
async function seedAutomation() {
  await db.insert(automations).values(makeAutomation({ uri: URI, rkey: "rl" }));
}
/**
 * Fetch the fixture automation row fresh from the database. Tests pass the
 * result straight to the rate-limit functions, so any update made to the row
 * (e.g. `rateLimitResetAt`) is picked up.
 *
 * Throws if the row is missing, i.e. `seedAutomation` was not run.
 */
async function loadAuto() {
  const found = await db.query.automations.findFirst({
    where: eq(automations.uri, URI),
  });
  if (found) return found;
  throw new Error("seedAutomation must be called first");
}
/**
 * Insert delivery-log rows for the fixture automation.
 *
 * @param count     Maximum number of rows to insert (takes the first `count` offsets).
 * @param offsetsMs For each row, how many milliseconds before `NOW` it was created.
 * @param dryRun    Whether the rows are flagged as dry-run fires.
 */
async function insertLogs(count: number, offsetsMs: number[], dryRun = false) {
  const selectedOffsets = offsetsMs.slice(0, count);
  // Nothing to insert: skip the DB call entirely.
  if (selectedOffsets.length === 0) return;

  const rows = selectedOffsets.map((ageMs) => ({
    automationUri: URI,
    actionIndex: 0,
    eventTimeUs: NOW * 1000,
    dryRun,
    attempt: 1,
    createdAt: new Date(NOW - ageMs),
  }));
  await db.insert(deliveryLogs).values(rows);
}
describe("rate-limit", () => {
  // Every test starts from an empty log table and one freshly seeded automation.
  beforeEach(async () => {
    await db.delete(deliveryLogs);
    await db.delete(automations);
    await seedAutomation();
  });

  it("returns null when well below every window limit", async () => {
    await insertLogs(3, [100, 200, 300]);
    const breach = await checkRateLimit(await loadAuto(), NOW);
    expect(breach).toBeNull();
  });

  it("flags the second window when 10+ actions land within 1s", async () => {
    // 10 logs within the last 900ms — at-or-above the 10/sec limit.
    await insertLogs(
      10,
      Array.from({ length: 10 }, (_, i) => 100 + i * 80),
    );
    const breach = await checkRateLimit(await loadAuto(), NOW);
    expect(breach).toEqual({ window: "second", count: 10, limit: 10 });
  });

  it("flags the minute window when 100 actions spread over 30s", async () => {
    // The newest log is 1.5s old, so none land in the last second, but all
    // 100 land in the last ~31s (at the 100/min limit).
    const offsets = Array.from({ length: 100 }, (_, i) => 1_500 + i * 300);
    await insertLogs(100, offsets);
    const breach = await checkRateLimit(await loadAuto(), NOW);
    expect(breach).toEqual({ window: "minute", count: 100, limit: 100 });
  });

  it("flags the hour window when 500 actions spread over 50 minutes", async () => {
    // Space them so neither sec nor min breaches but hour does.
    const offsets = Array.from({ length: 500 }, (_, i) => 61_000 + i * 6_000);
    await insertLogs(500, offsets);
    const breach = await checkRateLimit(await loadAuto(), NOW);
    expect(breach).toEqual({ window: "hour", count: 500, limit: 500 });
  });

  it("does not count dry-run fires toward the limit", async () => {
    await insertLogs(
      20,
      Array.from({ length: 20 }, (_, i) => 100 + i * 40),
      true,
    );
    const breach = await checkRateLimit(await loadAuto(), NOW);
    expect(breach).toBeNull();
  });

  it("ignores log rows older than rateLimitResetAt", async () => {
    // 100 logs from 5s–10s ago: none in the second window, but enough to trip
    // the 100/min limit. Seeding fewer would make this test pass even if the
    // reset cut-off were not applied at all.
    await insertLogs(
      100,
      Array.from({ length: 100 }, (_, i) => 5_000 + i * 50),
    );

    // Sanity check: without a reset cut-off these rows do breach.
    const before = await checkRateLimit(await loadAuto(), NOW);
    expect(before).toEqual({ window: "minute", count: 100, limit: 100 });

    // The reset cut-off was set 2s ago, so every seeded row predates it.
    await db
      .update(automations)
      .set({ rateLimitResetAt: new Date(NOW - 2_000) })
      .where(eq(automations.uri, URI));

    const breach = await checkRateLimit(await loadAuto(), NOW);
    expect(breach).toBeNull();
  });

  it("disableForRateLimit flips active=false and returns true on transition", async () => {
    const changed = await disableForRateLimit(URI, { window: "second", count: 10, limit: 10 }, NOW);
    expect(changed).toBe(true);

    const row = await loadAuto();
    expect(row.active).toBe(false);
    expect(row.disabledReason).toBe("rate_limit:second");
    expect(row.disabledAt?.getTime()).toBe(NOW);
  });

  it("disableForRateLimit is a no-op when already inactive", async () => {
    await disableForRateLimit(URI, { window: "second", count: 10, limit: 10 }, NOW);
    // Second call: row is already active=false, WHERE clause filters it out.
    const changed = await disableForRateLimit(
      URI,
      { window: "minute", count: 100, limit: 100 },
      NOW + 1_000,
    );
    expect(changed).toBe(false);

    // The original reason + timestamp are preserved — not overwritten by the
    // second caller.
    const row = await loadAuto();
    expect(row.disabledReason).toBe("rate_limit:second");
    expect(row.disabledAt?.getTime()).toBe(NOW);
  });

  it("getRateLimitCounts returns one entry per window", async () => {
    await insertLogs(3, [100, 30_000, 300_000]);
    const counts = await getRateLimitCounts(await loadAuto(), NOW);
    expect(counts).toHaveLength(RATE_LIMIT_WINDOWS.length);
    expect(counts.map((c) => c.window)).toEqual(["second", "minute", "hour"]);
    expect(counts[0]!.count).toBe(1); // only the 100ms-ago one
    expect(counts[1]!.count).toBe(2); // 100ms and 30s ago
    expect(counts[2]!.count).toBe(3); // all three within the hour
  });
});
+142
lib/jetstream/rate-limit.ts
···11+import { and, eq, gte } from "drizzle-orm";
22+import { db } from "../db/index.js";
33+import { automations, deliveryLogs } from "../db/schema.js";
/**
 * Per-automation rate limits. Any window hitting its limit auto-disables the
 * automation. Counts only real (non dry-run) action executions — dry-run fires
 * log separately and never contribute to the limit.
 *
 * Tuned to catch runaway automations (infinite-loop mirrors, self-triggering
 * actions) before they can melt the PDS, while leaving plenty of headroom for
 * normal traffic on a per-user workflow.
 */
export type RateLimitWindow = {
  name: "second" | "minute" | "hour";
  /** Window length in milliseconds. */
  ms: number;
  /** Max action executions allowed within the window. */
  limit: number;
};

/**
 * The enforced windows, listed shortest first. `checkRateLimit` walks them in
 * this order and reports the first one that is at or above its limit.
 */
export const RATE_LIMIT_WINDOWS: readonly RateLimitWindow[] = [
  { name: "second", ms: 1_000, limit: 10 },
  { name: "minute", ms: 60_000, limit: 100 },
  { name: "hour", ms: 3_600_000, limit: 500 },
];

/**
 * Length of the longest configured window; bounds how far back log rows are
 * loaded. Derived with `Math.max` rather than taken from the last array entry
 * so that reordering the windows (or inserting a longer one anywhere) cannot
 * silently truncate the query range.
 */
const HOUR_WINDOW_MS = Math.max(...RATE_LIMIT_WINDOWS.map((w) => w.ms));
/** Usage of one window: executions counted so far versus the allowed maximum. */
type WindowUsage = {
  window: RateLimitWindow["name"];
  count: number;
  limit: number;
};

/** A window whose count reached its limit, as reported by `checkRateLimit`. */
export type RateLimitBreach = WindowUsage;

/** Per-window usage entry, as returned by `getRateLimitCounts`. */
export type RateLimitCount = WindowUsage;

/**
 * The subset of an automation row the rate limiter reads. Callers hand over
 * the row they already hold, which saves a DB round-trip on the hot path.
 */
export type RateLimitAutomation = {
  uri: string;
  rateLimitResetAt: Date | null;
};
/**
 * Load the creation times (epoch ms) of this automation's recent non-dry-run
 * delivery-log rows.
 *
 * Rows are taken from whichever is later: the start of the longest window, or
 * the automation's `rateLimitResetAt`. The latter makes a manual re-enable
 * start counting from zero instead of re-tripping on the logs that caused the
 * auto-disable.
 */
async function loadRecentTimestamps(
  automation: RateLimitAutomation,
  now: number,
): Promise<number[]> {
  const oldestRelevantMs = Math.max(
    now - HOUR_WINDOW_MS,
    automation.rateLimitResetAt?.getTime() ?? 0,
  );
  const isRecentRealFire = and(
    eq(deliveryLogs.automationUri, automation.uri),
    eq(deliveryLogs.dryRun, false),
    gte(deliveryLogs.createdAt, new Date(oldestRelevantMs)),
  );
  const rows = await db
    .select({ createdAt: deliveryLogs.createdAt })
    .from(deliveryLogs)
    .where(isRecentRealFire);

  const timestamps: number[] = [];
  for (const row of rows) {
    timestamps.push(row.createdAt.getTime());
  }
  return timestamps;
}
/**
 * Count the timestamps that fall at or after `now - ms`. The lower bound is
 * inclusive and there is no upper bound.
 */
function countWithin(timestamps: number[], now: number, ms: number): number {
  const windowStart = now - ms;
  return timestamps.reduce(
    (total, ts) => (ts >= windowStart ? total + 1 : total),
    0,
  );
}
/**
 * Evaluate every window in `RATE_LIMIT_WINDOWS` order and return the first one
 * whose count is at or above its limit, or `null` when all are within limits.
 *
 * Callers run this *before* executing actions, so the fire that observes a
 * count at the limit is the one that gets short-circuited: `limit` executions
 * are allowed per window, and the next one is blocked.
 *
 * The check and the later log write are not atomic. Two near-simultaneous
 * events can both see `limit - 1` and both execute, which is acceptable slack
 * for the "catch runaway loops" use case.
 *
 * @param automation Row (or minimal shape) of the automation being checked.
 * @param now        Reference time in epoch ms; defaults to the current time.
 */
export async function checkRateLimit(
  automation: RateLimitAutomation,
  now: number = Date.now(),
): Promise<RateLimitBreach | null> {
  const recent = await loadRecentTimestamps(automation, now);
  const usage = RATE_LIMIT_WINDOWS.map(({ name, ms, limit }) => ({
    window: name,
    count: countWithin(recent, now, ms),
    limit,
  }));
  return usage.find((entry) => entry.count >= entry.limit) ?? null;
}
/**
 * Report current usage for every window, in `RATE_LIMIT_WINDOWS` order. Used
 * to drive the dashboard gauges.
 *
 * @param automation Row (or minimal shape) of the automation to inspect.
 * @param now        Reference time in epoch ms; defaults to the current time.
 */
export async function getRateLimitCounts(
  automation: RateLimitAutomation,
  now: number = Date.now(),
): Promise<RateLimitCount[]> {
  const recent = await loadRecentTimestamps(automation, now);
  const snapshot: RateLimitCount[] = [];
  for (const { name, ms, limit } of RATE_LIMIT_WINDOWS) {
    snapshot.push({ window: name, count: countWithin(recent, now, ms), limit });
  }
  return snapshot;
}
/**
 * Mark the automation inactive with reason `"rate_limit:<window>"` and stamp
 * `disabledAt`.
 *
 * The update only matches rows that are still `active = true`. When several
 * breach handlers race (in-flight events beating the Jetstream re-partition),
 * exactly one of them changes the row; the others match nothing and leave the
 * original reason and timestamp untouched.
 *
 * @param uri    AT URI of the automation to disable.
 * @param breach The window that tripped; its name becomes part of the reason.
 * @param now    Disable time in epoch ms; defaults to the current time.
 * @returns `true` if this call flipped the row, `false` if it was already
 *          inactive (or absent). Callers use this to avoid writing duplicate
 *          "disabled" entries to the delivery log.
 */
export async function disableForRateLimit(
  uri: string,
  breach: RateLimitBreach,
  now: number = Date.now(),
): Promise<boolean> {
  const stillActive = and(eq(automations.uri, uri), eq(automations.active, true));
  const flippedRows = await db
    .update(automations)
    .set({
      active: false,
      disabledReason: `rate_limit:${breach.window}`,
      disabledAt: new Date(now),
    })
    .where(stillActive)
    .returning({ uri: automations.uri });
  return flippedRows.length !== 0;
}