A Bluesky labeler that labels accounts hosted on PDSes operated by entities other than Bluesky PBC
3
fork

Configure Feed

Select the types of activity you want to include in your feed.

Allow for adding labeled users to a list feat. more terrible code

gbl08ma 4fe318e9 8672c4f4

+346 -66
+102
activeListItemsStorage.ts
··· 1 + import type { LabelerServer } from "@skyware/labeler"; 2 + 3 + export class ActiveListItemsStorage { 4 + private db: typeof LabelerServer.prototype.db; 5 + 6 + /** 7 + * Promise that resolves when database initialization is complete. 8 + * This should be awaited before any database operations. 9 + */ 10 + private readonly dbInitLock?: Promise<void>; 11 + 12 + constructor(db: typeof this.db) { 13 + this.db = db; 14 + 15 + this.dbInitLock = this.initializeDatabase(); 16 + } 17 + 18 + async initializeDatabase() { 19 + await this.db.execute(` 20 + CREATE TABLE IF NOT EXISTS active_list_items ( 21 + uri TEXT NOT NULL, 22 + list TEXT NOT NULL, 23 + subject TEXT NOT NULL, 24 + PRIMARY KEY (uri), 25 + UNIQUE (list, subject) 26 + ); 27 + `); 28 + } 29 + 30 + async computeListsDiff(subject: string, desiredLists: string[]): Promise<{ 31 + listsToAddTo: string[], 32 + listItemURIsToRemove: string[], 33 + }> { 34 + await this.dbInitLock; 35 + 36 + const diff: { 37 + listsToAddTo: string[], 38 + listItemURIsToRemove: string[], 39 + } = { 40 + listsToAddTo: [], 41 + listItemURIsToRemove: [], 42 + }; 43 + 44 + const desiredOutcomeSet = new Set(desiredLists); 45 + 46 + const result = await this.db.execute({ 47 + sql: ` 48 + SELECT uri, list, subject FROM active_list_items 49 + WHERE subject = ? 50 + `, 51 + args: [subject], 52 + }); 53 + 54 + const activeLists = result.rows.map((row) => ({ 55 + uri: row.uri as string, 56 + list: row.list as string, 57 + })); 58 + const activeListsSet = new Set(activeLists.map(l => l.list)); 59 + 60 + for (const listRow of activeLists) { 61 + if (!desiredOutcomeSet.has(listRow.list)) { 62 + diff.listItemURIsToRemove.push(listRow.uri); 63 + } 64 + } 65 + 66 + for (const list of desiredLists) { 67 + if (!activeListsSet.has(list)) { 68 + diff.listsToAddTo.push(list); 69 + } 70 + } 71 + 72 + return diff; 73 + } 74 + 75 + async registerListItemCreation(uri: string, list: string, subject: string) { 76 + await this.dbInitLock; 77 + 78 + const result = await this.db.execute({ 79 + sql: ` 80 + INSERT INTO active_list_items (uri, list, subject) 81 + VALUES (?, ?, ?) 82 + `, 83 + args: [uri, list, subject], 84 + }); 85 + 86 + if (!result.rowsAffected) throw new Error("Failed to register list item creation"); 87 + } 88 + 89 + async registerListItemDeletion(uri: string) { 90 + await this.dbInitLock; 91 + 92 + const result = await this.db.execute({ 93 + sql: ` 94 + DELETE FROM active_list_items 95 + WHERE uri = ? 96 + `, 97 + args: [uri], 98 + }); 99 + 100 + if (!result.rowsAffected) throw new Error("Failed to register list item deletion"); 101 + } 102 + }
+5
bun.lock
··· 11 11 "@atcute/jetstream": "^1.1.0", 12 12 "@skyware/labeler": "^0.2.0", 13 13 "async": "^3.2.6", 14 + "async-mutex": "^0.5.0", 14 15 "cacheable": "^2.0.1", 15 16 }, 16 17 "devDependencies": { ··· 140 141 "ajv-formats": ["ajv-formats@2.1.1", "", { "dependencies": { "ajv": "^8.0.0" } }, "sha512-Wx0Kx52hxE7C18hkMEggYlEifqWZtYaRgouJor+WMdPnQyEK13vgEWyVNup7SoeeoLMsr4kf5h6dOW11I15MUA=="], 141 142 142 143 "async": ["async@3.2.6", "", {}, "sha512-htCUDlxyyCLMgaM3xXg0C0LW2xqfuQ6p05pCEIsXuyQ+a1koYKTuBMzRNwmybfLgvJDMd0r1LTn4+E0Ti6C2AA=="], 144 + 145 + "async-mutex": ["async-mutex@0.5.0", "", { "dependencies": { "tslib": "^2.4.0" } }, "sha512-1A94B18jkJ3DYq284ohPxoXbfTA5HsQ7/Mf4DEhcyLx3Bz27Rh59iScbB6EPiP+B+joue6YCxcMXSbFC1tZKwA=="], 143 146 144 147 "atomic-sleep": ["atomic-sleep@1.0.0", "", {}, "sha512-kNOjDqAh7px0XWNI+4QbzoiR/nTkHAWNud2uvnJquD1/x5a7EQZMJT0AczqK0Qn67oY/TTQ1LbUKajZpp3I9tQ=="], 145 148 ··· 278 281 "thread-stream": ["thread-stream@3.1.0", "", { "dependencies": { "real-require": "^0.2.0" } }, "sha512-OqyPZ9u96VohAyMfJykzmivOrY2wfMSf3C5TtFJVgN+Hm6aj+voFhlK+kZEIv2FBh1X6Xp3DlnCOfEQ3B2J86A=="], 279 282 280 283 "toad-cache": ["toad-cache@3.7.0", "", {}, "sha512-/m8M+2BJUpoJdgAHoG+baCwBT+tf2VraSfkBgl0Y00qIWt41DJ8R5B8nsEw0I58YwF5IZH6z24/2TobDKnqSWw=="], 284 + 285 + "tslib": ["tslib@2.8.1", "", {}, "sha512-oJFu94HQb+KVduSUQL7wnpmqnfmLsOA/nAh6b6EH0wCEoK0/mPeXU6c3wKDV83MkOuHPRHtSXKKU99IBazS/2w=="], 281 286 282 287 "type-fest": ["type-fest@4.41.0", "", {}, "sha512-TeTSQ6H5YHvpqVwBRcnLDCBnDOHWYu7IvGbHT6N8AOymcr9PJGjc1GTtiWZTYg0NCgYwvnYWEkVChQAr9bjfwA=="], 283 288
+2 -1
index.ts
··· 21 21 allowIssuingLabels: process.env.ACTUALLY_EMIT_LABELS ? process.env.ACTUALLY_EMIT_LABELS === "true" : throwMissingEnvVarError(), 22 22 maxPDSDedicatedLabels: parseInt(process.env.MAX_PDS_DEDICATED_LABELS ?? "50"), 23 23 maxExpectedReposPerPDS: parseInt(process.env.MAX_EXPECTED_REPOS_PER_PDS ?? "100000"), 24 - maxParallelTasks: parseInt(process.env.MAX_PARALLEL_TASKS ?? os.cpus().length * 4 + "") 24 + maxParallelTasks: parseInt(process.env.MAX_PARALLEL_TASKS ?? os.cpus().length * 4 + ""), 25 + labeledUsersList: process.env.LABELED_USERS_LIST_URI 25 26 } 26 27 27 28 new Labeler(labelerOptions).run();
+1 -13
labelDefiner.ts
··· 1 1 import type { ComAtprotoLabelDefs } from "@atcute/atproto"; 2 2 import { getLabelerLabelDefinitions, setLabelerLabelDefinitions, type LoginCredentials } from "@skyware/labeler/scripts"; 3 - import { PDSCrawler } from "./pdsCrawler"; 4 3 import type { KnownPDS } from "./knownPDSStorage"; 5 4 6 5 type simplifiedLabel = { ··· 10 9 offByDefault: boolean, 11 10 } 12 11 13 - const baseLabel: simplifiedLabel = { 12 + export const baseLabel: simplifiedLabel = { 14 13 identifier: "internal-independent", 15 14 name: "Independent PDS", 16 15 description: "Account is hosted by an independent PDS (any of them, combines with labels below)", ··· 35 34 36 35 export class LabelDefiner { 37 36 private loginCredentials: LoginCredentials; 38 - private setPDSOnCredentials = false; 39 37 40 38 private _dedicatedLabels: simplifiedLabel[] = []; 41 39 private _dedicatedLabelsMap = new Map<string, simplifiedLabel>(); ··· 55 53 } 56 54 57 55 private async setLabelerLabelDefinitions() { 58 - if (!this.setPDSOnCredentials) { 59 - try { 60 - const crawler = new PDSCrawler(0); 61 - this.loginCredentials.pds = await crawler.identifyPDSofDID(this.loginCredentials.identifier as `did:${"plc" | "web"}:${string}`, false); 62 - } catch (e) { 63 - throw new Error("Could not determine PDS of labeler DID:" + e); 64 - } 65 - 66 - } 67 - 68 56 let labelDefinitions: ComAtprotoLabelDefs.LabelValueDefinition[] = []; 69 57 for (const simplifiedLabel of [baseLabel, ...this.dedicatedLabels]) { 70 58 labelDefinitions.push({
+160 -51
labeler.ts
··· 1 + import { Client, CredentialManager, ok } from "@atcute/client"; 1 2 import { JetstreamSubscription } from "@atcute/jetstream"; 3 + import { parseResourceUri } from "@atcute/lexicons/syntax"; 2 4 import { LabelerServer } from "@skyware/labeler"; 5 + import type { LoginCredentials } from "@skyware/labeler/scripts"; 3 6 import { ActiveLabelsStorage } from "./activeLabelsStorage"; 7 + import { ActiveListItemsStorage } from "./activeListItemsStorage"; 4 8 import { KnownPDSStorage } from "./knownPDSStorage"; 5 9 import { LabelDefiner } from "./labelDefiner"; 10 + import { ListDeterminer } from "./listDeterminer"; 6 11 import { PDSCrawler } from "./pdsCrawler"; 7 12 import { TaskProcessor } from "./taskProcessor"; 8 - import { wrapAsyncInCatch, type DID } from "./utils"; 13 + import { combinedRunExclusive, NamedMutex, wrapAsyncInCatch, type DID } from "./utils"; 9 14 10 15 export type LabelerOptions = { 11 16 databasePath: string | undefined; ··· 22 27 maxPDSDedicatedLabels: number, 23 28 maxExpectedReposPerPDS: number, 24 29 maxParallelTasks: number, 30 + labeledUsersList: string | undefined, 25 31 } 26 32 27 - type crawlCallback = (v: { 28 - pds: string, 29 - did: DID, 30 - }) => void; 33 + type crawlCallback = (pds: string, did: DID) => void; 31 34 32 35 export class Labeler { 33 36 private taskProcessor: TaskProcessor; 34 37 private server: LabelerServer; 35 38 private listenerOptions: Parameters<typeof this.server.start>[0]; 36 39 private activeLabelsStorage: ActiveLabelsStorage; 40 + private activeListItemsStorage: ActiveListItemsStorage; 37 41 private knownPDSStorage: KnownPDSStorage; 38 42 private crawler: PDSCrawler; 39 - private labelDefiner: LabelDefiner; 43 + private labelDefiner: LabelDefiner | undefined; 44 + private listDeterminer: ListDeterminer; 40 45 41 - private allowIssuingLabels; 46 + private allowIssuingLabels: boolean; 47 + private allowManagingLists: boolean; 42 48 43 49 private reconsiderActivePDSForRecrawlingAfterMilliseconds: number; 44 50 private recrawlKnownPDSAfterNotCrawledForMilliseconds: number; ··· 46 52 private updateLabelDefinitionsEveryMilliseconds: number; 47 53 private maxPDSDedicatedLabels: number; 48 54 55 + private credentialManager: CredentialManager | undefined; 56 + private authenticatedRPC: Client | undefined; 57 + 58 + private didProcessingMutex = new NamedMutex(); 59 + private pdsProcessingMutex = new NamedMutex(); 60 + 61 + private initialized: Promise<void>; 62 + 49 63 constructor(options: LabelerOptions) { 50 64 this.taskProcessor = new TaskProcessor(options.maxParallelTasks); 51 65 ··· 61 75 } 62 76 63 77 this.activeLabelsStorage = new ActiveLabelsStorage(this.server.db); 78 + this.activeListItemsStorage = new ActiveListItemsStorage(this.server.db); 64 79 this.knownPDSStorage = new KnownPDSStorage(this.server.db); 65 80 this.crawler = new PDSCrawler(options.maxExpectedReposPerPDS); 66 81 ··· 72 87 this.allowIssuingLabels = options.allowIssuingLabels; 73 88 this.maxPDSDedicatedLabels = options.maxPDSDedicatedLabels; 74 89 75 - this.labelDefiner = new LabelDefiner({ 90 + this.listDeterminer = new ListDeterminer(options.labeledUsersList); 91 + 92 + this.allowManagingLists = typeof options.labeledUsersList !== "undefined"; 93 + 94 + this.initialized = this.initializeAsync({ 76 95 identifier: options.did, 77 96 password: options.accountPassword, 78 97 }); 79 98 } 80 99 100 + private async initializeAsync(loginCredentials: LoginCredentials) { 101 + try { 102 + const crawler = new PDSCrawler(0); 103 + loginCredentials.pds = await crawler.identifyPDSofDID(loginCredentials.identifier as `did:${"plc" | "web"}:${string}`, false); 104 + } catch (e) { 105 + throw new Error("Could not determine PDS of labeler DID:" + e); 106 + } 107 + 108 + this.labelDefiner = new LabelDefiner(loginCredentials); 109 + 110 + this.credentialManager = new CredentialManager({ service: loginCredentials.pds }); 111 + await this.credentialManager.login({ identifier: loginCredentials.identifier, password: loginCredentials.password }) 112 + 113 + this.authenticatedRPC = new Client({ handler: this.credentialManager }); 114 + } 115 + 81 116 private pdsFilter(pds: string): boolean { 82 - return this.labelDefiner.determinePDSLabels(pds).length > 0; 117 + return (this.labelDefiner?.determinePDSLabels(pds).length ?? 0) > 0; 83 118 } 84 119 85 120 private async processDIDWithLabels(did: DID, correctLabelIDs: string[]) { ··· 114 149 } 115 150 } 116 151 152 + private async processDIDWithLists(did: DID, correctLists: string[]) { 153 + try { 154 + const listsDiff = await this.activeListItemsStorage.computeListsDiff(did, correctLists); 155 + 156 + for (const listURI of listsDiff.listsToAddTo) { 157 + console.log(`Adding ${did} to list ${listURI}`); 158 + const result = await ok( 159 + this.authenticatedRPC!.post("com.atproto.repo.createRecord", { 160 + input: { 161 + repo: this.credentialManager!.session?.did!, 162 + collection: "app.bsky.graph.listitem", 163 + record: { 164 + type: "app.bsky.graph.listitem", 165 + subject: did, 166 + list: listURI, 167 + createdAt: new Date().toISOString(), 168 + }, 169 + } 170 + }) 171 + ); 172 + 173 + const listItemURI = result.uri; 174 + await this.activeListItemsStorage.registerListItemCreation(listItemURI, listURI, did); 175 + } 176 + for (const listItemURI of listsDiff.listItemURIsToRemove) { 177 + console.log(`Deleting list item ${listItemURI}`); 178 + 179 + const parsedResourceURI = parseResourceUri(listItemURI); 180 + if (!parsedResourceURI.ok) { 181 + throw new Error("Invalid list item URI stored!"); 182 + } 183 + await ok( 184 + this.authenticatedRPC!.post("com.atproto.repo.deleteRecord", { 185 + input: { 186 + repo: parsedResourceURI.value.repo, 187 + collection: parsedResourceURI.value.collection!, 188 + rkey: parsedResourceURI.value.rkey!, 189 + } 190 + }) 191 + ); 192 + 193 + await this.activeListItemsStorage.registerListItemDeletion(listItemURI); 194 + } 195 + } catch (e) { 196 + console.log("Error processing list items:", e); 197 + } 198 + } 199 + 117 200 private async processPDS(pds: string) { 118 - const correctLabels = this.labelDefiner.determinePDSLabels(pds); 201 + const correctLabels = this.labelDefiner!.determinePDSLabels(pds); 119 202 const correctLabelIDs = correctLabels.map((l) => l.identifier); 120 203 let totalRepos = 0; 121 204 ··· 137 220 } 138 221 } 139 222 140 - private async processSingleDID(pds: string, did: DID) { 141 - const correctLabels = this.labelDefiner.determinePDSLabels(pds); 223 + private async processSingleDID(pds: string, did: DID, considerAddingToLists: boolean) { 224 + const correctLabels = this.labelDefiner!.determinePDSLabels(pds); 142 225 const correctLabelIDs = correctLabels.map((l) => l.identifier); 143 226 await this.processDIDWithLabels(did, correctLabelIDs); 144 - } 145 227 228 + if (considerAddingToLists && this.allowManagingLists) { 229 + try { 230 + // separate try-catch for this since it's expected to fail more often (rate limits) and doesn't affect label processing 231 + const correctLists = this.listDeterminer.determineUserLists(correctLabelIDs); 232 + await this.processDIDWithLists(did, correctLists); 233 + } catch (e) { 234 + console.log(`Failed to manage list membership for ${did}: ${e}`); 235 + } 236 + } 237 + } 146 238 147 - private async getPDSOrDIDToCrawl( 239 + private async getDIDToPossiblyUpdate( 148 240 wantedCollections: string[], 149 241 callback: crawlCallback, 150 242 ) { ··· 164 256 break; 165 257 } 166 258 // avoid looking up DIDs for handles which we know can't be on an independent PDS 167 - if (event.kind === "identity" && this.labelDefiner.shouldIgnoreHandle(event.identity.handle)) { 259 + if (event.kind === "identity" && this.labelDefiner!.shouldIgnoreHandle(event.identity.handle)) { 168 260 continue; 169 261 } 170 262 ··· 186 278 const mayUseCache = eventKind !== "identity"; 187 279 const pds = await this.crawler.identifyPDSofDID(eventDID as `did:${"plc" | "web"}:${string}`, mayUseCache); 188 280 189 - callback({ 190 - pds: pds, 191 - did: eventDID, 192 - }) 281 + callback(pds, eventDID); 193 282 } catch { 194 283 } 195 284 } 196 285 286 + private async refreshDIDOrCrawlPDS(pds: string, did: DID) { 287 + await combinedRunExclusive([ 288 + [this.didProcessingMutex, did], 289 + [this.pdsProcessingMutex, pds] 290 + ], async () => { 291 + const maybeKnownPDS = await this.knownPDSStorage.getKnownPDS(pds); 292 + 293 + if (typeof maybeKnownPDS === "undefined") { 294 + console.log("Crawling previously undiscovered independent PDS: " + pds); 295 + await this.processPDS(pds); 296 + } else if (new Date().getTime() - maybeKnownPDS.lastCrawled.getTime() > this.reconsiderActivePDSForRecrawlingAfterMilliseconds) { 297 + console.log("Re-crawling already known independent PDS based on detected activity: " + pds); 298 + await this.processPDS(pds); 299 + } else { 300 + console.log(`Refreshing single DID ${did} on PDS ${pds} based on detected activity`); 301 + await this.processSingleDID(pds, did, true); 302 + } 303 + }); 304 + } 305 + 197 306 private async activityBasedProcessing() { 198 - await this.getPDSOrDIDToCrawl(["app.bsky.actor.profile", "app.bsky.graph.follow", "app.bsky.feed.post"], async (pdsAndDID) => { 199 - if (!this.pdsFilter(pdsAndDID.pds)) { 307 + await this.getDIDToPossiblyUpdate(["app.bsky.actor.profile", "app.bsky.graph.follow", "app.bsky.feed.post"], async (pds, did) => { 308 + if (!this.pdsFilter(pds)) { 200 309 return; 201 310 } 202 311 203 - this.taskProcessor.schedule(async () => { 204 - const maybeKnownPDS = await this.knownPDSStorage.getKnownPDS(pdsAndDID.pds); 205 - 206 - if (typeof maybeKnownPDS === "undefined") { 207 - console.log("Crawling previously undiscovered independent PDS: " + pdsAndDID.pds); 208 - await this.processPDS(pdsAndDID.pds); 209 - } else if (new Date().getTime() - maybeKnownPDS.lastCrawled.getTime() > this.reconsiderActivePDSForRecrawlingAfterMilliseconds) { 210 - console.log("Re-crawling already known independent PDS based on detected activity: " + pdsAndDID.pds); 211 - await this.processPDS(pdsAndDID.pds); 212 - } else { 213 - console.log(`Refreshing single DID ${pdsAndDID.did} on PDS ${pdsAndDID.pds} based on detected activity`); 214 - await this.processSingleDID(pdsAndDID.pds, pdsAndDID.did); 215 - } 216 - }); 312 + this.taskProcessor.schedule(this.refreshDIDOrCrawlPDS.bind(this, pds, did)); 217 313 }); 218 314 } 219 315 ··· 225 321 for await (const knownPDS of generator) { 226 322 console.log(`Scheduling known PDS ${knownPDS.uri} for re-crawling`); 227 323 this.taskProcessor.schedule(async () => { 228 - console.log(`Re-crawling known PDS ${knownPDS.uri}`); 229 - await this.processPDS(knownPDS.uri); 324 + await this.pdsProcessingMutex.runExclusive(knownPDS.uri, async () => { 325 + console.log(`Re-crawling known PDS ${knownPDS.uri}`); 326 + await this.processPDS(knownPDS.uri); 327 + }); 230 328 }); 231 329 } 232 330 } ··· 237 335 let totalProcessed = 0; 238 336 for await (const uri of this.activeLabelsStorage.getActiveLabelSubjects()) { 239 337 const p = this.taskProcessor.schedule(async () => { 240 - try { 241 - const did = uri as `did:${"plc" | "web"}:${string}`; 242 - const pds = await this.crawler.identifyPDSofDID(did, true); 243 - const maybeKnownPDS = await this.knownPDSStorage.getKnownPDS(pds); 338 + const did = uri as `did:${"plc" | "web"}:${string}`; 339 + await this.didProcessingMutex.runExclusive(did, async () => { 340 + try { 341 + const pds = await this.crawler.identifyPDSofDID(did, true); 342 + await this.pdsProcessingMutex.runExclusive(pds, async () => { 343 + try { 344 + const maybeKnownPDS = await this.knownPDSStorage.getKnownPDS(pds); 244 345 245 - if (typeof maybeKnownPDS === "undefined") { 246 - console.log(`Crawling new PDS discovered when reprocessing labels for ${did}: ${pds}`); 247 - await this.processPDS(pds); 248 - } else { 249 - await this.processSingleDID(pds, did); 346 + if (typeof maybeKnownPDS === "undefined") { 347 + console.log(`Crawling new PDS discovered when reprocessing labels for ${did}: ${pds}`); 348 + await this.processPDS(pds); 349 + } else { 350 + await this.processSingleDID(pds, did, false); 351 + } 352 + totalProcessed++; 353 + } catch (e) { 354 + console.log(`Failed to reprocess labels of ${did}: ${e}`); 355 + } 356 + }); 357 + } catch (e) { 358 + console.log(`Failed to identify PDS of ${did} for label reprocessing: ${e}`); 250 359 } 251 - totalProcessed++; 252 - } catch (e) { 253 - console.log(`Failed to reprocess labels of ${uri}: ${e}`); 254 - } 360 + 361 + }); 255 362 }); 256 363 // avoid having too many in-flight tasks at once 257 364 promises.push(p); ··· 267 374 268 375 private async updateLabelerLabels(forceUpdate: boolean) { 269 376 const topPDSs = await Array.fromAsync(this.knownPDSStorage.getBiggestKnownPDSs(this.maxPDSDedicatedLabels)); 270 - const updated = await this.labelDefiner.updateDedicatedPDSLabels(topPDSs, forceUpdate); 377 + const updated = await this.labelDefiner!.updateDedicatedPDSLabels(topPDSs, forceUpdate); 271 378 if (updated) { 272 379 console.log("Updated labeler labels"); 273 380 } else { ··· 276 383 } 277 384 278 385 async run() { 386 + await this.initialized; 387 + 279 388 const serverStartPromise = new Promise((resolve, reject) => { 280 389 try { 281 390 this.server.start(this.listenerOptions, resolve)
+16
listDeterminer.ts
··· 1 + import { baseLabel } from "./labelDefiner"; 2 + 3 + export class ListDeterminer { 4 + private labeledUsersList: string | undefined; 5 + 6 + constructor(labeledUsersList: string | undefined) { 7 + this.labeledUsersList = labeledUsersList; 8 + } 9 + 10 + determineUserLists(labelIDs: string[]): string[] { 11 + if (labelIDs.indexOf(baseLabel.identifier) >= 0 && typeof this.labeledUsersList !== "undefined") { 12 + return [this.labeledUsersList]; 13 + } 14 + return []; 15 + } 16 + }
+1
package.json
··· 18 18 "@atcute/jetstream": "^1.1.0", 19 19 "@skyware/labeler": "^0.2.0", 20 20 "async": "^3.2.6", 21 + "async-mutex": "^0.5.0", 21 22 "cacheable": "^2.0.1" 22 23 }, 23 24 "overrides": {
+59 -1
utils.ts
··· 1 + import { Mutex, type MutexInterface } from "async-mutex"; 2 + 1 3 export async function wrapAsyncInCatch(a: PromiseLike<any>) { 2 4 try { 3 5 await a; ··· 15 17 ]).finally(() => clearTimeout(timer)); 16 18 } 17 19 18 - export type DID = `did:${string}:${string}`; 20 + export type DID = `did:${string}:${string}`; 21 + 22 + export class NamedMutex { 23 + private muMap = new Map<string, Mutex>(); 24 + 25 + private getOrCreate(key: string): Mutex { 26 + let v = this.muMap.get(key); 27 + if (!v) { 28 + v = new Mutex(); 29 + this.muMap.set(key, v); 30 + } 31 + return v; 32 + } 33 + 34 + async acquire(key: string, priority?: number): Promise<MutexInterface.Releaser> { 35 + const releaser = await this.getOrCreate(key).acquire(priority); 36 + this.muMap.delete(key); 37 + return releaser; 38 + }; 39 + async runExclusive<T>(key: string, callback: MutexInterface.Worker<T>, priority?: number): Promise<T> { 40 + const result = await this.getOrCreate(key).runExclusive(callback, priority); 41 + this.muMap.delete(key); 42 + return result; 43 + }; 44 + isLocked(key: string): boolean { 45 + let v = this.muMap.get(key); 46 + if (!v) { 47 + return false; 48 + } 49 + return v.isLocked(); 50 + }; 51 + async waitForUnlock(key: string, priority?: number): Promise<void> { 52 + let v = this.muMap.get(key); 53 + if (!v) { 54 + return; 55 + } 56 + return await v.waitForUnlock(priority); 57 + }; 58 + release(key: string): void { 59 + return this.getOrCreate(key).release(); 60 + }; 61 + cancel(key: string): void { 62 + return this.getOrCreate(key).cancel(); 63 + }; 64 + } 65 + 66 + type NamedMutexReference = [nmu: NamedMutex, key: string, priority?: number]; 67 + export async function combinedRunExclusive<T>(nmus: [NamedMutexReference, ...NamedMutexReference[]], callback: MutexInterface.Worker<T>): Promise<T> { 68 + if (!nmus || nmus.length == 0) { 69 + throw new Error("missing at least one mutex reference"); 70 + } 71 + if (nmus.length == 1) { 72 + const nmu = nmus[0]; 73 + return await nmu[0].runExclusive(nmu[1], callback, nmu[2]); 74 + } 75 + return await combinedRunExclusive(nmus.slice(1) as [NamedMutexReference, ...NamedMutexReference[]], callback); 76 + }