GET /xrpc/app.bsky.actor.searchActorsTypeahead typeahead.waow.tech
16
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix flush: chunk batches at 500 and back off 5s on failure

the previous retry logic accumulated all events into a single growing
buffer and re-sent the entire thing on every incoming event. D1 rejects
large batches (error 1101), so this created a death spiral: bigger batch →
rejection → buffer grows → even bigger batch.

now flush sends at most MAX_BATCH (500) items per attempt, shifts
remaining items forward, and waits 5 seconds before retrying after
a failure. during catch-up this drains the backlog incrementally
instead of choking.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

+53 -22
+53 -22
ingester/src/main.zig
@@ -43,6 +43,7 @@
     total_ingested: u64 = 0,
     total_deleted: u64 = 0,
     last_flush: i64 = 0,
+    retry_after: i64 = 0, // timestamp before which we skip flush attempts

     fn init(allocator: Allocator, config: Config) IngestHandler {
         return .{
@@ -85,7 +86,11 @@
             self.delete_buffer.clearRetainingCapacity();
             _ = self.arena.reset(.retain_capacity);
             self.flushed_cursor = self.pending_cursor;
+            self.retry_after = 0;
         }
+
+        // respect backoff cooldown after failures
+        if (now < self.retry_after) return;

         const should_flush = self.buffer.items.len >= MAX_BATCH or
             self.delete_buffer.items.len >= MAX_BATCH or
@@ -144,50 +149,76 @@

     fn flush(self: *IngestHandler) void {
         self.last_flush = std.time.timestamp();
-        var ingest_ok = true;
-        var delete_ok = true;
+        var any_failure = false;

         if (self.buffer.items.len > 0) {
-            const count = self.buffer.items.len;
-            ingest_ok = postBatch(
+            // send at most MAX_BATCH at a time to avoid overwhelming D1
+            const batch_end = @min(self.buffer.items.len, MAX_BATCH);
+            const batch = self.buffer.items[0..batch_end];
+            const ok = postBatch(
                 self.allocator,
                 self.config,
-                self.buffer.items,
+                batch,
                 self.flushed_cursor,
             );
-            if (ingest_ok) {
-                self.total_ingested += count;
-                log.info("+{d} actors (total: {d}) cursor={d}", .{
-                    count, self.total_ingested, self.pending_cursor,
+            if (ok) {
+                self.total_ingested += batch_end;
+                log.info("+{d} actors (total: {d}, pending: {d}) cursor={d}", .{
+                    batch_end, self.total_ingested,
+                    self.buffer.items.len - batch_end, self.pending_cursor,
                 });
-                self.buffer.clearRetainingCapacity();
+                // shift remaining items forward
+                if (batch_end < self.buffer.items.len) {
+                    std.mem.copyForwards(
+                        ActorEvent,
+                        self.buffer.items[0 .. self.buffer.items.len - batch_end],
+                        self.buffer.items[batch_end..],
+                    );
+                }
+                self.buffer.shrinkRetainingCapacity(self.buffer.items.len - batch_end);
             } else {
-                log.err("ingest batch failed ({d} events), will retry", .{count});
+                log.err("ingest batch failed ({d} events, {d} pending), will retry in 5s", .{
+                    batch_end, self.buffer.items.len,
+                });
+                any_failure = true;
             }
         }

         if (self.delete_buffer.items.len > 0) {
-            const count = self.delete_buffer.items.len;
-            delete_ok = deleteActors(
+            const batch_end = @min(self.delete_buffer.items.len, MAX_BATCH);
+            const batch = self.delete_buffer.items[0..batch_end];
+            const ok = deleteActors(
                 self.allocator,
                 self.config,
-                self.delete_buffer.items,
+                batch,
             );
-            if (delete_ok) {
-                self.total_deleted += count;
+            if (ok) {
+                self.total_deleted += batch_end;
                 log.info("-{d} moderated (total: {d})", .{
-                    count, self.total_deleted,
+                    batch_end, self.total_deleted,
                 });
-                self.delete_buffer.clearRetainingCapacity();
+                if (batch_end < self.delete_buffer.items.len) {
+                    const remaining = self.delete_buffer.items.len - batch_end;
+                    // shift items - need to copy as slices
+                    var i: usize = 0;
+                    while (i < remaining) : (i += 1) {
+                        self.delete_buffer.items[i] = self.delete_buffer.items[batch_end + i];
+                    }
+                    self.delete_buffer.shrinkRetainingCapacity(remaining);
+                } else {
+                    self.delete_buffer.clearRetainingCapacity();
+                }
             } else {
-                log.err("delete batch failed ({d} dids), will retry", .{count});
+                log.err("delete batch failed ({d} dids), will retry in 5s", .{batch_end});
+                any_failure = true;
             }
         }

-        // only advance cursor when all pending work succeeded
-        if (ingest_ok and delete_ok) {
+        if (any_failure) {
+            self.retry_after = std.time.timestamp() + 5;
+        } else if (self.buffer.items.len == 0 and self.delete_buffer.items.len == 0) {
+            // only advance cursor and free arena when all work is drained
             self.flushed_cursor = self.pending_cursor;
-            // free all duped strings at once — safe because buffers are empty
             _ = self.arena.reset(.retain_capacity);
         }
     }