A Bluesky labeler that labels accounts hosted on PDSes operated by entities other than Bluesky PBC
3
fork

Configure Feed

Select the types of activity you want to include in your feed.

Rework jetstream consumption

gbl08ma 0bf6ba61 1386c0b4

+34 -25
+3 -1
index.ts
··· 22 22 maxPDSDedicatedLabels: parseInt(process.env.MAX_PDS_DEDICATED_LABELS ?? "50"), 23 23 maxExpectedReposPerPDS: parseInt(process.env.MAX_EXPECTED_REPOS_PER_PDS ?? "100000"), 24 24 maxParallelTasks: parseInt(process.env.MAX_PARALLEL_TASKS ?? 25 + ""), 25 - labeledUsersList: process.env.LABELED_USERS_LIST_URI 25 + labeledUsersList: process.env.LABELED_USERS_LIST_URI, 26 + jetstreamURL: process.env.JETSTREAM_URL ?? "wss://jetstream2.us-east.bsky.network", 27 + jetstreamCollections: process.env.JETSTREAM_COLLECTIONS?.split(",") ?? ["app.bsky.actor.profile"], 26 28 } 27 29 28 30 new Labeler(labelerOptions).run();
+31 -24
labeler.ts
··· 31 31 maxExpectedReposPerPDS: number, 32 32 maxParallelTasks: number, 33 33 labeledUsersList: string | undefined, 34 + jetstreamURL: string, 35 + jetstreamCollections: string[], 34 36 } 35 37 36 38 type crawlCallback = (pds: string, did: DID) => void; 37 39 38 40 export class Labeler { 41 + private readonly jetstreamURL: string; 42 + private readonly jetstreamCollections: string[]; 39 43 private taskProcessor: TaskProcessor; 40 44 private server: LabelerServer; 41 45 private listenerOptions: Parameters<typeof this.server.start>[0]; ··· 63 67 64 68 private initialized: Promise<void>; 65 69 70 + 66 71 constructor(options: LabelerOptions) { 72 + this.jetstreamURL = options.jetstreamURL; 73 + this.jetstreamCollections = options.jetstreamCollections; 67 74 this.taskProcessor = new TaskProcessor(options.maxParallelTasks); 68 75 69 76 this.server = new LabelerServer({ ··· 217 224 await this.pdsProcessingMutex.runExclusive(pds, async () => { 218 225 // this is a task that's scheduled async, must confirm it is still unknown within the mutex 219 226 const maybeKnownPDS = await this.knownPDSStorage.getKnownPDS(pds); 220 - const failedRecently = await this.knownPDSStorage.hasPDSCrawlFailed(pds, new Date(new Date().getTime() - 30 * 60 * 1000)); 227 + const failedRecently = await this.knownPDSStorage.hasPDSCrawlFailed(pds, new Date(Date.now() - 30 * 60 * 1000)); 221 228 222 229 if (typeof maybeKnownPDS === "undefined" && !failedRecently) { 223 230 console.log(`Crawling new PDS discovered when processing labels for ${becauseOfDID}: ${pds}`); ··· 294 301 } 295 302 296 303 private async getDIDToPossiblyUpdate( 297 - wantedCollections: string[], 298 304 callback: crawlCallback, 299 305 ) { 300 306 while (true) { 307 + console.log(`Connecting to jetstream ${this.jetstreamURL} with wanted collections ${this.jetstreamCollections.join(", ")}`); 301 308 let clockSkew: number | undefined = undefined; 302 309 const subscription = new JetstreamSubscription({ 303 - url: 'wss://jetstream2.us-east.bsky.network', 304 - wantedCollections, 310 + url: this.jetstreamURL, 311 + wantedCollections: this.jetstreamCollections, 312 + ws: { 313 + maxEnqueuedMessages: 100, 314 + } 305 315 }); 306 316 307 - for await (const event of subscription) { 308 - const timeDiff: number = new Date().getTime() - event.time_us / 1000 - (clockSkew ?? 0); 309 - if (typeof clockSkew === "undefined") { 310 - clockSkew = timeDiff; 311 - } else if (timeDiff > 30000) { 312 - console.log("Fell more than 30s behind, reconnecting to jetstream"); 313 - break; 317 + try { 318 + for await (const event of subscription) { 319 + // avoid looking up DIDs for handles which we know can't be on an independent PDS 320 + if (event.kind === "identity" && this.labelDefiner!.shouldIgnoreHandle(event.identity.handle)) { 321 + continue; 322 + } 323 + 324 + this.taskProcessor.schedule(async () => { 325 + await this.processEvent(event.kind, event.did, event.time_us, clockSkew, callback); 326 + }, 0); // schedule with highest priority 314 327 } 315 - // avoid looking up DIDs for handles which we know can't be on an independent PDS 316 - if (event.kind === "identity" && this.labelDefiner!.shouldIgnoreHandle(event.identity.handle)) { 317 - continue; 318 - } 319 - 320 - this.taskProcessor.schedule(async () => { 321 - await this.processEvent(event.kind, event.did, event.time_us, clockSkew, callback); 322 - }, 0); // schedule with highest priority 328 + } catch (e) { 329 + console.log(`Failed to consume jetstream: ${e}`); 323 330 } 324 331 } { } 325 332 } 326 333 327 334 private async processEvent(eventKind: string, eventDID: DID, eventTimeUs: number, clockSkew: number | undefined, callback: crawlCallback) { 328 335 const timeDiff: number = new Date().getTime() - eventTimeUs / 1000 - (clockSkew ?? 0); 329 - if (timeDiff > 10000) { 330 - console.log("Event processing task was older than 10s, forgetting about it"); 336 + if (timeDiff > 15000) { 337 + console.log("Event processing task was older than 15s, forgetting about it"); 331 338 return; 332 339 } 333 340 ··· 347 354 if (typeof maybeKnownPDS === "undefined") { 348 355 console.log("Crawling previously undiscovered independent PDS: " + pds); 349 356 await this.processPDS(pds); 350 - } else if (new Date().getTime() - maybeKnownPDS.lastCrawled.getTime() > this.reconsiderActivePDSForRecrawlingAfterMilliseconds) { 357 + } else if (Date.now() - maybeKnownPDS.lastCrawled.getTime() > this.reconsiderActivePDSForRecrawlingAfterMilliseconds) { 351 358 console.log("Re-crawling already known independent PDS based on detected activity: " + pds); 352 359 await this.processPDS(pds); 353 360 } else { ··· 362 369 } 363 370 364 371 private async activityBasedProcessing() { 365 - await this.getDIDToPossiblyUpdate(["app.bsky.actor.profile"], async (pds, did) => { 372 + await this.getDIDToPossiblyUpdate(async (pds, did) => { 366 373 if (!this.pdsFilter(pds)) { 367 374 return; 368 375 } ··· 372 379 } 373 380 374 381 private async longevityBasedProcessing() { 375 - const nowMS = new Date().getTime(); 382 + const nowMS = Date.now(); 376 383 const generator = this.knownPDSStorage.getKnownPDSs( 377 384 new Date(nowMS - this.recrawlKnownPDSAfterNotCrawledForMilliseconds), 378 385 new Date(nowMS - this.retryFailedCrawlsAfterMinMilliseconds));