this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Harden shutdown

alice 3ab65bde ff54152d

+94 -20
+50 -12
packages/backend/src/index.ts
··· 1 1 import { EMIT_INTERVAL, LOG_INTERVAL, METRICS_PORT, PORT } from './config.js'; 2 - import { getEmojiStats, getTopLanguages, logEmojiStats } from './lib/emojiStats.js'; 2 + import { getEmojiStats, getTopLanguages, logEmojiStats, initiateShutdown } from './lib/emojiStats.js'; 3 3 import { flushPostgresBatch } from './lib/emojiStats.js'; 4 4 import { initializeJetstream, jetstream } from './lib/jetstream.js'; 5 5 import logger from './lib/logger.js'; 6 6 import { startMetricsServer } from './lib/metrics.js'; 7 + import { pool } from './lib/postgres.js'; 7 8 import { loadRedisScripts, redis } from './lib/redis.js'; 8 9 import { io, startSocketServer } from './lib/socket.io.js'; 9 10 ··· 50 51 }, LOG_INTERVAL); 51 52 /* End logging stats */ 52 53 54 + let isShuttingDown = false; 55 + 53 56 async function shutdown() { 57 + if (isShuttingDown) { 58 + logger.info('Shutdown called but one is already in progress.'); 59 + return; 60 + } 61 + 62 + isShuttingDown = true; 63 + 54 64 logger.info('Shutting down gracefully...'); 55 65 try { 66 + jetstream.close(); 67 + } catch (error) { 68 + logger.error(`Error closing Jetstream: ${(error as Error).message}`); 69 + } 70 + 71 + try { 72 + await initiateShutdown(); 73 + } catch (error) { 74 + logger.error(`Error initiating shutdown: ${(error as Error).message}`); 75 + } 76 + 77 + try { 56 78 await flushPostgresBatch(); 57 79 logger.info('Flushed remaining PostgreSQL batch.'); 58 80 } catch (error) { 59 81 logger.error(`Error flushing PostgreSQL batch during shutdown: ${(error as Error).message}`); 60 82 } 61 - void io.close(); 62 - jetstream.close(); 63 - metricsServer.close(); 64 - redis 65 - .quit() 66 - .catch((error: unknown) => { 67 - logger.error('Error disconnecting Redis client:', error); 68 - }) 69 - .finally(() => { 70 - process.exit(0); 71 - }); 83 + 84 + try { 85 + await io.close(); 86 + } catch (error) { 87 + logger.error(`Error closing Socket.io server: ${(error as Error).message}`); 88 + } 89 + 90 + try { 91 + metricsServer.close(); 92 + } catch (error) { 93 + logger.error(`Error closing Metrics server: ${(error as Error).message}`); 94 + } 95 + 96 + try { 97 + await redis.quit(); 98 + } catch (error) { 99 + logger.error(`Error disconnecting Redis client: ${(error as Error).message}`); 100 + } 101 + 102 + try { 103 + await pool.end(); 104 + logger.info('PostgreSQL pool disconnected.'); 105 + } catch (error) { 106 + logger.error(`Error disconnecting PostgreSQL pool: ${(error as Error).message}`); 107 + } 108 + 109 + process.exit(0); 72 110 } 73 111 74 112 process.on('SIGINT', () => {
+30 -3
packages/backend/src/lib/emojiStats.ts
··· 50 50 let isBatching = false; 51 51 let batchTimer: NodeJS.Timeout | null = null; 52 52 53 + let isShuttingDown = false; 54 + let ongoingHandleCreates = 0; 55 + let shutdownPromise: Promise<void> | null = null; 56 + 57 + function createShutdownPromise(): Promise<void> { 58 + return new Promise<void>((resolve) => { 59 + const checkCompletion = setInterval(() => { 60 + logger.info(`Shutting down, ongoing handleCreates: ${ongoingHandleCreates}`); 61 + if (isShuttingDown && ongoingHandleCreates === 0) { 62 + logger.info('All ongoing handleCreate operations have finished.'); 63 + clearInterval(checkCompletion); 64 + resolve(); 65 + } 66 + }, 50); 67 + }); 68 + } 69 + 70 + export function initiateShutdown(): Promise<void> { 71 + if (!shutdownPromise) { 72 + isShuttingDown = true; 73 + shutdownPromise = createShutdownPromise(); 74 + } 75 + return shutdownPromise; 76 + } 77 + 53 78 /** 54 79 * Flush the current batch to the PostgreSQL database. 55 80 */ ··· 134 159 } 135 160 136 161 export async function handleCreate(event: CommitCreateEvent<'app.bsky.feed.post'>) { 162 + ongoingHandleCreates++; 137 163 concurrentHandleCreates.inc(); 138 164 try { 139 165 const timer = postProcessingDuration.startTimer(); ··· 191 217 incrementTotalPosts(); 192 218 concurrentRedisInserts.dec(); 193 219 } catch (error) { 194 - logger.error(`Error processing "create" commit: ${(error as Error).message}`); 195 - logger.error(`Commit data: ${JSON.stringify(commit, null, 2)}`); 196 - logger.error(`Record data: ${JSON.stringify(record, null, 2)}`); 220 + console.error('Error processing "create" commit:', error); 221 + console.dir(commit, { depth: null, colors: true }); 222 + console.dir(record, { depth: null, colors: true }); 197 223 } finally { 198 224 timer(); 199 225 } 200 226 } finally { 201 227 concurrentHandleCreates.dec(); 228 + ongoingHandleCreates--; 202 229 } 203 230 } 204 231
+5
packages/backend/src/lib/metrics.ts
··· 116 116 const server = app.listen(port, host, () => { 117 117 logger.info(`Metrics server listening on port ${port}`); 118 118 }); 119 + 120 + server.on('close', () => { 121 + logger.info('Metrics server closed.'); 122 + }); 123 + 119 124 return server; 120 125 };
+3 -3
packages/backend/src/lib/redis.ts
··· 11 11 }); 12 12 13 13 redis.on('connect', () => { 14 - logger.info('Connected to Redis'); 14 + logger.info('Connected to Redis.'); 15 15 }); 16 16 17 17 redis.on('ready', () => { 18 - logger.info('Redis client ready'); 18 + logger.info('Redis client ready.'); 19 19 }); 20 20 21 21 redis.on('end', () => { 22 - logger.info('Redis client disconnected'); 22 + logger.info('Redis client disconnected.'); 23 23 }); 24 24 25 25 let SCRIPT_SHA: string;
+6 -2
packages/backend/src/lib/socket.io.ts
··· 35 35 // socket.emit('emojiInfo', emojiInfo); 36 36 }); 37 37 38 - socket.on('disconnect', () => { 39 - logger.info('A user disconnected'); 38 + socket.on('disconnect', (reason) => { 39 + logger.info(`A user disconnected. Reason: ${reason}`); 40 40 }); 41 + }); 42 + 43 + io.on('close', () => { 44 + logger.info('Socket.io server closed.'); 41 45 }); 42 46 43 47 export const startSocketServer = (port: number) => {