[READ ONLY MIRROR] Spark Social AppView Server
github.com/sprksocial/server
atproto
deno
hono
lexicon
1import PQueue from "p-queue";
2import { Database } from "./db/index.ts";
3
4// A simple queue for in-process, out-of-band/backgrounded work
5
6export class BackgroundQueue {
7 queue = new PQueue();
8 destroyed = false;
9 private processAllInterval: number | null = null;
10 private isProcessingAll = false;
11
12 constructor(public db: Database) {}
13
14 add(task: Task) {
15 if (this.destroyed) {
16 return;
17 }
18 this.queue
19 .add(() => task(this.db))
20 .catch((err: Error) => {
21 // Check if this is a MongoDB connection error during shutdown
22 if (
23 err.message?.includes("Client must be connected") && this.destroyed
24 ) {
25 console.debug(
26 "Ignoring MongoDB connection error during shutdown",
27 { err: err.message },
28 );
29 return;
30 }
31
32 // Check for MongoDB duplicate key errors
33 const mongoError = err as { code?: number };
34 if (mongoError.code === 11000) {
35 console.warn(
36 "Ignoring duplicate key error in background task",
37 { err: err.message },
38 );
39 return;
40 }
41
42 console.error("background queue task failed", { err });
43 });
44 }
45
46 async processAll() {
47 this.isProcessingAll = true;
48
49 // Start logging queue progress every 10 seconds
50 this.processAllInterval = setInterval(() => {
51 if (!this.destroyed && this.isProcessingAll) {
52 const pending = this.queue.size;
53 const running = this.queue.pending;
54
55 // Stop logging if queue is empty
56 if (pending === 0 && running === 0) {
57 this.stopProcessAllLogging();
58 }
59 }
60 }, 10000); // Log every 10 seconds
61
62 let timeoutId: number | undefined;
63 try {
64 // Add timeout protection to prevent hanging
65 const processPromise = this.queue.onIdle();
66 const timeoutPromise = new Promise((_, reject) => {
67 timeoutId = setTimeout(
68 () => reject(new Error("Background queue processing timeout")),
69 30000,
70 );
71 });
72
73 await Promise.race([processPromise, timeoutPromise]);
74 } catch (error) {
75 console.error(
76 "Background queue processing failed or timed out",
77 { error },
78 );
79 throw error;
80 } finally {
81 this.stopProcessAllLogging();
82 if (timeoutId !== undefined) {
83 clearTimeout(timeoutId);
84 timeoutId = undefined;
85 }
86 }
87 }
88
89 private stopProcessAllLogging() {
90 if (this.processAllInterval) {
91 clearInterval(this.processAllInterval);
92 this.processAllInterval = null;
93 }
94 this.isProcessingAll = false;
95 }
96
97 // On destroy we stop accepting new tasks, but complete all pending/in-progress tasks.
98 // The application calls this only once http connections have drained (tasks no longer being added).
99 async destroy() {
100 this.destroyed = true;
101 this.stopProcessAllLogging();
102 await this.queue.onIdle();
103 }
104}
105
106type Task = (db: Database) => Promise<void>;