a collection of lightweight TypeScript packages for AT Protocol, the protocol powering Bluesky
atproto bluesky typescript npm
101
fork

Configure Feed

Select the types of activity you want to include in your feed.

refactor(labeler): take 2 implementation

Mary 134f7706 513ca6af

+945 -1375
+1
packages/servers/labeler/.gitignore
··· 1 + /coverage/
-139
packages/servers/labeler/README.md
··· 1 - # @atcute/labeler 2 - 3 - sign and emit AT Protocol labels. 4 - 5 - ```sh 6 - npm install @atcute/labeler 7 - ``` 8 - 9 - provides the core logic for running an AT Protocol labeler like label signing, a subscription outbox 10 - and XRPC operation handlers. 11 - 12 - ## usage 13 - 14 - ### setting up a labeler 15 - 16 - create a `Labeler` with your signing key and storage backend, then register its routes on a router: 17 - 18 - ```ts 19 - import { Secp256k1PrivateKey } from '@atcute/crypto'; 20 - 21 - import { XRPCRouter } from '@atcute/xrpc-server'; 22 - import { createBunWebSocket } from '@atcute/xrpc-server-bun'; 23 - 24 - import { Labeler } from '@atcute/labeler'; 25 - 26 - const ws = createBunWebSocket(); 27 - const router = new XRPCRouter({ websocket: ws.adapter }); 28 - 29 - const labeler = new Labeler({ 30 - did: 'did:plc:mylabeler', 31 - key: await Secp256k1PrivateKey.importRaw(keyBytes), 32 - store: myLabelStore, // implements LabelStore 33 - }); 34 - 35 - labeler.register(router); 36 - 37 - export default ws.wrap(router); 38 - ``` 39 - 40 - `register()` adds two endpoints to the router: 41 - 42 - - **`com.atproto.label.queryLabels`** — query stored labels by URI pattern, source, and cursor 43 - - **`com.atproto.label.subscribeLabels`** — WebSocket subscription with backfill and live tailing 44 - 45 - ### creating labels 46 - 47 - ```ts 48 - // create a single label 49 - const saved = await labeler.createLabel({ 50 - uri: 'did:plc:targetuser', 51 - val: 'spam', 52 - }); 53 - 54 - // create and negate labels for a subject 55 - const labels = await labeler.createLabels( 56 - { uri: 'at://did:plc:user/app.bsky.feed.post/abc123', cid: 'bafyrei...' }, 57 - { create: ['nsfw'], negate: ['misleading'] }, 58 - ); 59 - ``` 60 - 61 - created labels are signed, saved to the store, and pushed to all active WebSocket subscribers. 62 - 63 - ### emitEvent endpoint 64 - 65 - to accept label creation requests over XRPC (via `tools.ozone.moderation.emitEvent`), provide an 66 - `auth` callback: 67 - 68 - ```ts 69 - import { AuthRequiredError } from '@atcute/xrpc-server'; 70 - import { ServiceJwtVerifier } from '@atcute/xrpc-server/auth'; 71 - import { 72 - CompositeDidDocumentResolver, 73 - PlcDidDocumentResolver, 74 - WebDidDocumentResolver, 75 - } from '@atcute/identity-resolver'; 76 - 77 - const jwtVerifier = new ServiceJwtVerifier({ 78 - serviceDid: 'did:plc:mylabeler', 79 - resolver: new CompositeDidDocumentResolver({ 80 - methods: { 81 - plc: new PlcDidDocumentResolver(), 82 - web: new WebDidDocumentResolver(), 83 - }, 84 - }), 85 - }); 86 - 87 - const labeler = new Labeler({ 88 - did: 'did:plc:mylabeler', 89 - key: await Secp256k1PrivateKey.importRaw(keyBytes), 90 - store: myLabelStore, 91 - auth: async (request) => { 92 - const authHeader = request.headers.get('authorization'); 93 - if (!authHeader?.startsWith('Bearer ')) { 94 - return false; 95 - } 96 - 97 - const result = await jwtVerifier.verify(authHeader.slice(7), { 98 - lxm: 'tools.ozone.moderation.emitEvent', 99 - }); 100 - 101 - return result.ok; 102 - }, 103 - }); 104 - ``` 105 - 106 - when `auth` is provided, the `emitEvent` endpoint is registered alongside the other routes. the 107 - callback receives the incoming `Request` and should return `true` to allow or `false` to reject. 108 - 109 - ### storage backend 110 - 111 - implement the `LabelStore` interface to plug in any database: 112 - 113 - ```ts 114 - import type { LabelStore } from '@atcute/labeler'; 115 - 116 - const store: LabelStore = { 117 - async save(label) { 118 - // insert label, assign and return a monotonic sequence number 119 - // label has: src, uri, cid?, val, neg, cts, exp?, sig 120 - }, 121 - 122 - async query({ uriPatterns, sources, cursor, limit }) { 123 - // query labels matching URI patterns and sources 124 - // return { labels: [...], cursor: "..." } 125 - }, 126 - 127 - async getLatestSeq() { 128 - // return the highest sequence number, or 0 129 - }, 130 - 131 - async getRange(after, limit) { 132 - // return labels with seq > after, ordered by seq 133 - }, 134 - }; 135 - ``` 136 - 137 - `save()` receives a signed label (without `seq`) and must assign a monotonically increasing sequence 138 - number. `query()` returns formatted labels for the `queryLabels` endpoint. `getRange()` and 139 - `getLatestSeq()` are used by the subscription outbox for backfill and cursor validation.
-103
packages/servers/labeler/lib/async-buffer.ts
··· 1 - import type { SimpleEventEmitter } from '@mary-ext/simple-event-emitter'; 2 - 3 - import { Queue } from './queue.ts'; 4 - 5 - /** thrown when the async buffer exceeds its maximum size. */ 6 - export class AsyncBufferFullError extends Error { 7 - constructor(maxSize: number) { 8 - super(`reached max buffer size: ${maxSize}`); 9 - } 10 - } 11 - 12 - /** bounded async buffer with backpressure support. */ 13 - export class AsyncBuffer<T> { 14 - #queue = new Queue<T>(); 15 - #closed = false; 16 - #deferred = Promise.withResolvers<void>(); 17 - 18 - #maxSize: number; 19 - 20 - constructor(maxSize: number) { 21 - this.#maxSize = maxSize; 22 - } 23 - 24 - push(value: T): void { 25 - if (this.#closed) { 26 - return; 27 - } 28 - 29 - if (this.#queue.size >= this.#maxSize) { 30 - this.#closed = true; 31 - } 32 - 33 - this.#queue.enqueue(value); 34 - this.#deferred.resolve(); 35 - } 36 - 37 - close(): void { 38 - if (this.#closed) { 39 - return; 40 - } 41 - 42 - this.#closed = true; 43 - this.#deferred.resolve(); 44 - } 45 - 46 - async *events(): AsyncGenerator<T> { 47 - while (true) { 48 - await this.#deferred.promise; 49 - 50 - if (this.#queue.size > this.#maxSize) { 51 - throw new AsyncBufferFullError(this.#maxSize); 52 - } 53 - 54 - const value = this.#queue.dequeue(); 55 - if (value !== undefined) { 56 - yield value; 57 - } else if (this.#closed) { 58 - return; 59 - } else { 60 - this.#deferred = Promise.withResolvers(); 61 - } 62 - } 63 - } 64 - } 65 - 66 - export interface OnOptions { 67 - maxSize: number; 68 - signal?: AbortSignal; 69 - } 70 - 71 - /** 72 - * create an async iterator from a SimpleEventEmitter with bounded buffering. 73 - * @param emitter event emitter 74 - * @param options buffer and signal options 75 - * @returns async iterator of event payloads 76 - */ 77 - export const on = <T>(emitter: SimpleEventEmitter<[T]>, options: OnOptions): AsyncIterableIterator<T> => { 78 - const { maxSize, signal } = options; 79 - 80 - signal?.throwIfAborted(); 81 - 82 - const buffer = new AsyncBuffer<T>(maxSize); 83 - const unsubscribe = emitter.subscribe((value) => { 84 - buffer.push(value); 85 - }); 86 - 87 - const cleanup = () => { 88 - unsubscribe(); 89 - buffer.close(); 90 - 91 - signal?.removeEventListener('abort', cleanup); 92 - }; 93 - 94 - signal?.addEventListener('abort', cleanup, { once: true }); 95 - 96 - return (async function* () { 97 - try { 98 - yield* buffer.events(); 99 - } finally { 100 - cleanup(); 101 - } 102 - })(); 103 - };
+42
packages/servers/labeler/lib/errors.ts
··· 1 + /** 2 + * base error type for labeler-specific failures 3 + */ 4 + export class LabelerError extends Error { 5 + override name = 'LabelerError'; 6 + } 7 + 8 + /** 9 + * error thrown when a subscription cursor is ahead of the store 10 + */ 11 + export class FutureCursorError extends LabelerError { 12 + override name = 'FutureCursorError'; 13 + 14 + readonly cursor: number; 15 + readonly latest: number; 16 + 17 + /** 18 + * creates a future-cursor error 19 + * @param cursor requested cursor 20 + * @param latest latest known sequence 21 + */ 22 + constructor(cursor: number, latest: number) { 23 + super(`cursor is in the future`); 24 + 25 + this.cursor = cursor; 26 + this.latest = latest; 27 + } 28 + } 29 + 30 + /** 31 + * error thrown when a subscriber cannot keep up with live events 32 + */ 33 + export class ConsumerTooSlowError extends LabelerError { 34 + override name = 'ConsumerTooSlowError'; 35 + 36 + /** 37 + * creates a consumer-too-slow error 38 + */ 39 + constructor() { 40 + super(`stream consumer too slow`); 41 + } 42 + }
+13 -4
packages/servers/labeler/lib/index.ts
··· 1 - export { formatLabel, signLabel, type CreateLabelData, type LabelSubject } from './labels.ts'; 2 - export { LabelOutbox, type LabelOutboxOptions } from './outbox.ts'; 3 - export { Labeler, type AuthCheck, type LabelerOptions } from './labeler.ts'; 4 - export type { LabelQueryParams, LabelQueryResult, LabelStore, SavedLabel } from './store.ts'; 1 + export { ConsumerTooSlowError, FutureCursorError, LabelerError } from './errors.ts'; 2 + export { Labeler } from './labeler.ts'; 3 + export { MemoryLabelStore } from './memory-label-store.ts'; 4 + 5 + export type { 6 + ApplyLabelsOptions, 7 + LabelEvent, 8 + LabelOp, 9 + LabelStore, 10 + LabelSubscriptionOptions, 11 + LabelerOptions, 12 + SignedLabel, 13 + } from './types.ts';
+123
packages/servers/labeler/lib/internal/async-buffer.ts
··· 1 + export class AsyncBufferFullError extends Error { 2 + override readonly name = 'AsyncBufferFullError'; 3 + 4 + constructor(maxSize: number) { 5 + super(`reached max buffer size: ${maxSize}`); 6 + } 7 + } 8 + 9 + export class AsyncBuffer<T> { 10 + private queue = new Queue<T>(); 11 + private closed = false; 12 + private deferred = Promise.withResolvers<void>(); 13 + 14 + constructor(private maxSize: number) {} 15 + 16 + push(value: T): void { 17 + if (this.closed) { 18 + return; 19 + } 20 + 21 + if (this.queue.size >= this.maxSize) { 22 + this.closed = true; 23 + } 24 + 25 + this.queue.enqueue(value); 26 + this.deferred.resolve(); 27 + } 28 + 29 + pushMany(values: T[]): void { 30 + if (this.closed) { 31 + return; 32 + } 33 + 34 + if (this.queue.size + values.length > this.maxSize) { 35 + this.closed = true; 36 + } 37 + 38 + for (const value of values) { 39 + this.queue.enqueue(value); 40 + } 41 + 42 + this.deferred.resolve(); 43 + } 44 + 45 + close(): void { 46 + if (this.closed) { 47 + return; 48 + } 49 + 50 + this.closed = true; 51 + this.deferred.resolve(); 52 + } 53 + 54 + async *events(): AsyncGenerator<T> { 55 + while (true) { 56 + await this.deferred.promise; 57 + 58 + if (this.queue.size > this.maxSize) { 59 + throw new AsyncBufferFullError(this.maxSize); 60 + } 61 + 62 + const value = this.queue.dequeue(); 63 + if (value !== undefined) { 64 + yield value; 65 + } else if (this.closed) { 66 + return; 67 + } else { 68 + this.deferred = Promise.withResolvers(); 69 + } 70 + } 71 + } 72 + } 73 + 74 + class Queue<T> { 75 + #head: QueueNode<T> | undefined; 76 + #tail: QueueNode<T> | undefined; 77 + #size: number = 0; 78 + 79 + get size(): number { 80 + return this.#size; 81 + } 82 + 83 + enqueue(value: T): this { 84 + const tail = this.#tail; 85 + const node = createNode(value, undefined); 86 + 87 + if (tail !== undefined) { 88 + tail.next = node; 89 + } else { 90 + this.#head = node; 91 + } 92 + 93 + this.#tail = node; 94 + this.#size++; 95 + return this; 96 + } 97 + 98 + dequeue(): T | undefined { 99 + const head = this.#head; 100 + if (!head) { 101 + return; 102 + } 103 + 104 + const next = head.next; 105 + 106 + this.#head = next; 107 + if (next === undefined) { 108 + this.#tail = undefined; 109 + } 110 + 111 + this.#size--; 112 + return head.value; 113 + } 114 + } 115 + 116 + interface QueueNode<T> { 117 + value: T; 118 + next: QueueNode<T> | undefined; 119 + } 120 + 121 + function createNode<T>(value: T, next: QueueNode<T> | undefined): QueueNode<T> { 122 + return { value, next }; 123 + }
+38
packages/servers/labeler/lib/internal/on.ts
··· 1 + import { SimpleEventEmitter } from '@mary-ext/simple-event-emitter'; 2 + 3 + import { AsyncBuffer } from './async-buffer.ts'; 4 + 5 + export interface OnOptions { 6 + maxSize: number; 7 + signal?: AbortSignal; 8 + } 9 + 10 + export const on = <T>( 11 + emitter: SimpleEventEmitter<[value: T]>, 12 + options: OnOptions, 13 + ): AsyncIterableIterator<T> => { 14 + const { maxSize, signal } = options; 15 + 16 + signal?.throwIfAborted(); 17 + 18 + const buffer = new AsyncBuffer<T>(maxSize); 19 + const unsubscribe = emitter.subscribe((value: T) => { 20 + buffer.push(value); 21 + }); 22 + 23 + const cleanup = () => { 24 + unsubscribe(); 25 + buffer.close(); 26 + signal?.removeEventListener('abort', cleanup); 27 + }; 28 + 29 + signal?.addEventListener('abort', cleanup, { once: true }); 30 + 31 + return (async function* () { 32 + try { 33 + yield* buffer.events(); 34 + } finally { 35 + cleanup(); 36 + } 37 + })(); 38 + };
+192
packages/servers/labeler/lib/internal/outbox.test.ts
··· 1 + import { toBytes } from '@atcute/cbor'; 2 + 3 + import { SimpleEventEmitter } from '@mary-ext/simple-event-emitter'; 4 + import { describe, expect, it } from 'vitest'; 5 + 6 + import { ConsumerTooSlowError } from '../errors.ts'; 7 + import type { LabelEvent, LabelStore, SignedLabel } from '../types.ts'; 8 + 9 + import { LabelerOutbox } from './outbox.ts'; 10 + 11 + const makeSignedLabel = (seq: number): SignedLabel => { 12 + return { 13 + src: 'did:plc:labeler', 14 + uri: `at://did:plc:alice/app.bsky.feed.post/${seq}`, 15 + val: 'spam', 16 + cts: '2026-02-22T00:00:00Z', 17 + ver: 1, 18 + sig: toBytes(new Uint8Array([seq & 0xff])), 19 + }; 20 + }; 21 + 22 + const makeEvent = (seq: number): LabelEvent => { 23 + return { 24 + seq, 25 + labels: [makeSignedLabel(seq)], 26 + }; 27 + }; 28 + 29 + class TestStore implements LabelStore { 30 + #events: LabelEvent[] = []; 31 + 32 + onList?: (options: { after?: number; limit: number }) => void; 33 + 34 + insert(event: LabelEvent): void { 35 + this.#events.push(event); 36 + this.#events.sort((a, b) => a.seq - b.seq); 37 + } 38 + 39 + insertSeqs(seqs: Iterable<number>): void { 40 + for (const seq of seqs) { 41 + this.insert(makeEvent(seq)); 42 + } 43 + } 44 + 45 + async appendLabels(labels: SignedLabel[]): Promise<LabelEvent[]> { 46 + const latest = (await this.getLatestSeq()) ?? 0; 47 + const events: LabelEvent[] = []; 48 + 49 + let seq = latest; 50 + for (const label of labels) { 51 + seq++; 52 + const event = { seq, labels: [label] }; 53 + this.insert(event); 54 + events.push(event); 55 + } 56 + 57 + return events; 58 + } 59 + 60 + async getLatestSeq(): Promise<number | null> { 61 + return this.#events.at(-1)?.seq ?? null; 62 + } 63 + 64 + async listLabelEvents(options: { after?: number; limit: number }): Promise<LabelEvent[]> { 65 + this.onList?.(options); 66 + 67 + const out: LabelEvent[] = []; 68 + for (const event of this.#events) { 69 + if (options.after !== undefined && event.seq <= options.after) { 70 + continue; 71 + } 72 + 73 + out.push(event); 74 + if (out.length >= options.limit) { 75 + break; 76 + } 77 + } 78 + 79 + return out; 80 + } 81 + } 82 + 83 + const readSeqs = async (iterator: AsyncIterator<LabelEvent>, count: number): Promise<number[]> => { 84 + const seqs: number[] = []; 85 + 86 + for (let idx = 0; idx < count; idx++) { 87 + const next = await iterator.next(); 88 + if (next.done) { 89 + break; 90 + } 91 + 92 + seqs.push(next.value.seq); 93 + } 94 + 95 + return seqs; 96 + }; 97 + 98 + describe('LabelerOutbox', () => { 99 + it('pages through backfill across multiple reads', async () => { 100 + const store = new TestStore(); 101 + store.insertSeqs([1, 2, 3, 4, 5, 6, 7, 8, 9]); 102 + 103 + const controller = new AbortController(); 104 + const emitter = new SimpleEventEmitter<[event: LabelEvent]>(); 105 + const outbox = new LabelerOutbox(store, emitter, { pageSize: 4 }); 106 + const iterator = outbox.events(0, controller.signal); 107 + 108 + const seqs = await readSeqs(iterator[Symbol.asyncIterator](), 9); 109 + 110 + expect(seqs).toEqual([1, 2, 3, 4, 5, 6, 7, 8, 9]); 111 + 112 + controller.abort(); 113 + }); 114 + 115 + it('handles cutover without duplicating buffered events', async () => { 116 + const store = new TestStore(); 117 + store.insertSeqs([1, 2, 3]); 118 + 119 + const controller = new AbortController(); 120 + const emitter = new SimpleEventEmitter<[event: LabelEvent]>(); 121 + const outbox = new LabelerOutbox(store, emitter, { pageSize: 4 }); 122 + 123 + let injectedCutoverEvent = false; 124 + store.onList = (options) => { 125 + if (injectedCutoverEvent || options.limit !== Number.MAX_SAFE_INTEGER) { 126 + return; 127 + } 128 + 129 + injectedCutoverEvent = true; 130 + 131 + const event = makeEvent(4); 132 + store.insert(event); 133 + emitter.emit(event); 134 + }; 135 + 136 + const iterator = outbox.events(0, controller.signal)[Symbol.asyncIterator](); 137 + 138 + const seqs = await readSeqs(iterator, 4); 139 + expect(seqs).toEqual([1, 2, 3, 4]); 140 + 141 + const liveEvent = makeEvent(5); 142 + store.insert(liveEvent); 143 + emitter.emit(liveEvent); 144 + 145 + const next = await iterator.next(); 146 + expect(next.done).toBe(false); 147 + expect(next.value?.seq).toBe(5); 148 + 149 + controller.abort(); 150 + }); 151 + 152 + it('buffers unread tail events', async () => { 153 + const store = new TestStore(); 154 + const controller = new AbortController(); 155 + const emitter = new SimpleEventEmitter<[event: LabelEvent]>(); 156 + const outbox = new LabelerOutbox(store, emitter, { pageSize: 4, maxBufferSize: 16 }); 157 + const iterator = outbox.events(undefined, controller.signal)[Symbol.asyncIterator](); 158 + 159 + const first = iterator.next(); 160 + 161 + for (const seq of [1, 2, 3, 4, 5]) { 162 + emitter.emit(makeEvent(seq)); 163 + } 164 + 165 + const firstResult = await first; 166 + expect(firstResult.done).toBe(false); 167 + expect(firstResult.value?.seq).toBe(1); 168 + 169 + const rest = await readSeqs(iterator, 4); 170 + expect(rest).toEqual([2, 3, 4, 5]); 171 + 172 + controller.abort(); 173 + }); 174 + 175 + it('throws when the live tail buffer overflows', async () => { 176 + const store = new TestStore(); 177 + const controller = new AbortController(); 178 + const emitter = new SimpleEventEmitter<[event: LabelEvent]>(); 179 + const outbox = new LabelerOutbox(store, emitter, { pageSize: 4, maxBufferSize: 2 }); 180 + const iterator = outbox.events(undefined, controller.signal)[Symbol.asyncIterator](); 181 + 182 + const next = iterator.next(); 183 + 184 + for (const seq of [1, 2, 3, 4]) { 185 + emitter.emit(makeEvent(seq)); 186 + } 187 + 188 + await expect(next).rejects.toBeInstanceOf(ConsumerTooSlowError); 189 + 190 + controller.abort(); 191 + }); 192 + });
+113
packages/servers/labeler/lib/internal/outbox.ts
··· 1 + import { SimpleEventEmitter } from '@mary-ext/simple-event-emitter'; 2 + 3 + import { ConsumerTooSlowError } from '../errors.ts'; 4 + import type { LabelEvent, LabelStore } from '../types.ts'; 5 + 6 + import { AsyncBufferFullError } from './async-buffer.ts'; 7 + import { on } from './on.ts'; 8 + 9 + interface LabelerOutboxOptions { 10 + pageSize: number; 11 + maxBufferSize?: number; 12 + } 13 + 14 + /** 15 + * internal helper that merges store backfill with local wake-ups 16 + */ 17 + export class LabelerOutbox { 18 + #store: LabelStore; 19 + #events: SimpleEventEmitter<[event: LabelEvent]>; 20 + #pageSize: number; 21 + #maxBufferSize: number; 22 + 23 + /** 24 + * creates an outbox 25 + * @param store label store 26 + * @param wakeups local wakeup emitter 27 + * @param options outbox options 28 + */ 29 + constructor( 30 + store: LabelStore, 31 + events: SimpleEventEmitter<[event: LabelEvent]>, 32 + options: LabelerOutboxOptions, 33 + ) { 34 + this.#store = store; 35 + this.#events = events; 36 + this.#pageSize = options.pageSize; 37 + this.#maxBufferSize = Math.max(1, options.maxBufferSize ?? 64); 38 + } 39 + 40 + async *events(backfillCursor: number | undefined, signal: AbortSignal): AsyncIterableIterator<LabelEvent> { 41 + let lastBackfillSeq = -1; 42 + let caughtUp = backfillCursor === undefined; 43 + 44 + // consumer is backfilling, dump everything we have 45 + if (!caughtUp) { 46 + while (true) { 47 + const events = await this.#store.listLabelEvents({ 48 + after: lastBackfillSeq > -1 ? lastBackfillSeq : backfillCursor, 49 + limit: this.#pageSize, 50 + }); 51 + 52 + if (events.length === 0) { 53 + break; 54 + } 55 + 56 + yield* events; 57 + 58 + signal.throwIfAborted(); 59 + 60 + lastBackfillSeq = events.at(-1)!.seq; 61 + 62 + // stop when we're within half a page of the latest known seq 63 + const lastSeq = (await this.#store.getLatestSeq()) ?? -1; 64 + if (lastSeq - lastBackfillSeq < this.#pageSize / 2) { 65 + break; 66 + } 67 + } 68 + 69 + signal.throwIfAborted(); 70 + } 71 + 72 + // start listening to local labeler events 73 + const tail = on(this.#events, { signal, maxSize: this.#maxBufferSize }); 74 + 75 + // ensure we're truly caught up 76 + if (!caughtUp) { 77 + const events = await this.#store.listLabelEvents({ 78 + after: lastBackfillSeq > -1 ? lastBackfillSeq : backfillCursor, 79 + limit: Number.MAX_SAFE_INTEGER, 80 + }); 81 + 82 + if (events.length > 0) { 83 + yield* events; 84 + 85 + signal.throwIfAborted(); 86 + 87 + lastBackfillSeq = events.at(-1)!.seq; 88 + } 89 + } 90 + 91 + // start tailing 92 + try { 93 + for await (const event of tail) { 94 + if (!caughtUp) { 95 + // we're tailing now, but we still have to omit previous events 96 + if (event.seq <= lastBackfillSeq) { 97 + continue; 98 + } 99 + 100 + caughtUp = true; 101 + } 102 + 103 + yield event; 104 + } 105 + } catch (err) { 106 + if (err instanceof AsyncBufferFullError) { 107 + throw new ConsumerTooSlowError(); 108 + } 109 + 110 + throw err; 111 + } 112 + } 113 + }
+104 -212
packages/servers/labeler/lib/labeler.test.ts
··· 1 - import type { ComAtprotoLabelDefs } from '@atcute/atproto'; 2 - import { Secp256k1PrivateKey } from '@atcute/crypto'; 3 - import type { Did } from '@atcute/lexicons'; 4 - import { XRPCRouter } from '@atcute/xrpc-server'; 1 + import type * as ComAtprotoLabelDefs from '@atcute/atproto/types/label/defs'; 2 + import { fromBytes, isBytes } from '@atcute/cbor'; 3 + import type { PrivateKey } from '@atcute/crypto'; 5 4 6 5 import { describe, expect, it } from 'vitest'; 7 6 8 - import { Labeler, type LabelerOptions } from './labeler.ts'; 9 - import { formatLabel } from './labels.ts'; 10 - import type { LabelStore, SavedLabel } from './store.ts'; 11 - 12 - const TEST_DID = 'did:plc:testlabeler' as Did; 13 - 14 - const getTestKey = async () => { 15 - const keyBytes = new Uint8Array(32); 16 - keyBytes[0] = 1; 17 - return Secp256k1PrivateKey.importRaw(keyBytes); 18 - }; 19 - 20 - const createMockStore = (): LabelStore & { labels: SavedLabel[] } => { 21 - const labels: SavedLabel[] = []; 7 + import { FutureCursorError } from './errors.ts'; 8 + import { Labeler } from './labeler.ts'; 9 + import { MemoryLabelStore } from './memory-label-store.ts'; 22 10 11 + const createTestPrivateKey = (): PrivateKey => { 23 12 return { 24 - labels, 25 - async save(label) { 26 - const seq = labels.length + 1; 27 - const saved = { ...label, seq }; 28 - labels.push(saved); 29 - return saved; 30 - }, 31 - async query({ uriPatterns, sources, cursor, limit }) { 32 - let result = [...labels]; 33 - 34 - // filter by URI patterns 35 - if (uriPatterns.length > 0 && !uriPatterns.includes('*')) { 36 - result = result.filter((l) => 37 - uriPatterns.some((pattern) => { 38 - if (pattern.endsWith('*')) { 39 - return l.uri.startsWith(pattern.slice(0, -1)); 40 - } 41 - return l.uri === pattern; 42 - }), 43 - ); 13 + type: 'test', 14 + jwtAlg: 'TEST', 15 + async sign(data: Uint8Array): Promise<Uint8Array<ArrayBuffer>> { 16 + const out = new Uint8Array(8); 17 + for (let idx = 0; idx < data.length; idx++) { 18 + out[idx % out.length] = (out[idx % out.length] + data[idx] + idx) & 0xff; 44 19 } 45 20 46 - // filter by sources 47 - if (sources.length > 0) { 48 - result = result.filter((l) => sources.includes(l.src)); 49 - } 50 - 51 - // filter by cursor 52 - if (cursor > 0) { 53 - result = result.filter((l) => l.seq > cursor); 54 - } 55 - 56 - // apply limit 57 - result = result.slice(0, limit); 58 - 59 - const formatted: ComAtprotoLabelDefs.Label[] = result.map(formatLabel); 60 - const lastSeq = result.at(-1)?.seq ?? 0; 61 - 62 - return { labels: formatted, cursor: String(lastSeq) }; 21 + return out; 63 22 }, 64 - async getLatestSeq() { 65 - return labels.at(-1)?.seq ?? 0; 23 + async verify() { 24 + return true; 66 25 }, 67 - async getRange(after, limit) { 68 - const result = labels.filter((l) => l.seq > after); 69 - return limit !== undefined ? result.slice(0, limit) : result; 26 + async exportPublicKey() { 27 + return 'did:key:test' as any; 70 28 }, 71 - }; 29 + } as PrivateKey; 72 30 }; 73 31 74 - const createMockWebSocket = () => ({ 75 - async upgrade() { 76 - return undefined; 77 - }, 78 - }); 79 - 80 - const createTestRouter = async (options?: Partial<LabelerOptions>) => { 81 - const key = await getTestKey(); 82 - const store = createMockStore(); 83 - 84 - const labeler = new Labeler({ 85 - did: TEST_DID, 86 - key, 87 - store, 88 - ...options, 89 - }); 90 - 91 - const router = new XRPCRouter({ websocket: createMockWebSocket() }); 92 - labeler.register(router); 32 + const collectOne = async <T>(iterable: AsyncIterable<T>): Promise<T> => { 33 + const iterator = iterable[Symbol.asyncIterator](); 34 + const result = await iterator.next(); 35 + if (result.done) { 36 + throw new Error(`iterator completed`); 37 + } 93 38 94 - return { labeler, store, router }; 39 + return result.value; 95 40 }; 96 41 97 42 describe('Labeler', () => { 98 - describe('createLabel', () => { 99 - it('should create and save a label', async () => { 100 - const { labeler, store } = await createTestRouter(); 101 - 102 - const saved = await labeler.createLabel({ 103 - uri: 'did:plc:target', 104 - val: 'spam', 105 - }); 106 - 107 - expect(saved.seq).toBe(1); 108 - expect(saved.src).toBe(TEST_DID); 109 - expect(saved.uri).toBe('did:plc:target'); 110 - expect(saved.val).toBe('spam'); 111 - expect(saved.neg).toBe(false); 112 - expect(saved.sig.byteLength).toBe(64); 113 - expect(store.labels).toHaveLength(1); 43 + it('signs and stores labels', async () => { 44 + const store = new MemoryLabelStore(); 45 + const labeler = new Labeler({ 46 + serviceDid: 'did:plc:labeler', 47 + signingKey: createTestPrivateKey(), 48 + store: store, 114 49 }); 115 50 116 - it('should create labels with custom src', async () => { 117 - const { labeler } = await createTestRouter(); 51 + const labels = await labeler.applyLabels( 52 + [ 53 + { uri: 'at://did:plc:alice/app.bsky.feed.post/1', value: 'spam' }, 54 + { uri: 'at://did:plc:alice/app.bsky.feed.post/1', value: 'spam', negate: true }, 55 + ], 56 + { issuedAt: '2026-02-22T00:00:00Z' }, 57 + ); 118 58 119 - const saved = await labeler.createLabel({ 120 - uri: 'did:plc:target', 121 - val: 'spam', 122 - src: 'did:plc:custom', 123 - }); 59 + expect(labels).toHaveLength(2); 60 + expect(labels[0]?.src).toBe('did:plc:labeler'); 61 + expect(labels[0]?.ver).toBe(1); 62 + expect(labels[1]?.neg).toBe(true); 63 + expect(labels.every((label) => isBytes(label.sig))).toBe(true); 124 64 125 - expect(saved.src).toBe('did:plc:custom'); 126 - }); 65 + const firstSig = fromBytes(labels[0]!.sig as NonNullable<ComAtprotoLabelDefs.Label['sig']>); 66 + expect(firstSig.length).toBe(8); 127 67 }); 128 68 129 - describe('createLabels', () => { 130 - it('should create and negate labels for a subject', async () => { 131 - const { labeler, store } = await createTestRouter(); 132 - 133 - const results = await labeler.createLabels( 134 - { uri: 'did:plc:target' }, 135 - { create: ['spam', 'nsfw'], negate: ['misleading'] }, 136 - ); 137 - 138 - expect(results).toHaveLength(3); 139 - expect(results[0]!.val).toBe('spam'); 140 - expect(results[0]!.neg).toBe(false); 141 - expect(results[1]!.val).toBe('nsfw'); 142 - expect(results[1]!.neg).toBe(false); 143 - expect(results[2]!.val).toBe('misleading'); 144 - expect(results[2]!.neg).toBe(true); 145 - expect(store.labels).toHaveLength(3); 69 + it('returns an empty array for an empty batch', async () => { 70 + const labeler = new Labeler({ 71 + serviceDid: 'did:plc:labeler', 72 + signingKey: createTestPrivateKey(), 73 + store: new MemoryLabelStore(), 146 74 }); 147 - }); 148 75 149 - describe('queryLabels endpoint', () => { 150 - it('should return labels matching URI patterns', async () => { 151 - const { labeler, router } = await createTestRouter(); 152 - 153 - await labeler.createLabel({ uri: 'did:plc:user1', val: 'spam' }); 154 - await labeler.createLabel({ uri: 'did:plc:user2', val: 'nsfw' }); 155 - 156 - const response = await router.fetch( 157 - new Request('http://localhost/xrpc/com.atproto.label.queryLabels?uriPatterns=*'), 158 - ); 159 - 160 - expect(response.status).toBe(200); 76 + await expect(labeler.applyLabels([])).resolves.toEqual([]); 77 + }); 161 78 162 - const body = await response.json(); 163 - expect(body.labels).toHaveLength(2); 164 - expect(body.cursor).toBeDefined(); 79 + it('backfills and streams live events', async () => { 80 + const store = new MemoryLabelStore(); 81 + const labeler = new Labeler({ 82 + serviceDid: 'did:plc:labeler', 83 + signingKey: createTestPrivateKey(), 84 + store: store, 165 85 }); 166 86 167 - it('should paginate with cursor', async () => { 168 - const { labeler, router } = await createTestRouter(); 87 + await labeler.applyLabel({ uri: 'at://did:plc:alice/app.bsky.feed.post/1', value: 'spam' }); 169 88 170 - await labeler.createLabel({ uri: 'did:plc:user1', val: 'spam' }); 171 - await labeler.createLabel({ uri: 'did:plc:user2', val: 'nsfw' }); 89 + const controller = new AbortController(); 90 + const subscription = labeler.subscribeLabels({ cursor: 0, signal: controller.signal }); 91 + const iterator = subscription[Symbol.asyncIterator](); 172 92 173 - const response = await router.fetch( 174 - new Request('http://localhost/xrpc/com.atproto.label.queryLabels?uriPatterns=*&limit=1'), 175 - ); 93 + const first = await iterator.next(); 94 + expect(first.done).toBe(false); 95 + expect(first.value?.seq).toBe(1); 176 96 177 - const body = await response.json(); 178 - expect(body.labels).toHaveLength(1); 179 - expect(body.labels[0].val).toBe('spam'); 97 + const nextPromise = iterator.next(); 98 + await labeler.applyLabel({ uri: 'at://did:plc:alice/app.bsky.feed.post/2', value: 'spam' }); 180 99 181 - // fetch next page 182 - const response2 = await router.fetch( 183 - new Request( 184 - `http://localhost/xrpc/com.atproto.label.queryLabels?uriPatterns=*&limit=1&cursor=${body.cursor}`, 185 - ), 186 - ); 100 + const second = await nextPromise; 101 + expect(second.done).toBe(false); 102 + expect(second.value?.seq).toBe(2); 187 103 188 - const body2 = await response2.json(); 189 - expect(body2.labels).toHaveLength(1); 190 - expect(body2.labels[0].val).toBe('nsfw'); 191 - }); 104 + controller.abort(); 192 105 }); 193 106 194 - describe('emitEvent endpoint', () => { 195 - it('should not register without auth', async () => { 196 - const { router } = await createTestRouter(); 197 - 198 - const response = await router.fetch( 199 - new Request('http://localhost/xrpc/tools.ozone.moderation.emitEvent', { 200 - method: 'POST', 201 - headers: { 'content-type': 'application/json' }, 202 - body: JSON.stringify({ 203 - event: { 204 - $type: 'tools.ozone.moderation.defs#modEventLabel', 205 - createLabelVals: ['spam'], 206 - negateLabelVals: [], 207 - }, 208 - subject: { 209 - $type: 'com.atproto.admin.defs#repoRef', 210 - did: 'did:plc:target', 211 - }, 212 - createdBy: TEST_DID, 213 - }), 214 - }), 215 - ); 107 + it('rejects future subscription cursors', async () => { 108 + const controller = new AbortController(); 216 109 217 - expect(response.status).toBe(404); 110 + const labeler = new Labeler({ 111 + serviceDid: 'did:plc:labeler', 112 + signingKey: createTestPrivateKey(), 113 + store: new MemoryLabelStore(), 218 114 }); 219 115 220 - it('should reject when auth returns false', async () => { 221 - const { router } = await createTestRouter({ 222 - auth: () => false, 223 - }); 116 + const error = await collectOne( 117 + labeler.subscribeLabels({ 118 + cursor: 1, 119 + signal: controller.signal, 120 + }), 121 + ).catch((err) => err); 224 122 225 - const response = await router.fetch( 226 - new Request('http://localhost/xrpc/tools.ozone.moderation.emitEvent', { 227 - method: 'POST', 228 - headers: { 'content-type': 'application/json' }, 229 - body: JSON.stringify({ 230 - event: { 231 - $type: 'tools.ozone.moderation.defs#modEventLabel', 232 - createLabelVals: ['spam'], 233 - negateLabelVals: [], 234 - }, 235 - subject: { 236 - $type: 'com.atproto.admin.defs#repoRef', 237 - did: 'did:plc:target', 238 - }, 239 - createdBy: TEST_DID, 240 - }), 241 - }), 242 - ); 123 + expect(error).toBeInstanceOf(FutureCursorError); 124 + }); 125 + 126 + it('tolerates sequence gaps in the store', async () => { 127 + const controller = new AbortController(); 128 + 129 + const store = new MemoryLabelStore(); 130 + store.advanceSeq(3); 243 131 244 - expect(response.status).toBe(401); 132 + const labeler = new Labeler({ 133 + serviceDid: 'did:plc:labeler', 134 + signingKey: createTestPrivateKey(), 135 + store: store, 245 136 }); 246 - }); 247 137 248 - describe('register', () => { 249 - it('should return 404 for unknown routes', async () => { 250 - const { router } = await createTestRouter(); 138 + await labeler.applyLabel({ uri: 'at://did:plc:alice/app.bsky.feed.post/1', value: 'spam' }); 251 139 252 - const response = await router.fetch(new Request('http://localhost/xrpc/com.atproto.nonexistent')); 140 + const event = await collectOne( 141 + labeler.subscribeLabels({ 142 + cursor: 0, 143 + signal: controller.signal, 144 + }), 145 + ); 253 146 254 - expect(response.status).toBe(404); 255 - }); 147 + expect(event.seq).toBe(4); 256 148 }); 257 149 });
+71 -184
packages/servers/labeler/lib/labeler.ts
··· 1 - import { ComAtprotoLabelQueryLabels, ComAtprotoLabelSubscribeLabels } from '@atcute/atproto'; 2 1 import type { PrivateKey } from '@atcute/crypto'; 3 - import type { Did } from '@atcute/lexicons'; 4 - import { ToolsOzoneModerationEmitEvent } from '@atcute/ozone'; 5 - import { AuthRequiredError, InvalidRequestError, type XRPCRouter, json } from '@atcute/xrpc-server'; 2 + import type { Did } from '@atcute/lexicons/syntax'; 6 3 7 4 import { SimpleEventEmitter } from '@mary-ext/simple-event-emitter'; 8 5 9 - import { formatLabel, signLabel, type CreateLabelData, type LabelSubject } from './labels.ts'; 10 - import { LabelOutbox } from './outbox.ts'; 11 - import type { LabelStore, SavedLabel } from './store.ts'; 12 - 13 - type Promisable<T> = T | Promise<T>; 6 + import { FutureCursorError } from './errors.ts'; 7 + import { LabelerOutbox } from './internal/outbox.ts'; 8 + import { buildLabels, signLabel } from './signing.ts'; 9 + import type { 10 + ApplyLabelsOptions, 11 + LabelerOptions, 12 + LabelEvent, 13 + LabelOp, 14 + LabelSubscriptionOptions, 15 + LabelStore, 16 + SignedLabel, 17 + } from './types.ts'; 14 18 15 19 /** 16 - * auth callback for the `emitEvent` endpoint. 17 - * receives the request; return `false` to reject. 20 + * high-level labeler api with internal sequencing and subscription handling 18 21 */ 19 - export type AuthCheck = (request: Request) => Promisable<boolean>; 20 - 21 - export interface LabelerOptions { 22 - /** DID of the labeler account */ 23 - did: Did; 24 - /** private signing key */ 25 - key: PrivateKey; 26 - /** label storage backend */ 27 - store: LabelStore; 28 - /** authenticate `emitEvent` requests. if not provided, the endpoint is not registered */ 29 - auth?: AuthCheck; 30 - /** maximum outbox buffer size per subscription */ 31 - maxBufferSize?: number; 32 - } 33 - 34 - /** labeler server core. */ 35 22 export class Labeler { 36 - /** labeler DID */ 37 - readonly did: Did; 38 - 39 - #store: LabelStore; 40 - #key: PrivateKey; 23 + readonly #serviceDid: Did; 24 + readonly #signingKey: PrivateKey; 25 + readonly #store: LabelStore; 41 26 42 - #auth: AuthCheck | undefined; 27 + readonly #pageSize: number; 43 28 44 - #emitter = new SimpleEventEmitter<[SavedLabel]>(); 45 - #maxBufferSize: number; 29 + #events = new SimpleEventEmitter<[event: LabelEvent]>(); 46 30 31 + /** 32 + * creates a new labeler 33 + * @param options labeler options 34 + */ 47 35 constructor(options: LabelerOptions) { 48 - this.did = options.did; 49 - 36 + this.#serviceDid = options.serviceDid; 37 + this.#signingKey = options.signingKey; 50 38 this.#store = options.store; 51 - this.#key = options.key; 52 39 53 - this.#auth = options.auth; 54 - 55 - this.#maxBufferSize = options.maxBufferSize ?? 500; 40 + this.#pageSize = Math.max(1, options.pageSize ?? 500); 56 41 } 57 42 58 43 /** 59 - * register labeler routes on a router. 60 - * registers queryLabels and subscribeLabels always; 61 - * emitEvent is only registered if `auth` was provided. 62 - * @param router the router to register on 44 + * apply a single label operation 45 + * @param op label operation 46 + * @param options batch defaults 47 + * @returns stored label 63 48 */ 64 - register(router: XRPCRouter): void { 65 - this.#registerQueryLabels(router); 66 - this.#registerSubscribeLabels(router); 67 - 68 - if (this.#auth !== undefined) { 69 - this.#registerEmitEvent(router, this.#auth); 49 + async applyLabel(op: LabelOp, options?: ApplyLabelsOptions): Promise<SignedLabel> { 50 + const labels = await this.applyLabels([op], options); 51 + const label = labels[0]; 52 + if (label === undefined) { 53 + throw new Error(`expected one stored label`); 70 54 } 71 - } 72 - 73 - /** 74 - * create and save a single label. 75 - * @param data label creation data 76 - * @returns the saved label 77 - */ 78 - async createLabel(data: CreateLabelData): Promise<SavedLabel> { 79 - const signed = await signLabel(data, this.did, this.#key); 80 - const saved = await this.#store.save(signed); 81 55 82 - this.#emitter.emit(saved); 83 - return saved; 56 + return label; 84 57 } 85 58 86 59 /** 87 - * create and save multiple labels for a subject. 88 - * @param subject the label subject (URI + optional CID) 89 - * @param labels label values to create and/or negate 90 - * @returns all created labels 60 + * apply label operations 61 + * @param ops label operations 62 + * @param options batch defaults 63 + * @returns stored labels in event order 91 64 */ 92 - async createLabels( 93 - subject: LabelSubject, 94 - labels: { create?: string[]; negate?: string[]; exp?: string }, 95 - ): Promise<SavedLabel[]> { 96 - const result: SavedLabel[] = []; 65 + async applyLabels(ops: Iterable<LabelOp>, options?: ApplyLabelsOptions): Promise<SignedLabel[]> { 66 + const drafts = buildLabels(this.#serviceDid, ops, options); 67 + if (drafts.length === 0) { 68 + return []; 69 + } 97 70 98 - if (labels.create) { 99 - for (const val of labels.create) { 100 - const saved = await this.createLabel({ ...subject, val, exp: labels.exp }); 101 - result.push(saved); 102 - } 71 + const signed = await Promise.all(drafts.map((label) => signLabel(this.#signingKey, label))); 72 + const events = await this.#store.appendLabels(signed); 73 + 74 + for (const event of events) { 75 + this.#events.emit(event); 103 76 } 104 77 105 - if (labels.negate) { 106 - for (const val of labels.negate) { 107 - const saved = await this.createLabel({ ...subject, val, neg: true }); 108 - result.push(saved); 78 + const labels: SignedLabel[] = []; 79 + for (const event of events) { 80 + for (const label of event.labels) { 81 + labels.push(label); 109 82 } 110 83 } 111 84 112 - return result; 113 - } 114 - 115 - #registerQueryLabels(router: XRPCRouter): void { 116 - const store = this.#store; 117 - 118 - router.addQuery(ComAtprotoLabelQueryLabels, { 119 - handler: async ({ params }) => { 120 - const result = await store.query({ 121 - uriPatterns: params.uriPatterns, 122 - sources: params.sources ?? [], 123 - cursor: params.cursor !== undefined ? parseInt(params.cursor, 10) || 0 : 0, 124 - limit: params.limit, 125 - }); 126 - 127 - return json(result); 128 - }, 129 - }); 85 + return labels; 130 86 } 131 87 132 - #registerSubscribeLabels(router: XRPCRouter): void { 133 - const store = this.#store; 134 - const emitter = this.#emitter; 135 - const maxBufferSize = this.#maxBufferSize; 136 - 137 - router.addSubscription(ComAtprotoLabelSubscribeLabels, { 138 - async *handler({ params, signal }) { 139 - const { cursor } = params; 88 + /** 89 + * subscribe to sequenced label events 90 + * @param options subscription options 91 + * @returns async iterator of label events 92 + * @throws {LabelerFutureCursorError} 93 + */ 94 + async *subscribeLabels(options: LabelSubscriptionOptions): AsyncIterableIterator<LabelEvent> { 95 + const { cursor, signal } = options; 140 96 141 - if (cursor !== undefined) { 142 - const latestSeq = await store.getLatestSeq(); 143 - if (cursor > latestSeq) { 144 - throw new InvalidRequestError({ 145 - error: 'FutureCursor', 146 - description: `cursor is in the future`, 147 - }); 148 - } 149 - } 97 + if (cursor !== undefined) { 98 + const latest = (await this.#store.getLatestSeq()) ?? 0; 99 + if (cursor > latest) { 100 + throw new FutureCursorError(cursor, latest); 101 + } 102 + } 150 103 151 - const outbox = new LabelOutbox(store, emitter, { maxBufferSize }); 104 + const outbox = new LabelerOutbox(this.#store, this.#events, { pageSize: this.#pageSize }); 152 105 153 - for await (const label of outbox.events(cursor, signal)) { 154 - yield { 155 - $type: 'com.atproto.label.subscribeLabels#labels', 156 - seq: label.seq, 157 - labels: [formatLabel(label)], 158 - }; 159 - } 160 - }, 161 - }); 162 - } 163 - 164 - #registerEmitEvent(router: XRPCRouter, auth: AuthCheck): void { 165 - router.addProcedure(ToolsOzoneModerationEmitEvent, { 166 - handler: async ({ request, input }) => { 167 - if (!(await auth(request))) { 168 - throw new AuthRequiredError({ description: `unauthorized` }); 169 - } 170 - 171 - const { event, subject, subjectBlobCids = [], createdBy } = input; 172 - 173 - if (event.$type !== 'tools.ozone.moderation.defs#modEventLabel') { 174 - throw new InvalidRequestError({ description: `unsupported event type` }); 175 - } 176 - 177 - if (!event.createLabelVals?.length && !event.negateLabelVals?.length) { 178 - throw new InvalidRequestError({ description: `must provide at least one label value` }); 179 - } 180 - 181 - const uri = 182 - subject.$type === 'com.atproto.admin.defs#repoRef' 183 - ? subject.did 184 - : subject.$type === 'com.atproto.repo.strongRef' 185 - ? subject.uri 186 - : undefined; 187 - 188 - if (uri === undefined) { 189 - throw new InvalidRequestError({ description: `invalid subject` }); 190 - } 191 - 192 - const cid = subject.$type === 'com.atproto.repo.strongRef' ? subject.cid : undefined; 193 - 194 - const labelSubject: LabelSubject = { uri }; 195 - if (cid !== undefined) { 196 - labelSubject.cid = cid; 197 - } 198 - 199 - let exp: string | undefined; 200 - if (event.durationInHours !== undefined) { 201 - exp = new Date(Date.now() + event.durationInHours * 60 * 60 * 1000).toISOString(); 202 - } 203 - 204 - const labels = await this.createLabels(labelSubject, { 205 - create: event.createLabelVals, 206 - negate: event.negateLabelVals, 207 - exp, 208 - }); 209 - 210 - return json({ 211 - id: labels[0]!.seq, 212 - event, 213 - subject, 214 - subjectBlobCids, 215 - createdBy, 216 - createdAt: new Date().toISOString(), 217 - }); 218 - }, 219 - }); 106 + yield* outbox.events(cursor, signal); 220 107 } 221 108 }
-145
packages/servers/labeler/lib/labels.test.ts
··· 1 - import { fromBytes, isBytes } from '@atcute/cbor'; 2 - import { Secp256k1PrivateKey } from '@atcute/crypto'; 3 - import type { Did } from '@atcute/lexicons'; 4 - 5 - import { describe, expect, it } from 'vitest'; 6 - 7 - import { formatLabel, signLabel } from './labels.ts'; 8 - import type { SavedLabel } from './store.ts'; 9 - 10 - const TEST_DID = 'did:plc:test1234' as Did; 11 - 12 - const getTestKey = async () => { 13 - // deterministic test key (32 bytes) 14 - const keyBytes = new Uint8Array(32); 15 - keyBytes[0] = 1; 16 - return Secp256k1PrivateKey.importRaw(keyBytes); 17 - }; 18 - 19 - describe('signLabel', () => { 20 - it('should produce a 64-byte compact secp256k1 signature', async () => { 21 - const key = await getTestKey(); 22 - 23 - const signed = await signLabel({ uri: 'did:plc:target', val: 'spam' }, TEST_DID, key); 24 - 25 - expect(signed.sig).toBeInstanceOf(Uint8Array); 26 - expect(signed.sig.byteLength).toBe(64); 27 - expect(signed.src).toBe(TEST_DID); 28 - expect(signed.uri).toBe('did:plc:target'); 29 - expect(signed.val).toBe('spam'); 30 - expect(signed.neg).toBe(false); 31 - expect(signed.cts).toBeDefined(); 32 - }); 33 - 34 - it('should use provided cts and src', async () => { 35 - const key = await getTestKey(); 36 - const cts = '2025-01-01T00:00:00.000Z'; 37 - 38 - const signed = await signLabel( 39 - { uri: 'did:plc:target', val: 'spam', cts, src: 'did:plc:other' }, 40 - TEST_DID, 41 - key, 42 - ); 43 - 44 - expect(signed.cts).toBe(cts); 45 - expect(signed.src).toBe('did:plc:other'); 46 - }); 47 - 48 - it('should produce consistent label fields for the same input', async () => { 49 - const key = await getTestKey(); 50 - const cts = '2025-01-01T00:00:00.000Z'; 51 - const data = { uri: 'did:plc:target', val: 'spam', cts }; 52 - 53 - const signed1 = await signLabel(data, TEST_DID, key); 54 - const signed2 = await signLabel(data, TEST_DID, key); 55 - 56 - // label fields should be identical 57 - expect(signed1.src).toBe(signed2.src); 58 - expect(signed1.uri).toBe(signed2.uri); 59 - expect(signed1.val).toBe(signed2.val); 60 - expect(signed1.neg).toBe(signed2.neg); 61 - expect(signed1.cts).toBe(signed2.cts); 62 - 63 - // both signatures should be valid 64-byte compact signatures 64 - expect(signed1.sig.byteLength).toBe(64); 65 - expect(signed2.sig.byteLength).toBe(64); 66 - }); 67 - 68 - it('should include optional fields when provided', async () => { 69 - const key = await getTestKey(); 70 - 71 - const signed = await signLabel( 72 - { 73 - uri: 'at://did:plc:target/app.bsky.feed.post/abc', 74 - val: 'nudity', 75 - cid: 'bafyreib2rxk3rybk3aobmv5cjuql3setrnhfekwxbdmg7il4q2hr3hilqe', 76 - neg: true, 77 - exp: '2026-01-01T00:00:00.000Z', 78 - }, 79 - TEST_DID, 80 - key, 81 - ); 82 - 83 - expect(signed.cid).toBe('bafyreib2rxk3rybk3aobmv5cjuql3setrnhfekwxbdmg7il4q2hr3hilqe'); 84 - expect(signed.neg).toBe(true); 85 - expect(signed.exp).toBe('2026-01-01T00:00:00.000Z'); 86 - }); 87 - }); 88 - 89 - describe('formatLabel', () => { 90 - it('should convert sig to Bytes wrapper and set ver to 1', () => { 91 - const saved: SavedLabel = { 92 - seq: 1, 93 - src: TEST_DID, 94 - uri: 'did:plc:target', 95 - val: 'spam', 96 - neg: false, 97 - cts: '2025-01-01T00:00:00.000Z', 98 - sig: new Uint8Array(64), 99 - }; 100 - 101 - const formatted = formatLabel(saved); 102 - 103 - expect(formatted.ver).toBe(1); 104 - expect(formatted.src).toBe(TEST_DID); 105 - expect(isBytes(formatted.sig)).toBe(true); 106 - expect(fromBytes(formatted.sig)).toEqual(saved.sig); 107 - }); 108 - 109 - it('should include optional fields when present', () => { 110 - const saved: SavedLabel = { 111 - seq: 1, 112 - src: TEST_DID, 113 - uri: 'at://did:plc:target/app.bsky.feed.post/abc', 114 - val: 'nudity', 115 - neg: true, 116 - cts: '2025-01-01T00:00:00.000Z', 117 - cid: 'bafyreib2rxk3rybk3aobmv5cjuql3setrnhfekwxbdmg7il4q2hr3hilqe', 118 - exp: '2026-01-01T00:00:00.000Z', 119 - sig: new Uint8Array(64), 120 - }; 121 - 122 - const formatted = formatLabel(saved); 123 - 124 - expect(formatted.cid).toBe(saved.cid); 125 - expect(formatted.exp).toBe(saved.exp); 126 - expect(formatted.neg).toBe(true); 127 - }); 128 - 129 - it('should omit optional fields when absent', () => { 130 - const saved: SavedLabel = { 131 - seq: 1, 132 - src: TEST_DID, 133 - uri: 'did:plc:target', 134 - val: 'spam', 135 - neg: false, 136 - cts: '2025-01-01T00:00:00.000Z', 137 - sig: new Uint8Array(64), 138 - }; 139 - 140 - const formatted = formatLabel(saved); 141 - 142 - expect('cid' in formatted).toBe(false); 143 - expect('exp' in formatted).toBe(false); 144 - }); 145 - });
-124
packages/servers/labeler/lib/labels.ts
··· 1 - import type { ComAtprotoLabelDefs } from '@atcute/atproto'; 2 - import { encode, toBytes } from '@atcute/cbor'; 3 - import type { PrivateKey } from '@atcute/crypto'; 4 - import type { Did, GenericUri } from '@atcute/lexicons'; 5 - 6 - import type { SavedLabel } from './store.ts'; 7 - 8 - const LABEL_VERSION = 1; 9 - 10 - /** data for creating a new label. */ 11 - export interface CreateLabelData { 12 - /** AT URI of the record, repository (account), or other resource */ 13 - uri: string; 14 - /** CID specifying the version of `uri` to label */ 15 - cid?: string; 16 - /** the label value */ 17 - val: string; 18 - /** whether this label negates a previous label */ 19 - neg?: boolean; 20 - /** creation timestamp (ISO 8601). defaults to current time */ 21 - cts?: string; 22 - /** expiration timestamp (ISO 8601) */ 23 - exp?: string; 24 - /** DID of the label source. defaults to the labeler's DID */ 25 - src?: string; 26 - } 27 - 28 - /** subject of a label: a URI with optional CID. */ 29 - export interface LabelSubject { 30 - uri: string; 31 - cid?: string; 32 - } 33 - 34 - interface UnsignedLabel { 35 - ver: number; 36 - src: string; 37 - uri: string; 38 - cid?: string; 39 - val: string; 40 - neg: boolean; 41 - cts: string; 42 - exp?: string; 43 - } 44 - 45 - /** 46 - * create an unsigned label object with version and defaults. 47 - * @param data label creation data 48 - * @param src default source DID 49 - * @returns unsigned label ready for signing 50 - */ 51 - const toUnsignedLabel = (data: CreateLabelData, src: Did): UnsignedLabel => { 52 - const label: UnsignedLabel = { 53 - ver: LABEL_VERSION, 54 - src: data.src ?? src, 55 - uri: data.uri, 56 - val: data.val, 57 - neg: data.neg ?? false, 58 - cts: data.cts ?? new Date().toISOString(), 59 - }; 60 - 61 - if (data.cid !== undefined) { 62 - label.cid = data.cid; 63 - } 64 - if (data.exp !== undefined) { 65 - label.exp = data.exp; 66 - } 67 - 68 - return label; 69 - }; 70 - 71 - /** 72 - * CBOR-encode and sign a label. 73 - * @param data label creation data 74 - * @param src default source DID 75 - * @param key private key for signing 76 - * @returns the label fields and signature, ready for storage 77 - */ 78 - export const signLabel = async ( 79 - data: CreateLabelData, 80 - src: Did, 81 - key: PrivateKey, 82 - ): Promise<Omit<SavedLabel, 'seq'>> => { 83 - const label = toUnsignedLabel(data, src); 84 - const bytes = encode(label); 85 - const sig = await key.sign(bytes); 86 - 87 - return { 88 - src: label.src, 89 - uri: label.uri, 90 - cid: label.cid, 91 - val: label.val, 92 - neg: label.neg, 93 - cts: label.cts, 94 - exp: label.exp, 95 - sig: sig, 96 - }; 97 - }; 98 - 99 - /** 100 - * format a saved label for wire transmission. 101 - * converts raw sig bytes to CBOR Bytes wrapper. 102 - * @param label saved label from store 103 - * @returns formatted label for XRPC responses 104 - */ 105 - export const formatLabel = (label: SavedLabel): ComAtprotoLabelDefs.Label => { 106 - const formatted: ComAtprotoLabelDefs.Label = { 107 - ver: 1, 108 - src: label.src as Did, 109 - uri: label.uri as GenericUri, 110 - val: label.val, 111 - neg: label.neg, 112 - cts: label.cts, 113 - sig: toBytes(label.sig), 114 - }; 115 - 116 - if (label.cid !== undefined) { 117 - formatted.cid = label.cid; 118 - } 119 - if (label.exp !== undefined) { 120 - formatted.exp = label.exp; 121 - } 122 - 123 - return formatted; 124 - };
+71
packages/servers/labeler/lib/memory-label-store.ts
··· 1 + import type { LabelEvent, LabelStore, SignedLabel } from './types.ts'; 2 + 3 + /** 4 + * in-memory label store useful for tests and simple deployments 5 + */ 6 + export class MemoryLabelStore implements LabelStore { 7 + #events: LabelEvent[] = []; 8 + #latestSeq = 0; 9 + 10 + /** 11 + * append signed labels 12 + * @param labels signed labels 13 + * @returns emitted events 14 + */ 15 + async appendLabels(labels: SignedLabel[]): Promise<LabelEvent[]> { 16 + const events: LabelEvent[] = []; 17 + 18 + for (const label of labels) { 19 + const event: LabelEvent = { 20 + seq: ++this.#latestSeq, 21 + labels: [label], 22 + }; 23 + 24 + events.push(event); 25 + this.#events.push(event); 26 + } 27 + 28 + return events; 29 + } 30 + 31 + /** 32 + * get latest sequence number 33 + * @returns latest sequence, or `null` 34 + */ 35 + async getLatestSeq(): Promise<number | null> { 36 + return this.#latestSeq === 0 ? null : this.#latestSeq; 37 + } 38 + 39 + /** 40 + * list events after a sequence cursor 41 + * @param options list options 42 + * @returns events in ascending order 43 + */ 44 + async listLabelEvents(options: { after?: number; limit: number }): Promise<LabelEvent[]> { 45 + const { after, limit } = options; 46 + 47 + const events: LabelEvent[] = []; 48 + for (const event of this.#events) { 49 + if (after !== undefined && event.seq <= after) { 50 + continue; 51 + } 52 + 53 + events.push(event); 54 + if (events.length >= limit) { 55 + break; 56 + } 57 + } 58 + 59 + return events; 60 + } 61 + 62 + /** 63 + * advance the sequence counter without emitting events 64 + * @param count number of sequence values to skip 65 + * @returns latest sequence after advancing 66 + */ 67 + advanceSeq(count = 1): number { 68 + this.#latestSeq += Math.max(0, Math.trunc(count)); 69 + return this.#latestSeq; 70 + } 71 + }
-191
packages/servers/labeler/lib/outbox.test.ts
··· 1 - import { XRPCSubscriptionError } from '@atcute/xrpc-server'; 2 - 3 - import { SimpleEventEmitter } from '@mary-ext/simple-event-emitter'; 4 - import { describe, expect, it } from 'vitest'; 5 - 6 - import { AsyncBuffer, AsyncBufferFullError } from './async-buffer.ts'; 7 - import { LabelOutbox } from './outbox.ts'; 8 - import type { LabelStore, SavedLabel } from './store.ts'; 9 - 10 - const makeSavedLabel = (seq: number): SavedLabel => ({ 11 - seq, 12 - src: 'did:plc:labeler', 13 - uri: 'did:plc:target', 14 - val: 'spam', 15 - neg: false, 16 - cts: new Date().toISOString(), 17 - sig: new Uint8Array(64), 18 - }); 19 - 20 - const createMockStore = (labels: SavedLabel[] = []): LabelStore => { 21 - return { 22 - async save(label) { 23 - const seq = labels.length + 1; 24 - const saved = { ...label, seq }; 25 - labels.push(saved); 26 - return saved; 27 - }, 28 - async query() { 29 - return { labels: [], cursor: '0' }; 30 - }, 31 - async getLatestSeq() { 32 - return labels.at(-1)?.seq ?? 0; 33 - }, 34 - async getRange(after, limit) { 35 - const result = labels.filter((l) => l.seq > after); 36 - return limit !== undefined ? result.slice(0, limit) : result; 37 - }, 38 - }; 39 - }; 40 - 41 - describe('AsyncBuffer', () => { 42 - it('should throw AsyncBufferFullError on overflow', async () => { 43 - const buffer = new AsyncBuffer<number>(2); 44 - 45 - // push 4 items (exceeding max of 2) 46 - buffer.push(1); 47 - buffer.push(2); 48 - buffer.push(3); 49 - buffer.push(4); 50 - 51 - const collected: number[] = []; 52 - 53 - await expect(async () => { 54 - for await (const value of buffer.events()) { 55 - collected.push(value); 56 - } 57 - }).rejects.toThrow(AsyncBufferFullError); 58 - }); 59 - }); 60 - 61 - describe('LabelOutbox', () => { 62 - it('should backfill from cursor', async () => { 63 - const labels = Array.from({ length: 5 }, (_, i) => makeSavedLabel(i + 1)); 64 - const store = createMockStore(labels); 65 - const emitter = new SimpleEventEmitter<[SavedLabel]>(); 66 - 67 - const outbox = new LabelOutbox(store, emitter); 68 - const ac = new AbortController(); 69 - 70 - const collected: SavedLabel[] = []; 71 - 72 - setTimeout(() => ac.abort(), 100); 73 - 74 - for await (const label of outbox.events(0, ac.signal)) { 75 - collected.push(label); 76 - if (collected.length === 5) { 77 - ac.abort(); 78 - break; 79 - } 80 - } 81 - 82 - expect(collected).toHaveLength(5); 83 - expect(collected.map((l) => l.seq)).toEqual([1, 2, 3, 4, 5]); 84 - }); 85 - 86 - it('should tail live events with no cursor', async () => { 87 - const store = createMockStore(); 88 - const emitter = new SimpleEventEmitter<[SavedLabel]>(); 89 - 90 - const outbox = new LabelOutbox(store, emitter); 91 - const ac = new AbortController(); 92 - 93 - const collected: SavedLabel[] = []; 94 - 95 - const iter = outbox.events(undefined, ac.signal); 96 - 97 - // emit after a tick so the outbox is tailing 98 - queueMicrotask(() => { 99 - emitter.emit(makeSavedLabel(1)); 100 - }); 101 - 102 - for await (const label of iter) { 103 - collected.push(label); 104 - if (collected.length === 1) { 105 - ac.abort(); 106 - break; 107 - } 108 - } 109 - 110 - expect(collected).toHaveLength(1); 111 - expect(collected[0]!.seq).toBe(1); 112 - }); 113 - 114 - it('should backfill then tail live events', async () => { 115 - const labels = [makeSavedLabel(1), makeSavedLabel(2)]; 116 - const store = createMockStore(labels); 117 - const emitter = new SimpleEventEmitter<[SavedLabel]>(); 118 - 119 - const outbox = new LabelOutbox(store, emitter); 120 - const ac = new AbortController(); 121 - 122 - const collected: SavedLabel[] = []; 123 - 124 - // emit a new label shortly after start so the outbox picks it up during tailing 125 - setTimeout(() => { 126 - emitter.emit(makeSavedLabel(3)); 127 - }, 20); 128 - 129 - for await (const label of outbox.events(0, ac.signal)) { 130 - collected.push(label); 131 - if (collected.length === 3) { 132 - ac.abort(); 133 - break; 134 - } 135 - } 136 - 137 - expect(collected.map((l) => l.seq)).toEqual([1, 2, 3]); 138 - }); 139 - 140 - it('should stop on abort signal', async () => { 141 - const store = createMockStore(); 142 - const emitter = new SimpleEventEmitter<[SavedLabel]>(); 143 - 144 - const outbox = new LabelOutbox(store, emitter); 145 - const ac = new AbortController(); 146 - 147 - const collected: SavedLabel[] = []; 148 - 149 - setTimeout(() => ac.abort(), 50); 150 - 151 - for await (const label of outbox.events(undefined, ac.signal)) { 152 - collected.push(label); 153 - } 154 - 155 - expect(collected).toHaveLength(0); 156 - }); 157 - 158 - it('should wrap AsyncBufferFullError as ConsumerTooSlow', async () => { 159 - const store = createMockStore(); 160 - const emitter = new SimpleEventEmitter<[SavedLabel]>(); 161 - 162 - const outbox = new LabelOutbox(store, emitter, { maxBufferSize: 2 }); 163 - const ac = new AbortController(); 164 - 165 - let error: unknown; 166 - const done = (async () => { 167 - try { 168 - for await (const _label of outbox.events(undefined, ac.signal)) { 169 - // stall the consumer — don't break, just wait for more 170 - // while the producer floods the buffer below 171 - await new Promise((resolve) => setTimeout(resolve, 50)); 172 - } 173 - } catch (err) { 174 - error = err; 175 - } 176 - })(); 177 - 178 - // wait for the outbox to start tailing 179 - await new Promise((resolve) => setTimeout(resolve, 10)); 180 - 181 - // flood the buffer well beyond capacity 182 - for (let i = 1; i <= 10; i++) { 183 - emitter.emit(makeSavedLabel(i)); 184 - } 185 - 186 - await done; 187 - 188 - expect(error).toBeInstanceOf(XRPCSubscriptionError); 189 - expect((error as XRPCSubscriptionError).error).toBe('ConsumerTooSlow'); 190 - }); 191 - });
-111
packages/servers/labeler/lib/outbox.ts
··· 1 - import { XRPCSubscriptionError } from '@atcute/xrpc-server'; 2 - 3 - import type { SimpleEventEmitter } from '@mary-ext/simple-event-emitter'; 4 - 5 - import { AsyncBufferFullError, on } from './async-buffer.ts'; 6 - import type { LabelStore, SavedLabel } from './store.ts'; 7 - 8 - const BACKFILL_PAGE_SIZE = 500; 9 - 10 - export interface LabelOutboxOptions { 11 - maxBufferSize?: number; 12 - } 13 - 14 - /** 15 - * outbox for streaming labels with push-pull semantics. 16 - * 17 - * handles backfill from cursor, catch-up, and live tailing 18 - * with backpressure via bounded async buffer. 19 - */ 20 - export class LabelOutbox { 21 - #store: LabelStore; 22 - #emitter: SimpleEventEmitter<[label: SavedLabel]>; 23 - #maxBufferSize: number; 24 - 25 - constructor( 26 - store: LabelStore, 27 - emitter: SimpleEventEmitter<[label: SavedLabel]>, 28 - options: LabelOutboxOptions = {}, 29 - ) { 30 - this.#store = store; 31 - this.#emitter = emitter; 32 - this.#maxBufferSize = options.maxBufferSize ?? 500; 33 - } 34 - 35 - /** 36 - * stream labels, optionally backfilling from a cursor. 37 - * @param cursor sequence number to backfill from (exclusive), or undefined for live-only 38 - * @param signal abort signal to stop streaming 39 - * @returns async iterator of saved labels 40 - */ 41 - async *events(cursor: number | undefined, signal: AbortSignal): AsyncGenerator<SavedLabel> { 42 - let lastBackfillSeq = -1; 43 - let caughtUp = cursor === undefined; 44 - 45 - // backfill phase: dump stored labels in pages 46 - if (!caughtUp) { 47 - while (true) { 48 - const events = await this.#store.getRange( 49 - lastBackfillSeq > -1 ? lastBackfillSeq : cursor!, 50 - BACKFILL_PAGE_SIZE, 51 - ); 52 - 53 - if (events.length === 0) { 54 - break; 55 - } 56 - 57 - yield* events; 58 - signal.throwIfAborted(); 59 - 60 - lastBackfillSeq = events.at(-1)!.seq; 61 - 62 - // stop when close to the head 63 - const latestSeq = await this.#store.getLatestSeq(); 64 - if (latestSeq - lastBackfillSeq < BACKFILL_PAGE_SIZE / 2) { 65 - break; 66 - } 67 - } 68 - 69 - signal.throwIfAborted(); 70 - } 71 - 72 - // start listening before reading the gap 73 - const tail = on(this.#emitter, { signal, maxSize: this.#maxBufferSize }); 74 - 75 - // catch-up phase: read any labels between backfill end and tail start 76 - if (!caughtUp) { 77 - const events = await this.#store.getRange(lastBackfillSeq > -1 ? lastBackfillSeq : cursor!); 78 - 79 - if (events.length > 0) { 80 - yield* events; 81 - signal.throwIfAborted(); 82 - 83 - lastBackfillSeq = events.at(-1)!.seq; 84 - } 85 - } 86 - 87 - // tail phase: stream live events, deduplicating the catch-up overlap 88 - try { 89 - for await (const event of tail) { 90 - if (!caughtUp) { 91 - if (event.seq <= lastBackfillSeq) { 92 - continue; 93 - } 94 - 95 - caughtUp = true; 96 - } 97 - 98 - yield event; 99 - } 100 - } catch (err) { 101 - if (err instanceof AsyncBufferFullError) { 102 - throw new XRPCSubscriptionError({ 103 - error: 'ConsumerTooSlow', 104 - description: `stream consumer too slow`, 105 - }); 106 - } 107 - 108 - throw err; 109 - } 110 - } 111 - }
-55
packages/servers/labeler/lib/queue.ts
··· 1 - interface Node<T> { 2 - value: T; 3 - next: Node<T> | undefined; 4 - } 5 - 6 - /** a FIFO queue backed by a linked list. */ 7 - export class Queue<T> { 8 - #head: Node<T> | undefined; 9 - #tail: Node<T> | undefined; 10 - #size: number = 0; 11 - 12 - /** number of items in the queue */ 13 - get size(): number { 14 - return this.#size; 15 - } 16 - 17 - /** 18 - * add a value to the end of the queue. 19 - * @param value value to enqueue 20 - */ 21 - enqueue(value: T): void { 22 - const node: Node<T> = { value, next: undefined }; 23 - const tail = this.#tail; 24 - 25 - if (tail !== undefined) { 26 - tail.next = node; 27 - } else { 28 - this.#head = node; 29 - } 30 - 31 - this.#tail = node; 32 - this.#size++; 33 - } 34 - 35 - /** 36 - * remove and return the first value from the queue. 37 - * @returns first queued value, or undefined if empty 38 - */ 39 - dequeue(): T | undefined { 40 - const head = this.#head; 41 - if (head === undefined) { 42 - return; 43 - } 44 - 45 - const next = head.next; 46 - 47 - this.#head = next; 48 - if (next === undefined) { 49 - this.#tail = undefined; 50 - } 51 - 52 - this.#size--; 53 - return head.value; 54 - } 55 - }
+59
packages/servers/labeler/lib/signing.ts
··· 1 + import type * as ComAtprotoLabelDefs from '@atcute/atproto/types/label/defs'; 2 + import { encode, toBytes } from '@atcute/cbor'; 3 + import type { PrivateKey } from '@atcute/crypto'; 4 + import type { Did } from '@atcute/lexicons/syntax'; 5 + 6 + import type { ApplyLabelsOptions, LabelOp, SignedLabel } from './types.ts'; 7 + 8 + type UnsignedLabel = Omit<ComAtprotoLabelDefs.Label, 'sig'> & { 9 + sig?: undefined; 10 + }; 11 + 12 + export const signLabel = async (key: PrivateKey, label: UnsignedLabel): Promise<SignedLabel> => { 13 + const sig = await key.sign(encode(label)); 14 + 15 + return { 16 + ...label, 17 + sig: toBytes(sig), 18 + }; 19 + }; 20 + 21 + const toUnsignedLabel = ( 22 + op: LabelOp, 23 + opts: { serviceDid: Did; issuedAt: string; expiresAt: string | undefined; negate: boolean }, 24 + ): UnsignedLabel => { 25 + return { 26 + cid: op.cid, 27 + cts: op.issuedAt ?? opts.issuedAt, 28 + exp: op.expiresAt ?? opts.expiresAt, 29 + neg: opts.negate ? true : undefined, 30 + src: opts.serviceDid, 31 + uri: op.uri, 32 + val: op.value, 33 + ver: 1, 34 + }; 35 + }; 36 + 37 + export const buildLabels = ( 38 + serviceDid: Did, 39 + ops: Iterable<LabelOp>, 40 + defaults: ApplyLabelsOptions = {}, 41 + ): UnsignedLabel[] => { 42 + const labels: UnsignedLabel[] = []; 43 + 44 + const issuedAt = defaults.issuedAt ?? new Date().toISOString(); 45 + const expiresAt = defaults.expiresAt; 46 + 47 + for (const op of ops) { 48 + labels.push( 49 + toUnsignedLabel(op, { 50 + serviceDid: serviceDid, 51 + issuedAt: issuedAt, 52 + expiresAt: expiresAt, 53 + negate: op.negate === true, 54 + }), 55 + ); 56 + } 57 + 58 + return labels; 59 + };
-68
packages/servers/labeler/lib/store.ts
··· 1 - import type { ComAtprotoLabelDefs } from '@atcute/atproto'; 2 - 3 - /** label as stored in the database, with sequence number and raw sig bytes. */ 4 - export interface SavedLabel { 5 - /** sequence number / ID of the label */ 6 - seq: number; 7 - /** DID of the actor who created this label */ 8 - src: string; 9 - /** AT URI of the record, repository (account), or other resource */ 10 - uri: string; 11 - /** CID specifying the version of `uri` to label */ 12 - cid?: string; 13 - /** the label value */ 14 - val: string; 15 - /** whether this label negates a previous label */ 16 - neg: boolean; 17 - /** creation timestamp (ISO 8601) */ 18 - cts: string; 19 - /** expiration timestamp (ISO 8601) */ 20 - exp?: string; 21 - /** signature bytes */ 22 - sig: Uint8Array; 23 - } 24 - 25 - /** parameters for querying labels. */ 26 - export interface LabelQueryParams { 27 - uriPatterns: string[]; 28 - sources: string[]; 29 - cursor: number; 30 - limit: number; 31 - } 32 - 33 - /** result of a label query. */ 34 - export interface LabelQueryResult { 35 - labels: ComAtprotoLabelDefs.Label[]; 36 - cursor: string; 37 - } 38 - 39 - /** pluggable storage backend for labels. */ 40 - export interface LabelStore { 41 - /** 42 - * save a signed label and assign it a sequence number. 43 - * @param label the signed label to save 44 - * @returns the saved label with sequence number 45 - */ 46 - save(label: Omit<SavedLabel, 'seq'>): Promise<SavedLabel>; 47 - 48 - /** 49 - * query labels matching the given parameters. 50 - * @param params query parameters 51 - * @returns matching labels and cursor 52 - */ 53 - query(params: LabelQueryParams): Promise<LabelQueryResult>; 54 - 55 - /** 56 - * get the latest sequence number. 57 - * @returns the highest sequence number, or 0 if none 58 - */ 59 - getLatestSeq(): Promise<number>; 60 - 61 - /** 62 - * get a range of labels after a given sequence number. 63 - * @param after sequence number to start after (exclusive) 64 - * @param limit maximum number of labels to return 65 - * @returns labels in the range 66 - */ 67 - getRange(after: number, limit?: number): Promise<SavedLabel[]>; 68 - }
+81
packages/servers/labeler/lib/types.ts
··· 1 + import type * as ComAtprotoLabelDefs from '@atcute/atproto/types/label/defs'; 2 + import type { PrivateKey } from '@atcute/crypto'; 3 + import type { GenericUri } from '@atcute/lexicons'; 4 + 5 + /** 6 + * a label with a required signature 7 + */ 8 + export type SignedLabel = Omit<ComAtprotoLabelDefs.Label, 'sig'> & { 9 + sig: NonNullable<ComAtprotoLabelDefs.Label['sig']>; 10 + }; 11 + 12 + /** 13 + * a signed label event with a monotonically increasing sequence number 14 + */ 15 + export interface LabelEvent { 16 + seq: number; 17 + labels: SignedLabel[]; 18 + } 19 + 20 + /** 21 + * a single label operation before the labeler fills in service metadata and signs 22 + */ 23 + export interface LabelOp { 24 + uri: GenericUri; 25 + cid?: string; 26 + value: string; 27 + negate?: boolean; 28 + issuedAt?: string; 29 + expiresAt?: string; 30 + } 31 + 32 + /** 33 + * defaults applied to a batch of label operations 34 + */ 35 + export interface ApplyLabelsOptions { 36 + issuedAt?: string; 37 + expiresAt?: string; 38 + } 39 + 40 + /** 41 + * options for subscribing to label events 42 + */ 43 + export interface LabelSubscriptionOptions { 44 + cursor?: number; 45 + signal: AbortSignal; 46 + } 47 + 48 + /** 49 + * persistence backend used by `Labeler` 50 + */ 51 + export interface LabelStore { 52 + /** 53 + * append signed labels to the store and return emitted events in sequence order 54 + * @param labels signed labels to persist 55 + * @returns emitted label events 56 + */ 57 + appendLabels(labels: SignedLabel[]): Promise<LabelEvent[]>; 58 + 59 + /** 60 + * get the latest known sequence number 61 + * @returns latest sequence, or `null` if empty 62 + */ 63 + getLatestSeq(): Promise<number | null>; 64 + 65 + /** 66 + * list events after a cursor in ascending sequence order 67 + * @param options list options 68 + * @returns label events 69 + */ 70 + listLabelEvents(options: { after?: number; limit: number }): Promise<LabelEvent[]>; 71 + } 72 + 73 + /** 74 + * options for constructing a labeler 75 + */ 76 + export interface LabelerOptions { 77 + serviceDid: ComAtprotoLabelDefs.Label['src']; 78 + signingKey: PrivateKey; 79 + store: LabelStore; 80 + pageSize?: number; 81 + }
+6 -8
packages/servers/labeler/package.json
··· 1 1 { 2 2 "name": "@atcute/labeler", 3 3 "version": "0.1.0", 4 - "description": "sign and emit AT Protocol labels", 4 + "description": "a lightweight core for building AT Protocol labelers", 5 5 "license": "0BSD", 6 6 "repository": { 7 7 "url": "https://github.com/mary-ext/atcute", ··· 22 22 }, 23 23 "scripts": { 24 24 "build": "tsgo --project tsconfig.build.json", 25 - "test": "vitest", 25 + "test": "vitest --coverage", 26 26 "prepublish": "rm -rf dist; pnpm run build" 27 27 }, 28 28 "dependencies": { 29 + "@atcute/atproto": "workspace:^", 29 30 "@atcute/cbor": "workspace:^", 30 31 "@atcute/crypto": "workspace:^", 31 - "@atcute/lexicons": "workspace:^", 32 - "@atcute/xrpc-server": "workspace:^", 33 - "@mary-ext/simple-event-emitter": "^1.0.1" 32 + "@atcute/lexicons": "^1.2.9", 33 + "@mary-ext/simple-event-emitter": "^1.0.0" 34 34 }, 35 35 "devDependencies": { 36 - "@atcute/atproto": "workspace:^", 37 - "@atcute/labeler": "file:", 38 - "@atcute/ozone": "workspace:^", 39 36 "@types/node": "^25.2.3", 37 + "@vitest/coverage-v8": "^4.0.18", 40 38 "vitest": "^4.0.18" 41 39 } 42 40 }
+31 -31
pnpm-lock.yaml
··· 913 913 914 914 packages/servers/labeler: 915 915 dependencies: 916 + '@atcute/atproto': 917 + specifier: workspace:^ 918 + version: link:../../definitions/atproto 916 919 '@atcute/cbor': 917 920 specifier: workspace:^ 918 921 version: link:../../utilities/cbor 919 922 '@atcute/crypto': 920 923 specifier: workspace:^ 921 924 version: link:../../utilities/crypto 922 - '@atcute/identity': 923 - specifier: workspace:^ 924 - version: link:../../identity/identity 925 - '@atcute/identity-resolver': 926 - specifier: workspace:^ 927 - version: link:../../identity/identity-resolver 928 925 '@atcute/lexicons': 929 - specifier: workspace:^ 930 - version: link:../../lexicons/lexicons 926 + specifier: ^1.2.9 927 + version: 1.2.9 931 928 '@atcute/xrpc-server': 932 929 specifier: workspace:^ 933 930 version: link:../xrpc-server 934 931 '@mary-ext/simple-event-emitter': 935 - specifier: ^1.0.1 932 + specifier: ^1.0.0 936 933 version: 1.0.1 937 934 devDependencies: 938 - '@atcute/atproto': 939 - specifier: workspace:^ 940 - version: link:../../definitions/atproto 941 - '@atcute/labeler': 942 - specifier: 'file:' 943 - version: file:packages/servers/labeler 944 - '@atcute/ozone': 945 - specifier: workspace:^ 946 - version: link:../../definitions/ozone 947 935 '@types/node': 948 936 specifier: ^25.2.3 949 937 version: 25.2.3 938 + '@vitest/coverage-v8': 939 + specifier: ^4.0.18 940 + version: 4.0.18(@vitest/browser@4.0.18(vite@7.3.1(@types/node@25.2.3)(jiti@2.6.1)(tsx@4.20.6)(yaml@2.8.0))(vitest@4.0.18))(vitest@4.0.18) 950 941 vitest: 951 942 specifier: ^4.0.18 952 943 version: 4.0.18(@types/node@25.2.3)(@vitest/browser-playwright@4.0.18)(jiti@2.6.1)(tsx@4.20.6)(yaml@2.8.0) ··· 1278 1269 '@atcute/frontpage@file:packages/definitions/frontpage': 1279 1270 resolution: {directory: packages/definitions/frontpage, type: directory} 1280 1271 1281 - '@atcute/labeler@file:packages/servers/labeler': 1282 - resolution: {directory: packages/servers/labeler, type: directory} 1283 - 1284 1272 '@atcute/leaflet@file:packages/definitions/leaflet': 1285 1273 resolution: {directory: packages/definitions/leaflet, type: directory} 1286 1274 1287 1275 '@atcute/lexicon-community@file:packages/definitions/lexicon-community': 1288 1276 resolution: {directory: packages/definitions/lexicon-community, type: directory} 1277 + 1278 + '@atcute/lexicons@1.2.9': 1279 + resolution: {integrity: sha512-/RRHm2Cw9o8Mcsrq0eo8fjS9okKYLGfuFwrQ0YoP/6sdSDsXshaTLJsvLlcUcaDaSJ1YFOuHIo3zr2Om2F/16g==} 1289 1280 1290 1281 '@atcute/microcosm@file:packages/definitions/microcosm': 1291 1282 resolution: {directory: packages/definitions/microcosm, type: directory} ··· 1295 1286 1296 1287 '@atcute/tangled@file:packages/definitions/tangled': 1297 1288 resolution: {directory: packages/definitions/tangled, type: directory} 1289 + 1290 + '@atcute/uint8array@1.1.1': 1291 + resolution: {integrity: sha512-3LsC8XB8TKe9q/5hOA5sFuzGaIFdJZJNewC5OKa3o/eU6+K7JR6see9Zy2JbQERNVnRl11EzbNov1efgLMAs4g==} 1292 + 1293 + '@atcute/util-text@1.1.1': 1294 + resolution: {integrity: sha512-JH0SxzUQJAmbOBTYyhxQbkkI6M33YpjlVLEcbP5GYt43xgFArzV0FJVmEpvIj0kjsmphHB45b6IitdvxPdec9w==} 1298 1295 1299 1296 '@atcute/whitewind@file:packages/definitions/whitewind': 1300 1297 resolution: {directory: packages/definitions/whitewind, type: directory} ··· 4703 4700 '@atcute/atproto': link:packages/definitions/atproto 4704 4701 '@atcute/lexicons': link:packages/lexicons/lexicons 4705 4702 4706 - '@atcute/labeler@file:packages/servers/labeler': 4707 - dependencies: 4708 - '@atcute/cbor': link:packages/utilities/cbor 4709 - '@atcute/crypto': link:packages/utilities/crypto 4710 - '@atcute/identity': link:packages/identity/identity 4711 - '@atcute/identity-resolver': link:packages/identity/identity-resolver 4712 - '@atcute/lexicons': link:packages/lexicons/lexicons 4713 - '@atcute/xrpc-server': link:packages/servers/xrpc-server 4714 - '@mary-ext/simple-event-emitter': 1.0.1 4715 - 4716 4703 '@atcute/leaflet@file:packages/definitions/leaflet': 4717 4704 dependencies: 4718 4705 '@atcute/atproto': link:packages/definitions/atproto ··· 4723 4710 '@atcute/atproto': link:packages/definitions/atproto 4724 4711 '@atcute/lexicons': link:packages/lexicons/lexicons 4725 4712 4713 + '@atcute/lexicons@1.2.9': 4714 + dependencies: 4715 + '@atcute/uint8array': 1.1.1 4716 + '@atcute/util-text': 1.1.1 4717 + '@standard-schema/spec': 1.1.0 4718 + esm-env: 1.2.2 4719 + 4726 4720 '@atcute/microcosm@file:packages/definitions/microcosm': 4727 4721 dependencies: 4728 4722 '@atcute/lexicons': link:packages/lexicons/lexicons ··· 4737 4731 dependencies: 4738 4732 '@atcute/atproto': link:packages/definitions/atproto 4739 4733 '@atcute/lexicons': link:packages/lexicons/lexicons 4734 + 4735 + '@atcute/uint8array@1.1.1': {} 4736 + 4737 + '@atcute/util-text@1.1.1': 4738 + dependencies: 4739 + unicode-segmenter: 0.14.5 4740 4740 4741 4741 '@atcute/whitewind@file:packages/definitions/whitewind': 4742 4742 dependencies: