GET /xrpc/app.bsky.actor.searchActorsTypeahead typeahead.waow.tech
16
fork

Configure Feed

Select the types of activity you want to include in your feed.

add health endpoint to ingester for status page monitoring

background thread on :8080 responds with JSON status
(ingested count, deleted count, bloom filter size, RSS).
added [http_service] to fly.toml with shared IPv4.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

+50
+8
ingester/fly.toml
··· 15 15 [processes] 16 16 app = './typeahead-ingester' 17 17 18 + [http_service] 19 + internal_port = 8080 20 + force_https = true 21 + auto_stop_machines = 'off' 22 + auto_start_machines = true 23 + min_machines_running = 1 24 + processes = ['app'] 25 + 18 26 [[vm]] 19 27 memory = '256mb' 20 28 cpu_kind = 'shared'
+42
ingester/src/main.zig
··· 406 406 return null; 407 407 } 408 408 409 + const HealthState = struct { 410 + handler: *IngestHandler, 411 + }; 412 + 413 + fn healthThread(state: *HealthState) void { 414 + const addr = std.net.Address.parseIp4("0.0.0.0", 8080) catch return; 415 + var srv = addr.listen(.{ .reuse_address = true }) catch |err| { 416 + log.err("health listener failed: {s}", .{@errorName(err)}); 417 + return; 418 + }; 419 + log.info("health endpoint listening on :8080", .{}); 420 + 421 + while (true) { 422 + const conn = srv.accept() catch continue; 423 + handleHealthConn(conn.stream, state.handler) catch {}; 424 + conn.stream.close(); 425 + } 426 + } 427 + 428 + fn handleHealthConn(stream: std.net.Stream, handler: *IngestHandler) !void { 429 + var buf: [1024]u8 = undefined; 430 + _ = stream.read(&buf) catch return; 431 + 432 + const rss = getRssKB(); 433 + var body_buf: [256]u8 = undefined; 434 + const body = std.fmt.bufPrint(&body_buf, "{{\"status\":\"ok\",\"ingested\":{d},\"deleted\":{d},\"bloom\":{d},\"rss_kb\":{d}}}", .{ 435 + handler.total_ingested, handler.total_deleted, handler.bloom.count, rss, 436 + }) catch return; 437 + 438 + var resp_buf: [512]u8 = undefined; 439 + const resp = std.fmt.bufPrint(&resp_buf, "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {d}\r\nConnection: close\r\n\r\n{s}", .{ 440 + body.len, body, 441 + }) catch return; 442 + 443 + _ = stream.write(resp) catch {}; 444 + } 445 + 409 446 pub fn main() !void { 410 447 var gpa = std.heap.GeneralPurposeAllocator(.{}){}; 411 448 defer _ = gpa.deinit(); ··· 427 464 428 465 var handler = try IngestHandler.init(allocator, config, &transport); 429 466 defer handler.deinit(); 467 + 468 + // health endpoint in background thread 469 + var health_state = HealthState{ .handler = &handler }; 470 + const health_thread = try std.Thread.spawn(.{}, healthThread, .{&health_state}); 471 + health_thread.detach(); 430 472 431 473 var client = zat.JetstreamClient.init(allocator, .{ 432 474 .wanted_collections = &.{