···11+# Logs
22+logs
33+*.log
44+npm-debug.log*
55+yarn-debug.log*
66+yarn-error.log*
77+lerna-debug.log*
88+.pnpm-debug.log*
99+1010+# Diagnostic reports (https://nodejs.org/api/report.html)
1111+report.[0-9]*.[0-9]*.[0-9]*.[0-9]*.json
1212+1313+# Runtime data
1414+pids
1515+*.pid
1616+*.seed
1717+*.pid.lock
1818+1919+# Directory for instrumented libs generated by jscoverage/JSCover
2020+lib-cov
2121+2222+# Coverage directory used by tools like istanbul
2323+coverage
2424+*.lcov
2525+2626+# nyc test coverage
2727+.nyc_output
2828+2929+# Grunt intermediate storage (https://gruntjs.com/creating-plugins#storing-task-files)
3030+.grunt
3131+3232+# Bower dependency directory (https://bower.io/)
3333+bower_components
3434+3535+# node-waf configuration
3636+.lock-wscript
3737+3838+# Compiled binary addons (https://nodejs.org/api/addons.html)
3939+build/Release
4040+4141+# Dependency directories
4242+node_modules/
4343+jspm_packages/
4444+4545+# Snowpack dependency directory (https://snowpack.dev/)
4646+web_modules/
4747+4848+# TypeScript cache
4949+*.tsbuildinfo
5050+5151+# Optional npm cache directory
5252+.npm
5353+5454+# Optional eslint cache
5555+.eslintcache
5656+5757+# Optional stylelint cache
5858+.stylelintcache
5959+6060+# Microbundle cache
6161+.rpt2_cache/
6262+.rts2_cache_cjs/
6363+.rts2_cache_es/
6464+.rts2_cache_umd/
6565+6666+# Optional REPL history
6767+.node_repl_history
6868+6969+# Output of 'npm pack'
7070+*.tgz
7171+7272+# Yarn Integrity file
7373+.yarn-integrity
7474+7575+# dotenv environment variable files
7676+.env
7777+.env.development.local
7878+.env.test.local
7979+.env.production.local
8080+.env.local
8181+8282+# parcel-bundler cache (https://parceljs.org/)
8383+.cache
8484+.parcel-cache
8585+8686+# Next.js build output
8787+.next
8888+out
8989+9090+# Nuxt.js build / generate output
9191+.nuxt
9292+dist
9393+9494+# Gatsby files
9595+.cache/
9696+# Comment in the public line in if your project uses Gatsby and not Next.js
9797+# https://nextjs.org/blog/next-9-1#public-directory-support
9898+# public
9999+100100+# vuepress build output
101101+.vuepress/dist
102102+103103+# vuepress v2.x temp and cache directory
104104+.temp
105105+.cache
106106+107107+# vitepress build output
108108+**/.vitepress/dist
109109+110110+# vitepress cache directory
111111+**/.vitepress/cache
112112+113113+# Docusaurus cache and generated files
114114+.docusaurus
115115+116116+# Serverless directories
117117+.serverless/
118118+119119+# FuseBox cache
120120+.fusebox/
121121+122122+# DynamoDB Local files
123123+.dynamodb/
124124+125125+# TernJS port file
126126+.tern-port
127127+128128+# Stores VSCode versions used for testing VSCode extensions
129129+.vscode-test
130130+131131+# yarn v2
132132+.yarn/cache
133133+.yarn/unplugged
134134+.yarn/build-state.yml
135135+.yarn/install-state.gz
136136+.pnp.*
+18
LICENSE
···11+Copyright 2025 Spark Social PBC
22+33+Permission is hereby granted, free of charge, to any person obtaining a copy of
44+this software and associated documentation files (the “Software”), to deal in
55+the Software without restriction, including without limitation the rights to
66+use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
77+the Software, and to permit persons to whom the Software is furnished to do so,
88+subject to the following conditions:
99+1010+The above copyright notice and this permission notice shall be included in all
1111+copies or substantial portions of the Software.
1212+1313+THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
1414+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
1515+FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
1616+COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
1717+IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
1818+CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+210
common/async.ts
···11+import { bailableWait } from "./util.ts";
22+33+// reads values from a generator into a list
44+// breaks when isDone signals `true` AND `waitFor` completes OR when a max length is reached
55+// NOTE: does not signal generator to close. it *will* continue to produce values
66+export const readFromGenerator = async <T>(
77+ gen: AsyncGenerator<T>,
88+ isDone: (last?: T) => Promise<boolean> | boolean,
99+ waitFor: Promise<unknown> = Promise.resolve(),
1010+ maxLength = Number.MAX_SAFE_INTEGER,
1111+): Promise<T[]> => {
1212+ const evts: T[] = [];
1313+ let bail: undefined | (() => void);
1414+ let hasBroke = false;
1515+ const awaitDone = async () => {
1616+ if (await isDone(evts.at(-1))) {
1717+ return true;
1818+ }
1919+ const bailable = bailableWait(20);
2020+ await bailable.wait();
2121+ bail = bailable.bail;
2222+ if (hasBroke) return false;
2323+ return await awaitDone();
2424+ };
2525+ const breakOn: Promise<void> = new Promise((resolve) => {
2626+ waitFor.then(() => {
2727+ awaitDone().then(() => resolve());
2828+ });
2929+ });
3030+3131+ try {
3232+ while (evts.length < maxLength) {
3333+ const maybeEvt = await Promise.race([gen.next(), breakOn]);
3434+ if (!maybeEvt) break;
3535+ const evt = maybeEvt as IteratorResult<T>;
3636+ if (evt.done) break;
3737+ evts.push(evt.value);
3838+ }
3939+ } finally {
4040+ hasBroke = true;
4141+ bail && bail();
4242+ }
4343+ return evts;
4444+};
4545+4646+export type Deferrable = {
4747+ resolve: () => void;
4848+ complete: Promise<void>;
4949+};
5050+5151+export const createDeferrable = (): Deferrable => {
5252+ let resolve!: () => void;
5353+ const promise: Promise<void> = new Promise((res) => {
5454+ resolve = () => res();
5555+ });
5656+ return { resolve, complete: promise };
5757+};
5858+5959+export const createDeferrables = (count: number): Deferrable[] => {
6060+ const list: Deferrable[] = [];
6161+ for (let i = 0; i < count; i++) {
6262+ list.push(createDeferrable());
6363+ }
6464+ return list;
6565+};
6666+6767+export const allComplete = async (deferrables: Deferrable[]): Promise<void> => {
6868+ await Promise.all(deferrables.map((d) => d.complete));
6969+};
7070+7171+export class AsyncBuffer<T> {
7272+ private buffer: T[] = [];
7373+ private promise: Promise<void>;
7474+ private resolve: () => void;
7575+ private closed = false;
7676+ private toThrow: unknown | undefined;
7777+7878+ constructor(public maxSize?: number) {
7979+ // Initializing to satisfy types/build, immediately reset by resetPromise()
8080+ this.promise = Promise.resolve();
8181+ this.resolve = () => null;
8282+ this.resetPromise();
8383+ }
8484+8585+ get curr(): T[] {
8686+ return this.buffer;
8787+ }
8888+8989+ get size(): number {
9090+ return this.buffer.length;
9191+ }
9292+9393+ get isClosed(): boolean {
9494+ return this.closed;
9595+ }
9696+9797+ resetPromise() {
9898+ this.promise = new Promise<void>((r) => (this.resolve = r));
9999+ }
100100+101101+ push(item: T) {
102102+ this.buffer.push(item);
103103+ this.resolve();
104104+ }
105105+106106+ pushMany(items: T[]) {
107107+ items.forEach((i) => this.buffer.push(i));
108108+ this.resolve();
109109+ }
110110+111111+ async *events(): AsyncGenerator<T> {
112112+ while (true) {
113113+ if (this.closed && this.buffer.length === 0) {
114114+ if (this.toThrow) {
115115+ throw this.toThrow;
116116+ } else {
117117+ return;
118118+ }
119119+ }
120120+ await this.promise;
121121+ if (this.toThrow) {
122122+ throw this.toThrow;
123123+ }
124124+ if (this.maxSize && this.size > this.maxSize) {
125125+ throw new AsyncBufferFullError(this.maxSize);
126126+ }
127127+ const [first, ...rest] = this.buffer;
128128+ if (first) {
129129+ this.buffer = rest;
130130+ yield first;
131131+ } else {
132132+ this.resetPromise();
133133+ }
134134+ }
135135+ }
136136+137137+ throw(err: unknown) {
138138+ this.toThrow = err;
139139+ this.closed = true;
140140+ this.resolve();
141141+ }
142142+143143+ close() {
144144+ this.closed = true;
145145+ this.resolve();
146146+ }
147147+}
148148+149149+export class AsyncBufferFullError extends Error {
150150+ constructor(maxSize: number) {
151151+ super(`ReachedMaxBufferSize: ${maxSize}`);
152152+ }
153153+}
154154+155155+/**
156156+ * Utility function that behaves like {@link Promise.allSettled} but returns the
157157+ * same result as {@link Promise.all} in case every promise is fulfilled, and
158158+ * throws an {@link AggregateError} if there are more than one errors.
159159+ */
160160+export function allFulfilled<T extends readonly unknown[] | []>(
161161+ promises: T,
162162+): Promise<{ -readonly [P in keyof T]: Awaited<T[P]> }>;
163163+export function allFulfilled<T>(
164164+ promises: Iterable<T | PromiseLike<T>>,
165165+): Promise<Awaited<T>[]>;
166166+export function allFulfilled(
167167+ promises: Iterable<Promise<unknown>>,
168168+): Promise<unknown[]> {
169169+ return Promise.allSettled(promises).then(handleAllSettledErrors);
170170+}
171171+172172+export function handleAllSettledErrors<
173173+ T extends readonly PromiseSettledResult<unknown>[] | [],
174174+>(
175175+ results: T,
176176+): {
177177+ -readonly [P in keyof T]: T[P] extends PromiseSettledResult<infer U> ? U
178178+ : never;
179179+};
180180+export function handleAllSettledErrors<T>(
181181+ results: PromiseSettledResult<T>[],
182182+): T[];
183183+export function handleAllSettledErrors(
184184+ results: PromiseSettledResult<unknown>[],
185185+): unknown[] {
186186+ if (results.every(isFulfilledResult)) return results.map(extractValue);
187187+188188+ const errors = results.filter(isRejectedResult).map(extractReason);
189189+ throw errors;
190190+}
191191+192192+export function isRejectedResult(
193193+ result: PromiseSettledResult<unknown>,
194194+): result is PromiseRejectedResult {
195195+ return result.status === "rejected";
196196+}
197197+198198+function extractReason(result: PromiseRejectedResult): unknown {
199199+ return result.reason;
200200+}
201201+202202+export function isFulfilledResult<T>(
203203+ result: PromiseSettledResult<T>,
204204+): result is PromiseFulfilledResult<T> {
205205+ return result.status === "fulfilled";
206206+}
207207+208208+function extractValue<T>(result: PromiseFulfilledResult<T>): T {
209209+ return result.value;
210210+}
+29
common/check.ts
···11+// Explicitly not using "zod" types here to avoid mismatching types due to
22+// version differences.
33+44+export interface Checkable<T> {
55+ parse: (obj: unknown) => T;
66+ safeParse: (
77+ obj: unknown,
88+ ) => { success: true; data: T } | { success: false; error: Error };
99+}
1010+1111+export interface Def<T> {
1212+ name: string;
1313+ schema: Checkable<T>;
1414+}
1515+1616+export const is = <T>(obj: unknown, def: Checkable<T>): obj is T => {
1717+ return def.safeParse(obj).success;
1818+};
1919+2020+export const create = <T>(def: Checkable<T>) => (v: unknown): v is T =>
2121+ def.safeParse(v).success;
2222+2323+export const assure = <T>(def: Checkable<T>, obj: unknown): T => {
2424+ return def.parse(obj);
2525+};
2626+2727+export const isObject = (obj: unknown): obj is Record<string, unknown> => {
2828+ return typeof obj === "object" && obj !== null;
2929+};
···11+export * as util from "./util.ts";
22+export * as check from "./check.ts";
33+44+export * from "./env.ts";
55+export * from "./fs.ts";
66+export * from "./ipld.ts";
77+export * from "./ipld-multi.ts";
88+export * from "./obfuscate.ts";
99+export * from "./streams.ts";
1010+export * from "./async.ts";
1111+export * from "./types.ts";
1212+export * from "./tid.ts";
1313+export * from "./strings.ts";
1414+export * from "./logger.ts";
+91
common/obfuscate.ts
···11+import { decodeBase64 } from "@std/encoding";
22+33+export function obfuscateEmail(email: string) {
44+ const [local, domain] = email.split("@");
55+ return `${obfuscateWord(local)}@${obfuscateWord(domain)}`;
66+}
77+88+export function obfuscateWord(word: string) {
99+ return `${word.charAt(0)}***${word.charAt(word.length - 1)}`;
1010+}
1111+1212+export function obfuscateHeaders(headers: Record<string, string>) {
1313+ const obfuscatedHeaders: Record<string, string> = {};
1414+ for (const key in headers) {
1515+ if (key.toLowerCase() === "authorization") {
1616+ obfuscatedHeaders[key] = obfuscateAuthHeader(headers[key]);
1717+ } else if (key.toLowerCase() === "dpop") {
1818+ obfuscatedHeaders[key] = obfuscateJwt(headers[key]) || "Invalid";
1919+ } else {
2020+ obfuscatedHeaders[key] = headers[key];
2121+ }
2222+ }
2323+ return obfuscatedHeaders;
2424+}
2525+2626+export function obfuscateAuthHeader(authHeader: string): string {
2727+ // This is a hot path (runs on every request). Avoid using split() or regex.
2828+2929+ const spaceIdx = authHeader.indexOf(" ");
3030+ if (spaceIdx === -1) return "Invalid";
3131+3232+ const type = authHeader.slice(0, spaceIdx);
3333+ switch (type.toLowerCase()) {
3434+ case "bearer":
3535+ case "dpop":
3636+ return `${type} ${obfuscateBearer(authHeader.slice(spaceIdx + 1))}`;
3737+ case "basic":
3838+ return `${type} ${
3939+ obfuscateBasic(authHeader.slice(spaceIdx + 1)) || "Invalid"
4040+ }`;
4141+ default:
4242+ return `Invalid`;
4343+ }
4444+}
4545+4646+export function obfuscateBasic(token: string): null | string {
4747+ if (!token) return null;
4848+ const buffer = decodeBase64(token);
4949+ if (!buffer.length) return null;
5050+ const authHeader = new TextDecoder("utf-8").decode(buffer);
5151+ const colIdx = authHeader.indexOf(":");
5252+ if (colIdx === -1) return null;
5353+ const username = authHeader.slice(0, colIdx);
5454+ return `${username}:***`;
5555+}
5656+5757+export function obfuscateBearer(token: string): string {
5858+ return obfuscateJwt(token) || obfuscateToken(token);
5959+}
6060+6161+export function obfuscateToken(token: string): string {
6262+ if (token.length >= 12) return obfuscateWord(token);
6363+ return token ? "***" : "";
6464+}
6565+6666+export function obfuscateJwt(token: string): null | string {
6767+ const firstDot = token.indexOf(".");
6868+ if (firstDot === -1) return null;
6969+7070+ const secondDot = token.indexOf(".", firstDot + 1);
7171+ if (secondDot === -1) return null;
7272+7373+ // Expected to be missing
7474+ const thirdDot = token.indexOf(".", secondDot + 1);
7575+ if (thirdDot !== -1) return null;
7676+7777+ try {
7878+ const payloadEnc = token.slice(firstDot + 1, secondDot);
7979+ const payloadJson = new TextDecoder("utf-8").decode(
8080+ decodeBase64(payloadEnc),
8181+ );
8282+ const payload = JSON.parse(payloadJson);
8383+ if (typeof payload.sub === "string") return payload.sub;
8484+ } catch {
8585+ // Invalid JWT
8686+ return null;
8787+ }
8888+8989+ // Strip the signature
9090+ return token.slice(0, secondDot) + ".obfuscated";
9191+}
+216
common/streams.ts
···11+import { concat } from "jsr:@std/bytes";
22+import { Buffer } from "jsr:@std/io";
33+44+export const forwardStreamErrors = (..._streams: ReadableStream[]) => {
55+ // Web Streams don't have the same error forwarding mechanism as Node streams
66+ // This is a no-op in the Web Streams world since error handling is done differently
77+};
88+99+export const cloneStream = (
1010+ stream: ReadableStream<Uint8Array>,
1111+): ReadableStream<Uint8Array> => {
1212+ const [_stream1, stream2] = stream.tee();
1313+ return stream2;
1414+};
1515+1616+export const streamSize = async (
1717+ stream: ReadableStream<Uint8Array>,
1818+): Promise<number> => {
1919+ let size = 0;
2020+ const reader = stream.getReader();
2121+ try {
2222+ while (true) {
2323+ const { done, value } = await reader.read();
2424+ if (done) break;
2525+ size += value.byteLength;
2626+ }
2727+ } finally {
2828+ reader.releaseLock();
2929+ }
3030+ return size;
3131+};
3232+3333+export const streamToBytes = async (
3434+ stream: AsyncIterable<Uint8Array> | ReadableStream<Uint8Array>,
3535+): Promise<Uint8Array> => {
3636+ const chunks: Uint8Array[] = [];
3737+3838+ if (stream instanceof ReadableStream) {
3939+ const reader = stream.getReader();
4040+ try {
4141+ while (true) {
4242+ const { done, value } = await reader.read();
4343+ if (done) break;
4444+ chunks.push(value);
4545+ }
4646+ } finally {
4747+ reader.releaseLock();
4848+ }
4949+ } else {
5050+ for await (const chunk of stream) {
5151+ if (chunk instanceof Uint8Array) {
5252+ chunks.push(chunk);
5353+ } else {
5454+ throw new TypeError("expected Uint8Array");
5555+ }
5656+ }
5757+ }
5858+5959+ return concat(chunks);
6060+};
6161+6262+// streamToBuffer identifier name already taken by @atproto/common-web
6363+export const streamToNodeBuffer = async (
6464+ stream:
6565+ | Iterable<Uint8Array>
6666+ | AsyncIterable<Uint8Array>
6767+ | ReadableStream<Uint8Array>,
6868+): Promise<Buffer> => {
6969+ const bytes = await streamToBytes(stream as AsyncIterable<Uint8Array>);
7070+ const buffer = new Buffer();
7171+ await buffer.write(bytes);
7272+ return buffer;
7373+};
7474+7575+export const byteIterableToStream = (
7676+ iter: AsyncIterable<Uint8Array>,
7777+): ReadableStream<Uint8Array> => {
7878+ return new ReadableStream({
7979+ async start(controller) {
8080+ try {
8181+ for await (const chunk of iter) {
8282+ controller.enqueue(chunk);
8383+ }
8484+ controller.close();
8585+ } catch (error) {
8686+ controller.error(error);
8787+ }
8888+ },
8989+ });
9090+};
9191+9292+export const bytesToStream = (
9393+ bytes: Uint8Array,
9494+): ReadableStream<Uint8Array> => {
9595+ return new ReadableStream({
9696+ start(controller) {
9797+ controller.enqueue(bytes);
9898+ controller.close();
9999+ },
100100+ });
101101+};
102102+103103+export class MaxSizeChecker extends TransformStream<Uint8Array, Uint8Array> {
104104+ totalSize = 0;
105105+106106+ constructor(
107107+ public maxSize: number,
108108+ public createError: () => Error,
109109+ ) {
110110+ super({
111111+ transform: (chunk, controller) => {
112112+ this.totalSize += chunk.byteLength;
113113+ if (this.totalSize > this.maxSize) {
114114+ controller.error(this.createError());
115115+ } else {
116116+ controller.enqueue(chunk);
117117+ }
118118+ },
119119+ });
120120+ }
121121+}
122122+123123+export function decodeStream(
124124+ stream: ReadableStream<Uint8Array>,
125125+ contentEncoding?: string | string[],
126126+): ReadableStream<Uint8Array>;
127127+export function decodeStream(
128128+ stream: AsyncIterable<Uint8Array>,
129129+ contentEncoding?: string | string[],
130130+): AsyncIterable<Uint8Array> | ReadableStream<Uint8Array>;
131131+export function decodeStream(
132132+ stream: ReadableStream<Uint8Array> | AsyncIterable<Uint8Array>,
133133+ contentEncoding?: string | string[],
134134+): ReadableStream<Uint8Array> | AsyncIterable<Uint8Array> {
135135+ const decoders = createDecoders(contentEncoding);
136136+ if (decoders.length === 0) return stream;
137137+138138+ let result: ReadableStream<Uint8Array>;
139139+140140+ if (stream instanceof ReadableStream) {
141141+ result = stream;
142142+ } else {
143143+ result = byteIterableToStream(stream);
144144+ }
145145+146146+ // Chain the decoders together
147147+ for (const decoder of decoders) {
148148+ result = result.pipeThrough(decoder);
149149+ }
150150+151151+ return result;
152152+}
153153+154154+/**
155155+ * Create a series of decoding streams based on the content-encoding header. The
156156+ * resulting streams should be piped together to decode the content.
157157+ *
158158+ * @see {@link https://datatracker.ietf.org/doc/html/rfc9110#section-8.4.1}
159159+ */
160160+export function createDecoders(
161161+ contentEncoding?: string | string[],
162162+): TransformStream<Uint8Array, Uint8Array>[] {
163163+ const decoders: TransformStream<Uint8Array, Uint8Array>[] = [];
164164+165165+ if (contentEncoding?.length) {
166166+ const encodings: string[] = Array.isArray(contentEncoding)
167167+ ? contentEncoding.flatMap(commaSplit)
168168+ : contentEncoding.split(",");
169169+ for (const encoding of encodings) {
170170+ const normalizedEncoding = normalizeEncoding(encoding);
171171+172172+ // @NOTE
173173+ // > The default (identity) encoding [...] is used only in the
174174+ // > Accept-Encoding header, and SHOULD NOT be used in the
175175+ // > Content-Encoding header.
176176+ if (normalizedEncoding === "identity") continue;
177177+178178+ decoders.push(createDecoder(normalizedEncoding));
179179+ }
180180+ }
181181+182182+ return decoders.reverse();
183183+}
184184+185185+function commaSplit(header: string): string[] {
186186+ return header.split(",");
187187+}
188188+189189+function normalizeEncoding(encoding: string) {
190190+ // https://www.rfc-editor.org/rfc/rfc7231#section-3.1.2.1
191191+ // > All content-coding values are case-insensitive...
192192+ return encoding.trim().toLowerCase();
193193+}
194194+195195+function createDecoder(
196196+ normalizedEncoding: string,
197197+): TransformStream<Uint8Array, Uint8Array> {
198198+ switch (normalizedEncoding) {
199199+ // https://www.rfc-editor.org/rfc/rfc9112.html#section-7.2
200200+ case "gzip":
201201+ case "x-gzip":
202202+ return new DecompressionStream("gzip");
203203+ case "deflate":
204204+ return new DecompressionStream("deflate");
205205+ case "br":
206206+ throw new TypeError(
207207+ `Brotli decompression is not supported in this Deno implementation`,
208208+ );
209209+ case "identity":
210210+ return new TransformStream(); // Pass-through
211211+ default:
212212+ throw new TypeError(
213213+ `Unsupported content-encoding: "${normalizedEncoding}"`,
214214+ );
215215+ }
216216+}
+79
common/strings.ts
···11+// counts the number of bytes in a utf8 string
22+export const utf8Len = (str: string): number => {
33+ return new TextEncoder().encode(str).byteLength;
44+};
55+66+// counts the number of graphemes (user-displayed characters) in a string
77+// Using Intl.Segmenter which is supported in Deno and modern browsers
88+export const graphemeLen = (str: string): number => {
99+ if (typeof Intl !== "undefined" && "Segmenter" in Intl) {
1010+ const segmenter = new Intl.Segmenter(undefined, {
1111+ granularity: "grapheme",
1212+ });
1313+ return Array.from(segmenter.segment(str)).length;
1414+ }
1515+1616+ // Fallback for environments without Intl.Segmenter
1717+ // This is a simplified approach that handles basic cases
1818+ return Array.from(str).length;
1919+};
2020+2121+export const utf8ToB64Url = (utf8: string): string => {
2222+ const encoder = new TextEncoder();
2323+ const bytes = encoder.encode(utf8);
2424+ return btoa(String.fromCharCode(...bytes))
2525+ .replace(/\+/g, "-")
2626+ .replace(/\//g, "_")
2727+ .replace(/=/g, "");
2828+};
2929+3030+export const b64UrlToUtf8 = (b64: string): string => {
3131+ // Convert base64url to base64
3232+ const base64 = b64.replace(/-/g, "+").replace(/_/g, "/");
3333+ // Add padding if needed
3434+ const padded = base64 + "=".repeat((4 - (base64.length % 4)) % 4);
3535+3636+ const binaryString = atob(padded);
3737+ const bytes = new Uint8Array(binaryString.length);
3838+ for (let i = 0; i < binaryString.length; i++) {
3939+ bytes[i] = binaryString.charCodeAt(i);
4040+ }
4141+4242+ const decoder = new TextDecoder();
4343+ return decoder.decode(bytes);
4444+};
4545+4646+export const parseLanguage = (langTag: string): LanguageTag | null => {
4747+ const parsed = langTag.match(bcp47Regexp);
4848+ if (!parsed?.groups) return null;
4949+ const parts = parsed.groups;
5050+ return {
5151+ grandfathered: parts.grandfathered,
5252+ language: parts.language,
5353+ extlang: parts.extlang,
5454+ script: parts.script,
5555+ region: parts.region,
5656+ variant: parts.variant,
5757+ extension: parts.extension,
5858+ privateUse: parts.privateUseA || parts.privateUseB,
5959+ };
6060+};
6161+6262+export const validateLanguage = (langTag: string): boolean => {
6363+ return bcp47Regexp.test(langTag);
6464+};
6565+6666+export type LanguageTag = {
6767+ grandfathered?: string;
6868+ language?: string;
6969+ extlang?: string;
7070+ script?: string;
7171+ region?: string;
7272+ variant?: string;
7373+ extension?: string;
7474+ privateUse?: string;
7575+};
7676+7777+// Validates well-formed BCP 47 syntax: https://www.rfc-editor.org/rfc/rfc5646.html#section-2.1
7878+const bcp47Regexp =
7979+ /^((?<grandfathered>(en-GB-oed|i-ami|i-bnn|i-default|i-enochian|i-hak|i-klingon|i-lux|i-mingo|i-navajo|i-pwn|i-tao|i-tay|i-tsu|sgn-BE-FR|sgn-BE-NL|sgn-CH-DE)|(art-lojban|cel-gaulish|no-bok|no-nyn|zh-guoyu|zh-hakka|zh-min|zh-min-nan|zh-xiang))|((?<language>([A-Za-z]{2,3}(-(?<extlang>[A-Za-z]{3}(-[A-Za-z]{3}){0,2}))?)|[A-Za-z]{4}|[A-Za-z]{5,8})(-(?<script>[A-Za-z]{4}))?(-(?<region>[A-Za-z]{2}|[0-9]{3}))?(-(?<variant>[A-Za-z0-9]{5,8}|[0-9][A-Za-z0-9]{3}))*(-(?<extension>[0-9A-WY-Za-wy-z](-[A-Za-z0-9]{2,8})+))*(-(?<privateUseA>x(-[A-Za-z0-9]{1,8})+))?)|(?<privateUseB>x(-[A-Za-z0-9]{1,8})+))$/;
+112
common/tid.ts
···11+import { s32decode, s32encode } from "./util.ts";
22+33+const TID_LEN = 13;
44+55+let lastTimestamp = 0;
66+let timestampCount = 0;
77+let clockid: number | null = null;
88+99+function dedash(str: string): string {
1010+ return str.replaceAll("-", "");
1111+}
1212+1313+export class TID {
1414+ str: string;
1515+1616+ constructor(str: string) {
1717+ const noDashes = dedash(str);
1818+ if (noDashes.length !== TID_LEN) {
1919+ throw new Error(`Poorly formatted TID: ${noDashes.length} length`);
2020+ }
2121+ this.str = noDashes;
2222+ }
2323+2424+ static next(prev?: TID): TID {
2525+ // javascript does not have microsecond precision
2626+ // instead, we append a counter to the timestamp to indicate if multiple timestamps were created within the same millisecond
2727+ // take max of current time & last timestamp to prevent tids moving backwards if system clock drifts backwards
2828+ const time = Math.max(Date.now(), lastTimestamp);
2929+ if (time === lastTimestamp) {
3030+ timestampCount++;
3131+ }
3232+ lastTimestamp = time;
3333+ const timestamp = time * 1000 + timestampCount;
3434+ // the bottom 32 clock ids can be randomized & are not guaranteed to be collision resistant
3535+ // we use the same clockid for all tids coming from this machine
3636+ if (clockid === null) {
3737+ clockid = Math.floor(Math.random() * 32);
3838+ }
3939+ const tid = TID.fromTime(timestamp, clockid);
4040+ if (!prev || tid.newerThan(prev)) {
4141+ return tid;
4242+ }
4343+ return TID.fromTime(prev.timestamp() + 1, clockid);
4444+ }
4545+4646+ static nextStr(prev?: string): string {
4747+ return TID.next(prev ? new TID(prev) : undefined).toString();
4848+ }
4949+5050+ static fromTime(timestamp: number, clockid: number): TID {
5151+ // base32 encode with encoding variant sort (s32)
5252+ const str = `${s32encode(timestamp)}${s32encode(clockid).padStart(2, "2")}`;
5353+ return new TID(str);
5454+ }
5555+5656+ static fromStr(str: string): TID {
5757+ return new TID(str);
5858+ }
5959+6060+ static oldestFirst(a: TID, b: TID): number {
6161+ return a.compareTo(b);
6262+ }
6363+6464+ static newestFirst(a: TID, b: TID): number {
6565+ return b.compareTo(a);
6666+ }
6767+6868+ static is(str: string): boolean {
6969+ return dedash(str).length === TID_LEN;
7070+ }
7171+7272+ timestamp(): number {
7373+ return s32decode(this.str.slice(0, 11));
7474+ }
7575+7676+ clockid(): number {
7777+ return s32decode(this.str.slice(11, 13));
7878+ }
7979+8080+ formatted(): string {
8181+ const str = this.toString();
8282+ return `${str.slice(0, 4)}-${str.slice(4, 7)}-${
8383+ str.slice(
8484+ 7,
8585+ 11,
8686+ )
8787+ }-${str.slice(11, 13)}`;
8888+ }
8989+9090+ toString(): string {
9191+ return this.str;
9292+ }
9393+9494+ // newer > older
9595+ compareTo(other: TID): number {
9696+ if (this.str > other.str) return 1;
9797+ if (this.str < other.str) return -1;
9898+ return 0;
9999+ }
100100+101101+ equals(other: TID): boolean {
102102+ return this.str === other.str;
103103+ }
104104+105105+ newerThan(other: TID): boolean {
106106+ return this.compareTo(other) > 0;
107107+ }
108108+109109+ olderThan(other: TID): boolean {
110110+ return this.compareTo(other) < 0;
111111+ }
112112+}
···11+import type { Context, Handler } from "hono";
22+import { Hono } from "hono";
33+import {
44+ type LexiconDoc,
55+ Lexicons,
66+ type LexXrpcProcedure,
77+ type LexXrpcQuery,
88+ type LexXrpcSubscription,
99+} from "@atproto/lexicon";
1010+import {
1111+ excludeErrorResult,
1212+ InternalServerError,
1313+ InvalidRequestError,
1414+ isErrorResult,
1515+ MethodNotImplementedError,
1616+ XRPCError,
1717+} from "./errors.ts";
1818+import {
1919+ type RateLimiterI,
2020+ RateLimitExceededError,
2121+ RouteRateLimiter,
2222+} from "./rate-limiter.ts";
2323+import { ErrorFrame, XrpcStreamServer } from "./stream/index.ts";
2424+import {
2525+ type Auth,
2626+ type HandlerContext,
2727+ type HandlerSuccess,
2828+ type Input,
2929+ isHandlerPipeThroughBuffer,
3030+ isHandlerPipeThroughStream,
3131+ isSharedRateLimitOpts,
3232+ type MethodAuthVerifier,
3333+ type MethodConfig,
3434+ type MethodConfigOrHandler,
3535+ type Options,
3636+ type Params,
3737+ type ServerRateLimitDescription,
3838+ type StreamConfig,
3939+ type StreamConfigOrHandler,
4040+} from "./types.ts";
4141+import {
4242+ asArray,
4343+ createInputVerifier,
4444+ decodeUrlQueryParams,
4545+ getQueryParams,
4646+ parseUrlNsid,
4747+ setHeaders,
4848+ validateOutput,
4949+} from "./util.ts";
5050+import {
5151+ type CalcKeyFn,
5252+ type CalcPointsFn,
5353+ type RateLimiterOptions,
5454+ WrappedRateLimiter,
5555+ type WrappedRateLimiterOptions,
5656+} from "./rate-limiter.ts";
5757+import type { CatchallHandler, HandlerInput } from "./types.ts";
5858+import { assert } from "@std/assert";
5959+6060+/**
6161+ * Creates a new XRPC server instance.
6262+ * @param lexicons - Optional array of lexicon documents to initialize the server with
6363+ * @param options - Optional server configuration options
6464+ * @returns A new Server instance
6565+ */
6666+export function createServer(
6767+ lexicons?: LexiconDoc[],
6868+ options?: Options,
6969+): Server {
7070+ return new Server(lexicons, options);
7171+}
7272+7373+/**
7474+ * XRPC server implementation that handles HTTP and WebSocket requests.
7575+ * Manages method registration, authentication, rate limiting, and streaming.
7676+ */
7777+export class Server {
7878+ /** The underlying Hono HTTP server instance */
7979+ app: Hono;
8080+ /** Map of NSID to WebSocket streaming servers for subscriptions */
8181+ subscriptions: Map<string, XrpcStreamServer> = new Map<
8282+ string,
8383+ XrpcStreamServer
8484+ >();
8585+ /** Lexicon registry for schema validation and method definitions */
8686+ lex: Lexicons = new Lexicons();
8787+ /** Server configuration options */
8888+ options: Options;
8989+ /** Global rate limiter applied to all routes */
9090+ globalRateLimiter?: RouteRateLimiter<HandlerContext>;
9191+ /** Map of named shared rate limiters */
9292+ sharedRateLimiters?: Map<string, RateLimiterI<HandlerContext>>;
9393+9494+ /**
9595+ * Creates a new XRPC server instance.
9696+ * @param lexicons - Optional array of lexicon documents to register
9797+ * @param opts - Server configuration options
9898+ */
9999+ constructor(lexicons?: LexiconDoc[], opts: Options = {}) {
100100+ this.app = new Hono();
101101+ this.options = opts;
102102+103103+ if (lexicons) {
104104+ this.addLexicons(lexicons);
105105+ }
106106+107107+ // Add global middleware
108108+ this.app.use("*", this.catchall);
109109+ this.app.onError(createErrorHandler(opts));
110110+111111+ if (opts.rateLimits) {
112112+ const { global, shared, creator, bypass } = opts.rateLimits;
113113+114114+ if (global) {
115115+ this.globalRateLimiter = RouteRateLimiter.from(
116116+ global.map((options) => creator(buildRateLimiterOptions(options))),
117117+ { bypass },
118118+ );
119119+ }
120120+121121+ if (shared) {
122122+ this.sharedRateLimiters = new Map(
123123+ shared.map((options) => [
124124+ options.name,
125125+ creator(buildRateLimiterOptions(options)),
126126+ ]),
127127+ );
128128+ }
129129+ }
130130+ }
131131+132132+ // handlers
133133+ // =
134134+135135+ /**
136136+ * Registers a method handler for the specified NSID.
137137+ * @param nsid - The namespace identifier for the method
138138+ * @param configOrFn - Either a handler function or full method configuration
139139+ */
140140+ method(
141141+ nsid: string,
142142+ configOrFn: MethodConfigOrHandler,
143143+ ) {
144144+ this.addMethod(nsid, configOrFn);
145145+ }
146146+147147+ /**
148148+ * Adds a method handler for the specified NSID.
149149+ * @param nsid - The namespace identifier for the method
150150+ * @param configOrFn - Either a handler function or full method configuration
151151+ * @throws {Error} If the method is not found in the lexicon or is not a query/procedure
152152+ */
153153+ addMethod(
154154+ nsid: string,
155155+ configOrFn: MethodConfigOrHandler,
156156+ ) {
157157+ const config = typeof configOrFn === "function"
158158+ ? { handler: configOrFn }
159159+ : configOrFn;
160160+ const def = this.lex.getDef(nsid);
161161+ if (!def || (def.type !== "query" && def.type !== "procedure")) {
162162+ throw new Error(`Method not found in lexicon: ${nsid}`);
163163+ }
164164+ this.addRoute(nsid, def, config);
165165+ }
166166+167167+ /**
168168+ * Registers a streaming method handler for the specified NSID.
169169+ * @param nsid - The namespace identifier for the streaming method
170170+ * @param configOrFn - Either a stream handler function or full stream configuration
171171+ */
172172+ streamMethod(
173173+ nsid: string,
174174+ configOrFn: StreamConfigOrHandler,
175175+ ) {
176176+ this.addStreamMethod(nsid, configOrFn);
177177+ }
178178+179179+ /**
180180+ * Adds a streaming method handler for the specified NSID.
181181+ * @param nsid - The namespace identifier for the streaming method
182182+ * @param configOrFn - Either a stream handler function or full stream configuration
183183+ * @throws {Error} If the subscription is not found in the lexicon
184184+ */
185185+ addStreamMethod(
186186+ nsid: string,
187187+ configOrFn: StreamConfigOrHandler,
188188+ ) {
189189+ const config = typeof configOrFn === "function"
190190+ ? { handler: configOrFn }
191191+ : configOrFn;
192192+ const def = this.lex.getDef(nsid);
193193+ if (!def || def.type !== "subscription") {
194194+ throw new Error(`Subscription not found in lexicon: ${nsid}`);
195195+ }
196196+ this.addSubscription(nsid, def, config);
197197+ }
198198+199199+ // lexicon
200200+ // =
201201+202202+ /**
203203+ * Adds a lexicon document to the server's schema registry.
204204+ * @param doc - The lexicon document to add
205205+ */
206206+ addLexicon(doc: LexiconDoc) {
207207+ this.lex.add(doc);
208208+ }
209209+210210+ /**
211211+ * Adds multiple lexicon documents to the server's schema registry.
212212+ * @param docs - Array of lexicon documents to add
213213+ */
214214+ addLexicons(docs: LexiconDoc[]) {
215215+ for (const doc of docs) {
216216+ this.addLexicon(doc);
217217+ }
218218+ }
219219+220220+ // routes
221221+ // =
222222+223223+ /**
224224+ * Adds an HTTP route for the specified method.
225225+ * @param nsid - The namespace identifier for the method
226226+ * @param def - The lexicon definition for the method
227227+ * @param config - The method configuration including handler and options
228228+ * @protected
229229+ */
230230+ protected addRoute(
231231+ nsid: string,
232232+ def: LexXrpcQuery | LexXrpcProcedure,
233233+ config: MethodConfig,
234234+ ) {
235235+ const path = `/xrpc/${nsid}`;
236236+ const handler = this.createHandler(nsid, def, config);
237237+238238+ if (def.type === "procedure") {
239239+ this.app.post(path, handler);
240240+ } else {
241241+ this.app.get(path, handler);
242242+ }
243243+ }
244244+245245+ /**
246246+ * Catchall handler that processes all XRPC routes and applies global rate limiting.
247247+ * Only applies to routes starting with "/xrpc/".
248248+ */
249249+ catchall: CatchallHandler = async (c, next) => { // catchall handler only applies to XRPC routes
250250+ if (!c.req.url.startsWith("/xrpc/")) return next();
251251+252252+ // Validate the NSID
253253+ const nsid = parseUrlNsid(c.req.url);
254254+ if (!nsid) {
255255+ throw new InvalidRequestError("invalid xrpc path");
256256+ }
257257+258258+ if (this.globalRateLimiter) {
259259+ try {
260260+ await this.globalRateLimiter.handle({
261261+ req: c.req.raw,
262262+ res: new Response(),
263263+ auth: undefined,
264264+ params: {},
265265+ input: undefined,
266266+ async resetRouteRateLimits() {},
267267+ });
268268+ } catch {
269269+ return next();
270270+ }
271271+ }
272272+273273+ // Ensure that known XRPC methods are only called with the correct HTTP
274274+ // method.
275275+ const def = this.lex.getDef(nsid);
276276+ if (def) {
277277+ const expectedMethod = def.type === "procedure"
278278+ ? "POST"
279279+ : def.type === "query"
280280+ ? "GET"
281281+ : null;
282282+ if (expectedMethod != null && expectedMethod !== c.req.method) {
283283+ throw new InvalidRequestError(
284284+ `Incorrect HTTP method (${c.req.method}) expected ${expectedMethod}`,
285285+ );
286286+ }
287287+ }
288288+289289+ if (this.options.catchall) {
290290+ await this.options.catchall(c, next);
291291+ } else if (!def) {
292292+ throw new MethodNotImplementedError();
293293+ } else {
294294+ await next();
295295+ }
296296+ };
297297+298298+ /**
299299+ * Creates a parameter verification function for the given method definition.
300300+ * @param _nsid - The namespace identifier (unused)
301301+ * @param def - The lexicon definition containing parameter schema
302302+ * @returns A function that validates and transforms query parameters
303303+ * @protected
304304+ */
305305+ protected createParamsVerifier(
306306+ _nsid: string,
307307+ def: LexXrpcQuery | LexXrpcProcedure | LexXrpcSubscription,
308308+ ): (query: Record<string, unknown>) => Params {
309309+ if (!def.parameters) {
310310+ return () => ({});
311311+ }
312312+ return (query: Record<string, unknown>) => {
313313+ return query as Params;
314314+ };
315315+ }
316316+317317+ /**
318318+ * Creates an input verification function for the given method definition.
319319+ * @param nsid - The namespace identifier for the method
320320+ * @param def - The lexicon definition containing input schema
321321+ * @returns A function that validates and transforms request input
322322+ * @protected
323323+ */
324324+ protected createInputVerifier(
325325+ nsid: string,
326326+ def: LexXrpcQuery | LexXrpcProcedure,
327327+ ): (req: Request) => Promise<HandlerInput | undefined> {
328328+ return createInputVerifier(this.lex, nsid, def);
329329+ }
330330+331331+ /**
332332+ * Creates an authentication verification function.
333333+ * @param _nsid - The namespace identifier (unused)
334334+ * @param verifier - Optional custom authentication verifier
335335+ * @returns A function that performs authentication for the method
336336+ * @protected
337337+ */
338338+ protected createAuthVerifier(
339339+ _nsid: string,
340340+ verifier?: MethodAuthVerifier,
341341+ ): (params: Params, input: Input, req: Request) => Promise<Auth> {
342342+ return async (
343343+ params: Params,
344344+ input: Input,
345345+ req: Request,
346346+ ): Promise<Auth> => {
347347+ if (verifier) {
348348+ return await verifier({
349349+ params,
350350+ input,
351351+ req,
352352+ res: new Response(),
353353+ });
354354+ }
355355+ return undefined;
356356+ };
357357+ }
358358+359359+ /**
360360+ * Creates a Hono handler function for the specified XRPC method.
361361+ * @template A - The authentication type
362362+ * @param nsid - The namespace identifier for the method
363363+ * @param def - The lexicon definition for the method
364364+ * @param routeCfg - The method configuration including handler and options
365365+ * @returns A Hono handler function
366366+ */
367367+ createHandler<A extends Auth = Auth>(
368368+ nsid: string,
369369+ def: LexXrpcQuery | LexXrpcProcedure,
370370+ routeCfg: MethodConfig<A>,
371371+ ): Handler {
372372+ const verifyParams = this.createParamsVerifier(nsid, def);
373373+ const verifyInput = this.createInputVerifier(nsid, def);
374374+ const verifyAuth = this.createAuthVerifier(nsid, routeCfg.auth);
375375+ const validateReqNSID = () => nsid;
376376+ const validateOutputFn = (output?: HandlerSuccess) =>
377377+ this.options.validateResponse && output && def.output
378378+ ? validateOutput(nsid, def, output, this.lex)
379379+ : undefined;
380380+381381+ const routeLimiter = this.createRouteRateLimiter(nsid, routeCfg);
382382+383383+ return async (c: Context) => {
384384+ try {
385385+ validateReqNSID();
386386+387387+ const query = getQueryParams(c.req.url);
388388+ const params = verifyParams(decodeUrlQueryParams(query));
389389+390390+ let input: Input = undefined;
391391+ if (def.type === "procedure") {
392392+ input = await verifyInput(c.req.raw);
393393+ }
394394+395395+ const auth = await verifyAuth(params, input, c.req.raw);
396396+397397+ const ctx: HandlerContext<A> = {
398398+ req: c.req.raw,
399399+ res: new Response(),
400400+ params,
401401+ input,
402402+ auth: auth as A,
403403+ resetRouteRateLimits: async () => {},
404404+ };
405405+406406+ if (routeLimiter) {
407407+ const result = await routeLimiter.consume(ctx);
408408+ if (result instanceof RateLimitExceededError) {
409409+ throw result;
410410+ }
411411+ }
412412+413413+ const output = await routeCfg.handler(ctx);
414414+ if (isErrorResult(output)) {
415415+ throw output.error;
416416+ }
417417+418418+ if (isHandlerPipeThroughBuffer(output)) {
419419+ setHeaders(c, output.headers);
420420+ return c.body(new Uint8Array(output.buffer), 200, {
421421+ "Content-Type": output.encoding,
422422+ });
423423+ } else if (isHandlerPipeThroughStream(output)) {
424424+ setHeaders(c, output.headers);
425425+ return c.body(output.stream, 200, {
426426+ "Content-Type": output.encoding,
427427+ });
428428+ }
429429+430430+ if (output) {
431431+ excludeErrorResult(output);
432432+ validateOutputFn(output);
433433+ }
434434+435435+ if (output) {
436436+ setHeaders(c, output.headers);
437437+ if (output.encoding === "application/json") {
438438+ return c.json(output.body);
439439+ } else {
440440+ return c.body(output.body, 200, {
441441+ "Content-Type": output.encoding,
442442+ });
443443+ }
444444+ }
445445+446446+ return c.body(null, 200);
447447+ } catch (err: unknown) {
448448+ throw err || new InternalServerError();
449449+ }
450450+ };
451451+ }
452452+453453+ /**
454454+ * Adds a WebSocket subscription handler for the specified NSID.
455455+ * @param nsid - The namespace identifier for the subscription
456456+ * @param _def - The lexicon definition for the subscription (unused)
457457+ * @param _config - The stream configuration (unused)
458458+ * @protected
459459+ */
460460+ protected addSubscription(
461461+ nsid: string,
462462+ _def: LexXrpcSubscription,
463463+ _config: StreamConfig,
464464+ ) {
465465+ const server = new XrpcStreamServer({
466466+ noServer: true,
467467+ handler: async function* (_req: Request, _signal: AbortSignal) {
468468+ // Stream handler implementation would go here
469469+ yield new ErrorFrame({
470470+ error: "NotImplemented",
471471+ message: "Streaming not implemented",
472472+ });
473473+ },
474474+ });
475475+476476+ this.subscriptions.set(nsid, server);
477477+ }
478478+479479+ /**
480480+ * Creates a route-specific rate limiter based on the method configuration.
481481+ * @template A - The authentication type
482482+ * @template C - The handler context type
483483+ * @param nsid - The namespace identifier for the method
484484+ * @param config - The method configuration containing rate limit options
485485+ * @returns A route rate limiter or undefined if no rate limiting is configured
486486+ * @private
487487+ */
488488+ private createRouteRateLimiter<A extends Auth, C extends HandlerContext>(
489489+ nsid: string,
490490+ config: MethodConfig<A>,
491491+ ): RouteRateLimiter<C> | undefined {
492492+ // @NOTE global & shared rate limiters are instantiated with a context of
493493+ // HandlerContext which is compatible (more generic) with the context of
494494+ // this route specific rate limiters (C). For this reason, it's safe to
495495+ // cast these with an `any` context
496496+497497+ const globalRateLimiter = this.globalRateLimiter as
498498+ | RouteRateLimiter<C>
499499+ | undefined;
500500+501501+ // No route specific rate limiting configured, use the global rate limiter.
502502+ if (!config.rateLimit) return globalRateLimiter;
503503+504504+ const { rateLimits } = this.options;
505505+506506+ // @NOTE Silently ignore creation of route specific rate limiter if the
507507+ // `rateLimits` options was not provided to the constructor.
508508+ if (!rateLimits) return globalRateLimiter;
509509+510510+ const { creator, bypass } = rateLimits;
511511+512512+ const rateLimiters = asArray(config.rateLimit).map((options, i) => {
513513+ if (isSharedRateLimitOpts(options)) {
514514+ const rateLimiter = this.sharedRateLimiters?.get(options.name);
515515+516516+ // The route config references a shared rate limiter that does not
517517+ // exist. This is a configuration error.
518518+ assert(
519519+ rateLimiter,
520520+ `Shared rate limiter "${options.name}" not defined`,
521521+ );
522522+523523+ return WrappedRateLimiter.from<C>(
524524+ rateLimiter as unknown as RateLimiterI<C>,
525525+ options as unknown as WrappedRateLimiterOptions<C>,
526526+ );
527527+ } else {
528528+ return creator({
529529+ ...options,
530530+ calcKey: options.calcKey ?? defaultKey,
531531+ calcPoints: options.calcPoints ?? defaultPoints,
532532+ keyPrefix: `${nsid}-${i}`,
533533+ });
534534+ }
535535+ });
536536+537537+ // If the route config contains an empty array, use global rate limiter.
538538+ if (!rateLimiters.length) return globalRateLimiter;
539539+540540+ // The global rate limiter (if present) should be applied in addition to
541541+ // the route specific rate limiters.
542542+ if (globalRateLimiter) rateLimiters.push(globalRateLimiter);
543543+544544+ return RouteRateLimiter.from<C>(
545545+ rateLimiters as unknown as readonly RateLimiterI<C>[],
546546+ { bypass },
547547+ );
548548+ }
549549+550550+ /**
551551+ * Gets the underlying Hono app instance for external use.
552552+ * @returns The Hono application instance
553553+ */
554554+ get handler(): Hono {
555555+ return this.app;
556556+ }
557557+}
558558+559559+/**
560560+ * Creates an error handler function for the Hono application.
561561+ * @param opts - Server options containing optional error parser
562562+ * @returns An error handler function that converts errors to XRPC error responses
563563+ */
564564+function createErrorHandler(opts: Options) {
565565+ return (err: Error, c: Context) => {
566566+ const errorParser = opts.errorParser ||
567567+ ((e: unknown) => XRPCError.fromError(e));
568568+ const xrpcError = errorParser(err);
569569+570570+ const statusCode = "statusCode" in xrpcError
571571+ ? (xrpcError as { statusCode: number }).statusCode
572572+ : 500;
573573+574574+ return c.json(
575575+ {
576576+ error: xrpcError.type || "InternalServerError",
577577+ message: xrpcError.message || "Internal Server Error",
578578+ },
579579+ statusCode as 500,
580580+ );
581581+ };
582582+}
583583+584584+/**
585585+ * Type guard to check if an object is a Pino HTTP request object.
586586+ * @param obj - The object to check
587587+ * @returns True if the object has a req property
588588+ * @private
589589+ */
590590+function _isPinoHttpRequest(obj: unknown): obj is {
591591+ req: unknown;
592592+} {
593593+ return (
594594+ !!obj &&
595595+ typeof obj === "object" &&
596596+ "req" in obj
597597+ );
598598+}
599599+600600+/**
601601+ * Converts an error to a simplified error-like object for logging.
602602+ * @param err - The error to convert
603603+ * @returns A simplified error object or the original value
604604+ * @private
605605+ */
606606+function _toSimplifiedErrorLike(err: unknown) {
607607+ if (err instanceof Error) {
608608+ return {
609609+ name: err.name,
610610+ message: err.message,
611611+ stack: err.stack,
612612+ };
613613+ }
614614+ return err;
615615+}
616616+617617+/**
618618+ * Builds rate limiter options from a server rate limit description.
619619+ * @template C - The handler context type
620620+ * @param options - The server rate limit description
621621+ * @returns Rate limiter options with defaults applied
622622+ */
623623+function buildRateLimiterOptions<C extends HandlerContext = HandlerContext>({
624624+ name,
625625+ calcKey = defaultKey,
626626+ calcPoints = defaultPoints,
627627+ ...desc
628628+}: ServerRateLimitDescription<C>): RateLimiterOptions<C> {
629629+ return { ...desc, calcKey, calcPoints, keyPrefix: `rl-${name}` };
630630+}
631631+632632+/**
633633+ * Default function for calculating rate limit points consumed per request.
634634+ * Always returns 1 point per request.
635635+ */
636636+const defaultPoints: CalcPointsFn = () => 1;
637637+638638+/**
639639+ * Default function for calculating rate limit keys based on client IP address.
640640+ * Extracts IP from X-Forwarded-For, X-Real-IP headers, or falls back to "unknown".
641641+ *
642642+ * @note When using a proxy, ensure headers are getting forwarded correctly:
643643+ * `app.set('trust proxy', true)`
644644+ *
645645+ * @see {@link https://expressjs.com/en/guide/behind-proxies.html}
646646+ */
647647+const defaultKey: CalcKeyFn<HandlerContext> = ({ req }) => {
648648+ const forwarded = req.headers.get("x-forwarded-for");
649649+ const ip = forwarded
650650+ ? forwarded.split(",")[0]
651651+ : req.headers.get("x-real-ip") ||
652652+ "unknown";
653653+ return ip;
654654+};
+182
xrpc-server/stream/frames.ts
···11+import * as uint8arrays from "uint8arrays";
22+import { cborDecodeMulti, cborEncode } from "@atproto/common";
33+import type {
44+ ErrorFrameBody,
55+ ErrorFrameHeader,
66+ FrameHeader,
77+ MessageFrameHeader,
88+} from "./types.ts";
99+import { errorFrameBody, frameHeader, FrameType } from "./types.ts";
1010+1111+/**
1212+ * Abstract base class for XRPC stream frames.
1313+ * Frames are the basic unit of communication in XRPC streaming, consisting of a header and body.
1414+ * Each frame is serialized as CBOR for efficient binary transmission.
1515+ *
1616+ * @abstract
1717+ * @property {FrameHeader} header - Frame header containing operation type and metadata
1818+ * @property {unknown} body - Frame payload data
1919+ */
2020+export abstract class Frame {
2121+ abstract header: FrameHeader;
2222+ body: unknown;
2323+2424+ /**
2525+ * Gets the operation type of the frame.
2626+ * @returns {FrameType} The frame's operation type
2727+ */
2828+ get op(): FrameType {
2929+ return this.header.op;
3030+ }
3131+3232+ /**
3333+ * Serializes the frame to a binary format using CBOR encoding.
3434+ * The resulting bytes contain both the header and body concatenated.
3535+ * @returns {Uint8Array} The serialized frame as bytes
3636+ */
3737+ toBytes(): Uint8Array {
3838+ return uint8arrays.concat([cborEncode(this.header), cborEncode(this.body)]);
3939+ }
4040+4141+ /**
4242+ * Type guard to check if this frame is a MessageFrame.
4343+ * @returns {boolean} True if this is a MessageFrame
4444+ */
4545+ isMessage(): this is MessageFrame<unknown> {
4646+ return this.op === FrameType.Message;
4747+ }
4848+4949+ /**
5050+ * Type guard to check if this frame is an ErrorFrame.
5151+ * @returns {boolean} True if this is an ErrorFrame
5252+ */
5353+ isError(): this is ErrorFrame {
5454+ return this.op === FrameType.Error;
5555+ }
5656+5757+ /**
5858+ * Deserializes a frame from its binary representation.
5959+ * Validates the frame structure and creates the appropriate frame type.
6060+ *
6161+ * @param {Uint8Array} bytes - The serialized frame bytes
6262+ * @returns {Frame} The deserialized frame (either MessageFrame or ErrorFrame)
6363+ * @throws {Error} If the frame format is invalid or unknown
6464+ */
6565+ static fromBytes(bytes: Uint8Array): Frame {
6666+ const decoded = cborDecodeMulti(bytes);
6767+ if (decoded.length > 2) {
6868+ throw new Error("Too many CBOR data items in frame");
6969+ }
7070+ const header = decoded[0];
7171+ let body: unknown = kUnset;
7272+ if (decoded.length > 1) {
7373+ body = decoded[1];
7474+ }
7575+ const parsedHeader = frameHeader.safeParse(header);
7676+ if (!parsedHeader.success) {
7777+ throw new Error(`Invalid frame header: ${parsedHeader.error.message}`);
7878+ }
7979+ if (body === kUnset) {
8080+ throw new Error("Missing frame body");
8181+ }
8282+ const frameOp = parsedHeader.data.op;
8383+ if (frameOp === FrameType.Message) {
8484+ return new MessageFrame(body, {
8585+ type: parsedHeader.data.t,
8686+ });
8787+ } else if (frameOp === FrameType.Error) {
8888+ const parsedBody = errorFrameBody.safeParse(body);
8989+ if (!parsedBody.success) {
9090+ throw new Error(
9191+ `Invalid error frame body: ${parsedBody.error.message}`,
9292+ );
9393+ }
9494+ return new ErrorFrame(parsedBody.data);
9595+ } else {
9696+ const exhaustiveCheck: never = frameOp;
9797+ throw new Error(`Unknown frame op: ${exhaustiveCheck}`);
9898+ }
9999+ }
100100+}
101101+102102+/**
103103+ * Frame type for sending messages/data over an XRPC stream.
104104+ * Can contain any type of payload data and an optional message type identifier.
105105+ *
106106+ * @template T - The type of the message body, defaults to Record<string, unknown>
107107+ * @extends {Frame}
108108+ * @property {MessageFrameHeader} header - Message frame header
109109+ * @property {T} body - Message payload data
110110+ */
111111+export class MessageFrame<T = Record<string, unknown>> extends Frame {
112112+ header: MessageFrameHeader;
113113+ override body: T;
114114+115115+ /**
116116+ * Creates a new MessageFrame.
117117+ * @param {T} body - The message payload
118118+ * @param {Object} [opts] - Optional frame configuration
119119+ * @param {string} [opts.type] - Message type identifier
120120+ */
121121+ constructor(body: T, opts?: { type?: string }) {
122122+ super();
123123+ this.header = opts?.type !== undefined
124124+ ? { op: FrameType.Message, t: opts?.type }
125125+ : { op: FrameType.Message };
126126+ this.body = body;
127127+ }
128128+129129+ /**
130130+ * Gets the message type identifier.
131131+ * @returns {string | undefined} The message type, if specified
132132+ */
133133+ get type(): string | undefined {
134134+ return this.header.t;
135135+ }
136136+}
137137+138138+/**
139139+ * Frame type for sending errors over an XRPC stream.
140140+ * Contains an error code and optional error message.
141141+ *
142142+ * @template T - The type of error code string
143143+ * @extends {Frame}
144144+ * @property {ErrorFrameHeader} header - Error frame header
145145+ * @property {ErrorFrameBody<T>} body - Error details including code and message
146146+ */
147147+export class ErrorFrame<T extends string = string> extends Frame {
148148+ header: ErrorFrameHeader;
149149+ override body: ErrorFrameBody<T>;
150150+151151+ /**
152152+ * Creates a new ErrorFrame.
153153+ * @param {ErrorFrameBody<T>} body - The error details
154154+ */
155155+ constructor(body: ErrorFrameBody<T>) {
156156+ super();
157157+ this.header = { op: FrameType.Error };
158158+ this.body = body;
159159+ }
160160+161161+ /**
162162+ * Gets the error code.
163163+ * @returns {string} The error code
164164+ */
165165+ get code(): string {
166166+ return this.body.error;
167167+ }
168168+169169+ /**
170170+ * Gets the error message.
171171+ * @returns {string | undefined} The error message, if provided
172172+ */
173173+ get message(): string | undefined {
174174+ return this.body.message;
175175+ }
176176+}
177177+178178+/**
179179+ * Symbol used internally to detect unset frame body.
180180+ * @private
181181+ */
182182+const kUnset = Symbol("unset");
+6
xrpc-server/stream/index.ts
···11+export * from "./types.ts";
22+export * from "./frames.ts";
33+export * from "./stream.ts";
44+export * from "./subscription.ts";
55+export * from "./server.ts";
66+export * from "./websocket-keepalive.ts";
+24
xrpc-server/stream/logger.ts
···11+import { subsystemLogger } from "@atproto/common";
22+33+/**
44+ * Logger instance for XRPC streaming operations.
55+ * This is a subsystem logger specifically configured for logging events
66+ * related to WebSocket streaming, connection management, and stream processing.
77+ *
88+ * @example
99+ * ```typescript
1010+ * import { logger } from './logger';
1111+ *
1212+ * logger.info('WebSocket connection established');
1313+ * logger.error(error, 'Stream processing failed');
1414+ * ```
1515+ */
1616+export const logger: ReturnType<typeof subsystemLogger> = subsystemLogger(
1717+ "xrpc-stream",
1818+);
1919+2020+/**
2121+ * Default export of the XRPC stream logger.
2222+ * Same as the named export, provided for convenience.
2323+ */
2424+export default logger;
+109
xrpc-server/stream/server.ts
···11+import { type ServerOptions, WebSocketServer } from "ws";
22+import { ErrorFrame, type Frame } from "./frames.ts";
33+import { logger } from "../logger.ts";
44+import { CloseCode, DisconnectError } from "./types.ts";
55+66+/**
77+ * XRPC WebSocket streaming server implementation.
88+ * Handles WebSocket connections and message streaming for XRPC methods.
99+ * @class
1010+ */
1111+export class XrpcStreamServer {
1212+ wss: WebSocketServer;
1313+1414+ /**
1515+ * Creates a new XRPC streaming server instance.
1616+ * @constructor
1717+ * @param {Object} opts - Server configuration options
1818+ * @param {Handler} opts.handler - Function to handle incoming WebSocket connections
1919+ * @param {ServerOptions} opts - Additional WebSocket server options
2020+ */
2121+ constructor(opts: ServerOptions & { handler: Handler }) {
2222+ const { handler, ...serverOpts } = opts;
2323+ this.wss = new WebSocketServer(serverOpts);
2424+ this.wss.on(
2525+ "connection",
2626+ async (socket: WebSocket, req: Request) => {
2727+ socket.onerror = (ev: Event | ErrorEvent) => {
2828+ if (ev instanceof ErrorEvent) {
2929+ logger.error(ev.error, "websocket error");
3030+ } else {
3131+ logger.error(ev, "websocket error");
3232+ }
3333+ };
3434+ try {
3535+ const ac = new AbortController();
3636+ const iterator = unwrapIterator(
3737+ handler(req, ac.signal, socket, this),
3838+ );
3939+ socket.onclose = () => {
4040+ iterator.return?.();
4141+ ac.abort();
4242+ };
4343+ const safeFrames = wrapIterator(iterator);
4444+ for await (const frame of safeFrames) {
4545+ await new Promise<void>((res, rej) => {
4646+ try {
4747+ socket.send((frame as Frame).toBytes());
4848+ res();
4949+ } catch (err) {
5050+ rej(err);
5151+ }
5252+ });
5353+ if (frame instanceof ErrorFrame) {
5454+ throw new DisconnectError(CloseCode.Policy, frame.body.error);
5555+ }
5656+ }
5757+ } catch (err) {
5858+ if (err instanceof DisconnectError) {
5959+ return socket.close(err.wsCode, err.xrpcCode);
6060+ } else {
6161+ logger.error({ err }, "websocket server error");
6262+ return socket.close(CloseCode.Abnormal);
6363+ }
6464+ }
6565+ socket.close(CloseCode.Normal);
6666+ },
6767+ );
6868+ }
6969+}
7070+7171+/**
7272+ * Handler function type for WebSocket connections.
7373+ * @callback Handler
7474+ * @param {Request} req - The incoming WebSocket request
7575+ * @param {AbortSignal} signal - Signal for detecting connection abort
7676+ * @param {WebSocket} socket - The WebSocket connection
7777+ * @param {XrpcStreamServer} server - The server instance
7878+ * @returns {AsyncIterable<Frame>} An async iterable of frames to send
7979+ */
8080+export type Handler = (
8181+ req: Request,
8282+ signal: AbortSignal,
8383+ socket: WebSocket,
8484+ server: XrpcStreamServer,
8585+) => AsyncIterable<Frame>;
8686+8787+/**
8888+ * Unwraps an AsyncIterable into its AsyncIterator.
8989+ * @template T - The type of values being iterated
9090+ * @param {AsyncIterable<T>} iterable - The iterable to unwrap
9191+ * @returns {AsyncIterator<T>} The unwrapped iterator
9292+ */
9393+function unwrapIterator<T>(iterable: AsyncIterable<T>): AsyncIterator<T> {
9494+ return iterable[Symbol.asyncIterator]();
9595+}
9696+9797+/**
9898+ * Wraps an AsyncIterator back into an AsyncIterable.
9999+ * @template T - The type of values being iterated
100100+ * @param {AsyncIterator<T>} iterator - The iterator to wrap
101101+ * @returns {AsyncIterable<T>} The wrapped iterable
102102+ */
103103+function wrapIterator<T>(iterator: AsyncIterator<T>): AsyncIterable<T> {
104104+ return {
105105+ [Symbol.asyncIterator]() {
106106+ return iterator;
107107+ },
108108+ };
109109+}
+98
xrpc-server/stream/stream.ts
···11+import { ResponseType, XRPCError } from "@atproto/xrpc";
22+import { Frame } from "./frames.ts";
33+import type { MessageFrame } from "./frames.ts";
44+55+/**
66+ * Converts a WebSocket connection into an async generator of Frame objects.
77+ * Handles both message and error frames, with proper error propagation.
88+ *
99+ * @param {WebSocket} ws - The WebSocket connection to read from
1010+ * @yields {Frame} Each frame received from the WebSocket
1111+ * @throws {Error} Any WebSocket error that occurs during communication
1212+ *
1313+ * @example
1414+ * ```typescript
1515+ * const ws = new WebSocket(url);
1616+ * for await (const frame of byFrame(ws)) {
1717+ * // Process each frame
1818+ * console.log(frame.type);
1919+ * }
2020+ * ```
2121+ */
2222+export async function* byFrame(
2323+ ws: WebSocket,
2424+): AsyncGenerator<Frame> {
2525+ const messageQueue: Frame[] = [];
2626+ let error: Error | null = null;
2727+ let done = false;
2828+2929+ ws.onmessage = (ev) => {
3030+ if (ev.data instanceof Uint8Array) {
3131+ messageQueue.push(Frame.fromBytes(ev.data));
3232+ }
3333+ };
3434+ ws.onerror = (ev) => {
3535+ if (ev instanceof ErrorEvent) {
3636+ error = ev.error;
3737+ }
3838+ };
3939+ ws.onclose = () => {
4040+ done = true;
4141+ };
4242+4343+ while (!done && !error) {
4444+ if (messageQueue.length > 0) {
4545+ yield messageQueue.shift()!;
4646+ } else {
4747+ await new Promise((resolve) => setTimeout(resolve, 0));
4848+ }
4949+ }
5050+5151+ if (error) throw error;
5252+}
5353+5454+/**
5555+ * Converts a WebSocket connection into an async generator of MessageFrames.
5656+ * Automatically filters and validates frames to ensure they are valid messages.
5757+ * Error frames are converted to exceptions.
5858+ *
5959+ * @param {WebSocket} ws - The WebSocket connection to read from
6060+ * @yields {MessageFrame<unknown>} Each message frame received from the WebSocket
6161+ * @throws {XRPCError} If an error frame is received or an invalid frame type is encountered
6262+ *
6363+ * @example
6464+ * ```typescript
6565+ * const ws = new WebSocket(url);
6666+ * for await (const message of byMessage(ws)) {
6767+ * // Process each message
6868+ * console.log(message.body);
6969+ * }
7070+ * ```
7171+ */
7272+export async function* byMessage(
7373+ ws: WebSocket,
7474+): AsyncGenerator<MessageFrame<unknown>> {
7575+ for await (const frame of byFrame(ws)) {
7676+ yield ensureChunkIsMessage(frame);
7777+ }
7878+}
7979+8080+/**
8181+ * Validates that a frame is a MessageFrame and converts it to the appropriate type.
8282+ * If the frame is an error frame, throws an XRPCError with the error details.
8383+ *
8484+ * @param {Frame} frame - The frame to validate
8585+ * @returns {MessageFrame<unknown>} The frame as a MessageFrame if valid
8686+ * @throws {XRPCError} If the frame is an error frame or an invalid type
8787+ * @internal
8888+ */
8989+export function ensureChunkIsMessage(frame: Frame): MessageFrame<unknown> {
9090+ if (frame.isMessage()) {
9191+ return frame;
9292+ } else if (frame.isError()) {
9393+ // @TODO work -1 error code into XRPCError
9494+ throw new XRPCError(-1, frame.code, frame.message);
9595+ } else {
9696+ throw new XRPCError(ResponseType.Unknown, undefined, "Unknown frame type");
9797+ }
9898+}
+139
xrpc-server/stream/subscription.ts
···11+import { ensureChunkIsMessage } from "./stream.ts";
22+import { WebSocketKeepAlive } from "./websocket-keepalive.ts";
33+import { Frame } from "./frames.ts";
44+import type { WebSocketOptions } from "./types.ts";
55+66+/**
77+ * Represents a message body in a subscription stream.
88+ * @interface
99+ * @property {string} [$type] - Optional type identifier for the message
1010+ * @property {unknown} [key: string] - Additional message properties
1111+ */
1212+interface MessageBody {
1313+ $type?: string;
1414+ [key: string]: unknown;
1515+}
1616+1717+/**
1818+ * Represents a subscription to an XRPC streaming endpoint.
1919+ * Handles WebSocket connection management, reconnection, and message parsing.
2020+ * @class
2121+ * @template T - The type of messages yielded by the subscription
2222+ */
2323+export class Subscription<T = unknown> {
2424+ /**
2525+ * Creates a new subscription instance.
2626+ * @constructor
2727+ * @param {Object} opts - Subscription configuration options
2828+ * @param {string} opts.service - The base URL of the XRPC service
2929+ * @param {string} opts.method - The XRPC method to subscribe to
3030+ * @param {number} [opts.maxReconnectSeconds] - Maximum time in seconds between reconnection attempts
3131+ * @param {number} [opts.heartbeatIntervalMs] - Interval in milliseconds for sending heartbeat messages
3232+ * @param {AbortSignal} [opts.signal] - Signal for aborting the subscription
3333+ * @param {Function} opts.validate - Function to validate and transform incoming messages
3434+ * @param {Function} [opts.onReconnectError] - Callback for handling reconnection errors
3535+ * @param {Function} [opts.getParams] - Function to get query parameters for the subscription URL
3636+ */
3737+ constructor(
3838+ public opts: WebSocketOptions & {
3939+ service: string;
4040+ method: string;
4141+ maxReconnectSeconds?: number;
4242+ heartbeatIntervalMs?: number;
4343+ signal?: AbortSignal;
4444+ validate: (obj: unknown) => T | undefined;
4545+ onReconnectError?: (
4646+ error: unknown,
4747+ n: number,
4848+ initialSetup: boolean,
4949+ ) => void;
5050+ getParams?: () =>
5151+ | Record<string, unknown>
5252+ | Promise<Record<string, unknown> | undefined>
5353+ | undefined;
5454+ },
5555+ ) {}
5656+5757+ /**
5858+ * Implements the AsyncIterator protocol for the subscription.
5959+ * Allows using the subscription in a for-await-of loop.
6060+ * @returns {AsyncGenerator<T>} An async generator that yields validated messages
6161+ */
6262+ async *[Symbol.asyncIterator](): AsyncGenerator<T> {
6363+ const ws = new WebSocketKeepAlive({
6464+ ...this.opts,
6565+ getUrl: async () => {
6666+ const params = (await this.opts.getParams?.()) ?? {};
6767+ const query = encodeQueryParams(params);
6868+ return `${this.opts.service}/xrpc/${this.opts.method}?${query}`;
6969+ },
7070+ });
7171+ for await (const chunk of ws) {
7272+ const frame = Frame.fromBytes(chunk);
7373+ const message = ensureChunkIsMessage(frame);
7474+ const t = message.header.t;
7575+ const clone = message.body !== undefined
7676+ ? { ...message.body } as MessageBody
7777+ : undefined;
7878+ if (clone !== undefined && t !== undefined) {
7979+ clone.$type = t.startsWith("#") ? this.opts.method + t : t;
8080+ }
8181+ const result = this.opts.validate(clone);
8282+ if (result !== undefined) {
8383+ yield result;
8484+ }
8585+ }
8686+ }
8787+}
8888+8989+export default Subscription;
9090+9191+/**
9292+ * Encodes an object of parameters into a URL query string.
9393+ * @param {Record<string, unknown>} obj - The parameters to encode
9494+ * @returns {string} The encoded query string
9595+ */
9696+function encodeQueryParams(obj: Record<string, unknown>): string {
9797+ const params = new URLSearchParams();
9898+ Object.entries(obj).forEach(([key, value]) => {
9999+ const encoded = encodeQueryParam(value);
100100+ if (Array.isArray(encoded)) {
101101+ encoded.forEach((enc) => params.append(key, enc));
102102+ } else {
103103+ params.set(key, encoded);
104104+ }
105105+ });
106106+ return params.toString();
107107+}
108108+109109+/**
110110+ * Encodes a single query parameter value into a string or array of strings.
111111+ * Handles various types including strings, numbers, booleans, dates, and arrays.
112112+ * @param {unknown} value - The value to encode
113113+ * @returns {string | string[]} The encoded parameter value(s)
114114+ * @throws {Error} If the value cannot be encoded as a query parameter
115115+ */
116116+function encodeQueryParam(value: unknown): string | string[] {
117117+ if (typeof value === "string") {
118118+ return value;
119119+ }
120120+ if (typeof value === "number") {
121121+ return value.toString();
122122+ }
123123+ if (typeof value === "boolean") {
124124+ return value ? "true" : "false";
125125+ }
126126+ if (typeof value === "undefined") {
127127+ return "";
128128+ }
129129+ if (typeof value === "object") {
130130+ if (value instanceof Date) {
131131+ return value.toISOString();
132132+ } else if (Array.isArray(value)) {
133133+ return value.flatMap(encodeQueryParam);
134134+ } else if (!value) {
135135+ return "";
136136+ }
137137+ }
138138+ throw new Error(`Cannot encode ${typeof value}s into query params`);
139139+}
+121
xrpc-server/stream/types.ts
···11+import { z } from "zod";
22+33+/**
44+ * Enumeration of frame types used in the XRPC streaming protocol.
55+ * @enum {number}
66+ */
77+export enum FrameType {
88+ /** Normal message frame */
99+ Message = 1,
1010+ /** Error message frame */
1111+ Error = -1,
1212+}
1313+1414+/**
1515+ * WebSocket connection options.
1616+ * @interface
1717+ * @property {Record<string, string>} [headers] - Additional headers for the WebSocket connection
1818+ * @property {string[]} [protocols] - WebSocket subprotocols to use
1919+ */
2020+export interface WebSocketOptions {
2121+ headers?: Record<string, string>;
2222+ protocols?: string[];
2323+}
2424+2525+/**
2626+ * Header for message frames.
2727+ * @interface
2828+ * @property {FrameType.Message} op - Operation type, always Message
2929+ * @property {string} [t] - Optional message type discriminator
3030+ */
3131+export type MessageFrameHeader = {
3232+ op: FrameType.Message;
3333+ t?: string;
3434+};
3535+3636+export const messageFrameHeader = z.object({
3737+ op: z.literal(FrameType.Message), // Frame op
3838+ t: z.string().optional(), // Message body type discriminator
3939+}).strict() as z.ZodType<MessageFrameHeader>;
4040+4141+/**
4242+ * Header for error frames.
4343+ * @interface
4444+ * @property {FrameType.Error} op - Operation type, always Error
4545+ */
4646+export type ErrorFrameHeader = {
4747+ op: FrameType.Error;
4848+};
4949+5050+export const errorFrameHeader = z.object({
5151+ op: z.literal(FrameType.Error),
5252+}).strict() as z.ZodType<ErrorFrameHeader>;
5353+5454+/**
5555+ * Base type for error frame bodies.
5656+ * @interface
5757+ * @property {string} error - Error code or identifier
5858+ * @property {string} [message] - Optional error message
5959+ */
6060+export type ErrorFrameBodyBase = {
6161+ error: string;
6262+ message?: string;
6363+};
6464+6565+/**
6666+ * Generic error frame body with typed error codes.
6767+ * @template T - The type of error codes allowed
6868+ * @interface
6969+ * @property {T} error - Typed error code
7070+ * @property {string} [message] - Optional error message
7171+ */
7272+export type ErrorFrameBody<T extends string = string> = {
7373+ error: T;
7474+ message?: string;
7575+};
7676+7777+export const errorFrameBody = z.object({
7878+ error: z.string(), // Error code
7979+ message: z.string().optional(), // Error message
8080+}).strict() as z.ZodType<ErrorFrameBodyBase>;
8181+8282+/**
8383+ * Union type for all frame headers.
8484+ * Can be either a message frame header or an error frame header.
8585+ */
8686+export type FrameHeader = MessageFrameHeader | ErrorFrameHeader;
8787+8888+export const frameHeader = z.union([
8989+ messageFrameHeader,
9090+ errorFrameHeader,
9191+]) as z.ZodType<FrameHeader>;
9292+9393+/**
9494+ * Error class for handling WebSocket disconnections.
9595+ * @class
9696+ * @extends Error
9797+ * @property {CloseCode} wsCode - WebSocket close code
9898+ * @property {string} [xrpcCode] - XRPC-specific error code
9999+ */
100100+export class DisconnectError extends Error {
101101+ constructor(
102102+ public wsCode: CloseCode = CloseCode.Policy,
103103+ public xrpcCode?: string,
104104+ ) {
105105+ super();
106106+ }
107107+}
108108+109109+/**
110110+ * WebSocket close codes as defined in RFC 6455.
111111+ * @see https://www.rfc-editor.org/rfc/rfc6455#section-7.4.1
112112+ * @enum {number}
113113+ */
114114+export enum CloseCode {
115115+ /** Normal closure, meaning the purpose for which the connection was established has been fulfilled */
116116+ Normal = 1000,
117117+ /** Abnormal closure, meaning that the connection was terminated in an abnormal way */
118118+ Abnormal = 1006,
119119+ /** Policy violation, meaning the endpoint is terminating the connection due to a policy violation */
120120+ Policy = 1008,
121121+}
+253
xrpc-server/stream/websocket-keepalive.ts
···11+import { SECOND, wait } from "@atproto/common";
22+import { CloseCode, DisconnectError, type WebSocketOptions } from "./types.ts";
33+44+/**
55+ * WebSocket client with automatic reconnection and heartbeat functionality.
66+ * Handles connection management, reconnection backoff, and keep-alive messages.
77+ * @class
88+ */
99+export class WebSocketKeepAlive {
1010+ /** Current WebSocket connection instance */
1111+ public ws: WebSocket | null = null;
1212+ /** Whether this is the first connection attempt */
1313+ public initialSetup = true;
1414+ /** Number of reconnection attempts made, or null if not reconnecting */
1515+ public reconnects: number | null = null;
1616+1717+ /**
1818+ * Creates a new WebSocket client with keep-alive functionality.
1919+ * @constructor
2020+ * @param {Object} opts - Client configuration options
2121+ * @param {Function} opts.getUrl - Function to get the WebSocket URL
2222+ * @param {number} [opts.maxReconnectSeconds] - Maximum backoff time between reconnection attempts
2323+ * @param {AbortSignal} [opts.signal] - Signal for aborting the connection
2424+ * @param {number} [opts.heartbeatIntervalMs] - Interval between heartbeat messages
2525+ * @param {Function} [opts.onReconnectError] - Callback for handling reconnection errors
2626+ */
2727+ constructor(
2828+ public opts: WebSocketOptions & {
2929+ getUrl: () => Promise<string>;
3030+ maxReconnectSeconds?: number;
3131+ signal?: AbortSignal;
3232+ heartbeatIntervalMs?: number;
3333+ onReconnectError?: (
3434+ error: unknown,
3535+ n: number,
3636+ initialSetup: boolean,
3737+ ) => void;
3838+ },
3939+ ) {}
4040+4141+ /**
4242+ * Implements the AsyncIterator protocol for receiving WebSocket messages.
4343+ * Handles automatic reconnection and message buffering.
4444+ * @returns {AsyncGenerator<Uint8Array>} An async generator that yields received messages
4545+ */
4646+ async *[Symbol.asyncIterator](): AsyncGenerator<Uint8Array> {
4747+ const maxReconnectMs = 1000 * (this.opts.maxReconnectSeconds ?? 64);
4848+ while (true) {
4949+ if (this.reconnects !== null) {
5050+ const duration = this.initialSetup
5151+ ? Math.min(1000, maxReconnectMs)
5252+ : backoffMs(this.reconnects++, maxReconnectMs);
5353+ await wait(duration);
5454+ }
5555+ const url = await this.opts.getUrl();
5656+ this.ws = new WebSocket(url, this.opts.protocols);
5757+ const ac = new AbortController();
5858+ if (this.opts.signal) {
5959+ forwardSignal(this.opts.signal, ac);
6060+ }
6161+ this.ws.onopen = () => {
6262+ this.initialSetup = false;
6363+ this.reconnects = 0;
6464+ if (this.ws) {
6565+ this.startHeartbeat(this.ws);
6666+ }
6767+ };
6868+ this.ws.onclose = (ev: CloseEvent) => {
6969+ if (ev.code === CloseCode.Abnormal) {
7070+ // Forward into an error to distinguish from a clean close
7171+ ac.abort(
7272+ new AbnormalCloseError(`Abnormal ws close: ${ev.reason}`),
7373+ );
7474+ }
7575+ };
7676+7777+ try {
7878+ const messageQueue: Uint8Array[] = [];
7979+ let error: Error | null = null;
8080+ let done = false;
8181+8282+ this.ws.onmessage = (ev: MessageEvent) => {
8383+ if (ev.data instanceof Uint8Array) {
8484+ messageQueue.push(ev.data);
8585+ }
8686+ };
8787+ this.ws.onerror = (ev: Event | ErrorEvent) => {
8888+ if (ev instanceof ErrorEvent) {
8989+ error = ev.error;
9090+ }
9191+ };
9292+ this.ws.onclose = () => {
9393+ done = true;
9494+ };
9595+9696+ while (!done && !error && !ac.signal.aborted) {
9797+ if (messageQueue.length > 0) {
9898+ yield messageQueue.shift()!;
9999+ } else {
100100+ await new Promise((resolve) => setTimeout(resolve, 0));
101101+ }
102102+ }
103103+104104+ if (error) throw error;
105105+ if (ac.signal.aborted) throw ac.signal.reason;
106106+ } catch (_err) {
107107+ const err = isErrorWithCode(_err) && _err.code === "ABORT_ERR"
108108+ ? _err.cause
109109+ : _err;
110110+ if (err instanceof DisconnectError) {
111111+ // We cleanly end the connection
112112+ this.ws?.close(err.wsCode);
113113+ break;
114114+ }
115115+ this.ws?.close(); // No-ops if already closed or closing
116116+ if (isReconnectable(err)) {
117117+ this.reconnects ??= 0; // Never reconnect with a null
118118+ this.opts.onReconnectError?.(err, this.reconnects, this.initialSetup);
119119+ continue;
120120+ } else {
121121+ throw err;
122122+ }
123123+ }
124124+ break; // Other side cleanly ended stream and disconnected
125125+ }
126126+ }
127127+128128+ /**
129129+ * Starts the heartbeat mechanism for a WebSocket connection.
130130+ * Sends periodic ping messages and monitors for pong responses.
131131+ * @param {WebSocket} ws - The WebSocket connection to monitor
132132+ */
133133+ startHeartbeat(ws: WebSocket) {
134134+ let isAlive = true;
135135+ let heartbeatInterval: ReturnType<typeof setInterval> | null = null;
136136+137137+ const checkAlive = () => {
138138+ if (!isAlive) {
139139+ return ws.close();
140140+ }
141141+ isAlive = false; // expect websocket to no longer be alive unless we receive a "pong" within the interval
142142+ ws.send("ping");
143143+ };
144144+145145+ checkAlive();
146146+ heartbeatInterval = setInterval(
147147+ checkAlive,
148148+ this.opts.heartbeatIntervalMs ?? 10 * SECOND,
149149+ );
150150+151151+ ws.onmessage = (ev: MessageEvent) => {
152152+ if (ev.data === "pong") {
153153+ isAlive = true;
154154+ }
155155+ };
156156+ ws.onclose = () => {
157157+ if (heartbeatInterval) {
158158+ clearInterval(heartbeatInterval);
159159+ heartbeatInterval = null;
160160+ }
161161+ };
162162+ }
163163+}
164164+165165+export default WebSocketKeepAlive;
166166+167167+/**
168168+ * Error class for abnormal WebSocket closures.
169169+ * @class
170170+ * @extends Error
171171+ */
172172+class AbnormalCloseError extends Error {
173173+ code = "EWSABNORMALCLOSE";
174174+}
175175+176176+/**
177177+ * Interface for errors with error codes.
178178+ * @interface
179179+ * @property {string} [code] - Error code identifier
180180+ * @property {unknown} [cause] - Underlying cause of the error
181181+ */
182182+interface ErrorWithCode {
183183+ code?: string;
184184+ cause?: unknown;
185185+}
186186+187187+/**
188188+ * Type guard to check if an error has an error code.
189189+ * @param {unknown} err - The error to check
190190+ * @returns {boolean} True if the error has a code property
191191+ */
192192+function isErrorWithCode(err: unknown): err is ErrorWithCode {
193193+ return err !== null && typeof err === "object" && "code" in err;
194194+}
195195+196196+/**
197197+ * Checks if an error should trigger a reconnection attempt.
198198+ * Network-related errors are typically reconnectable.
199199+ * @param {unknown} err - The error to check
200200+ * @returns {boolean} True if the error should trigger a reconnection
201201+ */
202202+function isReconnectable(err: unknown): boolean {
203203+ // Network errors are reconnectable.
204204+ // AuthenticationRequired and InvalidRequest XRPCErrors are not reconnectable.
205205+ // @TODO method-specific XRPCErrors may be reconnectable, need to consider. Receiving
206206+ // an invalid message is not current reconnectable, but the user can decide to skip them.
207207+ if (!isErrorWithCode(err)) return false;
208208+ return typeof err.code === "string" && networkErrorCodes.includes(err.code);
209209+}
210210+211211+/**
212212+ * List of error codes that indicate network-related issues.
213213+ * These errors typically warrant a reconnection attempt.
214214+ */
215215+const networkErrorCodes = [
216216+ "EWSABNORMALCLOSE",
217217+ "ECONNRESET",
218218+ "ECONNREFUSED",
219219+ "ECONNABORTED",
220220+ "EPIPE",
221221+ "ETIMEDOUT",
222222+ "ECANCELED",
223223+];
224224+225225+/**
226226+ * Calculates the backoff duration for reconnection attempts.
227227+ * Uses exponential backoff with random jitter.
228228+ * @param {number} n - The number of reconnection attempts so far
229229+ * @param {number} maxMs - Maximum backoff duration in milliseconds
230230+ * @returns {number} The backoff duration in milliseconds
231231+ */
232232+function backoffMs(n: number, maxMs: number) {
233233+ const baseSec = Math.pow(2, n); // 1, 2, 4, ...
234234+ const randSec = Math.random() - 0.5; // Random jitter between -.5 and .5 seconds
235235+ const ms = 1000 * (baseSec + randSec);
236236+ return Math.min(ms, maxMs);
237237+}
238238+239239+/**
240240+ * Forwards abort signals from one AbortController to another.
241241+ * @param {AbortSignal} signal - The source abort signal
242242+ * @param {AbortController} ac - The target abort controller
243243+ */
244244+function forwardSignal(signal: AbortSignal, ac: AbortController) {
245245+ if (signal.aborted) {
246246+ return ac.abort(signal.reason);
247247+ } else {
248248+ signal.addEventListener("abort", () => ac.abort(signal.reason), {
249249+ // @ts-ignore https://github.com/DefinitelyTyped/DefinitelyTyped/pull/68625
250250+ signal: ac.signal,
251251+ });
252252+ }
253253+}
···11+import { z } from "zod";
22+import type { ErrorResult, XRPCError } from "./errors.ts";
33+import type { CalcKeyFn, CalcPointsFn } from "./rate-limiter.ts";
44+import type { RateLimiterI } from "./rate-limiter.ts";
55+66+/**
77+ * Represents a value that can be either synchronous or asynchronous.
88+ * @template T - The type of the value
99+ */
1010+export type Awaitable<T> = T | Promise<T>;
1111+1212+/**
1313+ * Handler function for catching all unmatched routes.
1414+ * @param req - The HTTP request object
1515+ * @returns A promise that resolves to a Response
1616+ */
1717+export type CatchallHandler = (
1818+ req: Request,
1919+ res: Response,
2020+) => Promise<Response>;
2121+2222+/**
2323+ * Configuration options for the XRPC server.
2424+ */
2525+export type Options = {
2626+ /** Whether to validate response schemas */
2727+ validateResponse?: boolean;
2828+ /** Handler for catching all unmatched routes */
2929+ catchall?: CatchallHandler;
3030+ /** Payload size limits for different content types */
3131+ payload?: RouteOptions;
3232+ /** Rate limiting configuration */
3333+ rateLimits?: {
3434+ /** Factory function for creating rate limiters */
3535+ creator: RateLimiterCreator<HandlerContext>;
3636+ /** Global rate limits applied to all routes */
3737+ global?: ServerRateLimitDescription<HandlerContext>[];
3838+ /** Shared rate limits that can be referenced by name */
3939+ shared?: ServerRateLimitDescription<HandlerContext>[];
4040+ /** Function to determine if rate limits should be bypassed for a request */
4141+ bypass?: (ctx: HandlerContext) => boolean;
4242+ };
4343+ /**
4444+ * By default, errors are converted to {@link XRPCError} using
4545+ * {@link XRPCError.fromError} before being rendered. If method handlers throw
4646+ * error objects that are not properly rendered in the HTTP response, this
4747+ * function can be used to properly convert them to {@link XRPCError}. The
4848+ * provided function will typically fallback to the default error conversion
4949+ * (`return XRPCError.fromError(err)`) if the error is not recognized.
5050+ *
5151+ * @note This function should not throw errors.
5252+ */
5353+ errorParser?: (err: unknown) => XRPCError;
5454+};
5555+5656+/**
5757+ * Basic primitive types supported in XRPC parameters.
5858+ */
5959+export type Primitive = string | number | boolean;
6060+6161+/**
6262+ * Type-safe parameter object with optional primitive values or arrays.
6363+ */
6464+export type Params = { [P in string]?: undefined | Primitive | Primitive[] };
6565+6666+/**
6767+ * Input data for XRPC method handlers.
6868+ */
6969+export type HandlerInput = {
7070+ /** Content encoding of the request body */
7171+ encoding: string;
7272+ /** Parsed request body */
7373+ body: unknown;
7474+};
7575+7676+/**
7777+ * Result of successful authentication.
7878+ */
7979+export type AuthResult = {
8080+ /** Authentication credentials (e.g., user info, tokens) */
8181+ credentials: unknown;
8282+ /** Optional authentication artifacts (e.g., session data) */
8383+ artifacts?: unknown;
8484+};
8585+8686+export const headersSchema: z.ZodRecord<z.ZodString, z.ZodString> = z.record(
8787+ z.string(),
8888+ z.string(),
8989+);
9090+9191+/**
9292+ * HTTP headers as a record of string key-value pairs.
9393+ */
9494+export type Headers = z.infer<typeof headersSchema>;
9595+9696+export const handlerSuccess: z.ZodObject<{
9797+ encoding: z.ZodString;
9898+ body: z.ZodAny;
9999+ headers: z.ZodOptional<z.ZodRecord<z.ZodString, z.ZodString>>;
100100+}> = z.object({
101101+ encoding: z.string(),
102102+ body: z.any(),
103103+ headers: headersSchema.optional(),
104104+});
105105+106106+/**
107107+ * Successful response from a method handler.
108108+ */
109109+export type HandlerSuccess = z.infer<typeof handlerSuccess>;
110110+111111+/**
112112+ * Handler response that pipes through a buffer.
113113+ */
114114+export type HandlerPipeThroughBuffer = {
115115+ /** Content encoding of the response */
116116+ encoding: string;
117117+ /** Response data as a buffer */
118118+ buffer: Uint8Array;
119119+ /** Optional HTTP headers */
120120+ headers?: Headers;
121121+};
122122+123123+/**
124124+ * Handler response that pipes through a stream.
125125+ */
126126+export type HandlerPipeThroughStream = {
127127+ /** Content encoding of the response */
128128+ encoding: string;
129129+ /** Response data as a readable stream */
130130+ stream: ReadableStream<Uint8Array>;
131131+ /** Optional HTTP headers */
132132+ headers?: Headers;
133133+};
134134+135135+/**
136136+ * Union type for handler responses that pipe data through either a buffer or stream.
137137+ */
138138+export type HandlerPipeThrough =
139139+ | HandlerPipeThroughBuffer
140140+ | HandlerPipeThroughStream;
141141+142142+/**
143143+ * Authentication state for a handler context.
144144+ */
145145+export type Auth = void | AuthResult;
146146+147147+/**
148148+ * Input data for a handler context.
149149+ */
150150+export type Input = void | HandlerInput;
151151+152152+/**
153153+ * Output data from a handler.
154154+ */
155155+export type Output = void | HandlerSuccess | ErrorResult;
156156+157157+/**
158158+ * Function that verifies authentication for a request.
159159+ * @template C - The context type
160160+ * @template A - The authentication result type
161161+ */
162162+export type AuthVerifier<C, A extends AuthResult = AuthResult> =
163163+ | ((ctx: C) => Awaitable<A | ErrorResult>)
164164+ | ((ctx: C) => Awaitable<A>);
165165+166166+/**
167167+ * Context object provided to XRPC method handlers containing request data and utilities.
168168+ * @template A - Authentication type
169169+ * @template P - Parameters type
170170+ * @template I - Input type
171171+ */
172172+export type HandlerContext<
173173+ A extends Auth = Auth,
174174+ P extends Params = Params,
175175+ I extends Input = Input,
176176+> = MethodAuthContext<P> & {
177177+ /** Authentication result */
178178+ auth: A;
179179+ /** Request input data */
180180+ input: I;
181181+ /** Function to reset rate limits for this route */
182182+ resetRouteRateLimits: () => Promise<void>;
183183+};
184184+185185+/**
186186+ * Handler function for XRPC methods.
187187+ * @template A - Authentication type
188188+ * @template P - Parameters type
189189+ * @template I - Input type
190190+ * @template O - Output type
191191+ */
192192+export type MethodHandler<
193193+ A extends Auth = Auth,
194194+ P extends Params = Params,
195195+ I extends Input = Input,
196196+ O extends Output = Output,
197197+> = (ctx: HandlerContext<A, P, I>) => Awaitable<O | HandlerPipeThrough>;
198198+199199+/**
200200+ * Factory function for creating rate limiter instances.
201201+ * @template T - The handler context type
202202+ */
203203+export type RateLimiterCreator<T extends HandlerContext = HandlerContext> = <
204204+ C extends T = T,
205205+>(opts: {
206206+ /** Prefix for rate limiter keys */
207207+ keyPrefix: string;
208208+ /** Duration window in milliseconds */
209209+ durationMs: number;
210210+ /** Number of points allowed in the duration window */
211211+ points: number;
212212+ /** Function to calculate the rate limit key */
213213+ calcKey: CalcKeyFn<C>;
214214+ /** Function to calculate points consumed */
215215+ calcPoints: CalcPointsFn<C>;
216216+ /** Whether to fail closed (deny) when rate limiter is unavailable */
217217+ failClosed?: boolean;
218218+}) => RateLimiterI<C>;
219219+220220+/**
221221+ * Context object for method authentication containing request data.
222222+ * @template P - Parameters type
223223+ * @template I - Input type
224224+ */
225225+export type MethodAuthContext<
226226+ P extends Params = Params,
227227+ I extends Input = Input,
228228+> = {
229229+ /** Parsed request parameters */
230230+ params: P;
231231+ /** Request input data */
232232+ input: I;
233233+ /** HTTP request object */
234234+ req: Request;
235235+ /** HTTP response object */
236236+ res: Response;
237237+};
238238+239239+/**
240240+ * Authentication verifier function for XRPC methods.
241241+ * @template A - Authentication result type
242242+ * @template P - Parameters type
243243+ * @template I - Input type
244244+ */
245245+export type MethodAuthVerifier<
246246+ A extends AuthResult = AuthResult,
247247+ P extends Params = Params,
248248+ I extends Input = Input,
249249+> = (ctx: MethodAuthContext<P, I>) => Awaitable<A>;
250250+251251+/**
252252+ * Context object for streaming handlers.
253253+ * @template A - Authentication type
254254+ * @template P - Parameters type
255255+ */
256256+export type StreamContext<
257257+ A extends Auth = Auth,
258258+ P extends Params = Params,
259259+> = StreamAuthContext<P> & {
260260+ /** Authentication result */
261261+ auth: A;
262262+ /** Abort signal for cancelling the stream */
263263+ signal: AbortSignal;
264264+};
265265+266266+/**
267267+ * Handler function for streaming XRPC endpoints.
268268+ * @template A - Authentication type
269269+ * @template P - Parameters type
270270+ * @template O - Output item type
271271+ */
272272+export type StreamHandler<
273273+ A extends Auth = Auth,
274274+ P extends Params = Params,
275275+ O = unknown,
276276+> = (ctx: StreamContext<A, P>) => AsyncIterable<O>;
277277+278278+/**
279279+ * Context object for stream authentication.
280280+ * @template P - Parameters type
281281+ */
282282+export type StreamAuthContext<P extends Params = Params> = {
283283+ /** Parsed request parameters */
284284+ params: P;
285285+ /** HTTP request object */
286286+ req: Request;
287287+};
288288+289289+/**
290290+ * Authentication verifier function for streaming endpoints.
291291+ * @template A - Authentication result type
292292+ * @template P - Parameters type
293293+ */
294294+export type StreamAuthVerifier<
295295+ A extends AuthResult = AuthResult,
296296+ P extends Params = Params,
297297+> = AuthVerifier<StreamAuthContext<P>, A>;
298298+299299+/**
300300+ * Configuration for server-level rate limits.
301301+ * @template C - Handler context type
302302+ */
303303+export type ServerRateLimitDescription<
304304+ C extends HandlerContext = HandlerContext,
305305+> = {
306306+ /** Unique name for this rate limit */
307307+ name: string;
308308+ /** Duration window in milliseconds */
309309+ durationMs: number;
310310+ /** Number of points allowed in the duration window */
311311+ points: number;
312312+ /** Optional function to calculate the rate limit key */
313313+ calcKey?: CalcKeyFn<C>;
314314+ /** Optional function to calculate points consumed */
315315+ calcPoints?: CalcPointsFn<C>;
316316+ /** Whether to fail closed when rate limiter is unavailable */
317317+ failClosed?: boolean;
318318+};
319319+320320+/**
321321+ * Options for referencing a shared rate limit by name.
322322+ * @template C - Handler context type
323323+ */
324324+export type SharedRateLimitOpts<C extends HandlerContext = HandlerContext> = {
325325+ /** Name of the shared rate limit to use */
326326+ name: string;
327327+ /** Optional function to calculate the rate limit key */
328328+ calcKey?: CalcKeyFn<C>;
329329+ /** Optional function to calculate points consumed */
330330+ calcPoints?: CalcPointsFn<C>;
331331+};
332332+333333+/**
334334+ * Options for defining a route-specific rate limit.
335335+ * @template C - Handler context type
336336+ */
337337+export type RouteRateLimitOpts<C extends HandlerContext = HandlerContext> = {
338338+ /** Duration window in milliseconds */
339339+ durationMs: number;
340340+ /** Number of points allowed in the duration window */
341341+ points: number;
342342+ /** Optional function to calculate the rate limit key */
343343+ calcKey?: CalcKeyFn<C>;
344344+ /** Optional function to calculate points consumed */
345345+ calcPoints?: CalcPointsFn<C>;
346346+};
347347+348348+/**
349349+ * Union type for rate limit options - either shared or route-specific.
350350+ * @template C - Handler context type
351351+ */
352352+export type RateLimitOpts<C extends HandlerContext = HandlerContext> =
353353+ | SharedRateLimitOpts<C>
354354+ | RouteRateLimitOpts<C>;
355355+356356+/**
357357+ * Type guard to check if rate limit options are for a shared rate limit.
358358+ * @template C - Handler context type
359359+ * @param opts Rate limit options to check
360360+ * @returns True if the options reference a shared rate limit
361361+ */
362362+export function isSharedRateLimitOpts<
363363+ C extends HandlerContext = HandlerContext,
364364+>(opts: RateLimitOpts<C>): opts is SharedRateLimitOpts<C> {
365365+ return "name" in opts && typeof opts.name === "string";
366366+}
367367+368368+/**
369369+ * Options for configuring payload size limits by content type.
370370+ */
371371+export type RouteOptions = {
372372+ /** Maximum size for binary/blob payloads in bytes */
373373+ blobLimit?: number;
374374+ /** Maximum size for JSON payloads in bytes */
375375+ jsonLimit?: number;
376376+ /** Maximum size for text payloads in bytes */
377377+ textLimit?: number;
378378+};
379379+380380+/**
381381+ * Simplified route options with only blob limit configuration.
382382+ */
383383+export type RouteOpts = {
384384+ /** Maximum size for binary/blob payloads in bytes */
385385+ blobLimit?: number;
386386+};
387387+388388+/**
389389+ * Configuration object for an XRPC method including handler, auth, and options.
390390+ * @template A - Authentication type
391391+ * @template P - Parameters type
392392+ * @template I - Input type
393393+ * @template O - Output type
394394+ */
395395+export type MethodConfig<
396396+ A extends Auth = Auth,
397397+ P extends Params = Params,
398398+ I extends Input = Input,
399399+ O extends Output = Output,
400400+> = {
401401+ /** The method handler function */
402402+ handler: MethodHandler<A, P, I, O>;
403403+ /** Optional authentication verifier */
404404+ auth?: MethodAuthVerifier<Extract<A, AuthResult>, P>;
405405+ /** Optional route configuration */
406406+ opts?: RouteOptions;
407407+ /** Optional rate limiting configuration */
408408+ rateLimit?:
409409+ | RateLimitOpts<HandlerContext<A, P, I>>
410410+ | RateLimitOpts<HandlerContext<A, P, I>>[];
411411+};
412412+413413+/**
414414+ * Union type allowing either a simple handler function or full method configuration.
415415+ * @template A - Authentication type
416416+ * @template P - Parameters type
417417+ * @template I - Input type
418418+ * @template O - Output type
419419+ */
420420+export type MethodConfigOrHandler<
421421+ A extends Auth = Auth,
422422+ P extends Params = Params,
423423+ I extends Input = Input,
424424+ O extends Output = Output,
425425+> = MethodHandler<A, P, I, O> | MethodConfig<A, P, I, O>;
426426+427427+/**
428428+ * Configuration object for a streaming XRPC endpoint.
429429+ * @template A - Authentication type
430430+ * @template P - Parameters type
431431+ * @template O - Output item type
432432+ */
433433+export type StreamConfig<
434434+ A extends Auth = Auth,
435435+ P extends Params = Params,
436436+ O = unknown,
437437+> = {
438438+ /** Optional authentication verifier for the stream */
439439+ auth?: StreamAuthVerifier<Extract<A, AuthResult>, P>;
440440+ /** The stream handler function */
441441+ handler: StreamHandler<A, P, O>;
442442+};
443443+444444+/**
445445+ * Union type allowing either a simple stream handler or full stream configuration.
446446+ * @template A - Authentication type
447447+ * @template P - Parameters type
448448+ * @template O - Output item type
449449+ */
450450+export type StreamConfigOrHandler<
451451+ A extends Auth = Auth,
452452+ P extends Params = Params,
453453+ O = unknown,
454454+> = StreamHandler<A, P, O> | StreamConfig<A, P, O>;
455455+456456+/**
457457+ * Type guard to check if handler output is a pipe-through buffer response.
458458+ * @param output - The handler output to check
459459+ * @returns True if the output is a buffer pipe-through response
460460+ */
461461+export function isHandlerPipeThroughBuffer(
462462+ output: Output | HandlerPipeThrough,
463463+): output is HandlerPipeThroughBuffer {
464464+ // We only need to discriminate between possible Output values
465465+ return output != null && "buffer" in output && output["buffer"] !== undefined;
466466+}
467467+468468+/**
469469+ * Type guard to check if handler output is a pipe-through stream response.
470470+ * @param output - The handler output to check
471471+ * @returns True if the output is a stream pipe-through response
472472+ */
473473+export function isHandlerPipeThroughStream(
474474+ output: Output | HandlerPipeThrough,
475475+): output is HandlerPipeThroughStream {
476476+ // We only need to discriminate between possible Output values
477477+ return output != null && "stream" in output && output["stream"] !== undefined;
478478+}
+547
xrpc-server/util.ts
···11+import type {
22+ Lexicons,
33+ LexXrpcProcedure,
44+ LexXrpcQuery,
55+ LexXrpcSubscription,
66+} from "@atproto/lexicon";
77+import { jsonToLex } from "@atproto/lexicon";
88+import { InternalServerError, InvalidRequestError } from "./errors.ts";
99+import { handlerSuccess } from "./types.ts";
1010+import type { HandlerInput, HandlerSuccess, Params } from "./types.ts";
1111+import type { Context, HonoRequest } from "hono";
1212+1313+function assert(condition: unknown, message?: string): asserts condition {
1414+ if (!condition) {
1515+ throw new Error(message || "Assertion failed");
1616+ }
1717+}
1818+1919+/**
2020+ * Decodes query parameters from HTTP request into typed parameters.
2121+ * Handles type conversion for strings, numbers, booleans, and arrays based on lexicon definitions.
2222+ * @param def - The lexicon definition containing parameter schema
2323+ * @param params - Raw query parameters from the HTTP request
2424+ * @returns Decoded and type-converted parameters
2525+ */
2626+export function decodeQueryParams(
2727+ def: LexXrpcProcedure | LexXrpcQuery | LexXrpcSubscription,
2828+ params: Record<string, string | string[]>,
2929+): Params {
3030+ const decoded: Params = {};
3131+ if (!def.parameters?.properties) {
3232+ return decoded;
3333+ }
3434+3535+ for (const k in def.parameters.properties) {
3636+ const property = def.parameters.properties[k];
3737+ const val = params[k];
3838+ if (property && val !== undefined) {
3939+ if (property.type === "array") {
4040+ const vals = (Array.isArray(val) ? val : [val]).filter(
4141+ (v) => v !== undefined,
4242+ );
4343+ decoded[k] = vals
4444+ .map((v) => decodeQueryParam(property.items?.type || "string", v))
4545+ .filter((v) => v !== undefined) as (string | number | boolean)[];
4646+ } else {
4747+ const actualVal = Array.isArray(val) ? val[0] : val;
4848+ decoded[k] = decodeQueryParam(property.type, actualVal);
4949+ }
5050+ }
5151+ }
5252+ return decoded;
5353+}
5454+5555+/**
5656+ * Decodes a single query parameter value based on its expected type.
5757+ * Converts string values to appropriate JavaScript types (string, number, boolean).
5858+ * @param type - The expected parameter type from the lexicon
5959+ * @param value - The raw parameter value from the query string
6060+ * @returns The decoded parameter value or undefined if conversion fails
6161+ */
6262+export function decodeQueryParam(
6363+ type: string,
6464+ value: unknown,
6565+): string | number | boolean | undefined {
6666+ if (!value) {
6767+ return undefined;
6868+ }
6969+ if (type === "string" || type === "datetime") {
7070+ return String(value);
7171+ }
7272+ if (type === "float") {
7373+ return Number(String(value));
7474+ } else if (type === "integer") {
7575+ return parseInt(String(value), 10) || 0;
7676+ } else if (type === "boolean") {
7777+ return value === "true";
7878+ }
7979+}
8080+8181+/**
8282+ * Extracts query parameters from a URL and returns them as arrays of strings.
8383+ * @param url - The URL to parse (defaults to empty string)
8484+ * @returns Object mapping parameter names to arrays of values
8585+ */
8686+export function getQueryParams(url = ""): Record<string, string[]> {
8787+ const { searchParams } = new URL(url ?? "", "http://x");
8888+ const result: Record<string, string[]> = {};
8989+ for (const key of searchParams.keys()) {
9090+ result[key] = searchParams.getAll(key);
9191+ }
9292+ return result;
9393+}
9494+9595+/**
9696+ * Represents a request-like object with essential HTTP request properties.
9797+ * Used for handling both standard HTTP requests and custom request implementations.
9898+ */
9999+export type RequestLike = {
100100+ headers: Headers | { [key: string]: string | string[] | undefined };
101101+ body?: ReadableStream | unknown;
102102+ method?: string;
103103+ url?: string;
104104+ signal?: AbortSignal;
105105+};
106106+107107+/**
108108+ * Validates the input of an XRPC method against its lexicon definition.
109109+ * Performs content-type validation, body presence checks, and schema validation.
110110+ * @param nsid - The namespace identifier of the method
111111+ * @param def - The lexicon definition for the method
112112+ * @param body - The request body content
113113+ * @param contentType - The Content-Type header value
114114+ * @param lexicons - The lexicon registry for schema validation
115115+ * @returns Validated handler input or undefined for methods without input
116116+ * @throws {InvalidRequestError} If validation fails
117117+ */
118118+export async function validateInput(
119119+ nsid: string,
120120+ def: LexXrpcProcedure | LexXrpcQuery,
121121+ body: unknown,
122122+ contentType: string | undefined | null,
123123+ lexicons: Lexicons,
124124+): Promise<HandlerInput | undefined> {
125125+ let processedBody: unknown | Uint8Array = body;
126126+ if (body instanceof ReadableStream) {
127127+ const reader = body.getReader();
128128+ const chunks: Uint8Array[] = [];
129129+ while (true) {
130130+ const { done, value } = await reader.read();
131131+ if (done) break;
132132+ chunks.push(value);
133133+ }
134134+ const totalLength = chunks.reduce((acc, chunk) => acc + chunk.length, 0);
135135+ const tempBody = new Uint8Array(totalLength);
136136+ let offset = 0;
137137+ for (const chunk of chunks) {
138138+ tempBody.set(chunk, offset);
139139+ offset += chunk.length;
140140+ }
141141+ processedBody = tempBody;
142142+ }
143143+144144+ const bodyPresence = getBodyPresence(processedBody, contentType);
145145+ if (bodyPresence === "present" && (def.type !== "procedure" || !def.input)) {
146146+ throw new InvalidRequestError(
147147+ `A request body was provided when none was expected`,
148148+ );
149149+ }
150150+ if (def.type === "query") {
151151+ return;
152152+ }
153153+ if (bodyPresence === "missing" && def.input) {
154154+ throw new InvalidRequestError(
155155+ `A request body is expected but none was provided`,
156156+ );
157157+ }
158158+159159+ // mimetype
160160+ const inputEncoding = normalizeMime(contentType || "");
161161+ if (
162162+ def.input?.encoding &&
163163+ (!inputEncoding || !isValidEncoding(def.input?.encoding, inputEncoding))
164164+ ) {
165165+ if (!inputEncoding) {
166166+ throw new InvalidRequestError(
167167+ `Request encoding (Content-Type) required but not provided`,
168168+ );
169169+ } else {
170170+ throw new InvalidRequestError(
171171+ `Wrong request encoding (Content-Type): ${inputEncoding}`,
172172+ );
173173+ }
174174+ }
175175+176176+ if (!inputEncoding) {
177177+ // no input body
178178+ return undefined;
179179+ }
180180+181181+ // if input schema, validate
182182+ if (def.input?.schema) {
183183+ try {
184184+ const lexBody = processedBody ? jsonToLex(processedBody) : processedBody;
185185+ processedBody = lexicons.assertValidXrpcInput(nsid, lexBody);
186186+ } catch (e) {
187187+ throw new InvalidRequestError(e instanceof Error ? e.message : String(e));
188188+ }
189189+ }
190190+191191+ return {
192192+ encoding: inputEncoding,
193193+ body: processedBody,
194194+ };
195195+}
196196+197197+/**
198198+ * Validates the output of an XRPC method against its lexicon definition.
199199+ * Performs response body validation, content-type checks, and schema validation.
200200+ * @param nsid - The namespace identifier of the method
201201+ * @param def - The lexicon definition for the method
202202+ * @param output - The handler output to validate
203203+ * @param lexicons - The lexicon registry for schema validation
204204+ * @throws {InternalServerError} If validation fails
205205+ */
206206+export function validateOutput(
207207+ nsid: string,
208208+ def: LexXrpcProcedure | LexXrpcQuery,
209209+ output: HandlerSuccess | undefined,
210210+ lexicons: Lexicons,
211211+): void {
212212+ // initial validation
213213+ if (output) {
214214+ handlerSuccess.parse(output);
215215+ }
216216+217217+ // response expectation
218218+ if (output?.body && !def.output) {
219219+ throw new InternalServerError(
220220+ `A response body was provided when none was expected`,
221221+ );
222222+ }
223223+ if (!output?.body && def.output) {
224224+ throw new InternalServerError(
225225+ `A response body is expected but none was provided`,
226226+ );
227227+ }
228228+229229+ // mimetype
230230+ if (
231231+ def.output?.encoding &&
232232+ (!output?.encoding ||
233233+ !isValidEncoding(def.output?.encoding, output?.encoding))
234234+ ) {
235235+ throw new InternalServerError(
236236+ `Invalid response encoding: ${output?.encoding}`,
237237+ );
238238+ }
239239+240240+ // output schema
241241+ if (def.output?.schema) {
242242+ try {
243243+ const result = lexicons.assertValidXrpcOutput(nsid, output?.body);
244244+ if (output) {
245245+ output.body = result;
246246+ }
247247+ } catch (e) {
248248+ throw new InternalServerError(e instanceof Error ? e.message : String(e));
249249+ }
250250+ }
251251+}
252252+253253+/**
254254+ * Normalizes a MIME type by extracting the base type and converting to lowercase.
255255+ * Removes parameters (e.g., charset) from the MIME type.
256256+ * @param mime - The MIME type string to normalize
257257+ * @returns The normalized MIME type (base type only)
258258+ */
259259+export function normalizeMime(mime: string): string {
260260+ const [base] = mime.split(";");
261261+ return base.trim().toLowerCase();
262262+}
263263+264264+/**
265265+ * Checks if an actual encoding matches the expected encoding.
266266+ * Supports wildcard matching and JSON aliases.
267267+ * @param expected - The expected encoding from the lexicon
268268+ * @param actual - The actual encoding from the request
269269+ * @returns True if the encodings are compatible
270270+ */
271271+function isValidEncoding(expected: string, actual: string): boolean {
272272+ if (expected === "*/*") return true;
273273+ if (expected === actual) return true;
274274+ if (expected === "application/json" && actual === "json") return true;
275275+ return false;
276276+}
277277+278278+/**
279279+ * Determines if a request body is present or missing.
280280+ * Considers empty strings and empty arrays as missing when no content type is provided.
281281+ * @param body - The request body
282282+ * @param contentType - The Content-Type header value
283283+ * @returns "present" if body exists, "missing" otherwise
284284+ */
285285+function getBodyPresence(
286286+ body: unknown,
287287+ contentType: string | undefined | null,
288288+): "present" | "missing" {
289289+ if (body === undefined || body === null) {
290290+ return "missing";
291291+ }
292292+ if (typeof body === "string" && body.length === 0 && !contentType) {
293293+ return "missing";
294294+ }
295295+ if (body instanceof Uint8Array && body.length === 0 && !contentType) {
296296+ return "missing";
297297+ }
298298+ return "present";
299299+}
300300+301301+/**
302302+ * Formats server timing data into an HTTP Server-Timing header value.
303303+ * Creates a header string with timing metrics for performance monitoring.
304304+ * @param timings - Array of timing measurements
305305+ * @returns Formatted Server-Timing header value
306306+ */
307307+export function serverTimingHeader(timings: ServerTiming[]): string {
308308+ return timings
309309+ .map((timing) => {
310310+ let header = timing.name;
311311+ if (timing.duration) header += `;dur=${timing.duration}`;
312312+ if (timing.description) header += `;desc="${timing.description}"`;
313313+ return header;
314314+ })
315315+ .join(", ");
316316+}
317317+318318+/**
319319+ * Utility class for measuring server-side operation timings.
320320+ * Provides start/stop functionality and implements the ServerTiming interface.
321321+ */
322322+export class ServerTimer implements ServerTiming {
323323+ public duration?: number;
324324+ private startMs?: number;
325325+ /**
326326+ * Creates a new ServerTimer instance.
327327+ * @param name Identifier for the timing measurement
328328+ * @param description Optional description of what is being timed
329329+ */
330330+ constructor(
331331+ public name: string,
332332+ public description?: string,
333333+ ) {}
334334+ /**
335335+ * Starts the timer by recording the current timestamp.
336336+ * @returns This timer instance for method chaining
337337+ */
338338+ start(): ServerTimer {
339339+ this.startMs = Date.now();
340340+ return this;
341341+ }
342342+ /**
343343+ * Stops the timer and calculates the duration.
344344+ * @returns This timer instance for method chaining
345345+ * @throws {Error} If the timer hasn't been started
346346+ */
347347+ stop(): ServerTimer {
348348+ assert(this.startMs, "timer hasn't been started");
349349+ this.duration = Date.now() - this.startMs;
350350+ return this;
351351+ }
352352+}
353353+354354+/**
355355+ * Represents timing information for server-side operations.
356356+ * Used for performance monitoring and debugging.
357357+ */
358358+export interface ServerTiming {
359359+ name: string;
360360+ duration?: number;
361361+ description?: string;
362362+}
363363+364364+/**
365365+ * Represents a minimal HTTP request with essential properties.
366366+ * Used when full request information is not needed.
367367+ */
368368+export interface MinimalRequest {
369369+ url?: string;
370370+ method?: string;
371371+ headers: Headers | { [key: string]: string | string[] | undefined };
372372+}
373373+374374+/**
375375+ * Validates and extracts the NSID from a request object.
376376+ * Convenience wrapper for parseUrlNsid that works with request objects.
377377+ * @param req - The request object containing a URL
378378+ * @returns The extracted NSID from the request URL
379379+ * @throws {InvalidRequestError} If the URL doesn't contain a valid XRPC path
380380+ */
381381+export const parseReqNsid = (
382382+ req: MinimalRequest | HonoRequest,
383383+): string => parseUrlNsid(req.url || "/");
384384+385385+/**
386386+ * Validates and extracts the NSID (Namespace Identifier) from an XRPC URL.
387387+ * Performs strict validation of the /xrpc/ path format and NSID syntax.
388388+ * @param url - The URL or path to parse
389389+ * @returns The extracted NSID
390390+ * @throws {InvalidRequestError} If the URL doesn't contain a valid XRPC path or NSID
391391+ */
392392+export const parseUrlNsid = (url: string): string => {
393393+ // Extract path from full URL if needed
394394+ let path = url;
395395+ try {
396396+ const urlObj = new URL(url);
397397+ path = urlObj.pathname;
398398+ } catch {
399399+ // If URL parsing fails, assume it's already a path
400400+ }
401401+402402+ if (
403403+ // Ordered by likelihood of failure
404404+ path.length <= 6 ||
405405+ path[5] !== "/" ||
406406+ path[4] !== "c" ||
407407+ path[3] !== "p" ||
408408+ path[2] !== "r" ||
409409+ path[1] !== "x" ||
410410+ path[0] !== "/"
411411+ ) {
412412+ throw new InvalidRequestError("invalid xrpc path");
413413+ }
414414+415415+ const startOfNsid = 6;
416416+417417+ let curr = startOfNsid;
418418+ let char: number;
419419+ let alphaNumRequired = true;
420420+ for (; curr < path.length; curr++) {
421421+ char = path.charCodeAt(curr);
422422+ if (
423423+ (char >= 48 && char <= 57) || // 0-9
424424+ (char >= 65 && char <= 90) || // A-Z
425425+ (char >= 97 && char <= 122) // a-z
426426+ ) {
427427+ alphaNumRequired = false;
428428+ } else if (char === 45 /* "-" */ || char === 46 /* "." */) {
429429+ if (alphaNumRequired) {
430430+ throw new InvalidRequestError("invalid xrpc path");
431431+ }
432432+ alphaNumRequired = true;
433433+ } else if (char === 47 /* "/" */) {
434434+ // Allow trailing slash (next char is either EOS or "?")
435435+ if (curr === path.length - 1 || path.charCodeAt(curr + 1) === 63) {
436436+ break;
437437+ }
438438+ throw new InvalidRequestError("invalid xrpc path");
439439+ } else if (char === 63 /* "?"" */) {
440440+ break;
441441+ } else {
442442+ throw new InvalidRequestError("invalid xrpc path");
443443+ }
444444+ }
445445+446446+ // last char was one of: '-', '.', '/'
447447+ if (alphaNumRequired) {
448448+ throw new InvalidRequestError("invalid xrpc path");
449449+ }
450450+451451+ // A domain name consists of minimum two characters
452452+ if (curr - startOfNsid < 2) {
453453+ throw new InvalidRequestError("invalid xrpc path");
454454+ }
455455+456456+ // @TODO is there a max ?
457457+458458+ return path.slice(startOfNsid, curr);
459459+};
460460+461461+/**
462462+ * Alias for parseUrlNsid for backward compatibility.
463463+ * @deprecated Use parseUrlNsid instead
464464+ */
465465+export const extractUrlNsid = parseUrlNsid;
466466+467467+/**
468468+ * Creates an input verifier function for XRPC methods.
469469+ * Returns a function that validates and processes request input based on lexicon definitions.
470470+ * @param lexicons - The lexicon registry for validation
471471+ * @param nsid - The namespace identifier of the method
472472+ * @param def - The lexicon definition for the method
473473+ * @returns A function that verifies request input
474474+ */
475475+export function createInputVerifier(
476476+ lexicons: Lexicons,
477477+ nsid: string,
478478+ def: LexXrpcProcedure | LexXrpcQuery,
479479+) {
480480+ return async (req: Request): Promise<HandlerInput | undefined> => {
481481+ if (def.type === "query") {
482482+ return undefined;
483483+ }
484484+485485+ const contentType = req.headers.get("content-type");
486486+ let body: unknown;
487487+488488+ // Clone the request to avoid consuming the body multiple times
489489+ const clonedReq = req.clone();
490490+491491+ if (contentType?.includes("application/json")) {
492492+ body = await clonedReq.json();
493493+ } else if (contentType?.includes("text/")) {
494494+ body = await clonedReq.text();
495495+ } else {
496496+ const arrayBuffer = await clonedReq.arrayBuffer();
497497+ body = new Uint8Array(arrayBuffer);
498498+ }
499499+500500+ return await validateInput(nsid, def, body, contentType, lexicons);
501501+ };
502502+}
503503+504504+/**
505505+ * Sets headers on a Hono context response.
506506+ * Iterates through the provided headers and sets them on the response.
507507+ * @param c - The Hono context object
508508+ * @param headers - Optional headers to set as key-value pairs
509509+ */
510510+export function setHeaders(c: Context, headers?: Record<string, string>) {
511511+ if (headers) {
512512+ for (const [key, value] of Object.entries(headers)) {
513513+ c.header(key, value);
514514+ }
515515+ }
516516+}
517517+518518+/**
519519+ * Converts a value to an array.
520520+ * If the value is already an array, returns it as-is. Otherwise, wraps it in an array.
521521+ * @template T - The type of the value
522522+ * @param value - The value to convert to an array
523523+ * @returns An array containing the value(s)
524524+ */
525525+export function asArray<T>(value: T | T[]): T[] {
526526+ return Array.isArray(value) ? value : [value];
527527+}
528528+529529+/**
530530+ * Decodes query parameters from URL search params into a typed parameter object.
531531+ * Converts arrays of single values to single values, preserves multiple values as arrays.
532532+ * @param params - Raw query parameters as arrays of strings
533533+ * @returns Decoded parameters with single values or arrays
534534+ */
535535+export function decodeUrlQueryParams(params: Record<string, string[]>): Params {
536536+ const decoded: Params = {};
537537+538538+ for (const [key, values] of Object.entries(params)) {
539539+ if (values.length === 1) {
540540+ decoded[key] = values[0];
541541+ } else if (values.length > 1) {
542542+ decoded[key] = values;
543543+ }
544544+ }
545545+546546+ return decoded;
547547+}