A Bluesky labeler that labels accounts hosted on PDSes operated by entities other than Bluesky PBC
3
fork

Configure Feed

Select the types of activity you want to include in your feed.

Do not count DIDs hosted elsewhere towards a PDS's repo count

gbl08ma 90b83e27 d1fb0825

+32 -17
+3
bun.lock
··· 13 13 "async": "^3.2.6", 14 14 "async-mutex": "^0.5.0", 15 15 "cacheable": "^2.0.1", 16 + "normalize-url": "^8.1.0", 16 17 }, 17 18 "devDependencies": { 18 19 "@types/async": "^3.2.25", ··· 221 222 "node-domexception": ["node-domexception@1.0.0", "", {}, "sha512-/jKZoMpw0F8GRwl4/eLROPA3cfcXtLApP0QzLmUT/HuPCZWyB7IY9ZrMeKw2O/nFIqPQB3PVM9aYm0F312AXDQ=="], 222 223 223 224 "node-fetch": ["node-fetch@3.3.2", "", { "dependencies": { "data-uri-to-buffer": "^4.0.0", "fetch-blob": "^3.1.4", "formdata-polyfill": "^4.0.10" } }, "sha512-dRB78srN/l6gqWulah9SrxeYnxeddIG30+GOqK/9OlLVyLg3HPnr6SqOWTWOXKRwC2eGYCkZ59NNuSgvSrpgOA=="], 225 + 226 + "normalize-url": ["normalize-url@8.1.0", "", {}, "sha512-X06Mfd/5aKsRHc0O0J5CUedwnPmnDtLF2+nq+KN9KSDlJHkPuh0JUviWjEWMe0SW/9TDdSLVPuk7L5gGTIA1/w=="], 224 227 225 228 "on-exit-leak-free": ["on-exit-leak-free@2.1.2", "", {}, "sha512-0eJJY6hXLGf1udHwfNftBqH+g73EU4B504nZeKpz1sYRKafAghwxEJunB2O7rDZkL4PGfsMVnTXZ2EjibbqcsA=="], 226 229
+27 -16
labeler.ts
··· 3 3 import { parseResourceUri } from "@atcute/lexicons/syntax"; 4 4 import { LabelerServer } from "@skyware/labeler"; 5 5 import type { LoginCredentials } from "@skyware/labeler/scripts"; 6 + import normalizeUrl, { type Options as NormalizeURLOptions } from "normalize-url"; 6 7 import { ActiveLabelsStorage } from "./activeLabelsStorage"; 7 8 import { ActiveListItemsStorage } from "./activeListItemsStorage"; 8 9 import { KnownPDSStorage } from "./knownPDSStorage"; ··· 206 207 await this.pdsProcessingMutex.runExclusive(pds, async () => { 207 208 // this is a task that's scheduled async, must confirm it is still unknown within the mutex 208 209 const maybeKnownPDS = await this.knownPDSStorage.getKnownPDS(pds); 209 - const failedRecently = await this.knownPDSStorage.hasPDSCrawlFailed(pds, new Date(new Date().getTime() - 30*60*1000)); 210 + const failedRecently = await this.knownPDSStorage.hasPDSCrawlFailed(pds, new Date(new Date().getTime() - 30 * 60 * 1000)); 210 211 211 212 if (typeof maybeKnownPDS === "undefined" && !failedRecently) { 212 213 console.log(`Crawling new PDS discovered when processing labels for ${becauseOfDID}: ${pds}`); ··· 220 221 private async processPDS(pds: string) { 221 222 let totalRepos = 0; 222 223 224 + const normOptions: NormalizeURLOptions = { 225 + stripWWW: false, 226 + defaultProtocol: "https", 227 + } 228 + 223 229 try { 224 230 for await (const repo of this.crawler.getPDSRepos(pds)) { 225 - this.taskProcessor.schedule(this.processDID.bind(this, repo.did, false, false, true)); 226 - totalRepos++; 227 - if (totalRepos % 200 == 0) { 228 - // avoid having too many in-flight tasks at once, 229 - // by periodically waiting until the queue is unsaturated (25% of the workers are free) to schedule more 230 - // otherwise we'd load dozens of thousands of tasks into memory 231 - await this.taskProcessor.unsaturated(); 231 + try { 232 + const realPDSOfRepo = await this.processDID(repo.did, false, false, true); 233 + if (normalizeUrl(realPDSOfRepo, normOptions) === normalizeUrl(pds, normOptions)) { 234 + totalRepos++; 235 + } 236 + } catch (e) { 237 + console.log(`Failed to process ${repo} within PDS ${pds}: ${e}`); 232 238 } 233 239 } 234 240 ··· 244 250 } 245 251 } 246 252 247 - private async processDID(did: DID, considerAddingToLists: boolean, scheduleUnknownPDSForCrawling: boolean, mayUseCache: boolean): Promise<boolean> { 253 + private async processDID(did: DID, considerAddingToLists: boolean, scheduleUnknownPDSForCrawling: boolean, mayUseCache: boolean): Promise<string> { 248 254 // always reconfirm the current PDS of the DID because even when we're processing a PDS, the PDS may claim 249 255 // to host accounts that it doesn't actually host anymore 250 256 let pds: string; 251 257 try { 252 258 pds = await this.crawler.identifyPDSofDID(did, mayUseCache); 253 259 } catch (e) { 254 - console.log(`Failed to identify PDS of ${did} for label reprocessing: ${e}`); 255 - return false; 260 + throw new Error(`Failed to identify PDS of ${did} for label reprocessing: ${e}`, { 261 + cause: e, 262 + }) 256 263 } 257 264 258 265 if (scheduleUnknownPDSForCrawling) { ··· 270 277 await this.processDIDWithLists(did, correctLists); 271 278 } catch (e) { 272 279 console.log(`Failed to manage list membership for ${did}: ${e}`); 273 - return false; 280 + // but continue to return since only list management failed 274 281 } 275 282 } 276 - return true; 283 + return pds; 277 284 } 278 285 279 286 private async getDIDToPossiblyUpdate( ··· 335 342 await this.processPDS(pds); 336 343 } else { 337 344 console.log(`Refreshing single DID ${did} on PDS ${pds} based on detected activity`); 338 - await this.processDID(did, true, false, false); 345 + try { 346 + await this.processDID(did, true, false, false); 347 + } catch (e) { 348 + console.log(`Failed to process ${did} after detected activity: ${e}`); 349 + } 339 350 } 340 351 }); 341 352 } ··· 381 392 this.taskProcessor.schedule(async () => { 382 393 const did = uri as `did:${"plc" | "web"}:${string}`; 383 394 try { 384 - const success = await this.processDID(did, false, true, true); 385 - if (success && ++totalProcessed % 100 === 0) { 395 + await this.processDID(did, false, true, true); 396 + if (++totalProcessed % 100 === 0) { 386 397 console.log(`Active label processing: processed ${totalProcessed} / ${totalToProcess} DIDs so far`); 387 398 } 388 399 } catch (e) {
+2 -1
package.json
··· 19 19 "@skyware/labeler": "^0.2.0", 20 20 "async": "^3.2.6", 21 21 "async-mutex": "^0.5.0", 22 - "cacheable": "^2.0.1" 22 + "cacheable": "^2.0.1", 23 + "normalize-url": "^8.1.0" 23 24 }, 24 25 "overrides": { 25 26 "@atcute/ozone": "^3.1.6",