GET /xrpc/app.bsky.actor.searchActorsTypeahead typeahead.waow.tech
15
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix batch delivery semantics and update smoke tests

ingester: retain failed buffers for retry instead of dropping them.
cursor only advances after both ingest and delete batches succeed.
backlog overflow cap (5000 events) prevents unbounded memory growth
when D1 is persistently unavailable.

cursor fetch retries 3 times with backoff (1s, 3s, 10s) before
falling back to live.

smoke tests: empty query now expects 400 (not 200), added test for
limit>100 returning 400.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

zzstoatzz 3cf8817c 764b29e1

+51 -20
+44 -15
ingester/src/main.zig
··· 38 38 delete_buffer: std.ArrayList([]const u8), 39 39 /// arena owns all string data in buffer/delete_buffer 40 40 arena: std.heap.ArenaAllocator, 41 - last_cursor: i64 = 0, 41 + pending_cursor: i64 = 0, 42 + flushed_cursor: i64 = 0, 42 43 total_ingested: u64 = 0, 43 44 total_deleted: u64 = 0, 44 45 last_flush: i64 = 0, ··· 71 72 .account => |acct| self.handleAccount(acct), 72 73 } 73 74 74 - self.last_cursor = event.timeUs(); 75 + self.pending_cursor = event.timeUs(); 75 76 76 77 const now = std.time.timestamp(); 78 + // drop incoming events if buffers are backed up (persistent failure) 79 + const MAX_BACKLOG = MAX_BATCH * 10; 80 + if (self.buffer.items.len >= MAX_BACKLOG or self.delete_buffer.items.len >= MAX_BACKLOG) { 81 + log.err("backlog overflow ({d} ingest, {d} delete), dropping oldest events", .{ 82 + self.buffer.items.len, self.delete_buffer.items.len, 83 + }); 84 + self.buffer.clearRetainingCapacity(); 85 + self.delete_buffer.clearRetainingCapacity(); 86 + _ = self.arena.reset(.retain_capacity); 87 + self.flushed_cursor = self.pending_cursor; 88 + } 89 + 77 90 const should_flush = self.buffer.items.len >= MAX_BATCH or 78 91 self.delete_buffer.items.len >= MAX_BATCH or 79 92 (now - self.last_flush >= 5 and (self.buffer.items.len > 0 or self.delete_buffer.items.len > 0)); ··· 131 144 132 145 fn flush(self: *IngestHandler) void { 133 146 self.last_flush = std.time.timestamp(); 147 + var ingest_ok = true; 148 + var delete_ok = true; 134 149 135 150 if (self.buffer.items.len > 0) { 136 151 const count = self.buffer.items.len; 137 - const ok = postBatch( 152 + ingest_ok = postBatch( 138 153 self.allocator, 139 154 self.config, 140 155 self.buffer.items, 141 - self.last_cursor, 156 + self.flushed_cursor, 142 157 ); 143 - if (ok) { 158 + if (ingest_ok) { 144 159 self.total_ingested += count; 145 160 log.info("+{d} actors (total: {d}) cursor={d}", .{ 146 - count, self.total_ingested, self.last_cursor, 161 + count, self.total_ingested, self.pending_cursor, 147 162 }); 163 + self.buffer.clearRetainingCapacity(); 148 164 } else { 149 - log.err("ingest batch failed ({d} events)", .{count}); 165 + log.err("ingest batch failed ({d} events), will retry", .{count}); 150 166 } 151 - self.buffer.clearRetainingCapacity(); 152 167 } 153 168 154 169 if (self.delete_buffer.items.len > 0) { 155 170 const count = self.delete_buffer.items.len; 156 - const ok = deleteActors( 171 + delete_ok = deleteActors( 157 172 self.allocator, 158 173 self.config, 159 174 self.delete_buffer.items, 160 175 ); 161 - if (ok) { 176 + if (delete_ok) { 162 177 self.total_deleted += count; 163 178 log.info("-{d} moderated (total: {d})", .{ 164 179 count, self.total_deleted, 165 180 }); 181 + self.delete_buffer.clearRetainingCapacity(); 166 182 } else { 167 - log.err("delete batch failed ({d} dids)", .{count}); 183 + log.err("delete batch failed ({d} dids), will retry", .{count}); 168 184 } 169 - self.delete_buffer.clearRetainingCapacity(); 170 185 } 171 186 172 - // free all duped strings at once 173 - _ = self.arena.reset(.retain_capacity); 187 + // only advance cursor when all pending work succeeded 188 + if (ingest_ok and delete_ok) { 189 + self.flushed_cursor = self.pending_cursor; 190 + // free all duped strings at once — safe because buffers are empty 191 + _ = self.arena.reset(.retain_capacity); 192 + } 174 193 } 175 194 }; 176 195 ··· 292 311 return result.status == .ok; 293 312 } 294 313 295 - fn fetchCursor(allocator: Allocator, config: Config) ?i64 { 314 + fn fetchCursorOnce(allocator: Allocator, config: Config) ?i64 { 296 315 var client = http.Client{ .allocator = allocator }; 297 316 defer client.deinit(); 298 317 ··· 328 347 .float => |f| @intFromFloat(f), 329 348 else => null, 330 349 }; 350 + } 351 + 352 + fn fetchCursor(allocator: Allocator, config: Config) ?i64 { 353 + const backoff = [_]u64{ 1, 3, 10 }; 354 + for (backoff, 0..) |delay, attempt| { 355 + if (fetchCursorOnce(allocator, config)) |cursor| return cursor; 356 + log.warn("cursor fetch failed (attempt {d}/3), retrying in {d}s", .{ attempt + 1, delay }); 357 + std.Thread.sleep(delay * std.time.ns_per_s); 358 + } 359 + return null; 331 360 } 332 361 333 362 pub fn main() !void {
+7 -5
scripts/smoke.py
··· 131 131 def test_empty_query(base_url: str): 132 132 print("\n--- empty query ---") 133 133 data, _ = fetch(f"{base_url}{XRPC_PATH}?q=") 134 - if not data or "_error" in data or "_http_error" in data: 135 - check("fetch succeeded", False) 136 - return 134 + check("empty query returns 400", data is not None and data.get("_http_error") == 400, f"got {data}") 137 135 138 - actors = data.get("actors", []) 139 - check("empty query returns empty actors", len(actors) == 0, f"got {len(actors)}") 136 + 137 + def test_limit_over_max(base_url: str): 138 + print("\n--- limit > 100 ---") 139 + data, _ = fetch(f"{base_url}{XRPC_PATH}?q=test&limit=200") 140 + check("limit>100 returns 400", data is not None and data.get("_http_error") == 400, f"got {data}") 140 141 141 142 142 143 def test_comparison(base_url: str, queries: list[str]): ··· 185 186 test_deprecated_param(args.url) 186 187 test_limit_bounds(args.url) 187 188 test_empty_query(args.url) 189 + test_limit_over_max(args.url) 188 190 189 191 if args.compare: 190 192 test_comparison(args.url, args.queries)