A Bluesky labeler that labels accounts hosted on PDSes operated by entities other than Bluesky PBC
3
fork

Configure Feed

Select the types of activity you want to include in your feed.

Reliability and performance improvements

gbl08ma 8171e3fa 1c549642

+69 -17
+69 -17
labeler.ts
··· 64 64 private authenticatedRPC: Client | undefined; 65 65 66 66 private pdsProcessingMutex = new NamedMutex(); 67 + private alreadyScheduledForCrawling = new Set<string>(); 67 68 68 69 private initialized: Promise<void>; 69 70 ··· 219 220 220 221 const shouldHaveLabels = this.pdsFilter(pds); 221 222 222 - if (typeof maybeKnownPDS === "undefined" && shouldHaveLabels) { 223 + if (typeof maybeKnownPDS === "undefined" && shouldHaveLabels && !this.alreadyScheduledForCrawling.has(pds)) { 223 224 this.taskProcessor.schedule(async () => { 224 225 await this.pdsProcessingMutex.runExclusive(pds, async () => { 225 226 // this is a task that's scheduled async, must confirm it is still unknown within the mutex ··· 230 231 console.log(`Crawling new PDS discovered when processing labels for ${becauseOfDID}: ${pds}`); 231 232 await this.processPDS(pds); 232 233 } 234 + this.alreadyScheduledForCrawling.delete(pds); 233 235 }); 234 - }, 15*60*1000 /* 15 minutes */); 236 + }, 15 * 60 * 1000 /* 15 minutes */).catch((e) => { 237 + console.log(`Error processing PDS ${pds}: ${e}`); 238 + }); 239 + this.alreadyScheduledForCrawling.add(pds); 235 240 } 236 241 } 237 242 ··· 242 247 stripWWW: false, 243 248 defaultProtocol: "https", 244 249 } 250 + 251 + const maxConcurrent = 10; 252 + const pendingDIDs: Promise<void>[] = []; 245 253 246 254 try { 247 255 for await (const repo of this.crawler.getPDSRepos(pds)) { 248 - try { 249 - const realPDSOfRepo = await this.processDID(repo.did, false, false, true); 250 - if (normalizeUrl(realPDSOfRepo, normOptions) === normalizeUrl(pds, normOptions)) { 251 - totalRepos++; 256 + const did = repo.did; 257 + const promise = (async () => { 258 + try { 259 + const realPDSOfRepo = await this.processDID(did, false, false, true); 260 + if (normalizeUrl(realPDSOfRepo, normOptions) === normalizeUrl(pds, normOptions)) { 261 + totalRepos++; 262 + if(totalRepos % 1000 === 0) { 263 + console.log(`Crawling ${pds}: processed ${totalRepos} repos so far`); 264 + } 265 + } 266 + } catch (e) { 267 + console.log(`Failed to process ${did} within PDS ${pds}: ${e}`); 252 268 } 253 - } catch (e) { 254 - console.log(`Failed to process ${repo} within PDS ${pds}: ${e}`); 269 + })(); 270 + pendingDIDs.push(promise); 271 + 272 + // Process in batches of maxConcurrent to limit memory usage 273 + if (pendingDIDs.length >= maxConcurrent) { 274 + await Promise.all(pendingDIDs); 275 + pendingDIDs.length = 0; 255 276 } 277 + } 278 + 279 + // Wait for any remaining DIDs to be processed 280 + if (pendingDIDs.length > 0) { 281 + await Promise.all(pendingDIDs); 256 282 } 257 283 258 284 await this.knownPDSStorage.upsertKnownPDS({ ··· 261 287 totalRepos, 262 288 }); 263 289 console.log(`Done crawling ${pds} with a total of ${totalRepos} repos`); 264 - } catch(e) { 290 + } catch (e) { 265 291 await this.knownPDSStorage.markPDSCrawlFailure(pds, new Date()); 266 292 console.log(`Failed to crawl ${pds}, took note: ${e}`); 267 293 } 294 + this.alreadyScheduledForCrawling.delete(pds); 268 295 } 269 296 270 297 private async processDID(did: DID, considerAddingToLists: boolean, scheduleUnknownPDSForCrawling: boolean, mayUseCache: boolean): Promise<string> { ··· 321 348 continue; 322 349 } 323 350 324 - this.taskProcessor.schedule(async () => { 325 - await this.processEvent(event.kind, event.did, event.time_us, clockSkew, callback); 326 - }, 10*1000 /* 10 seconds */, 0); // schedule with highest priority 351 + this.taskProcessor.schedule( 352 + async () => { 353 + await this.processEvent(event.kind, event.did, event.time_us, clockSkew, callback); 354 + }, 355 + 10 * 1000, // 10 seconds 356 + 0 // schedule with highest priority 357 + ).catch((e) => { 358 + console.log(`Failed to process event ${event.kind} for ${event.did}: ${e}`); 359 + }); 327 360 } 328 361 } catch (e) { 329 362 console.log(`Failed to consume jetstream: ${e}`); ··· 352 385 const maybeKnownPDS = await this.knownPDSStorage.getKnownPDS(pds); 353 386 354 387 if (typeof maybeKnownPDS === "undefined") { 388 + if (this.alreadyScheduledForCrawling.has(pds)) { 389 + return; 390 + } 391 + this.alreadyScheduledForCrawling.add(pds); 355 392 console.log("Crawling previously undiscovered independent PDS: " + pds); 356 393 await this.processPDS(pds); 357 394 } else if (Date.now() - maybeKnownPDS.lastCrawled.getTime() > this.reconsiderActivePDSForRecrawlingAfterMilliseconds) { 395 + if (this.alreadyScheduledForCrawling.has(pds)) { 396 + return; 397 + } 398 + this.alreadyScheduledForCrawling.add(pds); 358 399 console.log("Re-crawling already known independent PDS based on detected activity: " + pds); 359 400 await this.processPDS(pds); 360 401 } else { ··· 374 415 return; 375 416 } 376 417 377 - this.taskProcessor.schedule(this.refreshDIDOrCrawlPDS.bind(this, pds, did), 15*60*1000 /* 15 minutes */); 418 + this.taskProcessor.schedule(this.refreshDIDOrCrawlPDS.bind(this, pds, did), 15 * 60 * 1000 /* 15 minutes */).catch( 419 + (e) => console.log(`Failed to process ${did} after detected activity: ${e}`) 420 + ); 378 421 }); 379 422 } 380 423 ··· 384 427 new Date(nowMS - this.recrawlKnownPDSAfterNotCrawledForMilliseconds), 385 428 new Date(nowMS - this.retryFailedCrawlsAfterMinMilliseconds)); 386 429 for await (const knownPDS of generator) { 430 + if (this.alreadyScheduledForCrawling.has(knownPDS.uri)) { 431 + continue; 432 + } 387 433 console.log(`Scheduling known PDS ${knownPDS.uri} for re-crawling`); 388 434 this.taskProcessor.schedule(async () => { 389 435 await this.pdsProcessingMutex.runExclusive(knownPDS.uri, async () => { ··· 393 439 } else { 394 440 console.log(`Removing known PDS ${knownPDS.uri}`); 395 441 await this.knownPDSStorage.removeKnownPDS(knownPDS.uri); 442 + this.alreadyScheduledForCrawling.delete(knownPDS.uri); 396 443 } 397 444 }); 398 - }, 15*60*1000 /* 15 minutes */); 445 + }, 15 * 60 * 1000 /* 15 minutes */).catch((e) => { 446 + console.log(`Failed to reprocess PDS ${knownPDS.uri}: ${e}`); 447 + }); 448 + this.alreadyScheduledForCrawling.add(knownPDS.uri); 399 449 } 400 450 } 401 451 ··· 420 470 if (++totalAttempted === totalToProcess) { 421 471 resolveAllAttempted(); 422 472 } 423 - }, 15*1000 /* 15 seconds */); 473 + }, 15 * 1000 /* 15 seconds */, 200).catch( 474 + (e) => console.log(`Failed to reprocess labels of ${uri}: ${e}`) 475 + ); 424 476 // avoid having too many in-flight tasks at once, 425 477 // by waiting until the queue is unsaturated (25% of the workers are free) to schedule more 426 478 // otherwise we'd load dozens of thousands of tasks into memory ··· 465 517 wrapAsyncInCatch(this.existingLabelsProcessing()); 466 518 467 519 setInterval(() => { 468 - this.taskProcessor.schedule(this.longevityBasedProcessing.bind(this), 15*60*1000 /* 15 minutes */); 520 + this.taskProcessor.schedule(this.longevityBasedProcessing.bind(this), 15 * 60 * 1000 /* 15 minutes */); 469 521 }, 60 * 1000); 470 522 471 523 setInterval(() => { 472 - this.taskProcessor.schedule(this.updateLabelerLabels.bind(this, false), 1*60*1000 /* 1 minute */); 524 + this.taskProcessor.schedule(this.updateLabelerLabels.bind(this, false), 1 * 60 * 1000 /* 1 minute */); 473 525 }, this.updateLabelDefinitionsEveryMilliseconds); 474 526 475 527 setInterval(() => {