Suite of AT Protocol TypeScript libraries built on web standards
21
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 993 lines 28 kB view raw
import type { Context, Handler } from "hono";
import { Hono } from "hono";
import { Procedure, Query, Subscription } from "@atp/lex";
import {
  type LexiconDoc,
  Lexicons,
  type LexXrpcProcedure,
  type LexXrpcQuery,
  type LexXrpcSubscription,
} from "@atp/lexicon";
import {
  excludeErrorResult,
  InternalServerError,
  InvalidRequestError,
  isErrorResult,
  MethodNotImplementedError,
  XRPCError,
} from "./errors.ts";
import { type RateLimiterI, RouteRateLimiter } from "./rate-limiter.ts";
import {
  ErrorFrame,
  Frame,
  MessageFrame,
  XrpcStreamServer,
} from "./stream/index.ts";
import {
  type Auth,
  type AuthResult,
  type AuthVerifier,
  type HandlerContext,
  type HandlerSuccess,
  type Input,
  isHandlerPipeThroughBuffer,
  isHandlerPipeThroughStream,
  isSharedRateLimitOpts,
  type LexMethodConfig,
  type LexMethodHandler,
  type LexMethodInput,
  type LexMethodLike,
  type LexMethodOutput,
  type LexMethodParams,
  type LexSubscriptionConfig,
  type LexSubscriptionHandler,
  type MethodConfig,
  type MethodConfigOrHandler,
  type MethodHandler,
  type Options,
  type Output,
  type Params,
  type ServerRateLimitDescription,
  type StreamConfig,
  type StreamConfigOrHandler,
  type StreamContext,
} from "./types.ts";
import {
  asArray,
  createLexiconInputVerifier,
  createLexiconParamsVerifier,
  createSchemaInputVerifier,
  createSchemaOutputVerifier,
  createSchemaParamsVerifier,
  parseUrlNsid,
  setHeaders,
  validateOutput,
} from "./util.ts";
import { check, ipldToJson, schema } from "@atp/common";
import {
  type CalcKeyFn,
  type CalcPointsFn,
  type RateLimiterOptions,
  WrappedRateLimiter,
  type WrappedRateLimiterOptions,
} from "./rate-limiter.ts";
import { assert } from "@std/assert";
import type { CatchallHandler, RouteOptions } from "./types.ts";
import {
  mountStreamingRoutesDeno,
  mountStreamingRoutesWorkers,
  type XrpcMux,
} from "./stream/adapters.ts";

/**
 * Second argument accepted by {@link Server.add}: a full config object or a
 * bare handler, chosen according to the kind of lex method being registered.
 */
type LexAddConfig<
  M extends Procedure | Query | Subscription,
  A extends Auth = Auth,
> = M extends Procedure | Query
  ? LexMethodConfig<M, A> | LexMethodHandler<M, void>
  : M extends Subscription
    ? LexSubscriptionConfig<M, A> | LexSubscriptionHandler<M, void>
  : never;

/**
 * Creates a new XRPC server instance
 * @param lexicons - Optional array of lexicon documents to initialize the server with
 * @param options - Optional server configuration options
 */
export function createServer(
  lexicons?: LexiconDoc[],
  options?: Options,
): Server {
  return new Server(lexicons, options);
}

/**
 * XRPC server implementation that handles HTTP and WebSocket requests.
 * Manages method registration, authentication, rate limiting, and streaming
 * with automatic schema validation.
 */
export class Server {
  /** The underlying Hono HTTP server instance */
  app: Hono;
  /** Map of NSID to WebSocket streaming servers for subscriptions */
  subscriptions: Map<string, XrpcStreamServer> = new Map<
    string,
    XrpcStreamServer
  >();
  /** Map of NSID to the kind of method registered under it */
  methodTypes: Map<string, "procedure" | "query" | "subscription"> = new Map<
    string,
    "procedure" | "query" | "subscription"
  >();
  /** Lexicon registry for schema validation and method definitions */
  lex: Lexicons = new Lexicons();
  /** Server configuration options */
  options: Options;
  /** Global rate limiter applied to all routes */
  globalRateLimiter?: RouteRateLimiter<HandlerContext>;
  /** Map of named shared rate limiters */
  sharedRateLimiters?: Map<string, RateLimiterI<HandlerContext>>;

