[READ ONLY MIRROR] Spark Social AppView Server
github.com/sprksocial/server
atproto
deno
hono
lexicon
1import PQueue from "p-queue";
2import { Database } from "./db/index.ts";
3import { Logger } from "@logtape/logtape";
4
// A simple queue for in-process, out-of-band (backgrounded) work.
6
/**
 * In-process queue for work that should run in the background, outside the
 * request that scheduled it.
 *
 * Tasks are handed to a `PQueue` and invoked with the shared `db` handle.
 * Failures never propagate to the caller of `add`; they are logged instead.
 */
export class BackgroundQueue {
  /** How often `processAll` reports queue depth while it waits, in ms. */
  private static readonly PROGRESS_INTERVAL_MS = 10000;
  /** Longest `processAll` waits for the queue to drain before failing, in ms. */
  private static readonly PROCESS_ALL_TIMEOUT_MS = 30000;

  queue = new PQueue();
  destroyed = false;
  private processAllInterval: number | null = null;
  private isProcessingAll = false;

  constructor(public db: Database, public logger: Logger) {}

  /**
   * Schedules `task` to run in the background with this queue's `db`.
   *
   * Does nothing once the queue has been destroyed. Rejections are handled
   * here and logged, so the returned value is always `undefined`.
   *
   * @param task Work to run; receives the queue's `db`.
   */
  add(task: Task) {
    if (this.destroyed) {
      return;
    }
    this.queue
      .add(() => task(this.db))
      .catch((err: unknown) => {
        // A task may reject with anything (including null or undefined), so
        // read `message` and `code` defensively: this handler must not throw.
        const details = (err ?? {}) as { message?: unknown; code?: unknown };
        const message = typeof details.message === "string"
          ? details.message
          : undefined;

        // Check if this is a MongoDB connection error during shutdown
        if (this.destroyed && message?.includes("Client must be connected")) {
          this.logger.debug(
            "Ignoring MongoDB connection error during shutdown",
            { err: message },
          );
          return;
        }

        // Check for MongoDB duplicate key errors
        if (details.code === 11000) {
          this.logger.warn(
            "Ignoring duplicate key error in background task",
            { err: message },
          );
          return;
        }

        this.logger.error("background queue task failed", { err });
      });
  }

  /**
   * Waits until the queue is idle, reporting progress periodically.
   *
   * @throws Error("Background queue processing timeout") when the queue has
   *   not drained within the timeout. The failure is logged and rethrown;
   *   tasks already queued are not cancelled by this method.
   */
  async processAll(): Promise<void> {
    // An overlapping call may have left a progress timer running. Stop it
    // first, otherwise its handle would be overwritten and never cleared.
    this.stopProcessAllLogging();
    this.isProcessingAll = true;

    // Report queue progress on a fixed interval while waiting.
    const intervalId = setInterval(() => {
      if (this.destroyed || !this.isProcessingAll) {
        return;
      }
      const pending = this.queue.size; // queued, not yet started
      const running = this.queue.pending; // currently executing

      // Stop reporting once there is nothing left to report on.
      if (pending === 0 && running === 0) {
        this.stopProcessAllLogging();
        return;
      }
      this.logger.debug("background queue progress", { pending, running });
    }, BackgroundQueue.PROGRESS_INTERVAL_MS);
    this.processAllInterval = intervalId;

    let timeoutId: number | undefined;
    try {
      // Race the drain against a timer so a stuck task cannot hang the caller.
      const timeoutPromise = new Promise<never>((_, reject) => {
        timeoutId = setTimeout(
          () => reject(new Error("Background queue processing timeout")),
          BackgroundQueue.PROCESS_ALL_TIMEOUT_MS,
        );
      });

      await Promise.race([this.queue.onIdle(), timeoutPromise]);
    } catch (error) {
      this.logger.error(
        "Background queue processing failed or timed out",
        { error },
      );
      throw error;
    } finally {
      if (timeoutId !== undefined) {
        clearTimeout(timeoutId);
      }
      // Only tear down the reporter this call started; a newer overlapping
      // call owns its own reporter and is still waiting on the queue.
      if (this.processAllInterval === intervalId) {
        this.stopProcessAllLogging();
      }
    }
  }

  /** Clears the progress timer, if any, and marks `processAll` as inactive. */
  private stopProcessAllLogging() {
    if (this.processAllInterval !== null) {
      clearInterval(this.processAllInterval);
      this.processAllInterval = null;
    }
    this.isProcessingAll = false;
  }

  // On destroy we stop accepting new tasks, but complete all pending/in-progress tasks.
  // The application calls this only once http connections have drained (tasks no longer being added).
  async destroy(): Promise<void> {
    this.destroyed = true;
    this.stopProcessAllLogging();
    await this.queue.onIdle();
  }
}
106
/**
 * A unit of background work accepted by `BackgroundQueue.add`.
 *
 * It is invoked with the queue's `Database` handle and signals completion by
 * resolving the returned promise; the resolved value is not used.
 */
type Task = (db: Database) => Promise<void>;