source dump of claude code
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 131 lines 3.9 kB view raw
import { sleep } from '../../utils/sleep.js'

/**
 * Coalescing uploader for PUT /worker (session state + metadata).
 *
 * - 1 in-flight PUT + 1 pending patch
 * - New calls coalesce into pending (never grows beyond 1 slot)
 * - On success: send pending if exists
 * - On failure (`send` resolves to false, rejects, or throws): exponential
 *   backoff (clamped), retries indefinitely until success or close().
 *   Absorbs any pending patches before each retry.
 * - No backpressure needed — naturally bounded at 2 slots
 *
 * Coalescing rules:
 * - Top-level keys (e.g. worker_status) — last value wins
 * - Inside external_metadata / internal_metadata — RFC 7396 merge:
 *   keys are added/overwritten, null values preserved (server deletes)
 */

type WorkerStateUploaderConfig = {
  /** Performs one upload attempt; resolves to true on success, false on failure. */
  send: (body: Record<string, unknown>) => Promise<boolean>
  /** Base delay for exponential backoff (ms) */
  baseDelayMs: number
  /** Max delay cap (ms) */
  maxDelayMs: number
  /** Random jitter range added to retry delay (ms) */
  jitterMs: number
}

export class WorkerStateUploader {
  /** Promise of the upload currently being sent/retried, or null when idle. */
  private inflight: Promise<void> | null = null
  /** Single coalesced patch waiting for the in-flight upload to finish. */
  private pending: Record<string, unknown> | null = null
  /** Set by close(); once true nothing new is enqueued or sent. */
  private closed = false
  private readonly config: WorkerStateUploaderConfig

  constructor(config: WorkerStateUploaderConfig) {
    this.config = config
  }

  /**
   * Enqueue a patch to PUT /worker. Coalesces with any existing pending
   * patch. Fire-and-forget — callers don't need to await.
   * Ignored after close().
   */
  enqueue(patch: Record<string, unknown>): void {
    if (this.closed) return
    this.pending = this.pending ? coalescePatches(this.pending, patch) : patch
    void this.drain()
  }

  /**
   * Stop the uploader: drops the pending patch, makes later enqueue() calls
   * no-ops, and lets a running retry loop exit. A `send` call that has
   * already been issued is not cancelled.
   */
  close(): void {
    this.closed = true
    this.pending = null
  }

  /**
   * Start uploading the pending patch unless an upload is already in flight.
   * When the upload finishes, drains again if more patches arrived meanwhile.
   */
  private async drain(): Promise<void> {
    if (this.inflight || this.closed) return
    if (!this.pending) return

    const payload = this.pending
    this.pending = null

    this.inflight = this.sendWithRetry(payload).then(() => {
      this.inflight = null
      if (this.pending && !this.closed) {
        void this.drain()
      }
    })
  }

  /** Retries indefinitely with exponential backoff until success or close(). */
  private async sendWithRetry(payload: Record<string, unknown>): Promise<void> {
    let current = payload
    let failures = 0
    while (!this.closed) {
      const ok = await this.trySend(current)
      if (ok) return

      // Closed while the request was outstanding: skip the pointless backoff
      // so the in-flight slot (and its timer) is released immediately.
      if (this.closed) return

      failures++
      await sleep(this.retryDelay(failures))

      // Absorb any patches that arrived during the retry
      if (this.pending && !this.closed) {
        current = coalescePatches(current, this.pending)
        this.pending = null
      }
    }
  }

  /**
   * Run one `send` attempt, mapping a rejection or synchronous throw to
   * `false`. Without this, a rejecting `send` would reject the in-flight
   * promise, leave `inflight` set forever, and silently stall every later
   * enqueue().
   */
  private async trySend(body: Record<string, unknown>): Promise<boolean> {
    try {
      return await this.config.send(body)
    } catch {
      return false
    }
  }

  /**
   * Delay before retry number `failures` (1-based):
   * min(baseDelayMs * 2^(failures-1), maxDelayMs) plus a random jitter in
   * [0, jitterMs).
   */
  private retryDelay(failures: number): number {
    const exponential = Math.min(
      this.config.baseDelayMs * 2 ** (failures - 1),
      this.config.maxDelayMs,
    )
    const jitter = Math.random() * this.config.jitterMs
    return exponential + jitter
  }
}

/**
 * True for non-null, non-array objects — the only values RFC 7396 merges
 * key-by-key; everything else (scalars, null, arrays) replaces wholesale.
 */
function isMergeableObject(value: unknown): value is Record<string, unknown> {
  return typeof value === 'object' && value !== null && !Array.isArray(value)
}

/**
 * Coalesce two patches for PUT /worker.
 *
 * Top-level keys: overlay replaces base (last value wins).
 * Metadata keys (external_metadata, internal_metadata): RFC 7396 merge
 * one level deep — overlay keys are added/overwritten, null values
 * preserved for server-side delete. If either side of a metadata key is
 * not a plain object (null, scalar, array), the overlay value replaces it.
 *
 * Neither input is mutated; a new object is returned.
 */
function coalescePatches(
  base: Record<string, unknown>,
  overlay: Record<string, unknown>,
): Record<string, unknown> {
  const merged = { ...base }

  for (const [key, value] of Object.entries(overlay)) {
    const existing = merged[key]
    if (
      (key === 'external_metadata' || key === 'internal_metadata') &&
      isMergeableObject(existing) &&
      isMergeableObject(value)
    ) {
      // RFC 7396 merge — overlay keys win, nulls preserved for server
      merged[key] = { ...existing, ...value }
    } else {
      merged[key] = value
    }
  }

  return merged
}