atproto utils for zig zat.dev
atproto sdk zig
26
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: round-robin host rotation for jetstream and firehose clients

rotate through multiple hosts on reconnect for client-side load balancing
and resilience. defaults include official bsky instances plus community
relays (waow.tech, fire.hose.cam, firehose.stream, firehose.network).

- Options.host → Options.hosts (slice with sensible defaults)
- subscribe() advances host_index on each reconnect
- backoff resets when switching to a new host
- jetstream rewinds cursor by 10s on host switch (instances may lag)
- single-host usage still works: .hosts = &.{"my-host"}

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

zzstoatzz e6cadb03 779a6aff

+135 -15
+25 -1
CHANGELOG.md
··· 1 1 # changelog 2 2 3 + ## 0.1.6 4 + 5 + - round-robin host rotation for jetstream and firehose clients 6 + - `Options.host` → `Options.hosts` with sensible defaults (bsky + community relays) 7 + - backoff resets on host switch, jetstream rewinds cursor by 10s 8 + - default jetstream hosts: 4 official bsky, waow.tech, fire.hose.cam, 6 firehose.stream regions 9 + - default firehose hosts: bsky.network + 3 firehose.network regions 10 + 11 + ## 0.1.5 12 + 13 + - align firehose event types with AT Protocol sync spec 14 + 15 + ## 0.1.4 16 + 17 + - firehose support: DAG-CBOR codec, CAR codec, CID creation, firehose client 18 + - encode and decode `com.atproto.sync.subscribeRepos` binary frames 19 + 20 + ## 0.1.3 21 + 22 + - jetstream WebSocket client with typed events, reconnection, and cursor tracking 23 + - `extractAt` ignores unknown JSON fields by default 24 + - HTTP I/O isolated behind `HttpTransport` for 0.16 prep 25 + - websocket dependency pinned to specific commit 26 + 3 27 ## 0.1.2 4 28 5 - - `extractAt` now logs diagnostic info on parse failures (enable with `.zat` debug scope) 29 + - `extractAt` logs diagnostic info on parse failures (enable with `.zat` debug scope) 6 30 7 31 ## 0.1.1 8 32
+30 -7
src/internal/firehose.zig
··· 23 23 pub const CommitAction = sync.CommitAction; 24 24 pub const AccountStatus = sync.AccountStatus; 25 25 26 + pub const default_hosts = [_][]const u8{ 27 + "bsky.network", 28 + "northamerica.firehose.network", 29 + "europe.firehose.network", 30 + "asia.firehose.network", 31 + }; 32 + 26 33 pub const Options = struct { 27 - host: []const u8 = "bsky.network", 34 + hosts: []const []const u8 = &default_hosts, 28 35 cursor: ?i64 = null, 29 36 max_message_size: usize = 5 * 1024 * 1024, // 5MB — firehose frames can be large 30 37 }; ··· 421 428 /// handler must implement: fn onEvent(*@TypeOf(handler), Event) void 422 429 /// optional: fn onError(*@TypeOf(handler), anyerror) void 423 430 /// blocks forever — reconnects with exponential backoff on disconnect. 431 + /// rotates through hosts on each reconnect attempt. 424 432 pub fn subscribe(self: *FirehoseClient, handler: anytype) void { 425 433 var backoff: u64 = 1; 434 + var host_index: usize = 0; 426 435 const max_backoff: u64 = 60; 436 + var prev_host_index: usize = 0; 427 437 428 438 while (true) { 429 - self.connectAndRead(handler) catch |err| { 439 + const host = self.options.hosts[host_index % self.options.hosts.len]; 440 + const effective_index = host_index % self.options.hosts.len; 441 + 442 + // reset backoff on host switch (fresh host deserves a fresh chance) 443 + if (host_index > 0 and effective_index != prev_host_index) { 444 + backoff = 1; 445 + } 446 + 447 + log.info("connecting to host {d}/{d}: {s}", .{ effective_index + 1, self.options.hosts.len, host }); 448 + 449 + self.connectAndRead(host, handler) catch |err| { 430 450 if (comptime @hasDecl(@TypeOf(handler.*), "onError")) { 431 451 handler.onError(err); 432 452 } else { 433 453 log.err("firehose error: {s}, reconnecting in {d}s...", .{ @errorName(err), backoff }); 434 454 } 435 455 }; 456 + 457 + prev_host_index = effective_index; 458 + host_index += 1; 436 459 posix.nanosleep(backoff, 0); 437 460 backoff = @min(backoff * 2, max_backoff); 438 461 } 439 462 } 440 463 441 - fn connectAndRead(self: *FirehoseClient, handler: anytype) !void { 464 + fn connectAndRead(self: *FirehoseClient, host: []const u8, handler: anytype) !void { 442 465 var path_buf: [256]u8 = undefined; 443 466 var stream = std.io.fixedBufferStream(&path_buf); 444 467 const writer = stream.writer(); ··· 449 472 } 450 473 const path = stream.getWritten(); 451 474 452 - log.info("connecting to wss://{s}{s}", .{ self.options.host, path }); 475 + log.info("connecting to wss://{s}{s}", .{ host, path }); 453 476 454 477 var client = try websocket.Client.init(self.allocator, .{ 455 - .host = self.options.host, 478 + .host = host, 456 479 .port = 443, 457 480 .tls = true, 458 481 .max_size = self.options.max_message_size, ··· 460 483 defer client.deinit(); 461 484 462 485 var host_header_buf: [256]u8 = undefined; 463 - const host_header = std.fmt.bufPrint(&host_header_buf, "Host: {s}\r\n", .{self.options.host}) catch self.options.host; 486 + const host_header = std.fmt.bufPrint(&host_header_buf, "Host: {s}\r\n", .{host}) catch host; 464 487 465 488 try client.handshake(path, .{ .headers = host_header }); 466 489 467 - log.info("firehose connected", .{}); 490 + log.info("firehose connected to {s}", .{host}); 468 491 469 492 var ws_handler = WsHandler(@TypeOf(handler.*)){ 470 493 .allocator = self.allocator,
+80 -7
src/internal/jetstream.zig
··· 19 19 pub const CommitAction = sync.CommitAction; 20 20 pub const AccountStatus = sync.AccountStatus; 21 21 22 + pub const default_hosts = [_][]const u8{ 23 + "jetstream1.us-east.bsky.network", 24 + "jetstream2.us-east.bsky.network", 25 + "jetstream1.us-west.bsky.network", 26 + "jetstream2.us-west.bsky.network", 27 + "jetstream.waow.tech", 28 + "jetstream.fire.hose.cam", 29 + "jet.firehose.stream", 30 + "sfo.firehose.stream", 31 + "nyc.firehose.stream", 32 + "london.firehose.stream", 33 + "frankfurt.firehose.stream", 34 + "chennai.firehose.stream", 35 + }; 36 + 22 37 pub const Options = struct { 23 - host: []const u8 = "jetstream2.us-east.bsky.network", 38 + hosts: []const []const u8 = &default_hosts, 24 39 wanted_collections: []const []const u8 = &.{}, 25 40 wanted_dids: []const []const u8 = &.{}, 26 41 cursor: ?i64 = null, ··· 133 148 /// handler must implement: fn onEvent(*@TypeOf(handler), Event) void 134 149 /// optional: fn onError(*@TypeOf(handler), anyerror) void 135 150 /// blocks forever — reconnects with exponential backoff on disconnect. 151 + /// rotates through hosts on each reconnect attempt. 136 152 pub fn subscribe(self: *JetstreamClient, handler: anytype) void { 137 153 var backoff: u64 = 1; 154 + var host_index: usize = 0; 138 155 const max_backoff: u64 = 60; 156 + var prev_host_index: usize = 0; 139 157 140 158 while (true) { 141 - self.connectAndRead(handler) catch |err| { 159 + const host = self.options.hosts[host_index % self.options.hosts.len]; 160 + const effective_index = host_index % self.options.hosts.len; 161 + 162 + // rewind cursor by 10s on host switch (different instances may lag) 163 + if (host_index > 0 and effective_index != prev_host_index) { 164 + if (self.last_time_us) |t| { 165 + self.last_time_us = t - 10_000_000; 166 + } 167 + backoff = 1; 168 + } 169 + 170 + log.info("connecting to host {d}/{d}: {s}", .{ effective_index + 1, self.options.hosts.len, host }); 171 + 172 + self.connectAndRead(host, handler) catch |err| { 142 173 if (comptime @hasDecl(@TypeOf(handler.*), "onError")) { 143 174 handler.onError(err); 144 175 } else { 145 176 log.err("jetstream error: {s}, reconnecting in {d}s...", .{ @errorName(err), backoff }); 146 177 } 147 178 }; 179 + 180 + prev_host_index = effective_index; 181 + host_index += 1; 148 182 posix.nanosleep(backoff, 0); 149 183 backoff = @min(backoff * 2, max_backoff); 150 184 } 151 185 } 152 186 153 - fn connectAndRead(self: *JetstreamClient, handler: anytype) !void { 187 + fn connectAndRead(self: *JetstreamClient, host: []const u8, handler: anytype) !void { 154 188 var path_buf: [2048]u8 = undefined; 155 189 const path = try self.buildSubscribePath(&path_buf); 156 190 157 - log.info("connecting to wss://{s}{s}", .{ self.options.host, path }); 191 + log.info("connecting to wss://{s}{s}", .{ host, path }); 158 192 159 193 var client = try websocket.Client.init(self.allocator, .{ 160 - .host = self.options.host, 194 + .host = host, 161 195 .port = 443, 162 196 .tls = true, 163 197 .max_size = self.options.max_message_size, ··· 165 199 defer client.deinit(); 166 200 167 201 var host_header_buf: [256]u8 = undefined; 168 - const host_header = std.fmt.bufPrint(&host_header_buf, "Host: {s}\r\n", .{self.options.host}) catch self.options.host; 202 + const host_header = std.fmt.bufPrint(&host_header_buf, "Host: {s}\r\n", .{host}) catch host; 169 203 170 204 try client.handshake(path, .{ .headers = host_header }); 171 205 172 - log.info("jetstream connected", .{}); 206 + log.info("jetstream connected to {s}", .{host}); 173 207 174 208 var ws_handler = WsHandler(@TypeOf(handler.*)){ 175 209 .allocator = self.allocator, ··· 496 530 497 531 try std.testing.expectError(error.MissingDid, parseEvent(arena.allocator(), payload)); 498 532 } 533 + 534 + test "default hosts contains known jetstream instances" { 535 + try std.testing.expectEqual(@as(usize, 12), default_hosts.len); 536 + try std.testing.expectEqualStrings("jetstream1.us-east.bsky.network", default_hosts[0]); 537 + try std.testing.expectEqualStrings("jetstream2.us-east.bsky.network", default_hosts[1]); 538 + try std.testing.expectEqualStrings("jetstream1.us-west.bsky.network", default_hosts[2]); 539 + try std.testing.expectEqualStrings("jetstream2.us-west.bsky.network", default_hosts[3]); 540 + try std.testing.expectEqualStrings("jetstream.waow.tech", default_hosts[4]); 541 + try std.testing.expectEqualStrings("jetstream.fire.hose.cam", default_hosts[5]); 542 + try std.testing.expectEqualStrings("jet.firehose.stream", default_hosts[6]); 543 + try std.testing.expectEqualStrings("chennai.firehose.stream", default_hosts[11]); 544 + } 545 + 546 + test "round-robin cycles through hosts" { 547 + const hosts = [_][]const u8{ "host-a", "host-b", "host-c" }; 548 + // simulate the index logic from subscribe() 549 + for (0..9) |i| { 550 + const host = hosts[i % hosts.len]; 551 + const expected: []const u8 = switch (i % 3) { 552 + 0 => "host-a", 553 + 1 => "host-b", 554 + 2 => "host-c", 555 + else => unreachable, 556 + }; 557 + try std.testing.expectEqualStrings(expected, host); 558 + } 559 + } 560 + 561 + test "options default hosts are used" { 562 + const opts = Options{}; 563 + try std.testing.expectEqual(@as(usize, 12), opts.hosts.len); 564 + try std.testing.expectEqualStrings("jetstream1.us-east.bsky.network", opts.hosts[0]); 565 + } 566 + 567 + test "options custom single host" { 568 + const opts = Options{ .hosts = &.{"my-custom-host.example.com"} }; 569 + try std.testing.expectEqual(@as(usize, 1), opts.hosts.len); 570 + try std.testing.expectEqualStrings("my-custom-host.example.com", opts.hosts[0]); 571 + }