// Notification manager for bsky (Bluesky).
1// Code-mode: the model writes JS against a typed SDK surface, validated and
2// executed in a QuickJS WASM sandbox. METHOD_DEFS is the single source of truth —
3// it drives the prompt (SDK_SURFACE), the string validator (ALLOWED_METHODS),
4// the sandbox stubs (SANDBOX_PREAMBLE), and the host replay table (METHOD_DISPATCH).
5
6import type {CallResult} from './types'
7
/**
 * Catalogue of the agent methods that generated code is allowed to call.
 *
 * Fields of each entry:
 *  - `fq`        fully qualified call path, exactly as it appears in model-written code
 *  - `doc`       one-line description, emitted as a doc comment in the prompt
 *  - `inputType` name of the interface declared in `input`
 *  - `input`     source text of the input interface, emitted into the prompt
 *  - `signature` method signature line shown inside the agent surface interface
 */
const METHOD_DEFS = [
  // --- app.bsky.graph ---
  {
    fq: 'agent.app.bsky.graph.muteActor',
    doc: 'Mute an account by DID.',
    inputType: 'MuteActorInput',
    input: 'interface MuteActorInput {\n actor: Did\n}',
    signature: 'muteActor(input: MuteActorInput): Promise<void>',
  },
  {
    fq: 'agent.app.bsky.graph.unmuteActor',
    doc: 'Undo a previous account mute by DID.',
    inputType: 'UnmuteActorInput',
    input: 'interface UnmuteActorInput {\n actor: Did\n}',
    signature: 'unmuteActor(input: UnmuteActorInput): Promise<void>',
  },
  // --- app.bsky.notification ---
  {
    fq: 'agent.app.bsky.notification.putActivitySubscription',
    doc: 'Change activity-subscription settings for one actor DID.',
    inputType: 'PutActivitySubscriptionInput',
    input:
      'interface PutActivitySubscriptionInput {\n' +
      ' subject: Did\n' +
      ' activitySubscription: {\n' +
      ' post: boolean\n' +
      ' reply: boolean\n' +
      ' }\n' +
      '}',
    signature: 'putActivitySubscription(input: PutActivitySubscriptionInput): Promise<void>',
  },
  {
    fq: 'agent.app.bsky.notification.updateSeen',
    doc: 'Advance the global notification seen cursor.',
    inputType: 'UpdateSeenInput',
    input: 'interface UpdateSeenInput {\n seenAt: string\n}',
    signature: 'updateSeen(input: UpdateSeenInput): Promise<void>',
  },
] as const
46
// Fully qualified call paths (`fq`) of every METHOD_DEFS entry; validateCode
// checks each `agent.*` reference in a proposal against this list.
const ALLOWED_METHODS = METHOD_DEFS.map(({fq}) => fq)
48
/**
 * Raised when a code proposal is rejected by validation or fails inside the sandbox.
 *
 * `name` is set explicitly so logs, `String(err)` and stack traces identify the
 * error as `CodeValidationError` instead of the generic `Error`.
 */
export class CodeValidationError extends Error {
  constructor(...args: ConstructorParameters<typeof Error>) {
    super(...args)
    this.name = 'CodeValidationError'
  }
}
50
/**
 * Prompt text describing the typed SDK surface available to generated code.
 *
 * Sections, in order: the shared context types, one documented input interface
 * per METHOD_DEFS entry, the nested `BlueskyAgentSurface` interface, and the
 * instruction telling the model which function to write.
 */
