A Bluesky labeler that labels accounts hosted on PDSes operated by entities other than Bluesky PBC
3
fork

Configure Feed

Select the types of activity you want to include in your feed.

Improve parallelism and error handling

gbl08ma f09c2997 0d65b96a

+120 -77
+6
bun.lock
··· 10 10 "@atcute/identity-resolver-node": "^1.0.2", 11 11 "@atcute/jetstream": "^1.1.0", 12 12 "@skyware/labeler": "^0.2.0", 13 + "async": "^3.2.6", 13 14 "cacheable": "^2.0.1", 14 15 }, 15 16 "devDependencies": { 17 + "@types/async": "^3.2.25", 16 18 "@types/bun": "latest", 17 19 }, 18 20 "peerDependencies": { ··· 120 122 121 123 "@skyware/labeler": ["@skyware/labeler@0.2.0", "", { "dependencies": { "@atcute/bluesky": "^1.0.7", "@atcute/cbor": "^1.0.2", "@atcute/client": "^2.0.3", "@atcute/ozone": "^1.0.4", "@fastify/websocket": "^10.0.1", "@libsql/client": "^0.14.0", "@noble/curves": "^1.6.0", "@noble/hashes": "^1.5.0", "fastify": "^4.28.1", "prompts": "^2.4.2", "uint8arrays": "^5.1.0" }, "bin": { "labeler": "dist/bin.js" } }, "sha512-5Xsjly8Crvi+vILmFkObNs5y/FeNj/mIztcm1qfNafUzwv9n9B7O5MQmmg3aZfnalmQWsYfs2BgVlaBBBni5bw=="], 122 124 125 + "@types/async": ["@types/async@3.2.25", "", {}, "sha512-O6Th/DI18XjrL9TX8LO9F/g26qAz5vynmQqlXt/qLGrskvzCKXKc5/tATz3G2N6lM8eOf3M8/StB14FncAmocg=="], 126 + 123 127 "@types/bun": ["@types/bun@1.2.22", "", { "dependencies": { "bun-types": "1.2.22" } }, "sha512-5A/KrKos2ZcN0c6ljRSOa1fYIyCKhZfIVYeuyb4snnvomnpFqC0tTsEkdqNxbAgExV384OETQ//WAjl3XbYqQA=="], 124 128 125 129 "@types/node": ["@types/node@24.5.2", "", { "dependencies": { "undici-types": "~7.12.0" } }, "sha512-FYxk1I7wPv3K2XBaoyH2cTnocQEu8AOZ60hPbsyukMPLv5/5qr7V1i8PLHdl6Zf87I+xZXFvPCXYjiTFq+YSDQ=="], ··· 133 137 "ajv": ["ajv@8.17.1", "", { "dependencies": { "fast-deep-equal": "^3.1.3", "fast-uri": "^3.0.1", "json-schema-traverse": "^1.0.0", "require-from-string": "^2.0.2" } }, "sha512-B/gBuNg5SiMTrPkC+A2+cW0RszwxYmn6VYxB/inlBStS5nx6xHIt/ehKRhIMhqusl7a8LjQoZnjCs5vhwxOQ1g=="], 134 138 135 139 "ajv-formats": ["ajv-formats@2.1.1", "", { "dependencies": { "ajv": "^8.0.0" } }, "sha512-Wx0Kx52hxE7C18hkMEggYlEifqWZtYaRgouJor+WMdPnQyEK13vgEWyVNup7SoeeoLMsr4kf5h6dOW11I15MUA=="], 140 + 141 + "async": ["async@3.2.6", "", {}, 
"sha512-htCUDlxyyCLMgaM3xXg0C0LW2xqfuQ6p05pCEIsXuyQ+a1koYKTuBMzRNwmybfLgvJDMd0r1LTn4+E0Ti6C2AA=="], 136 142 137 143 "atomic-sleep": ["atomic-sleep@1.0.0", "", {}, "sha512-kNOjDqAh7px0XWNI+4QbzoiR/nTkHAWNud2uvnJquD1/x5a7EQZMJT0AczqK0Qn67oY/TTQ1LbUKajZpp3I9tQ=="], 138 144
+8 -1
index.ts
··· 1 + import process from "node:process"; 2 + import os from "os"; 1 3 import type { LabelerOptions } from "./labeler"; 2 4 import { Labeler } from "./labeler"; 5 + 6 + // atcute is internally spawning Promises whose rejections I can't catch https://github.com/mary-ext/atcute/issues/43 7 + process.on('unhandledRejection', (reason, promise) => { 8 + console.log('Unhandled Rejection at:', promise, 'reason:', reason, "stack", (reason as any).stack); 9 + }); 3 10 4 11 function throwMissingEnvVarError<T>(): T { 5 12 throw Error("Missing required environment variable"); ··· 19 26 allowIssuingLabels: process.env.ACTUALLY_EMIT_LABELS ? process.env.ACTUALLY_EMIT_LABELS === "true" : throwMissingEnvVarError(), 20 27 maxPDSDedicatedLabels: parseInt(process.env.MAX_PDS_DEDICATED_LABELS ?? "50"), 21 28 maxExpectedReposPerPDS: parseInt(process.env.MAX_EXPECTED_REPOS_PER_PDS ?? "100000"), 29 + maxParallelTasks: parseInt(process.env.MAX_PARALLEL_TASKS ?? os.cpus().length * 4 + "") 22 30 } 23 31 24 32 new Labeler(labelerOptions).run(); 25 -
+60 -74
labeler.ts
··· 1 + import { JetstreamSubscription } from "@atcute/jetstream"; 1 2 import { LabelerServer } from "@skyware/labeler"; 2 3 import { ActiveLabelsStorage } from "./activeLabelsStorage"; 3 - import { KnownPDSStorage, type KnownPDS } from "./knownPDSStorage"; 4 + import { KnownPDSStorage } from "./knownPDSStorage"; 4 5 import { LabelDefiner } from "./labelDefiner"; 5 6 import { PDSCrawler } from "./pdsCrawler"; 6 - import { JetstreamSubscription } from "@atcute/jetstream"; 7 + import { TaskProcessor } from "./taskProcessor"; 8 + import { wrapAsyncInCatch, type DID } from "./utils"; 7 9 8 10 export type LabelerOptions = { 9 11 databasePath: string | undefined; ··· 19 21 allowIssuingLabels: boolean, 20 22 maxPDSDedicatedLabels: number, 21 23 maxExpectedReposPerPDS: number, 24 + maxParallelTasks: number, 22 25 } 23 26 24 - 25 - async function wrapAsyncInCatch(a: PromiseLike<any>) { 26 - try { 27 - await a; 28 - } catch (e) { 29 - console.log("Error in async process: " + e); 30 - } 31 - } 27 + type crawlCallback = (v: { 28 + pds: string, 29 + did: DID, 30 + }) => void; 32 31 33 32 export class Labeler { 33 + private taskProcessor: TaskProcessor; 34 34 private server: LabelerServer; 35 35 private listenerOptions: Parameters<typeof this.server.start>[0]; 36 36 private activeLabelsStorage: ActiveLabelsStorage; ··· 46 46 private updateLabelDefinitionsEveryMilliseconds: number; 47 47 private maxPDSDedicatedLabels: number; 48 48 49 - private taskQueue: (() => Promise<void>)[] = []; 50 - 51 49 constructor(options: LabelerOptions) { 50 + this.taskProcessor = new TaskProcessor(options.maxParallelTasks); 51 + 52 52 this.server = new LabelerServer({ 53 53 dbPath: options.databasePath, 54 54 did: options.did, ··· 82 82 return this.labelDefiner.determinePDSLabels(pds).length > 0; 83 83 } 84 84 85 - private async processDIDWithLabels(did: `did:${string}:${string}`, correctLabelIDs: string[]) { 85 + private async processDIDWithLabels(did: DID, correctLabelIDs: string[]) { 86 86 try { 
87 87 const labelsDiff = await this.activeLabelsStorage.computeLabelsDiff(did, correctLabelIDs); 88 88 ··· 137 137 } 138 138 } 139 139 140 - private async processSingleDID(pds: string, did: `did:${string}:${string}`) { 140 + private async processSingleDID(pds: string, did: DID) { 141 141 const correctLabels = this.labelDefiner.determinePDSLabels(pds); 142 142 const correctLabelIDs = correctLabels.map((l) => l.identifier); 143 143 await this.processDIDWithLabels(did, correctLabelIDs); 144 144 } 145 145 146 - private async* getPDSOrDIDToCrawl( 146 + 147 + private async getPDSOrDIDToCrawl( 147 148 wantedCollections: string[], 149 + callback: crawlCallback, 148 150 ) { 149 151 while (true) { 150 152 let clockSkew: number | undefined = undefined; ··· 165 167 if (event.kind === "identity" && this.labelDefiner.shouldIgnoreHandle(event.identity.handle)) { 166 168 continue; 167 169 } 168 - try { 169 - const mayUseCache = event.kind !== "identity"; 170 - const pds = await this.crawler.identifyPDSofDID(event.did as `did:${"plc" | "web"}:${string}`, mayUseCache); 171 170 172 - yield { 173 - pds: pds, 174 - did: event.did, 175 - } 176 - } catch { 177 - continue; 178 - } 171 + this.taskProcessor.schedule(async () => { 172 + await this.processEvent(event.kind, event.did, event.time_us, clockSkew, callback); 173 + }, 0); // schedule with highest priority 179 174 } 175 + } { } 176 + } 177 + 178 + private async processEvent(eventKind: string, eventDID: DID, eventTimeUs: number, clockSkew: number | undefined, callback: crawlCallback) { 179 + const timeDiff: number = new Date().getTime() - eventTimeUs / 1000 - (clockSkew ?? 
0); 180 + if (timeDiff > 10000) { 181 + console.log("Event processing task was older than 10s, forgetting about it"); 182 + return; 183 + } 184 + 185 + try { 186 + const mayUseCache = eventKind !== "identity"; 187 + const pds = await this.crawler.identifyPDSofDID(eventDID as `did:${"plc" | "web"}:${string}`, mayUseCache); 188 + 189 + callback({ 190 + pds: pds, 191 + did: eventDID, 192 + }) 193 + } catch { 180 194 } 181 195 } 182 196 183 197 private async activityBasedProcessing() { 184 - for await (const pdsAndDID of this.getPDSOrDIDToCrawl(["app.bsky.actor.profile", "app.bsky.graph.follow", "app.bsky.feed.post"])) { 198 + await this.getPDSOrDIDToCrawl(["app.bsky.actor.profile", "app.bsky.graph.follow", "app.bsky.feed.post"], async (pdsAndDID) => { 185 199 if (!this.pdsFilter(pdsAndDID.pds)) { 186 - continue; 200 + return; 187 201 } 188 202 189 - this.scheduleTask(async () => { 203 + this.taskProcessor.schedule(async () => { 190 204 const maybeKnownPDS = await this.knownPDSStorage.getKnownPDS(pdsAndDID.pds); 191 205 192 206 if (typeof maybeKnownPDS === "undefined") { ··· 200 214 await this.processSingleDID(pdsAndDID.pds, pdsAndDID.did); 201 215 } 202 216 }); 203 - } 217 + }); 204 218 } 205 219 206 220 private async longevityBasedProcessing() { ··· 210 224 new Date(nowMS - this.retryFailedCrawlsAfterMinMilliseconds)); 211 225 for await (const knownPDS of generator) { 212 226 console.log(`Scheduling known PDS ${knownPDS.uri} for re-crawling`); 213 - this.scheduleTask(async () => { 227 + this.taskProcessor.schedule(async () => { 214 228 console.log(`Re-crawling known PDS ${knownPDS.uri}`); 215 229 await this.processPDS(knownPDS.uri); 216 230 }); ··· 222 236 let promises: Promise<void>[] = []; 223 237 let totalProcessed = 0; 224 238 for await (const uri of this.activeLabelsStorage.getActiveLabelSubjects()) { 225 - const p = new Promise<void>((resolve) => { 226 - this.scheduleTask(async () => { 227 - try { 228 - const did = uri as `did:${"plc" | "web"}:${string}`; 229 - 
const pds = await this.crawler.identifyPDSofDID(did, true); 230 - const maybeKnownPDS = await this.knownPDSStorage.getKnownPDS(pds); 239 + const p = this.taskProcessor.schedule(async () => { 240 + try { 241 + const did = uri as `did:${"plc" | "web"}:${string}`; 242 + const pds = await this.crawler.identifyPDSofDID(did, true); 243 + const maybeKnownPDS = await this.knownPDSStorage.getKnownPDS(pds); 231 244 232 - if (typeof maybeKnownPDS === "undefined") { 233 - console.log(`Crawling new PDS discovered when reprocessing labels for ${did}: ${pds}`); 234 - await this.processPDS(pds); 235 - } else { 236 - await this.processSingleDID(pds, did); 237 - } 238 - totalProcessed++; 239 - } finally { 240 - // let's never block because processing one failed 241 - resolve(); 245 + if (typeof maybeKnownPDS === "undefined") { 246 + console.log(`Crawling new PDS discovered when reprocessing labels for ${did}: ${pds}`); 247 + await this.processPDS(pds); 248 + } else { 249 + await this.processSingleDID(pds, did); 242 250 } 243 - }); 251 + totalProcessed++; 252 + } catch (e) { 253 + console.log(`Failed to reprocess labels of ${uri}: ${e}`); 254 + } 244 255 }); 245 256 // avoid having too many in-flight tasks at once 246 257 promises.push(p); ··· 285 296 wrapAsyncInCatch(this.existingLabelsProcessing()); 286 297 287 298 setInterval(() => { 288 - this.scheduleTask(this.longevityBasedProcessing.bind(this)); 299 + this.taskProcessor.schedule(this.longevityBasedProcessing.bind(this)); 289 300 }, 60 * 1000); 290 301 291 302 setInterval(() => { 292 - this.scheduleTask(this.updateLabelerLabels.bind(this, false)); 303 + this.taskProcessor.schedule(this.updateLabelerLabels.bind(this, false)); 293 304 }, this.updateLabelDefinitionsEveryMilliseconds); 294 305 295 306 setInterval(() => { 296 307 console.log("DID cache hit ratio: " + this.crawler.cacheHitRatio); 297 308 }, 60 * 1000); 298 - } 299 - 300 - private scheduleTask(task: () => Promise<void>) { 301 - this.taskQueue.push(task); 302 - 
wrapAsyncInCatch(this.processTasks()); 303 - } 304 - 305 - private isProcessingTasks = false; 306 - 307 - private async processTasks() { 308 - if (this.isProcessingTasks) { 309 - return; 310 - } 311 - this.isProcessingTasks = true; 312 - while (this.taskQueue.length) { 313 - try { 314 - const promise = this.taskQueue.shift(); 315 - if (promise) { 316 - await promise(); 317 - } 318 - } catch (e) { 319 - console.log("Error in task queue task: " + e); 320 - } 321 - } 322 - this.isProcessingTasks = false; 323 309 } 324 310 }
+2
package.json
··· 4 4 "type": "module", 5 5 "private": true, 6 6 "devDependencies": { 7 + "@types/async": "^3.2.25", 7 8 "@types/bun": "latest" 8 9 }, 9 10 "peerDependencies": { ··· 16 17 "@atcute/identity-resolver-node": "^1.0.2", 17 18 "@atcute/jetstream": "^1.1.0", 18 19 "@skyware/labeler": "^0.2.0", 20 + "async": "^3.2.6", 19 21 "cacheable": "^2.0.1" 20 22 }, 21 23 "overrides": {
+3 -2
pdsCrawler.ts
··· 4 4 import type { } from '@atcute/bluesky'; 5 5 import { CompositeDidDocumentResolver, PlcDidDocumentResolver, WebDidDocumentResolver } from "@atcute/identity-resolver"; 6 6 import { CacheableMemory } from "cacheable"; 7 + import { withTimeout, type DID } from "./utils"; 7 8 8 9 export class PDSCrawler { 9 10 private maxExpectedReposPerPDS: number; ··· 61 62 } 62 63 } 63 64 64 - async identifyPDSofDID(did: `did:${string}:${string}`, useCache: boolean): Promise<string> { 65 + async identifyPDSofDID(did: DID, useCache: boolean): Promise<string> { 65 66 if (useCache) { 66 67 const v = await this.pdsCache.get<string>(did); 67 68 if (v) { ··· 70 71 } 71 72 this.cacheMisses++; 72 73 } 73 - const doc = await this.docResolver.resolve(did as `did:${"plc" | "web"}:${string}`); 74 + const doc = await withTimeout(this.docResolver.resolve(did as `did:${"plc" | "web"}:${string}`), 10000, "timed out while resolving DID doc"); 74 75 for (const service of doc.service ?? []) { 75 76 if (service.type == "AtprotoPersonalDataServer" && 76 77 typeof service.serviceEndpoint === "string") {
+23
taskProcessor.ts
import { priorityQueue, type AsyncPriorityQueue } from 'async';

/**
 * Runs asynchronous tasks through a priority queue with a bounded number of
 * tasks executing at the same time.
 *
 * Task failures never propagate to the caller of {@link TaskProcessor.schedule};
 * they are reported through the queue-wide error handler, which logs them.
 */
export class TaskProcessor {
    private readonly maxParallelTasks: number;
    private readonly taskQueue: AsyncPriorityQueue<() => Promise<void>>;

    /**
     * @param maxParallelTasks Maximum number of tasks allowed to run concurrently.
     * @throws RangeError when `maxParallelTasks` is NaN, zero or negative.
     */
    constructor(maxParallelTasks: number) {
        // A NaN or negative concurrency (e.g. from parseInt() on a malformed
        // environment variable) would create a queue that never starts any
        // task, silently stalling the whole process. Fail fast instead.
        if (!(maxParallelTasks > 0)) {
            throw new RangeError(`maxParallelTasks must be a number greater than zero, got ${maxParallelTasks}`);
        }

        this.maxParallelTasks = maxParallelTasks;
        this.taskQueue = priorityQueue((task, callback) => {
            // Translate the task's promise outcome into the node-style
            // callback that the queue worker contract expects.
            task().then(() => callback(), (e) => callback(e));
        }, this.maxParallelTasks);
        // Queue-wide handler: invoked whenever a task reports an error.
        this.taskQueue.error((e) => {
            console.log("Error in task queue task: " + e);
        });
        console.log(`Task processor created with a maximum of ${maxParallelTasks} parallel tasks`);
    }

    /**
     * Enqueues a task for execution.
     *
     * @param task Function producing the promise to run.
     * @param priority Queue priority; tasks are processed in ascending
     *   priority order, so lower numbers run first. Defaults to 100.
     * @returns A promise that resolves once the task has finished, whether it
     *   succeeded or failed. It never rejects, so awaiting it cannot throw
     *   because of a failing task.
     */
    schedule(task: () => Promise<void>, priority = 100): Promise<void> {
        return new Promise((resolve) => {
            // The completion callback deliberately ignores its error argument:
            // failures are handled by the queue-wide error handler.
            this.taskQueue.push(task, priority, () => resolve());
        });
    }
}
+18
utils.ts
/**
 * Awaits a promise-like value and logs, rather than propagates, any rejection.
 * Intended for fire-and-forget background work whose failure must not crash
 * the caller.
 *
 * @param a The promise-like value to await.
 * @returns A promise that always resolves once `a` has settled.
 */
export async function wrapAsyncInCatch(a: PromiseLike<unknown>): Promise<void> {
    try {
        await a;
    } catch (e) {
        console.log("Error in async process: " + e);
    }
}

/**
 * Races a promise against a timer.
 *
 * @param promise The operation to wait for.
 * @param timeoutMs Milliseconds to wait before giving up.
 * @param reason Message of the error used when the timer fires first.
 * @returns A promise that settles like `promise` if it settles in time, and
 *   otherwise rejects with an `Error` whose message is `reason`.
 *
 * Note: the underlying operation is not cancelled on timeout; only the wait
 * for it is abandoned.
 */
export function withTimeout<T>(promise: Promise<T>, timeoutMs: number, reason = "timed out"): Promise<T> {
    let timer: ReturnType<typeof setTimeout> | undefined;
    const timeout = new Promise<never>((_, reject) => {
        // Reject with a real Error (not a bare string) so that handlers get a
        // stack trace and can rely on `instanceof Error`.
        timer = setTimeout(() => reject(new Error(reason)), timeoutMs);
    });
    // Always clear the timer so a promise that settles early does not keep
    // the event loop alive until the timeout elapses.
    return Promise.race([promise, timeout]).finally(() => clearTimeout(timer));
}

/** A decentralized identifier string of the form `did:<method>:<identifier>`. */
export type DID = `did:${string}:${string}`;