🪻 distributed transcription service thistle.dunkirk.sh
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: add graceful shutdown handlers

Implements graceful shutdown on SIGTERM/SIGINT that:
- Stops accepting new requests via server.stop()
- Closes all active SSE streams (sync will reconnect on restart)
- Stops transcription service streams to Murmur
- Clears cleanup intervals (sessions, sync, files)
- Closes database connections cleanly

Prevents database corruption and ensures clean shutdown.

💘 Generated with Crush

Assisted-by: Claude Sonnet 4.5 via Crush <crush@charm.land>

+81 -3
+63 -3
src/index.ts
··· 165 165 ); 166 166 167 167 // Clean up expired sessions every hour 168 - setInterval(cleanupExpiredSessions, 60 * 60 * 1000); 168 + const sessionCleanupInterval = setInterval( 169 + cleanupExpiredSessions, 170 + 60 * 60 * 1000, 171 + ); 169 172 170 173 // Helper function to sync user subscriptions from Polar 171 174 async function syncUserSubscriptionsFromPolar( ··· 277 280 } 278 281 279 282 // Periodic sync every 5 minutes as backup (SSE handles real-time updates) 280 - setInterval( 283 + const syncInterval = setInterval( 281 284 async () => { 282 285 try { 283 286 await whisperService.syncWithWhisper(); ··· 292 295 ); 293 296 294 297 // Clean up stale files daily 295 - setInterval(() => whisperService.cleanupStaleFiles(), 24 * 60 * 60 * 1000); 298 + const fileCleanupInterval = setInterval( 299 + () => whisperService.cleanupStaleFiles(), 300 + 24 * 60 * 60 * 1000, 301 + ); 296 302 297 303 const server = Bun.serve({ 298 304 port: process.env.PORT ? Number.parseInt(process.env.PORT, 10) : 3000, ··· 1619 1625 // Event-driven SSE stream with reconnection support 1620 1626 const stream = new ReadableStream({ 1621 1627 async start(controller) { 1628 + // Track this stream for graceful shutdown 1629 + activeSSEStreams.add(controller); 1630 + 1622 1631 const encoder = new TextEncoder(); 1623 1632 let isClosed = false; 1624 1633 let lastEventId = Math.floor(Date.now() / 1000); ··· 1669 1678 current?.status === "failed" 1670 1679 ) { 1671 1680 isClosed = true; 1681 + activeSSEStreams.delete(controller); 1672 1682 controller.close(); 1673 1683 return; 1674 1684 } ··· 1699 1709 isClosed = true; 1700 1710 clearInterval(heartbeatInterval); 1701 1711 transcriptionEvents.off(transcriptionId, updateHandler); 1712 + activeSSEStreams.delete(controller); 1702 1713 controller.close(); 1703 1714 } 1704 1715 }; ··· 1708 1719 isClosed = true; 1709 1720 clearInterval(heartbeatInterval); 1710 1721 transcriptionEvents.off(transcriptionId, updateHandler); 1722 + activeSSEStreams.delete(controller); 1711 1723 }; 1712 1724 }, 1713 1725 }); ··· 3095 3107 }, 3096 3108 }); 3097 3109 console.log(`🪻 Thistle running at http://localhost:${server.port}`); 3110 + 3111 + // Track active SSE streams for graceful shutdown 3112 + const activeSSEStreams = new Set<ReadableStreamDefaultController>(); 3113 + 3114 + // Graceful shutdown handler 3115 + let isShuttingDown = false; 3116 + 3117 + async function shutdown(signal: string) { 3118 + if (isShuttingDown) return; 3119 + isShuttingDown = true; 3120 + 3121 + console.log(`\n${signal} received, starting graceful shutdown...`); 3122 + 3123 + // 1. Stop accepting new requests 3124 + console.log("[Shutdown] Closing server..."); 3125 + server.stop(); 3126 + 3127 + // 2. Close all active SSE streams (safe to kill - sync will handle reconnection) 3128 + console.log(`[Shutdown] Closing ${activeSSEStreams.size} active SSE streams...`); 3129 + for (const controller of activeSSEStreams) { 3130 + try { 3131 + controller.close(); 3132 + } catch { 3133 + // Already closed 3134 + } 3135 + } 3136 + activeSSEStreams.clear(); 3137 + 3138 + // 3. Stop transcription service (closes streams to Murmur) 3139 + whisperService.stop(); 3140 + 3141 + // 4. Stop cleanup intervals 3142 + console.log("[Shutdown] Stopping cleanup intervals..."); 3143 + clearInterval(sessionCleanupInterval); 3144 + clearInterval(syncInterval); 3145 + clearInterval(fileCleanupInterval); 3146 + 3147 + // 5. Close database connections 3148 + console.log("[Shutdown] Closing database..."); 3149 + db.close(); 3150 + 3151 + console.log("[Shutdown] Complete"); 3152 + process.exit(0); 3153 + } 3154 + 3155 + // Register shutdown handlers 3156 + process.on("SIGTERM", () => shutdown("SIGTERM")); 3157 + process.on("SIGINT", () => shutdown("SIGINT"));
+18
src/lib/transcription.ts
··· 669 669 console.error("[Cleanup] Failed:", error); 670 670 } 671 671 } 672 + 673 + stop(): void { 674 + console.log("[Transcription] Closing active streams..."); 675 + // Close all active SSE streams to Murmur 676 + for (const [transcriptionId, stream] of this.activeStreams.entries()) { 677 + try { 678 + stream.close(); 679 + this.streamLocks.delete(transcriptionId); 680 + } catch (error) { 681 + console.error( 682 + `[Transcription] Error closing stream ${transcriptionId}:`, 683 + error, 684 + ); 685 + } 686 + } 687 + this.activeStreams.clear(); 688 + console.log("[Transcription] All streams closed"); 689 + } 672 690 }