a collection of lightweight TypeScript packages for AT Protocol, the protocol powering Bluesky
atproto bluesky typescript npm
101
fork

Configure Feed

Select the types of activity you want to include in your feed.

refactor(xrpc-server)!: split error observability into onError/onSocketError

separate "produce a response" from "observe an error for telemetry" so the
router doesn't conflate them. `handleException` continues to translate
thrown errors into responses; new `onError` / `onSocketError` hooks receive
`{ error, request }` for logging/metrics and are fire-and-forget. both
observers skip client-induced errors (aborted requests, `XRPCError`,
`XRPCSubscriptionError`), so they only fire for bugs worth investigating.

`handleSubscriptionException` is removed; its default-throw behaviour was
unhelpful (unhandled rejections inside an adapter's async context), and
the new `onSocketError` is the intended replacement.

Mary 282f14f1 6b62a410

+153 -19
+9
.changeset/xrpc-server-error-observability.md
··· 1 + --- 2 + '@atcute/xrpc-server': major 3 + --- 4 + 5 + split error observability: remove `handleSubscriptionException` (its default-throw behaviour caused 6 + unhandled rejections inside adapter async contexts). add fire-and-forget `onError` and 7 + `onSocketError` telemetry hooks that receive `{ error, request }`. both hooks skip client-induced 8 + errors (aborted requests, `XRPCError`, `XRPCSubscriptionError`), so they only fire for unexpected 9 + bugs.
+17
packages/servers/xrpc-server/README.md
··· 147 147 `ForbiddenError`, `RateLimitExceededError`, `InternalServerError`, `UpstreamFailureError`, 148 148 `NotEnoughResourcesError`, `UpstreamTimeoutError`. 149 149 150 + ### observing errors 151 + 152 + for logs or metrics, use the `onError` / `onSocketError` router options. these are fire-and-forget 153 + hooks invoked alongside response generation, and they are NOT called for client-induced errors 154 + (aborted requests, `XRPCError`, `XRPCSubscriptionError`) — only for bugs worth reporting: 155 + 156 + ```ts 157 + const router = new XRPCRouter({ 158 + onError({ error, request }) { 159 + reportToSentry(error, { url: request.url }); 160 + }, 161 + onSocketError({ error, request }) { 162 + reportToSentry(error, { url: request.url, subscription: true }); 163 + }, 164 + }); 165 + ``` 166 + 150 167 ### health check 151 168 152 169 the router can optionally answer `/xrpc/_health` if you pass `handleHealthCheck`. this endpoint is
+76 -9
packages/servers/xrpc-server/lib/main/router.test.ts
··· 381 381 } 382 382 }); 383 383 384 + it('invokes onError for unexpected handler errors', async () => { 385 + const querySchema = v.query('com.example.query', { 386 + params: null, 387 + output: null, 388 + }); 389 + 390 + const onError = vi.fn(); 391 + const router = new XRPCRouter({ onError }); 392 + 393 + router.addQuery(querySchema, { 394 + async handler() { 395 + throw new Error('boom'); 396 + }, 397 + }); 398 + 399 + const request = new Request('https://example.com/xrpc/com.example.query', { method: 'GET' }); 400 + await router.fetch(request); 401 + 402 + expect(onError).toHaveBeenCalledExactlyOnceWith({ 403 + error: expect.objectContaining({ message: 'boom' }), 404 + request: expect.any(Request), 405 + }); 406 + }); 407 + 408 + it('does not invoke onError for XRPCError or aborted requests', async () => { 409 + const querySchema = v.query('com.example.query', { 410 + params: null, 411 + output: null, 412 + }); 413 + 414 + const onError = vi.fn(); 415 + const router = new XRPCRouter({ onError }); 416 + 417 + router.addQuery(querySchema, { 418 + async handler() { 419 + throw new InvalidRequestError({ message: 'bad input' }); 420 + }, 421 + }); 422 + 423 + await router.fetch(new Request('https://example.com/xrpc/com.example.query', { method: 'GET' })); 424 + expect(onError).not.toHaveBeenCalled(); 425 + }); 426 + 384 427 it('does not invoke exception handler on aborted requests', async () => { 385 428 const querySchema = v.query('com.example.query', { 386 429 params: null, ··· 1240 1283 expect(body).toEqual(expect.objectContaining({ error: 'InvalidRequest' })); 1241 1284 }); 1242 1285 1243 - it('invokes handleSubscriptionException for unexpected errors', async () => { 1286 + it('invokes onSocketError for unexpected subscription errors', async () => { 1244 1287 const subscriptionSchema = v.subscription('com.example.subscription', { 1245 1288 params: null, 1246 1289 message: v.object({ seq: v.integer() }), 1247 1290 }); 1248 1291 1249 1292 const adapter = new MockWebSocketAdapter(); 1250 - const handleSubscriptionException = vi.fn(); 1251 - const router = new XRPCRouter({ websocket: adapter, handleSubscriptionException }); 1293 + const onSocketError = vi.fn(); 1294 + const router = new XRPCRouter({ websocket: adapter, onSocketError }); 1252 1295 1253 1296 router.addSubscription(subscriptionSchema, { 1254 1297 // oxlint-disable-next-line require-yield ··· 1267 1310 }); 1268 1311 }); 1269 1312 1270 - expect(handleSubscriptionException).toBeCalledTimes(1); 1313 + expect(onSocketError).toHaveBeenCalledExactlyOnceWith({ 1314 + error: expect.objectContaining({ message: 'boom' }), 1315 + request: expect.any(Request), 1316 + }); 1317 + }); 1271 1318 1272 - expect(handleSubscriptionException).toBeCalledWith(expect.any(Error), expect.any(Request)); 1273 - expect(handleSubscriptionException).toBeCalledWith( 1274 - expect.objectContaining({ message: 'boom' }), 1275 - expect.anything(), 1276 - ); 1319 + it('does not invoke onSocketError on XRPCSubscriptionError', async () => { 1320 + const subscriptionSchema = v.subscription('com.example.subscription', { 1321 + params: null, 1322 + message: v.object({ seq: v.integer() }), 1323 + }); 1324 + 1325 + const adapter = new MockWebSocketAdapter(); 1326 + const onSocketError = vi.fn(); 1327 + const router = new XRPCRouter({ websocket: adapter, onSocketError }); 1328 + 1329 + router.addSubscription(subscriptionSchema, { 1330 + // oxlint-disable-next-line require-yield 1331 + async *handler() { 1332 + throw new XRPCSubscriptionError({ error: 'FutureCursor' }); 1333 + }, 1334 + }); 1335 + 1336 + const mock = adapter.attach(router); 1337 + using client = await mock.subscribe(`/xrpc/com.example.subscription`); 1338 + 1339 + await new Promise<void>((resolve) => { 1340 + client.onClose.subscribe(() => resolve()); 1341 + }); 1342 + 1343 + expect(onSocketError).not.toHaveBeenCalled(); 1277 1344 }); 1278 1345 }); 1279 1346 });
+51 -10
packages/servers/xrpc-server/lib/main/router.ts
··· 40 40 export type NotFoundHandler = (request: Request) => Promisable<Response>; 41 41 export type HealthCheckHandler = (request: Request) => Promisable<Response>; 42 42 export type ExceptionHandler = (error: unknown, request: Request) => Promisable<Response>; 43 - export type SubscriptionExceptionHandler = (error: unknown, request: Request) => void; 43 + 44 + /** telemetry hook invoked for unexpected HTTP handler errors; fire-and-forget. */ 45 + export type ErrorObserver = (ctx: { error: unknown; request: Request }) => void; 46 + /** telemetry hook invoked for unexpected subscription errors; fire-and-forget. */ 47 + export type SocketErrorObserver = (ctx: { error: unknown; request: Request }) => void; 44 48 45 49 export const defaultExceptionHandler: ExceptionHandler = (error: unknown) => { 46 50 if (error instanceof XRPCError) { ··· 61 65 return new Response('Not Found', { status: 404 }); 62 66 }; 63 67 64 - export const defaultSubscriptionExceptionHandler: SubscriptionExceptionHandler = (error: unknown) => { 65 - throw error; 66 - }; 67 - 68 68 export interface XRPCRouterOptions { 69 69 middlewares?: FetchMiddleware[]; 70 70 handleNotFound?: NotFoundHandler; ··· 75 75 * XRPC spec, so callers opt in explicitly. 76 76 */ 77 77 handleHealthCheck?: HealthCheckHandler; 78 + /** translates a thrown error into an HTTP response. */ 78 79 handleException?: ExceptionHandler; 79 - handleSubscriptionException?: SubscriptionExceptionHandler; 80 + /** 81 + * fire-and-forget telemetry hook for unexpected HTTP errors. not invoked for 82 + * client-induced errors (aborted requests, `XRPCError` subclasses, thrown 83 + * `Response` objects). 84 + */ 85 + onError?: ErrorObserver; 86 + /** 87 + * fire-and-forget telemetry hook for unexpected subscription errors. not 88 + * invoked for aborted signals or `XRPCSubscriptionError` (which is 89 + * translated to an error frame). 90 + */ 91 + onSocketError?: SocketErrorObserver; 80 92 websocket?: WebSocketAdapter; 81 93 } 82 94 ··· 85 97 #handleNotFound: NotFoundHandler; 86 98 #handleHealthCheck?: HealthCheckHandler; 87 99 #handleException: ExceptionHandler; 88 - #handleSubscriptionException: SubscriptionExceptionHandler; 100 + #onError?: ErrorObserver; 101 + #onSocketError?: SocketErrorObserver; 89 102 #websocket?: WebSocketAdapter; 90 103 91 104 fetch: (request: Request) => Promise<Response>; ··· 95 108 handleException = defaultExceptionHandler, 96 109 handleNotFound = defaultNotFoundHandler, 97 110 handleHealthCheck, 98 - handleSubscriptionException = defaultSubscriptionExceptionHandler, 111 + onError, 112 + onSocketError, 99 113 websocket, 100 114 }: XRPCRouterOptions = {}) { 101 115 const runner = createAsyncMiddlewareRunner([...middlewares, (request) => this.#dispatch(request)]); ··· 104 118 this.#handleException = handleException; 105 119 this.#handleNotFound = handleNotFound; 106 120 this.#handleHealthCheck = handleHealthCheck; 107 - this.#handleSubscriptionException = handleSubscriptionException; 121 + this.#onError = onError; 122 + this.#onSocketError = onSocketError; 108 123 this.#websocket = websocket; 109 124 } 110 125 126 + #observeError(error: unknown, request: Request): void { 127 + // client-induced errors are not bugs; skip telemetry 128 + if (request.signal.aborted) return; 129 + if (error instanceof XRPCError) return; 130 + if (error instanceof Response) return; 131 + 132 + try { 133 + this.#onError?.({ error, request }); 134 + } catch { 135 + // observer threw; swallow to keep response path deterministic 136 + } 137 + } 138 + 139 + #observeSocketError(error: unknown, request: Request): void { 140 + if (request.signal.aborted) return; 141 + if (error instanceof XRPCSubscriptionError) return; 142 + 143 + try { 144 + this.#onSocketError?.({ error, request }); 145 + } catch { 146 + // observer threw; swallow to keep socket close path deterministic 147 + } 148 + } 149 + 111 150 async #dispatch(request: Request): Promise<Response> { 112 151 const url = new URL(request.url); 113 152 const pathname = url.pathname; ··· 126 165 return new Response(null, { status: 499 }); 127 166 } 128 167 168 + this.#observeError(err, request); 129 169 return this.#handleException(err, request); 130 170 } 131 171 } ··· 157 197 return new Response(null, { status: 499 }); 158 198 } 159 199 200 + this.#observeError(err, request); 160 201 return this.#handleException(err, request); 161 202 } 162 203 } ··· 394 435 } 395 436 396 437 ws.close(1011, `internal server error`); 397 - this.#handleSubscriptionException(err, request); 438 + this.#observeSocketError(err, request); 398 439 } 399 440 }); 400 441