Suite of AT Protocol TypeScript libraries built on web standards
1import type { Context, Handler } from "hono";
2import { Hono } from "hono";
3import { Procedure, Query, Subscription } from "@atp/lex";
4import {
5 type LexiconDoc,
6 Lexicons,
7 type LexXrpcProcedure,
8 type LexXrpcQuery,
9 type LexXrpcSubscription,
10} from "@atp/lexicon";
11import {
12 excludeErrorResult,
13 InternalServerError,
14 InvalidRequestError,
15 isErrorResult,
16 MethodNotImplementedError,
17 XRPCError,
18} from "./errors.ts";
19import { type RateLimiterI, RouteRateLimiter } from "./rate-limiter.ts";
20import {
21 ErrorFrame,
22 Frame,
23 MessageFrame,
24 XrpcStreamServer,
25} from "./stream/index.ts";
26import {
27 type Auth,
28 type AuthResult,
29 type AuthVerifier,
30 type HandlerContext,
31 type HandlerSuccess,
32 type Input,
33 isHandlerPipeThroughBuffer,
34 isHandlerPipeThroughStream,
35 isSharedRateLimitOpts,
36 type LexMethodConfig,
37 type LexMethodHandler,
38 type LexMethodInput,
39 type LexMethodLike,
40 type LexMethodOutput,
41 type LexMethodParams,
42 type LexSubscriptionConfig,
43 type LexSubscriptionHandler,
44 type MethodConfig,
45 type MethodConfigOrHandler,
46 type MethodHandler,
47 type Options,
48 type Output,
49 type Params,
50 type ServerRateLimitDescription,
51 type StreamConfig,
52 type StreamConfigOrHandler,
53 type StreamContext,
54} from "./types.ts";
55import {
56 asArray,
57 createLexiconInputVerifier,
58 createLexiconParamsVerifier,
59 createSchemaInputVerifier,
60 createSchemaOutputVerifier,
61 createSchemaParamsVerifier,
62 parseUrlNsid,
63 setHeaders,
64 validateOutput,
65} from "./util.ts";
66import { check, ipldToJson, schema } from "@atp/common";
67import {
68 type CalcKeyFn,
69 type CalcPointsFn,
70 type RateLimiterOptions,
71 WrappedRateLimiter,
72 type WrappedRateLimiterOptions,
73} from "./rate-limiter.ts";
74import { assert } from "@std/assert";
75import type { CatchallHandler, RouteOptions } from "./types.ts";
76import {
77 mountStreamingRoutesDeno,
78 mountStreamingRoutesWorkers,
79 type XrpcMux,
80} from "./stream/adapters.ts";
81
/**
 * Second argument accepted by {@link Server.add}.
 *
 * - For a `Procedure` or `Query` schema: either a full `LexMethodConfig`
 *   or a bare `LexMethodHandler`.
 * - For a `Subscription` schema: either a full `LexSubscriptionConfig`
 *   or a bare `LexSubscriptionHandler`.
 *
 * The bare-handler forms are instantiated with `void` as their second type
 * argument (presumably the auth type, since a bare handler cannot declare an
 * auth verifier — confirm against `./types.ts`).
 */
type LexAddConfig<
  M extends Procedure | Query | Subscription,
  A extends Auth = Auth,
> = M extends Procedure | Query
  ? (LexMethodConfig<M, A> | LexMethodHandler<M, void>)
  : M extends Subscription
    ? (LexSubscriptionConfig<M, A> | LexSubscriptionHandler<M, void>)
  : never;
90
/**
 * Convenience factory equivalent to `new Server(lexicons, options)`.
 *
 * @param lexicons - Lexicon documents to register on the new server, if any
 * @param options - Server configuration, if any
 * @returns The freshly constructed {@link Server}
 */
export function createServer(
  lexicons?: LexiconDoc[],
  options?: Options,
): Server {
  const server = new Server(lexicons, options);
  return server;
}
102
/**
 * XRPC server implementation that handles HTTP and WebSocket requests.
 * Manages method registration, authentication, rate limiting, and streaming
 * with automatic schema validation.
 */
export class Server {
  /** The underlying Hono HTTP server instance */
  app: Hono;
  /** Map of NSID to WebSocket streaming servers for subscriptions */
  subscriptions: Map<string, XrpcStreamServer> = new Map<
    string,
    XrpcStreamServer
  >();
  /**
   * Map of NSID to the kind of method registered for it. Populated by every
   * registration path and consulted (before the lexicon registry) when
   * checking that a request uses the right HTTP verb.
   */
  methodTypes: Map<string, "procedure" | "query" | "subscription"> = new Map<
    string,
    "procedure" | "query" | "subscription"
  >();
  /** Lexicon registry for schema validation and method definitions */
  lex: Lexicons = new Lexicons();
  /** Server configuration options */
  options: Options;
  /** Global rate limiter applied to all routes */
  globalRateLimiter?: RouteRateLimiter<HandlerContext>;
  /** Map of named shared rate limiters */
  sharedRateLimiters?: Map<string, RateLimiterI<HandlerContext>>;

