Suite of AT Protocol TypeScript libraries built on web standards
21
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 192 lines 5.4 kB view raw
import {
  type Attributes,
  type Context,
  context,
  type Meter,
  metrics,
  type Span,
  SpanStatusCode,
  trace,
  type Tracer,
} from "@opentelemetry/api";

/** Options accepted by the {@link SyncTelemetry} constructor. */
export type SyncTelemetryOptions = {
  /** When `false`, no instruments are created and no spans are started. Defaults to `true`. */
  enabled?: boolean;
  /** Tracer used to start spans. Defaults to `trace.getTracer("@atp/sync")`. */
  tracer?: Tracer;
  /** Meter used to create instruments. Defaults to `metrics.getMeter("@atp/sync")`. */
  meter?: Meter;
};

/** Alias of the OpenTelemetry `Attributes` type. */
export type SyncAttributes = Attributes;
/** Alias of the OpenTelemetry `Span` type. */
export type SyncSpan = Span;

/** Value of the `outcome` attribute attached to metrics. */
type Outcome = "ok" | "error";

/**
 * Thin wrapper around the OpenTelemetry API that owns the counters and
 * histograms of the sync pipeline and offers span helpers.
 *
 * When constructed with `enabled: false` no instruments are created, so every
 * `record*` method is a no-op (the instrument fields stay `undefined`), and
 * `startSpan` returns `undefined`.
 */
export class SyncTelemetry {
  private readonly tracer: Tracer;
  readonly enabled: boolean;

  private eventsReceived?: ReturnType<Meter["createCounter"]>;
  private eventsParsed?: ReturnType<Meter["createCounter"]>;
  private eventsHandled?: ReturnType<Meter["createCounter"]>;
  private errorsTotal?: ReturnType<Meter["createCounter"]>;
  private parseDuration?: ReturnType<Meter["createHistogram"]>;
  private handleDuration?: ReturnType<Meter["createHistogram"]>;
  private runnerTaskDuration?: ReturnType<Meter["createHistogram"]>;
  private cursorSaveDuration?: ReturnType<Meter["createHistogram"]>;

  /**
   * @param opts - Optional tracer/meter overrides and the `enabled` switch.
   */
  constructor(opts: SyncTelemetryOptions = {}) {
    this.enabled = opts.enabled ?? true;
    this.tracer = opts.tracer ?? trace.getTracer("@atp/sync");
    const meter = opts.meter ?? metrics.getMeter("@atp/sync");
    if (this.enabled) {
      this.eventsReceived = meter.createCounter("sync.events.received", {
        description: "Number of firehose events received",
        unit: "1",
      });
      this.eventsParsed = meter.createCounter("sync.events.parsed", {
        description: "Number of parsed sync events",
        unit: "1",
      });
      this.eventsHandled = meter.createCounter("sync.events.handled", {
        description: "Number of handled sync events",
        unit: "1",
      });
      this.errorsTotal = meter.createCounter("sync.errors.total", {
        description: "Number of sync processing errors",
        unit: "1",
      });
      this.parseDuration = meter.createHistogram("sync.parse.duration", {
        description: "Duration of event parsing and auth",
        unit: "ms",
      });
      this.handleDuration = meter.createHistogram("sync.handle.duration", {
        description: "Duration of event handler execution",
        unit: "ms",
      });
      this.runnerTaskDuration = meter.createHistogram(
        "sync.runner.task.duration",
        {
          description: "Duration of runner task execution",
          unit: "ms",
        },
      );
      this.cursorSaveDuration = meter.createHistogram(
        "sync.runner.cursor_save.duration",
        {
          description: "Duration of cursor persistence",
          unit: "ms",
        },
      );
    }
  }

  /** Returns the currently active OpenTelemetry context. */
  activeContext(): Context {
    return context.active();
  }

  /**
   * Runs `fn` with `ctx` installed as the active context.
   *
   * @returns Whatever `fn` returns.
   */
  withContext<T>(ctx: Context, fn: () => T): T {
    return context.with(ctx, fn);
  }

  /**
   * Starts a span on the configured tracer.
   *
   * @param name - Span name.
   * @param attributes - Initial span attributes.
   * @param parentContext - Context passed to the tracer as the span's parent context.
   * @returns The started span, or `undefined` when telemetry is disabled.
   *   The caller is responsible for ending the span.
   */
  startSpan(
    name: string,
    attributes?: Attributes,
    parentContext?: Context,
  ): Span | undefined {
    if (!this.enabled) {
      return undefined;
    }
    return this.tracer.startSpan(name, { attributes }, parentContext);
  }

