atproto utils for zig
zat.dev
atproto
sdk
zig
1//! jetstream client - AT Protocol event stream via WebSocket
2//!
3//! typed, reconnecting client for the Bluesky Jetstream service.
4//! parses commit, identity, and account events into typed structs.
5//!
6//! see: https://github.com/bluesky-social/jetstream
7
8const std = @import("std");
9const websocket = @import("websocket");
10const json_helpers = @import("../xrpc/json.zig");
11const sync = @import("sync.zig");
12
13const mem = std.mem;
14const json = std.json;
15const posix = std.posix;
16const Allocator = mem.Allocator;
17const Io = std.Io;
18const log = std.log.scoped(.zat);
19
20pub const CommitAction = sync.CommitAction;
21pub const AccountStatus = sync.AccountStatus;
22
/// public Jetstream hostnames, used as the default value of `Options.hosts`.
pub const default_hosts: [12][]const u8 = .{
    "jetstream1.us-east.bsky.network",
    "jetstream2.us-east.bsky.network",
    "jetstream1.us-west.bsky.network",
    "jetstream2.us-west.bsky.network",
    "jetstream.waow.tech",
    "jetstream.fire.hose.cam",
    "jet.firehose.stream",
    "sfo.firehose.stream",
    "nyc.firehose.stream",
    "london.firehose.stream",
    "frankfurt.firehose.stream",
    "chennai.firehose.stream",
};
37
/// connection and filtering settings consumed by `JetstreamClient`.
pub const Options = struct {
    /// hostnames to connect to; `subscribe` rotates through them on reconnect.
    hosts: []const []const u8 = default_hosts[0..],
    /// each entry is sent as a `wantedCollections` query parameter.
    wanted_collections: []const []const u8 = &[_][]const u8{},
    /// each entry is sent as a `wantedDids` query parameter.
    wanted_dids: []const []const u8 = &[_][]const u8{},
    /// initial cursor; when set it seeds the client's `last_time_us`.
    cursor: ?i64 = null,
    /// handed to the websocket client as its `max_size` (default 1 MiB).
    max_message_size: usize = 1 << 20,
};
45
/// one decoded Jetstream message; `parseEvent` picks the variant from the
/// JSON `kind` field.
pub const Event = union(enum) {
    commit: CommitEvent,
    identity: IdentityEvent,
    account: AccountEvent,

    /// return the `time_us` value carried by whichever variant is active.
    pub fn timeUs(self: Event) i64 {
        return switch (self) {
            .commit => |commit_event| commit_event.time_us,
            .identity => |identity_event| identity_event.time_us,
            .account => |account_event| account_event.time_us,
        };
    }
};
57
/// payload of a `kind: "commit"` message.
/// `parseEvent` fills `did` / `time_us` from the top-level object and the
/// remaining fields from the nested `commit` object.
pub const CommitEvent = struct {
    /// top-level `did`.
    did: []const u8,
    /// top-level `time_us`.
    time_us: i64,
    /// `commit.rev`, null when absent.
    rev: ?[]const u8 = null,
    /// `commit.operation`, mapped through `CommitAction.parse`.
    operation: CommitAction,
    /// `commit.collection`.
    collection: []const u8,
    /// `commit.rkey`.
    rkey: []const u8,
    /// `commit.record` as an untyped JSON value, null when absent.
    record: ?json.Value = null,
    /// `commit.cid`, null when absent.
    cid: ?[]const u8 = null,
};
68
/// payload of a `kind: "identity"` message.
/// `parseEvent` fills `did` / `time_us` from the top-level object and the
/// optional fields from the nested `identity` object.
pub const IdentityEvent = struct {
    /// top-level `did`.
    did: []const u8,
    /// top-level `time_us`.
    time_us: i64,
    /// `identity.handle`, null when absent.
    handle: ?[]const u8 = null,
    /// `identity.seq`, null when absent.
    seq: ?i64 = null,
    /// `identity.time`, kept as the raw string; null when absent.
    time: ?[]const u8 = null,
};
76
/// payload of a `kind: "account"` message.
/// `parseEvent` fills `did` / `time_us` from the top-level object and the
/// remaining fields from the nested `account` object.
pub const AccountEvent = struct {
    /// top-level `did`.
    did: []const u8,
    /// top-level `time_us`.
    time_us: i64,
    /// `account.active`; `parseEvent` substitutes `true` when the field is absent.
    active: bool,
    /// `account.status` mapped through `AccountStatus.parse`, null when absent.
    status: ?AccountStatus = null,
    /// `account.seq`, null when absent.
    seq: ?i64 = null,
    /// `account.time`, kept as the raw string; null when absent.
    time: ?[]const u8 = null,
};
85
/// parse a raw JSON payload into a typed Event.
///
/// `allocator` backs the parsed JSON tree (object maps, and any string that
/// had to be unescaped). nothing is freed here: pass an arena (or similar) and
/// release it once the Event is no longer needed.
/// string slices in the returned Event may point straight into `payload`, so
/// keep both `payload` and the allocator-owned memory alive while using it.
///
/// errors: any `std.json` parse error, plus `MissingKind`, `MissingDid`,
/// `MissingTimeUs`, `MissingOperation`, `UnknownOperation`,
/// `MissingCollection`, `MissingRkey` and `UnknownKind`.
pub fn parseEvent(allocator: Allocator, payload: []const u8) !Event {
    // the non-leaky `parseFromSlice` returns a `Parsed` handle that owns a
    // nested arena; since the handle was never kept, that arena could never be
    // deinit'd. the leaky variant allocates directly from `allocator`, which is
    // exactly the ownership contract documented above.
    const root = try json.parseFromSliceLeaky(json.Value, allocator, payload, .{});

    const kind_str = json_helpers.getString(root, "kind") orelse return error.MissingKind;
    const did = json_helpers.getString(root, "did") orelse return error.MissingDid;
    const time_us = json_helpers.getInt(root, "time_us") orelse return error.MissingTimeUs;

    if (mem.eql(u8, kind_str, "commit")) {
        // required fields are checked in a fixed order so the reported error is stable.
        const op_str = json_helpers.getString(root, "commit.operation") orelse return error.MissingOperation;
        const operation = CommitAction.parse(op_str) orelse return error.UnknownOperation;
        const collection = json_helpers.getString(root, "commit.collection") orelse return error.MissingCollection;
        const rkey = json_helpers.getString(root, "commit.rkey") orelse return error.MissingRkey;

        return .{ .commit = .{
            .did = did,
            .time_us = time_us,
            .operation = operation,
            .collection = collection,
            .rkey = rkey,
            .rev = json_helpers.getString(root, "commit.rev"),
            .cid = json_helpers.getString(root, "commit.cid"),
            .record = json_helpers.getPath(root, "commit.record"),
        } };
    }

    if (mem.eql(u8, kind_str, "identity")) {
        return .{ .identity = .{
            .did = did,
            .time_us = time_us,
            .handle = json_helpers.getString(root, "identity.handle"),
            .seq = json_helpers.getInt(root, "identity.seq"),
            .time = json_helpers.getString(root, "identity.time"),
        } };
    }

    if (mem.eql(u8, kind_str, "account")) {
        const status_str = json_helpers.getString(root, "account.status");
        return .{ .account = .{
            .did = did,
            .time_us = time_us,
            // a missing `active` flag is treated as an active account.
            .active = json_helpers.getBool(root, "account.active") orelse true,
            .status = if (status_str) |s| AccountStatus.parse(s) else null,
            .seq = json_helpers.getInt(root, "account.seq"),
            .time = json_helpers.getString(root, "account.time"),
        } };
    }

    return error.UnknownKind;
}
132
/// reconnecting Jetstream subscriber.
///
/// holds the connection options plus the `time_us` of the last event handed
/// to the user's handler, which is sent back as the `cursor` on reconnect.
pub const JetstreamClient = struct {
    io: Io,
    allocator: Allocator,
    options: Options,
    /// `time_us` of the most recently dispatched event (seeded from
    /// `Options.cursor`); written out as `cursor=` when building the path.
    last_time_us: ?i64 = null,

    /// how far (in microseconds) the cursor is moved back when switching hosts.
    const cursor_rewind_us: i64 = 10_000_000;
    /// upper bound, in seconds, for the delay between reconnect attempts.
    const max_backoff_s: u64 = 60;

    /// create a client; no connection is made until `subscribe` is called.
    pub fn init(io: Io, allocator: Allocator, options: Options) JetstreamClient {
        return .{
            .io = io,
            .allocator = allocator,
            .options = options,
            .last_time_us = options.cursor,
        };
    }

    /// nothing to release; present so callers can pair it with `init`.
    pub fn deinit(_: *JetstreamClient) void {}

    /// subscribe with a user-provided handler.
    /// handler must implement: fn onEvent(*@TypeOf(handler), Event) void
    /// optional: fn onError(*@TypeOf(handler), anyerror) void
    /// optional: fn onConnect(*@TypeOf(handler), []const u8) void — called with host on connect
    ///
    /// blocks until the sleep between attempts is canceled. reconnects with
    /// exponential backoff (1s doubling up to 60s) and rotates through hosts
    /// on each reconnect attempt. the backoff restarts at 1s when the host
    /// changes or when the previous connection delivered at least one event.
    ///
    /// returns immediately (after logging an error) if `options.hosts` is empty.
    pub fn subscribe(self: *JetstreamClient, handler: anytype) Io.Cancelable!void {
        const hosts = self.options.hosts;
        if (hosts.len == 0) {
            // indexing with `% hosts.len` below would divide by zero.
            log.err("jetstream: no hosts configured, not subscribing", .{});
            return;
        }

        var backoff: u64 = 1;
        var attempt: usize = 0;
        var prev_index: usize = 0;
        // true while the cursor has moved since the last rewind; prevents a
        // run of failed connections from rewinding the cursor again and again.
        var progressed_since_rewind = true;

        while (true) {
            const index = attempt % hosts.len;
            const host = hosts[index];

            // rewind cursor by 10s on host switch (different instances may lag)
            if (attempt > 0 and index != prev_index) {
                if (progressed_since_rewind) {
                    if (self.last_time_us) |t| {
                        // saturating: a tiny/negative cursor must not overflow.
                        self.last_time_us = t -| cursor_rewind_us;
                        progressed_since_rewind = false;
                    }
                }
                backoff = 1;
            }

            log.info("connecting to host {d}/{d}: {s}", .{ index + 1, hosts.len, host });

            const cursor_before = self.last_time_us;
            const outcome = self.connectAndRead(host, handler);

            // the cursor only moves when events were dispatched, so a change
            // means the connection was healthy: start the backoff over.
            if (!std.meta.eql(cursor_before, self.last_time_us)) {
                progressed_since_rewind = true;
                backoff = 1;
            }

            outcome catch |err| {
                if (comptime @hasDecl(@TypeOf(handler.*), "onError")) {
                    handler.onError(err);
                } else {
                    log.err("jetstream error: {s}, reconnecting in {d}s...", .{ @errorName(err), backoff });
                }
            };

            prev_index = index;
            attempt += 1;
            try self.io.sleep(Io.Duration.fromSeconds(@intCast(backoff)), .awake);
            backoff = @min(backoff * 2, max_backoff_s);
        }
    }

    /// open one TLS websocket connection to `host` and pump messages into
    /// `handler` until the read loop ends. returns any error from building the
    /// path, connecting, the handshake or the read loop, and
    /// `error.HostTooLong` if `host` does not fit in the Host header buffer.
    fn connectAndRead(self: *JetstreamClient, host: []const u8, handler: anytype) !void {
        var path_buf: [2048]u8 = undefined;
        const path = try self.buildSubscribePath(&path_buf);

        log.info("connecting to wss://{s}{s}", .{ host, path });

        // format the header before opening the socket. falling back to the
        // bare hostname (as was done before) would send a malformed header
        // block, so an over-long host is reported as an error instead.
        var host_header_buf: [256]u8 = undefined;
        const host_header = std.fmt.bufPrint(&host_header_buf, "Host: {s}\r\n", .{host}) catch
            return error.HostTooLong;

        var client = try websocket.Client.init(self.io, self.allocator, .{
            .host = host,
            .port = 443,
            .tls = true,
            .max_size = self.options.max_message_size,
        });
        defer client.deinit();

        try client.handshake(path, .{ .headers = host_header });
        configureKeepalive(&client);

        log.info("jetstream connected to {s}", .{host});

        if (comptime @hasDecl(@TypeOf(handler.*), "onConnect")) {
            handler.onConnect(host);
        }

        var ws_handler = WsHandler(@TypeOf(handler.*)){
            .allocator = self.allocator,
            .handler = handler,
            .client_state = self,
        };
        try client.readLoop(&ws_handler);
    }

    /// write `/subscribe` plus the query string into `buf` and return the
    /// written slice. parameters appear in this order: every
    /// `wantedCollections`, every `wantedDids`, then `cursor` when
    /// `last_time_us` is set. values are written verbatim (no percent-encoding).
    /// fails if the result does not fit in `buf`.
    fn buildSubscribePath(self: *JetstreamClient, buf: *[2048]u8) ![]const u8 {
        var w: std.Io.Writer = .fixed(buf);

        try w.writeAll("/subscribe");

        // '?' introduces the first parameter, '&' every later one.
        var separator: u8 = '?';

        for (self.options.wanted_collections) |col| {
            try w.writeByte(separator);
            try w.writeAll("wantedCollections=");
            try w.writeAll(col);
            separator = '&';
        }

        for (self.options.wanted_dids) |did| {
            try w.writeByte(separator);
            try w.writeAll("wantedDids=");
            try w.writeAll(did);
            separator = '&';
        }

        if (self.last_time_us) |cursor| {
            try w.writeByte(separator);
            try w.print("cursor={d}", .{cursor});
        }

        return w.buffered();
    }
};
254
/// build the adapter type handed to the websocket read loop for a user
/// handler of type `H`. it decodes each message and forwards it to
/// `H.onEvent`, recording the event's `time_us` on the owning client.
fn WsHandler(comptime H: type) type {
    return struct {
        allocator: Allocator,
        handler: *H,
        client_state: *JetstreamClient,

        const Self = @This();

        /// decode one message and dispatch it.
        /// the event lives in a per-message arena that is released when this
        /// function returns, so the user handler must not keep references to it.
        /// messages that fail to parse are logged at debug level and dropped.
        pub fn serverMessage(self: *Self, data: []const u8) !void {
            var scratch = std.heap.ArenaAllocator.init(self.allocator);
            defer scratch.deinit();

            if (parseEvent(scratch.allocator(), data)) |event| {
                // cursor is updated before the user callback runs.
                self.client_state.last_time_us = event.timeUs();
                self.handler.onEvent(event);
            } else |err| {
                log.debug("message parse error: {s}", .{@errorName(err)});
            }
        }

        /// log that the connection was closed.
        pub fn close(_: *Self) void {
            log.info("jetstream connection closed", .{});
        }
    };
}
281
/// turn on TCP keepalive for the client's socket so a read does not block
/// forever when the peer vanishes without FIN/RST (network partition, crash,
/// power loss).
/// detection time: 10s idle + 5s × 2 probes = 20s.
/// best effort: the first option that fails to apply ends the function and
/// the remaining options are left untouched. the idle time is only set on
/// linux and macos.
fn configureKeepalive(client: *websocket.Client) void {
    const os_tag = @import("builtin").os.tag;
    const sock = client.stream.stream.socket.handle;
    const tcp_level: i32 = @intCast(posix.IPPROTO.TCP);

    const enable = std.mem.toBytes(@as(i32, 1));
    const idle_secs = std.mem.toBytes(@as(i32, 10));
    const interval_secs = std.mem.toBytes(@as(i32, 5));
    const probe_count = std.mem.toBytes(@as(i32, 2));

    posix.setsockopt(sock, posix.SOL.SOCKET, posix.SO.KEEPALIVE, &enable) catch return;

    // the idle-time option has a different name per platform.
    switch (os_tag) {
        .linux => posix.setsockopt(sock, tcp_level, posix.TCP.KEEPIDLE, &idle_secs) catch return,
        .macos => posix.setsockopt(sock, tcp_level, posix.TCP.KEEPALIVE, &idle_secs) catch return,
        else => {},
    }

    posix.setsockopt(sock, tcp_level, posix.TCP.KEEPINTVL, &interval_secs) catch return;
    posix.setsockopt(sock, tcp_level, posix.TCP.KEEPCNT, &probe_count) catch return;
}
298
299// === tests ===
300
// a full commit message: every field, including the nested record, is surfaced.
test "parse commit event" {
    const testing = std.testing;

    var arena_state = std.heap.ArenaAllocator.init(testing.allocator);
    defer arena_state.deinit();

    const raw =
        \\{
        \\ "did": "did:plc:abc123",
        \\ "time_us": 1700000000000,
        \\ "kind": "commit",
        \\ "commit": {
        \\ "rev": "3mbspmpaidl2a",
        \\ "operation": "create",
        \\ "collection": "app.bsky.feed.post",
        \\ "rkey": "xyz789",
        \\ "cid": "bafyreitest",
        \\ "record": {
        \\ "text": "hello world",
        \\ "$type": "app.bsky.feed.post"
        \\ }
        \\ }
        \\}
    ;

    const parsed = try parseEvent(arena_state.allocator(), raw);
    const c = parsed.commit;

    try testing.expectEqualStrings("did:plc:abc123", c.did);
    try testing.expectEqual(@as(i64, 1700000000000), c.time_us);
    try testing.expectEqualStrings("3mbspmpaidl2a", c.rev.?);
    try testing.expectEqual(CommitAction.create, c.operation);
    try testing.expectEqualStrings("app.bsky.feed.post", c.collection);
    try testing.expectEqualStrings("xyz789", c.rkey);
    try testing.expectEqualStrings("bafyreitest", c.cid.?);
    try testing.expect(c.record != null);
    try testing.expectEqualStrings("hello world", json_helpers.getString(c.record.?, "text").?);
}
337
// an identity message with all optional fields present.
test "parse identity event" {
    const testing = std.testing;

    var arena_state = std.heap.ArenaAllocator.init(testing.allocator);
    defer arena_state.deinit();

    const raw =
        \\{
        \\ "did": "did:plc:abc123",
        \\ "time_us": 1700000000000,
        \\ "kind": "identity",
        \\ "identity": {
        \\ "handle": "alice.bsky.social",
        \\ "seq": 42,
        \\ "time": "2024-01-01T00:00:00Z"
        \\ }
        \\}
    ;

    const parsed = try parseEvent(arena_state.allocator(), raw);
    const ident = parsed.identity;

    try testing.expectEqualStrings("did:plc:abc123", ident.did);
    try testing.expectEqual(@as(i64, 1700000000000), ident.time_us);
    try testing.expectEqualStrings("alice.bsky.social", ident.handle.?);
    try testing.expectEqual(@as(i64, 42), ident.seq.?);
    try testing.expectEqualStrings("2024-01-01T00:00:00Z", ident.time.?);
}
364
// an account message with an explicit inactive flag and a status.
test "parse account event" {
    const testing = std.testing;

    var arena_state = std.heap.ArenaAllocator.init(testing.allocator);
    defer arena_state.deinit();

    const raw =
        \\{
        \\ "did": "did:plc:abc123",
        \\ "time_us": 1700000000000,
        \\ "kind": "account",
        \\ "account": {
        \\ "active": false,
        \\ "status": "suspended",
        \\ "seq": 99,
        \\ "time": "2024-01-01T00:00:00Z"
        \\ }
        \\}
    ;

    const parsed = try parseEvent(arena_state.allocator(), raw);
    const acct = parsed.account;

    try testing.expectEqualStrings("did:plc:abc123", acct.did);
    try testing.expectEqual(@as(i64, 1700000000000), acct.time_us);
    try testing.expectEqual(false, acct.active);
    try testing.expectEqual(AccountStatus.suspended, acct.status.?);
    try testing.expectEqual(@as(i64, 99), acct.seq.?);
    try testing.expectEqualStrings("2024-01-01T00:00:00Z", acct.time.?);
}
393
// a `kind` that is none of commit / identity / account is rejected.
test "parse unknown kind returns error" {
    const testing = std.testing;

    var arena_state = std.heap.ArenaAllocator.init(testing.allocator);
    defer arena_state.deinit();

    const raw =
        \\{
        \\ "did": "did:plc:abc123",
        \\ "time_us": 1700000000000,
        \\ "kind": "unknown_kind"
        \\}
    ;

    const result = parseEvent(arena_state.allocator(), raw);
    try testing.expectError(error.UnknownKind, result);
}
408
// a commit whose operation string is not recognised is rejected.
test "parse commit with unknown operation returns error" {
    const testing = std.testing;

    var arena_state = std.heap.ArenaAllocator.init(testing.allocator);
    defer arena_state.deinit();

    const raw =
        \\{
        \\ "did": "did:plc:abc123",
        \\ "time_us": 1700000000000,
        \\ "kind": "commit",
        \\ "commit": {
        \\ "operation": "archive",
        \\ "collection": "app.bsky.feed.post",
        \\ "rkey": "xyz789"
        \\ }
        \\}
    ;

    const result = parseEvent(arena_state.allocator(), raw);
    try testing.expectError(error.UnknownOperation, result);
}
428
// two consecutive messages expose increasing `time_us` values through `timeUs()`.
test "cursor tracking via time_us" {
    const testing = std.testing;

    var arena_state = std.heap.ArenaAllocator.init(testing.allocator);
    defer arena_state.deinit();
    const arena = arena_state.allocator();

    const first_raw: []const u8 =
        \\{"did":"did:plc:a","time_us":100,"kind":"commit","commit":{"operation":"create","collection":"app.bsky.feed.post","rkey":"1"}}
    ;
    const second_raw: []const u8 =
        \\{"did":"did:plc:b","time_us":200,"kind":"commit","commit":{"operation":"create","collection":"app.bsky.feed.post","rkey":"2"}}
    ;

    const first = try parseEvent(arena, first_raw);
    const second = try parseEvent(arena, second_raw);

    try testing.expect(first.timeUs() > 0);
    try testing.expect(second.timeUs() > first.timeUs());
}
446
// `timeUs()` returns the timestamp regardless of which variant is active.
test "Event.timeUs works for all variants" {
    const testing = std.testing;

    var arena_state = std.heap.ArenaAllocator.init(testing.allocator);
    defer arena_state.deinit();
    const arena = arena_state.allocator();

    const commit_raw =
        \\{"did":"did:plc:a","time_us":100,"kind":"commit","commit":{"operation":"create","collection":"x","rkey":"1"}}
    ;
    const identity_raw =
        \\{"did":"did:plc:a","time_us":200,"kind":"identity","identity":{}}
    ;
    const account_raw =
        \\{"did":"did:plc:a","time_us":300,"kind":"account","account":{"active":true}}
    ;

    const commit_ev = try parseEvent(arena, commit_raw);
    const identity_ev = try parseEvent(arena, identity_raw);
    const account_ev = try parseEvent(arena, account_raw);

    try testing.expectEqual(@as(i64, 100), commit_ev.timeUs());
    try testing.expectEqual(@as(i64, 200), identity_ev.timeUs());
    try testing.expectEqual(@as(i64, 300), account_ev.timeUs());
}
465
// a single wanted collection becomes the only query parameter.
test "build subscribe path" {
    const opts: Options = .{
        .wanted_collections = &.{"app.bsky.feed.post"},
    };
    var client = JetstreamClient.init(std.Options.debug_io, std.testing.allocator, opts);

    var scratch: [2048]u8 = undefined;
    const built = try client.buildSubscribePath(&scratch);

    try std.testing.expectEqualStrings("/subscribe?wantedCollections=app.bsky.feed.post", built);
}
475
// collections come first, then DIDs, then the cursor, joined with '&'.
test "build subscribe path with multiple params" {
    const opts: Options = .{
        .wanted_collections = &.{ "app.bsky.feed.post", "app.bsky.feed.like" },
        .wanted_dids = &.{"did:plc:abc123"},
        .cursor = 1700000000000,
    };
    var client = JetstreamClient.init(std.Options.debug_io, std.testing.allocator, opts);

    var scratch: [2048]u8 = undefined;
    const built = try client.buildSubscribePath(&scratch);

    const expected = "/subscribe?wantedCollections=app.bsky.feed.post&wantedCollections=app.bsky.feed.like&wantedDids=did:plc:abc123&cursor=1700000000000";
    try std.testing.expectEqualStrings(expected, built);
}
490
// with default options no query string is appended.
test "build subscribe path no params" {
    const opts: Options = .{};
    var client = JetstreamClient.init(std.Options.debug_io, std.testing.allocator, opts);

    var scratch: [2048]u8 = undefined;
    const built = try client.buildSubscribePath(&scratch);

    try std.testing.expectEqualStrings("/subscribe", built);
}
498
// a delete commit carries no record, rev or cid; those fields stay null.
test "parse commit event with delete operation" {
    const testing = std.testing;

    var arena_state = std.heap.ArenaAllocator.init(testing.allocator);
    defer arena_state.deinit();

    const raw =
        \\{
        \\ "did": "did:plc:abc123",
        \\ "time_us": 1700000000000,
        \\ "kind": "commit",
        \\ "commit": {
        \\ "operation": "delete",
        \\ "collection": "app.bsky.feed.post",
        \\ "rkey": "xyz789"
        \\ }
        \\}
    ;

    const parsed = try parseEvent(arena_state.allocator(), raw);
    const c = parsed.commit;

    try testing.expectEqual(CommitAction.delete, c.operation);
    try testing.expect(c.record == null);
    try testing.expect(c.rev == null);
    try testing.expect(c.cid == null);
}
523
// an empty `identity` object is accepted; all optional fields stay null.
test "parse identity event with minimal fields" {
    const testing = std.testing;

    var arena_state = std.heap.ArenaAllocator.init(testing.allocator);
    defer arena_state.deinit();

    const raw =
        \\{
        \\ "did": "did:plc:abc123",
        \\ "time_us": 1700000000000,
        \\ "kind": "identity",
        \\ "identity": {}
        \\}
    ;

    const parsed = try parseEvent(arena_state.allocator(), raw);
    const ident = parsed.identity;

    try testing.expectEqualStrings("did:plc:abc123", ident.did);
    try testing.expect(ident.handle == null);
    try testing.expect(ident.seq == null);
    try testing.expect(ident.time == null);
}
544
// a message without a top-level `did` is rejected.
test "parse missing did returns error" {
    const testing = std.testing;

    var arena_state = std.heap.ArenaAllocator.init(testing.allocator);
    defer arena_state.deinit();

    const raw =
        \\{
        \\ "time_us": 1700000000000,
        \\ "kind": "commit"
        \\}
    ;

    const result = parseEvent(arena_state.allocator(), raw);
    try testing.expectError(error.MissingDid, result);
}
558
// pins the size of the default host list and the position of selected entries.
test "default hosts contains known jetstream instances" {
    const testing = std.testing;

    try testing.expectEqual(@as(usize, 12), default_hosts.len);

    const Expectation = struct { index: usize, host: []const u8 };
    const expectations = [_]Expectation{
        .{ .index = 0, .host = "jetstream1.us-east.bsky.network" },
        .{ .index = 1, .host = "jetstream2.us-east.bsky.network" },
        .{ .index = 2, .host = "jetstream1.us-west.bsky.network" },
        .{ .index = 3, .host = "jetstream2.us-west.bsky.network" },
        .{ .index = 4, .host = "jetstream.waow.tech" },
        .{ .index = 5, .host = "jetstream.fire.hose.cam" },
        .{ .index = 6, .host = "jet.firehose.stream" },
        .{ .index = 11, .host = "chennai.firehose.stream" },
    };
    for (expectations) |want| {
        try testing.expectEqualStrings(want.host, default_hosts[want.index]);
    }
}
570
// mirrors the `index % hosts.len` selection used by subscribe(): nine
// attempts over three hosts visit a, b, c three times in order.
test "round-robin cycles through hosts" {
    const rotation = [_][]const u8{ "host-a", "host-b", "host-c" };

    var attempt: usize = 0;
    while (attempt < 9) : (attempt += 1) {
        const picked = rotation[attempt % rotation.len];
        const want: []const u8 = switch (attempt % 3) {
            0 => "host-a",
            1 => "host-b",
            2 => "host-c",
            else => unreachable,
        };
        try std.testing.expectEqualStrings(want, picked);
    }
}
585
// default-constructed options point at the built-in host list.
test "options default hosts are used" {
    const defaults: Options = .{};
    const configured = defaults.hosts;

    try std.testing.expectEqual(@as(usize, 12), configured.len);
    try std.testing.expectEqualStrings("jetstream1.us-east.bsky.network", configured[0]);
}
591
// a caller-supplied host list replaces the defaults entirely.
test "options custom single host" {
    const custom: Options = .{ .hosts = &.{"my-custom-host.example.com"} };
    const configured = custom.hosts;

    try std.testing.expectEqual(@as(usize, 1), configured.len);
    try std.testing.expectEqualStrings("my-custom-host.example.com", configured[0]);
}