  /**
   * Creates a new XRPC server instance.
   *
   * Registers the catchall middleware, the error handler and the not-found
   * handler on the Hono app, builds the global/shared rate limiters when
   * `opts.rateLimits` is provided, and mounts the streaming routes for the
   * detected runtime (Deno or a runtime exposing `WebSocketPair`).
   *
   * @param lexicons - Optional array of lexicon documents to register
   * @param opts - Server configuration options
   */
  constructor(lexicons?: LexiconDoc[], opts: Options = {}) {
    this.app = new Hono();
    this.options = opts;

    if (lexicons) {
      this.addLexicons(lexicons);
    }

    this.app.use("*", this.catchall);
    this.app.onError(createErrorHandler(opts));

    this.app.notFound((c) => {
      const nsid = parseUrlNsid(c.req.url);
      if (nsid) {
        const methodType = this.getMethodType(nsid);
        if (!methodType) {
          throw new MethodNotImplementedError();
        }
        this.assertHttpMethod(methodType, c.req.method);
      }
      return c.text("Not Found", 404);
    });

    if (opts.rateLimits) {
      const { global, shared, creator, bypass } = opts.rateLimits;

      if (global) {
        this.globalRateLimiter = RouteRateLimiter.from(
          global.map((options) => creator(buildRateLimiterOptions(options))),
          { bypass },
        );
      }

      if (shared) {
        this.sharedRateLimiters = new Map(
          shared.map((options) => [
            options.name,
            creator(buildRateLimiterOptions(options)),
          ]),
        );
      }
    }

    // Mount streaming (subscription) routes using runtime-specific Hono adapters.
    {
      const mux: XrpcMux = {
        // Looked up per request so that subscriptions registered after
        // construction are still found.
        resolveForRequest: (req: Request) => {
          const nsid = parseUrlNsid(req.url);
          if (!nsid) return;
          const sub = this.subscriptions.get(nsid);
          if (!sub) return;
          return {
            handle: (req: Request, socket: WebSocket) => {
              sub.handle(req, socket);
            },
          };
        },
      };

      // Deno
      if (globalThis.Deno?.version?.deno) {
        mountStreamingRoutesDeno(this.app, mux);
      } else if ("WebSocketPair" in globalThis) {
        mountStreamingRoutesWorkers(this.app, mux);
      } else {
        // Node not supported for streaming subscriptions.
      }
    }
  }

  // handlers

  /**
   * Registers a handler for a schema-object based method (`@atp/lex`).
   *
   * The schema is resolved through `getLexMethod`, then dispatched on its
   * class: `Procedure` and `Query` become HTTP routes, anything else is
   * registered as a subscription.
   *
   * @param method - A lex method, or a namespace exposing it as `main`/`Main`
   * @param configOrHandler - Either a bare handler or a full configuration
   */
  add<M extends Procedure | Query | Subscription, A extends Auth = Auth>(
    method: LexMethodLike<M>,
    configOrHandler: LexAddConfig<M, A>,
  ): void {
    const schema = getLexMethod(method);
    const config = typeof configOrHandler === "function"
      ? { handler: configOrHandler }
      : configOrHandler;

    if (schema instanceof Procedure) {
      return this.addProcedureSchema(
        schema,
        config as LexMethodConfig<Procedure, A>,
      );
    }

    if (schema instanceof Query) {
      return this.addQuerySchema(
        schema,
        config as LexMethodConfig<Query, A>,
      );
    }

    return this.addSubscriptionSchema(
      schema,
      config as LexSubscriptionConfig<Subscription, A>,
    );
  }

  /**
   * Mounts a `POST /xrpc/<nsid>` route for a procedure schema and records
   * the NSID as a procedure.
   * @param schema - The procedure schema
   * @param config - Handler plus optional auth, rate limit and route options
   * @protected
   */
  protected addProcedureSchema<M extends Procedure, A extends Auth>(
    schema: M,
    config: LexMethodConfig<M, A>,
  ): void {
    this.app.post(
      `/xrpc/${schema.nsid}`,
      this.createHandlerInternal<
        A,
        LexMethodParams<M>,
        LexMethodInput<M>,
        LexMethodOutput<M>
      >(
        this.createAuthVerifier(config),
        this.createSchemaParamsVerifier(schema),
        this.createSchemaInputVerifier(schema, config.opts),
        this.createRouteRateLimiter(schema.nsid, config),
        config.handler,
        this.createSchemaOutputVerifier(schema),
      ),
    );
    this.methodTypes.set(schema.nsid, "procedure");
  }

  /**
   * Mounts a `GET /xrpc/<nsid>` route for a query schema and records the
   * NSID as a query.
   * @param schema - The query schema
   * @param config - Handler plus optional auth, rate limit and route options
   * @protected
   */
  protected addQuerySchema<M extends Query, A extends Auth>(
    schema: M,
    config: LexMethodConfig<M, A>,
  ): void {
    this.app.get(
      `/xrpc/${schema.nsid}`,
      this.createHandlerInternal<
        A,
        LexMethodParams<M>,
        LexMethodInput<M>,
        LexMethodOutput<M>
      >(
        this.createAuthVerifier(config),
        this.createSchemaParamsVerifier(schema),
        this.createSchemaInputVerifier(schema, config.opts),
        this.createRouteRateLimiter(schema.nsid, config),
        config.handler,
        this.createSchemaOutputVerifier(schema),
      ),
    );
    this.methodTypes.set(schema.nsid, "query");
  }

