// Suite of AT Protocol TypeScript libraries built on web standards.
1import { byteIterableToStream } from "@atp/common";
2import type { LexiconDoc } from "@atp/lexicon";
3import { Client } from "./_xrpc-client.ts";
4import * as xrpcServer from "../mod.ts";
5import { closeServer, createServer } from "./_util.ts";
6import { assertEquals, assertInstanceOf } from "@std/assert";
7
/**
 * Lexicon document for the `io.example.readableStream` query: it declares a
 * boolean `shouldErr` parameter and an `application/vnd.ipld.car` output
 * encoding.
 */
const readableStreamLexicon: LexiconDoc = {
  lexicon: 1,
  id: "io.example.readableStream",
  defs: {
    main: {
      type: "query",
      parameters: {
        type: "params",
        properties: { shouldErr: { type: "boolean" } },
      },
      output: { encoding: "application/vnd.ipld.car" },
    },
  },
};

/** Lexicon documents handed to both the XRPC server and the client below. */
const LEXICONS: LexiconDoc[] = [readableStreamLexicon];
28
/**
 * Builds the fixture used by every test in this file.
 *
 * Registers an `io.example.readableStream` handler on an XRPC server created
 * from `LEXICONS`, passes that server to the `createServer` test helper, and
 * constructs a `Client` pointed at `http://localhost:<port>`.
 *
 * @returns The server handle returned by `createServer` (as `server`) and the
 *   matching `client`. Callers are expected to pass `server` to `closeServer`.
 */
async function setupServer() {
  type HandlerContext = { params: xrpcServer.Params };

  // Yields the one-byte chunks [0], [1], [2], [3], [4] in order. Only after
  // all five have been yielded does it check `shouldErr` and, if truthy, throw.
  async function* emitChunks(ctx: HandlerContext): AsyncIterable<Uint8Array> {
    for (const byte of [0, 1, 2, 3, 4]) {
      yield new Uint8Array([byte]);
    }
    if (ctx.params.shouldErr) {
      throw new Error("error");
    }
  }

  const xrpc = xrpcServer.createServer(LEXICONS);
  xrpc.method(
    "io.example.readableStream",
    (ctx: HandlerContext) => ({
      encoding: "application/vnd.ipld.car",
      body: byteIterableToStream(emitChunks(ctx)),
    }),
  );

  const httpServer = await createServer(xrpc);
  // The handle is asserted to expose a numeric `port`; it is used to build the
  // base URL the client talks to.
  const { port } = httpServer as Deno.HttpServer & { port: number };
  const baseUrl = `http://localhost:${port}`;

  return { server: httpServer, client: new Client(baseUrl, LEXICONS) };
}
55
// Success case: with `shouldErr: false` the call resolves and `data` equals
// the five streamed bytes 0..4 joined into a single Uint8Array.
Deno.test("returns readable streams of bytes", async () => {
  const fixture = await setupServer();
  try {
    const { data } = await fixture.client.call("io.example.readableStream", {
      shouldErr: false,
    });
    assertEquals(data, Uint8Array.of(0, 1, 2, 3, 4));
  } finally {
    // Always shut the server down, even when the assertion fails.
    await closeServer(fixture.server);
  }
});
68
// Failure case: with `shouldErr: true` the handler's byte iterator throws
// after yielding its chunks, and the client call is expected to reject with
// an `Error`.
Deno.test("handles errs on readable streams of bytes", async () => {
  const { server, client } = await setupServer();

  // Silence the error log this scenario is expected to produce. The original
  // function is put back in `finally` so that a failing assertion cannot leave
  // `console.error` stubbed out for the remaining tests in the process.
  const originalConsoleError = console.error;
  console.error = () => {};

  try {
    let err: unknown;
    try {
      await client.call("io.example.readableStream", {
        shouldErr: true,
      });
    } catch (e) {
      err = e;
    }
    // If the call unexpectedly resolves, `err` stays undefined and this fails.
    assertInstanceOf(err, Error);
  } finally {
    // Restore logging first, then shut the server down.
    console.error = originalConsoleError;
    await closeServer(server);
  }
});
89});