PDS Admin tool make it easier to moderate your PDS with labels
43
fork

Configure Feed

Select the types of activity you want to include in your feed.

cursor resume and some funky stuff I'm not sure if I like for closing websockets

+230 -26
+5
drizzle/0002_greedy_dormammu.sql
··· 1 + DROP INDEX "labeler_cursors_labeler_id_unique";--> statement-breakpoint 2 + DROP INDEX "watched_repos_did_unique";--> statement-breakpoint 3 + ALTER TABLE `labeler_cursors` ALTER COLUMN "cursor" TO "cursor" integer NOT NULL;--> statement-breakpoint 4 + CREATE UNIQUE INDEX `labeler_cursors_labeler_id_unique` ON `labeler_cursors` (`labeler_id`);--> statement-breakpoint 5 + CREATE UNIQUE INDEX `watched_repos_did_unique` ON `watched_repos` (`did`);
+155
drizzle/meta/0002_snapshot.json
··· 1 + { 2 + "version": "6", 3 + "dialect": "sqlite", 4 + "id": "d53a3c1c-688b-4aed-a866-be1a3366eda4", 5 + "prevId": "69fbc25d-9d45-43d4-973b-b6bcd6148bb3", 6 + "tables": { 7 + "labeler_cursors": { 8 + "name": "labeler_cursors", 9 + "columns": { 10 + "labeler_id": { 11 + "name": "labeler_id", 12 + "type": "text", 13 + "primaryKey": false, 14 + "notNull": false, 15 + "autoincrement": false 16 + }, 17 + "cursor": { 18 + "name": "cursor", 19 + "type": "integer", 20 + "primaryKey": false, 21 + "notNull": true, 22 + "autoincrement": false 23 + } 24 + }, 25 + "indexes": { 26 + "labeler_cursors_labeler_id_unique": { 27 + "name": "labeler_cursors_labeler_id_unique", 28 + "columns": [ 29 + "labeler_id" 30 + ], 31 + "isUnique": true 32 + } 33 + }, 34 + "foreignKeys": {}, 35 + "compositePrimaryKeys": {}, 36 + "uniqueConstraints": {}, 37 + "checkConstraints": {} 38 + }, 39 + "labels_applied": { 40 + "name": "labels_applied", 41 + "columns": { 42 + "id": { 43 + "name": "id", 44 + "type": "integer", 45 + "primaryKey": true, 46 + "notNull": true, 47 + "autoincrement": true 48 + }, 49 + "did": { 50 + "name": "did", 51 + "type": "text", 52 + "primaryKey": false, 53 + "notNull": true, 54 + "autoincrement": false 55 + }, 56 + "label": { 57 + "name": "label", 58 + "type": "text", 59 + "primaryKey": false, 60 + "notNull": true, 61 + "autoincrement": false 62 + }, 63 + "action": { 64 + "name": "action", 65 + "type": "text", 66 + "primaryKey": false, 67 + "notNull": true, 68 + "autoincrement": false 69 + }, 70 + "date_applied": { 71 + "name": "date_applied", 72 + "type": "integer", 73 + "primaryKey": false, 74 + "notNull": true, 75 + "autoincrement": false 76 + } 77 + }, 78 + "indexes": {}, 79 + "foreignKeys": { 80 + "labels_applied_did_watched_repos_did_fk": { 81 + "name": "labels_applied_did_watched_repos_did_fk", 82 + "tableFrom": "labels_applied", 83 + "tableTo": "watched_repos", 84 + "columnsFrom": [ 85 + "did" 86 + ], 87 + "columnsTo": [ 88 + "did" 89 + ], 90 + "onDelete": "no action", 91 + "onUpdate": "no action" 92 + } 93 + }, 94 + "compositePrimaryKeys": {}, 95 + "uniqueConstraints": {}, 96 + "checkConstraints": {} 97 + }, 98 + "watched_repos": { 99 + "name": "watched_repos", 100 + "columns": { 101 + "did": { 102 + "name": "did", 103 + "type": "text", 104 + "primaryKey": true, 105 + "notNull": true, 106 + "autoincrement": false 107 + }, 108 + "pds_host": { 109 + "name": "pds_host", 110 + "type": "text", 111 + "primaryKey": false, 112 + "notNull": true, 113 + "autoincrement": false 114 + }, 115 + "active": { 116 + "name": "active", 117 + "type": "integer", 118 + "primaryKey": false, 119 + "notNull": true, 120 + "autoincrement": false 121 + }, 122 + "date_first_seen": { 123 + "name": "date_first_seen", 124 + "type": "integer", 125 + "primaryKey": false, 126 + "notNull": true, 127 + "autoincrement": false 128 + } 129 + }, 130 + "indexes": { 131 + "watched_repos_did_unique": { 132 + "name": "watched_repos_did_unique", 133 + "columns": [ 134 + "did" 135 + ], 136 + "isUnique": true 137 + } 138 + }, 139 + "foreignKeys": {}, 140 + "compositePrimaryKeys": {}, 141 + "uniqueConstraints": {}, 142 + "checkConstraints": {} 143 + } 144 + }, 145 + "views": {}, 146 + "enums": {}, 147 + "_meta": { 148 + "schemas": {}, 149 + "tables": {}, 150 + "columns": {} 151 + }, 152 + "internal": { 153 + "indexes": {} 154 + } 155 + }
+7
drizzle/meta/_journal.json
··· 15 15 "when": 1771593593688, 16 16 "tag": "0001_faithful_shard", 17 17 "breakpoints": true 18 + }, 19 + { 20 + "idx": 2, 21 + "version": "6", 22 + "when": 1771611742268, 23 + "tag": "0002_greedy_dormammu", 24 + "breakpoints": true 18 25 } 19 26 ] 20 27 }
+2 -2
src/db/schema.ts
··· 1 - import { sqliteTable, text, integer } from "drizzle-orm/sqlite-core"; 1 + import { sqliteTable, text, integer, int } from "drizzle-orm/sqlite-core"; 2 2 import { relations } from "drizzle-orm"; 3 3 4 4 export const watchedRepos = sqliteTable("watched_repos", { ··· 20 20 21 21 export const labelerCursor = sqliteTable("labeler_cursors", { 22 22 labelerId: text("labeler_id").unique(), 23 - cursor: text("cursor").notNull(), 23 + cursor: integer("cursor").notNull(), 24 24 }); 25 25 26 26 export const watchedReposRelations = relations(watchedRepos, ({ many }) => ({
+47 -15
src/handlers/lablerSubscriber.ts
··· 4 4 import type PQueue from "p-queue"; 5 5 import { handleNewLabel } from "./handleNewLabel.js"; 6 6 import { logger } from "../logger.js"; 7 + import type { LibSQLDatabase } from "drizzle-orm/libsql"; 8 + import { labelerCursor } from "../db/schema.js"; 9 + import { eq } from "drizzle-orm"; 10 + import * as schema from "../db/schema.js"; 7 11 8 - export const labelerSubscriber = async ( 12 + export const labelerSubscriber = ( 9 13 config: LabelerConfig, 14 + lastCursor: number | undefined, 15 + db: LibSQLDatabase<typeof schema>, 10 16 queue: PQueue, 11 - ) => { 17 + ): (() => void) => { 18 + let cursor = lastCursor; 19 + if (cursor) { 20 + logger.info({ host: config.host }, `Starting from cursor: ${cursor}`); 21 + } 22 + 12 23 const subscription = new FirehoseSubscription({ 13 24 service: `wss://${config.host}`, 14 25 nsid: ComAtprotoLabelSubscribeLabels.mainSchema, 26 + params: () => ({ cursor: cursor }), 15 27 }); 16 28 17 - logger.info({ host: config.host }, "Listening"); 18 - for await (const message of subscription) { 19 - switch (message.$type) { 20 - case "com.atproto.label.subscribeLabels#info": { 21 - logger.info({ message }, "commit"); 22 - break; 23 - } 24 - case "com.atproto.label.subscribeLabels#labels": { 25 - for (const label of message.labels) { 26 - queue.add(async () => { 27 - await handleNewLabel(config, label); 29 + const iterator = subscription[Symbol.asyncIterator](); 30 + 31 + const run = async () => { 32 + logger.info({ host: config.host }, "Listening"); 33 + for await (const message of iterator) { 34 + if ("seq" in message) { 35 + cursor = message.seq; 36 + await db 37 + .insert(labelerCursor) 38 + .values({ labelerId: config.host, cursor: message.seq }) 39 + .onConflictDoUpdate({ 40 + target: [labelerCursor.labelerId], 41 + set: { cursor: message.seq }, 28 42 }); 43 + } 44 + switch (message.$type) { 45 + case "com.atproto.label.subscribeLabels#info": { 46 + logger.info({ message }, "info event"); 47 + break; 29 48 } 30 - break; 49 + case "com.atproto.label.subscribeLabels#labels": { 50 + for (const label of message.labels) { 51 + queue.add(async () => { 52 + await handleNewLabel(config, label); 53 + }); 54 + } 55 + break; 56 + } 31 57 } 32 58 } 33 - } 59 + }; 60 + 61 + run().catch((err) => logger.error({ err }, "Subscriber error")); 62 + 63 + return () => { 64 + iterator.return?.(); 65 + }; 34 66 };
+14 -9
src/index.ts
··· 6 6 import { labelerSubscriber } from "./handlers/lablerSubscriber.js"; 7 7 import type { Settings } from "./types/settings.js"; 8 8 import { logger } from "./logger.js"; 9 - 9 + import { labelerCursor } from "./db/schema.js"; 10 + import { eq } from "drizzle-orm"; 10 11 const queue = new PQueue({ concurrency: 2 }); 11 12 12 13 // TODO ··· 26 27 27 28 const labelers = settings.labeler; 28 29 30 + const lastCursors = await db.select().from(labelerCursor); 31 + 32 + const subscribers = Object.entries(labelers).map(([_, config]) => { 33 + let lastCursorRow = lastCursors.find( 34 + (cursor) => cursor.labelerId === config.host, 35 + ); 36 + let lastCursor = lastCursorRow?.cursor ?? undefined; 37 + return labelerSubscriber(config, lastCursor, db, queue); 38 + }); 39 + 29 40 // --- Graceful shutdown --- 30 41 async function shutdown(signal: string) { 31 42 logger.info(`Received ${signal}, shutting down...`); 32 43 33 - // TODO maybe should make sure the websockets close here? 44 + logger.info("Closing subscriptions..."); 45 + subscribers.forEach((close) => close()); 34 46 35 - // Drain all queues in parallel 36 47 logger.info("Draining the queue..."); 37 48 await queue.onIdle(); 38 49 ··· 46 57 process.on("unhandledRejection", (reason) => { 47 58 logger.error({ reason }, "Unhandled rejection"); 48 59 }); 49 - 50 - Promise.all( 51 - Object.entries(labelers).map(([_, config]) => 52 - labelerSubscriber(config, queue), 53 - ), 54 - );