A Bluesky labeler that labels accounts hosted on PDSes operated by entities other than Bluesky PBC
3
fork

Configure Feed

Select the types of activity you want to include in your feed.

Now 19% less terrible

gbl08ma 3084aa50 4fe318e9

+61 -48
+14
activeLabelsStorage.ts
··· 98 98 if (!result.rowsAffected) throw new Error("Failed to register label deactivation"); 99 99 } 100 100 101 + async countActiveLabelSubjects() { 102 + await this.dbInitLock; 103 + 104 + const result = await this.db.execute({ 105 + sql: ` 106 + SELECT COUNT (DISTINCT uri) AS count 107 + FROM active_labels 108 + `, 109 + args: [], 110 + }); 111 + 112 + return result.rows[0]?.count as number ?? 0; 113 + } 114 + 101 115 async* getActiveLabelSubjects() { 102 116 await this.dbInitLock; 103 117
+1 -1
index.ts
··· 21 21 allowIssuingLabels: process.env.ACTUALLY_EMIT_LABELS ? process.env.ACTUALLY_EMIT_LABELS === "true" : throwMissingEnvVarError(), 22 22 maxPDSDedicatedLabels: parseInt(process.env.MAX_PDS_DEDICATED_LABELS ?? "50"), 23 23 maxExpectedReposPerPDS: parseInt(process.env.MAX_EXPECTED_REPOS_PER_PDS ?? "100000"), 24 - maxParallelTasks: parseInt(process.env.MAX_PARALLEL_TASKS ?? os.cpus().length * 4 + ""), 24 + maxParallelTasks: parseInt(process.env.MAX_PARALLEL_TASKS ?? 25 + ""), 25 25 labeledUsersList: process.env.LABELED_USERS_LIST_URI 26 26 } 27 27
+41 -35
labeler.ts
··· 10 10 import { ListDeterminer } from "./listDeterminer"; 11 11 import { PDSCrawler } from "./pdsCrawler"; 12 12 import { TaskProcessor } from "./taskProcessor"; 13 - import { combinedRunExclusive, NamedMutex, wrapAsyncInCatch, type DID } from "./utils"; 13 + import { NamedMutex, wrapAsyncInCatch, type DID } from "./utils"; 14 14 15 15 export type LabelerOptions = { 16 16 databasePath: string | undefined; ··· 55 55 private credentialManager: CredentialManager | undefined; 56 56 private authenticatedRPC: Client | undefined; 57 57 58 - private didProcessingMutex = new NamedMutex(); 59 58 private pdsProcessingMutex = new NamedMutex(); 60 59 61 60 private initialized: Promise<void>; ··· 284 283 } 285 284 286 285 private async refreshDIDOrCrawlPDS(pds: string, did: DID) { 287 - await combinedRunExclusive([ 288 - [this.didProcessingMutex, did], 289 - [this.pdsProcessingMutex, pds] 290 - ], async () => { 286 + await this.pdsProcessingMutex.runExclusive(pds, async () => { 291 287 const maybeKnownPDS = await this.knownPDSStorage.getKnownPDS(pds); 292 288 293 289 if (typeof maybeKnownPDS === "undefined") { ··· 331 327 332 328 private async existingLabelsProcessing() { 333 329 console.log("Reprocessing currently active labels"); 334 - let promises: Promise<void>[] = []; 330 + const totalToProcess = await this.activeLabelsStorage.countActiveLabelSubjects(); 331 + let totalAttempted = 0; 335 332 let totalProcessed = 0; 333 + const { promise: allAttempted, resolve: resolveAllAttempted } = Promise.withResolvers(); 336 334 for await (const uri of this.activeLabelsStorage.getActiveLabelSubjects()) { 337 - const p = this.taskProcessor.schedule(async () => { 335 + this.taskProcessor.schedule(async () => { 338 336 const did = uri as `did:${"plc" | "web"}:${string}`; 339 - await this.didProcessingMutex.runExclusive(did, async () => { 337 + let pds: string; 338 + try { 339 + pds = await this.crawler.identifyPDSofDID(did, true); 340 + } catch (e) { 341 + console.log(`Failed to identify PDS of ${did} for label reprocessing: ${e}`); 342 + if (++totalAttempted === totalToProcess) { 343 + resolveAllAttempted(); 344 + } 345 + return; 346 + } 347 + 348 + await this.pdsProcessingMutex.runExclusive(pds, async () => { 340 349 try { 341 - const pds = await this.crawler.identifyPDSofDID(did, true); 342 - await this.pdsProcessingMutex.runExclusive(pds, async () => { 343 - try { 344 - const maybeKnownPDS = await this.knownPDSStorage.getKnownPDS(pds); 350 + const maybeKnownPDS = await this.knownPDSStorage.getKnownPDS(pds); 345 351 346 - if (typeof maybeKnownPDS === "undefined") { 347 - console.log(`Crawling new PDS discovered when reprocessing labels for ${did}: ${pds}`); 348 - await this.processPDS(pds); 349 - } else { 350 - await this.processSingleDID(pds, did, false); 351 - } 352 - totalProcessed++; 353 - } catch (e) { 354 - console.log(`Failed to reprocess labels of ${did}: ${e}`); 355 - } 356 - }); 352 + if (typeof maybeKnownPDS === "undefined") { 353 + console.log(`Crawling new PDS discovered when reprocessing labels for ${did}: ${pds}`); 354 + await this.processPDS(pds); 355 + } else { 356 + await this.processSingleDID(pds, did, false); 357 + } 358 + totalProcessed++; 359 + if (totalProcessed % 100 === 0) { 360 + console.log(`Active label processing: processed ${totalProcessed} / ${totalToProcess} DIDs so far`); 361 + } 357 362 } catch (e) { 358 - console.log(`Failed to identify PDS of ${did} for label reprocessing: ${e}`); 363 + console.log(`Failed to reprocess labels of ${did}: ${e}`); 359 364 } 360 - 361 365 }); 366 + if (++totalAttempted === totalToProcess) { 367 + resolveAllAttempted(); 368 + } 362 369 }); 363 - // avoid having too many in-flight tasks at once 364 - promises.push(p); 365 - if (promises.length >= 100) { 366 - await Promise.all(promises); 367 - promises = []; 368 - console.log(`Active label processing: processed ${totalProcessed} DIDs so far`); 369 - } 370 + // avoid having too many in-flight tasks at once, 371 + // by waiting until the queue is unsaturated (25% of the workers are free) to schedule more 372 + // otherwise we'd load dozens of thousands of tasks into memory 373 + await this.taskProcessor.unsaturated(); 370 374 } 371 - await Promise.all(promises); 372 - console.log(`Active label processing: processed a total of ${totalProcessed} DIDs`); 375 + if (totalToProcess > 0) { 376 + await allAttempted; 377 + } 378 + console.log(`Active label processing: processed a total of ${totalProcessed} DIDs (${totalAttempted - totalProcessed} failures)`); 373 379 } 374 380 375 381 private async updateLabelerLabels(forceUpdate: boolean) {
+5
taskProcessor.ts
··· 17 17 18 18 schedule(task: () => Promise<void>, priority = 100): Promise<void> { 19 19 return new Promise((resolve) => { 20 + this.taskQueue.buffer 20 21 this.taskQueue.push(task, priority, () => resolve()); 21 22 }) 23 + } 24 + 25 + unsaturated(): Promise<void> { 26 + return this.taskQueue.unsaturated(); 22 27 } 23 28 }
-12
utils.ts
··· 62 62 return this.getOrCreate(key).cancel(); 63 63 }; 64 64 } 65 - 66 - type NamedMutexReference = [nmu: NamedMutex, key: string, priority?: number]; 67 - export async function combinedRunExclusive<T>(nmus: [NamedMutexReference, ...NamedMutexReference[]], callback: MutexInterface.Worker<T>): Promise<T> { 68 - if (!nmus || nmus.length == 0) { 69 - throw new Error("missing at least one mutex reference"); 70 - } 71 - if (nmus.length == 1) { 72 - const nmu = nmus[0]; 73 - return await nmu[0].runExclusive(nmu[1], callback, nmu[2]); 74 - } 75 - return await combinedRunExclusive(nmus.slice(1) as [NamedMutexReference, ...NamedMutexReference[]], callback); 76 - }