  /**
   * Registers a subscription for a subscription schema.
   *
   * Unless `options.validateResponse === false`, the handler is wrapped so
   * that every yielded item is checked against `schema.message`:
   * `MessageFrame`s go through `validateMessageFrame`, other `Frame`s are
   * forwarded untouched, and plain values are passed through
   * `messageSchema.parse`.
   *
   * @param schema - The subscription schema
   * @param config - Handler plus optional auth verifier
   * @protected
   */
  protected addSubscriptionSchema<M extends Subscription, A extends Auth>(
    schema: M,
    config: LexSubscriptionConfig<M, A>,
  ): void {
    const { handler } = config;
    const messageSchema = this.options.validateResponse === false
      ? undefined
      : schema.message;

    return this.addSubscriptionInternal(
      schema.nsid,
      this.createSchemaParamsVerifier(schema),
      this.createAuthVerifier(config),
      messageSchema
        ? async function* (ctx) {
          for await (const rawItem of handler(ctx)) {
            const item = rawItem as unknown;
            // MessageFrame is tested first; other Frame kinds (e.g. error
            // frames) are not message payloads and skip validation.
            if (item instanceof MessageFrame) {
              yield validateMessageFrame(item, messageSchema, schema.nsid);
              continue;
            }
            if (item instanceof Frame) {
              yield item;
              continue;
            }
            yield messageSchema.parse(item);
          }
        }
        : handler,
    );
  }

  /**
   * Registers a method handler for the specified NSID.
   * Alias of {@link Server.addMethod}.
   * @param nsid - The namespace identifier for the method
   * @param configOrFn - Either a handler function or full method configuration
   */
  method(
    nsid: string,
    configOrFn: MethodConfigOrHandler,
  ) {
    this.addMethod(nsid, configOrFn);
  }

  /**
   * Adds a method handler for the specified NSID.
   * @param nsid - The namespace identifier for the method
   * @param configOrFn - Either a handler function or full method configuration
   * @throws {Error} If the method is not found in the lexicon or is not a query/procedure
   */
  addMethod(
    nsid: string,
    configOrFn: MethodConfigOrHandler,
  ) {
    const config = typeof configOrFn === "function"
      ? { handler: configOrFn }
      : configOrFn;
    const def = this.lex.getDef(nsid);
    if (!def || (def.type !== "query" && def.type !== "procedure")) {
      throw new Error(`Method not found in lexicon: ${nsid}`);
    }
    this.addRoute(nsid, def, config);
  }

  /**
   * Registers a streaming method handler for the specified NSID.
   * Alias of {@link Server.addStreamMethod}.
   * @param nsid - The namespace identifier for the streaming method
   * @param configOrFn - Either a stream handler function or full stream configuration
   */
  streamMethod(
    nsid: string,
    configOrFn: StreamConfigOrHandler,
  ) {
    this.addStreamMethod(nsid, configOrFn);
  }

  /**
   * Adds a streaming method handler for the specified NSID.
   * @param nsid - The namespace identifier for the streaming method
   * @param configOrFn - Either a stream handler function or full stream configuration
   * @throws {Error} If the subscription is not found in the lexicon
   */
  addStreamMethod(
    nsid: string,
    configOrFn: StreamConfigOrHandler,
  ) {
    const config = typeof configOrFn === "function"
      ? { handler: configOrFn }
      : configOrFn;
    const def = this.lex.getDef(nsid);
    if (!def || def.type !== "subscription") {
      throw new Error(`Subscription not found in lexicon: ${nsid}`);
    }
    this.addSubscription(nsid, def, config);
  }

  // lexicon

  /**
   * Adds a lexicon document to the server's schema registry.
   * @param doc - The lexicon document to add
   */
  addLexicon(doc: LexiconDoc) {
    this.lex.add(doc);
  }

  /**
   * Adds multiple lexicon documents to the server's schema registry.
   * @param docs - Array of lexicon documents to add
   */
  addLexicons(docs: LexiconDoc[]) {
    for (const doc of docs) {
      this.addLexicon(doc);
    }
  }

  // routes

  /**
   * Adds an HTTP route for the specified method: `POST` for procedures,
   * `GET` for queries.
   * @param nsid - The namespace identifier for the method
   * @param def - The lexicon definition for the method
   * @param config - The method configuration including handler and options
   * @protected
   */
  protected addRoute(
    nsid: string,
    def: LexXrpcQuery | LexXrpcProcedure,
    config: MethodConfig,
  ) {
    const path = `/xrpc/${nsid}`;
    const handler = this.createHandler(nsid, def, config);
    this.methodTypes.set(nsid, def.type);

    if (def.type === "procedure") {
      this.app.post(path, handler);
    } else {
      this.app.get(path, handler);
    }
  }

  /**
   * Looks up the kind of definition known for an NSID: explicitly registered
   * methods win, otherwise the `type` of the lexicon definition (if any).
   * @param nsid - The namespace identifier to look up
   * @returns The definition type, or `undefined` when the NSID is unknown
   */
  private getMethodType(nsid: string): string | undefined {
    return this.methodTypes.get(nsid) ?? this.lex.getDef(nsid)?.type;
  }

