[READ ONLY MIRROR] Spark Social AppView Server github.com/sprksocial/server
atproto deno hono lexicon
5
fork

Configure Feed

Select the types of activity you want to include in your feed.

at 17a4453b336a3693a4e2b55eced2d8a4a0cf1bd5 107 lines 3.1 kB view raw
1import PQueue from "p-queue"; 2import { Database } from "./db/index.ts"; 3import { Logger } from "@logtape/logtape"; 4 5// A simple queue for in-process, out-of-band/backgrounded work 6 7export class BackgroundQueue { 8 queue = new PQueue(); 9 destroyed = false; 10 private processAllInterval: number | null = null; 11 private isProcessingAll = false; 12 13 constructor(public db: Database, public logger: Logger) {} 14 15 add(task: Task) { 16 if (this.destroyed) { 17 return; 18 } 19 this.queue 20 .add(() => task(this.db)) 21 .catch((err: Error) => { 22 // Check if this is a MongoDB connection error during shutdown 23 if ( 24 err.message?.includes("Client must be connected") && this.destroyed 25 ) { 26 this.logger.debug( 27 "Ignoring MongoDB connection error during shutdown", 28 { err: err.message }, 29 ); 30 return; 31 } 32 33 // Check for MongoDB duplicate key errors 34 const mongoError = err as { code?: number }; 35 if (mongoError.code === 11000) { 36 this.logger.warn( 37 "Ignoring duplicate key error in background task", 38 { err: err.message }, 39 ); 40 return; 41 } 42 43 this.logger.error("background queue task failed", { err }); 44 }); 45 } 46 47 async processAll() { 48 this.isProcessingAll = true; 49 50 // Start logging queue progress every 10 seconds 51 this.processAllInterval = setInterval(() => { 52 if (!this.destroyed && this.isProcessingAll) { 53 const pending = this.queue.size; 54 const running = this.queue.pending; 55 56 // Stop logging if queue is empty 57 if (pending === 0 && running === 0) { 58 this.stopProcessAllLogging(); 59 } 60 } 61 }, 10000); // Log every 10 seconds 62 63 let timeoutId: number | undefined; 64 try { 65 // Add timeout protection to prevent hanging 66 const processPromise = this.queue.onIdle(); 67 const timeoutPromise = new Promise((_, reject) => { 68 timeoutId = setTimeout( 69 () => reject(new Error("Background queue processing timeout")), 70 30000, 71 ); 72 }); 73 74 await Promise.race([processPromise, timeoutPromise]); 75 } catch (error) { 76 this.logger.error( 77 "Background queue processing failed or timed out", 78 { error }, 79 ); 80 throw error; 81 } finally { 82 this.stopProcessAllLogging(); 83 if (timeoutId !== undefined) { 84 clearTimeout(timeoutId); 85 timeoutId = undefined; 86 } 87 } 88 } 89 90 private stopProcessAllLogging() { 91 if (this.processAllInterval) { 92 clearInterval(this.processAllInterval); 93 this.processAllInterval = null; 94 } 95 this.isProcessingAll = false; 96 } 97 98 // On destroy we stop accepting new tasks, but complete all pending/in-progress tasks. 99 // The application calls this only once http connections have drained (tasks no longer being added). 100 async destroy() { 101 this.destroyed = true; 102 this.stopProcessAllLogging(); 103 await this.queue.onIdle(); 104 } 105} 106 107type Task = (db: Database) => Promise<void>;