export const SDK_SURFACE = (() => {
  // Signature lines for every method whose fq contains the given namespace marker.
  const signaturesUnder = (marker: string) =>
    METHOD_DEFS.filter(def => def.fq.includes(marker)).map(def => ` ${def.signature}`)

  const contextTypes = [
    'type Did = string',
    '',
    'interface EvidenceNotification {',
    ' uri: string',
    ' reason: string',
    ' authorDid?: Did',
    ' authorHandle: string',
    ' authorName: string',
    ' indexedAt: string',
    ' text?: string',
    ' subjectText?: string',
    ' isReply: boolean',
    '}',
    '',
    'interface Ctx {',
    ' sourceUris: string[]',
    ' notifications: EvidenceNotification[]',
    '}',
    '',
  ]

  // Each method contributes a blank line, its doc comment, and its input interface.
  const inputTypes = METHOD_DEFS.flatMap(def => ['', `/** ${def.doc} */`, def.input])

  const agentSurface = [
    '',
    'interface BlueskyAgentSurface {',
    ' app: {',
    ' bsky: {',
    ' graph: {',
    ...signaturesUnder('.graph.'),
    ' }',
    ' notification: {',
    ...signaturesUnder('.notification.'),
    ' }',
    ' }',
    ' }',
    '}',
    '',
  ]

  const instructions = [
    'Write exactly (plain JavaScript, no type annotations):',
    'async function run(agent, ctx) {',
    ' // your code',
    '}',
  ]

  return [...contextTypes, ...inputTypes, ...agentSurface, ...instructions].join('\n')
})()
91
/**
 * String-level pre-check of a code proposal before it reaches the sandbox.
 *
 * Rejects the proposal when it is empty, does not contain
 * `async function run(`, contains one of the blocked tokens, or references an
 * `agent.*` path that is not one of ALLOWED_METHODS.
 *
 * @param code - JavaScript source proposed by the model.
 * @throws CodeValidationError describing the first violation found.
 */
export function validateCode(code: string): void {
  const trimmed = code.trim()
  if (!trimmed) throw new CodeValidationError('empty code proposal')
  if (!trimmed.includes('async function run(')) {
    throw new CodeValidationError('proposal must define async function run(agent, ctx)')
  }

  const blocked = [
    'import ',
    'require(',
    'process.',
    'Bun.',
    'fetch(',
    'XMLHttpRequest',
    'WebSocket',
    'setTimeout(',
    'setInterval(',
  ]
  for (const token of blocked) {
    if (trimmed.includes(token)) {
      throw new CodeValidationError(`blocked token: ${token}`)
    }
  }

  // Every dotted member chain rooted at `agent`, e.g. `agent.app.bsky.graph.muteActor`.
  const matches = [...trimmed.matchAll(/agent(?:\.[A-Za-z_$][\w$]*)+/g)].map(m => m[0])
  for (const match of matches) {
    // The chain must be an allowed method exactly, or continue past it with a
    // `.` (e.g. `.call`). A bare prefix test would also accept look-alike names
    // such as `muteActorList`, which merely share a prefix with `muteActor`.
    const isAllowed = ALLOWED_METHODS.some(
      method => match === method || match.startsWith(`${method}.`),
    )
    if (!isAllowed) {
      throw new CodeValidationError(`blocked agent method: ${match}`)
    }
  }
}
124
// Module-level cache for the loaded QuickJS WASM module; undefined until first use.
let _quickjs: Awaited<ReturnType<typeof import('quickjs-emscripten').getQuickJS>> | undefined

/**
 * Return the QuickJS module, importing `quickjs-emscripten` and initialising it
 * on the first call and reusing the cached instance afterwards.
 */
async function getQuickJS() {
  if (_quickjs) return _quickjs

  const quickjsModule = await import('quickjs-emscripten')
  _quickjs = await quickjsModule.getQuickJS()
  return _quickjs
}
134
// Split a fully qualified method string into its short name and namespace path.
// A leading `agent.` is dropped first; the last remaining segment is the name and
// everything before it is the path.
// e.g. 'agent.app.bsky.graph.muteActor' → name: 'muteActor', path: ['app','bsky','graph']
function parseFq(fq: string) {
  const segments = fq.replace(/^agent\./, '').split('.')
  const lastIndex = segments.length - 1
  return {name: segments[lastIndex], path: segments.slice(0, lastIndex)}
}
141
// The agent type is structural — any object matching the nested method shape works.
// METHOD_DISPATCH walks it by string keys derived from METHOD_DEFS, so the values
// are left untyped.
// biome-ignore lint/suspicious/noExplicitAny: agent is structurally typed from METHOD_DEFS
type AgentSurface = Record<string, any>