  /**
   * Ensures procedures are called with `POST` and queries with `GET`.
   * Other definition types are not constrained.
   * @param methodType - Definition type as returned by `getMethodType`
   * @param httpMethod - HTTP verb of the incoming request
   * @throws {InvalidRequestError} If the verb does not match the method type
   */
  private assertHttpMethod(methodType: string, httpMethod: string): void {
    const expectedMethod = methodType === "procedure"
      ? "POST"
      : methodType === "query"
      ? "GET"
      : null;
    if (expectedMethod != null && expectedMethod !== httpMethod) {
      throw new InvalidRequestError(
        `Incorrect HTTP method (${httpMethod}) expected ${expectedMethod}`,
      );
    }
  }

  /**
   * Catchall middleware mounted on every path. Requests whose URL does not
   * contain `/xrpc/` are passed through; XRPC requests get NSID validation,
   * a global rate limiter pass and an HTTP verb check before being handed
   * to `options.catchall` (when configured) or to the next handler.
   *
   * @throws {InvalidRequestError} For an unparsable XRPC path or wrong verb
   * @throws {MethodNotImplementedError} For an unknown NSID when no custom
   * catchall is configured
   */
  catchall: CatchallHandler = async (c, next) => {
    if (!c.req.url.includes("/xrpc/")) {
      return await next();
    }

    // Validate the NSID
    const nsid = parseUrlNsid(c.req.url);
    if (!nsid) {
      throw new InvalidRequestError("invalid xrpc path");
    }

    if (this.globalRateLimiter) {
      try {
        await this.globalRateLimiter.handle({
          req: c.req.raw,
          res: new Response(),
          auth: undefined,
          params: {},
          input: undefined,
          async resetRouteRateLimits(): Promise<void> {
            // Global rate limits don't have route-specific resets
          },
        });
      } catch {
        // NOTE(review): any error from the global limiter is discarded here
        // and the request continues down the chain, skipping the verb check
        // and `options.catchall` below. The global limiter also appears to be
        // consumed again by route handlers (see `createRouteRateLimiter`).
        // Confirm both are intended.
        return await next();
      }
    }

    // Ensure that known XRPC methods are only called with the correct HTTP
    // method.
    const methodType = this.getMethodType(nsid);
    if (methodType) {
      this.assertHttpMethod(methodType, c.req.method);
    }

    if (this.options.catchall) {
      return await this.options.catchall(c, next);
    } else if (!methodType) {
      throw new MethodNotImplementedError();
    } else {
      return await next();
    }
  };

  /**
   * Creates an authentication verification function.
   *
   * The returned function awaits the configured verifier and passes its
   * result through `excludeErrorResult` (presumably rejecting error results;
   * see `./errors.ts`).
   *
   * @param cfg - Configuration containing optional authentication verifier
   * @returns A function that performs authentication for the method, or
   * `null` when no verifier is configured
   * @protected
   */
  protected createAuthVerifier<C, A extends Auth>(cfg: {
    auth?: AuthVerifier<C, A & AuthResult>;
  }): ((ctx: C) => Promise<A>) | null {
    const { auth } = cfg;
    if (!auth) return null;

    return async (ctx: C) => {
      const result = await auth(ctx);
      return excludeErrorResult(result);
    };
  }

  /** Builds a params verifier backed by this server's lexicon registry. */
  private createLexiconParamsVerifier(
    nsid: string,
    def: LexXrpcQuery | LexXrpcProcedure | LexXrpcSubscription,
  ): (req: Request) => Params {
    return createLexiconParamsVerifier(nsid, def, this.lex);
  }

  /**
   * Builds an input verifier backed by this server's lexicon registry.
   * Per-route payload limits take precedence over `options.payload`.
   */
  private createLexiconInputVerifier(
    nsid: string,
    def: LexXrpcQuery | LexXrpcProcedure,
    opts?: RouteOptions,
  ): (req: Request) => Input | Promise<Input> {
    return createLexiconInputVerifier(
      nsid,
      def,
      {
        blobLimit: opts?.blobLimit ?? this.options.payload?.blobLimit,
        jsonLimit: opts?.jsonLimit ?? this.options.payload?.jsonLimit,
        textLimit: opts?.textLimit ?? this.options.payload?.textLimit,
      },
      this.lex,
    );
  }

  /**
   * Builds an output verifier backed by this server's lexicon registry, or
   * `null` when `options.validateResponse` is `false`.
   */
  private createLexiconOutputVerifier(
    nsid: string,
    def: LexXrpcQuery | LexXrpcProcedure,
  ): null | ((output: HandlerSuccess | void) => void) {
    if (this.options.validateResponse === false) {
      return null;
    }

    return (output) => validateOutput(nsid, def, output, this.lex);
  }

  /** Builds a params verifier from a schema object. */
  private createSchemaParamsVerifier<
    M extends Procedure | Query | Subscription,
  >(
    method: M,
  ): (req: Request) => LexMethodParams<M> {
    return createSchemaParamsVerifier(method);
  }

  /**
   * Builds an input verifier from a schema object.
   * Per-route payload limits take precedence over `options.payload`.
   */
  private createSchemaInputVerifier<M extends Procedure | Query>(
    method: M,
    opts?: RouteOptions,
  ): (req: Request) => LexMethodInput<M> | Promise<LexMethodInput<M>> {
    return createSchemaInputVerifier(method, {
      blobLimit: opts?.blobLimit ?? this.options.payload?.blobLimit,
      jsonLimit: opts?.jsonLimit ?? this.options.payload?.jsonLimit,
      textLimit: opts?.textLimit ?? this.options.payload?.textLimit,
    });
  }

