A Bluesky labeler that labels accounts hosted on PDSes operated by entities other than Bluesky PBC
3
fork

Configure Feed

Select the types of activity you want to include in your feed.

Cease getting stuck: add timeouts all over the place

gbl08ma 1c549642 f79aa336

+23 -16
+1 -1
index.ts
··· 19 19 retryFailedCrawlsAfterMinMilliseconds: parseInt(process.env.RETRY_FAILED_CRAWLS_AFTER_MS ?? "43200000" /* 12h */), 20 20 updateLabelDefinitionsEveryMilliseconds: parseInt(process.env.UPDATE_LABEL_DEFINITIONS_EVERY_MS ?? "7200000" /* 2h */), 21 21 allowIssuingLabels: process.env.ACTUALLY_EMIT_LABELS ? process.env.ACTUALLY_EMIT_LABELS === "true" : throwMissingEnvVarError(), 22 - maxPDSDedicatedLabels: parseInt(process.env.MAX_PDS_DEDICATED_LABELS ?? "50"), 22 + maxPDSDedicatedLabels: parseInt(process.env.MAX_PDS_DEDICATED_LABELS ?? "100"), 23 23 maxExpectedReposPerPDS: parseInt(process.env.MAX_EXPECTED_REPOS_PER_PDS ?? "100000"), 24 24 maxParallelTasks: parseInt(process.env.MAX_PARALLEL_TASKS ?? 25 + ""), 25 25 labeledUsersList: process.env.LABELED_USERS_LIST_URI,
+9 -9
labeler.ts
··· 231 231 await this.processPDS(pds); 232 232 } 233 233 }); 234 - }); 234 + }, 15*60*1000 /* 15 minutes */); 235 235 } 236 236 } 237 237 ··· 261 261 totalRepos, 262 262 }); 263 263 console.log(`Done crawling ${pds} with a total of ${totalRepos} repos`); 264 - } catch { 264 + } catch(e) { 265 265 await this.knownPDSStorage.markPDSCrawlFailure(pds, new Date()); 266 - console.log(`Failed to crawl ${pds}, took note`); 266 + console.log(`Failed to crawl ${pds}, took note: ${e}`); 267 267 } 268 268 } 269 269 ··· 323 323 324 324 this.taskProcessor.schedule(async () => { 325 325 await this.processEvent(event.kind, event.did, event.time_us, clockSkew, callback); 326 - }, 0); // schedule with highest priority 326 + }, 10*1000 /* 10 seconds */, 0); // schedule with highest priority 327 327 } 328 328 } catch (e) { 329 329 console.log(`Failed to consume jetstream: ${e}`); ··· 374 374 return; 375 375 } 376 376 377 - this.taskProcessor.schedule(this.refreshDIDOrCrawlPDS.bind(this, pds, did)); 377 + this.taskProcessor.schedule(this.refreshDIDOrCrawlPDS.bind(this, pds, did), 15*60*1000 /* 15 minutes */); 378 378 }); 379 379 } 380 380 ··· 395 395 await this.knownPDSStorage.removeKnownPDS(knownPDS.uri); 396 396 } 397 397 }); 398 - }); 398 + }, 15*60*1000 /* 15 minutes */); 399 399 } 400 400 } 401 401 ··· 420 420 if (++totalAttempted === totalToProcess) { 421 421 resolveAllAttempted(); 422 422 } 423 - }); 423 + }, 15*1000 /* 15 seconds */); 424 424 // avoid having too many in-flight tasks at once, 425 425 // by waiting until the queue is unsaturated (25% of the workers are free) to schedule more 426 426 // otherwise we'd load dozens of thousands of tasks into memory ··· 465 465 wrapAsyncInCatch(this.existingLabelsProcessing()); 466 466 467 467 setInterval(() => { 468 - this.taskProcessor.schedule(this.longevityBasedProcessing.bind(this)); 468 + this.taskProcessor.schedule(this.longevityBasedProcessing.bind(this), 15*60*1000 /* 15 minutes */); 469 469 }, 60 * 1000); 470 470 471 471 setInterval(() => { 472 - this.taskProcessor.schedule(this.updateLabelerLabels.bind(this, false)); 472 + this.taskProcessor.schedule(this.updateLabelerLabels.bind(this, false), 1*60*1000 /* 1 minute */); 473 473 }, this.updateLabelDefinitionsEveryMilliseconds); 474 474 475 475 setInterval(() => {
+2 -2
pdsCrawler.ts
··· 35 35 36 36 let numRepos = 0; 37 37 while (true) { 38 - const pageData = await ok(rpc.get('com.atproto.sync.listRepos', { 38 + const pageData = await withTimeout(ok(rpc.get('com.atproto.sync.listRepos', { 39 39 params: { 40 40 limit: 1000, 41 41 cursor, 42 42 }, 43 - })); 43 + })), 20000, "timed out while listing repos"); 44 44 45 45 cursor = pageData.cursor; 46 46
+11 -4
taskProcessor.ts
··· 1 1 import { priorityQueue, type AsyncPriorityQueue } from 'async'; 2 + import { withTimeout } from './utils.js'; 2 3 3 4 export class TaskProcessor { 4 5 private maxParallelTasks: number; ··· 15 16 console.log(`Task processor created with a maximum of ${maxParallelTasks} parallel tasks`); 16 17 } 17 18 18 - schedule(task: () => Promise<any>, priority = 100): Promise<void> { 19 - return new Promise((resolve) => { 20 - this.taskQueue.buffer 21 - this.taskQueue.push(task, priority, () => resolve()); 19 + schedule(task: () => Promise<any>, timeoutMs: number, priority = 100): Promise<void> { 20 + return new Promise((resolve, reject) => { 21 + const wrappedTask = () => withTimeout(task(), timeoutMs); 22 + this.taskQueue.push(wrappedTask, priority, (err) => { 23 + if (err) { 24 + reject(err); 25 + } else { 26 + resolve(); 27 + } 28 + }); 22 29 }) 23 30 } 24 31