source dump of claude code
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 275 lines 9.1 kB view raw
import { jsonStringify } from '../../utils/slowOperations.js'

/**
 * Serial ordered event uploader with batching, retry, and backpressure.
 *
 * - enqueue() adds events to a pending buffer
 * - At most 1 POST in-flight at a time
 * - Drains up to maxBatchSize items per POST
 * - New events accumulate while in-flight
 * - On failure: exponential backoff (clamped), retries indefinitely
 *   until success or close() — unless maxConsecutiveFailures is set,
 *   in which case the failing batch is dropped and drain advances
 * - flush() blocks until pending is empty and kicks drain if needed
 * - Backpressure: enqueue() blocks when maxQueueSize is reached
 */

/**
 * Throw from config.send() to make the uploader wait a server-supplied
 * duration before retrying (e.g. 429 with Retry-After). When retryAfterMs
 * is set, it overrides exponential backoff for that attempt — clamped to
 * [baseDelayMs, maxDelayMs] and jittered so a misbehaving server can
 * neither hot-loop nor stall the client, and many sessions sharing a rate
 * limit don't all pounce at the same instant. Without retryAfterMs (or with
 * a NaN retryAfterMs), behaves like any other thrown error (exponential
 * backoff).
 */
export class RetryableError extends Error {
  constructor(
    message: string,
    readonly retryAfterMs?: number,
  ) {
    super(message)
  }
}

type SerialBatchEventUploaderConfig<T> = {
  /** Max items per POST (1 = no batching). Values below 1 are treated as 1. */
  maxBatchSize: number
  /**
   * Max serialized bytes per POST. First item always goes in regardless of
   * size; subsequent items only if cumulative JSON bytes stay under this.
   * Undefined = no byte limit (count-only batching).
   */
  maxBatchBytes?: number
  /** Max pending items before enqueue() blocks */
  maxQueueSize: number
  /** The actual HTTP call — caller controls payload format */
  send: (batch: T[]) => Promise<void>
  /** Base delay for exponential backoff (ms) */
  baseDelayMs: number
  /** Max delay cap (ms) */
  maxDelayMs: number
  /** Random jitter range added to retry delay (ms) */
  jitterMs: number
  /**
   * After this many consecutive send() failures, drop the failing batch
   * and move on to the next pending item with a fresh failure budget.
   * Undefined = retry indefinitely (default).
   */
  maxConsecutiveFailures?: number
  /** Called when a batch is dropped for hitting maxConsecutiveFailures. */
  onBatchDropped?: (batchSize: number, failures: number) => void
}

export class SerialBatchEventUploader<T> {
  private pending: T[] = []
  private pendingAtClose = 0
  private draining = false
  private closed = false
  private backpressureResolvers: Array<() => void> = []
  /** Wakes the drain loop out of a retry sleep early (set only while sleeping). */
  private sleepResolve: (() => void) | null = null
  private flushResolvers: Array<() => void> = []
  private droppedBatches = 0
  private readonly config: SerialBatchEventUploaderConfig<T>

  constructor(config: SerialBatchEventUploaderConfig<T>) {
    this.config = config
  }

  /**
   * Monotonic count of batches dropped via maxConsecutiveFailures. Callers
   * can snapshot before flush() and compare after to detect silent drops
   * (flush() resolves normally even when batches were dropped).
   */
  get droppedBatchCount(): number {
    return this.droppedBatches
  }

  /**
   * Pending queue depth. After close(), returns the count at close time —
   * close() clears the queue but shutdown diagnostics may read this after.
   */
  get pendingCount(): number {
    return this.closed ? this.pendingAtClose : this.pending.length
  }

  /**
   * Add events to the pending buffer. Returns immediately if space is
   * available. Blocks (awaits) if the buffer is full — caller pauses
   * until drain frees space.
   *
   * A single call carrying more than maxQueueSize items can never "fit";
   * it is admitted as soon as the queue is empty rather than waiting
   * forever. Calls made after close() are silently ignored.
   */
  async enqueue(events: T | T[]): Promise<void> {
    if (this.closed) return
    const items = Array.isArray(events) ? events : [events]
    if (items.length === 0) return

    // Backpressure: wait until there's space. Only wait while the queue is
    // non-empty — with an empty queue nothing will ever free more room (and
    // no drain may be running to wake us), so waiting would deadlock an
    // oversized enqueue.
    while (
      this.pending.length > 0 &&
      this.pending.length + items.length > this.config.maxQueueSize &&
      !this.closed
    ) {
      await new Promise<void>(resolve => {
        this.backpressureResolvers.push(resolve)
      })
    }

    if (this.closed) return
    this.pending.push(...items)
    void this.drain()
  }

  /**
   * Block until all pending events have been sent.
   * Used at turn boundaries and graceful shutdown.
   */
  flush(): Promise<void> {
    if (this.pending.length === 0 && !this.draining) {
      return Promise.resolve()
    }
    void this.drain()
    return new Promise<void>(resolve => {
      this.flushResolvers.push(resolve)
    })
  }

  /**
   * Drop pending events and stop processing.
   * Resolves any blocked enqueue() and flush() callers, and cancels a
   * pending retry sleep (including its timer). Idempotent.
   */
  close(): void {
    if (this.closed) return
    this.closed = true
    this.pendingAtClose = this.pending.length
    this.pending = []
    // Wake the drain loop if it is sleeping between retries; the wake-up
    // function also clears the underlying timer.
    this.sleepResolve?.()
    this.sleepResolve = null
    for (const resolve of this.backpressureResolvers) resolve()
    this.backpressureResolvers = []
    for (const resolve of this.flushResolvers) resolve()
    this.flushResolvers = []
  }

