Suite of AT Protocol TypeScript libraries built on web standards
21
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: xrpc-server lex compat

+1219 -157
+4 -1
lex/schema/params.ts
··· 50 50 for (const key in input) { 51 51 if (this.validatorsMap.has(key)) continue; 52 52 53 - const result = ctx.validateChild(input, key, paramSchema); 53 + const result = paramSchema.safeParse(input[key], { 54 + allowTransform: false, 55 + path: ctx.concatPath(key), 56 + }); 54 57 if (!result.success) return result; 55 58 56 59 if (result.value !== input[key]) {
+19
lex/tests/params_test.ts
··· 34 34 "since=2024-01-02T03%3A04%3A05.000Z", 35 35 ); 36 36 }); 37 + 38 + Deno.test("preserves undeclared params from URLSearchParams", () => { 39 + const params = l.params({ 40 + name: l.string(), 41 + }); 42 + 43 + const result = params.fromURLSearchParams( 44 + new URLSearchParams("name=Alice&num=1&num=2&foo=3"), 45 + ) as Record<string, unknown>; 46 + 47 + assertEquals( 48 + result, 49 + { 50 + name: "Alice", 51 + num: ["1", "2"], 52 + foo: "3", 53 + }, 54 + ); 55 + });
+427 -115
xrpc-server/server.ts
··· 1 1 import type { Context, Handler } from "hono"; 2 2 import { Hono } from "hono"; 3 + import { Procedure, Query, Subscription } from "@atp/lex"; 3 4 import { 4 5 type LexiconDoc, 5 6 Lexicons, ··· 26 27 type Auth, 27 28 type AuthResult, 28 29 type AuthVerifier, 29 - type Awaitable, 30 30 type HandlerContext, 31 31 type HandlerSuccess, 32 32 type Input, 33 33 isHandlerPipeThroughBuffer, 34 34 isHandlerPipeThroughStream, 35 35 isSharedRateLimitOpts, 36 + type LexMethodConfig, 37 + type LexMethodHandler, 38 + type LexMethodInput, 39 + type LexMethodLike, 40 + type LexMethodOutput, 41 + type LexMethodParams, 42 + type LexSubscriptionConfig, 43 + type LexSubscriptionHandler, 36 44 type MethodConfig, 37 45 type MethodConfigOrHandler, 46 + type MethodHandler, 38 47 type Options, 48 + type Output, 39 49 type Params, 40 50 type ServerRateLimitDescription, 41 51 type StreamConfig, 42 52 type StreamConfigOrHandler, 53 + type StreamContext, 43 54 } from "./types.ts"; 44 55 import { 45 56 asArray, 46 - createInputVerifier, 47 - decodeQueryParams, 48 - getQueryParams, 57 + createLexiconInputVerifier, 58 + createLexiconParamsVerifier, 59 + createSchemaInputVerifier, 60 + createSchemaOutputVerifier, 61 + createSchemaParamsVerifier, 49 62 parseUrlNsid, 50 63 setHeaders, 51 64 validateOutput, ··· 66 79 type XrpcMux, 67 80 } from "./stream/adapters.ts"; 68 81 82 + type LexAddConfig<M extends Procedure | Query | Subscription> = M extends 83 + Procedure | Query ? LexMethodConfig<M, Auth> | LexMethodHandler<M, void> 84 + : M extends Subscription 85 + ? LexSubscriptionConfig<M, Auth> | LexSubscriptionHandler<M, void> 86 + : never; 87 + 69 88 /** 70 89 * Creates a new XRPC server instance 71 90 * @param lexicons - Optional array of lexicon documents to initialize the server with ··· 90 109 subscriptions: Map<string, XrpcStreamServer> = new Map< 91 110 string, 92 111 XrpcStreamServer 112 + >(); 113 + methodTypes: Map<string, "procedure" | "query" | "subscription"> = new Map< 114 + string, 115 + "procedure" | "query" | "subscription" 93 116 >(); 94 117 /** Lexicon registry for schema validation and method definitions */ 95 118 lex: Lexicons = new Lexicons(); ··· 119 142 this.app.notFound((c) => { 120 143 const nsid = parseUrlNsid(c.req.url); 121 144 if (nsid) { 122 - const def = this.lex.getDef(nsid); 123 - if (def) { 124 - const expectedMethod = def.type === "procedure" 145 + const methodType = this.methodTypes.get(nsid) ?? 146 + this.lex.getDef(nsid)?.type; 147 + if (methodType) { 148 + const expectedMethod = methodType === "procedure" 125 149 ? "POST" 126 - : def.type === "query" 150 + : methodType === "query" 127 151 ? "GET" 128 152 : null; 129 153 if (expectedMethod != null && expectedMethod !== c.req.method) { ··· 189 213 190 214 // handlers 191 215 216 + add<M extends Procedure | Query | Subscription>( 217 + method: LexMethodLike<M>, 218 + configOrHandler: LexAddConfig<M>, 219 + ): void { 220 + const schema = getLexMethod(method); 221 + const config = typeof configOrHandler === "function" 222 + ? { handler: configOrHandler } 223 + : configOrHandler; 224 + 225 + if (schema instanceof Procedure) { 226 + return this.addProcedureSchema( 227 + schema, 228 + config as LexMethodConfig<Procedure, Auth>, 229 + ); 230 + } 231 + 232 + if (schema instanceof Query) { 233 + return this.addQuerySchema( 234 + schema, 235 + config as LexMethodConfig<Query, Auth>, 236 + ); 237 + } 238 + 239 + return this.addSubscriptionSchema( 240 + schema, 241 + config as LexSubscriptionConfig<Subscription, Auth>, 242 + ); 243 + } 244 + 245 + protected addProcedureSchema<M extends Procedure, A extends Auth>( 246 + schema: M, 247 + config: LexMethodConfig<M, A>, 248 + ): void { 249 + this.app.post( 250 + `/xrpc/${schema.nsid}`, 251 + this.createHandlerInternal< 252 + A, 253 + LexMethodParams<M>, 254 + LexMethodInput<M>, 255 + LexMethodOutput<M> 256 + >( 257 + this.createAuthVerifier(config), 258 + this.createSchemaParamsVerifier(schema), 259 + this.createSchemaInputVerifier(schema, config.opts), 260 + this.createRouteRateLimiter(schema.nsid, config), 261 + config.handler, 262 + this.createSchemaOutputVerifier(schema), 263 + ), 264 + ); 265 + this.methodTypes.set(schema.nsid, "procedure"); 266 + } 267 + 268 + protected addQuerySchema<M extends Query, A extends Auth>( 269 + schema: M, 270 + config: LexMethodConfig<M, A>, 271 + ): void { 272 + this.app.get( 273 + `/xrpc/${schema.nsid}`, 274 + this.createHandlerInternal< 275 + A, 276 + LexMethodParams<M>, 277 + LexMethodInput<M>, 278 + LexMethodOutput<M> 279 + >( 280 + this.createAuthVerifier(config), 281 + this.createSchemaParamsVerifier(schema), 282 + this.createSchemaInputVerifier(schema, config.opts), 283 + this.createRouteRateLimiter(schema.nsid, config), 284 + config.handler, 285 + this.createSchemaOutputVerifier(schema), 286 + ), 287 + ); 288 + this.methodTypes.set(schema.nsid, "query"); 289 + } 290 + 291 + protected addSubscriptionSchema<M extends Subscription, A extends Auth>( 292 + schema: M, 293 + config: LexSubscriptionConfig<M, A>, 294 + ): void { 295 + const { handler } = config; 296 + const messageSchema = this.options.validateResponse === false 297 + ? undefined 298 + : schema.message; 299 + 300 + return this.addSubscriptionInternal( 301 + schema.nsid, 302 + this.createSchemaParamsVerifier(schema), 303 + this.createAuthVerifier(config), 304 + messageSchema 305 + ? async function* (ctx) { 306 + for await (const rawItem of handler(ctx)) { 307 + const item = rawItem as unknown; 308 + if (item instanceof MessageFrame) { 309 + yield validateMessageFrame(item, messageSchema, schema.nsid); 310 + continue; 311 + } 312 + if (item instanceof Frame) { 313 + yield item; 314 + continue; 315 + } 316 + yield messageSchema.parse(item); 317 + } 318 + } 319 + : handler, 320 + ); 321 + } 322 + 192 323 /** 193 324 * Registers a method handler for the specified NSID. 194 325 * @param nsid - The namespace identifier for the method ··· 289 420 ) { 290 421 const path = `/xrpc/${nsid}`; 291 422 const handler = this.createHandler(nsid, def, config); 423 + this.methodTypes.set(nsid, def.type); 292 424 293 425 if (def.type === "procedure") { 294 426 this.app.post(path, handler); ··· 330 462 331 463 // Ensure that known XRPC methods are only called with the correct HTTP 332 464 // method. 333 - const def = this.lex.getDef(nsid); 334 - if (def) { 335 - const expectedMethod = def.type === "procedure" 465 + const methodType = this.methodTypes.get(nsid) ?? 466 + this.lex.getDef(nsid)?.type; 467 + if (methodType) { 468 + const expectedMethod = methodType === "procedure" 336 469 ? "POST" 337 - : def.type === "query" 470 + : methodType === "query" 338 471 ? "GET" 339 472 : null; 340 473 if (expectedMethod != null && expectedMethod !== c.req.method) { ··· 346 479 347 480 if (this.options.catchall) { 348 481 return await this.options.catchall(c, next); 349 - } else if (!def) { 482 + } else if (!methodType) { 350 483 throw new MethodNotImplementedError(); 351 484 } else { 352 485 return await next(); ··· 354 487 }; 355 488 356 489 /** 357 - * Creates a parameter verification function for the given method definition. 358 - * @param _nsid - The namespace identifier (unused) 359 - * @param def - The lexicon definition containing parameter schema 360 - * @returns A function that validates and transforms query parameters 361 - * @protected 362 - */ 363 - protected createParamsVerifier( 364 - nsid: string, 365 - def: LexXrpcQuery | LexXrpcProcedure | LexXrpcSubscription, 366 - ): (req: Request) => Params { 367 - return (req: Request): Params => { 368 - const queryParams = getQueryParams(req.url); 369 - const params: Params = decodeQueryParams(def, queryParams); 370 - try { 371 - return this.lex.assertValidXrpcParams(nsid, params) as Params; 372 - } catch (e) { 373 - throw new InvalidRequestError(String(e)); 374 - } 375 - }; 376 - } 377 - 378 - /** 379 - * Creates an input verification function for the given method definition. 380 - * @param nsid - The namespace identifier for the method 381 - * @param def - The lexicon definition containing input schema 382 - * @returns A function that validates and transforms request input 383 - * @protected 384 - */ 385 - protected createInputVerifier( 386 - nsid: string, 387 - def: LexXrpcQuery | LexXrpcProcedure, 388 - routeOpts: RouteOptions, 389 - ): (req: Request) => Awaitable<Input> { 390 - return createInputVerifier(nsid, def, routeOpts, this.lex); 391 - } 392 - 393 - /** 394 490 * Creates an authentication verification function. 395 491 * @param cfg - Configuration containing optional authentication verifier 396 492 * @returns A function that performs authentication for the method ··· 408 504 }; 409 505 } 410 506 507 + private createLexiconParamsVerifier( 508 + nsid: string, 509 + def: LexXrpcQuery | LexXrpcProcedure | LexXrpcSubscription, 510 + ): (req: Request) => Params { 511 + return createLexiconParamsVerifier(nsid, def, this.lex); 512 + } 513 + 514 + private createLexiconInputVerifier( 515 + nsid: string, 516 + def: LexXrpcQuery | LexXrpcProcedure, 517 + opts?: RouteOptions, 518 + ): (req: Request) => Input | Promise<Input> { 519 + return createLexiconInputVerifier( 520 + nsid, 521 + def, 522 + { 523 + blobLimit: opts?.blobLimit ?? this.options.payload?.blobLimit, 524 + jsonLimit: opts?.jsonLimit ?? this.options.payload?.jsonLimit, 525 + textLimit: opts?.textLimit ?? this.options.payload?.textLimit, 526 + }, 527 + this.lex, 528 + ); 529 + } 530 + 531 + private createLexiconOutputVerifier( 532 + nsid: string, 533 + def: LexXrpcQuery | LexXrpcProcedure, 534 + ): null | ((output: HandlerSuccess | void) => void) { 535 + if (this.options.validateResponse === false) { 536 + return null; 537 + } 538 + 539 + return (output) => validateOutput(nsid, def, output, this.lex); 540 + } 541 + 542 + private createSchemaParamsVerifier< 543 + M extends Procedure | Query | Subscription, 544 + >( 545 + method: M, 546 + ): (req: Request) => LexMethodParams<M> { 547 + return createSchemaParamsVerifier(method); 548 + } 549 + 550 + private createSchemaInputVerifier<M extends Procedure | Query>( 551 + method: M, 552 + opts?: RouteOptions, 553 + ): (req: Request) => LexMethodInput<M> | Promise<LexMethodInput<M>> { 554 + return createSchemaInputVerifier(method, { 555 + blobLimit: opts?.blobLimit ?? this.options.payload?.blobLimit, 556 + jsonLimit: opts?.jsonLimit ?? this.options.payload?.jsonLimit, 557 + textLimit: opts?.textLimit ?? this.options.payload?.textLimit, 558 + }); 559 + } 560 + 561 + private createSchemaOutputVerifier<M extends Procedure | Query>( 562 + method: M, 563 + ): null | ((output: LexMethodOutput<M>) => void) { 564 + if (this.options.validateResponse === false) { 565 + return null; 566 + } 567 + 568 + return createSchemaOutputVerifier(method); 569 + } 570 + 411 571 /** 412 572 * Creates a Hono handler function for the specified XRPC method. 413 573 * @template A - The authentication type ··· 421 581 def: LexXrpcQuery | LexXrpcProcedure, 422 582 cfg: MethodConfig<A>, 423 583 ): Handler { 424 - const authVerifier = this.createAuthVerifier(cfg); 425 - const paramsVerifier = this.createParamsVerifier(nsid, def); 426 - const inputVerifier = this.createInputVerifier(nsid, def, { 427 - blobLimit: cfg.opts?.blobLimit ?? this.options.payload?.blobLimit, 428 - jsonLimit: cfg.opts?.jsonLimit ?? this.options.payload?.jsonLimit, 429 - textLimit: cfg.opts?.textLimit ?? this.options.payload?.textLimit, 430 - }); 431 - const validateOutputFn = (output?: HandlerSuccess) => 432 - this.options.validateResponse && output && def.output 433 - ? validateOutput(nsid, def, output, this.lex) 434 - : undefined; 584 + return this.createHandlerInternal<A, Params, Input, HandlerSuccess | void>( 585 + this.createAuthVerifier(cfg), 586 + this.createLexiconParamsVerifier(nsid, def), 587 + this.createLexiconInputVerifier(nsid, def, cfg.opts), 588 + this.createRouteRateLimiter(nsid, cfg), 589 + cfg.handler as MethodHandler<A, Params, Input, HandlerSuccess | void>, 590 + this.createLexiconOutputVerifier(nsid, def), 591 + ); 592 + } 435 593 436 - const routeLimiter = this.createRouteRateLimiter(nsid, cfg); 437 - 594 + protected createHandlerInternal< 595 + A extends Auth, 596 + P extends Params, 597 + I extends Input, 598 + O extends HandlerSuccess | void, 599 + >( 600 + authVerifier: 601 + | null 602 + | ((ctx: { req: Request; res: Response; params: P }) => Promise<A>), 603 + paramsVerifier: (req: Request) => P, 604 + inputVerifier: (req: Request) => I | Promise<I>, 605 + routeLimiter: RouteRateLimiter<HandlerContext<A, P, I>> | undefined, 606 + handler: MethodHandler<A, P, I, O>, 607 + validateOutput: 608 + | null 609 + | ((output: O) => void), 610 + ): Handler { 438 611 return async (c: Context) => { 439 612 try { 440 613 const params = paramsVerifier(c.req.raw); ··· 443 616 ? await authVerifier({ req: c.req.raw, res: c.res, params }) 444 617 : (undefined as A); 445 618 446 - let input: Input = undefined; 447 - if (def.type === "procedure") { 448 - input = await inputVerifier(c.req.raw); 449 - } 619 + const input = await inputVerifier(c.req.raw); 450 620 451 - const ctx: HandlerContext<A> = { 621 + const ctx: HandlerContext<A, P, I> = { 452 622 req: c.req.raw, 453 623 res: new Response(), 454 624 params, 455 625 input, 456 - auth: auth as A, 626 + auth, 457 627 resetRouteRateLimits: async () => { 458 628 if (routeLimiter) { 459 629 await routeLimiter.reset(ctx); ··· 461 631 }, 462 632 }; 463 633 464 - // Apply rate limiting (route-specific, which includes global if configured) 465 634 if (routeLimiter) { 466 635 await routeLimiter.handle(ctx); 467 636 } 468 637 469 - const output = await cfg.handler(ctx); 638 + const output = await handler(ctx); 639 + 640 + if (output === undefined) { 641 + validateOutput?.(output); 642 + return c.body(null, 200); 643 + } 644 + 470 645 if (isErrorResult(output)) { 471 646 throw XRPCError.fromErrorResult(output); 472 647 } ··· 476 651 return c.body(output.buffer.buffer as ArrayBuffer, 200, { 477 652 "Content-Type": output.encoding, 478 653 }); 479 - } else if (isHandlerPipeThroughStream(output)) { 654 + } 655 + 656 + if (isHandlerPipeThroughStream(output)) { 480 657 setHeaders(c, output.headers); 481 658 return c.body(output.stream, 200, { 482 659 "Content-Type": output.encoding, 483 660 }); 484 661 } 485 662 486 - if (output) { 487 - excludeErrorResult(output); 488 - validateOutputFn(output); 489 - } 663 + const successOutput = output as HandlerSuccess; 664 + validateOutput?.(successOutput as O); 665 + setHeaders(c, successOutput.headers); 490 666 491 - if (output) { 492 - setHeaders(c, output.headers); 493 - if (output.encoding === "application/json") { 494 - return c.json(ipldToJson(output.body) as JSON); 495 - } else { 496 - return c.body(output.body, 200, { 497 - "Content-Type": output.encoding, 498 - }); 499 - } 667 + if (successOutput.encoding === "application/json") { 668 + return c.json(ipldToJson(successOutput.body) as JSON); 500 669 } 501 670 502 - return c.body(null, 200); 671 + return c.body(successOutput.body, 200, { 672 + "Content-Type": successOutput.encoding, 673 + }); 503 674 } catch (err: unknown) { 504 675 throw err || new InternalServerError(); 505 676 } ··· 518 689 def: LexXrpcSubscription, 519 690 cfg: StreamConfig<A>, 520 691 ) { 521 - const paramsVerifier = this.createParamsVerifier(nsid, def); 522 - const authVerifier = this.createAuthVerifier(cfg); 692 + this.addSubscriptionInternal( 693 + nsid, 694 + this.createLexiconParamsVerifier(nsid, def), 695 + this.createAuthVerifier(cfg), 696 + cfg.handler, 697 + ); 698 + } 523 699 524 - const { handler } = cfg; 700 + protected addSubscriptionInternal<A extends Auth, P extends Params>( 701 + nsid: string, 702 + paramsVerifier: (req: Request) => P, 703 + authVerifier: 704 + | null 705 + | ((ctx: { req: Request; params: P }) => Promise<A>), 706 + handler: (ctx: StreamContext<A, P>) => AsyncIterable<unknown>, 707 + ): void { 708 + this.methodTypes.set(nsid, "subscription"); 525 709 this.subscriptions.set( 526 710 nsid, 527 711 new XrpcStreamServer({ 528 712 handler: async function* (req, signal) { 529 713 try { 530 - // validate request 531 714 const params = paramsVerifier(req); 532 - // authenticate request 533 715 const auth = authVerifier 534 716 ? await authVerifier({ req, params }) 535 717 : (undefined as A); 536 - // stream 718 + 537 719 for await (const item of handler({ req, params, auth, signal })) { 538 - if (item instanceof Frame) { 539 - yield item; 540 - continue; 541 - } 542 - const type = (item as Record<string, unknown>)?.["$type"]; 543 - if (!check.is(item, schema.map) || typeof type !== "string") { 544 - yield new MessageFrame(item); 545 - continue; 546 - } 547 - const split = type.split("#"); 548 - let t: string; 549 - if ( 550 - split.length === 2 && (split[0] === "" || split[0] === nsid) 551 - ) { 552 - t = `#${split[1]}`; 553 - } else { 554 - t = type; 555 - } 556 - const clone = { ...(item as Record<string, unknown>) }; 557 - delete clone["$type"]; 558 - yield new MessageFrame(clone, { type: t }); 720 + yield item instanceof Frame 721 + ? item 722 + : messageFrameFromValue(item, nsid); 559 723 } 560 724 } catch (err) { 561 725 const xrpcError = XRPCError.fromError(err); ··· 569 733 ); 570 734 } 571 735 572 - private createRouteRateLimiter<A extends Auth, C extends HandlerContext>( 736 + private createRouteRateLimiter< 737 + A extends Auth, 738 + P extends Params, 739 + I extends Input, 740 + O extends Output, 741 + C extends HandlerContext<A, P, I> = HandlerContext<A, P, I>, 742 + >( 573 743 nsid: string, 574 - config: MethodConfig<A>, 744 + config: MethodConfig<A, P, I, O>, 575 745 ): RouteRateLimiter<C> | undefined { 576 746 // @NOTE global & shared rate limiters are instantiated with a context of 577 747 // HandlerContext which is compatible (more generic) with the context of ··· 638 808 get handler(): Hono { 639 809 return this.app; 640 810 } 811 + } 812 + 813 + function getLexMethod<M extends Procedure | Query | Subscription>( 814 + method: LexMethodLike<M>, 815 + ): M { 816 + if (isLexMethod(method)) { 817 + return method; 818 + } 819 + 820 + if ("main" in method && isLexMethod(method.main)) { 821 + return method.main; 822 + } 823 + 824 + if ("Main" in method && isLexMethod(method.Main)) { 825 + return method.Main; 826 + } 827 + 828 + throw new TypeError( 829 + "Expected a lex method or a namespace with main/Main", 830 + ); 831 + } 832 + 833 + function isLexMethod( 834 + value: unknown, 835 + ): value is Procedure | Query | Subscription { 836 + return value instanceof Procedure || 837 + value instanceof Query || 838 + value instanceof Subscription; 839 + } 840 + 841 + function messageFrameFromValue( 842 + value: unknown, 843 + nsid: string, 844 + ): MessageFrame<unknown> { 845 + const type = (value as Record<string, unknown>)?.["$type"]; 846 + if (!check.is(value, schema.map) || typeof type !== "string") { 847 + return new MessageFrame(value); 848 + } 849 + 850 + const split = type.split("#"); 851 + let t: string; 852 + if (split.length === 2 && (split[0] === "" || split[0] === nsid)) { 853 + t = `#${split[1]}`; 854 + } else { 855 + t = type; 856 + } 857 + 858 + const clone = { ...(value as Record<string, unknown>) }; 859 + delete clone["$type"]; 860 + return new MessageFrame(clone, { type: t }); 861 + } 862 + 863 + function validateMessageFrame( 864 + frame: MessageFrame<unknown>, 865 + messageSchema: { parse(input: unknown): unknown }, 866 + nsid: string, 867 + ): MessageFrame<unknown> { 868 + const parsedBody = parseMessageBody( 869 + frame.body, 870 + frame.type, 871 + messageSchema, 872 + nsid, 873 + ); 874 + frame.body = stripMessageType(parsedBody, frame.type, nsid); 875 + return frame; 876 + } 877 + 878 + function parseMessageBody( 879 + value: unknown, 880 + type: string | undefined, 881 + messageSchema: { parse(input: unknown): unknown }, 882 + nsid: string, 883 + ): unknown { 884 + let lastError: unknown; 885 + 886 + for (const candidate of getMessageValueCandidates(value, type, nsid)) { 887 + try { 888 + return messageSchema.parse(candidate); 889 + } catch (error) { 890 + lastError = error; 891 + } 892 + } 893 + 894 + throw lastError; 895 + } 896 + 897 + function getMessageValueCandidates( 898 + value: unknown, 899 + type: string | undefined, 900 + nsid: string, 901 + ): unknown[] { 902 + if (type === undefined || !check.is(value, schema.map)) { 903 + return [value]; 904 + } 905 + 906 + return getMessageTypeCandidates(type, nsid).map((candidateType) => ({ 907 + ...(value as Record<string, unknown>), 908 + $type: candidateType, 909 + })); 910 + } 911 + 912 + function getMessageTypeCandidates(type: string, nsid: string): string[] { 913 + const candidates: string[] = []; 914 + const normalized = normalizeMessageType(type, nsid); 915 + const absolute = normalized.startsWith("#") ? `${nsid}${normalized}` : type; 916 + 917 + for (const candidate of [type, normalized, absolute]) { 918 + if (!candidates.includes(candidate)) { 919 + candidates.push(candidate); 920 + } 921 + } 922 + 923 + return candidates; 924 + } 925 + 926 + function stripMessageType( 927 + value: unknown, 928 + type: string | undefined, 929 + nsid: string, 930 + ): unknown { 931 + const body = value as Record<string, unknown>; 932 + if ( 933 + type === undefined || 934 + !check.is(value, schema.map) || 935 + typeof body["$type"] !== "string" || 936 + normalizeMessageType(body["$type"], nsid) !== 937 + normalizeMessageType(type, nsid) 938 + ) { 939 + return value; 940 + } 941 + 942 + const clone = { ...body }; 943 + delete clone["$type"]; 944 + return clone; 945 + } 946 + 947 + function normalizeMessageType(type: string, nsid: string): string { 948 + const split = type.split("#"); 949 + if (split.length === 2 && (split[0] === "" || split[0] === nsid)) { 950 + return `#${split[1]}`; 951 + } 952 + return type; 641 953 } 642 954 643 955 function createErrorHandler(
+6 -5
xrpc-server/tests/bodies_test.ts
··· 130 130 } 131 131 return result; 132 132 } catch (err) { 133 - if (err instanceof XRPCError) { 133 + if ( 134 + err instanceof XRPCError || 135 + err instanceof xrpcServer.XRPCError 136 + ) { 134 137 throw err; 135 138 } else { 136 139 throw new XRPCError( ··· 296 299 297 300 await assertRejects( 298 301 () => client.call("io.example.validationTestTwo"), 299 - Error, 300 - "The server gave an invalid response and may be out of date.", 302 + XRPCError, 303 + "Internal Server Error", 301 304 ); 302 305 }); 303 306 ··· 542 545 543 546 await t.step({ 544 547 name: "supports max blob size (based on content-length)", 545 - ignore: true, 546 548 async fn() { 547 549 const bytes = randomBytes(BLOB_LIMIT + 1); 548 550 ··· 568 570 569 571 await t.step({ 570 572 name: "supports max blob size (missing content-length)", 571 - ignore: true, 572 573 async fn() { 573 574 const bytes = randomBytes(BLOB_LIMIT + 1); 574 575
+35
xrpc-server/tests/errors_test.ts
··· 207 207 await closeServer(upstreamS); 208 208 }); 209 209 210 + Deno.test("validates responses by default on the server", { 211 + sanitizeOps: false, 212 + sanitizeResources: false, 213 + }, async () => { 214 + const validatingServer = xrpcServer.createServer(LEXICONS); 215 + validatingServer.method("io.example.invalidResponse", () => { 216 + return { encoding: "application/json", body: { something: "else" } }; 217 + }); 218 + 219 + const validatingHttpServer = await createServer(validatingServer); 220 + 221 + try { 222 + const port = (validatingHttpServer as Deno.HttpServer & { port: number }) 223 + .port; 224 + const validatingClient = new Client(`http://localhost:${port}`, LEXICONS); 225 + 226 + await assertRejects( 227 + async () => { 228 + await validatingClient.call("io.example.invalidResponse"); 229 + }, 230 + XRPCError, 231 + "Internal Server Error", 232 + ); 233 + 234 + const error = await validatingClient.call("io.example.invalidResponse") 235 + .catch((err) => err); 236 + assert(error instanceof XRPCError); 237 + assert(!(error instanceof XRPCInvalidResponseError)); 238 + assertEquals(error.status, 500); 239 + assertEquals(error.error, "InternalServerError"); 240 + } finally { 241 + await closeServer(validatingHttpServer); 242 + } 243 + }); 244 + 210 245 Deno.test("throws XRPCError for foo error", { 211 246 sanitizeOps: false, 212 247 sanitizeResources: false,
+406
xrpc-server/tests/lex_compat_test.ts
··· 1 + import { assertEquals, assertRejects } from "@std/assert"; 2 + import { Client, XRPCError } from "@atp/xrpc"; 3 + import { l } from "@atp/lex"; 4 + import { byFrame, MessageFrame } from "../mod.ts"; 5 + import * as xrpcServer from "../mod.ts"; 6 + import { closeServer, createServer } from "./_util.ts"; 7 + 8 + const echoQuery = l.query( 9 + "io.example.echoQuery", 10 + l.params({ 11 + message: l.string(), 12 + }), 13 + l.jsonPayload({ 14 + message: l.string(), 15 + }), 16 + ); 17 + 18 + const passthroughExtraQuery = l.query( 19 + "io.example.passthroughExtraQuery", 20 + l.params({ 21 + message: l.string(), 22 + }), 23 + l.jsonPayload({ 24 + message: l.string(), 25 + extra: l.optional(l.string()), 26 + }), 27 + ); 28 + 29 + const echoProcedure = l.procedure( 30 + "io.example.echoProcedure", 31 + l.params(), 32 + l.jsonPayload({ 33 + message: l.string(), 34 + }), 35 + l.jsonPayload({ 36 + message: l.string(), 37 + }), 38 + ); 39 + 40 + const echoBinaryProcedure = l.procedure( 41 + "io.example.echoBinaryProcedure", 42 + l.params(), 43 + l.payload("application/octet-stream", l.bytes()), 44 + l.payload("application/octet-stream", l.bytes()), 45 + ); 46 + 47 + const limitedBinaryProcedure = l.procedure( 48 + "io.example.limitedBinaryProcedure", 49 + l.params(), 50 + l.payload("application/octet-stream", l.bytes()), 51 + l.jsonPayload({ 52 + ok: l.boolean(), 53 + }), 54 + ); 55 + 56 + const countSubscription = l.subscription( 57 + "io.example.countSubscription", 58 + l.params({ 59 + count: l.integer(), 60 + }), 61 + l.object({ 62 + count: l.integer(), 63 + }), 64 + ); 65 + 66 + type Expect<T extends true> = T; 67 + type Equals<A, B> = (<T>() => T extends A ? 1 : 2) extends 68 + (<T>() => T extends B ? 1 : 2) ? true 69 + : false; 70 + type StreamAuth<T> = T extends ( 71 + ctx: infer C, 72 + ) => AsyncIterable<unknown> ? C extends { auth: infer A } ? A : never 73 + : never; 74 + type _lexSubscriptionHandlerAuth = Expect< 75 + Equals< 76 + StreamAuth<xrpcServer.LexSubscriptionHandler<typeof countSubscription>>, 77 + xrpcServer.Auth 78 + > 79 + >; 80 + 81 + const oddCountMessage = l.typedObject( 82 + "io.example.typedCountSubscription", 83 + "odd", 84 + l.object({ 85 + count: l.integer(), 86 + }), 87 + ); 88 + 89 + const evenCountMessage = l.typedObject( 90 + "io.example.typedCountSubscription", 91 + "even", 92 + l.object({ 93 + count: l.integer(), 94 + }), 95 + ); 96 + 97 + const typedCountSubscription = l.subscription( 98 + "io.example.typedCountSubscription", 99 + l.params({ 100 + count: l.integer(), 101 + }), 102 + l.typedUnion([ 103 + l.typedRef(() => oddCountMessage), 104 + l.typedRef(() => evenCountMessage), 105 + ], false), 106 + ); 107 + 108 + const invalidQuery = l.query( 109 + "io.example.invalidQuery", 110 + l.params(), 111 + l.jsonPayload({ 112 + message: l.string(), 113 + }), 114 + ); 115 + 116 + const defaultedQuery = l.query( 117 + "io.example.defaultedQuery", 118 + l.params(), 119 + l.jsonPayload({ 120 + message: l.string({ default: "hello default" }), 121 + }), 122 + ); 123 + 124 + let server: xrpcServer.Server; 125 + let httpServer: Deno.HttpServer; 126 + let client: Client; 127 + let baseUrl: string; 128 + 129 + Deno.test.beforeAll(async () => { 130 + server = xrpcServer.createServer(); 131 + 132 + server.add(echoQuery, { 133 + handler: ({ params }) => ({ 134 + encoding: "application/json", 135 + body: { message: params.message }, 136 + }), 137 + }); 138 + 139 + server.add(passthroughExtraQuery, { 140 + handler: ({ params }) => { 141 + const extra = (params as Record<string, unknown>).extra; 142 + return { 143 + encoding: "application/json", 144 + body: { 145 + message: params.message, 146 + extra: typeof extra === "string" ? extra : undefined, 147 + }, 148 + }; 149 + }, 150 + }); 151 + 152 + server.add(echoProcedure, { 153 + handler: ({ input }) => ({ 154 + encoding: "application/json", 155 + body: { message: input.body.message }, 156 + }), 157 + }); 158 + 159 + server.add(echoBinaryProcedure, { 160 + handler: ({ input }) => ({ 161 + encoding: "application/octet-stream", 162 + body: input.body, 163 + }), 164 + }); 165 + 166 + server.add(countSubscription, { 167 + handler: async function* ({ params }) { 168 + yield { count: params.count }; 169 + }, 170 + }); 171 + 172 + server.add( 173 + typedCountSubscription, 174 + { 175 + handler: async function* ({ params }: { params: { count: number } }) { 176 + yield new MessageFrame({ count: params.count }, { type: "#odd" }); 177 + }, 178 + } as unknown as xrpcServer.LexSubscriptionConfig< 179 + typeof typedCountSubscription 180 + >, 181 + ); 182 + 183 + server.add(defaultedQuery, { 184 + handler: () => ({ 185 + encoding: "application/json", 186 + body: {} as unknown as { message: string }, 187 + }), 188 + }); 189 + 190 + httpServer = await createServer(server); 191 + const port = (httpServer as Deno.HttpServer & { port: number }).port; 192 + baseUrl = `http://localhost:${port}`; 193 + client = new Client(baseUrl); 194 + }); 195 + 196 + Deno.test.afterAll(async () => { 197 + await closeServer(httpServer); 198 + }); 199 + 200 + Deno.test("registers queries from lex sdk methods", { 201 + sanitizeOps: false, 202 + sanitizeResources: false, 203 + }, async () => { 204 + const response = await client.call(echoQuery, { 205 + params: { message: "hello query" }, 206 + }); 207 + 208 + assertEquals(response.data, { message: "hello query" }); 209 + }); 210 + 211 + Deno.test("preserves undeclared query params for lex sdk methods", { 212 + sanitizeOps: false, 213 + sanitizeResources: false, 214 + }, async () => { 215 + const response = await fetch( 216 + `${baseUrl}/xrpc/${passthroughExtraQuery.nsid}?message=hello&extra=world`, 217 + ); 218 + 219 + assertEquals(await response.json(), { 220 + message: "hello", 221 + extra: "world", 222 + }); 223 + }); 224 + 225 + Deno.test("registers procedures from lex sdk methods", { 226 + sanitizeOps: false, 227 + sanitizeResources: false, 228 + }, async () => { 229 + const response = await client.call(echoProcedure, { 230 + body: { message: "hello procedure" }, 231 + }); 232 + 233 + assertEquals(response.data, { message: "hello procedure" }); 234 + }); 235 + 236 + Deno.test("registers binary procedures from lex sdk methods", { 237 + sanitizeOps: false, 238 + sanitizeResources: false, 239 + }, async () => { 240 + const bytes = new TextEncoder().encode("hello binary"); 241 + const response = await client.call(echoBinaryProcedure, { 242 + body: bytes, 243 + encoding: "application/octet-stream", 244 + }); 245 + 246 + assertEquals(response.data, bytes); 247 + }); 248 + 249 + Deno.test("enforces blob limits for lex sdk binary procedures", { 250 + sanitizeOps: false, 251 + sanitizeResources: false, 252 + }, async () => { 253 + const limitingServer = xrpcServer.createServer(undefined, { 254 + payload: { blobLimit: 1 }, 255 + }); 256 + limitingServer.add(limitedBinaryProcedure, { 257 + handler: () => ({ 258 + encoding: "application/json", 259 + body: { ok: true }, 260 + }), 261 + }); 262 + 263 + const limitingHttpServer = await createServer(limitingServer); 264 + 265 + try { 266 + const port = (limitingHttpServer as Deno.HttpServer & { port: number }) 267 + .port; 268 + const limitingClient = new Client(`http://localhost:${port}`); 269 + 270 + await assertRejects( 271 + () => 272 + limitingClient.call(limitedBinaryProcedure, { 273 + body: new Uint8Array([1, 2]), 274 + encoding: "application/octet-stream", 275 + }), 276 + XRPCError, 277 + "request entity too large", 278 + ); 279 + } finally { 280 + await closeServer(limitingHttpServer); 281 + } 282 + }); 283 + 284 + Deno.test("validates lex sdk responses by default", { 285 + sanitizeOps: false, 286 + sanitizeResources: false, 287 + }, async () => { 288 + const validatingServer = xrpcServer.createServer(); 289 + validatingServer.add(invalidQuery, { 290 + handler: () => ({ 291 + encoding: "application/json", 292 + body: { something: "else" } as unknown as { message: string }, 293 + }), 294 + }); 295 + 296 + const validatingHttpServer = await createServer(validatingServer); 297 + 298 + try { 299 + const port = (validatingHttpServer as Deno.HttpServer & { port: number }) 300 + .port; 301 + const response = await fetch( 302 + `http://localhost:${port}/xrpc/${invalidQuery.nsid}`, 303 + ); 304 + const payload = await response.json(); 305 + 306 + assertEquals(response.status, 500); 307 + assertEquals(payload.error, "InternalServerError"); 308 + } finally { 309 + await closeServer(validatingHttpServer); 310 + } 311 + }); 312 + 313 + Deno.test("applies parsed lex sdk response bodies", { 314 + sanitizeOps: false, 315 + sanitizeResources: false, 316 + }, async () => { 317 + const response = await client.call(defaultedQuery); 318 + 319 + assertEquals(response.data, { message: "hello default" }); 320 + }); 321 + 322 + Deno.test("registers subscriptions from lex sdk methods", { 323 + sanitizeOps: false, 324 + sanitizeResources: false, 325 + }, async () => { 326 + const ws = new WebSocket( 327 + `${baseUrl.replace("http", "ws")}/xrpc/${countSubscription.nsid}?count=3`, 328 + ); 329 + 330 + try { 331 + await new Promise<void>((resolve, reject) => { 332 + ws.onopen = () => resolve(); 333 + ws.onerror = () => reject(new Error("Connection failed")); 334 + }); 335 + 336 + const frames = []; 337 + for await (const frame of byFrame(ws)) { 338 + frames.push(frame); 339 + } 340 + 341 + assertEquals(frames, [new MessageFrame({ count: 3 })]); 342 + } finally { 343 + if ( 344 + ws.readyState === WebSocket.OPEN || 345 + ws.readyState === WebSocket.CONNECTING 346 + ) { 347 + ws.close(); 348 + } 349 + await new Promise<void>((resolve) => { 350 + if (ws.readyState === WebSocket.CLOSED) { 351 + resolve(); 352 + return; 353 + } 354 + const onClose = () => { 355 + ws.removeEventListener("close", onClose); 356 + resolve(); 357 + }; 358 + ws.addEventListener("close", onClose); 359 + }); 360 + } 361 + }); 362 + 363 + Deno.test("registers typed message frames from lex sdk subscriptions", { 364 + sanitizeOps: false, 365 + sanitizeResources: false, 366 + }, async () => { 367 + const ws = new WebSocket( 368 + `${ 369 + baseUrl.replace("http", "ws") 370 + }/xrpc/${typedCountSubscription.nsid}?count=3`, 371 + ); 372 + 373 + try { 374 + await new Promise<void>((resolve, reject) => { 375 + ws.onopen = () => resolve(); 376 + ws.onerror = () => reject(new Error("Connection failed")); 377 + }); 378 + 379 + const frames = []; 380 + for await (const frame of byFrame(ws)) { 381 + frames.push(frame); 382 + } 383 + 384 + assertEquals(frames, [ 385 + new MessageFrame({ count: 3 }, { type: "#odd" }), 386 + ]); 387 + } finally { 388 + if ( 389 + ws.readyState === WebSocket.OPEN || 390 + ws.readyState === WebSocket.CONNECTING 391 + ) { 392 + ws.close(); 393 + } 394 + await new Promise<void>((resolve) => { 395 + if (ws.readyState === WebSocket.CLOSED) { 396 + resolve(); 397 + return; 398 + } 399 + const onClose = () => { 400 + ws.removeEventListener("close", onClose); 401 + resolve(); 402 + }; 403 + ws.addEventListener("close", onClose); 404 + }); 405 + } 406 + });
-1
xrpc-server/tests/procedures_test.ts
··· 117 117 }; 118 118 }, 119 119 ); 120 - 121 120 s = await createServer(server); 122 121 const port = (s as Deno.HttpServer & { port: number }).port; 123 122 client = new Client(`http://localhost:${port}`, LEXICONS);
+57
xrpc-server/types.ts
··· 1 1 import type { Context, HonoRequest, Next } from "hono"; 2 + import type { 3 + InferMethodInput, 4 + InferMethodMessage, 5 + InferMethodOutput, 6 + InferMethodParams, 7 + Procedure, 8 + Query, 9 + Subscription, 10 + } from "@atp/lex"; 2 11 import { z } from "zod"; 3 12 import type { ErrorResult, XRPCError } from "./errors.ts"; 4 13 import type { CalcKeyFn, CalcPointsFn } from "./rate-limiter.ts"; ··· 277 286 A extends AuthResult = AuthResult, 278 287 P extends Params = Params, 279 288 > = AuthVerifier<StreamAuthContext<P>, A>; 289 + 290 + export type LexMethod = Procedure | Query | Subscription; 291 + export type LexMethodNamespace<M extends LexMethod = LexMethod> = 292 + | { readonly main: M } 293 + | { readonly Main: M }; 294 + export type LexMethodLike<M extends LexMethod = LexMethod> = 295 + | M 296 + | LexMethodNamespace<M>; 297 + 298 + export type LexMethodParams<M extends Procedure | Query | Subscription> = 299 + InferMethodParams<M>; 300 + 301 + export type LexMethodInput<M extends Procedure | Query> = InferMethodInput< 302 + M, 303 + ReadableStream<Uint8Array> 304 + >; 305 + 306 + export type LexMethodOutput<M extends Procedure | Query> = 307 + InferMethodOutput<M, Uint8Array | ReadableStream<Uint8Array>> extends 308 + undefined 309 + ? InferMethodOutput<M, Uint8Array | ReadableStream<Uint8Array>> | void 310 + : InferMethodOutput<M, Uint8Array | ReadableStream<Uint8Array>>; 311 + 312 + export type LexMethodMessage<M extends Subscription> = InferMethodMessage<M>; 313 + 314 + export type LexMethodHandler< 315 + M extends Procedure | Query, 316 + A extends Auth = Auth, 317 + > = MethodHandler<A, LexMethodParams<M>, LexMethodInput<M>, LexMethodOutput<M>>; 318 + 319 + export type LexMethodConfig< 320 + M extends Procedure | Query, 321 + A extends Auth = Auth, 322 + > = MethodConfig<A, LexMethodParams<M>, LexMethodInput<M>, LexMethodOutput<M>>; 323 + 324 + export type LexSubscriptionHandler< 325 + M extends Subscription, 326 + A extends Auth = Auth, 327 + > = StreamHandler< 328 + A, 329 + LexMethodParams<M>, 330 + LexMethodMessage<M> 331 + >; 332 + 333 + export type LexSubscriptionConfig< 334 + M extends Subscription, 335 + A extends Auth = Auth, 336 + > = StreamConfig<A, LexMethodParams<M>, LexMethodMessage<M>>; 280 337 281 338 /** 282 339 * Configuration for server-level rate limits.
+265 -35
xrpc-server/util.ts
··· 6 6 type LexXrpcQuery, 7 7 type LexXrpcSubscription, 8 8 } from "@atp/lexicon"; 9 + import { Procedure, type Query, type Subscription } from "@atp/lex"; 9 10 import { 10 11 InternalServerError, 11 12 InvalidRequestError, ··· 17 18 Awaitable, 18 19 HandlerSuccess, 19 20 Input, 21 + LexMethodInput, 22 + LexMethodOutput, 23 + LexMethodParams, 20 24 Params, 21 25 RouteOptions, 22 26 } from "./types.ts"; ··· 105 109 return result; 106 110 } 107 111 112 + function getSearchParams(url = ""): URLSearchParams { 113 + return new URL(url ?? "", "http://x").searchParams; 114 + } 115 + 108 116 /** 109 117 * Represents a request-like object with essential HTTP request properties. 110 118 * Used for handling both standard HTTP requests and custom request implementations. ··· 174 182 } 175 183 } 176 184 185 + export function createLexiconParamsVerifier( 186 + nsid: string, 187 + def: LexXrpcQuery | LexXrpcProcedure | LexXrpcSubscription, 188 + lexicons: Lexicons, 189 + ): (req: Request) => Params { 190 + return (req) => { 191 + const queryParams = getQueryParams(req.url); 192 + const params = decodeQueryParams(def, queryParams); 193 + try { 194 + return lexicons.assertValidXrpcParams(nsid, params) as Params; 195 + } catch (e) { 196 + throw new InvalidRequestError(String(e)); 197 + } 198 + }; 199 + } 200 + 201 + export function createSchemaParamsVerifier< 202 + M extends Procedure | Query | Subscription, 203 + >( 204 + method: M, 205 + ): (req: Request) => LexMethodParams<M> { 206 + return (req) => { 207 + try { 208 + return method.parameters.fromURLSearchParams( 209 + getSearchParams(req.url), 210 + ) as LexMethodParams<M>; 211 + } catch (e) { 212 + throw new InvalidRequestError( 213 + e instanceof Error ? e.message : String(e), 214 + ); 215 + } 216 + }; 217 + } 218 + 177 219 const ENCODING_ANY = "*/*"; 178 220 179 221 function parseDefEncoding({ encoding }: LexXrpcBody) { ··· 230 272 inputEncoding: string, 231 273 options: RouteOptions, 232 274 ): ((req: Request, encoding: string) => Promise<unknown>) | undefined { 233 - if (inputEncoding === ENCODING_ANY) { 234 - // When the lexicon's input encoding is */*, the handler will determine how to process it 275 + if ( 276 + inputEncoding === ENCODING_ANY || 277 + ( 278 + inputEncoding !== "application/json" && 279 + inputEncoding !== "json" && 280 + !inputEncoding.startsWith("text/") && 281 + inputEncoding !== "application/x-www-form-urlencoded" 282 + ) 283 + ) { 235 284 return; 236 285 } 237 286 const { jsonLimit, textLimit } = options; ··· 264 313 }; 265 314 } 266 315 316 + async function parseBodyForSchemaValidation( 317 + req: Request, 318 + encoding: string, 319 + options: RouteOptions, 320 + ): Promise<unknown> { 321 + const contentLength = req.headers.get("content-length"); 322 + const bodySize = contentLength ? parseInt(contentLength, 10) : 0; 323 + 324 + if (encoding === "application/json" || encoding === "json") { 325 + if (options.jsonLimit && bodySize > options.jsonLimit) { 326 + throw new InvalidRequestError( 327 + `Request body too large: ${bodySize} bytes exceeds JSON limit of ${options.jsonLimit} bytes`, 328 + ); 329 + } 330 + return JSON.parse(await req.text()); 331 + } 332 + 333 + if ( 334 + encoding.startsWith("text/") || 335 + encoding === "application/x-www-form-urlencoded" 336 + ) { 337 + if (options.textLimit && bodySize > options.textLimit) { 338 + throw new InvalidRequestError( 339 + `Request body too large: ${bodySize} bytes exceeds text limit of ${options.textLimit} bytes`, 340 + ); 341 + } 342 + return await req.text(); 343 + } 344 + 345 + const body = decodeBodyStream(req, options.blobLimit); 346 + if (body === null) { 347 + return new Uint8Array(0); 348 + } 349 + return new Uint8Array(await new Response(body).arrayBuffer()); 350 + } 351 + 352 + function toLexBody(value: unknown): unknown { 353 + if (value === undefined || value instanceof Uint8Array) { 354 + return value; 355 + } 356 + return jsonToLex(value); 357 + } 358 + 267 359 function decodeBodyStream( 268 360 req: Request, 269 361 maxSize: number | undefined, ··· 273 365 274 366 if (!req.body) { 275 367 return null; 276 - } 277 - 278 - if (!contentEncoding) { 279 - return req.body; 280 - } 281 - 282 - if (!contentLength) { 283 - throw new XRPCError( 284 - ResponseType.UnsupportedMediaType, 285 - "unsupported content-encoding", 286 - ); 287 368 } 288 369 289 370 const contentLengthParsed = contentLength ··· 305 386 ); 306 387 } 307 388 308 - let transforms: TransformStream[]; 309 - try { 310 - transforms = createDecoders(contentEncoding); 311 - } catch (cause) { 312 - throw new XRPCError( 313 - ResponseType.UnsupportedMediaType, 314 - "unsupported content-encoding", 315 - undefined, 316 - { cause }, 317 - ); 389 + let stream: ReadableStream = req.body; 390 + 391 + if (contentEncoding) { 392 + if (!contentLength) { 393 + throw new XRPCError( 394 + ResponseType.UnsupportedMediaType, 395 + "unsupported content-encoding", 396 + ); 397 + } 398 + 399 + let transforms: TransformStream[]; 400 + try { 401 + transforms = createDecoders(contentEncoding); 402 + } catch (cause) { 403 + throw new XRPCError( 404 + ResponseType.UnsupportedMediaType, 405 + "unsupported content-encoding", 406 + undefined, 407 + { cause }, 408 + ); 409 + } 410 + 411 + for (const transform of transforms) { 412 + stream = stream.pipeThrough(transform); 413 + } 318 414 } 319 415 320 416 if (maxSize !== undefined) { 321 - const maxSizeChecker = new MaxSizeChecker( 322 - maxSize, 323 - () => 324 - new XRPCError(ResponseType.PayloadTooLarge, "request entity too large"), 417 + stream = stream.pipeThrough( 418 + new MaxSizeChecker( 419 + maxSize, 420 + () => 421 + new XRPCError( 422 + ResponseType.PayloadTooLarge, 423 + "request entity too large", 424 + ), 425 + ), 325 426 ); 326 - transforms.push(maxSizeChecker); 327 - } 328 - 329 - let stream: ReadableStream = req.body; 330 - for (const transform of transforms) { 331 - stream = stream.pipeThrough(transform); 332 427 } 333 428 334 429 return stream; ··· 508 603 * @param def - The lexicon definition for the method 509 604 * @returns A function that verifies request input 510 605 */ 511 - export function createInputVerifier( 606 + export function createLexiconInputVerifier( 512 607 nsid: string, 513 608 def: LexXrpcProcedure | LexXrpcQuery, 514 609 options: RouteOptions, ··· 569 664 // Validate against schema if defined 570 665 if (input.schema) { 571 666 try { 572 - const lexBody = parsedBody ? jsonToLex(parsedBody) : parsedBody; 667 + if (parsedBody === undefined) { 668 + parsedBody = await parseBodyForSchemaValidation( 669 + req, 670 + reqEncoding, 671 + options, 672 + ); 673 + } 674 + const lexBody = toLexBody(parsedBody); 573 675 parsedBody = lexicons.assertValidXrpcInput(nsid, lexBody); 574 676 } catch (e) { 575 677 throw new InvalidRequestError( ··· 587 689 return { encoding: reqEncoding, body }; 588 690 }; 589 691 } 692 + 693 + export function createSchemaInputVerifier<M extends Procedure | Query>( 694 + method: M, 695 + options: RouteOptions, 696 + ): (req: Request) => Awaitable<LexMethodInput<M>> { 697 + const input = method instanceof Procedure ? method.input : undefined; 698 + 699 + if (!input?.encoding) { 700 + return (req) => { 701 + if (getBodyPresence(req) === "present") { 702 + throw new InvalidRequestError( 703 + `A request body was provided when none was expected`, 704 + ); 705 + } 706 + 707 + return undefined as LexMethodInput<M>; 708 + }; 709 + } 710 + 711 + const { blobLimit } = options; 712 + const allowedEncodings = parseDefEncoding(input as LexXrpcBody); 713 + const checkEncoding = allowedEncodings.includes(ENCODING_ANY) 714 + ? undefined 715 + : (encoding: string) => allowedEncodings.includes(encoding); 716 + const bodyParser = createBodyParser(input.encoding, options); 717 + 718 + return async (req) => { 719 + if (getBodyPresence(req) === "missing") { 720 + throw new InvalidRequestError( 721 + `A request body is expected but none was provided`, 722 + ); 723 + } 724 + 725 + const reqEncoding = parseReqEncoding(req); 726 + if (checkEncoding && !checkEncoding(reqEncoding)) { 727 + throw new InvalidRequestError( 728 + `Wrong request encoding (Content-Type): ${reqEncoding}`, 729 + ); 730 + } 731 + 732 + let parsedBody: unknown = undefined; 733 + 734 + if (bodyParser) { 735 + try { 736 + parsedBody = await bodyParser(req, reqEncoding); 737 + } catch (e) { 738 + throw new InvalidRequestError( 739 + e instanceof Error ? e.message : String(e), 740 + ); 741 + } 742 + } 743 + 744 + if (input.schema) { 745 + try { 746 + if (parsedBody === undefined) { 747 + parsedBody = await parseBodyForSchemaValidation( 748 + req, 749 + reqEncoding, 750 + options, 751 + ); 752 + } 753 + const lexBody = toLexBody(parsedBody); 754 + parsedBody = input.schema.parse(lexBody); 755 + } catch (e) { 756 + throw new InvalidRequestError( 757 + e instanceof Error ? e.message : String(e), 758 + ); 759 + } 760 + } 761 + 762 + const body = parsedBody !== undefined 763 + ? parsedBody 764 + : decodeBodyStream(req, blobLimit); 765 + 766 + return { encoding: reqEncoding, body } as LexMethodInput<M>; 767 + }; 768 + } 769 + 770 + export function createSchemaOutputVerifier<M extends Procedure | Query>( 771 + method: M, 772 + ): (output: LexMethodOutput<M>) => void { 773 + const output = method.output; 774 + 775 + if (!output.encoding) { 776 + return (handlerOutput) => { 777 + if (handlerOutput !== undefined) { 778 + throw new InternalServerError( 779 + `A response body was provided when none was expected`, 780 + ); 781 + } 782 + }; 783 + } 784 + 785 + return (handlerOutput) => { 786 + if (handlerOutput === undefined) { 787 + throw new InternalServerError( 788 + `A response body is expected but none was provided`, 789 + ); 790 + } 791 + 792 + const result = handlerSuccess.safeParse(handlerOutput); 793 + if (!result.success) { 794 + throw new InternalServerError(`Invalid handler output`, undefined, { 795 + cause: result.error, 796 + }); 797 + } 798 + 799 + const successOutput = handlerOutput as HandlerSuccess; 800 + 801 + if (!isValidEncoding(output as LexXrpcBody, successOutput.encoding)) { 802 + throw new InternalServerError( 803 + `Invalid response encoding: ${successOutput.encoding}`, 804 + ); 805 + } 806 + 807 + if (output.schema) { 808 + const bodyResult = output.schema.safeParse(successOutput.body); 809 + if (!bodyResult.success) { 810 + throw new InternalServerError(bodyResult.error.message, undefined, { 811 + cause: bodyResult.error, 812 + }); 813 + } 814 + successOutput.body = bodyResult.value; 815 + } 816 + }; 817 + } 818 + 819 + export { createLexiconInputVerifier as createInputVerifier }; 590 820 591 821 /** 592 822 * Sets headers on a Hono context response.