A Bluesky labeler that labels accounts hosted on PDSes operated by entities other than Bluesky PBC
3
fork

Configure Feed

Select the types of activity you want to include in your feed.

Reprocess active labels on server start-up

gbl08ma cb634373 fe2d5450

+64
+35
activeLabelsStorage.ts
// Flattened diff excerpt of the active-labels storage class; hunks separated by "···" are elided,
// so every definition except getActiveLabelSubjects is only partially visible here.
// Visible changes in this excerpt:
//  - after the table creation statement, an index named idx_active_labels_uri is created on
//    active_labels (uri) with CREATE INDEX IF NOT EXISTS, so re-running it is harmless.
//  - registerLabelActivation and registerLabelDeactivation now await this.dbInitLock before
//    executing their statement (presumably a promise that settles once schema setup is done — TODO confirm).
//  - getActiveLabelSubjects is a new async generator: it awaits this.dbInitLock, then repeatedly
//    selects DISTINCT uri values from active_labels WHERE uri > cursor, ORDER BY uri, LIMIT 100.
//    Each uri is yielded as a string and becomes the new cursor; the cursor starts as "" and the
//    generator returns when a query yields no rows.
// NOTE(review): the table's PRIMARY KEY is (uri, val); whether the separate uri index adds anything
// beyond the primary-key index depends on the database engine — confirm.
··· 23 23 PRIMARY KEY (uri, val) 24 24 ); 25 25 `); 26 + await this.db.execute(` 27 + CREATE INDEX IF NOT EXISTS idx_active_labels_uri ON active_labels (uri); 28 + `); 26 29 } 27 30 28 31 async computeLabelsDiff(uri: string, desiredOutcome: string[]): Promise<{ ··· 68 71 } 69 72 70 73 async registerLabelActivation(uri: string, val: string) { 74 + await this.dbInitLock; 75 + 71 76 const result = await this.db.execute({ 72 77 sql: ` 73 78 INSERT INTO active_labels (uri, val) ··· 80 85 } 81 86 82 87 async registerLabelDeactivation(uri: string, val: string) { 88 + await this.dbInitLock; 89 + 83 90 const result = await this.db.execute({ 84 91 sql: ` 85 92 DELETE FROM active_labels ··· 89 96 }); 90 97 91 98 if (!result.rowsAffected) throw new Error("Failed to register label deactivation"); 99 + } 100 + 101 + async* getActiveLabelSubjects() { 102 + await this.dbInitLock; 103 + 104 + let cursor = ""; 105 + while (true) { 106 + const result = await this.db.execute({ 107 + sql: ` 108 + SELECT DISTINCT uri 109 + FROM active_labels 110 + WHERE uri > ? 111 + ORDER BY uri 112 + LIMIT 100 113 + `, 114 + args: [cursor], 115 + }); 116 + 117 + if (!result.rows.length) { 118 + return; 119 + } 120 + 121 + for (const row of result.rows) { 122 + const v = row.uri as string; 123 + yield v; 124 + cursor = v; 125 + } 126 + } 92 127 } 93 128 }
+29
labeler.ts
// ··· (earlier lines of the file are elided in this excerpt)
    }
  }

  /**
   * Re-runs processing for every subject that currently has an active label.
   *
   * Subjects are streamed from `activeLabelsStorage.getActiveLabelSubjects()`. For each one a
   * task is handed to `scheduleTask` that looks up the subject's PDS via
   * `crawler.identifyPDSofDID(did, true)` and then calls `processSingleDID(pds, did)`.
   * After every 100 scheduled subjects the loop waits for that batch to finish before reading
   * more, which bounds the number of in-flight tasks and logs progress.
   *
   * `totalProcessed` counts only subjects whose task completed without throwing.
   *
   * NOTE(review): the stored `uri` is cast to a DID without validation — this assumes every
   * active label subject is a `did:plc:` / `did:web:` identifier; confirm against the writers.
   */
  private async existingLabelsProcessing() {
    console.log("Reprocessing currently active labels");
    let promises: Promise<void>[] = [];
    let totalProcessed = 0;
    for await (const uri of this.activeLabelsStorage.getActiveLabelSubjects()) {
      const p = new Promise<void>((resolve) => {
        this.scheduleTask(async () => {
          try {
            const did = uri as `did:${"plc" | "web"}:${string}`;
            const pds = await this.crawler.identifyPDSofDID(did, true);
            await this.processSingleDID(pds, did);
            totalProcessed++;
          } finally {
            // Always settle the tracking promise. Without this, a single failing DID would
            // leave its promise pending forever and the batch `Promise.all` below would never
            // complete, silently stalling the whole reprocessing pass. The error itself still
            // propagates to the scheduler exactly as before.
            resolve();
          }
        });
      });
      // avoid having too many in-flight tasks at once
      promises.push(p);
      if (promises.length >= 100) {
        await Promise.all(promises);
        promises = [];
        console.log(`Active label processing: processed ${totalProcessed} DIDs so far`);
      }
    }
    await Promise.all(promises);
    console.log(`Active label processing: processed a total of ${totalProcessed} DIDs`);
  }

  private async updateLabelerLabels(forceUpdate: boolean) {
    const topPDSs = await Array.fromAsync(this.knownPDSStorage.getBiggestKnownPDSs(this.maxPDSDedicatedLabels));
    const updated = await this.labelDefiner.updateDedicatedPDSLabels(topPDSs, forceUpdate);
// ··· (lines elided in this excerpt)
    console.log("Labeler server started on port", this.listenerOptions.port);

    wrapAsyncInCatch(this.activityBasedProcessing());

    // should be ok to only do this once when the server starts
    wrapAsyncInCatch(this.existingLabelsProcessing());

    setInterval(() => {
      this.scheduleTask(this.longevityBasedProcessing.bind(this));