···11# Identity Tests
2233-This directory contains tests for the ATP identity system, including DID resolution and caching functionality.
33+This directory contains tests for the ATP identity system, including DID
44+resolution and caching functionality.
4556## Current Status
6778### Working Tests
99+810- `did-document.test.ts` - DID document parsing and validation ✅
99-- `did-cache.test.ts` - DID caching functionality ✅ (fixed with mock implementation)
1111+- `did-cache.test.ts` - DID caching functionality ✅ (fixed with mock
1212+ implementation)
10131114### Partially Working Tests
1212-- `handle-resolver.test.ts` - Handle resolution (2/4 tests passing, DNS resolution issues)
1515+1616+- `handle-resolver.test.ts` - Handle resolution (2/4 tests passing, DNS
1717+ resolution issues)
13181419### Known Issues
15201621#### PLC Tests (did-resolver.test.ts)
1717-The `did-resolver.test.ts` tests are failing due to a version compatibility issue between the PLC library packages:
2222+2323+The `did-resolver.test.ts` tests are failing due to a version compatibility
2424+issue between the PLC library packages:
18251926- `@did-plc/lib` v0.0.4 (client library)
2027- `@did-plc/server` v0.0.1 (server library, published 2 years ago)
21282222-**Problem**: The server rejects signatures generated by the client with "Invalid signature on op" errors, even though the signature generation appears to be working correctly.
2929+**Problem**: The server rejects signatures generated by the client with "Invalid
3030+signature on op" errors, even though the signature generation appears to be
3131+working correctly.
23322424-**Root Cause**: The server package is significantly older and uses different validation logic than the client library expects.
3333+**Root Cause**: The server package is significantly older and uses different
3434+validation logic than the client library expects.
25352626-**Fixed for did-cache.test.ts**: Implemented a mock HTTP server approach that bypasses the PLC compatibility issue while still testing the caching functionality.
3636+**Fixed for did-cache.test.ts**: Implemented a mock HTTP server approach that
3737+bypasses the PLC compatibility issue while still testing the caching
3838+functionality.
27392840#### Handle Resolver Tests
2929-Some DNS resolution tests are failing, possibly due to network/environment issues or missing test DNS records.
4141+4242+Some DNS resolution tests are failing, possibly due to network/environment
4343+issues or missing test DNS records.
30443145**Potential Solutions**:
4646+32471. Wait for updated compatible versions of the PLC packages
33482. Switch to the `@atproto/plc` ecosystem (if available and compatible)
34493. Mock the PLC server interactions for testing purposes
···36513752## Server Changes
38533939-The `web/server.ts` file has been successfully converted from Express to use `Deno.serve` while maintaining the same API and functionality. The web server tests should work once the PLC dependency issues are resolved.
5454+The `web/server.ts` file has been successfully converted from Express to use
5555+`Deno.serve` while maintaining the same API and functionality. The web server
5656+tests should work once the PLC dependency issues are resolved.
40574158## Running Tests
4259
···11+import type { IncomingMessage } from "node:http";
22+import type { CID } from "multiformats/cid";
33+import { type LexiconDoc, Lexicons } from "@atp/lexicon";
44+import type { Auth, ErrorFrame } from "@atp/xrpc-server";
55+66+// @NOTE: this file is an ugly copy job of codegen output. I'd like to clean this whole thing up
77+88+export function isObj(v: unknown): v is Record<string, unknown> {
99+ return typeof v === "object" && v !== null;
1010+}
1111+1212+export function hasProp<K extends PropertyKey>(
1313+ data: object,
1414+ prop: K,
1515+): data is Record<K, unknown> {
1616+ return prop in data;
1717+}
1818+1919+export interface QueryParams {
2020+ /** The last known event seq number to backfill from. */
2121+ cursor?: number;
2222+}
2323+2424+export type RepoEvent =
2525+ | Commit
2626+ | Identity
2727+ | Account
2828+ | Sync
2929+ | Info
3030+ | { $type: string; [k: string]: unknown };
3131+export type HandlerError = ErrorFrame<"FutureCursor" | "ConsumerTooSlow">;
3232+export type HandlerOutput = HandlerError | RepoEvent;
3333+export type HandlerReqCtx<HA extends Auth = never> = {
3434+ auth: HA;
3535+ params: QueryParams;
3636+ req: IncomingMessage;
3737+ signal: AbortSignal;
3838+};
3939+export type Handler<HA extends Auth = never> = (
4040+ ctx: HandlerReqCtx<HA>,
4141+) => AsyncIterable<HandlerOutput>;
4242+4343+/** Represents an update of repository state. Note that empty commits are allowed, which include no repo data changes, but an update to rev and signature. */
4444+export interface Commit {
4545+ /** The stream sequence number of this message. */
4646+ seq: number;
4747+ /** DEPRECATED -- unused */
4848+ rebase: boolean;
4949+ /** Indicates that this commit contained too many ops, or data size was too large. Consumers will need to make a separate request to get missing data. */
5050+ tooBig: boolean;
5151+ /** The repo this event comes from. */
5252+ repo: string;
5353+ /** Repo commit object CID. */
5454+ commit: CID;
5555+ /** DEPRECATED -- unused. WARNING -- nullable and optional; stick with optional to ensure golang interoperability. */
5656+ prev?: CID | null;
5757+ /** The rev of the emitted commit. Note that this information is also in the commit object included in blocks, unless this is a tooBig event. */
5858+ rev: string;
5959+ /** The rev of the last emitted commit from this repo (if any). */
6060+ since: string | null;
6161+ /** CAR file containing relevant blocks, as a diff since the previous repo state. */
6262+ blocks: Uint8Array;
6363+ ops: RepoOp[];
6464+ blobs: CID[];
6565+ /** Timestamp of when this message was originally broadcast. */
6666+ time: string;
6767+ [k: string]: unknown;
6868+}
6969+7070+export function isCommit(v: unknown): v is Commit {
7171+ return (
7272+ isObj(v) &&
7373+ hasProp(v, "$type") &&
7474+ v.$type === "com.atproto.sync.subscribeRepos#commit"
7575+ );
7676+}
7777+7878+/** Updates the repo to a new state, without necessarily including that state on the firehose. Used to recover from broken commit streams, data loss incidents, or in situations where upstream host does not know recent state of the repository. */
7979+export interface Sync {
8080+ $type?: "com.atproto.sync.subscribeRepos#sync";
8181+ /** The stream sequence number of this message. */
8282+ seq: number;
8383+ /** The account this repo event corresponds to. Must match that in the commit object. */
8484+ did: string;
8585+ /** CAR file containing the commit, as a block. The CAR header must include the commit block CID as the first 'root'. */
8686+ blocks: Uint8Array;
8787+ /** The rev of the commit. This value must match that in the commit object. */
8888+ rev: string;
8989+ /** Timestamp of when this message was originally broadcast. */
9090+ time: string;
9191+}
9292+9393+export function isSync(v: unknown): v is Sync {
9494+ return (
9595+ isObj(v) &&
9696+ hasProp(v, "$type") &&
9797+ v.$type === "com.atproto.sync.subscribeRepos#sync"
9898+ );
9999+}
100100+101101+/** Represents a change to an account's identity. Could be an updated handle, signing key, or pds hosting endpoint. Serves as a prod to all downstream services to refresh their identity cache. */
102102+export interface Identity {
103103+ seq: number;
104104+ did: string;
105105+ time: string;
106106+ /** The current handle for the account, or 'handle.invalid' if validation fails. This field is optional, might have been validated or passed-through from an upstream source. Semantics and behaviors for PDS vs Relay may evolve in the future; see atproto specs for more details. */
107107+ handle?: string;
108108+ [k: string]: unknown;
109109+}
110110+111111+export function isIdentity(v: unknown): v is Identity {
112112+ return (
113113+ isObj(v) &&
114114+ hasProp(v, "$type") &&
115115+ v.$type === "com.atproto.sync.subscribeRepos#identity"
116116+ );
117117+}
118118+119119+/** Represents a change to an account's status on a host (eg, PDS or Relay). The semantics of this event are that the status is at the host which emitted the event, not necessarily that at the currently active PDS. Eg, a Relay takedown would emit a takedown with active=false, even if the PDS is still active. */
120120+export interface Account {
121121+ seq: number;
122122+ did: string;
123123+ time: string;
124124+ /** Indicates that the account has a repository which can be fetched from the host that emitted this event. */
125125+ active: boolean;
126126+ /** If active=false, this optional field indicates a reason for why the account is not active. */
127127+ status?: "takendown" | "suspended" | "deleted" | "deactivated" | string;
128128+ [k: string]: unknown;
129129+}
130130+131131+export function isAccount(v: unknown): v is Account {
132132+ return (
133133+ isObj(v) &&
134134+ hasProp(v, "$type") &&
135135+ v.$type === "com.atproto.sync.subscribeRepos#account"
136136+ );
137137+}
138138+139139+export interface Info {
140140+ name: "OutdatedCursor" | string;
141141+ message?: string;
142142+ [k: string]: unknown;
143143+}
144144+145145+export function isInfo(v: unknown): v is Info {
146146+ return (
147147+ isObj(v) &&
148148+ hasProp(v, "$type") &&
149149+ v.$type === "com.atproto.sync.subscribeRepos#info"
150150+ );
151151+}
152152+153153+/** A repo operation, ie a mutation of a single record. */
154154+export interface RepoOp {
155155+ action: "create" | "update" | "delete" | string;
156156+ path: string;
157157+ /** For creates and updates, the new record CID. For deletions, null. */
158158+ cid: CID | null;
159159+ [k: string]: unknown;
160160+}
161161+162162+export function isRepoOp(v: unknown): v is RepoOp {
163163+ return (
164164+ isObj(v) &&
165165+ hasProp(v, "$type") &&
166166+ v.$type === "com.atproto.sync.subscribeRepos#repoOp"
167167+ );
168168+}
169169+170170+export const ComAtprotoSyncSubscribeRepos: LexiconDoc = {
171171+ lexicon: 1,
172172+ id: "com.atproto.sync.subscribeRepos",
173173+ defs: {
174174+ main: {
175175+ type: "subscription",
176176+ description:
177177+ "Repository event stream, aka Firehose endpoint. Outputs repo commits with diff data, and identity update events, for all repositories on the current server. See the atproto specifications for details around stream sequencing, repo versioning, CAR diff format, and more. Public and does not require auth; implemented by PDS and Relay.",
178178+ parameters: {
179179+ type: "params",
180180+ properties: {
181181+ cursor: {
182182+ type: "integer",
183183+ description: "The last known event seq number to backfill from.",
184184+ },
185185+ },
186186+ },
187187+ message: {
188188+ schema: {
189189+ type: "union",
190190+ refs: [
191191+ "lex:com.atproto.sync.subscribeRepos#commit",
192192+ "lex:com.atproto.sync.subscribeRepos#sync",
193193+ "lex:com.atproto.sync.subscribeRepos#identity",
194194+ "lex:com.atproto.sync.subscribeRepos#account",
195195+ "lex:com.atproto.sync.subscribeRepos#info",
196196+ ],
197197+ },
198198+ },
199199+ errors: [
200200+ {
201201+ name: "FutureCursor",
202202+ },
203203+ {
204204+ name: "ConsumerTooSlow",
205205+ description:
206206+ "If the consumer of the stream can not keep up with events, and a backlog gets too large, the server will drop the connection.",
207207+ },
208208+ ],
209209+ },
210210+ commit: {
211211+ type: "object",
212212+ description:
213213+ "Represents an update of repository state. Note that empty commits are allowed, which include no repo data changes, but an update to rev and signature.",
214214+ required: [
215215+ "seq",
216216+ "rebase",
217217+ "tooBig",
218218+ "repo",
219219+ "commit",
220220+ "rev",
221221+ "since",
222222+ "blocks",
223223+ "ops",
224224+ "blobs",
225225+ "time",
226226+ ],
227227+ nullable: ["since"],
228228+ properties: {
229229+ seq: {
230230+ type: "integer",
231231+ description: "The stream sequence number of this message.",
232232+ },
233233+ rebase: {
234234+ type: "boolean",
235235+ description: "DEPRECATED -- unused",
236236+ },
237237+ tooBig: {
238238+ type: "boolean",
239239+ description:
240240+ "DEPRECATED -- replaced by #sync event and data limits. Indicates that this commit contained too many ops, or data size was too large. Consumers will need to make a separate request to get missing data.",
241241+ },
242242+ repo: {
243243+ type: "string",
244244+ format: "did",
245245+ description:
246246+ "The repo this event comes from. Note that all other message types name this field 'did'.",
247247+ },
248248+ commit: {
249249+ type: "cid-link",
250250+ description: "Repo commit object CID.",
251251+ },
252252+ rev: {
253253+ type: "string",
254254+ format: "tid",
255255+ description:
256256+ "The rev of the emitted commit. Note that this information is also in the commit object included in blocks, unless this is a tooBig event.",
257257+ },
258258+ since: {
259259+ type: "string",
260260+ format: "tid",
261261+ description:
262262+ "The rev of the last emitted commit from this repo (if any).",
263263+ },
264264+ blocks: {
265265+ type: "bytes",
266266+ description:
267267+ "CAR file containing relevant blocks, as a diff since the previous repo state. The commit must be included as a block, and the commit block CID must be the first entry in the CAR header 'roots' list.",
268268+ maxLength: 2000000,
269269+ },
270270+ ops: {
271271+ type: "array",
272272+ items: {
273273+ type: "ref",
274274+ ref: "lex:com.atproto.sync.subscribeRepos#repoOp",
275275+ description:
276276+ "List of repo mutation operations in this commit (eg, records created, updated, or deleted).",
277277+ },
278278+ maxLength: 200,
279279+ },
280280+ blobs: {
281281+ type: "array",
282282+ items: {
283283+ type: "cid-link",
284284+ description:
285285+ "DEPRECATED -- will soon always be empty. List of new blobs (by CID) referenced by records in this commit.",
286286+ },
287287+ },
288288+ prevData: {
289289+ type: "cid-link",
290290+ description:
291291+ "The root CID of the MST tree for the previous commit from this repo (indicated by the 'since' revision field in this message). Corresponds to the 'data' field in the repo commit object. NOTE: this field is effectively required for the 'inductive' version of firehose.",
292292+ },
293293+ time: {
294294+ type: "string",
295295+ format: "datetime",
296296+ description:
297297+ "Timestamp of when this message was originally broadcast.",
298298+ },
299299+ },
300300+ },
301301+ sync: {
302302+ type: "object",
303303+ description:
304304+ "Updates the repo to a new state, without necessarily including that state on the firehose. Used to recover from broken commit streams, data loss incidents, or in situations where upstream host does not know recent state of the repository.",
305305+ required: ["seq", "did", "blocks", "rev", "time"],
306306+ properties: {
307307+ seq: {
308308+ type: "integer",
309309+ description: "The stream sequence number of this message.",
310310+ },
311311+ did: {
312312+ type: "string",
313313+ format: "did",
314314+ description:
315315+ "The account this repo event corresponds to. Must match that in the commit object.",
316316+ },
317317+ blocks: {
318318+ type: "bytes",
319319+ description:
320320+ "CAR file containing the commit, as a block. The CAR header must include the commit block CID as the first 'root'.",
321321+ maxLength: 10000,
322322+ },
323323+ rev: {
324324+ type: "string",
325325+ description:
326326+ "The rev of the commit. This value must match that in the commit object.",
327327+ },
328328+ time: {
329329+ type: "string",
330330+ format: "datetime",
331331+ description:
332332+ "Timestamp of when this message was originally broadcast.",
333333+ },
334334+ },
335335+ },
336336+ identity: {
337337+ type: "object",
338338+ description:
339339+ "Represents a change to an account's identity. Could be an updated handle, signing key, or pds hosting endpoint. Serves as a prod to all downstream services to refresh their identity cache.",
340340+ required: ["seq", "did", "time"],
341341+ properties: {
342342+ seq: {
343343+ type: "integer",
344344+ },
345345+ did: {
346346+ type: "string",
347347+ format: "did",
348348+ },
349349+ time: {
350350+ type: "string",
351351+ format: "datetime",
352352+ },
353353+ handle: {
354354+ type: "string",
355355+ format: "handle",
356356+ description:
357357+ "The current handle for the account, or 'handle.invalid' if validation fails. This field is optional, might have been validated or passed-through from an upstream source. Semantics and behaviors for PDS vs Relay may evolve in the future; see atproto specs for more details.",
358358+ },
359359+ },
360360+ },
361361+ account: {
362362+ type: "object",
363363+ description:
364364+ "Represents a change to an account's status on a host (eg, PDS or Relay). The semantics of this event are that the status is at the host which emitted the event, not necessarily that at the currently active PDS. Eg, a Relay takedown would emit a takedown with active=false, even if the PDS is still active.",
365365+ required: ["seq", "did", "time", "active"],
366366+ properties: {
367367+ seq: {
368368+ type: "integer",
369369+ },
370370+ did: {
371371+ type: "string",
372372+ format: "did",
373373+ },
374374+ time: {
375375+ type: "string",
376376+ format: "datetime",
377377+ },
378378+ active: {
379379+ type: "boolean",
380380+ description:
381381+ "Indicates that the account has a repository which can be fetched from the host that emitted this event.",
382382+ },
383383+ status: {
384384+ type: "string",
385385+ description:
386386+ "If active=false, this optional field indicates a reason for why the account is not active.",
387387+ knownValues: [
388388+ "takendown",
389389+ "suspended",
390390+ "deleted",
391391+ "deactivated",
392392+ "desynchronized",
393393+ "throttled",
394394+ ],
395395+ },
396396+ },
397397+ },
398398+ info: {
399399+ type: "object",
400400+ required: ["name"],
401401+ properties: {
402402+ name: {
403403+ type: "string",
404404+ knownValues: ["OutdatedCursor"],
405405+ },
406406+ message: {
407407+ type: "string",
408408+ },
409409+ },
410410+ },
411411+ repoOp: {
412412+ type: "object",
413413+ description: "A repo operation, ie a mutation of a single record.",
414414+ required: ["action", "path", "cid"],
415415+ nullable: ["cid"],
416416+ properties: {
417417+ action: {
418418+ type: "string",
419419+ knownValues: ["create", "update", "delete"],
420420+ },
421421+ path: {
422422+ type: "string",
423423+ },
424424+ cid: {
425425+ type: "cid-link",
426426+ description:
427427+ "For creates and updates, the new record CID. For deletions, null.",
428428+ },
429429+ prev: {
430430+ type: "cid-link",
431431+ description:
432432+ "For updates and deletes, the previous record CID (required for inductive firehose). For creations, field should not be defined.",
433433+ },
434434+ },
435435+ },
436436+ },
437437+};
438438+439439+const lexicons = new Lexicons([ComAtprotoSyncSubscribeRepos]);
440440+441441+export const isValidRepoEvent = (evt: unknown) => {
442442+ return lexicons.assertValidXrpcMessage<RepoEvent>(
443443+ "com.atproto.sync.subscribeRepos",
444444+ evt,
445445+ );
446446+};
+3
sync/mod.ts
···11+export * from "./runner/index.ts";
22+export * from "./firehose/index.ts";
33+export * from "./events.ts";
+43
sync/runner/consecutive-list.ts
···11+/**
22+ * Add items to a list, and mark those items as
33+ * completed. Upon item completion, get list of consecutive
44+ * items completed at the head of the list. Example:
55+ *
66+ * const consecutive = new ConsecutiveList<number>()
77+ * const item1 = consecutive.push(1)
88+ * const item2 = consecutive.push(2)
99+ * const item3 = consecutive.push(3)
1010+ * item2.complete() // []
1111+ * item1.complete() // [1, 2]
1212+ * item3.complete() // [3]
1313+ */
1414+export class ConsecutiveList<T> {
1515+ list: ConsecutiveItem<T>[] = [];
1616+1717+ push(value: T): ConsecutiveItem<T> {
1818+ const item = new ConsecutiveItem<T>(this, value);
1919+ this.list.push(item);
2020+ return item;
2121+ }
2222+2323+ complete(): T[] {
2424+ let i = 0;
2525+ while (this.list[i]?.isComplete) {
2626+ i += 1;
2727+ }
2828+ return this.list.splice(0, i).map((item) => item.value);
2929+ }
3030+}
3131+3232+export class ConsecutiveItem<T> {
3333+ isComplete = false;
3434+ constructor(
3535+ private consecutive: ConsecutiveList<T>,
3636+ public value: T,
3737+ ) {}
3838+3939+ complete(): T[] {
4040+ this.isComplete = true;
4141+ return this.consecutive.complete();
4242+ }
4343+}
+3
sync/runner/index.ts
···11+export * from "./consecutive-list.ts";
22+export * from "./memory-runner.ts";
33+export * from "./types.ts";
+70
sync/runner/memory-runner.ts
···11+import PQueue from "p-queue";
22+import { ConsecutiveList } from "./consecutive-list.ts";
33+import type { EventRunner } from "./types.ts";
44+55+export type MemoryRunnerOptions = {
66+ setCursor?: (cursor: number) => Promise<void>;
77+ concurrency?: number;
88+ startCursor?: number;
99+};
1010+1111+// A queue with arbitrarily many partitions, each processing work sequentially.
1212+// Partitions are created lazily and taken out of memory when they go idle.
1313+export class MemoryRunner implements EventRunner {
1414+ consecutive: ConsecutiveList<number> = new ConsecutiveList<number>();
1515+ mainQueue: PQueue;
1616+ partitions: Map<string, PQueue> = new Map<string, PQueue>();
1717+ cursor: number | undefined;
1818+1919+ constructor(public opts: MemoryRunnerOptions = {}) {
2020+ this.mainQueue = new PQueue({ concurrency: opts.concurrency ?? Infinity });
2121+ this.cursor = opts.startCursor;
2222+ }
2323+2424+ getCursor(): number | undefined {
2525+ return this.cursor;
2626+ }
2727+2828+ addTask(partitionId: string, task: () => Promise<void>): Promise<void> {
2929+ if (this.mainQueue.isPaused) return Promise.resolve();
3030+ return this.mainQueue.add(() => {
3131+ return this.getPartition(partitionId).add(task);
3232+ });
3333+ }
3434+3535+ private getPartition(partitionId: string) {
3636+ let partition = this.partitions.get(partitionId);
3737+ if (!partition) {
3838+ partition = new PQueue({ concurrency: 1 });
3939+ partition.once("idle", () => this.partitions.delete(partitionId));
4040+ this.partitions.set(partitionId, partition);
4141+ }
4242+ return partition;
4343+ }
4444+4545+ async trackEvent(did: string, seq: number, handler: () => Promise<void>) {
4646+ if (this.mainQueue.isPaused) return;
4747+ const item = this.consecutive.push(seq);
4848+ await this.addTask(did, async () => {
4949+ await handler();
5050+ const latest = item.complete().at(-1);
5151+ if (latest !== undefined) {
5252+ this.cursor = latest;
5353+ if (this.opts.setCursor) {
5454+ await this.opts.setCursor(this.cursor);
5555+ }
5656+ }
5757+ });
5858+ }
5959+6060+ async processAll() {
6161+ await this.mainQueue.onIdle();
6262+ }
6363+6464+ async destroy() {
6565+ this.mainQueue.pause();
6666+ this.mainQueue.clear();
6767+ this.partitions.forEach((p) => p.clear());
6868+ await this.mainQueue.onIdle();
6969+ }
7070+}