  /**
   * Creates a new XRPC server instance.
   * @param lexicons - Optional array of lexicon documents to register
   * @param opts - Server configuration options
   */
  constructor(lexicons?: LexiconDoc[], opts: Options = {}) {
    this.app = new Hono();
    this.options = opts;

    if (lexicons) {
      this.addLexicons(lexicons);
    }

    this.app.use("*", this.catchall);
    this.app.onError(createErrorHandler(opts));

    this.app.notFound((c) => {
      const nsid = parseUrlNsid(c.req.url);
      if (nsid) {
        const methodType = this.methodTypes.get(nsid) ??
          this.lex.getDef(nsid)?.type;
        if (methodType) {
          // Known method reached with the wrong verb: report that instead of
          // a bare 404.
          assertHttpMethod(methodType, c.req.method);
        } else {
          throw new MethodNotImplementedError();
        }
      }
      return c.text("Not Found", 404);
    });

    if (opts.rateLimits) {
      const { global, shared, creator, bypass } = opts.rateLimits;

      if (global) {
        this.globalRateLimiter = RouteRateLimiter.from(
          global.map((options) => creator(buildRateLimiterOptions(options))),
          { bypass },
        );
      }

      if (shared) {
        this.sharedRateLimiters = new Map(
          shared.map((options) => [
            options.name,
            creator(buildRateLimiterOptions(options)),
          ]),
        );
      }
    }

    // Mount streaming (subscription) routes using runtime-specific Hono adapters.
    {
      const mux: XrpcMux = {
        resolveForRequest: (req: Request) => {
          const nsid = parseUrlNsid(req.url);
          if (!nsid) return;
          const sub = this.subscriptions.get(nsid);
          if (!sub) return;
          return {
            handle: (req: Request, socket: WebSocket) => {
              sub.handle(req, socket);
            },
          };
        },
      };

      // Deno
      if (globalThis.Deno?.version?.deno) {
        mountStreamingRoutesDeno(this.app, mux);
      } else if ("WebSocketPair" in globalThis) {
        mountStreamingRoutesWorkers(this.app, mux);
      } else {
        // Node not supported for streaming subscriptions.
      }
    }
  }

  // handlers

  /**
   * Registers a handler for a lex method object (procedure, query or
   * subscription), or for a namespace exposing one as `main`/`Main`.
   * @param method - The lex method, or a namespace containing it
   * @param configOrHandler - Either a handler function or a full configuration
   * @throws {TypeError} If `method` does not resolve to a lex method
   */
  add<M extends Procedure | Query | Subscription, A extends Auth = Auth>(
    method: LexMethodLike<M>,
    configOrHandler: LexAddConfig<M, A>,
  ): void {
    const lexMethod = getLexMethod(method);
    const config = typeof configOrHandler === "function"
      ? { handler: configOrHandler }
      : configOrHandler;

    if (lexMethod instanceof Procedure) {
      return this.addProcedureSchema(
        lexMethod,
        config as LexMethodConfig<Procedure, A>,
      );
    }

    if (lexMethod instanceof Query) {
      return this.addQuerySchema(
        lexMethod,
        config as LexMethodConfig<Query, A>,
      );
    }

    return this.addSubscriptionSchema(
      lexMethod,
      config as LexSubscriptionConfig<Subscription, A>,
    );
  }

  /**
   * Mounts a POST route at `/xrpc/<nsid>` for a schema-defined procedure.
   * @param schema - The procedure definition
   * @param config - Handler plus optional auth, rate limit and route options
   * @protected
   */
  protected addProcedureSchema<M extends Procedure, A extends Auth>(
    schema: M,
    config: LexMethodConfig<M, A>,
  ): void {
    this.app.post(
      `/xrpc/${schema.nsid}`,
      this.createHandlerInternal<
        A,
        LexMethodParams<M>,
        LexMethodInput<M>,
        LexMethodOutput<M>
      >(
        this.createAuthVerifier(config),
        this.createSchemaParamsVerifier(schema),
        this.createSchemaInputVerifier(schema, config.opts),
        this.createRouteRateLimiter(schema.nsid, config),
        config.handler,
        this.createSchemaOutputVerifier(schema),
      ),
    );
    this.methodTypes.set(schema.nsid, "procedure");
  }

  /**
   * Mounts a GET route at `/xrpc/<nsid>` for a schema-defined query.
   * @param schema - The query definition
   * @param config - Handler plus optional auth, rate limit and route options
   * @protected
   */
  protected addQuerySchema<M extends Query, A extends Auth>(
    schema: M,
    config: LexMethodConfig<M, A>,
  ): void {
    this.app.get(
      `/xrpc/${schema.nsid}`,
      this.createHandlerInternal<
        A,
        LexMethodParams<M>,
        LexMethodInput<M>,
        LexMethodOutput<M>
      >(
        this.createAuthVerifier(config),
        this.createSchemaParamsVerifier(schema),
        this.createSchemaInputVerifier(schema, config.opts),
        this.createRouteRateLimiter(schema.nsid, config),
        config.handler,
        this.createSchemaOutputVerifier(schema),
      ),
    );
    this.methodTypes.set(schema.nsid, "query");
  }

