Monorepo for wisp.place, a static site hosting service built on top of the AT Protocol.
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

add secondary relay support

+114 -23
+2
apps/firehose-service/.env.example
··· 3 3 4 4 # Firehose 5 5 FIREHOSE_SERVICE=wss://bsky.network 6 + # Optional fallback relay used when the primary fails 3 times in a row 7 + FIREHOSE_SERVICE_SECONDARY= 6 8 FIREHOSE_MAX_CONCURRENCY=5 7 9 8 10 # Redis (cache invalidation + revalidation queue)
+1
apps/firehose-service/src/config.ts
··· 8 8 9 9 // Firehose 10 10 firehoseService: process.env.FIREHOSE_SERVICE || 'wss://bsky.network', 11 + firehoseServiceSecondary: process.env.FIREHOSE_SERVICE_SECONDARY || undefined, 11 12 firehoseMaxConcurrency: parseInt(process.env.FIREHOSE_MAX_CONCURRENCY || '5', 10), 12 13 13 14 // S3 storage (write destination)
+111 -23
apps/firehose-service/src/lib/firehose.ts
··· 26 26 let activeHandlers = 0 27 27 let queuedHandlers = 0 28 28 let consecutiveFailures = 0 29 + // Counts how many times we've switched relays without a successful event since 30 + // the last successful connection. Used to detect when both relays are down. 31 + let swapsWithoutSuccess = 0 32 + // Counts stall watchdog firings without a received event in between. 1 = try a 33 + // same-relay reconnect; 2 = relay is dead, fail over. 34 + let stallReconnects = 0 35 + let activeService: string = config.firehoseService 29 36 const siteQueues = new Map<string, Promise<void>>() 30 37 38 + const STALL_THRESHOLD_MS = 30_000 39 + 31 40 // Track current firehose sequence number for cursor-based resumption 32 41 let currentSeq: number | undefined 42 + 43 + function getAlternateService(current: string): string | undefined { 44 + if (!config.firehoseServiceSecondary) return undefined 45 + return current === config.firehoseService ? config.firehoseServiceSecondary : config.firehoseService 46 + } 33 47 34 48 export function getCurrentSeq(): number | undefined { 35 49 return currentSeq ··· 99 113 try { 100 114 lastEventTime = Date.now() 101 115 if (consecutiveFailures > 0) consecutiveFailures = 0 116 + // Any successful event means the active relay is working — reset the 117 + // cross-relay failover counter so a future failure triggers a fresh swap. 
118 + if (swapsWithoutSuccess > 0) swapsWithoutSuccess = 0 119 + if (stallReconnects > 0) stallReconnects = 0 102 120 if ('seq' in evt) currentSeq = evt.seq 103 121 104 122 if (!('event' in evt)) return ··· 161 179 } 162 180 163 181 function handleError(err: Error, onTooManyFailures?: () => void): void { 164 - logger.error('Firehose connection error', err) 182 + logger.error(`Firehose connection error on ${activeService}`, err) 165 183 consecutiveFailures++ 166 - if (consecutiveFailures >= 3) { 184 + if (consecutiveFailures < 3) return 185 + 186 + const alternate = getAlternateService(activeService) 187 + if (alternate && swapsWithoutSuccess < 2) { 188 + logger.warn(`Firehose ${activeService} failed ${consecutiveFailures} times, failing over to ${alternate}`) 189 + consecutiveFailures = 0 190 + swapsWithoutSuccess++ 191 + firehoseHandle?.destroy() 192 + firehoseHandle = null 193 + activeService = alternate 194 + connect(onTooManyFailures) 195 + return 196 + } 197 + 198 + if (alternate) { 199 + logger.error('Both primary and secondary relays failing, triggering offline callback') 200 + } else { 167 201 logger.warn(`Firehose failed ${consecutiveFailures} times, triggering offline callback`) 168 - onTooManyFailures?.() 169 202 } 203 + onTooManyFailures?.() 170 204 } 171 205 172 206 let firehoseHandle: { destroy: () => void } | null = null 207 + let stallWatchdogHandle: ReturnType<typeof setInterval> | null = null 173 208 174 - /** 175 - * Start the firehose worker 176 - */ 177 - export function startFirehose(initialCursor?: number, onTooManyFailures?: () => void): void { 178 - logger.info(`Starting firehose (runtime: ${isBun ? 
'Bun' : 'Node.js'})`) 179 - logger.info(`Service: ${config.firehoseService}`) 180 - logger.info(`Max concurrency: ${config.firehoseMaxConcurrency}`) 181 - if (initialCursor !== undefined) { 182 - currentSeq = initialCursor 183 - logger.info(`Resuming from cursor: ${initialCursor}`) 209 + function handleStall(onTooManyFailures?: () => void): void { 210 + const silenceMs = Date.now() - lastEventTime 211 + if (silenceMs < STALL_THRESHOLD_MS) return 212 + 213 + stallReconnects++ 214 + const silenceSec = Math.round(silenceMs / 1000) 215 + 216 + // First stall: try reconnecting to the same relay with the current cursor. 217 + if (stallReconnects === 1) { 218 + logger.warn(`No events for ${silenceSec}s on ${activeService}, reconnecting with cursor`) 219 + firehoseHandle?.destroy() 220 + firehoseHandle = null 221 + // Grace window so the next watchdog tick doesn't fire before reconnect 222 + // has had a chance to receive events. 223 + lastEventTime = Date.now() 224 + connect(onTooManyFailures) 225 + return 184 226 } 185 227 228 + // Second stall: same-relay reconnect didn't help — fail over if possible. 
229 + const alternate = getAlternateService(activeService) 230 + if (alternate && swapsWithoutSuccess < 2) { 231 + logger.warn(`${activeService} still silent after reconnect, failing over to ${alternate}`) 232 + swapsWithoutSuccess++ 233 + stallReconnects = 0 234 + consecutiveFailures = 0 235 + activeService = alternate 236 + firehoseHandle?.destroy() 237 + firehoseHandle = null 238 + lastEventTime = Date.now() 239 + connect(onTooManyFailures) 240 + return 241 + } 242 + 243 + logger.error(`Firehose stalled on ${activeService} with no recoverable relay, triggering offline callback`) 244 + onTooManyFailures?.() 245 + } 246 + 247 + function connect(onTooManyFailures?: () => void): void { 248 + logger.info(`Connecting firehose to ${activeService}`) 186 249 if (isBun) { 187 250 // Use BunFirehose for Bun runtime 188 251 const bunFirehose = new BunFirehose({ 189 252 idResolver, 190 - service: config.firehoseService, 253 + service: activeService, 191 254 filterCollections: ['place.wisp.fs', 'place.wisp.settings'], 192 255 handleEvent, 193 256 onError: (err: Error) => handleError(err, onTooManyFailures), ··· 195 258 onConnect: () => { 196 259 isConnected = true 197 260 consecutiveFailures = 0 198 - logger.info('Firehose connected') 261 + swapsWithoutSuccess = 0 262 + logger.info(`Firehose connected to ${activeService}`) 199 263 }, 200 264 onDisconnect: () => { 201 265 isConnected = false ··· 209 273 isConnected = true 210 274 const nodeFirehose = new Firehose({ 211 275 idResolver, 212 - service: config.firehoseService, 276 + service: activeService, 213 277 filterCollections: ['place.wisp.fs', 'place.wisp.settings'], 214 278 handleEvent: handleEvent as any, 215 279 onError: (err: Error) => handleError(err, onTooManyFailures), ··· 217 281 nodeFirehose.start() 218 282 firehoseHandle = { destroy: () => nodeFirehose.destroy() } 219 283 } 284 + } 285 + 286 + /** 287 + * Start the firehose worker 288 + */ 289 + export function startFirehose(initialCursor?: number, 
onTooManyFailures?: () => void): void { 290 + logger.info(`Starting firehose (runtime: ${isBun ? 'Bun' : 'Node.js'})`) 291 + logger.info(`Primary service: ${config.firehoseService}`) 292 + if (config.firehoseServiceSecondary) { 293 + logger.info(`Secondary service: ${config.firehoseServiceSecondary}`) 294 + } 295 + logger.info(`Max concurrency: ${config.firehoseMaxConcurrency}`) 296 + if (initialCursor !== undefined) { 297 + currentSeq = initialCursor 298 + logger.info(`Resuming from cursor: ${initialCursor}`) 299 + } 300 + 301 + activeService = config.firehoseService 302 + swapsWithoutSuccess = 0 303 + connect(onTooManyFailures) 220 304 221 305 // Log cache info hourly 222 306 setInterval( ··· 226 310 60 * 60 * 1000, 227 311 ) 228 312 229 - // Log status periodically 230 - setInterval(() => { 231 - const health = getFirehoseHealth() 232 - if (health.timeSinceLastEvent > 30000) { 233 - logger.warn(`No events for ${Math.round(health.timeSinceLastEvent / 1000)}s`) 234 - } 235 - }, 30000) 313 + // Stall watchdog: if no events for STALL_THRESHOLD_MS, reconnect (same relay 314 + // first, then fail over on the next stall). 315 + if (stallWatchdogHandle) clearInterval(stallWatchdogHandle) 316 + stallWatchdogHandle = setInterval(() => handleStall(onTooManyFailures), STALL_THRESHOLD_MS) 236 317 } 237 318 238 319 /** ··· 242 323 logger.info('Stopping firehose') 243 324 isConnected = false 244 325 consecutiveFailures = 0 326 + swapsWithoutSuccess = 0 327 + stallReconnects = 0 328 + activeService = config.firehoseService 329 + if (stallWatchdogHandle) { 330 + clearInterval(stallWatchdogHandle) 331 + stallWatchdogHandle = null 332 + } 245 333 firehoseHandle?.destroy() 246 334 firehoseHandle = null 247 335 currentSeq = undefined