GET /xrpc/app.bsky.actor.searchActorsTypeahead typeahead.waow.tech
15
fork

Configure Feed

Select the types of activity you want to include in your feed.

use zat.HttpTransport for ingester HTTP calls

replaces 3 ad-hoc std.http.Client instantiations with a single shared
HttpTransport from zat v0.2.18. gives connection keepalive to the worker,
centralizes the gzip workaround, and isolates the 0.16 migration surface.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

zzstoatzz 467bd07b 4398427c

+34 -56
+2 -2
ingester/build.zig.zon
··· 5 5 .minimum_zig_version = "0.15.0", 6 6 .dependencies = .{ 7 7 .zat = .{ 8 - .url = "https://tangled.sh/zat.dev/zat/archive/v0.2.15", 9 - .hash = "zat-0.2.15-5PuC7tjwBAAR5tL2Nc5LfSUaUORA6vONr1IdgaU4vAvo", 8 + .url = "https://tangled.sh/zat.dev/zat/archive/v0.2.18", 9 + .hash = "zat-0.2.18-5PuC7iVfBQAV6IUlytvAWrpwrVUbwbLlAcSTvkXwncb_", 10 10 }, 11 11 }, 12 12 .paths = .{
+32 -54
ingester/src/main.zig
··· 1 1 const std = @import("std"); 2 2 const mem = std.mem; 3 3 const json = std.json; 4 - const http = std.http; 5 4 const Allocator = mem.Allocator; 6 5 const zat = @import("zat"); 6 + const HttpTransport = zat.HttpTransport; 7 7 8 8 const log = std.log.scoped(.ingester); 9 9 ··· 88 88 const IngestHandler = struct { 89 89 allocator: Allocator, 90 90 config: Config, 91 + transport: *HttpTransport, 91 92 buffer: std.ArrayList(ActorEvent), 92 93 delete_buffer: std.ArrayList([]const u8), 93 94 /// arena owns all string data in buffer/delete_buffer ··· 101 102 last_flush: i64 = 0, 102 103 retry_after: i64 = 0, // timestamp before which we skip flush attempts 103 104 104 - fn init(allocator: Allocator, config: Config) !IngestHandler { 105 + fn init(allocator: Allocator, config: Config, transport: *HttpTransport) !IngestHandler { 105 106 return .{ 106 107 .allocator = allocator, 107 108 .config = config, 109 + .transport = transport, 108 110 .buffer = .{}, 109 111 .delete_buffer = .{}, 110 112 .arena = std.heap.ArenaAllocator.init(allocator), ··· 240 242 const batch_end = @min(self.buffer.items.len, MAX_BATCH); 241 243 const batch = self.buffer.items[0..batch_end]; 242 244 const ok = postBatch( 243 - self.allocator, 245 + self.transport, 244 246 self.config, 245 247 batch, 246 248 self.flushed_cursor, ··· 272 274 const batch_end = @min(self.delete_buffer.items.len, MAX_BATCH); 273 275 const batch = self.delete_buffer.items[0..batch_end]; 274 276 const ok = deleteActors( 275 - self.allocator, 277 + self.transport, 276 278 self.config, 277 279 batch, 278 280 ); ··· 308 310 } 309 311 }; 310 312 311 - fn postBatch(allocator: Allocator, config: Config, events: []const ActorEvent, cursor: i64) bool { 312 - var client = http.Client{ .allocator = allocator }; 313 - defer client.deinit(); 314 - 315 - var output: std.Io.Writer.Allocating = .init(allocator); 313 + fn postBatch(transport: *HttpTransport, config: Config, events: []const ActorEvent, cursor: i64) bool { 314 + var output: std.Io.Writer.Allocating = .init(transport.allocator); 316 315 defer output.deinit(); 317 316 318 317 var jw: json.Stringify = .{ ··· 327 326 var auth_buf: [256]u8 = undefined; 328 327 const auth = std.fmt.bufPrint(&auth_buf, "Bearer {s}", .{config.secret}) catch return false; 329 328 330 - var resp_output: std.Io.Writer.Allocating = .init(allocator); 331 - defer resp_output.deinit(); 332 - 333 329 const body = output.toArrayList(); 334 330 335 - const result = client.fetch(.{ 336 - .location = .{ .url = url }, 331 + const result = transport.fetch(.{ 332 + .url = url, 337 333 .method = .POST, 338 - .headers = .{ 339 - .content_type = .{ .override = "application/json" }, 340 - .authorization = .{ .override = auth }, 341 - }, 342 334 .payload = body.items, 343 - .response_writer = &resp_output.writer, 335 + .authorization = auth, 344 336 }) catch return false; 337 + defer transport.allocator.free(result.body); 345 338 346 339 if (result.status != .ok) { 347 - const resp = resp_output.toArrayList(); 348 - log.err("ingest HTTP {d}: {s}", .{ @intFromEnum(result.status), resp.items }); 340 + log.err("ingest HTTP {d}: {s}", .{ @intFromEnum(result.status), result.body }); 349 341 } 350 342 351 343 return result.status == .ok; 352 344 } 353 345 354 - fn deleteActors(allocator: Allocator, config: Config, dids: []const []const u8) bool { 355 - var client = http.Client{ .allocator = allocator }; 356 - defer client.deinit(); 357 - 358 - var output: std.Io.Writer.Allocating = .init(allocator); 346 + fn deleteActors(transport: *HttpTransport, config: Config, dids: []const []const u8) bool { 347 + var output: std.Io.Writer.Allocating = .init(transport.allocator); 359 348 defer output.deinit(); 360 349 361 350 var jw: json.Stringify = .{ .writer = &output.writer }; ··· 369 358 370 359 const body = output.toArrayList(); 371 360 372 - const result = client.fetch(.{ 373 - .location = .{ .url = url }, 361 + const result = transport.fetch(.{ 362 + .url = url, 374 363 .method = .POST, 375 - .headers = .{ 376 - .content_type = .{ .override = "application/json" }, 377 - .authorization = .{ .override = auth }, 378 - }, 379 364 .payload = body.items, 365 + .authorization = auth, 380 366 }) catch return false; 367 + defer transport.allocator.free(result.body); 381 368 382 369 return result.status == .ok; 383 370 } 384 371 385 - fn fetchCursorOnce(allocator: Allocator, config: Config) ?i64 { 386 - var client = http.Client{ .allocator = allocator }; 387 - defer client.deinit(); 388 - 372 + fn fetchCursorOnce(transport: *HttpTransport, config: Config) ?i64 { 389 373 var url_buf: [512]u8 = undefined; 390 374 const url = std.fmt.bufPrint(&url_buf, "{s}/admin/cursor", .{config.worker_url}) catch return null; 391 375 392 376 var auth_buf: [256]u8 = undefined; 393 377 const auth = std.fmt.bufPrint(&auth_buf, "Bearer {s}", .{config.secret}) catch return null; 394 378 395 - var aw: std.Io.Writer.Allocating = .init(allocator); 396 - defer aw.deinit(); 397 - 398 - const result = client.fetch(.{ 399 - .location = .{ .url = url }, 400 - .method = .GET, 401 - .headers = .{ 402 - .authorization = .{ .override = auth }, 403 - }, 404 - .response_writer = &aw.writer, 379 + const result = transport.fetch(.{ 380 + .url = url, 381 + .authorization = auth, 405 382 }) catch return null; 383 + defer transport.allocator.free(result.body); 406 384 407 385 if (result.status != .ok) return null; 408 386 409 - var resp = aw.toArrayList(); 410 - defer resp.deinit(allocator); 411 - 412 - const parsed = json.parseFromSlice(json.Value, allocator, resp.items, .{}) catch return null; 387 + const parsed = json.parseFromSlice(json.Value, transport.allocator, result.body, .{}) catch return null; 413 388 defer parsed.deinit(); 414 389 415 390 const cursor_val = parsed.value.object.get("cursor") orelse return null; ··· 420 395 }; 421 396 } 422 397 423 - fn fetchCursor(allocator: Allocator, config: Config) ?i64 { 398 + fn fetchCursor(transport: *HttpTransport, config: Config) ?i64 { 424 399 const backoff = [_]u64{ 1, 3, 10 }; 425 400 for (backoff, 0..) |delay, attempt| { 426 - if (fetchCursorOnce(allocator, config)) |cursor| return cursor; 401 + if (fetchCursorOnce(transport, config)) |cursor| return cursor; 427 402 log.warn("cursor fetch failed (attempt {d}/3), retrying in {d}s", .{ attempt + 1, delay }); 428 403 std.Thread.sleep(delay * std.time.ns_per_s); 429 404 } ··· 438 413 const config = getConfig(); 439 414 log.info("typeahead ingester → {s}", .{config.worker_url}); 440 415 441 - const cursor = fetchCursor(allocator, config); 416 + var transport = HttpTransport.init(allocator); 417 + defer transport.deinit(); 418 + 419 + const cursor = fetchCursor(&transport, config); 442 420 if (cursor) |c| { 443 421 log.info("resuming from cursor {d}", .{c}); 444 422 } else { 445 423 log.info("no cursor found, starting from live", .{}); 446 424 } 447 425 448 - var handler = try IngestHandler.init(allocator, config); 426 + var handler = try IngestHandler.init(allocator, config, &transport); 449 427 defer handler.deinit(); 450 428 451 429 var client = zat.JetstreamClient.init(allocator, .{