  /**
   * Registers a schema-defined subscription. Unless `validateResponse` is
   * `false`, every item the handler yields is checked against the
   * subscription's message schema before being sent.
   * @param schema - The subscription definition
   * @param config - Handler plus optional auth
   * @protected
   */
  protected addSubscriptionSchema<M extends Subscription, A extends Auth>(
    schema: M,
    config: LexSubscriptionConfig<M, A>,
  ): void {
    const { handler } = config;
    const messageSchema = this.options.validateResponse === false
      ? undefined
      : schema.message;

    return this.addSubscriptionInternal(
      schema.nsid,
      this.createSchemaParamsVerifier(schema),
      this.createAuthVerifier(config),
      messageSchema
        ? async function* (ctx) {
          for await (const rawItem of handler(ctx)) {
            const item = rawItem as unknown;
            // Message frames are validated in place; other frames (e.g.
            // error frames) pass through untouched.
            if (item instanceof MessageFrame) {
              yield validateMessageFrame(item, messageSchema, schema.nsid);
              continue;
            }
            if (item instanceof Frame) {
              yield item;
              continue;
            }
            yield messageSchema.parse(item);
          }
        }
        : handler,
    );
  }

  /**
   * Registers a method handler for the specified NSID.
   * @param nsid - The namespace identifier for the method
   * @param configOrFn - Either a handler function or full method configuration
   */
  method(
    nsid: string,
    configOrFn: MethodConfigOrHandler,
  ) {
    this.addMethod(nsid, configOrFn);
  }

  /**
   * Adds a method handler for the specified NSID.
   * @param nsid - The namespace identifier for the method
   * @param configOrFn - Either a handler function or full method configuration
   * @throws {Error} If the method is not found in the lexicon or is not a query/procedure
   */
  addMethod(
    nsid: string,
    configOrFn: MethodConfigOrHandler,
  ) {
    const config = typeof configOrFn === "function"
      ? { handler: configOrFn }
      : configOrFn;
    const def = this.lex.getDef(nsid);
    if (!def || (def.type !== "query" && def.type !== "procedure")) {
      throw new Error(`Method not found in lexicon: ${nsid}`);
    }
    this.addRoute(nsid, def, config);
  }

  /**
   * Registers a streaming method handler for the specified NSID.
   * @param nsid - The namespace identifier for the streaming method
   * @param configOrFn - Either a stream handler function or full stream configuration
   */
  streamMethod(
    nsid: string,
    configOrFn: StreamConfigOrHandler,
  ) {
    this.addStreamMethod(nsid, configOrFn);
  }

  /**
   * Adds a streaming method handler for the specified NSID.
   * @param nsid - The namespace identifier for the streaming method
   * @param configOrFn - Either a stream handler function or full stream configuration
   * @throws {Error} If the subscription is not found in the lexicon
   */
  addStreamMethod(
    nsid: string,
    configOrFn: StreamConfigOrHandler,
  ) {
    const config = typeof configOrFn === "function"
      ? { handler: configOrFn }
      : configOrFn;
    const def = this.lex.getDef(nsid);
    if (!def || def.type !== "subscription") {
      throw new Error(`Subscription not found in lexicon: ${nsid}`);
    }
    this.addSubscription(nsid, def, config);
  }

  // lexicon

  /**
   * Adds a lexicon document to the server's schema registry.
   * @param doc - The lexicon document to add
   */
  addLexicon(doc: LexiconDoc) {
    this.lex.add(doc);
  }

  /**
   * Adds multiple lexicon documents to the server's schema registry.
   * @param docs - Array of lexicon documents to add
   */
  addLexicons(docs: LexiconDoc[]) {
    for (const doc of docs) {
      this.addLexicon(doc);
    }
  }

  // routes

  /**
   * Adds an HTTP route for the specified method.
   * @param nsid - The namespace identifier for the method
   * @param def - The lexicon definition for the method
   * @param config - The method configuration including handler and options
   * @protected
   */
  protected addRoute(
    nsid: string,
    def: LexXrpcQuery | LexXrpcProcedure,
    config: MethodConfig,
  ) {
    const path = `/xrpc/${nsid}`;
    const handler = this.createHandler(nsid, def, config);
    this.methodTypes.set(nsid, def.type);

    if (def.type === "procedure") {
      this.app.post(path, handler);
    } else {
      this.app.get(path, handler);
    }
  }

