[READ ONLY MIRROR] Spark Social AppView Server github.com/sprksocial/server
atproto deno hono lexicon
5
fork

Configure Feed

Select the types of activity you want to include in your feed.

at 3b73895e29748ca524bbe040b656ddb4e167104b 106 lines 3.0 kB view raw
1import PQueue from "p-queue"; 2import { Database } from "./db/index.ts"; 3 4// A simple queue for in-process, out-of-band/backgrounded work 5 6export class BackgroundQueue { 7 queue = new PQueue(); 8 destroyed = false; 9 private processAllInterval: number | null = null; 10 private isProcessingAll = false; 11 12 constructor(public db: Database) {} 13 14 add(task: Task) { 15 if (this.destroyed) { 16 return; 17 } 18 this.queue 19 .add(() => task(this.db)) 20 .catch((err: Error) => { 21 // Check if this is a MongoDB connection error during shutdown 22 if ( 23 err.message?.includes("Client must be connected") && this.destroyed 24 ) { 25 console.debug( 26 "Ignoring MongoDB connection error during shutdown", 27 { err: err.message }, 28 ); 29 return; 30 } 31 32 // Check for MongoDB duplicate key errors 33 const mongoError = err as { code?: number }; 34 if (mongoError.code === 11000) { 35 console.warn( 36 "Ignoring duplicate key error in background task", 37 { err: err.message }, 38 ); 39 return; 40 } 41 42 console.error("background queue task failed", { err }); 43 }); 44 } 45 46 async processAll() { 47 this.isProcessingAll = true; 48 49 // Start logging queue progress every 10 seconds 50 this.processAllInterval = setInterval(() => { 51 if (!this.destroyed && this.isProcessingAll) { 52 const pending = this.queue.size; 53 const running = this.queue.pending; 54 55 // Stop logging if queue is empty 56 if (pending === 0 && running === 0) { 57 this.stopProcessAllLogging(); 58 } 59 } 60 }, 10000); // Log every 10 seconds 61 62 let timeoutId: number | undefined; 63 try { 64 // Add timeout protection to prevent hanging 65 const processPromise = this.queue.onIdle(); 66 const timeoutPromise = new Promise((_, reject) => { 67 timeoutId = setTimeout( 68 () => reject(new Error("Background queue processing timeout")), 69 30000, 70 ); 71 }); 72 73 await Promise.race([processPromise, timeoutPromise]); 74 } catch (error) { 75 console.error( 76 "Background queue processing failed or timed out", 77 { error }, 78 ); 79 throw error; 80 } finally { 81 this.stopProcessAllLogging(); 82 if (timeoutId !== undefined) { 83 clearTimeout(timeoutId); 84 timeoutId = undefined; 85 } 86 } 87 } 88 89 private stopProcessAllLogging() { 90 if (this.processAllInterval) { 91 clearInterval(this.processAllInterval); 92 this.processAllInterval = null; 93 } 94 this.isProcessingAll = false; 95 } 96 97 // On destroy we stop accepting new tasks, but complete all pending/in-progress tasks. 98 // The application calls this only once http connections have drained (tasks no longer being added). 99 async destroy() { 100 this.destroyed = true; 101 this.stopProcessAllLogging(); 102 await this.queue.onIdle(); 103 } 104} 105 106type Task = (db: Database) => Promise<void>;