A Bluesky labeler that labels accounts hosted on PDSes operated by entities other than Bluesky PBC
3
fork

Configure Feed

Select the types of activity you want to include in your feed.

Do not rely on PDS presence to determine the currently active PDS of a DID

gbl08ma d1fb0825 8365dde6

+81 -50
+29 -14
knownPDSStorage.ts
··· 89 89 90 90 const result = await this.db.execute({ 91 91 sql: ` 92 - SELECT 93 - uri, last_crawled, total_repos 94 - FROM known_pds 95 - ORDER BY total_repos DESC, uri 96 - LIMIT ? 92 + SELECT 93 + uri, last_crawled, total_repos 94 + FROM known_pds 95 + ORDER BY total_repos DESC, uri 96 + LIMIT ? 97 97 `, 98 98 args: [limit], 99 99 }); ··· 109 109 110 110 const result = await this.db.execute({ 111 111 sql: ` 112 - SELECT 113 - uri, last_crawled, total_repos 114 - FROM known_pds 115 - WHERE uri = ? 112 + SELECT 113 + uri, last_crawled, total_repos 114 + FROM known_pds 115 + WHERE uri = ? 116 116 `, 117 117 args: [uri], 118 118 }); ··· 186 186 async markPDSCrawlFailure(uri: string, at: Date) { 187 187 const result = await this.db.execute({ 188 188 sql: ` 189 - INSERT INTO pds_crawling_failures (uri, last_attempt) 190 - VALUES (?, ?) 191 - ON CONFLICT (uri) DO UPDATE SET 192 - last_attempt = excluded.last_attempt 193 - `, 189 + INSERT INTO pds_crawling_failures (uri, last_attempt) 190 + VALUES (?, ?) 191 + ON CONFLICT (uri) DO UPDATE SET 192 + last_attempt = excluded.last_attempt 193 + `, 194 194 args: [uri, at.toISOString()], 195 195 }); 196 196 197 197 if (!result.rowsAffected) throw new Error("Failed to upsert known PDS"); 198 + } 199 + 200 + async hasPDSCrawlFailed(uri: string, since: Date): Promise<boolean> { 201 + await this.dbInitLock; 202 + 203 + const result = await this.db.execute({ 204 + sql: ` 205 + SELECT 1 206 + FROM pds_crawling_failures 207 + WHERE uri = ? AND last_attempt > ? 208 + `, 209 + args: [uri, since.toISOString()], 210 + }); 211 + 212 + return result.rows.length > 0; 198 213 } 199 214 }
+51 -35
labeler.ts
··· 196 196 } 197 197 } 198 198 199 + private async maybeSchedulePDSProcessing(pds: string, becauseOfDID: DID) { 200 + const maybeKnownPDS = await this.knownPDSStorage.getKnownPDS(pds); 201 + 202 + const shouldHaveLabels = this.pdsFilter(pds); 203 + 204 + if (typeof maybeKnownPDS === "undefined" && shouldHaveLabels) { 205 + this.taskProcessor.schedule(async () => { 206 + await this.pdsProcessingMutex.runExclusive(pds, async () => { 207 + // this is a task that's scheduled async, must confirm it is still unknown within the mutex 208 + const maybeKnownPDS = await this.knownPDSStorage.getKnownPDS(pds); 209 + const failedRecently = await this.knownPDSStorage.hasPDSCrawlFailed(pds, new Date(new Date().getTime() - 30*60*1000)); 210 + 211 + if (typeof maybeKnownPDS === "undefined" && !failedRecently) { 212 + console.log(`Crawling new PDS discovered when processing labels for ${becauseOfDID}: ${pds}`); 213 + await this.processPDS(pds); 214 + } 215 + }); 216 + }); 217 + } 218 + } 219 + 199 220 private async processPDS(pds: string) { 200 - const correctLabels = this.labelDefiner!.determinePDSLabels(pds); 201 - const correctLabelIDs = correctLabels.map((l) => l.identifier); 202 221 let totalRepos = 0; 203 222 204 223 try { 205 224 for await (const repo of this.crawler.getPDSRepos(pds)) { 206 - await this.processDIDWithLabels(repo.did, correctLabelIDs); 225 + this.taskProcessor.schedule(this.processDID.bind(this, repo.did, false, false, true)); 207 226 totalRepos++; 227 + if (totalRepos % 200 == 0) { 228 + // avoid having too many in-flight tasks at once, 229 + // by periodically waiting until the queue is unsaturated (25% of the workers are free) to schedule more 230 + // otherwise we'd load dozens of thousands of tasks into memory 231 + await this.taskProcessor.unsaturated(); 232 + } 208 233 } 209 234 210 235 await this.knownPDSStorage.upsertKnownPDS({ ··· 219 244 } 220 245 } 221 246 222 - private async processSingleDID(pds: string, did: DID, considerAddingToLists: boolean) { 247 + private async processDID(did: DID, considerAddingToLists: boolean, scheduleUnknownPDSForCrawling: boolean, mayUseCache: boolean): Promise<boolean> { 248 + // always reconfirm the current PDS of the DID because even when we're processing a PDS, the PDS may claim 249 + // to host accounts that it doesn't actually host anymore 250 + let pds: string; 251 + try { 252 + pds = await this.crawler.identifyPDSofDID(did, mayUseCache); 253 + } catch (e) { 254 + console.log(`Failed to identify PDS of ${did} for label reprocessing: ${e}`); 255 + return false; 256 + } 257 + 258 + if (scheduleUnknownPDSForCrawling) { 259 + await this.maybeSchedulePDSProcessing(pds, did); 260 + } 261 + 223 262 const correctLabels = this.labelDefiner!.determinePDSLabels(pds); 224 263 const correctLabelIDs = correctLabels.map((l) => l.identifier); 225 264 await this.processDIDWithLabels(did, correctLabelIDs); ··· 231 270 await this.processDIDWithLists(did, correctLists); 232 271 } catch (e) { 233 272 console.log(`Failed to manage list membership for ${did}: ${e}`); 273 + return false; 234 274 } 235 275 } 276 + return true; 236 277 } 237 278 238 279 private async getDIDToPossiblyUpdate( ··· 294 335 await this.processPDS(pds); 295 336 } else { 296 337 console.log(`Refreshing single DID ${did} on PDS ${pds} based on detected activity`); 297 - await this.processSingleDID(pds, did, true); 338 + await this.processDID(did, true, false, false); 298 339 } 299 340 }); 300 341 } ··· 339 380 for await (const uri of this.activeLabelsStorage.getActiveLabelSubjects()) { 340 381 this.taskProcessor.schedule(async () => { 341 382 const did = uri as `did:${"plc" | "web"}:${string}`; 342 - let pds: string; 343 383 try { 344 - pds = await this.crawler.identifyPDSofDID(did, true); 345 - } catch (e) { 346 - console.log(`Failed to identify PDS of ${did} for label reprocessing: ${e}`); 347 - if (++totalAttempted === totalToProcess) { 348 - resolveAllAttempted(); 384 + const success = await this.processDID(did, false, true, true); 385 + if (success && ++totalProcessed % 100 === 0) { 386 + console.log(`Active label processing: processed ${totalProcessed} / ${totalToProcess} DIDs so far`); 349 387 } 350 - return; 388 + } catch (e) { 389 + console.log(`Failed to reprocess labels of ${did}: ${e}`); 351 390 } 352 391 353 - await this.pdsProcessingMutex.runExclusive(pds, async () => { 354 - try { 355 - const maybeKnownPDS = await this.knownPDSStorage.getKnownPDS(pds); 356 - 357 - const shouldHaveLabels = this.pdsFilter(pds); 358 - 359 - if (typeof maybeKnownPDS === "undefined" && shouldHaveLabels) { 360 - console.log(`Crawling new PDS discovered when reprocessing labels for ${did}: ${pds}`); 361 - await this.processPDS(pds); 362 - } else { 363 - if (!shouldHaveLabels) { 364 - await this.knownPDSStorage.removeKnownPDS(pds); 365 - } 366 - await this.processSingleDID(pds, did, false); 367 - } 368 - totalProcessed++; 369 - if (totalProcessed % 100 === 0) { 370 - console.log(`Active label processing: processed ${totalProcessed} / ${totalToProcess} DIDs so far`); 371 - } 372 - } catch (e) { 373 - console.log(`Failed to reprocess labels of ${did}: ${e}`); 374 - } 375 - }); 376 392 if (++totalAttempted === totalToProcess) { 377 393 resolveAllAttempted(); 378 394 }
+1 -1
taskProcessor.ts
··· 15 15 console.log(`Task processor created with a maximum of ${maxParallelTasks} parallel tasks`); 16 16 } 17 17 18 - schedule(task: () => Promise<void>, priority = 100): Promise<void> { 18 + schedule(task: () => Promise<any>, priority = 100): Promise<void> { 19 19 return new Promise((resolve) => { 20 20 this.taskQueue.buffer 21 21 this.taskQueue.push(task, priority, () => resolve());