  /**
   * Catchall handler that processes all XRPC routes and applies global rate limiting.
   *
   * Requests whose path does not contain `/xrpc/` are passed straight through.
   * For XRPC requests it validates the NSID, applies the global rate limiter,
   * enforces the HTTP verb of known methods, and then defers to
   * `options.catchall` when configured.
   * @throws {InvalidRequestError} On an unparsable XRPC path or wrong HTTP method
   * @throws {MethodNotImplementedError} For unknown methods without a custom catchall
   */
  catchall: CatchallHandler = async (c, next) => {
    // Inspect the path only: a query string that merely mentions "/xrpc/"
    // must not turn an unrelated route into an XRPC request.
    if (!new URL(c.req.url).pathname.includes("/xrpc/")) {
      return await next();
    }

    // Validate the NSID
    const nsid = parseUrlNsid(c.req.url);
    if (!nsid) {
      throw new InvalidRequestError("invalid xrpc path");
    }

    if (this.globalRateLimiter) {
      // A rejection from the limiter is deliberately NOT caught here: it
      // must reach the app's error handler. Swallowing it and calling
      // next() would let throttled requests through and skip the checks
      // below.
      await this.globalRateLimiter.handle({
        req: c.req.raw,
        res: new Response(),
        auth: undefined,
        params: {},
        input: undefined,
        async resetRouteRateLimits(): Promise<void> {
          // Global rate limits don't have route-specific resets
        },
      });
    }

    // Ensure that known XRPC methods are only called with the correct HTTP
    // method.
    const methodType = this.methodTypes.get(nsid) ??
      this.lex.getDef(nsid)?.type;
    if (methodType) {
      assertHttpMethod(methodType, c.req.method);
    }

    if (this.options.catchall) {
      return await this.options.catchall(c, next);
    } else if (!methodType) {
      throw new MethodNotImplementedError();
    } else {
      return await next();
    }
  };

  /**
   * Creates an authentication verification function.
   * @param cfg - Configuration containing optional authentication verifier
   * @returns A function that performs authentication for the method, or
   * `null` when no verifier is configured
   * @protected
   */
  protected createAuthVerifier<C, A extends Auth>(cfg: {
    auth?: AuthVerifier<C, A & AuthResult>;
  }): ((ctx: C) => Promise<A>) | null {
    const { auth } = cfg;
    if (!auth) return null;

    return async (ctx: C) => {
      const result = await auth(ctx);
      return excludeErrorResult(result);
    };
  }

  /** Builds a query-parameter verifier backed by this server's lexicons. */
  private createLexiconParamsVerifier(
    nsid: string,
    def: LexXrpcQuery | LexXrpcProcedure | LexXrpcSubscription,
  ): (req: Request) => Params {
    return createLexiconParamsVerifier(nsid, def, this.lex);
  }

  /**
   * Builds a request-body verifier backed by this server's lexicons.
   * Per-route payload limits take precedence over the server-wide ones.
   */
  private createLexiconInputVerifier(
    nsid: string,
    def: LexXrpcQuery | LexXrpcProcedure,
    opts?: RouteOptions,
  ): (req: Request) => Input | Promise<Input> {
    return createLexiconInputVerifier(
      nsid,
      def,
      {
        blobLimit: opts?.blobLimit ?? this.options.payload?.blobLimit,
        jsonLimit: opts?.jsonLimit ?? this.options.payload?.jsonLimit,
        textLimit: opts?.textLimit ?? this.options.payload?.textLimit,
      },
      this.lex,
    );
  }

  /**
   * Builds a response verifier backed by this server's lexicons, or returns
   * `null` when `validateResponse` is disabled.
   */
  private createLexiconOutputVerifier(
    nsid: string,
    def: LexXrpcQuery | LexXrpcProcedure,
  ): null | ((output: HandlerSuccess | void) => void) {
    if (this.options.validateResponse === false) {
      return null;
    }

    return (output) => validateOutput(nsid, def, output, this.lex);
  }

  /** Builds a query-parameter verifier from a lex method object. */
  private createSchemaParamsVerifier<
    M extends Procedure | Query | Subscription,
  >(
    method: M,
  ): (req: Request) => LexMethodParams<M> {
    return createSchemaParamsVerifier(method);
  }