  /**
   * Builds an output verifier from a schema object, or `null` when
   * `options.validateResponse` is `false`.
   */
  private createSchemaOutputVerifier<M extends Procedure | Query>(
    method: M,
  ): null | ((output: LexMethodOutput<M>) => void) {
    if (this.options.validateResponse === false) {
      return null;
    }

    return createSchemaOutputVerifier(method);
  }

  /**
   * Creates a Hono handler function for the specified XRPC method.
   * @template A - The authentication type
   * @param nsid - The namespace identifier for the method
   * @param def - The lexicon definition for the method
   * @param cfg - The method configuration including handler and options
   * @returns A Hono handler function
   */
  createHandler<A extends Auth = Auth>(
    nsid: string,
    def: LexXrpcQuery | LexXrpcProcedure,
    cfg: MethodConfig<A>,
  ): Handler {
    return this.createHandlerInternal<A, Params, Input, HandlerSuccess | void>(
      this.createAuthVerifier(cfg),
      this.createLexiconParamsVerifier(nsid, def),
      this.createLexiconInputVerifier(nsid, def, cfg.opts),
      this.createRouteRateLimiter(nsid, cfg),
      cfg.handler as MethodHandler<A, Params, Input, HandlerSuccess | void>,
      this.createLexiconOutputVerifier(nsid, def),
    );
  }

  /**
   * Assembles the request pipeline shared by every HTTP method handler.
   *
   * Per request, in order: verify params, authenticate, verify input, apply
   * the route rate limiter, run the handler, then serialize the result.
   * Pipe-through outputs (buffer or stream) are forwarded as-is without
   * output validation; regular outputs are validated (when a validator is
   * supplied) and sent as JSON or as a raw body with the output's encoding.
   *
   * @param authVerifier - Auth function, or `null` for unauthenticated routes
   * @param paramsVerifier - Extracts and validates query parameters
   * @param inputVerifier - Extracts and validates the request body
   * @param routeLimiter - Rate limiter to apply, if any
   * @param handler - The user-supplied method handler
   * @param validateOutput - Output validator, or `null` to skip validation
   * @returns A Hono handler function
   * @protected
   */
  protected createHandlerInternal<
    A extends Auth,
    P extends Params,
    I extends Input,
    O extends HandlerSuccess | void,
  >(
    authVerifier:
      | null
      | ((ctx: { req: Request; res: Response; params: P }) => Promise<A>),
    paramsVerifier: (req: Request) => P,
    inputVerifier: (req: Request) => I | Promise<I>,
    routeLimiter: RouteRateLimiter<HandlerContext<A, P, I>> | undefined,
    handler: MethodHandler<A, P, I, O>,
    validateOutput:
      | null
      | ((output: O) => void),
  ): Handler {
    return async (c: Context) => {
      try {
        const params = paramsVerifier(c.req.raw);

        const auth: A = authVerifier
          ? await authVerifier({ req: c.req.raw, res: c.res, params })
          : (undefined as A);

        const input = await inputVerifier(c.req.raw);

        const ctx: HandlerContext<A, P, I> = {
          req: c.req.raw,
          res: new Response(),
          params,
          input,
          auth,
          resetRouteRateLimits: async () => {
            if (routeLimiter) {
              await routeLimiter.reset(ctx);
            }
          },
        };

        if (routeLimiter) {
          await routeLimiter.handle(ctx);
        }

        const output = await handler(ctx);

        if (output === undefined) {
          validateOutput?.(output);
          return c.body(null, 200);
        }

        if (isErrorResult(output)) {
          throw XRPCError.fromErrorResult(output);
        }

        if (isHandlerPipeThroughBuffer(output)) {
          setHeaders(c, output.headers);
          // A typed-array view may cover only part of its backing buffer
          // (non-zero byteOffset or shorter byteLength). Send exactly the
          // viewed bytes rather than the whole backing buffer.
          const view = output.buffer;
          const backing = view.buffer as ArrayBuffer;
          const coversWholeBuffer = view.byteOffset === 0 &&
            view.byteLength === backing.byteLength;
          const payload = coversWholeBuffer
            ? backing
            : backing.slice(
              view.byteOffset,
              view.byteOffset + view.byteLength,
            );
          return c.body(payload, 200, {
            "Content-Type": output.encoding,
          });
        }

        if (isHandlerPipeThroughStream(output)) {
          setHeaders(c, output.headers);
          return c.body(output.stream, 200, {
            "Content-Type": output.encoding,
          });
        }

        const successOutput = output as HandlerSuccess;
        validateOutput?.(successOutput as O);
        setHeaders(c, successOutput.headers);

        if (successOutput.encoding === "application/json") {
          return c.json(ipldToJson(successOutput.body) as JSON);
        }

        return c.body(successOutput.body, 200, {
          "Content-Type": successOutput.encoding,
        });
      } catch (err: unknown) {
        // Guard against falsy throwables (e.g. `throw undefined`) so the
        // error handler always receives something meaningful.
        throw err || new InternalServerError();
      }
    };
  }

