a collection of lightweight TypeScript packages for AT Protocol, the protocol powering Bluesky
atproto bluesky typescript npm
101
fork

Configure Feed

Select the types of activity you want to include in your feed.

refactor(labeler): some adjustments

Mary 2ae7ead6 134f7706

+143 -9
+135
packages/servers/labeler/README.md
··· 1 + # @atcute/labeler 2 + 3 + label signing and subscription engine for AT Protocol. 4 + 5 + ```sh 6 + npm install @atcute/labeler 7 + ``` 8 + 9 + this package provides the core logic for building an AT Protocol labeler: signing labels with a 10 + service key, persisting them to a store, and streaming them to subscribers. it's framework-agnostic 11 + — wire it up to `@atcute/xrpc-server` or any other server to expose the 12 + `com.atproto.label.subscribeLabels` endpoint. 13 + 14 + ## usage 15 + 16 + ### creating a labeler 17 + 18 + ```ts 19 + import { Labeler, MemoryLabelStore } from '@atcute/labeler'; 20 + import { P256PrivateKey, parsePrivateMultikey } from '@atcute/crypto'; 21 + 22 + const { privateKeyBytes } = parsePrivateMultikey(process.env.SIGNING_KEY!); 23 + 24 + const labeler = new Labeler({ 25 + serviceDid: 'did:plc:my-labeler', 26 + signingKey: await P256PrivateKey.importRaw(privateKeyBytes), 27 + store: new MemoryLabelStore(), 28 + }); 29 + ``` 30 + 31 + `MemoryLabelStore` works for development and testing. for production, implement the `LabelStore` 32 + interface backed by a database. 33 + 34 + ### applying labels 35 + 36 + use `applyLabel()` for a single label or `applyLabels()` for a batch. each label operation specifies 37 + a target URI and a label value: 38 + 39 + ```ts 40 + // label a post as spam 41 + await labeler.applyLabel({ 42 + uri: 'at://did:plc:alice/app.bsky.feed.post/abc123', 43 + value: 'spam', 44 + }); 45 + 46 + // negate a previous label 47 + await labeler.applyLabel({ 48 + uri: 'at://did:plc:alice/app.bsky.feed.post/abc123', 49 + value: 'spam', 50 + negate: true, 51 + }); 52 + 53 + // batch of labels with shared defaults 54 + await labeler.applyLabels( 55 + [ 56 + { uri: 'at://did:plc:alice/app.bsky.feed.post/1', value: 'spam' }, 57 + { uri: 'at://did:plc:alice/app.bsky.feed.post/2', value: 'nudity' }, 58 + ], 59 + { issuedAt: new Date().toISOString() }, 60 + ); 61 + ``` 62 + 63 + labels are CBOR-encoded, signed with the service key, and persisted to the store. each stored label 64 + gets a monotonically increasing sequence number. 65 + 66 + ### subscribing to label events 67 + 68 + `subscribeLabels()` returns an async iterator of label events. pass a `cursor` to replay from a 69 + previous sequence number, or omit it to only receive live events: 70 + 71 + ```ts 72 + // replay from sequence 0 and continue with live events 73 + for await (const event of labeler.subscribeLabels({ cursor: 0, signal: controller.signal })) { 74 + console.log(event.seq, event.labels); 75 + } 76 + ``` 77 + 78 + the subscription handles backfill (draining stored events) and then seamlessly transitions to live 79 + tailing. if a subscriber falls too far behind, a `ConsumerTooSlowError` is thrown. 80 + 81 + ### wiring up to an XRPC server 82 + 83 + use `@atcute/xrpc-server` with a runtime-specific WebSocket adapter to serve the subscription 84 + endpoint: 85 + 86 + ```ts 87 + import { XRPCRouter, XRPCSubscriptionError } from '@atcute/xrpc-server'; 88 + import { createBunWebSocket } from '@atcute/xrpc-server-bun'; 89 + import { ComAtprotoLabelSubscribeLabels } from '@atcute/atproto'; 90 + 91 + import { FutureCursorError } from '@atcute/labeler'; 92 + 93 + const ws = createBunWebSocket(); 94 + const router = new XRPCRouter({ websocket: ws.adapter }); 95 + 96 + router.addSubscription(ComAtprotoLabelSubscribeLabels, { 97 + async *handler({ params, signal }) { 98 + try { 99 + yield* labeler.subscribeLabels({ cursor: params.cursor, signal }); 100 + } catch (err) { 101 + if (err instanceof FutureCursorError) { 102 + throw new XRPCSubscriptionError({ error: 'FutureCursor' }); 103 + } 104 + throw err; 105 + } 106 + }, 107 + }); 108 + 109 + export default ws.wrap(router); 110 + ``` 111 + 112 + ## custom label stores 113 + 114 + implement the `LabelStore` interface for durable persistence: 115 + 116 + ```ts 117 + import type { LabelStore, LabelEvent, SignedLabel } from '@atcute/labeler'; 118 + 119 + class SqliteLabelStore implements LabelStore { 120 + async appendLabels(labels: SignedLabel[]): Promise<LabelEvent[]> { 121 + // insert labels and assign sequence numbers 122 + } 123 + 124 + async getLatestSeq(): Promise<number | null> { 125 + // return the highest sequence number, or null if empty 126 + } 127 + 128 + async listLabelEvents(options: { after?: number; limit?: number }): Promise<LabelEvent[]> { 129 + // return events after the cursor in ascending sequence order 130 + } 131 + } 132 + ``` 133 + 134 + the store must assign a unique, monotonically increasing `seq` to each event. the labeler uses these 135 + for cursor-based pagination during subscription backfill.
+4 -4
packages/servers/labeler/lib/internal/outbox.test.ts
··· 29 29 class TestStore implements LabelStore { 30 30 #events: LabelEvent[] = []; 31 31 32 - onList?: (options: { after?: number; limit: number }) => void; 32 + onList?: (options: { after?: number; limit?: number }) => void; 33 33 34 34 insert(event: LabelEvent): void { 35 35 this.#events.push(event); ··· 61 61 return this.#events.at(-1)?.seq ?? null; 62 62 } 63 63 64 - async listLabelEvents(options: { after?: number; limit: number }): Promise<LabelEvent[]> { 64 + async listLabelEvents(options: { after?: number; limit?: number }): Promise<LabelEvent[]> { 65 65 this.onList?.(options); 66 66 67 67 const out: LabelEvent[] = []; ··· 71 71 } 72 72 73 73 out.push(event); 74 - if (out.length >= options.limit) { 74 + if (options.limit !== undefined && out.length >= options.limit) { 75 75 break; 76 76 } 77 77 } ··· 122 122 123 123 let injectedCutoverEvent = false; 124 124 store.onList = (options) => { 125 - if (injectedCutoverEvent || options.limit !== Number.MAX_SAFE_INTEGER) { 125 + if (injectedCutoverEvent || options.limit !== undefined) { 126 126 return; 127 127 } 128 128
-1
packages/servers/labeler/lib/internal/outbox.ts
··· 76 76 if (!caughtUp) { 77 77 const events = await this.#store.listLabelEvents({ 78 78 after: lastBackfillSeq > -1 ? lastBackfillSeq : backfillCursor, 79 - limit: Number.MAX_SAFE_INTEGER, 80 79 }); 81 80 82 81 if (events.length > 0) {
+2 -2
packages/servers/labeler/lib/memory-label-store.ts
··· 41 41 * @param options list options 42 42 * @returns events in ascending order 43 43 */ 44 - async listLabelEvents(options: { after?: number; limit: number }): Promise<LabelEvent[]> { 44 + async listLabelEvents(options: { after?: number; limit?: number }): Promise<LabelEvent[]> { 45 45 const { after, limit } = options; 46 46 47 47 const events: LabelEvent[] = []; ··· 51 51 } 52 52 53 53 events.push(event); 54 - if (events.length >= limit) { 54 + if (limit !== undefined && events.length >= limit) { 55 55 break; 56 56 } 57 57 }
+1 -1
packages/servers/labeler/lib/types.ts
··· 67 67 * @param options list options 68 68 * @returns label events 69 69 */ 70 - listLabelEvents(options: { after?: number; limit: number }): Promise<LabelEvent[]>; 70 + listLabelEvents(options: { after?: number; limit?: number }): Promise<LabelEvent[]>; 71 71 } 72 72 73 73 /**
+1 -1
packages/servers/labeler/package.json
··· 1 1 { 2 2 "name": "@atcute/labeler", 3 3 "version": "0.1.0", 4 - "description": "a lightweight core for building AT Protocol labelers", 4 + "description": "label signing and subscription engine for AT Protocol", 5 5 "license": "0BSD", 6 6 "repository": { 7 7 "url": "https://github.com/mary-ext/atcute",