  /**
   * Builds a request-body verifier from a lex method object.
   * Per-route payload limits take precedence over the server-wide ones.
   */
  private createSchemaInputVerifier<M extends Procedure | Query>(
    method: M,
    opts?: RouteOptions,
  ): (req: Request) => LexMethodInput<M> | Promise<LexMethodInput<M>> {
    return createSchemaInputVerifier(method, {
      blobLimit: opts?.blobLimit ?? this.options.payload?.blobLimit,
      jsonLimit: opts?.jsonLimit ?? this.options.payload?.jsonLimit,
      textLimit: opts?.textLimit ?? this.options.payload?.textLimit,
    });
  }

  /**
   * Builds a response verifier from a lex method object, or returns `null`
   * when `validateResponse` is disabled.
   */
  private createSchemaOutputVerifier<M extends Procedure | Query>(
    method: M,
  ): null | ((output: LexMethodOutput<M>) => void) {
    if (this.options.validateResponse === false) {
      return null;
    }

    return createSchemaOutputVerifier(method);
  }

  /**
   * Creates a Hono handler function for the specified XRPC method.
   * @template A - The authentication type
   * @param nsid - The namespace identifier for the method
   * @param def - The lexicon definition for the method
   * @param cfg - The method configuration including handler and options
   * @returns A Hono handler function
   */
  createHandler<A extends Auth = Auth>(
    nsid: string,
    def: LexXrpcQuery | LexXrpcProcedure,
    cfg: MethodConfig<A>,
  ): Handler {
    return this.createHandlerInternal<A, Params, Input, HandlerSuccess | void>(
      this.createAuthVerifier(cfg),
      this.createLexiconParamsVerifier(nsid, def),
      this.createLexiconInputVerifier(nsid, def, cfg.opts),
      this.createRouteRateLimiter(nsid, cfg),
      cfg.handler as MethodHandler<A, Params, Input, HandlerSuccess | void>,
      this.createLexiconOutputVerifier(nsid, def),
    );
  }

  /**
   * Assembles the request pipeline shared by every query/procedure route:
   * params -> auth -> input -> rate limit -> handler -> output validation ->
   * response.
   * @param authVerifier - Resolves credentials, or `null` for no auth
   * @param paramsVerifier - Extracts and validates query parameters
   * @param inputVerifier - Extracts and validates the request body
   * @param routeLimiter - Rate limiter for this route, if any
   * @param handler - The user-supplied method implementation
   * @param validateOutput - Response validator, or `null` to skip validation
   * @returns A Hono handler function
   * @protected
   */
  protected createHandlerInternal<
    A extends Auth,
    P extends Params,
    I extends Input,
    O extends HandlerSuccess | void,
  >(
    authVerifier:
      | null
      | ((ctx: { req: Request; res: Response; params: P }) => Promise<A>),
    paramsVerifier: (req: Request) => P,
    inputVerifier: (req: Request) => I | Promise<I>,
    routeLimiter: RouteRateLimiter<HandlerContext<A, P, I>> | undefined,
    handler: MethodHandler<A, P, I, O>,
    validateOutput:
      | null
      | ((output: O) => void),
  ): Handler {
    return async (c: Context) => {
      try {
        const params = paramsVerifier(c.req.raw);

        const auth: A = authVerifier
          ? await authVerifier({ req: c.req.raw, res: c.res, params })
          : (undefined as A);

        const input = await inputVerifier(c.req.raw);

        const ctx: HandlerContext<A, P, I> = {
          req: c.req.raw,
          res: new Response(),
          params,
          input,
          auth,
          resetRouteRateLimits: async () => {
            if (routeLimiter) {
              await routeLimiter.reset(ctx);
            }
          },
        };

        if (routeLimiter) {
          await routeLimiter.handle(ctx);
        }

        const output = await handler(ctx);

        // No output: still validated (the method may require a body), then
        // answered with an empty 200.
        if (output === undefined) {
          validateOutput?.(output);
          return c.body(null, 200);
        }

        if (isErrorResult(output)) {
          throw XRPCError.fromErrorResult(output);
        }

        // Pipe-through responses are forwarded as-is, without validation.
        if (isHandlerPipeThroughBuffer(output)) {
          setHeaders(c, output.headers);
          // Send only the bytes the view covers, not its whole backing store.
          return c.body(exactArrayBuffer(output.buffer), 200, {
            "Content-Type": output.encoding,
          });
        }

        if (isHandlerPipeThroughStream(output)) {
          setHeaders(c, output.headers);
          return c.body(output.stream, 200, {
            "Content-Type": output.encoding,
          });
        }

        const successOutput = output as HandlerSuccess;
        validateOutput?.(successOutput as O);
        setHeaders(c, successOutput.headers);

        if (successOutput.encoding === "application/json") {
          return c.json(ipldToJson(successOutput.body) as JSON);
        }

        return c.body(successOutput.body, 200, {
          "Content-Type": successOutput.encoding,
        });
      } catch (err: unknown) {
        // Guard against falsy throwables (`throw undefined`) so the error
        // handler always receives something meaningful.
        throw err || new InternalServerError();
      }
    };
  }

