notification manager for bsky
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 274 lines 9.2 kB view raw
1// Code-mode: the model writes JS against a typed SDK surface, validated and 2// executed in a QuickJS WASM sandbox. METHOD_DEFS is the single source of truth — 3// it drives the prompt (SDK_SURFACE), the string validator (ALLOWED_METHODS), 4// the sandbox stubs (SANDBOX_PREAMBLE), and the host replay table (METHOD_DISPATCH). 5 6import type {CallResult} from './types' 7 8const METHOD_DEFS = [ 9 { 10 fq: 'agent.app.bsky.graph.muteActor', 11 doc: 'Mute an account by DID.', 12 inputType: 'MuteActorInput', 13 input: `interface MuteActorInput {\n actor: Did\n}`, 14 signature: `muteActor(input: MuteActorInput): Promise<void>`, 15 }, 16 { 17 fq: 'agent.app.bsky.graph.unmuteActor', 18 doc: 'Undo a previous account mute by DID.', 19 inputType: 'UnmuteActorInput', 20 input: `interface UnmuteActorInput {\n actor: Did\n}`, 21 signature: `unmuteActor(input: UnmuteActorInput): Promise<void>`, 22 }, 23 { 24 fq: 'agent.app.bsky.notification.putActivitySubscription', 25 doc: 'Change activity-subscription settings for one actor DID.', 26 inputType: 'PutActivitySubscriptionInput', 27 input: [ 28 'interface PutActivitySubscriptionInput {', 29 ' subject: Did', 30 ' activitySubscription: {', 31 ' post: boolean', 32 ' reply: boolean', 33 ' }', 34 '}', 35 ].join('\n'), 36 signature: `putActivitySubscription(input: PutActivitySubscriptionInput): Promise<void>`, 37 }, 38 { 39 fq: 'agent.app.bsky.notification.updateSeen', 40 doc: 'Advance the global notification seen cursor.', 41 inputType: 'UpdateSeenInput', 42 input: `interface UpdateSeenInput {\n seenAt: string\n}`, 43 signature: `updateSeen(input: UpdateSeenInput): Promise<void>`, 44 }, 45] as const 46 47const ALLOWED_METHODS = METHOD_DEFS.map(def => def.fq) 48 49export class CodeValidationError extends Error {} 50 51export const SDK_SURFACE = [ 52 'type Did = string', 53 '', 54 'interface EvidenceNotification {', 55 ' uri: string', 56 ' reason: string', 57 ' authorDid?: Did', 58 ' authorHandle: string', 59 ' authorName: string', 60 ' indexedAt: string', 61 ' text?: string', 62 ' subjectText?: string', 63 ' isReply: boolean', 64 '}', 65 '', 66 'interface Ctx {', 67 ' sourceUris: string[]', 68 ' notifications: EvidenceNotification[]', 69 '}', 70 '', 71 ...METHOD_DEFS.flatMap(def => ['', `/** ${def.doc} */`, def.input]), 72 '', 73 'interface BlueskyAgentSurface {', 74 ' app: {', 75 ' bsky: {', 76 ' graph: {', 77 ...METHOD_DEFS.filter(def => def.fq.includes('.graph.')).map(def => ` ${def.signature}`), 78 ' }', 79 ' notification: {', 80 ...METHOD_DEFS.filter(def => def.fq.includes('.notification.')).map(def => ` ${def.signature}`), 81 ' }', 82 ' }', 83 ' }', 84 '}', 85 '', 86 'Write exactly (plain JavaScript, no type annotations):', 87 'async function run(agent, ctx) {', 88 ' // your code', 89 '}', 90].join('\n') 91 92export function validateCode(code: string) { 93 const trimmed = code.trim() 94 if (!trimmed) throw new CodeValidationError('empty code proposal') 95 if (!trimmed.includes('async function run(')) { 96 throw new CodeValidationError('proposal must define async function run(agent, ctx)') 97 } 98 99 const blocked = [ 100 'import ', 101 'require(', 102 'process.', 103 'Bun.', 104 'fetch(', 105 'XMLHttpRequest', 106 'WebSocket', 107 'setTimeout(', 108 'setInterval(', 109 ] 110 for (const token of blocked) { 111 if (trimmed.includes(token)) { 112 throw new CodeValidationError(`blocked token: ${token}`) 113 } 114 } 115 116 const matches = [...trimmed.matchAll(/agent(?:\.[A-Za-z_$][\w$]*)+/g)].map(m => m[0]) 117 for (const match of matches) { 118 const isAllowed = ALLOWED_METHODS.some(method => match.startsWith(method)) 119 if (!isAllowed) { 120 throw new CodeValidationError(`blocked agent method: ${match}`) 121 } 122 } 123} 124 125let _quickjs: Awaited<ReturnType<typeof import('quickjs-emscripten').getQuickJS>> | undefined 126 127async function getQuickJS() { 128 if (!_quickjs) { 129 const mod = await import('quickjs-emscripten') 130 _quickjs = await mod.getQuickJS() 131 } 132 return _quickjs 133} 134 135// Derive short method name and namespace path from the fq string. 136// e.g. 'agent.app.bsky.graph.muteActor' → name: 'muteActor', path: ['app','bsky','graph'] 137function parseFq(fq: string) { 138 const parts = fq.replace(/^agent\./, '').split('.') 139 return {name: parts[parts.length - 1], path: parts.slice(0, -1)} 140} 141 142// The agent type is structural — any object matching the nested method shape works. 143// biome-ignore lint/suspicious/noExplicitAny: agent is structurally typed from METHOD_DEFS 144type AgentSurface = Record<string, any> 145 146type QueuedCall = {method: string; input: unknown} 147 148// Generated from METHOD_DEFS — maps short method names to real agent calls. 149const METHOD_DISPATCH: Record<string, (agent: AgentSurface, input: unknown) => Promise<unknown>> = 150 Object.fromEntries( 151 METHOD_DEFS.map(def => { 152 const {name, path} = parseFq(def.fq) 153 return [name, (agent: AgentSurface, input: unknown) => { 154 let target = agent 155 for (const segment of path) target = target[segment] 156 return target[name](input) 157 }] 158 }), 159 ) 160 161// Generated from METHOD_DEFS — builds the nested agent stub for the sandbox. 162// Each method enqueues a {method, input} record and returns Promise.resolve(). 163const SANDBOX_PREAMBLE = (() => { 164 const tree: Record<string, Record<string, string[]>> = {} 165 for (const def of METHOD_DEFS) { 166 const {name, path} = parseFq(def.fq) 167 const ns = path.join('.') 168 tree[ns] ??= {} 169 tree[ns][name] ??= [] 170 tree[ns][name].push(name) 171 } 172 // Group by namespace path to build nested object 173 const grouped = new Map<string, string[]>() 174 for (const def of METHOD_DEFS) { 175 const {name, path} = parseFq(def.fq) 176 const key = path.join('.') 177 const existing = grouped.get(key) || [] 178 existing.push(` ${name}: function(input) { __enqueue("${name}", JSON.stringify(input)); return Promise.resolve(); }`) 179 grouped.set(key, existing) 180 } 181 // Build nested object string: app.bsky.graph → app: { bsky: { graph: { ... } } } 182 // We know all paths are app.bsky.X so we can group by the third segment 183 const namespaces = new Map<string, string[]>() 184 for (const [ns, methods] of grouped) { 185 const parts = ns.split('.') 186 const leaf = parts[parts.length - 1] 187 namespaces.set(leaf, methods) 188 } 189 const nsBlocks = [...namespaces.entries()] 190 .map(([leaf, methods]) => ` ${leaf}: {\n${methods.join(',\n')}\n }`) 191 .join(',\n') 192 return `\nvar agent = {\n app: {\n bsky: {\n${nsBlocks}\n }\n }\n};\n` 193})() 194 195const SANDBOX_TIMEOUT_MS = 5_000 196 197export async function executeCode<TCtx extends object>( 198 code: string, 199 agent: AgentSurface, 200 ctx: TCtx, 201): Promise<CallResult[]> { 202 validateCode(code) 203 const QuickJS = await getQuickJS() 204 const vm = QuickJS.newContext() 205 vm.runtime.setMemoryLimit(16 * 1024 * 1024) 206 vm.runtime.setMaxStackSize(1024 * 1024) 207 208 const deadline = Date.now() + SANDBOX_TIMEOUT_MS 209 vm.runtime.setInterruptHandler(() => { 210 if (Date.now() > deadline) return true 211 return false 212 }) 213 214 const callQueue: QueuedCall[] = [] 215 216 try { 217 const enqueueHandle = vm.newFunction('__enqueue', (methodHandle, inputHandle) => { 218 const method = vm.dump(methodHandle) 219 const inputJson = vm.dump(inputHandle) 220 const input = typeof inputJson === 'string' ? JSON.parse(inputJson) : inputJson 221 if (typeof method !== 'string' || !(method in METHOD_DISPATCH)) { 222 return 223 } 224 callQueue.push({method, input}) 225 }) 226 vm.setProp(vm.global, '__enqueue', enqueueHandle) 227 enqueueHandle.dispose() 228 229 const ctxJson = JSON.stringify(ctx) 230 const fullCode = `${SANDBOX_PREAMBLE}\nvar ctx = ${ctxJson};\n${code}\nrun(agent, ctx);` 231 232 const result = vm.evalCode(fullCode) 233 if (result.error) { 234 const err = vm.dump(result.error) 235 result.error.dispose() 236 throw new CodeValidationError(`sandbox error: ${typeof err === 'string' ? err : JSON.stringify(err)}`) 237 } 238 result.value.dispose() 239 240 const pending = vm.runtime.executePendingJobs() 241 if (pending.error) { 242 const err = vm.dump(pending.error) 243 pending.error.dispose() 244 throw new CodeValidationError(`sandbox async error: ${typeof err === 'string' ? err : JSON.stringify(err)}`) 245 } 246 } finally { 247 vm.dispose() 248 } 249 250 // Host-side replay — each call is wrapped so one failure doesn't bypass 251 // the others or the verification step. Earlier sandbox-phase errors still 252 // throw (they're not partial-success scenarios); the try/catch here only 253 // covers real upstream dispatch failures. 254 const results: CallResult[] = [] 255 for (const call of callQueue) { 256 const dispatch = METHOD_DISPATCH[call.method] 257 if (!dispatch) { 258 results.push({method: call.method, input: call.input, status: 'failed', error: 'unknown method in queue'}) 259 continue 260 } 261 try { 262 await dispatch(agent, call.input) 263 results.push({method: call.method, input: call.input, status: 'ok'}) 264 } catch (err) { 265 results.push({ 266 method: call.method, 267 input: call.input, 268 status: 'failed', 269 error: err instanceof Error ? err.message : String(err), 270 }) 271 } 272 } 273 return results 274}