···11+export { formatLabel, signLabel, type CreateLabelData, type LabelSubject } from './labels.ts';
22+export { LabelOutbox, type LabelOutboxOptions } from './outbox.ts';
33+export { Labeler, type AuthCheck, type LabelerOptions } from './labeler.ts';
44+export type { LabelQueryParams, LabelQueryResult, LabelStore, SavedLabel } from './store.ts';
···11+import type { ComAtprotoLabelDefs } from '@atcute/atproto';
22+import { encode, toBytes } from '@atcute/cbor';
33+import type { PrivateKey } from '@atcute/crypto';
44+import type { Did, GenericUri } from '@atcute/lexicons';
55+66+import type { SavedLabel } from './store.ts';
77+88+const LABEL_VERSION = 1;
99+1010+/** data for creating a new label. */
1111+export interface CreateLabelData {
1212+ /** AT URI of the record, repository (account), or other resource */
1313+ uri: string;
1414+ /** CID specifying the version of `uri` to label */
1515+ cid?: string;
1616+ /** the label value */
1717+ val: string;
1818+ /** whether this label negates a previous label */
1919+ neg?: boolean;
2020+ /** creation timestamp (ISO 8601). defaults to current time */
2121+ cts?: string;
2222+ /** expiration timestamp (ISO 8601) */
2323+ exp?: string;
2424+ /** DID of the label source. defaults to the labeler's DID */
2525+ src?: string;
2626+}
2727+2828+/** subject of a label: a URI with optional CID. */
2929+export interface LabelSubject {
3030+ uri: string;
3131+ cid?: string;
3232+}
3333+3434+interface UnsignedLabel {
3535+ ver: number;
3636+ src: string;
3737+ uri: string;
3838+ cid?: string;
3939+ val: string;
4040+ neg: boolean;
4141+ cts: string;
4242+ exp?: string;
4343+}
4444+4545+/**
4646+ * create an unsigned label object with version and defaults.
4747+ * @param data label creation data
4848+ * @param src default source DID
4949+ * @returns unsigned label ready for signing
5050+ */
5151+const toUnsignedLabel = (data: CreateLabelData, src: Did): UnsignedLabel => {
5252+ const label: UnsignedLabel = {
5353+ ver: LABEL_VERSION,
5454+ src: data.src ?? src,
5555+ uri: data.uri,
5656+ val: data.val,
5757+ neg: data.neg ?? false,
5858+ cts: data.cts ?? new Date().toISOString(),
5959+ };
6060+6161+ if (data.cid !== undefined) {
6262+ label.cid = data.cid;
6363+ }
6464+ if (data.exp !== undefined) {
6565+ label.exp = data.exp;
6666+ }
6767+6868+ return label;
6969+};
7070+7171+/**
7272+ * CBOR-encode and sign a label.
7373+ * @param data label creation data
7474+ * @param src default source DID
7575+ * @param key private key for signing
7676+ * @returns the label fields and signature, ready for storage
7777+ */
7878+export const signLabel = async (
7979+ data: CreateLabelData,
8080+ src: Did,
8181+ key: PrivateKey,
8282+): Promise<Omit<SavedLabel, 'seq'>> => {
8383+ const label = toUnsignedLabel(data, src);
8484+ const bytes = encode(label);
8585+ const sig = await key.sign(bytes);
8686+8787+ return {
8888+ src: label.src,
8989+ uri: label.uri,
9090+ cid: label.cid,
9191+ val: label.val,
9292+ neg: label.neg,
9393+ cts: label.cts,
9494+ exp: label.exp,
9595+ sig: sig,
9696+ };
9797+};
9898+9999+/**
100100+ * format a saved label for wire transmission.
101101+ * converts raw sig bytes to CBOR Bytes wrapper.
102102+ * @param label saved label from store
103103+ * @returns formatted label for XRPC responses
104104+ */
105105+export const formatLabel = (label: SavedLabel): ComAtprotoLabelDefs.Label => {
106106+ const formatted: ComAtprotoLabelDefs.Label = {
107107+ ver: 1,
108108+ src: label.src as Did,
109109+ uri: label.uri as GenericUri,
110110+ val: label.val,
111111+ neg: label.neg,
112112+ cts: label.cts,
113113+ sig: toBytes(label.sig),
114114+ };
115115+116116+ if (label.cid !== undefined) {
117117+ formatted.cid = label.cid;
118118+ }
119119+ if (label.exp !== undefined) {
120120+ formatted.exp = label.exp;
121121+ }
122122+123123+ return formatted;
124124+};
+191
packages/servers/labeler/lib/outbox.test.ts
···11+import { XRPCSubscriptionError } from '@atcute/xrpc-server';
22+33+import { SimpleEventEmitter } from '@mary-ext/simple-event-emitter';
44+import { describe, expect, it } from 'vitest';
55+66+import { AsyncBuffer, AsyncBufferFullError } from './async-buffer.ts';
77+import { LabelOutbox } from './outbox.ts';
88+import type { LabelStore, SavedLabel } from './store.ts';
99+1010+const makeSavedLabel = (seq: number): SavedLabel => ({
1111+ seq,
1212+ src: 'did:plc:labeler',
1313+ uri: 'did:plc:target',
1414+ val: 'spam',
1515+ neg: false,
1616+ cts: new Date().toISOString(),
1717+ sig: new Uint8Array(64),
1818+});
1919+2020+const createMockStore = (labels: SavedLabel[] = []): LabelStore => {
2121+ return {
2222+ async save(label) {
2323+ const seq = labels.length + 1;
2424+ const saved = { ...label, seq };
2525+ labels.push(saved);
2626+ return saved;
2727+ },
2828+ async query() {
2929+ return { labels: [], cursor: '0' };
3030+ },
3131+ async getLatestSeq() {
3232+ return labels.at(-1)?.seq ?? 0;
3333+ },
3434+ async getRange(after, limit) {
3535+ const result = labels.filter((l) => l.seq > after);
3636+ return limit !== undefined ? result.slice(0, limit) : result;
3737+ },
3838+ };
3939+};
4040+4141+describe('AsyncBuffer', () => {
4242+ it('should throw AsyncBufferFullError on overflow', async () => {
4343+ const buffer = new AsyncBuffer<number>(2);
4444+4545+ // push 4 items (exceeding max of 2)
4646+ buffer.push(1);
4747+ buffer.push(2);
4848+ buffer.push(3);
4949+ buffer.push(4);
5050+5151+ const collected: number[] = [];
5252+5353+ await expect(async () => {
5454+ for await (const value of buffer.events()) {
5555+ collected.push(value);
5656+ }
5757+ }).rejects.toThrow(AsyncBufferFullError);
5858+ });
5959+});
6060+6161+describe('LabelOutbox', () => {
6262+ it('should backfill from cursor', async () => {
6363+ const labels = Array.from({ length: 5 }, (_, i) => makeSavedLabel(i + 1));
6464+ const store = createMockStore(labels);
6565+ const emitter = new SimpleEventEmitter<[SavedLabel]>();
6666+6767+ const outbox = new LabelOutbox(store, emitter);
6868+ const ac = new AbortController();
6969+7070+ const collected: SavedLabel[] = [];
7171+7272+ setTimeout(() => ac.abort(), 100);
7373+7474+ for await (const label of outbox.events(0, ac.signal)) {
7575+ collected.push(label);
7676+ if (collected.length === 5) {
7777+ ac.abort();
7878+ break;
7979+ }
8080+ }
8181+8282+ expect(collected).toHaveLength(5);
8383+ expect(collected.map((l) => l.seq)).toEqual([1, 2, 3, 4, 5]);
8484+ });
8585+8686+ it('should tail live events with no cursor', async () => {
8787+ const store = createMockStore();
8888+ const emitter = new SimpleEventEmitter<[SavedLabel]>();
8989+9090+ const outbox = new LabelOutbox(store, emitter);
9191+ const ac = new AbortController();
9292+9393+ const collected: SavedLabel[] = [];
9494+9595+ const iter = outbox.events(undefined, ac.signal);
9696+9797+ // emit after a tick so the outbox is tailing
9898+ queueMicrotask(() => {
9999+ emitter.emit(makeSavedLabel(1));
100100+ });
101101+102102+ for await (const label of iter) {
103103+ collected.push(label);
104104+ if (collected.length === 1) {
105105+ ac.abort();
106106+ break;
107107+ }
108108+ }
109109+110110+ expect(collected).toHaveLength(1);
111111+ expect(collected[0]!.seq).toBe(1);
112112+ });
113113+114114+ it('should backfill then tail live events', async () => {
115115+ const labels = [makeSavedLabel(1), makeSavedLabel(2)];
116116+ const store = createMockStore(labels);
117117+ const emitter = new SimpleEventEmitter<[SavedLabel]>();
118118+119119+ const outbox = new LabelOutbox(store, emitter);
120120+ const ac = new AbortController();
121121+122122+ const collected: SavedLabel[] = [];
123123+124124+ // emit a new label shortly after start so the outbox picks it up during tailing
125125+ setTimeout(() => {
126126+ emitter.emit(makeSavedLabel(3));
127127+ }, 20);
128128+129129+ for await (const label of outbox.events(0, ac.signal)) {
130130+ collected.push(label);
131131+ if (collected.length === 3) {
132132+ ac.abort();
133133+ break;
134134+ }
135135+ }
136136+137137+ expect(collected.map((l) => l.seq)).toEqual([1, 2, 3]);
138138+ });
139139+140140+ it('should stop on abort signal', async () => {
141141+ const store = createMockStore();
142142+ const emitter = new SimpleEventEmitter<[SavedLabel]>();
143143+144144+ const outbox = new LabelOutbox(store, emitter);
145145+ const ac = new AbortController();
146146+147147+ const collected: SavedLabel[] = [];
148148+149149+ setTimeout(() => ac.abort(), 50);
150150+151151+ for await (const label of outbox.events(undefined, ac.signal)) {
152152+ collected.push(label);
153153+ }
154154+155155+ expect(collected).toHaveLength(0);
156156+ });
157157+158158+ it('should wrap AsyncBufferFullError as ConsumerTooSlow', async () => {
159159+ const store = createMockStore();
160160+ const emitter = new SimpleEventEmitter<[SavedLabel]>();
161161+162162+ const outbox = new LabelOutbox(store, emitter, { maxBufferSize: 2 });
163163+ const ac = new AbortController();
164164+165165+ let error: unknown;
166166+ const done = (async () => {
167167+ try {
168168+ for await (const _label of outbox.events(undefined, ac.signal)) {
169169+ // stall the consumer — don't break, just wait for more
170170+ // while the producer floods the buffer below
171171+ await new Promise((resolve) => setTimeout(resolve, 50));
172172+ }
173173+ } catch (err) {
174174+ error = err;
175175+ }
176176+ })();
177177+178178+ // wait for the outbox to start tailing
179179+ await new Promise((resolve) => setTimeout(resolve, 10));
180180+181181+ // flood the buffer well beyond capacity
182182+ for (let i = 1; i <= 10; i++) {
183183+ emitter.emit(makeSavedLabel(i));
184184+ }
185185+186186+ await done;
187187+188188+ expect(error).toBeInstanceOf(XRPCSubscriptionError);
189189+ expect((error as XRPCSubscriptionError).error).toBe('ConsumerTooSlow');
190190+ });
191191+});
+111
packages/servers/labeler/lib/outbox.ts
···11+import { XRPCSubscriptionError } from '@atcute/xrpc-server';
22+33+import type { SimpleEventEmitter } from '@mary-ext/simple-event-emitter';
44+55+import { AsyncBufferFullError, on } from './async-buffer.ts';
66+import type { LabelStore, SavedLabel } from './store.ts';
77+88+const BACKFILL_PAGE_SIZE = 500;
99+1010+export interface LabelOutboxOptions {
1111+ maxBufferSize?: number;
1212+}
1313+1414+/**
1515+ * outbox for streaming labels with push-pull semantics.
1616+ *
1717+ * handles backfill from cursor, catch-up, and live tailing
1818+ * with backpressure via bounded async buffer.
1919+ */
2020+export class LabelOutbox {
2121+ #store: LabelStore;
2222+ #emitter: SimpleEventEmitter<[label: SavedLabel]>;
2323+ #maxBufferSize: number;
2424+2525+ constructor(
2626+ store: LabelStore,
2727+ emitter: SimpleEventEmitter<[label: SavedLabel]>,
2828+ options: LabelOutboxOptions = {},
2929+ ) {
3030+ this.#store = store;
3131+ this.#emitter = emitter;
3232+ this.#maxBufferSize = options.maxBufferSize ?? 500;
3333+ }
3434+3535+ /**
3636+ * stream labels, optionally backfilling from a cursor.
3737+ * @param cursor sequence number to backfill from (exclusive), or undefined for live-only
3838+ * @param signal abort signal to stop streaming
3939+ * @returns async iterator of saved labels
4040+ */
4141+ async *events(cursor: number | undefined, signal: AbortSignal): AsyncGenerator<SavedLabel> {
4242+ let lastBackfillSeq = -1;
4343+ let caughtUp = cursor === undefined;
4444+4545+ // backfill phase: dump stored labels in pages
4646+ if (!caughtUp) {
4747+ while (true) {
4848+ const events = await this.#store.getRange(
4949+ lastBackfillSeq > -1 ? lastBackfillSeq : cursor!,
5050+ BACKFILL_PAGE_SIZE,
5151+ );
5252+5353+ if (events.length === 0) {
5454+ break;
5555+ }
5656+5757+ yield* events;
5858+ signal.throwIfAborted();
5959+6060+ lastBackfillSeq = events.at(-1)!.seq;
6161+6262+ // stop when close to the head
6363+ const latestSeq = await this.#store.getLatestSeq();
6464+ if (latestSeq - lastBackfillSeq < BACKFILL_PAGE_SIZE / 2) {
6565+ break;
6666+ }
6767+ }
6868+6969+ signal.throwIfAborted();
7070+ }
7171+7272+ // start listening before reading the gap
7373+ const tail = on(this.#emitter, { signal, maxSize: this.#maxBufferSize });
7474+7575+ // catch-up phase: read any labels between backfill end and tail start
7676+ if (!caughtUp) {
7777+ const events = await this.#store.getRange(lastBackfillSeq > -1 ? lastBackfillSeq : cursor!);
7878+7979+ if (events.length > 0) {
8080+ yield* events;
8181+ signal.throwIfAborted();
8282+8383+ lastBackfillSeq = events.at(-1)!.seq;
8484+ }
8585+ }
8686+8787+ // tail phase: stream live events, deduplicating the catch-up overlap
8888+ try {
8989+ for await (const event of tail) {
9090+ if (!caughtUp) {
9191+ if (event.seq <= lastBackfillSeq) {
9292+ continue;
9393+ }
9494+9595+ caughtUp = true;
9696+ }
9797+9898+ yield event;
9999+ }
100100+ } catch (err) {
101101+ if (err instanceof AsyncBufferFullError) {
102102+ throw new XRPCSubscriptionError({
103103+ error: 'ConsumerTooSlow',
104104+ description: `stream consumer too slow`,
105105+ });
106106+ }
107107+108108+ throw err;
109109+ }
110110+ }
111111+}
+55
packages/servers/labeler/lib/queue.ts
···11+interface Node<T> {
22+ value: T;
33+ next: Node<T> | undefined;
44+}
55+66+/** a FIFO queue backed by a linked list. */
77+export class Queue<T> {
88+ #head: Node<T> | undefined;
99+ #tail: Node<T> | undefined;
1010+ #size: number = 0;
1111+1212+ /** number of items in the queue */
1313+ get size(): number {
1414+ return this.#size;
1515+ }
1616+1717+ /**
1818+ * add a value to the end of the queue.
1919+ * @param value value to enqueue
2020+ */
2121+ enqueue(value: T): void {
2222+ const node: Node<T> = { value, next: undefined };
2323+ const tail = this.#tail;
2424+2525+ if (tail !== undefined) {
2626+ tail.next = node;
2727+ } else {
2828+ this.#head = node;
2929+ }
3030+3131+ this.#tail = node;
3232+ this.#size++;
3333+ }
3434+3535+ /**
3636+ * remove and return the first value from the queue.
3737+ * @returns first queued value, or undefined if empty
3838+ */
3939+ dequeue(): T | undefined {
4040+ const head = this.#head;
4141+ if (head === undefined) {
4242+ return;
4343+ }
4444+4545+ const next = head.next;
4646+4747+ this.#head = next;
4848+ if (next === undefined) {
4949+ this.#tail = undefined;
5050+ }
5151+5252+ this.#size--;
5353+ return head.value;
5454+ }
5555+}
+68
packages/servers/labeler/lib/store.ts
···11+import type { ComAtprotoLabelDefs } from '@atcute/atproto';
22+33+/** label as stored in the database, with sequence number and raw sig bytes. */
44+export interface SavedLabel {
55+ /** sequence number / ID of the label */
66+ seq: number;
77+ /** DID of the actor who created this label */
88+ src: string;
99+ /** AT URI of the record, repository (account), or other resource */
1010+ uri: string;
1111+ /** CID specifying the version of `uri` to label */
1212+ cid?: string;
1313+ /** the label value */
1414+ val: string;
1515+ /** whether this label negates a previous label */
1616+ neg: boolean;
1717+ /** creation timestamp (ISO 8601) */
1818+ cts: string;
1919+ /** expiration timestamp (ISO 8601) */
2020+ exp?: string;
2121+ /** signature bytes */
2222+ sig: Uint8Array;
2323+}
2424+2525+/** parameters for querying labels. */
2626+export interface LabelQueryParams {
2727+ uriPatterns: string[];
2828+ sources: string[];
2929+ cursor: number;
3030+ limit: number;
3131+}
3232+3333+/** result of a label query. */
3434+export interface LabelQueryResult {
3535+ labels: ComAtprotoLabelDefs.Label[];
3636+ cursor: string;
3737+}
3838+3939+/** pluggable storage backend for labels. */
4040+export interface LabelStore {
4141+ /**
4242+ * save a signed label and assign it a sequence number.
4343+ * @param label the signed label to save
4444+ * @returns the saved label with sequence number
4545+ */
4646+ save(label: Omit<SavedLabel, 'seq'>): Promise<SavedLabel>;
4747+4848+ /**
4949+ * query labels matching the given parameters.
5050+ * @param params query parameters
5151+ * @returns matching labels and cursor
5252+ */
5353+ query(params: LabelQueryParams): Promise<LabelQueryResult>;
5454+5555+ /**
5656+ * get the latest sequence number.
5757+ * @returns the highest sequence number, or 0 if none
5858+ */
5959+ getLatestSeq(): Promise<number>;
6060+6161+ /**
6262+ * get a range of labels after a given sequence number.
6363+ * @param after sequence number to start after (exclusive)
6464+ * @param limit maximum number of labels to return
6565+ * @returns labels in the range
6666+ */
6767+ getRange(after: number, limit?: number): Promise<SavedLabel[]>;
6868+}