  /**
   * Adds a WebSocket subscription handler for the specified NSID.
   * @param nsid - The namespace identifier for the subscription
   * @param def - The lexicon definition for the subscription
   * @param cfg - The stream configuration
   * @protected
   */
  protected addSubscription<A extends Auth = Auth>(
    nsid: string,
    def: LexXrpcSubscription,
    cfg: StreamConfig<A>,
  ) {
    this.addSubscriptionInternal(
      nsid,
      this.createLexiconParamsVerifier(nsid, def),
      this.createAuthVerifier(cfg),
      cfg.handler,
    );
  }

  /**
   * Records the NSID as a subscription and installs an `XrpcStreamServer`
   * for it.
   *
   * For each connection the stream verifies params, authenticates, then
   * forwards every item produced by `handler`: `Frame` instances as-is,
   * anything else wrapped via `messageFrameFromValue`. Any error thrown
   * along the way is converted into a single `ErrorFrame`.
   *
   * @param nsid - The namespace identifier for the subscription
   * @param paramsVerifier - Extracts and validates connection parameters
   * @param authVerifier - Auth function, or `null` for unauthenticated streams
   * @param handler - The user-supplied stream handler
   * @protected
   */
  protected addSubscriptionInternal<A extends Auth, P extends Params>(
    nsid: string,
    paramsVerifier: (req: Request) => P,
    authVerifier:
      | null
      | ((ctx: { req: Request; params: P }) => Promise<A>),
    handler: (ctx: StreamContext<A, P>) => AsyncIterable<unknown>,
  ): void {
    this.methodTypes.set(nsid, "subscription");
    this.subscriptions.set(
      nsid,
      new XrpcStreamServer({
        handler: async function* (req, signal) {
          try {
            const params = paramsVerifier(req);
            const auth = authVerifier
              ? await authVerifier({ req, params })
              : (undefined as A);

            for await (const item of handler({ req, params, auth, signal })) {
              yield item instanceof Frame
                ? item
                : messageFrameFromValue(item, nsid);
            }
          } catch (err) {
            const xrpcError = XRPCError.fromError(err);
            yield new ErrorFrame({
              error: xrpcError.payload.error ?? "Unknown",
              message: xrpcError.payload.message,
            });
          }
        },
      }),
    );
  }

  /**
   * Builds the rate limiter applied by a route handler.
   *
   * Falls back to the global rate limiter when the route declares no
   * `rateLimit`, when the server was constructed without `rateLimits`, or
   * when the route's `rateLimit` list is empty. Otherwise combines the
   * route-specific limiters with the global one (if present).
   *
   * @param nsid - The namespace identifier, used in limiter key prefixes
   * @param config - The method configuration
   * @returns The limiter to apply, or `undefined` when there is none
   * @throws If the route references a shared rate limiter that was not
   * defined in the server options
   */
  private createRouteRateLimiter<
    A extends Auth,
    P extends Params,
    I extends Input,
    O extends Output,
    C extends HandlerContext<A, P, I> = HandlerContext<A, P, I>,
  >(
    nsid: string,
    config: MethodConfig<A, P, I, O>,
  ): RouteRateLimiter<C> | undefined {
    // @NOTE global & shared rate limiters are instantiated with a context of
    // HandlerContext which is compatible (more generic) with the context of
    // this route specific rate limiters (C). For this reason, it's safe to
    // cast these with an `any` context

    const globalRateLimiter = this.globalRateLimiter as
      | RouteRateLimiter<C>
      | undefined;

    // No route specific rate limiting configured, use the global rate limiter.
    if (!config.rateLimit) return globalRateLimiter;

    const { rateLimits } = this.options;

    // @NOTE Silently ignore creation of route specific rate limiter if the
    // `rateLimits` options was not provided to the constructor.
    if (!rateLimits) return globalRateLimiter;

    const { creator, bypass } = rateLimits;

    const rateLimiters = asArray(config.rateLimit).map((options, i) => {
      if (isSharedRateLimitOpts(options)) {
        const rateLimiter = this.sharedRateLimiters?.get(options.name);

        // The route config references a shared rate limiter that does not
        // exist. This is a configuration error.
        assert(
          rateLimiter,
          `Shared rate limiter "${options.name}" not defined`,
        );

        return WrappedRateLimiter.from<C>(
          rateLimiter as unknown as RateLimiterI<C>,
          options as unknown as WrappedRateLimiterOptions<C>,
        );
      } else {
        return creator({
          ...options,
          calcKey: options.calcKey ?? defaultKey,
          calcPoints: options.calcPoints ?? defaultPoints,
          // Index keeps several limiters on the same route from sharing keys.
          keyPrefix: `${nsid}-${i}`,
        });
      }
    });

    // If the route config contains an empty array, use global rate limiter.
    if (!rateLimiters.length) return globalRateLimiter;

    // The global rate limiter (if present) should be applied in addition to
    // the route specific rate limiters.
    if (globalRateLimiter) rateLimiters.push(globalRateLimiter);

    return RouteRateLimiter.from<C>(
      rateLimiters as unknown as readonly RateLimiterI<C>[],
      { bypass },
    );
  }

