···77SECRETS_KEY= # optional, enables encrypted user secrets. openssl rand -base64 32
88NSID_ALLOWLIST=
99NSID_BLOCKLIST=
1010+# Glob patterns (e.g. app.bsky.*) for NSIDs that require automations to
1111+# declare a non-empty wantedDids list. Use for high-volume collections
1212+# where a firehose-wide subscription would be too noisy.
1313+NSID_REQUIRES_DIDS=
1014# Hand-picked automation AT URIs, comma-separated. Shown on the homepage
1115# and pinned at the top of the gallery tagged "Featured".
1216# Example: at://did:plc:abc/run.airglow.automation/xyz,at://did:plc:def/run.airglow.automation/abc
···6060 "ref": "#condition"
6161 }
6262 },
6363+ "wantedDids": {
6464+ "type": "array",
6565+ "description": "Repo DIDs to subscribe to at the Jetstream level. Required for NSIDs that the instance restricts to known-DID subscriptions. Supports the {{self}} placeholder for the owner's DID.",
6666+ "maxLength": 10,
6767+ "items": {
6868+ "type": "string",
6969+ "maxLength": 256
7070+ }
7171+ },
6372 "fetches": {
6473 "type": "array",
6574 "description": "Records to fetch from PDS before executing actions. Fetched data is available as named variables in action templates.",
···4444 secretsKey,
4545 nsidAllowlist: env("NSID_ALLOWLIST", "").split(",").filter(Boolean),
4646 nsidBlocklist: env("NSID_BLOCKLIST", "").split(",").filter(Boolean),
4747+ // NSIDs listed here are only allowed when the automation declares a non-empty
4848+ // wantedDids. Used to gate high-volume collections (e.g. app.bsky.*) on
4949+ // Jetstream-level DID filtering instead of a blanket firehose subscription.
5050+ nsidRequireDids: env("NSID_REQUIRES_DIDS", "").split(",").filter(Boolean),
4751 // Hand-picked AT URIs (at://did/run.airglow.automation/rkey), comma-separated.
4852 // These appear on the homepage and at the top of the gallery tagged "Featured".
4953 featuredAutomations: env("FEATURED_AUTOMATIONS", "")
···22import { db } from "../db/index.js";
33import { automations } from "../db/schema.js";
44import { config } from "../config.js";
55-import { isNsidAllowed } from "../lexicons/resolver.js";
66-import { matchConditions, type JetstreamEvent } from "./matcher.js";
55+import { isNsidAllowed, nsidRequiresWantedDids } from "../lexicons/resolver.js";
66+import { matchConditions, resolveConditionValue, type JetstreamEvent } from "./matcher.js";
7788type Automation = typeof automations.$inferSelect;
99···14141515type EventHandler = (match: MatchedEvent) => void | Promise<void>;
16161717-export class JetstreamConsumer {
/**
 * Turn an automation's raw `wantedDids` list into a canonical DID list.
 *
 * Each entry is trimmed and passed through `resolveConditionValue`, so the
 * `{{self}}` placeholder is replaced by `ownerDid`. Entries that resolve to an
 * empty string are dropped. The result is de-duplicated and sorted, so two
 * lists naming the same DIDs in a different order (or with stray whitespace)
 * produce the same output.
 *
 * @param wantedDids Raw entries as stored on the automation.
 * @param ownerDid DID substituted for the `{{self}}` placeholder.
 * @returns Sorted, de-duplicated, non-empty DIDs.
 */
function canonicalDids(wantedDids: string[], ownerDid: string): string[] {
  const resolved = new Set<string>();
  for (const entry of wantedDids) {
    // Trim first so " did:plc:x" and "did:plc:x" collapse into one DID
    // instead of producing two distinct canonical lists.
    const did = resolveConditionValue(entry.trim(), ownerDid);
    if (did) resolved.add(did);
  }
  return [...resolved].sort();
}
2626+2727+/** One Jetstream WebSocket, scoped to a fixed wantedDids set. */
2828+export class JetstreamSubscription {
1829 private ws: WebSocket | null = null;
1930 private autosByCollectionOp = new Map<string, Automation[]>();
3131+ private collections: string[] = [];
2032 private reconnectTimer: ReturnType<typeof setTimeout> | null = null;
2133 private reconnectDelay = 1000;
2234 private running = false;
2323- private handler: EventHandler;
3535+ private readonly wantedDids: string[];
3636+ private readonly label: string;
3737+ private readonly handler: EventHandler;
24382525- constructor(handler: EventHandler) {
3939+ constructor(wantedDids: string[], handler: EventHandler) {
4040+ this.wantedDids = wantedDids;
2641 this.handler = handler;
2727- }
2828-2929- async start() {
3030- this.running = true;
3131- await this.refreshAutomations();
3232- }
3333-3434- stop() {
3535- this.running = false;
3636- if (this.reconnectTimer) clearTimeout(this.reconnectTimer);
3737- this.ws?.close();
3838- this.ws = null;
4242+ this.label = wantedDids.length === 0 ? "global" : `dids=${wantedDids.length}`;
3943 }
40444141- async refreshAutomations() {
4242- const rows = await db.query.automations.findMany({
4343- where: eq(automations.active, true),
4444- });
4545+ /** Replace the automation set. Reconnects only if wantedCollections changed. */
4646+ update(byCollectionOp: Map<string, Automation[]>) {
4747+ const newCollections = [...deriveCollections(byCollectionOp)].sort();
4848+ const changed =
4949+ newCollections.length !== this.collections.length ||
5050+ newCollections.some((c, i) => c !== this.collections[i]);
45514646- const byCollectionOp = new Map<string, Automation[]>();
4747- for (const row of rows) {
4848- if (!isNsidAllowed(row.lexicon, config.nsidAllowlist, config.nsidBlocklist)) continue;
4949- for (const op of row.operations) {
5050- const key = `${row.lexicon}\0${op}`;
5151- const list = byCollectionOp.get(key) || [];
5252- list.push(row);
5353- byCollectionOp.set(key, list);
5454- }
5555- }
5656-5757- const deriveCollections = (map: Map<string, Automation[]>) => {
5858- const cols = new Set<string>();
5959- for (const key of map.keys()) cols.add(key.slice(0, key.indexOf("\0")));
6060- return cols;
6161- };
6262-6363- const oldCollections = deriveCollections(this.autosByCollectionOp);
6464- const newCollections = deriveCollections(byCollectionOp);
6552 this.autosByCollectionOp = byCollectionOp;
5353+ this.collections = newCollections;
66546767- const collectionsChanged =
6868- oldCollections.size !== newCollections.size ||
6969- [...newCollections].some((c) => !oldCollections.has(c)) ||
7070- [...oldCollections].some((c) => !newCollections.has(c));
5555+ if (!this.running) return;
71567272- if (!collectionsChanged) return;
5757+ if (!changed) return;
73587474- const hadCollections = oldCollections.size > 0;
7575- const hasCollections = newCollections.size > 0;
5959+ if (newCollections.length === 0) {
6060+ this.ws?.close();
6161+ return;
6262+ }
76637777- if (!hadCollections && hasCollections) {
6464+ if (!this.ws) {
7865 this.connect();
7979- } else if (hadCollections && !hasCollections) {
8080- this.ws?.close();
8181- } else if (this.ws && this.ws.readyState <= WebSocket.OPEN) {
8282- this.ws.close(); // onclose reconnects with updated params
6666+ } else {
6767+ // onclose reconnects with the updated params
6868+ this.ws.close();
8369 }
8470 }
85718686- private connect() {
8787- if (!this.running) return;
7272+ start() {
7373+ this.running = true;
7474+ if (this.collections.length > 0) this.connect();
7575+ }
7676+7777+ stop() {
7878+ this.running = false;
8879 if (this.reconnectTimer) {
8980 clearTimeout(this.reconnectTimer);
9081 this.reconnectTimer = null;
9182 }
8383+ this.ws?.close();
8484+ this.ws = null;
8585+ }
92869393- const collections = [
9494- ...new Set([...this.autosByCollectionOp.keys()].map((k) => k.slice(0, k.indexOf("\0")))),
9595- ];
9696- if (collections.length === 0) {
9797- console.log("Jetstream: no active automations, waiting");
9898- return;
8787+ private connect() {
8888+ if (!this.running) return;
8989+ if (this.reconnectTimer) {
9090+ clearTimeout(this.reconnectTimer);
9191+ this.reconnectTimer = null;
9992 }
9393+ if (this.collections.length === 0) return;
1009410195 const url = new URL(config.jetstreamUrl);
102102- for (const col of collections) {
103103- url.searchParams.append("wantedCollections", col);
104104- }
9696+ for (const col of this.collections) url.searchParams.append("wantedCollections", col);
9797+ for (const did of this.wantedDids) url.searchParams.append("wantedDids", did);
9898+10599 console.log(
106106- `Jetstream: connecting (${collections.length} collection${collections.length === 1 ? "" : "s"})`,
100100+ `Jetstream[${this.label}]: connecting (${this.collections.length} collection${this.collections.length === 1 ? "" : "s"}${this.wantedDids.length > 0 ? `, ${this.wantedDids.length} did${this.wantedDids.length === 1 ? "" : "s"}` : ""})`,
107101 );
108102 this.ws = new WebSocket(url.toString());
109103110104 this.ws.addEventListener("open", () => {
111111- console.log("Jetstream: connected");
105105+ console.log(`Jetstream[${this.label}]: connected`);
112106 this.reconnectDelay = 1000;
113107 });
114108···116110 try {
117111 const event = JSON.parse(msg.data as string) as JetstreamEvent;
118112 if (event.kind === "commit" && event.commit) {
119119- this.processEvent(event);
113113+ this.dispatch(event);
120114 }
121115 } catch (err) {
122122- console.error("Jetstream: failed to process message:", err);
116116+ console.error(`Jetstream[${this.label}]: failed to process message:`, err);
123117 }
124118 });
125119126120 this.ws.addEventListener("close", () => {
127121 this.ws = null;
128128- if (this.running && this.autosByCollectionOp.size > 0) {
129129- console.log(`Jetstream: reconnecting in ${this.reconnectDelay}ms`);
122122+ if (this.running && this.collections.length > 0) {
123123+ console.log(`Jetstream[${this.label}]: reconnecting in ${this.reconnectDelay}ms`);
130124 this.reconnectTimer = setTimeout(() => {
131125 this.reconnectTimer = null;
132126 this.connect();
···136130 });
137131138132 this.ws.addEventListener("error", (err) => {
139139- console.error("Jetstream: WebSocket error:", err);
133133+ console.error(`Jetstream[${this.label}]: WebSocket error:`, err);
140134 });
141135 }
142136143143- private processEvent(event: JetstreamEvent) {
137137+ dispatch(event: JetstreamEvent) {
144138 const collection = event.commit!.collection;
145139 const operation = event.commit!.operation;
146140 const key = `${collection}\0${operation}`;
···150144 for (const auto of autos) {
151145 if (matchConditions(event, auto.conditions, auto.did)) {
152146 void Promise.resolve(this.handler({ automation: auto, event })).catch((err) => {
153153- console.error("Jetstream: handler error:", err);
147147+ console.error(`Jetstream[${this.label}]: handler error:`, err);
154148 });
155149 }
156150 }
157151 }
158152}
/**
 * Collect the distinct collection names from a map keyed by
 * `<collection>\0<operation>`.
 *
 * A key that contains no NUL separator is taken as a bare collection name
 * instead of being truncated by one character.
 *
 * @param map Automations grouped under `<collection>\0<operation>` keys.
 * @returns The set of collection names that appear in the keys.
 */
function deriveCollections(map: Map<string, Automation[]>): Set<string> {
  const cols = new Set<string>();
  for (const key of map.keys()) {
    const sep = key.indexOf("\0");
    // indexOf returns -1 when the separator is missing; slice(0, -1) would
    // then silently drop the last character of the key.
    cols.add(sep === -1 ? key : key.slice(0, sep));
  }
  return cols;
}
/**
 * Manages one `JetstreamSubscription` per distinct wantedDids set.
 *
 * Active automations are partitioned by their canonical DID list (as produced
 * by `canonicalDids`) joined with ","; that string is the subscription key.
 * The empty key is the global partition, i.e. no DID filter.
 */
export class JetstreamManager {
  private subscriptions = new Map<string, JetstreamSubscription>();
  private running = false;
  private readonly handler: EventHandler;
  /** Sequence number handed to the most recently started refresh. */
  private refreshSeq = 0;
  /** Sequence number of the newest refresh whose result has been applied. */
  private appliedSeq = 0;

  /** @param handler Callback passed to every subscription this manager creates. */
  constructor(handler: EventHandler) {
    this.handler = handler;
  }

  /** Mark the manager as running and load the current automations. */
  async start() {
    const wasRunning = this.running;
    this.running = true;
    if (!wasRunning) {
      // A refresh that ran while the manager was stopped creates subscriptions
      // without starting them; refreshAutomations() only starts subscriptions
      // it creates, so start the pre-existing ones here.
      for (const sub of this.subscriptions.values()) sub.start();
    }
    await this.refreshAutomations();
  }

  /** Stop and discard every subscription. */
  stop() {
    this.running = false;
    for (const sub of this.subscriptions.values()) sub.stop();
    this.subscriptions.clear();
  }

  /**
   * Reload active automations and reconcile subscriptions against them:
   * subscriptions whose partition disappeared are stopped and removed, new
   * partitions get a new subscription, existing ones are updated in place.
   */
  async refreshAutomations() {
    const seq = ++this.refreshSeq;
    const rows = await db.query.automations.findMany({
      where: eq(automations.active, true),
    });

    // Refreshes can overlap (callers may fire them without awaiting). If a
    // newer refresh has already been applied, this result is stale: drop it.
    if (seq < this.appliedSeq) return;
    this.appliedSeq = seq;

    // Partition by canonical DID-set key. Empty key = global (no DID filter).
    const partitions = new Map<
      string,
      { wantedDids: string[]; byCollectionOp: Map<string, Automation[]> }
    >();

    for (const row of rows) {
      if (!isNsidAllowed(row.lexicon, config.nsidAllowlist, config.nsidBlocklist)) continue;

      const resolvedDids = canonicalDids(row.wantedDids, row.did);
      if (
        nsidRequiresWantedDids(row.lexicon, config.nsidRequireDids) &&
        resolvedDids.length === 0
      ) {
        console.warn(
          `Jetstream: skipping ${row.uri} — ${row.lexicon} requires wantedDids but none are set`,
        );
        continue;
      }

      const key = resolvedDids.join(",");
      let bucket = partitions.get(key);
      if (!bucket) {
        bucket = { wantedDids: resolvedDids, byCollectionOp: new Map() };
        partitions.set(key, bucket);
      }
      for (const op of row.operations) {
        const cellKey = `${row.lexicon}\0${op}`;
        const list = bucket.byCollectionOp.get(cellKey) || [];
        list.push(row);
        bucket.byCollectionOp.set(cellKey, list);
      }
    }

    // Tear down subscriptions with no partition.
    for (const [key, sub] of this.subscriptions) {
      if (!partitions.has(key)) {
        sub.stop();
        this.subscriptions.delete(key);
      }
    }

    // Create or update subscriptions for each partition.
    for (const [key, { wantedDids, byCollectionOp }] of partitions) {
      let sub = this.subscriptions.get(key);
      if (!sub) {
        sub = new JetstreamSubscription(wantedDids, this.handler);
        this.subscriptions.set(key, sub);
        // Load the automation set before starting so the first connection
        // already carries the right wantedCollections.
        sub.update(byCollectionOp);
        if (this.running) sub.start();
      } else {
        sub.update(byCollectionOp);
      }
    }
  }

  /** Number of live subscriptions (mainly for tests/introspection). */
  get subscriptionCount() {
    return this.subscriptions.size;
  }

  /** Canonical DID-set keys of live subscriptions (mainly for tests). */
  get subscriptionKeys() {
    return [...this.subscriptions.keys()];
  }

  /**
   * Dispatch an event to every subscription. In production the WebSocket
   * message handler per subscription does this directly; this helper makes
   * the fan-out testable without standing up real sockets.
   */
  dispatch(event: JetstreamEvent) {
    for (const sub of this.subscriptions.values()) sub.dispatch(event);
  }
}
261261+160262// — Singleton management —
161263162162-let consumer: JetstreamConsumer | null = null;
264264+let manager: JetstreamManager | null = null;
/**
 * Create and start the module-level Jetstream manager.
 *
 * If a manager already exists it is returned as-is and `handler` is ignored.
 * Startup runs in the background; a failure is logged rather than thrown.
 *
 * @param handler Callback given to the newly created manager.
 * @returns The module-level manager.
 */
export function startJetstream(handler: EventHandler): JetstreamManager {
  if (manager) return manager;
  const created = new JetstreamManager(handler);
  manager = created;
  created.start().catch((err) => {
    console.error("Jetstream: failed to start:", err);
  });
  return created;
}
/** Return the module-level Jetstream manager, or `null` if none has been created. */
export function getJetstream(): JetstreamManager | null {
  return manager;
}
/**
 * Call after creating/updating/deleting automations.
 *
 * Triggers a background refresh on the module-level manager, if one exists.
 * A failed refresh is logged rather than thrown.
 */
export function notifyAutomationChange() {
  manager?.refreshAutomations().catch((err) => {
    console.error("Jetstream: failed to refresh automations:", err);
  });
}
+1-1
lib/jetstream/matcher.ts
···4545 return typeof value === "string" ? value : JSON.stringify(value);
4646}
/**
 * Resolve a value that may be the `{{self}}` placeholder.
 *
 * @param value Raw value; only the exact string `{{self}}` is substituted.
 * @param ownerDid Replacement returned for the placeholder.
 * @returns `ownerDid` when `value` is `{{self}}`, otherwise `value` unchanged.
 */
export function resolveConditionValue(value: string, ownerDid: string): string {
  return value === "{{self}}" ? ownerDid : value;
}
5151
+25
lib/lexicons/match.ts
···11+// Browser-safe NSID glob-matching helpers. No node: imports — safe to use from
22+// both server code and client islands.
/**
 * Test an NSID against a single pattern.
 *
 * A pattern ending in `.*` matches every NSID that starts with the text before
 * the `*` (trailing dot included): `app.bsky.*` matches `app.bsky.feed.post`
 * but not `app.bsky` or `app.bskyx.feed`. Any other pattern must equal the
 * NSID exactly. Whitespace around the pattern is ignored, so entries from a
 * comma-separated list written as `a.*, b.*` still match.
 *
 * @param nsid NSID to test.
 * @param pattern Exact NSID or a `prefix.*` glob.
 * @returns Whether `nsid` matches `pattern`.
 */
export function nsidMatches(nsid: string, pattern: string): boolean {
  const p = pattern.trim();
  // slice(0, -1) drops only the "*", keeping the dot, so the prefix match
  // cannot cross a segment boundary.
  return p.endsWith(".*") ? nsid.startsWith(p.slice(0, -1)) : nsid === p;
}
/**
 * Check whether an NSID is allowed by the instance's allowlist/blocklist.
 *
 * The blocklist is consulted first and always wins. An empty allowlist allows
 * everything that is not blocked; a non-empty allowlist requires at least one
 * matching pattern.
 *
 * @param nsid NSID to check.
 * @param allowlist Patterns understood by `nsidMatches`; empty means "no restriction".
 * @param blocklist Patterns understood by `nsidMatches`; any match rejects the NSID.
 * @returns Whether the NSID may be used.
 */
export function isNsidAllowed(nsid: string, allowlist: string[], blocklist: string[]): boolean {
  if (blocklist.some((p) => nsidMatches(nsid, p))) return false;
  if (allowlist.length === 0) return true;
  return allowlist.some((p) => nsidMatches(nsid, p));
}
/**
 * Whether an NSID may only be subscribed to via an explicit wantedDids list.
 * Used for high-volume collections (e.g. app.bsky.*) where a firehose-wide
 * subscription would overwhelm the worker.
 *
 * @param nsid NSID to check.
 * @param patterns Patterns understood by `nsidMatches`; an empty list restricts nothing.
 * @returns `true` when at least one pattern matches `nsid`.
 */
export function nsidRequiresWantedDids(nsid: string, patterns: string[]): boolean {
  return patterns.some((p) => nsidMatches(nsid, p));
}