  /**
   * Adds a WebSocket subscription handler for the specified NSID.
   * @param nsid - The namespace identifier for the subscription
   * @param def - The lexicon definition for the subscription
   * @param cfg - The stream configuration
   * @protected
   */
  protected addSubscription<A extends Auth = Auth>(
    nsid: string,
    def: LexXrpcSubscription,
    cfg: StreamConfig<A>,
  ) {
    this.addSubscriptionInternal(
      nsid,
      this.createLexiconParamsVerifier(nsid, def),
      this.createAuthVerifier(cfg),
      cfg.handler,
    );
  }

  /**
   * Registers the stream server for a subscription. Items yielded by the
   * handler are wrapped in message frames unless they already are frames;
   * any error raised while verifying or streaming is converted into a single
   * error frame.
   * @param nsid - The namespace identifier for the subscription
   * @param paramsVerifier - Extracts and validates query parameters
   * @param authVerifier - Resolves credentials, or `null` for no auth
   * @param handler - The user-supplied stream implementation
   * @protected
   */
  protected addSubscriptionInternal<A extends Auth, P extends Params>(
    nsid: string,
    paramsVerifier: (req: Request) => P,
    authVerifier:
      | null
      | ((ctx: { req: Request; params: P }) => Promise<A>),
    handler: (ctx: StreamContext<A, P>) => AsyncIterable<unknown>,
  ): void {
    this.methodTypes.set(nsid, "subscription");
    this.subscriptions.set(
      nsid,
      new XrpcStreamServer({
        handler: async function* (req, signal) {
          try {
            const params = paramsVerifier(req);
            const auth = authVerifier
              ? await authVerifier({ req, params })
              : (undefined as A);

            for await (const item of handler({ req, params, auth, signal })) {
              yield item instanceof Frame
                ? item
                : messageFrameFromValue(item, nsid);
            }
          } catch (err) {
            const xrpcError = XRPCError.fromError(err);
            yield new ErrorFrame({
              error: xrpcError.payload.error ?? "Unknown",
              message: xrpcError.payload.message,
            });
          }
        },
      }),
    );
  }

  /**
   * Builds the rate limiter for a route by combining its own limiters (named
   * shared ones or ad-hoc ones) with the global limiter.
   * @param nsid - The namespace identifier, used as the key prefix
   * @param config - The method configuration holding `rateLimit`
   * @returns The combined limiter, the global limiter when the route defines
   * none, or `undefined` when neither exists
   */
  private createRouteRateLimiter<
    A extends Auth,
    P extends Params,
    I extends Input,
    O extends Output,
    C extends HandlerContext<A, P, I> = HandlerContext<A, P, I>,
  >(
    nsid: string,
    config: MethodConfig<A, P, I, O>,
  ): RouteRateLimiter<C> | undefined {
    // @NOTE global & shared rate limiters are instantiated with a context of
    // HandlerContext which is compatible (more generic) with the context of
    // this route specific rate limiters (C). For this reason, it's safe to
    // cast these with an `any` context

    const globalRateLimiter = this.globalRateLimiter as
      | RouteRateLimiter<C>
      | undefined;

    // No route specific rate limiting configured, use the global rate limiter.
    if (!config.rateLimit) return globalRateLimiter;

    const { rateLimits } = this.options;

    // @NOTE Silently ignore creation of route specific rate limiter if the
    // `rateLimits` options was not provided to the constructor.
    if (!rateLimits) return globalRateLimiter;

    const { creator, bypass } = rateLimits;

    const rateLimiters = asArray(config.rateLimit).map((options, i) => {
      if (isSharedRateLimitOpts(options)) {
        const rateLimiter = this.sharedRateLimiters?.get(options.name);

        // The route config references a shared rate limiter that does not
        // exist. This is a configuration error.
        assert(
          rateLimiter,
          `Shared rate limiter "${options.name}" not defined`,
        );

        return WrappedRateLimiter.from<C>(
          rateLimiter as unknown as RateLimiterI<C>,
          options as unknown as WrappedRateLimiterOptions<C>,
        );
      } else {
        return creator({
          ...options,
          calcKey: options.calcKey ?? defaultKey,
          calcPoints: options.calcPoints ?? defaultPoints,
          keyPrefix: `${nsid}-${i}`,
        });
      }
    });

    // If the route config contains an empty array, use global rate limiter.
    if (!rateLimiters.length) return globalRateLimiter;

    // The global rate limiter (if present) should be applied in addition to
    // the route specific rate limiters.
    if (globalRateLimiter) rateLimiters.push(globalRateLimiter);

    return RouteRateLimiter.from<C>(
      rateLimiters as unknown as readonly RateLimiterI<C>[],
      { bypass },
    );
  }