  /**
   * Gets the underlying Hono app instance for external use.
   * @returns The Hono application instance
   */
  get handler(): Hono {
    return this.app;
  }
}
815
/**
 * Resolves the lex method behind a `LexMethodLike` value.
 *
 * Accepts the method itself, or a namespace-like object exposing the method
 * as `main` or `Main` (checked in that order).
 *
 * @param method - The method or a namespace wrapping it
 * @returns The resolved method instance
 * @throws {TypeError} If no lex method can be found
 */
function getLexMethod<M extends Procedure | Query | Subscription>(
  method: LexMethodLike<M>,
): M {
  // Direct instance: nothing to unwrap.
  if (isLexMethod(method)) return method;

  // Namespace object, lower-case export first.
  if ("main" in method && isLexMethod(method.main)) return method.main;

  // Namespace object, capitalized export.
  if ("Main" in method && isLexMethod(method.Main)) return method.Main;

  throw new TypeError(
    "Expected a lex method or a namespace with main/Main",
  );
}
835
/**
 * Type guard: true when `value` is an instance of `Procedure`, `Query` or
 * `Subscription`.
 */
function isLexMethod(
  value: unknown,
): value is Procedure | Query | Subscription {
  if (value instanceof Procedure) return true;
  if (value instanceof Query) return true;
  return value instanceof Subscription;
}
843
/**
 * Wraps a value yielded by a subscription handler into a `MessageFrame`.
 *
 * When the value passes `check.is(value, schema.map)` and carries a string
 * `$type`, that property is removed from a shallow copy of the body and
 * becomes the frame type instead. A `$type` with exactly one `#` whose prefix
 * is empty or equal to `nsid` is shortened to `#<fragment>`; any other
 * `$type` is used verbatim. All other values are wrapped untouched.
 *
 * @param value - The value produced by the handler
 * @param nsid - NSID of the subscription, used to shorten local type refs
 * @returns The frame to send
 */
function messageFrameFromValue(
  value: unknown,
  nsid: string,
): MessageFrame<unknown> {
  const declaredType = (value as Record<string, unknown>)?.["$type"];
  if (!(check.is(value, schema.map) && typeof declaredType === "string")) {
    return new MessageFrame(value);
  }

  // "prefix#fragment" has exactly two parts: a fragment and nothing after it.
  const [prefix, fragment, ...extra] = declaredType.split("#");
  const isLocalRef = fragment !== undefined &&
    extra.length === 0 &&
    (prefix === "" || prefix === nsid);
  const frameType = isLocalRef ? `#${fragment}` : declaredType;

  // Shallow copy without `$type`; the caller's object is left untouched.
  const { $type: _omitted, ...body } = value as Record<string, unknown>;
  return new MessageFrame(body, { type: frameType });
}
865
/**
 * Validates the body of a `MessageFrame` against the subscription's message
 * schema and stores the parsed body back on the frame, with a `$type` that
 * merely repeats the frame type removed.
 *
 * The frame is mutated in place and returned.
 *
 * @param frame - The frame produced by the handler
 * @param messageSchema - Schema whose `parse` validates a message value
 * @param nsid - NSID of the subscription
 * @returns The same frame instance, with its body replaced
 */
function validateMessageFrame(
  frame: MessageFrame<unknown>,
  messageSchema: { parse(input: unknown): unknown },
  nsid: string,
): MessageFrame<unknown> {
  const validated = parseMessageBody(
    frame.body,
    frame.type,
    messageSchema,
    nsid,
  );
  const cleaned = stripMessageType(validated, frame.type, nsid);
  frame.body = cleaned;
  return frame;
}
880
/**
 * Parses a message body with `messageSchema`, trying each candidate produced
 * by `getMessageValueCandidates` in order and returning the first one that
 * parses.
 *
 * @param value - The raw message body
 * @param type - The frame type, if any
 * @param messageSchema - Schema whose `parse` validates a message value
 * @param nsid - NSID of the subscription
 * @returns The parsed value of the first candidate accepted by the schema
 * @throws The error raised for the last candidate when none is accepted
 */
function parseMessageBody(
  value: unknown,
  type: string | undefined,
  messageSchema: { parse(input: unknown): unknown },
  nsid: string,
): unknown {
  const candidates = getMessageValueCandidates(value, type, nsid);
  let failure: unknown;

  for (let i = 0; i < candidates.length; i++) {
    try {
      return messageSchema.parse(candidates[i]);
    } catch (error) {
      // Remember the most recent failure and move on to the next spelling.
      failure = error;
    }
  }

  throw failure;
}
899
/**
 * Lists the values to try when validating a message body.
 *
 * Without a frame type, or when the value does not pass
 * `check.is(value, schema.map)`, the value itself is the only candidate.
 * Otherwise one shallow copy is produced per type spelling returned by
 * `getMessageTypeCandidates`, each with `$type` set to that spelling.
 *
 * @param value - The raw message body
 * @param type - The frame type, if any
 * @param nsid - NSID of the subscription
 * @returns The candidate values, in the order they should be tried
 */
function getMessageValueCandidates(
  value: unknown,
  type: string | undefined,
  nsid: string,
): unknown[] {
  if (type !== undefined && check.is(value, schema.map)) {
    const fields = value as Record<string, unknown>;
    const variants: unknown[] = [];
    for (const candidateType of getMessageTypeCandidates(type, nsid)) {
      variants.push({ ...fields, $type: candidateType });
    }
    return variants;
  }

  return [value];
}
914
/**
 * Lists the distinct spellings of a message type to try, in order: the type
 * as given, its normalized form, and its absolute form (`<nsid>#<fragment>`
 * when the normalized form starts with `#`, otherwise the type as given).
 *
 * @param type - The frame type
 * @param nsid - NSID of the subscription
 * @returns The de-duplicated spellings, first occurrence order preserved
 */