  /**
   * Drain loop. At most one instance runs at a time (guarded by this.draining).
   * Sends batches serially. On failure, backs off and retries indefinitely,
   * unless maxConsecutiveFailures is set, in which case the failing batch is
   * dropped once that many consecutive failures have occurred.
   */
  private async drain(): Promise<void> {
    if (this.draining || this.closed) return
    this.draining = true
    let failures = 0

    try {
      while (this.pending.length > 0 && !this.closed) {
        const batch = this.takeBatch()
        if (batch.length === 0) {
          // takeBatch() discarded un-serializable items and nothing is left
          // to send. The queue still shrank, so wake blocked enqueue()
          // callers — otherwise they would wait forever.
          this.releaseBackpressure()
          continue
        }

        try {
          await this.config.send(batch)
          failures = 0
        } catch (err) {
          failures++
          if (
            this.config.maxConsecutiveFailures !== undefined &&
            failures >= this.config.maxConsecutiveFailures
          ) {
            this.droppedBatches++
            this.config.onBatchDropped?.(batch.length, failures)
            failures = 0
            this.releaseBackpressure()
            continue
          }
          // Re-queue the failed batch at the front. Use concat (single
          // allocation) instead of unshift(...batch) which shifts every
          // pending item batch.length times. Only hit on failure path.
          this.pending = batch.concat(this.pending)
          const retryAfterMs =
            err instanceof RetryableError ? err.retryAfterMs : undefined
          await this.sleep(this.retryDelay(failures, retryAfterMs))
          continue
        }

        // Release backpressure waiters if space opened up
        this.releaseBackpressure()
      }
    } finally {
      this.draining = false
      // Notify flush waiters if queue is empty
      if (this.pending.length === 0) {
        for (const resolve of this.flushResolvers) resolve()
        this.flushResolvers = []
      }
    }
  }

  /**
   * Pull the next batch from pending. Respects both maxBatchSize and
   * maxBatchBytes. The first item is always taken; subsequent items only
   * if adding them keeps the cumulative JSON size under maxBatchBytes.
   *
   * Un-serializable items (BigInt, circular refs, throwing toJSON) are
   * dropped in place — they can never be sent and leaving them at
   * pending[0] would poison the queue and hang flush() forever.
   *
   * @returns The removed items; empty only when pending ended up empty.
   */
  private takeBatch(): T[] {
    const { maxBatchBytes } = this.config
    // A limit below 1 would yield empty batches forever and spin drain().
    const maxBatchSize = Math.max(1, this.config.maxBatchSize)
    if (maxBatchBytes === undefined) {
      return this.pending.splice(0, maxBatchSize)
    }
    let bytes = 0
    let count = 0
    while (count < this.pending.length && count < maxBatchSize) {
      let itemBytes: number
      try {
        itemBytes = Buffer.byteLength(jsonStringify(this.pending[count]))
      } catch {
        // Cannot be serialized: remove it and re-examine the same index.
        this.pending.splice(count, 1)
        continue
      }
      if (count > 0 && bytes + itemBytes > maxBatchBytes) break
      bytes += itemBytes
      count++
    }
    return this.pending.splice(0, count)
  }

  /**
   * Compute the delay before the next retry.
   *
   * @param failures Consecutive failure count for the current batch (>= 1).
   * @param retryAfterMs Optional server-supplied wait hint. NaN is treated
   *   as "no hint" because it would otherwise propagate through the clamp
   *   and produce a NaN (i.e. near-zero) timer delay.
   * @returns Delay in ms: clamped hint or clamped exponential, plus jitter.
   */
  private retryDelay(failures: number, retryAfterMs?: number): number {
    const jitter = Math.random() * this.config.jitterMs
    if (retryAfterMs !== undefined && !Number.isNaN(retryAfterMs)) {
      // Jitter on top of the server's hint prevents thundering herd when
      // many sessions share a rate limit and all receive the same
      // Retry-After. Clamp first, then spread — same shape as the
      // exponential path (effective ceiling is maxDelayMs + jitterMs).
      const clamped = Math.max(
        this.config.baseDelayMs,
        Math.min(retryAfterMs, this.config.maxDelayMs),
      )
      return clamped + jitter
    }
    const exponential = Math.min(
      this.config.baseDelayMs * 2 ** (failures - 1),
      this.config.maxDelayMs,
    )
    return exponential + jitter
  }

  /** Wake every blocked enqueue() caller so it can re-check for space. */
  private releaseBackpressure(): void {
    const resolvers = this.backpressureResolvers
    this.backpressureResolvers = []
    for (const resolve of resolvers) resolve()
  }

  /**
   * Sleep for `ms`, or until close() wakes us via sleepResolve. The early
   * wake-up clears the timer so a closed uploader does not keep the event
   * loop alive for the remainder of the backoff.
   */
  private sleep(ms: number): Promise<void> {
    return new Promise(resolve => {
      const timer = setTimeout(() => {
        this.sleepResolve = null
        resolve()
      }, ms)
      this.sleepResolve = () => {
        clearTimeout(timer)
        resolve()
      }
    })
  }
}