  /**
   * Gets the underlying Hono app instance for external use.
   * @returns The Hono application instance
   */
  get handler(): Hono {
    return this.app;
  }
}

/**
 * Throws when a known XRPC method is invoked with the wrong HTTP verb.
 * Procedures require POST and queries require GET; any other definition type
 * (e.g. subscriptions) is not constrained.
 * @param methodType - The lexicon definition type registered for the NSID
 * @param actualMethod - The HTTP method of the incoming request
 * @throws {InvalidRequestError} If the verb does not match
 */
function assertHttpMethod(methodType: string, actualMethod: string): void {
  const expectedMethod = methodType === "procedure"
    ? "POST"
    : methodType === "query"
    ? "GET"
    : null;
  if (expectedMethod != null && expectedMethod !== actualMethod) {
    throw new InvalidRequestError(
      `Incorrect HTTP method (${actualMethod}) expected ${expectedMethod}`,
    );
  }
}

/**
 * Returns an `ArrayBuffer` holding exactly the bytes a view covers.
 * The backing buffer is reused when the view spans all of it; otherwise the
 * covered range is copied so that unrelated bytes are never sent.
 * @param view - A typed array or other buffer view
 */
function exactArrayBuffer(view: ArrayBufferView): ArrayBuffer {
  const { buffer, byteOffset, byteLength } = view;
  if (byteOffset === 0 && byteLength === buffer.byteLength) {
    return buffer as ArrayBuffer;
  }
  return buffer.slice(byteOffset, byteOffset + byteLength) as ArrayBuffer;
}

/**
 * Resolves a lex method from either the method itself or a namespace object
 * that exposes it as `main` or `Main`.
 * @throws {TypeError} If no lex method can be found
 */
function getLexMethod<M extends Procedure | Query | Subscription>(
  method: LexMethodLike<M>,
): M {
  if (isLexMethod(method)) {
    return method;
  }

  if ("main" in method && isLexMethod(method.main)) {
    return method.main;
  }

  if ("Main" in method && isLexMethod(method.Main)) {
    return method.Main;
  }

  throw new TypeError(
    "Expected a lex method or a namespace with main/Main",
  );
}

/** Type guard for the three lex method classes. */
function isLexMethod(
  value: unknown,
): value is Procedure | Query | Subscription {
  return value instanceof Procedure ||
    value instanceof Query ||
    value instanceof Subscription;
}

/**
 * Wraps a value yielded by a subscription handler in a message frame.
 * When the value is a map carrying a string `$type`, that type is moved into
 * the frame header (shortened to `#name` when it belongs to `nsid`) and
 * removed from the body. Any other value is framed unchanged.
 */
function messageFrameFromValue(
  value: unknown,
  nsid: string,
): MessageFrame<unknown> {
  const type = (value as Record<string, unknown>)?.["$type"];
  if (!check.is(value, schema.map) || typeof type !== "string") {
    return new MessageFrame(value);
  }

  const clone = { ...(value as Record<string, unknown>) };
  delete clone["$type"];
  return new MessageFrame(clone, { type: normalizeMessageType(type, nsid) });
}

/**
 * Validates a message frame's body against the subscription message schema
 * and stores the parsed body back on the frame (mutating it).
 * @throws Whatever the schema's `parse` throws when no candidate matches
 */
function validateMessageFrame(
  frame: MessageFrame<unknown>,
  messageSchema: { parse(input: unknown): unknown },
  nsid: string,
): MessageFrame<unknown> {
  const parsedBody = parseMessageBody(
    frame.body,
    frame.type,
    messageSchema,
    nsid,
  );
  frame.body = stripMessageType(parsedBody, frame.type, nsid);
  return frame;
}