// One call recorded by the sandbox's __enqueue hook: the short method name and
// the input value decoded from the JSON string the stub passed out of the VM.
type QueuedCall = {method: string; input: unknown}
147
// Host replay table, generated from METHOD_DEFS. Keys are short method names;
// each value walks the real agent along the method's namespace path and invokes
// the method on the object that owns it.
const METHOD_DISPATCH: Record<string, (agent: AgentSurface, input: unknown) => Promise<unknown>> =
  Object.fromEntries(
    METHOD_DEFS.map(({fq}) => {
      const {name, path} = parseFq(fq)
      const invoke = (agent: AgentSurface, input: unknown) => {
        const owner = path.reduce<AgentSurface>((node, segment) => node[segment], agent)
        return owner[name](input)
      }
      return [name, invoke]
    }),
  )
160
// Generated from METHOD_DEFS — JavaScript source that declares the nested `agent`
// stub inside the sandbox. Each stub method hands {method name, JSON-encoded input}
// to the host through __enqueue and returns Promise.resolve(); nothing is executed
// against the real agent from inside the VM.
const SANDBOX_PREAMBLE = (() => {
  // Stub method sources grouped by full namespace path (e.g. "app.bsky.graph"),
  // in METHOD_DEFS order.
  const grouped = new Map<string, string[]>()
  for (const def of METHOD_DEFS) {
    const {name, path} = parseFq(def.fq)
    const key = path.join('.')
    const stubs = grouped.get(key) ?? []
    stubs.push(` ${name}: function(input) { __enqueue("${name}", JSON.stringify(input)); return Promise.resolve(); }`)
    grouped.set(key, stubs)
  }

  // Build nested object string: app.bsky.graph → app: { bsky: { graph: { ... } } }
  // All current paths are app.bsky.X, so only the last segment varies and the
  // outer `app: { bsky: { ... } }` wrapper is written out literally below.
  const namespaces = new Map<string, string[]>()
  for (const [ns, methods] of grouped) {
    const parts = ns.split('.')
    namespaces.set(parts[parts.length - 1], methods)
  }

  const nsBlocks = [...namespaces.entries()]
    .map(([leaf, methods]) => ` ${leaf}: {\n${methods.join(',\n')}\n }`)
    .join(',\n')
  return `\nvar agent = {\n app: {\n bsky: {\n${nsBlocks}\n }\n }\n};\n`
})()
194
// Wall-clock budget in milliseconds for one sandbox run. executeCode adds it to
// Date.now() to get a deadline that the QuickJS interrupt handler checks.
const SANDBOX_TIMEOUT_MS = 5_000
196
/** Render a value dumped out of the VM as text for an error message. */
function describeSandboxError(err: unknown): string {
  return typeof err === 'string' ? err : JSON.stringify(err)
}

/**
 * Validate a code proposal, run it in a fresh QuickJS context, and replay the
 * agent calls it queued against the real `agent`.
 *
 * Inside the sandbox, `agent` is the stub from SANDBOX_PREAMBLE: calls are only
 * recorded through `__enqueue`. After the VM is disposed, each recorded call is
 * dispatched on the host in order and its outcome collected.
 *
 * @param code - Proposal source; must define `async function run(agent, ctx)`.
 * @param agent - Real agent object the queued calls are replayed against.
 * @param ctx - Context value, JSON-serialised and exposed to the sandbox as `ctx`.
 * @returns One result per queued call, `ok` or `failed` with the error message.
 * @throws CodeValidationError when validation fails, when evaluating the code or
 *   draining its pending jobs errors, or when the promise returned by `run`
 *   rejects. In all of these cases nothing is replayed.
 */
