Suite of AT Protocol TypeScript libraries built on web standards
21
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat(sync): otel out-of-the-box

+559 -28
+1
deno.json
··· 13 13 "lex-gen" 14 14 ], 15 15 "imports": { 16 + "@opentelemetry/api": "npm:@opentelemetry/api@^1.9.0", 16 17 "@std/assert": "jsr:@std/assert@^1.0.16" 17 18 } 18 19 }
+7 -1
deno.lock
··· 39 39 "npm:@did-plc/lib@^0.0.4": "0.0.4", 40 40 "npm:@did-plc/server@^0.0.1": "0.0.1_express@4.21.2", 41 41 "npm:@ipld/dag-cbor@^9.2.5": "9.2.5", 42 + "npm:@opentelemetry/api@^1.9.0": "1.9.0", 42 43 "npm:@types/node@*": "24.2.0", 43 44 "npm:get-port@^7.1.0": "7.1.0", 44 45 "npm:key-encoder@^2.0.3": "2.0.3", ··· 268 269 }, 269 270 "@noble/secp256k1@1.7.2": { 270 271 "integrity": "sha512-/qzwYl5eFLH8OWIecQWM31qld2g1NfjgylK+TNhqtaUKP37Nm+Y+z30Fjhw0Ct8p9yCQEm2N3W/AckdIb3SMcQ==" 272 + }, 273 + "@opentelemetry/api@1.9.0": { 274 + "integrity": "sha512-3giAOQvZiH5F9bMlMiv8+GSPMeqg0dbaeo58/0SlA9sxSqZhnUtxzX9/2FzyhS9sWQf5S0GJE0AKBrFqjpeYcg==" 271 275 }, 272 276 "@types/bn.js@5.2.0": { 273 277 "integrity": "sha512-DLbJ1BPqxvQhIGbeu8VbUC1DiAiahHtAYvA0ZEAa4P31F7IaArc8z3C3BRQdWX4mtLQuABG4yzp76ZrS02Ui1Q==", ··· 1100 1104 }, 1101 1105 "workspace": { 1102 1106 "dependencies": [ 1103 - "jsr:@std/assert@^1.0.16" 1107 + "jsr:@std/assert@^1.0.16", 1108 + "npm:@opentelemetry/api@^1.9.0" 1104 1109 ], 1105 1110 "members": { 1106 1111 "bytes": { ··· 1162 1167 }, 1163 1168 "sync": { 1164 1169 "dependencies": [ 1170 + "npm:@opentelemetry/api@^1.9.0", 1165 1171 "npm:multiformats@^13.4.1", 1166 1172 "npm:p-queue@^8.1.1" 1167 1173 ]
+1
sync/deno.json
··· 4 4 "exports": "./mod.ts", 5 5 "license": "MIT", 6 6 "imports": { 7 + "@opentelemetry/api": "npm:@opentelemetry/api@^1.9.0", 7 8 "multiformats": "npm:multiformats@^13.4.1", 8 9 "p-queue": "npm:p-queue@^8.1.1" 9 10 },
+77 -23
sync/firehose/index.ts
··· 27 27 SyncEvt, 28 28 } from "../events.ts"; 29 29 import type { EventRunner } from "../runner/index.ts"; 30 + import { SyncTelemetry, type SyncTelemetryOptions } from "../telemetry.ts"; 30 31 import { didAndSeqForEvt } from "../util.ts"; 31 32 import { 32 33 type Account, ··· 79 80 excludeAccount?: boolean; 80 81 excludeCommit?: boolean; 81 82 excludeSync?: boolean; 83 + telemetry?: SyncTelemetryOptions; 82 84 }; 83 85 84 86 /** ··· 192 194 private abortController: AbortController; 193 195 private destoryDefer: Deferrable; 194 196 private matchCollection: ((col: string) => boolean) | null = null; 197 + private telemetry: SyncTelemetry; 195 198 196 199 constructor(public opts: FirehoseOptions) { 197 200 this.destoryDefer = createDeferrable(); 198 201 this.abortController = new AbortController(); 202 + const runnerTelemetry = this.opts.runner?.getTelemetry?.(); 203 + if (runnerTelemetry) { 204 + if (opts.telemetry !== undefined) { 205 + throw new Error( 206 + "Telemetry configured on both Firehose and runner. Configure telemetry in one place.", 207 + ); 208 + } 209 + this.telemetry = runnerTelemetry; 210 + } else { 211 + this.telemetry = new SyncTelemetry(opts.telemetry); 212 + this.opts.runner?.setTelemetry?.(this.telemetry); 213 + } 199 214 if (this.opts.getCursor && this.opts.runner) { 200 215 throw new Error("Must set only `getCursor` or `runner`"); 201 216 } ··· 236 251 try { 237 252 return isValidRepoEvent(value); 238 253 } catch (err) { 254 + this.telemetry.recordError("validation"); 239 255 this.opts.onError(new FirehoseValidationError(err, value)); 240 256 } 241 257 }, ··· 245 261 async start(): Promise<void> { 246 262 try { 247 263 for await (const evt of this.sub) { 264 + const eventType = getRepoEventType(evt); 265 + this.telemetry.recordEventReceived(eventType); 266 + const eventContext = this.telemetry.activeContext(); 248 267 if (this.opts.runner) { 249 268 const parsed = didAndSeqForEvt(evt); 250 269 if (!parsed) { ··· 254 273 parsed.did, 255 274 parsed.seq, 256 275 async () => { 257 - const parsed = await this.parseEvt(evt); 258 - for (const write of parsed) { 259 - try { 260 - await this.opts.handleEvent(write); 261 - } catch (err) { 262 - this.opts.onError(new FirehoseHandlerError(err, write)); 263 - } 264 - } 276 + await this.telemetry.withContext(eventContext, async () => { 277 + await this.processEvt(evt, eventType); 278 + }); 265 279 }, 266 280 ); 267 281 } else { 268 - await this.processEvt(evt); 282 + await this.telemetry.withContext(eventContext, async () => { 283 + await this.processEvt(evt, eventType); 284 + }); 269 285 } 270 286 } 271 287 } catch (err) { ··· 276 292 this.destoryDefer.resolve(); 277 293 return; 278 294 } 295 + this.telemetry.recordError("subscription"); 279 296 this.opts.onError(new FirehoseSubscriptionError(err)); 280 297 await wait(this.opts.subscriptionReconnectDelay ?? 3000); 281 298 return this.start(); 282 299 } 283 300 } 284 301 285 - private async parseEvt(evt: RepoEvent): Promise<Event[]> { 302 + private async parseEvt( 303 + evt: RepoEvent, 304 + eventType: string, 305 + ): Promise<{ events: Event[]; outcome: "ok" | "error" }> { 286 306 try { 287 307 if (isCommit(evt) && !this.opts.excludeCommit) { 288 - return this.opts.unauthenticatedCommits 308 + const events = this.opts.unauthenticatedCommits 289 309 ? await parseCommitUnauthenticated(evt, this.matchCollection) 290 310 : await parseCommitAuthenticated( 291 311 this.opts.idResolver, 292 312 evt, 293 313 this.matchCollection, 294 314 ); 315 + return { events, outcome: "ok" }; 295 316 } else if (isAccount(evt) && !this.opts.excludeAccount) { 296 317 const parsed = parseAccount(evt); 297 - return parsed ? [parsed] : []; 318 + return { events: parsed ? [parsed] : [], outcome: "ok" }; 298 319 } else if (isIdentity(evt) && !this.opts.excludeIdentity) { 299 320 const parsed = await parseIdentity( 300 321 this.opts.idResolver, 301 322 evt, 302 323 this.opts.unauthenticatedHandles, 303 324 ); 304 - return parsed ? [parsed] : []; 325 + return { events: parsed ? [parsed] : [], outcome: "ok" }; 305 326 } else if (isSync(evt) && !this.opts.excludeSync) { 306 327 const parsed = await parseSync(evt); 307 - return parsed ? [parsed] : []; 328 + return { events: parsed ? [parsed] : [], outcome: "ok" }; 308 329 } else { 309 - return []; 330 + return { events: [], outcome: "ok" }; 310 331 } 311 332 } catch (err) { 333 + this.telemetry.recordError("parse", eventType); 312 334 this.opts.onError(new FirehoseParseError(err, evt)); 313 - return []; 335 + return { events: [], outcome: "error" }; 314 336 } 315 337 } 316 338 317 - private async processEvt(evt: RepoEvent) { 318 - const parsed = await this.parseEvt(evt); 319 - for (const write of parsed) { 320 - try { 321 - await this.opts.handleEvent(write); 322 - } catch (err) { 339 + private async processEvt( 340 + evt: RepoEvent, 341 + eventType: string, 342 + ) { 343 + const parseStart = performance.now(); 344 + const parsed = await this.parseEvt(evt, eventType); 345 + this.telemetry.recordParseDuration( 346 + performance.now() - parseStart, 347 + eventType, 348 + parsed.outcome, 349 + ); 350 + this.telemetry.recordEventsParsed(parsed.events.length, eventType); 351 + for (const write of parsed.events) { 352 + const handleStart = performance.now(); 353 + let outcome: "ok" | "error" = "ok"; 354 + await this.telemetry.withSpan( 355 + "sync.firehose.event.handle", 356 + { event_type: write.event }, 357 + async () => { 358 + await this.opts.handleEvent(write); 359 + }, 360 + ).catch((err) => { 361 + outcome = "error"; 362 + this.telemetry.recordError("handler", write.event); 323 363 this.opts.onError(new FirehoseHandlerError(err, write)); 324 - } 364 + }); 365 + this.telemetry.recordHandleDuration( 366 + performance.now() - handleStart, 367 + write.event, 368 + outcome, 369 + ); 370 + this.telemetry.recordEventHandled(write.event, outcome); 325 371 } 326 372 } 327 373 ··· 578 624 super("error in firehose event handler", { cause: err }); 579 625 } 580 626 } 627 + 628 + const getRepoEventType = (evt: RepoEvent): string => { 629 + if (isCommit(evt)) return "commit"; 630 + if (isAccount(evt)) return "account"; 631 + if (isIdentity(evt)) return "identity"; 632 + if (isSync(evt)) return "sync"; 633 + return "unknown"; 634 + };
+1
sync/mod.ts
··· 70 70 export * from "./runner/index.ts"; 71 71 export * from "./firehose/index.ts"; 72 72 export * from "./events.ts"; 73 + export * from "./telemetry.ts";
+55 -4
sync/runner/memory-runner.ts
··· 1 1 import PQueue from "p-queue"; 2 2 import { ConsecutiveList } from "./consecutive-list.ts"; 3 3 import type { EventRunner } from "./types.ts"; 4 + import { SyncTelemetry, type SyncTelemetryOptions } from "../telemetry.ts"; 4 5 5 6 /** 6 7 * Options for {@link MemoryRunner} ··· 14 15 concurrency?: number; 15 16 startCursor?: number; 16 17 setCursorInterval?: number; // milliseconds between persisted cursor saves (throttling) 18 + telemetry?: SyncTelemetryOptions; 17 19 }; 18 20 19 21 /** A queue with arbitrarily many partitions, each processing work sequentially. ··· 26 28 cursor: number | undefined; 27 29 private lastCursorSave = 0; 28 30 private savingCursor = false; 31 + private telemetry: SyncTelemetry; 29 32 30 33 constructor(public opts: MemoryRunnerOptions = {}) { 31 34 this.mainQueue = new PQueue({ concurrency: opts.concurrency ?? Infinity }); 32 35 this.cursor = opts.startCursor; 36 + this.telemetry = new SyncTelemetry(opts.telemetry); 37 + } 38 + 39 + getTelemetry(): SyncTelemetry { 40 + return this.telemetry; 41 + } 42 + 43 + setTelemetry(telemetry: SyncTelemetry): void { 44 + this.telemetry = telemetry; 33 45 } 34 46 35 47 getCursor(): number | undefined { ··· 60 72 if (this.mainQueue.isPaused) return; 61 73 const item = this.consecutive.push(seq); 62 74 await this.addTask(did, async () => { 63 - await handler(); 75 + const taskStart = performance.now(); 76 + try { 77 + await handler(); 78 + this.telemetry.recordRunnerTaskDuration( 79 + performance.now() - taskStart, 80 + "ok", 81 + ); 82 + } catch (err) { 83 + this.telemetry.recordRunnerTaskDuration( 84 + performance.now() - taskStart, 85 + "error", 86 + ); 87 + this.telemetry.recordError("runner"); 88 + throw err; 89 + } 64 90 const latest = item.complete().at(-1); 65 91 if (latest !== undefined) { 66 92 this.cursor = latest; 67 93 const { setCursor, setCursorInterval } = this.opts; 68 94 if (setCursor) { 69 95 if (!setCursorInterval) { 70 - await setCursor(this.cursor); 96 + const saveStart = performance.now(); 97 + try { 98 + await setCursor(this.cursor!); 99 + this.telemetry.recordCursorSaveDuration( 100 + performance.now() - saveStart, 101 + "ok", 102 + ); 103 + } catch (err) { 104 + this.telemetry.recordCursorSaveDuration( 105 + performance.now() - saveStart, 106 + "error", 107 + ); 108 + this.telemetry.recordError("runner"); 109 + throw err; 110 + } 71 111 this.lastCursorSave = Date.now(); 72 112 } else { 73 113 const now = Date.now(); ··· 75 115 now - this.lastCursorSave >= setCursorInterval && 76 116 !this.savingCursor 77 117 ) { 78 - // Set timestamp & flag before awaiting to avoid multiple saves racing in same interval 79 118 this.lastCursorSave = now; 80 119 this.savingCursor = true; 120 + const saveStart = performance.now(); 81 121 try { 82 - await setCursor(this.cursor); 122 + await setCursor(this.cursor!); 123 + this.telemetry.recordCursorSaveDuration( 124 + performance.now() - saveStart, 125 + "ok", 126 + ); 127 + } catch (err) { 128 + this.telemetry.recordCursorSaveDuration( 129 + performance.now() - saveStart, 130 + "error", 131 + ); 132 + this.telemetry.recordError("runner"); 133 + throw err; 83 134 } finally { 84 135 this.savingCursor = false; 85 136 }
+4
sync/runner/types.ts
··· 1 + import type { SyncTelemetry } from "../telemetry.ts"; 2 + 1 3 /** 2 4 * Generic event runner interface 3 5 * for event tracking and processing 4 6 */ 5 7 export interface EventRunner { 6 8 getCursor(): Awaited<number | undefined>; 9 + getTelemetry?(): SyncTelemetry; 10 + setTelemetry?(telemetry: SyncTelemetry): void; 7 11 trackEvent( 8 12 did: string, 9 13 seq: number,
+192
sync/telemetry.ts
··· 1 + import { 2 + type Attributes, 3 + type Context, 4 + context, 5 + type Meter, 6 + metrics, 7 + type Span, 8 + SpanStatusCode, 9 + trace, 10 + type Tracer, 11 + } from "@opentelemetry/api"; 12 + 13 + export type SyncTelemetryOptions = { 14 + enabled?: boolean; 15 + tracer?: Tracer; 16 + meter?: Meter; 17 + }; 18 + 19 + export type SyncAttributes = Attributes; 20 + export type SyncSpan = Span; 21 + 22 + type Outcome = "ok" | "error"; 23 + 24 + export class SyncTelemetry { 25 + private readonly tracer: Tracer; 26 + readonly enabled: boolean; 27 + 28 + private eventsReceived?: ReturnType<Meter["createCounter"]>; 29 + private eventsParsed?: ReturnType<Meter["createCounter"]>; 30 + private eventsHandled?: ReturnType<Meter["createCounter"]>; 31 + private errorsTotal?: ReturnType<Meter["createCounter"]>; 32 + private parseDuration?: ReturnType<Meter["createHistogram"]>; 33 + private handleDuration?: ReturnType<Meter["createHistogram"]>; 34 + private runnerTaskDuration?: ReturnType<Meter["createHistogram"]>; 35 + private cursorSaveDuration?: ReturnType<Meter["createHistogram"]>; 36 + 37 + constructor(opts: SyncTelemetryOptions = {}) { 38 + this.enabled = opts.enabled ?? true; 39 + this.tracer = opts.tracer ?? trace.getTracer("@atp/sync"); 40 + const meter = opts.meter ?? metrics.getMeter("@atp/sync"); 41 + if (this.enabled) { 42 + this.eventsReceived = meter.createCounter("sync.events.received", { 43 + description: "Number of firehose events received", 44 + unit: "1", 45 + }); 46 + this.eventsParsed = meter.createCounter("sync.events.parsed", { 47 + description: "Number of parsed sync events", 48 + unit: "1", 49 + }); 50 + this.eventsHandled = meter.createCounter("sync.events.handled", { 51 + description: "Number of handled sync events", 52 + unit: "1", 53 + }); 54 + this.errorsTotal = meter.createCounter("sync.errors.total", { 55 + description: "Number of sync processing errors", 56 + unit: "1", 57 + }); 58 + this.parseDuration = meter.createHistogram("sync.parse.duration", { 59 + description: "Duration of event parsing and auth", 60 + unit: "ms", 61 + }); 62 + this.handleDuration = meter.createHistogram("sync.handle.duration", { 63 + description: "Duration of event handler execution", 64 + unit: "ms", 65 + }); 66 + this.runnerTaskDuration = meter.createHistogram( 67 + "sync.runner.task.duration", 68 + { 69 + description: "Duration of runner task execution", 70 + unit: "ms", 71 + }, 72 + ); 73 + this.cursorSaveDuration = meter.createHistogram( 74 + "sync.runner.cursor_save.duration", 75 + { 76 + description: "Duration of cursor persistence", 77 + unit: "ms", 78 + }, 79 + ); 80 + } 81 + } 82 + 83 + activeContext(): Context { 84 + return context.active(); 85 + } 86 + 87 + withContext<T>(ctx: Context, fn: () => T): T { 88 + return context.with(ctx, fn); 89 + } 90 + 91 + startSpan( 92 + name: string, 93 + attributes?: Attributes, 94 + parentContext?: Context, 95 + ): Span | undefined { 96 + if (!this.enabled) { 97 + return undefined; 98 + } 99 + return this.tracer.startSpan(name, { attributes }, parentContext); 100 + } 101 + 102 + async withSpan<T>( 103 + name: string, 104 + attributes: Attributes, 105 + fn: (span: Span | undefined) => Promise<T>, 106 + parentContext?: Context, 107 + ): Promise<T> { 108 + if (!this.enabled) { 109 + return await fn(undefined); 110 + } 111 + const span = this.startSpan(name, attributes, parentContext); 112 + if (!span) { 113 + return await fn(undefined); 114 + } 115 + const spanContext = trace.setSpan(parentContext ?? context.active(), span); 116 + return await context.with(spanContext, async () => { 117 + try { 118 + return await fn(span); 119 + } catch (err) { 120 + this.recordSpanError(span, err); 121 + throw err; 122 + } finally { 123 + span.end(); 124 + } 125 + }); 126 + } 127 + 128 + recordEventReceived(eventType: string): void { 129 + this.eventsReceived?.add(1, { event_type: eventType }); 130 + } 131 + 132 + recordEventsParsed(count: number, eventType: string): void { 133 + if (count < 1) { 134 + return; 135 + } 136 + this.eventsParsed?.add(count, { event_type: eventType }); 137 + } 138 + 139 + recordEventHandled(eventType: string, outcome: Outcome): void { 140 + this.eventsHandled?.add(1, { event_type: eventType, outcome }); 141 + } 142 + 143 + recordError( 144 + stage: "validation" | "parse" | "subscription" | "handler" | "runner", 145 + eventType?: string, 146 + ): void { 147 + const attributes: Attributes = { stage }; 148 + if (eventType !== undefined) { 149 + attributes.event_type = eventType; 150 + } 151 + this.errorsTotal?.add(1, attributes); 152 + } 153 + 154 + recordParseDuration( 155 + durationMs: number, 156 + eventType: string, 157 + outcome: Outcome, 158 + ): void { 159 + this.parseDuration?.record(durationMs, { event_type: eventType, outcome }); 160 + } 161 + 162 + recordHandleDuration( 163 + durationMs: number, 164 + eventType: string, 165 + outcome: Outcome, 166 + ): void { 167 + this.handleDuration?.record(durationMs, { event_type: eventType, outcome }); 168 + } 169 + 170 + recordRunnerTaskDuration(durationMs: number, outcome: Outcome): void { 171 + this.runnerTaskDuration?.record(durationMs, { outcome }); 172 + } 173 + 174 + recordCursorSaveDuration(durationMs: number, outcome: Outcome): void { 175 + this.cursorSaveDuration?.record(durationMs, { outcome }); 176 + } 177 + 178 + recordSpanError(span: Span | undefined, error: unknown): void { 179 + if (!span) { 180 + return; 181 + } 182 + span.recordException(asError(error)); 183 + span.setStatus({ code: SpanStatusCode.ERROR }); 184 + } 185 + } 186 + 187 + const asError = (error: unknown): Error => { 188 + if (error instanceof Error) { 189 + return error; 190 + } 191 + return new Error(String(error)); 192 + };
+221
sync/tests/telemetry_test.ts
··· 1 + import { assertEquals, assertRejects, assertThrows } from "@std/assert"; 2 + import { IdResolver, MemoryCache } from "@atp/identity"; 3 + import { Firehose } from "../firehose/index.ts"; 4 + import { MemoryRunner } from "../runner/index.ts"; 5 + import { SyncTelemetry, type SyncTelemetryOptions } from "../telemetry.ts"; 6 + 7 + type Attributes = Record<string, string | number | boolean>; 8 + type CounterCall = { value: number; attributes?: Attributes }; 9 + type HistogramCall = { value: number; attributes?: Attributes }; 10 + 11 + class MockCounter { 12 + calls: CounterCall[] = []; 13 + 14 + add(value: number, attributes?: Attributes): void { 15 + this.calls.push({ value, attributes }); 16 + } 17 + } 18 + 19 + class MockHistogram { 20 + calls: HistogramCall[] = []; 21 + 22 + record(value: number, attributes?: Attributes): void { 23 + this.calls.push({ value, attributes }); 24 + } 25 + } 26 + 27 + class MockSpan { 28 + ended = false; 29 + exceptions: Error[] = []; 30 + status?: { code: number }; 31 + 32 + constructor( 33 + public name: string, 34 + public attributes: Attributes, 35 + ) {} 36 + 37 + end(): void { 38 + this.ended = true; 39 + } 40 + 41 + recordException(error: Error): void { 42 + this.exceptions.push(error); 43 + } 44 + 45 + setStatus(status: { code: number }): void { 46 + this.status = status; 47 + } 48 + } 49 + 50 + class MockTracer { 51 + spans: MockSpan[] = []; 52 + 53 + startSpan(name: string, options?: { attributes?: Attributes }): MockSpan { 54 + const span = new MockSpan(name, options?.attributes ?? {}); 55 + this.spans.push(span); 56 + return span; 57 + } 58 + } 59 + 60 + class MockMeter { 61 + counters = new Map<string, MockCounter>(); 62 + histograms = new Map<string, MockHistogram>(); 63 + 64 + createCounter(name: string): MockCounter { 65 + const counter = new MockCounter(); 66 + this.counters.set(name, counter); 67 + return counter; 68 + } 69 + 70 + createHistogram(name: string): MockHistogram { 71 + const histogram = new MockHistogram(); 72 + this.histograms.set(name, histogram); 73 + return histogram; 74 + } 75 + } 76 + 77 + Deno.test("SyncTelemetry records counters and histograms", () => { 78 + const tracer = new MockTracer(); 79 + const meter = new MockMeter(); 80 + const telemetry = new SyncTelemetry({ 81 + tracer: tracer as unknown as SyncTelemetryOptions["tracer"], 82 + meter: meter as unknown as SyncTelemetryOptions["meter"], 83 + }); 84 + 85 + telemetry.recordEventReceived("commit"); 86 + telemetry.recordEventsParsed(2, "commit"); 87 + telemetry.recordEventHandled("create", "ok"); 88 + telemetry.recordError("parse", "commit"); 89 + telemetry.recordParseDuration(10, "commit", "ok"); 90 + telemetry.recordHandleDuration(12, "create", "error"); 91 + 92 + assertEquals(meter.counters.get("sync.events.received")?.calls.length, 1); 93 + assertEquals(meter.counters.get("sync.events.parsed")?.calls[0]?.value, 2); 94 + assertEquals(meter.counters.get("sync.events.handled")?.calls.length, 1); 95 + assertEquals(meter.counters.get("sync.errors.total")?.calls.length, 1); 96 + assertEquals(meter.histograms.get("sync.parse.duration")?.calls.length, 1); 97 + assertEquals(meter.histograms.get("sync.handle.duration")?.calls.length, 1); 98 + }); 99 + 100 + Deno.test("SyncTelemetry marks errored spans", async () => { 101 + const tracer = new MockTracer(); 102 + const meter = new MockMeter(); 103 + const telemetry = new SyncTelemetry({ 104 + tracer: tracer as unknown as SyncTelemetryOptions["tracer"], 105 + meter: meter as unknown as SyncTelemetryOptions["meter"], 106 + }); 107 + 108 + await assertRejects(async () => { 109 + await telemetry.withSpan( 110 + "span", 111 + {}, 112 + () => Promise.reject(new Error("boom")), 113 + ); 114 + }); 115 + 116 + assertEquals(tracer.spans.length, 1); 117 + assertEquals(tracer.spans[0]?.ended, true); 118 + assertEquals(tracer.spans[0]?.exceptions.length, 1); 119 + assertEquals(typeof tracer.spans[0]?.status?.code, "number"); 120 + }); 121 + 122 + Deno.test("MemoryRunner emits runner telemetry", async () => { 123 + const tracer = new MockTracer(); 124 + const meter = new MockMeter(); 125 + const runner = new MemoryRunner({ 126 + telemetry: { 127 + tracer: tracer as unknown as SyncTelemetryOptions["tracer"], 128 + meter: meter as unknown as SyncTelemetryOptions["meter"], 129 + }, 130 + setCursor: async () => { 131 + await Promise.resolve(); 132 + }, 133 + }); 134 + 135 + await runner.trackEvent("did:plc:alice", 1, async () => { 136 + await Promise.resolve(); 137 + }); 138 + 139 + assertEquals(tracer.spans.length, 0); 140 + assertEquals( 141 + meter.histograms.get("sync.runner.task.duration")?.calls.length, 142 + 1, 143 + ); 144 + assertEquals( 145 + meter.histograms.get("sync.runner.cursor_save.duration")?.calls.length, 146 + 1, 147 + ); 148 + }); 149 + 150 + Deno.test("SyncTelemetry can be disabled", async () => { 151 + const tracer = new MockTracer(); 152 + const meter = new MockMeter(); 153 + const telemetry = new SyncTelemetry({ 154 + enabled: false, 155 + tracer: tracer as unknown as SyncTelemetryOptions["tracer"], 156 + meter: meter as unknown as SyncTelemetryOptions["meter"], 157 + }); 158 + 159 + await telemetry.withSpan("span", {}, async () => { 160 + await Promise.resolve(); 161 + }); 162 + telemetry.recordEventReceived("commit"); 163 + 164 + assertEquals(tracer.spans.length, 0); 165 + assertEquals(meter.counters.size, 0); 166 + }); 167 + 168 + Deno.test("Firehose shares telemetry instance with runner", () => { 169 + let sharedTelemetry: SyncTelemetry | undefined; 170 + const runner = { 171 + getCursor: () => undefined, 172 + setTelemetry: (telemetry: SyncTelemetry) => { 173 + sharedTelemetry = telemetry; 174 + }, 175 + trackEvent: async () => { 176 + await Promise.resolve(); 177 + }, 178 + }; 179 + const idResolver = new IdResolver({ 180 + plcUrl: "http://localhost:3000", 181 + didCache: new MemoryCache(), 182 + }); 183 + 184 + new Firehose({ 185 + idResolver, 186 + runner, 187 + handleEvent: async () => { 188 + await Promise.resolve(); 189 + }, 190 + onError: () => {}, 191 + }); 192 + 193 + assertEquals(sharedTelemetry instanceof SyncTelemetry, true); 194 + }); 195 + 196 + Deno.test("Firehose rejects conflicting telemetry configuration", () => { 197 + const runnerTelemetry = new SyncTelemetry(); 198 + const runner = { 199 + getCursor: () => undefined, 200 + getTelemetry: () => runnerTelemetry, 201 + trackEvent: async () => { 202 + await Promise.resolve(); 203 + }, 204 + }; 205 + const idResolver = new IdResolver({ 206 + plcUrl: "http://localhost:3000", 207 + didCache: new MemoryCache(), 208 + }); 209 + 210 + assertThrows(() => { 211 + new Firehose({ 212 + idResolver, 213 + runner, 214 + telemetry: {}, 215 + handleEvent: async () => { 216 + await Promise.resolve(); 217 + }, 218 + onError: () => {}, 219 + }); 220 + }); 221 + });