function getMessageTypeCandidates(type: string, nsid: string): string[] {
  const normalized = normalizeMessageType(type, nsid);
  const absolute = normalized.startsWith("#") ? `${nsid}${normalized}` : type;

  // A Set keeps insertion order, so duplicates collapse onto their first
  // occurrence.
  return Array.from(new Set([type, normalized, absolute]));
}
928
/**
 * Removes a redundant `$type` from a parsed message body.
 *
 * The property is dropped (from a shallow copy) only when a frame type is
 * present, the value passes `check.is(value, schema.map)`, its `$type` is a
 * string, and that string normalizes to the same value as the frame type.
 * In every other case the value is returned unchanged.
 *
 * @param value - The parsed message body
 * @param type - The frame type, if any
 * @param nsid - NSID of the subscription
 * @returns The body without its `$type`, or the original value
 */
function stripMessageType(
  value: unknown,
  type: string | undefined,
  nsid: string,
): unknown {
  if (type === undefined) return value;
  if (!check.is(value, schema.map)) return value;

  const fields = value as Record<string, unknown>;
  const embeddedType = fields["$type"];
  if (typeof embeddedType !== "string") return value;

  const sameType = normalizeMessageType(embeddedType, nsid) ===
    normalizeMessageType(type, nsid);
  if (!sameType) return value;

  const { $type: _omitted, ...rest } = fields;
  return rest;
}
949
/**
 * Shortens a message type to its local form.
 *
 * A type containing exactly one `#` whose prefix is empty or equal to `nsid`
 * becomes `#<fragment>`. Anything else is returned unchanged.
 *
 * @param type - The message type to normalize
 * @param nsid - NSID of the subscription
 * @returns The normalized type
 */
function normalizeMessageType(type: string, nsid: string): string {
  const hashAt = type.indexOf("#");
  // No separator, or more than one: not a simple "prefix#fragment" ref.
  if (hashAt === -1 || hashAt !== type.lastIndexOf("#")) {
    return type;
  }

  const prefix = type.slice(0, hashAt);
  return prefix === "" || prefix === nsid ? type.slice(hashAt) : type;
}
957
/**
 * Builds the Hono `onError` handler for the server.
 *
 * Each error is converted with `opts.errorParser` when provided, otherwise
 * with `XRPCError.fromError`. The response is the converted error's `payload`
 * as JSON, using its `statusCode` when it has one and 500 otherwise.
 *
 * @param opts - Server options (only `errorParser` is read)
 * @returns The error handler to register on the Hono app
 */
function createErrorHandler(
  opts: Options,
): (err: Error, c: Context) => Response {
  const fallbackParser = (e: unknown) => XRPCError.fromError(e);

  return (err: Error, c: Context): Response => {
    // Read on every call so a parser assigned later is still honoured.
    const parse = opts.errorParser || fallbackParser;
    const parsed = parse(err);

    let status = 500;
    if ("statusCode" in parsed) {
      status = (parsed as { statusCode: number }).statusCode;
    }

    return c.json(parsed.payload, status as 500);
  };
}
974
/**
 * Converts a server-level rate limit description into rate limiter options.
 *
 * `calcKey` and `calcPoints` fall back to `defaultKey` / `defaultPoints`
 * when left `undefined`, and the key prefix is derived from the
 * description's name as `rl-<name>`.
 *
 * @param description - The rate limit description from the server options
 * @returns Options suitable for the configured rate limiter creator
 */
function buildRateLimiterOptions<C extends HandlerContext = HandlerContext>(
  description: ServerRateLimitDescription<C>,
): RateLimiterOptions<C> {
  const { name, calcKey, calcPoints, ...rest } = description;
  return {
    ...rest,
    calcKey: calcKey === undefined ? defaultKey : calcKey,
    calcPoints: calcPoints === undefined ? defaultPoints : calcPoints,
    keyPrefix: `rl-${name}`,
  };
}
983
/** Default cost of a request: every call consumes one point. */
const defaultPoints: CalcPointsFn = (): number => 1;

/**
 * Default rate limit key: the client address taken from proxy headers.
 *
 * Uses the first non-empty, whitespace-trimmed entry of `x-forwarded-for`,
 * then `x-real-ip`, then the literal `"unknown"`. Entries are trimmed so
 * that `"1.2.3.4 , 5.6.7.8"` and `"1.2.3.4, 5.6.7.8"` share one bucket, and
 * empty entries are skipped so a malformed header never yields an empty key.
 *
 * NOTE(review): both headers are client-controlled unless a trusted proxy
 * overwrites them; confirm the deployment guarantees that.
 */
const defaultKey: CalcKeyFn<HandlerContext> = ({ req }) => {
  const forwardedFor = req.headers.get("x-forwarded-for");
  const client = forwardedFor
    ?.split(",")
    .map((entry) => entry.trim())
    .find((entry) => entry.length > 0);
  if (client) return client;

  const realIp = req.headers.get("x-real-ip")?.trim();
  return realIp || "unknown";
};