/**
 * Parses a frame body, trying each plausible `$type` spelling in turn and
 * returning the first successful parse.
 * @throws The error from the last failed attempt when none succeeds
 */
function parseMessageBody(
  value: unknown,
  type: string | undefined,
  messageSchema: { parse(input: unknown): unknown },
  nsid: string,
): unknown {
  let lastError: unknown;

  for (const candidate of getMessageValueCandidates(value, type, nsid)) {
    try {
      return messageSchema.parse(candidate);
    } catch (error) {
      lastError = error;
    }
  }

  throw lastError;
}

/**
 * Produces the values to try when validating a frame body: the value itself
 * when there is no frame type (or the value is not a map), otherwise one
 * copy per candidate `$type` spelling.
 */
function getMessageValueCandidates(
  value: unknown,
  type: string | undefined,
  nsid: string,
): unknown[] {
  if (type === undefined || !check.is(value, schema.map)) {
    return [value];
  }

  return getMessageTypeCandidates(type, nsid).map((candidateType) => ({
    ...(value as Record<string, unknown>),
    $type: candidateType,
  }));
}

/**
 * Lists the distinct spellings of a message type, in order: as given,
 * normalized (`#name`), and absolute (`<nsid>#name`).
 */
function getMessageTypeCandidates(type: string, nsid: string): string[] {
  const candidates: string[] = [];
  const normalized = normalizeMessageType(type, nsid);
  const absolute = normalized.startsWith("#") ? `${nsid}${normalized}` : type;

  for (const candidate of [type, normalized, absolute]) {
    if (!candidates.includes(candidate)) {
      candidates.push(candidate);
    }
  }

  return candidates;
}

/**
 * Removes `$type` from a parsed body when it names the same type as the
 * frame header, so the type is not transmitted twice. Returns the value
 * untouched in every other case.
 */
function stripMessageType(
  value: unknown,
  type: string | undefined,
  nsid: string,
): unknown {
  const body = value as Record<string, unknown>;
  if (
    type === undefined ||
    !check.is(value, schema.map) ||
    typeof body["$type"] !== "string" ||
    normalizeMessageType(body["$type"], nsid) !==
      normalizeMessageType(type, nsid)
  ) {
    return value;
  }

  const clone = { ...body };
  delete clone["$type"];
  return clone;
}

/**
 * Shortens `#name` and `<nsid>#name` to `#name`; any other type string is
 * returned unchanged.
 */
function normalizeMessageType(type: string, nsid: string): string {
  const split = type.split("#");
  if (split.length === 2 && (split[0] === "" || split[0] === nsid)) {
    return `#${split[1]}`;
  }
  return type;
}

/**
 * Builds the Hono error handler: converts any thrown error into an XRPC
 * error (via `opts.errorParser` when provided) and responds with its JSON
 * payload. The status falls back to 500 when the parsed error has no
 * `statusCode`.
 */
function createErrorHandler(
  opts: Options,
): (err: Error, c: Context) => Response {
  return (err: Error, c: Context): Response => {
    const errorParser = opts.errorParser ||
      ((e: unknown) => XRPCError.fromError(e));
    const xrpcError = errorParser(err);

    const statusCode = "statusCode" in xrpcError
      ? (xrpcError as { statusCode: number }).statusCode
      : 500;

    const payload = xrpcError.payload;
    return c.json(payload, statusCode as 500);
  };
}

/**
 * Converts a server-level rate limit description into limiter options,
 * filling in the default key/points calculators and deriving the key prefix
 * from the limiter name.
 */
function buildRateLimiterOptions<C extends HandlerContext = HandlerContext>({
  name,
  calcKey = defaultKey,
  calcPoints = defaultPoints,
  ...desc
}: ServerRateLimitDescription<C>): RateLimiterOptions<C> {
  return { ...desc, calcKey, calcPoints, keyPrefix: `rl-${name}` };
}

/** Default cost of a request: one point. */
const defaultPoints: CalcPointsFn = (): number => 1;

/**
 * Default rate limit key: the first `x-forwarded-for` entry (trimmed), then
 * `x-real-ip`, then the literal `"unknown"`.
 */
const defaultKey: CalcKeyFn<HandlerContext> = ({ req }) => {
  const forwarded = req.headers.get("x-forwarded-for");
  // An empty first entry (e.g. a header starting with a comma) falls
  // through to the next source instead of producing an empty key.
  const client = forwarded?.split(",")[0]?.trim();
  return client || req.headers.get("x-real-ip") || "unknown";
};