Monorepo for wisp.place. A static site hosting service built on top of the AT Protocol.
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

replay

+318 -44
+2
apps/firehose-service/src/config.ts
··· 27 27 redisUrl: process.env.REDIS_URL, 28 28 revalidateStream: process.env.WISP_REVALIDATE_STREAM || 'wisp:revalidate', 29 29 revalidateGroup: process.env.WISP_REVALIDATE_GROUP || 'firehose-service', 30 + cacheInvalidationStream: process.env.WISP_CACHE_INVALIDATION_STREAM || 'wisp:cache-invalidate-stream', 31 + cacheInvalidationStreamMaxLen: parseInt(process.env.WISP_CACHE_INVALIDATION_STREAM_MAXLEN || '10000', 10), 30 32 31 33 // Leader election (for distributed HA deployments) 32 34 leaderElection: process.env.LEADER_ELECTION === 'true',
+18 -2
apps/firehose-service/src/lib/cache-invalidation.ts
··· 54 54 if (!redis) return 55 55 56 56 try { 57 - const message = JSON.stringify({ did, rkey, action, token }) 58 - logger.debug(`[CacheInvalidation] Publishing ${action} for ${did}/${rkey} to ${CHANNEL}`) 57 + const streamId = await redis.xadd( 58 + config.cacheInvalidationStream, 59 + 'MAXLEN', 60 + '~', 61 + config.cacheInvalidationStreamMaxLen.toString(), 62 + '*', 63 + 'did', 64 + did, 65 + 'rkey', 66 + rkey, 67 + 'action', 68 + action, 69 + ...(token ? (['token', token] as const) : []), 70 + 'ts', 71 + Date.now().toString(), 72 + ) 73 + const message = JSON.stringify({ did, rkey, action, token, streamId }) 74 + logger.debug(`[CacheInvalidation] Publishing ${action} for ${did}/${rkey} to ${CHANNEL} (stream ${streamId})`) 59 75 await redis.publish(CHANNEL, message) 60 76 } catch (err) { 61 77 logger.error('[CacheInvalidation] Failed to publish', err)
+45
apps/hosting-service/src/lib/cache-invalidation.test.ts
··· 1 1 import { beforeEach, describe, expect, test } from 'bun:test' 2 2 import { 3 3 clearSiteUpdating, 4 + compareStreamIds, 4 5 isSiteUpdating, 5 6 markSiteUpdating, 6 7 parseCacheInvalidationMessage, 8 + parseCacheInvalidationStreamEntry, 7 9 resetUpdatingSitesForTests, 8 10 } from './cache-invalidation' 9 11 ··· 46 48 action: 'update', 47 49 token: 'token-a', 48 50 }) 51 + }) 52 + 53 + test('message parsing preserves stream id', () => { 54 + expect( 55 + parseCacheInvalidationMessage( 56 + JSON.stringify({ did: DID, rkey: RKEY, action: 'update', token: 'token-a', streamId: '1713811200000-2' }), 57 + ), 58 + ).toEqual({ 59 + did: DID, 60 + rkey: RKEY, 61 + action: 'update', 62 + token: 'token-a', 63 + streamId: '1713811200000-2', 64 + }) 65 + }) 66 + 67 + test('stream entry parsing reconstructs invalidation messages', () => { 68 + expect( 69 + parseCacheInvalidationStreamEntry('1713811200000-5', [ 70 + 'did', 71 + DID, 72 + 'rkey', 73 + RKEY, 74 + 'action', 75 + 'updating', 76 + 'token', 77 + 'token-a', 78 + 'ts', 79 + '1713811200000', 80 + ]), 81 + ).toEqual({ 82 + did: DID, 83 + rkey: RKEY, 84 + action: 'updating', 85 + token: 'token-a', 86 + streamId: '1713811200000-5', 87 + }) 88 + }) 89 + 90 + test('stream ids sort by timestamp and sequence', () => { 91 + expect(compareStreamIds('1713811200000-1', '1713811200000-2')).toBeLessThan(0) 92 + expect(compareStreamIds('1713811200001-0', '1713811200000-999')).toBeGreaterThan(0) 93 + expect(compareStreamIds('1713811200001-3', '1713811200001-3')).toBe(0) 49 94 }) 50 95 })
+252 -42
apps/hosting-service/src/lib/cache-invalidation.ts
··· 1 1 /** 2 2 * Cache invalidation subscriber 3 3 * 4 - * Listens to Redis pub/sub for cache invalidation messages from the firehose-service. 4 + * Uses Redis pub/sub for low-latency invalidation and a Redis stream for replay. 5 5 * When a site is updated/deleted, clears the hosting-service's local caches 6 6 * (tiered storage hot+warm tiers, redirect rules) so stale data isn't served. 7 7 * ··· 9 9 * the serving layer can show a "site updating" page instead of stale/partial content. 10 10 */ 11 11 12 + import { existsSync, readFileSync } from 'node:fs' 13 + import { mkdir, rename, writeFile } from 'node:fs/promises' 14 + import { dirname, resolve } from 'node:path' 12 15 import type { StorageTier } from '@wispplace/tiered-storage' 13 16 import Redis from 'ioredis' 14 17 import { cache } from './cache-manager' 15 18 import { hotTier, warmTier } from './storage' 16 19 17 20 const CHANNEL = 'wisp:cache-invalidate' 21 + const STREAM = process.env.WISP_CACHE_INVALIDATION_STREAM || 'wisp:cache-invalidate-stream' 22 + const STREAM_BLOCK_MS = parsePositiveInt(process.env.WISP_CACHE_INVALIDATION_BLOCK_MS, 5000) 23 + const STREAM_BATCH_COUNT = parsePositiveInt(process.env.WISP_CACHE_INVALIDATION_BATCH_COUNT, 100) 24 + const CURSOR_FILE = 25 + process.env.WISP_CACHE_INVALIDATION_CURSOR_FILE || 26 + resolve(process.env.CACHE_DIR || './cache/sites', '..', 'cache-invalidation.lastid') 27 + const STREAM_ID_PATTERN = /^\d+-\d+$/ 18 28 19 29 type CacheInvalidationAction = 'updating' | 'update' | 'delete' | 'settings' 20 30 ··· 23 33 rkey: string 24 34 action: CacheInvalidationAction 25 35 token?: string 36 + streamId?: string 26 37 } 27 38 28 39 // Sites currently being downloaded by the firehose-service. ··· 67 78 } 68 79 69 80 let subscriber: Redis | null = null 81 + let replayClient: Redis | null = null 82 + let stopReplayRequested = false 83 + let replayLoop: Promise<void> | null = null 84 + let processingQueue: Promise<void> = Promise.resolve() 85 + let cursorPersistQueue: Promise<void> = Promise.resolve() 86 + let lastProcessedStreamId = '0-0' 87 + 88 + function parsePositiveInt(value: string | undefined, fallback: number): number { 89 + if (!value) return fallback 90 + const parsed = Number.parseInt(value, 10) 91 + return Number.isFinite(parsed) && parsed > 0 ? parsed : fallback 92 + } 93 + 94 + function normalizeStreamId(streamId: string | undefined): string | undefined { 95 + if (!streamId) return undefined 96 + const trimmed = streamId.trim() 97 + return STREAM_ID_PATTERN.test(trimmed) ? trimmed : undefined 98 + } 99 + 100 + function parseStreamIdParts(streamId: string): [bigint, bigint] { 101 + const normalized = normalizeStreamId(streamId) 102 + if (!normalized) { 103 + throw new Error(`Invalid Redis stream id: ${streamId}`) 104 + } 105 + 106 + const [msRaw, seqRaw] = normalized.split('-') as [string, string] 107 + return [BigInt(msRaw), BigInt(seqRaw)] 108 + } 109 + 110 + export function compareStreamIds(a: string, b: string): number { 111 + const [aMs, aSeq] = parseStreamIdParts(a) 112 + const [bMs, bSeq] = parseStreamIdParts(b) 113 + if (aMs < bMs) return -1 114 + if (aMs > bMs) return 1 115 + if (aSeq < bSeq) return -1 116 + if (aSeq > bSeq) return 1 117 + return 0 118 + } 119 + 120 + function loadCursorFromDisk(): void { 121 + if (!existsSync(CURSOR_FILE)) return 122 + 123 + try { 124 + const stored = normalizeStreamId(readFileSync(CURSOR_FILE, 'utf8')) 125 + if (!stored) { 126 + console.warn(`[CacheInvalidation] Ignoring invalid cursor file contents in ${CURSOR_FILE}`) 127 + return 128 + } 129 + lastProcessedStreamId = stored 130 + console.log(`[CacheInvalidation] Loaded replay cursor ${stored} from ${CURSOR_FILE}`) 131 + } catch (err) { 132 + console.error(`[CacheInvalidation] Failed to load cursor file ${CURSOR_FILE}:`, err) 133 + } 134 + } 135 + 136 + function queueCursorPersist(streamId: string): void { 137 + cursorPersistQueue = cursorPersistQueue 138 + .catch(() => undefined) 139 + .then(async () => { 140 + const dir = dirname(CURSOR_FILE) 141 + const tmp = `${CURSOR_FILE}.tmp` 142 + await mkdir(dir, { recursive: true }) 143 + await writeFile(tmp, `${streamId}\n`, 'utf8') 144 + await rename(tmp, CURSOR_FILE) 145 + }) 146 + .catch((err) => { 147 + console.error(`[CacheInvalidation] Failed to persist cursor ${streamId} to ${CURSOR_FILE}:`, err) 148 + }) 149 + } 150 + 151 + function advanceStreamCursor(streamId: string | undefined): void { 152 + const normalized = normalizeStreamId(streamId) 153 + if (!normalized) return 154 + if (compareStreamIds(normalized, lastProcessedStreamId) <= 0) return 155 + lastProcessedStreamId = normalized 156 + queueCursorPersist(normalized) 157 + } 158 + 159 + function shouldSkipReplayMessage(streamId: string | undefined): boolean { 160 + const normalized = normalizeStreamId(streamId) 161 + if (!normalized) return false 162 + return compareStreamIds(normalized, lastProcessedStreamId) <= 0 163 + } 70 164 71 165 export function parseCacheInvalidationMessage(message: string): CacheInvalidationMessage | null { 72 - const parsed = JSON.parse(message) as Partial<CacheInvalidationMessage> 166 + let parsed: Partial<CacheInvalidationMessage> 167 + try { 168 + parsed = JSON.parse(message) as Partial<CacheInvalidationMessage> 169 + } catch { 170 + return null 171 + } 73 172 74 173 if ( 75 174 typeof parsed.did !== 'string' || ··· 87 186 rkey: parsed.rkey, 88 187 action: parsed.action, 89 188 token: typeof parsed.token === 'string' ? parsed.token : undefined, 189 + streamId: normalizeStreamId(parsed.streamId), 90 190 } 91 191 } 92 192 193 + export function parseCacheInvalidationStreamEntry(streamId: string, fields: string[]): CacheInvalidationMessage | null { 194 + const payload: Record<string, string> = {} 195 + 196 + for (let index = 0; index < fields.length - 1; index += 2) { 197 + payload[fields[index]!] = fields[index + 1]! 198 + } 199 + 200 + const parsed = parseCacheInvalidationMessage( 201 + JSON.stringify({ 202 + did: payload.did, 203 + rkey: payload.rkey, 204 + action: payload.action, 205 + token: payload.token, 206 + streamId, 207 + }), 208 + ) 209 + return parsed 210 + } 211 + 93 212 /** 94 213 * Directly invalidate a tier by listing and deleting all keys with the given prefix. 95 214 * Each tier is invalidated independently so a failure in one doesn't block the others. ··· 110 229 } 111 230 } 112 231 232 + async function applyCacheInvalidation(parsed: CacheInvalidationMessage, source: 'pubsub' | 'replay'): Promise<void> { 233 + const { did, rkey, action, token, streamId } = parsed 234 + 235 + if (shouldSkipReplayMessage(streamId)) { 236 + console.log( 237 + `[CacheInvalidation] Skipping duplicate ${action} for ${did}/${rkey} from ${source} (stream ${streamId})`, 238 + ) 239 + return 240 + } 241 + 242 + console.log( 243 + `[CacheInvalidation] Received ${action} for ${did}/${rkey} from ${source}${streamId ? ` (stream ${streamId})` : ''}`, 244 + ) 245 + 246 + if (action === 'updating') { 247 + markSiteUpdating(did, rkey, token) 248 + advanceStreamCursor(streamId) 249 + console.log(`[CacheInvalidation] Marked ${did}/${rkey} as updating`) 250 + return 251 + } 252 + 253 + const cleared = clearSiteUpdating(did, rkey, token) 254 + if (!cleared && action === 'update' && token) { 255 + console.log(`[CacheInvalidation] Ignored stale update clear for ${did}/${rkey}`) 256 + advanceStreamCursor(streamId) 257 + return 258 + } 259 + 260 + const prefix = `${did}/${rkey}/` 261 + const hotDeleted = await invalidateTier(hotTier, 'hot', prefix) 262 + const warmDeleted = warmTier ? await invalidateTier(warmTier, 'warm', prefix) : 0 263 + 264 + console.log(`[CacheInvalidation] Cleared ${hotDeleted} hot + ${warmDeleted} warm keys for ${did}/${rkey}`) 265 + 266 + cache.delete('redirectRules', `${did}:${rkey}`) 267 + cache.delete('settings', `${did}:${rkey}`) 268 + cache.deletePrefix('siteFiles', `${did}:${rkey}:`) 269 + advanceStreamCursor(streamId) 270 + } 271 + 272 + function enqueueCacheInvalidation(parsed: CacheInvalidationMessage, source: 'pubsub' | 'replay'): Promise<void> { 273 + processingQueue = processingQueue 274 + .then(() => applyCacheInvalidation(parsed, source)) 275 + .catch((err) => { 276 + console.error('[CacheInvalidation] Error processing message:', err) 277 + }) 278 + return processingQueue 279 + } 280 + 281 + function ensureReplayLoopStarted(): void { 282 + if (!replayClient || replayLoop || stopReplayRequested) return 283 + 284 + replayLoop = (async () => { 285 + console.log(`[CacheInvalidation] Starting replay loop on ${STREAM} from ${lastProcessedStreamId}`) 286 + 287 + while (!stopReplayRequested) { 288 + try { 289 + const response = await replayClient.xread( 290 + 'COUNT', 291 + STREAM_BATCH_COUNT, 292 + 'BLOCK', 293 + STREAM_BLOCK_MS, 294 + 'STREAMS', 295 + STREAM, 296 + lastProcessedStreamId, 297 + ) 298 + 299 + if (!response) continue 300 + 301 + for (const [, entries] of response as [string, Array<[string, string[]]>][]) { 302 + for (const [streamId, fields] of entries) { 303 + const parsed = parseCacheInvalidationStreamEntry(streamId, fields) 304 + if (!parsed) { 305 + console.warn(`[CacheInvalidation] Invalid stream entry ${streamId} on ${STREAM}`) 306 + advanceStreamCursor(streamId) 307 + continue 308 + } 309 + 310 + await enqueueCacheInvalidation(parsed, 'replay') 311 + } 312 + } 313 + } catch (err) { 314 + if (stopReplayRequested) break 315 + console.error('[CacheInvalidation] Replay loop error:', err) 316 + await new Promise((resolve) => setTimeout(resolve, 1000)) 317 + } 318 + } 319 + })() 320 + .catch((err) => { 321 + if (!stopReplayRequested) { 322 + console.error('[CacheInvalidation] Replay loop crashed:', err) 323 + } 324 + }) 325 + .finally(() => { 326 + replayLoop = null 327 + }) 328 + } 329 + 113 330 export function startCacheInvalidationSubscriber(): void { 114 331 const redisUrl = process.env.REDIS_URL 115 332 if (!redisUrl) { ··· 117 334 return 118 335 } 119 336 337 + loadCursorFromDisk() 338 + stopReplayRequested = false 339 + 120 340 console.log(`[CacheInvalidation] Connecting to Redis for subscribing: ${redisUrl}`) 121 341 subscriber = new Redis(redisUrl, { 122 342 maxRetriesPerRequest: 2, 123 343 enableReadyCheck: true, 124 344 }) 345 + replayClient = new Redis(redisUrl, { 346 + maxRetriesPerRequest: 2, 347 + enableReadyCheck: true, 348 + }) 125 349 126 350 subscriber.on('error', (err) => { 127 351 console.error('[CacheInvalidation] Redis error:', err) 352 + }) 353 + replayClient.on('error', (err) => { 354 + console.error('[CacheInvalidation] Replay Redis error:', err) 128 355 }) 129 356 130 357 subscriber.on('ready', () => { 131 358 console.log('[CacheInvalidation] Redis subscriber connected') 132 359 }) 360 + replayClient.on('ready', () => { 361 + console.log('[CacheInvalidation] Redis replay client connected') 362 + ensureReplayLoopStarted() 363 + }) 133 364 134 365 subscriber.subscribe(CHANNEL, (err) => { 135 366 if (err) { ··· 140 371 }) 141 372 142 373 subscriber.on('message', async (_channel: string, message: string) => { 143 - try { 144 - const parsed = parseCacheInvalidationMessage(message) 145 - if (!parsed) { 146 - console.warn('[CacheInvalidation] Invalid message:', message) 147 - return 148 - } 374 + const parsed = parseCacheInvalidationMessage(message) 375 + if (!parsed) { 376 + console.warn('[CacheInvalidation] Invalid message:', message) 377 + return 378 + } 149 379 150 - const { did, rkey, action, token } = parsed 151 - 152 - console.log(`[CacheInvalidation] Received ${action} for ${did}/${rkey}`) 380 + await enqueueCacheInvalidation(parsed, 'pubsub') 381 + }) 153 382 154 - if (action === 'updating') { 155 - // Firehose is about to download new files — mark site as updating 156 - markSiteUpdating(did, rkey, token) 157 - console.log(`[CacheInvalidation] Marked ${did}/${rkey} as updating`) 158 - return 159 - } 160 - 161 - // For update/delete/settings: clear the updating flag and invalidate caches 162 - const cleared = clearSiteUpdating(did, rkey, token) 163 - if (!cleared && action === 'update' && token) { 164 - console.log(`[CacheInvalidation] Ignored stale update clear for ${did}/${rkey}`) 165 - return 166 - } 167 - 168 - const prefix = `${did}/${rkey}/` 169 - 170 - // Invalidate each tier independently - a failure in one tier 171 - // (e.g. S3 listKeys timeout) must NOT prevent hot/warm from being cleared 172 - const hotDeleted = await invalidateTier(hotTier, 'hot', prefix) 173 - const warmDeleted = warmTier ? await invalidateTier(warmTier, 'warm', prefix) : 0 174 - 175 - console.log(`[CacheInvalidation] Cleared ${hotDeleted} hot + ${warmDeleted} warm keys for ${did}/${rkey}`) 176 - 177 - // Clear in-memory caches for this site 178 - cache.delete('redirectRules', `${did}:${rkey}`) 179 - cache.delete('settings', `${did}:${rkey}`) 180 - cache.deletePrefix('siteFiles', `${did}:${rkey}:`) 181 - } catch (err) { 182 - console.error('[CacheInvalidation] Error processing message:', err) 183 - } 184 - }) 383 + ensureReplayLoopStarted() 185 384 } 186 385 187 386 export async function stopCacheInvalidationSubscriber(): Promise<void> { 387 + stopReplayRequested = true 388 + await replayLoop 389 + await processingQueue 390 + await cursorPersistQueue.catch(() => undefined) 391 + 188 392 if (subscriber) { 189 393 const toClose = subscriber 190 394 subscriber = null 395 + await toClose.quit() 396 + } 397 + 398 + if (replayClient) { 399 + const toClose = replayClient 400 + replayClient = null 191 401 await toClose.quit() 192 402 } 193 403 }
+1
package.json
··· 33 33 "lint:fix": "biome check --write .", 34 34 "format": "biome format --write .", 35 35 "codegen": "./scripts/codegen.sh", 36 + "download:aturi": "bun run scripts/download-place-wisp-fs.ts", 36 37 "publish:cli": "cd cli && bun run build && npm publish && cd ../packages/create-wisp && npm publish" 37 38 }, 38 39 "trustedDependencies": [