  /**
   * Runs `fn` inside a new span that is made active for the duration of the
   * call and is always ended afterwards.
   *
   * If `fn` rejects, the error is recorded on the span and then rethrown
   * unchanged. When telemetry is disabled, `fn` is invoked directly with
   * `undefined` and `parentContext` is not applied.
   *
   * @param name - Span name.
   * @param attributes - Initial span attributes.
   * @param fn - Work to run; receives the span, or `undefined` when disabled.
   * @param parentContext - Parent context; defaults to the active context.
   * @returns The value `fn` resolves to.
   */
  async withSpan<T>(
    name: string,
    attributes: Attributes,
    fn: (span: Span | undefined) => Promise<T>,
    parentContext?: Context,
  ): Promise<T> {
    // startSpan already returns undefined when telemetry is disabled.
    const span = this.startSpan(name, attributes, parentContext);
    if (!span) {
      return await fn(undefined);
    }
    const spanContext = trace.setSpan(parentContext ?? context.active(), span);
    return await context.with(spanContext, async () => {
      try {
        return await fn(span);
      } catch (err) {
        this.recordSpanError(span, err);
        throw err;
      } finally {
        span.end();
      }
    });
  }

  /** Adds 1 to `sync.events.received`, tagged with `event_type`. */
  recordEventReceived(eventType: string): void {
    this.eventsReceived?.add(1, { event_type: eventType });
  }

  /**
   * Adds `count` to `sync.events.parsed`, tagged with `event_type`.
   * Counts below 1 and non-finite counts (`NaN`, `Infinity`) are ignored.
   */
  recordEventsParsed(count: number, eventType: string): void {
    // `NaN < 1` is false, so non-finite values need their own check.
    if (!Number.isFinite(count) || count < 1) {
      return;
    }
    this.eventsParsed?.add(count, { event_type: eventType });
  }

  /** Adds 1 to `sync.events.handled`, tagged with `event_type` and `outcome`. */
  recordEventHandled(eventType: string, outcome: Outcome): void {
    this.eventsHandled?.add(1, { event_type: eventType, outcome });
  }

  /**
   * Adds 1 to `sync.errors.total`.
   *
   * @param stage - Pipeline stage where the error occurred (`stage` attribute).
   * @param eventType - When provided, added as the `event_type` attribute.
   */
  recordError(
    stage: "validation" | "parse" | "subscription" | "handler" | "runner",
    eventType?: string,
  ): void {
    const attributes: Attributes = { stage };
    if (eventType !== undefined) {
      attributes.event_type = eventType;
    }
    this.errorsTotal?.add(1, attributes);
  }

  /** Records a value on `sync.parse.duration` (unit declared as ms). */
  recordParseDuration(
    durationMs: number,
    eventType: string,
    outcome: Outcome,
  ): void {
    this.parseDuration?.record(durationMs, { event_type: eventType, outcome });
  }

  /** Records a value on `sync.handle.duration` (unit declared as ms). */
  recordHandleDuration(
    durationMs: number,
    eventType: string,
    outcome: Outcome,
  ): void {
    this.handleDuration?.record(durationMs, { event_type: eventType, outcome });
  }

  /** Records a value on `sync.runner.task.duration` (unit declared as ms). */
  recordRunnerTaskDuration(durationMs: number, outcome: Outcome): void {
    this.runnerTaskDuration?.record(durationMs, { outcome });
  }

  /** Records a value on `sync.runner.cursor_save.duration` (unit declared as ms). */
  recordCursorSaveDuration(durationMs: number, outcome: Outcome): void {
    this.cursorSaveDuration?.record(durationMs, { outcome });
  }

  /**
   * Records `error` as an exception on `span` and marks the span status as
   * ERROR, using the error message as the status message.
   * Does nothing when `span` is `undefined`.
   */
  recordSpanError(span: Span | undefined, error: unknown): void {
    if (!span) {
      return;
    }
    const normalized = asError(error);
    span.recordException(normalized);
    span.setStatus({
      code: SpanStatusCode.ERROR,
      message: normalized.message,
    });
  }
}

/**
 * Produces a human-readable description of an arbitrary thrown value without
 * ever throwing itself.
 */
const describeThrown = (value: unknown): string => {
  let text: string | undefined;
  try {
    text = String(value);
  } catch {
    // String() throws for null-prototype objects and throwing toString().
  }
  if (text !== undefined && text !== "[object Object]") {
    return text;
  }
  try {
    // Plain objects stringify to the uninformative "[object Object]";
    // their JSON form is more useful.
    const json: unknown = JSON.stringify(value);
    if (typeof json === "string") {
      return json;
    }
  } catch {
    // JSON.stringify throws on circular structures and BigInt fields.
  }
  return text ?? "Unknown error";
};

/**
 * Returns `error` unchanged when it is already an `Error`; otherwise wraps a
 * description of the thrown value in a new `Error`. Never throws, so it is
 * safe to call from a `catch` block without masking the original failure.
 */
const asError = (error: unknown): Error => {
  if (error instanceof Error) {
    return error;
  }
  return new Error(describeThrown(error));
};