atproto utils for zig zat.dev
atproto sdk zig
26
fork

Configure Feed

Select the types of activity you want to include in your feed.

thread io: std.Io through streaming clients, fix firehose ws bug

- add `io: std.Io` field to JetstreamClient and FirehoseClient
- subscribe() returns Io.Cancelable!void (enables async cancellation)
- replace libc.nanosleep() with io.sleep() for reconnect backoff
- pass caller-provided io to websocket.Client.init() instead of debug_io
- fix bug: firehose connectAndRead() was missing required `io` param
- bump version to 0.3.0-alpha.6

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

+20 -16
+1 -1
build.zig.zon
··· 1 1 .{ 2 2 .name = .zat, 3 - .version = "0.3.0-alpha.5", 3 + .version = "0.3.0-alpha.6", 4 4 .fingerprint = 0x8da9db57ee82fbe4, 5 5 .minimum_zig_version = "0.16.0", 6 6 .dependencies = .{
+2 -2
scripts/jetstream_smoke.zig
··· 9 9 std.debug.print("smoke test starting\n", .{}); 10 10 11 11 var handler = Handler{}; 12 - var client = zat.JetstreamClient.init(allocator, .{ 12 + var client = zat.JetstreamClient.init(std.Options.debug_io, allocator, .{ 13 13 .hosts = &.{"jetstream2.us-east.bsky.network"}, 14 14 .wanted_collections = &.{"app.bsky.feed.post"}, 15 15 }); 16 - client.subscribe(&handler); 16 + try client.subscribe(&handler); 17 17 } 18 18 19 19 const Handler = struct {
+7 -5
src/internal/streaming/firehose.zig
··· 18 18 const mem = std.mem; 19 19 const Allocator = mem.Allocator; 20 20 const posix = std.posix; 21 - const libc = std.c; 21 + const Io = std.Io; 22 22 const log = std.log.scoped(.zat); 23 23 24 24 pub const CommitAction = sync.CommitAction; ··· 410 410 } 411 411 412 412 pub const FirehoseClient = struct { 413 + io: Io, 413 414 allocator: Allocator, 414 415 options: Options, 415 416 last_seq: ?i64 = null, 416 417 417 - pub fn init(allocator: Allocator, options: Options) FirehoseClient { 418 + pub fn init(io: Io, allocator: Allocator, options: Options) FirehoseClient { 418 419 return .{ 420 + .io = io, 419 421 .allocator = allocator, 420 422 .options = options, 421 423 .last_seq = if (options.cursor) |c| c else null, ··· 429 431 /// optional: fn onError(*@TypeOf(handler), anyerror) void 430 432 /// blocks forever — reconnects with exponential backoff on disconnect. 431 433 /// rotates through hosts on each reconnect attempt. 432 - pub fn subscribe(self: *FirehoseClient, handler: anytype) void { 434 + pub fn subscribe(self: *FirehoseClient, handler: anytype) Io.Cancelable!void { 433 435 var backoff: u64 = 1; 434 436 var host_index: usize = 0; 435 437 const max_backoff: u64 = 60; ··· 456 458 457 459 prev_host_index = effective_index; 458 460 host_index += 1; 459 - _ = libc.nanosleep(&.{ .sec = @intCast(backoff), .nsec = 0 }, null); 461 + try self.io.sleep(Io.Duration.fromSeconds(@intCast(backoff)), .awake); 460 462 backoff = @min(backoff * 2, max_backoff); 461 463 } 462 464 } ··· 473 475 474 476 log.info("connecting to wss://{s}{s}", .{ host, path }); 475 477 476 - var client = try websocket.Client.init(self.allocator, .{ 478 + var client = try websocket.Client.init(self.io, self.allocator, .{ 477 479 .host = host, 478 480 .port = 443, 479 481 .tls = true,
+10 -8
src/internal/streaming/jetstream.zig
··· 13 13 const mem = std.mem; 14 14 const json = std.json; 15 15 const posix = std.posix; 16 - const libc = std.c; 17 16 const Allocator = mem.Allocator; 17 + const Io = std.Io; 18 18 const log = std.log.scoped(.zat); 19 19 20 20 pub const CommitAction = sync.CommitAction; ··· 131 131 } 132 132 133 133 pub const JetstreamClient = struct { 134 + io: Io, 134 135 allocator: Allocator, 135 136 options: Options, 136 137 last_time_us: ?i64 = null, 137 138 138 - pub fn init(allocator: Allocator, options: Options) JetstreamClient { 139 + pub fn init(io: Io, allocator: Allocator, options: Options) JetstreamClient { 139 140 return .{ 141 + .io = io, 140 142 .allocator = allocator, 141 143 .options = options, 142 144 .last_time_us = options.cursor, ··· 151 153 /// optional: fn onConnect(*@TypeOf(handler), []const u8) void — called with host on connect 152 154 /// blocks forever — reconnects with exponential backoff on disconnect. 153 155 /// rotates through hosts on each reconnect attempt. 154 - pub fn subscribe(self: *JetstreamClient, handler: anytype) void { 156 + pub fn subscribe(self: *JetstreamClient, handler: anytype) Io.Cancelable!void { 155 157 var backoff: u64 = 1; 156 158 var host_index: usize = 0; 157 159 const max_backoff: u64 = 60; ··· 181 183 182 184 prev_host_index = effective_index; 183 185 host_index += 1; 184 - _ = libc.nanosleep(&.{ .sec = @intCast(backoff), .nsec = 0 }, null); 186 + try self.io.sleep(Io.Duration.fromSeconds(@intCast(backoff)), .awake); 185 187 backoff = @min(backoff * 2, max_backoff); 186 188 } 187 189 } ··· 192 194 193 195 log.info("connecting to wss://{s}{s}", .{ host, path }); 194 196 195 - var client = try websocket.Client.init(std.Options.debug_io, self.allocator, .{ 197 + var client = try websocket.Client.init(self.io, self.allocator, .{ 196 198 .host = host, 197 199 .port = 443, 198 200 .tls = true, ··· 462 464 } 463 465 464 466 test "build subscribe path" { 465 - var client = JetstreamClient.init(std.testing.allocator, .{ 467 + var client = JetstreamClient.init(std.Options.debug_io, std.testing.allocator, .{ 466 468 .wanted_collections = &.{"app.bsky.feed.post"}, 467 469 }); 468 470 ··· 472 474 } 473 475 474 476 test "build subscribe path with multiple params" { 475 - var client = JetstreamClient.init(std.testing.allocator, .{ 477 + var client = JetstreamClient.init(std.Options.debug_io, std.testing.allocator, .{ 476 478 .wanted_collections = &.{ "app.bsky.feed.post", "app.bsky.feed.like" }, 477 479 .wanted_dids = &.{"did:plc:abc123"}, 478 480 .cursor = 1700000000000, ··· 487 489 } 488 490 489 491 test "build subscribe path no params" { 490 - var client = JetstreamClient.init(std.testing.allocator, .{}); 492 + var client = JetstreamClient.init(std.Options.debug_io, std.testing.allocator, .{}); 491 493 492 494 var buf: [2048]u8 = undefined; 493 495 const path = try client.buildSubscribePath(&buf);