Suite of AT Protocol TypeScript libraries built on web standards
1import {
2 type Attributes,
3 type Context,
4 context,
5 type Meter,
6 metrics,
7 type Span,
8 SpanStatusCode,
9 trace,
10 type Tracer,
11} from "@opentelemetry/api";
12
13export type SyncTelemetryOptions = {
14 enabled?: boolean;
15 tracer?: Tracer;
16 meter?: Meter;
17};
18
19export type SyncAttributes = Attributes;
20export type SyncSpan = Span;
21
22type Outcome = "ok" | "error";
23
24export class SyncTelemetry {
25 private readonly tracer: Tracer;
26 readonly enabled: boolean;
27
28 private eventsReceived?: ReturnType<Meter["createCounter"]>;
29 private eventsParsed?: ReturnType<Meter["createCounter"]>;
30 private eventsHandled?: ReturnType<Meter["createCounter"]>;
31 private errorsTotal?: ReturnType<Meter["createCounter"]>;
32 private parseDuration?: ReturnType<Meter["createHistogram"]>;
33 private handleDuration?: ReturnType<Meter["createHistogram"]>;
34 private runnerTaskDuration?: ReturnType<Meter["createHistogram"]>;
35 private cursorSaveDuration?: ReturnType<Meter["createHistogram"]>;
36
37 constructor(opts: SyncTelemetryOptions = {}) {
38 this.enabled = opts.enabled ?? true;
39 this.tracer = opts.tracer ?? trace.getTracer("@atp/sync");
40 const meter = opts.meter ?? metrics.getMeter("@atp/sync");
41 if (this.enabled) {
42 this.eventsReceived = meter.createCounter("sync.events.received", {
43 description: "Number of firehose events received",
44 unit: "1",
45 });
46 this.eventsParsed = meter.createCounter("sync.events.parsed", {
47 description: "Number of parsed sync events",
48 unit: "1",
49 });
50 this.eventsHandled = meter.createCounter("sync.events.handled", {
51 description: "Number of handled sync events",
52 unit: "1",
53 });
54 this.errorsTotal = meter.createCounter("sync.errors.total", {
55 description: "Number of sync processing errors",
56 unit: "1",
57 });
58 this.parseDuration = meter.createHistogram("sync.parse.duration", {
59 description: "Duration of event parsing and auth",
60 unit: "ms",
61 });
62 this.handleDuration = meter.createHistogram("sync.handle.duration", {
63 description: "Duration of event handler execution",
64 unit: "ms",
65 });
66 this.runnerTaskDuration = meter.createHistogram(
67 "sync.runner.task.duration",
68 {
69 description: "Duration of runner task execution",
70 unit: "ms",
71 },
72 );
73 this.cursorSaveDuration = meter.createHistogram(
74 "sync.runner.cursor_save.duration",
75 {
76 description: "Duration of cursor persistence",
77 unit: "ms",
78 },
79 );
80 }
81 }
82
83 activeContext(): Context {
84 return context.active();
85 }
86
87 withContext<T>(ctx: Context, fn: () => T): T {
88 return context.with(ctx, fn);
89 }
90
91 startSpan(
92 name: string,
93 attributes?: Attributes,
94 parentContext?: Context,
95 ): Span | undefined {
96 if (!this.enabled) {
97 return undefined;
98 }
99 return this.tracer.startSpan(name, { attributes }, parentContext);
100 }
101
102 async withSpan<T>(
103 name: string,
104 attributes: Attributes,
105 fn: (span: Span | undefined) => Promise<T>,
106 parentContext?: Context,
107 ): Promise<T> {
108 if (!this.enabled) {
109 return await fn(undefined);
110 }
111 const span = this.startSpan(name, attributes, parentContext);
112 if (!span) {
113 return await fn(undefined);
114 }
115 const spanContext = trace.setSpan(parentContext ?? context.active(), span);
116 return await context.with(spanContext, async () => {
117 try {
118 return await fn(span);
119 } catch (err) {
120 this.recordSpanError(span, err);
121 throw err;
122 } finally {
123 span.end();
124 }
125 });
126 }
127
128 recordEventReceived(eventType: string): void {
129 this.eventsReceived?.add(1, { event_type: eventType });
130 }
131
132 recordEventsParsed(count: number, eventType: string): void {
133 if (count < 1) {
134 return;
135 }
136 this.eventsParsed?.add(count, { event_type: eventType });
137 }
138
139 recordEventHandled(eventType: string, outcome: Outcome): void {
140 this.eventsHandled?.add(1, { event_type: eventType, outcome });
141 }
142
143 recordError(
144 stage: "validation" | "parse" | "subscription" | "handler" | "runner",
145 eventType?: string,
146 ): void {
147 const attributes: Attributes = { stage };
148 if (eventType !== undefined) {
149 attributes.event_type = eventType;
150 }
151 this.errorsTotal?.add(1, attributes);
152 }
153
154 recordParseDuration(
155 durationMs: number,
156 eventType: string,
157 outcome: Outcome,
158 ): void {
159 this.parseDuration?.record(durationMs, { event_type: eventType, outcome });
160 }
161
162 recordHandleDuration(
163 durationMs: number,
164 eventType: string,
165 outcome: Outcome,
166 ): void {
167 this.handleDuration?.record(durationMs, { event_type: eventType, outcome });
168 }
169
170 recordRunnerTaskDuration(durationMs: number, outcome: Outcome): void {
171 this.runnerTaskDuration?.record(durationMs, { outcome });
172 }
173
174 recordCursorSaveDuration(durationMs: number, outcome: Outcome): void {
175 this.cursorSaveDuration?.record(durationMs, { outcome });
176 }
177
178 recordSpanError(span: Span | undefined, error: unknown): void {
179 if (!span) {
180 return;
181 }
182 span.recordException(asError(error));
183 span.setStatus({ code: SpanStatusCode.ERROR });
184 }
185}
186
187const asError = (error: unknown): Error => {
188 if (error instanceof Error) {
189 return error;
190 }
191 return new Error(String(error));
192};