export async function executeCode<TCtx extends object>(
  code: string,
  agent: AgentSurface,
  ctx: TCtx,
): Promise<CallResult[]> {
  validateCode(code)
  const QuickJS = await getQuickJS()
  const vm = QuickJS.newContext()
  vm.runtime.setMemoryLimit(16 * 1024 * 1024)
  vm.runtime.setMaxStackSize(1024 * 1024)

  // Returning true from the interrupt handler aborts execution once the deadline passes.
  const deadline = Date.now() + SANDBOX_TIMEOUT_MS
  vm.runtime.setInterruptHandler(() => Date.now() > deadline)

  const callQueue: QueuedCall[] = []
  // Filled in by the __runRejected hook when the promise returned by run() rejects.
  const outcome: {rejection?: string} = {}

  try {
    const enqueueHandle = vm.newFunction('__enqueue', (methodHandle, inputHandle) => {
      const method = vm.dump(methodHandle)
      const inputJson = vm.dump(inputHandle)
      const input = typeof inputJson === 'string' ? JSON.parse(inputJson) : inputJson
      // Own-property check: sandbox code can call the global __enqueue directly,
      // and `in` would also accept inherited keys such as "constructor".
      if (typeof method !== 'string' || !Object.prototype.hasOwnProperty.call(METHOD_DISPATCH, method)) {
        return
      }
      callQueue.push({method, input})
    })
    vm.setProp(vm.global, '__enqueue', enqueueHandle)
    enqueueHandle.dispose()

    const rejectedHandle = vm.newFunction('__runRejected', reasonHandle => {
      outcome.rejection = String(describeSandboxError(vm.dump(reasonHandle)))
    })
    vm.setProp(vm.global, '__runRejected', rejectedHandle)
    rejectedHandle.dispose()

    // run() is async, so a failure inside it surfaces as a rejected promise, not
    // as an evalCode or pending-job error. Attach a handler that reports it to the
    // host; otherwise calls queued before the failure would still be replayed.
    const runAndReport = [
      'Promise.resolve(run(agent, ctx)).then(undefined, function (reason) {',
      '  var text;',
      '  try { text = reason instanceof Error ? reason.name + ": " + reason.message : String(reason); }',
      '  catch (_) { text = "unprintable rejection"; }',
      '  __runRejected(text);',
      '});',
    ].join('\n')

    const ctxJson = JSON.stringify(ctx)
    const fullCode = [SANDBOX_PREAMBLE, `var ctx = ${ctxJson};`, code, runAndReport].join('\n')

    const result = vm.evalCode(fullCode)
    if (result.error) {
      const err = vm.dump(result.error)
      result.error.dispose()
      throw new CodeValidationError(`sandbox error: ${describeSandboxError(err)}`)
    }
    result.value.dispose()

    // Drain the job queue so the awaits inside run() (and the handler above) execute.
    const pending = vm.runtime.executePendingJobs()
    if (pending.error) {
      const err = vm.dump(pending.error)
      pending.error.dispose()
      throw new CodeValidationError(`sandbox async error: ${describeSandboxError(err)}`)
    }

    if (outcome.rejection !== undefined) {
      throw new CodeValidationError(`sandbox async error: ${outcome.rejection}`)
    }
  } finally {
    vm.dispose()
  }

  // Host-side replay — each call is wrapped so one failure doesn't bypass
  // the others or the verification step. Earlier sandbox-phase errors still
  // throw (they're not partial-success scenarios); the try/catch here only
  // covers real upstream dispatch failures.
  const results: CallResult[] = []
  for (const call of callQueue) {
    const dispatch = METHOD_DISPATCH[call.method]
    if (!dispatch) {
      results.push({method: call.method, input: call.input, status: 'failed', error: 'unknown method in queue'})
      continue
    }
    try {
      await dispatch(agent, call.input)
      results.push({method: call.method, input: call.input, status: 'ok'})
    } catch (err) {
      results.push({
        method: call.method,
        input: call.input,
        status: 'failed',
        error: err instanceof Error ? err.message : String(err),
      })
    }
  }
  return results
}