atproto utils for zig zat.dev
atproto sdk zig
26
fork

Configure Feed

Select the types of activity you want to include in your feed.

at codex/xrpc-errors-retry 596 lines 21 kB view raw
//! jetstream client - AT Protocol event stream via WebSocket
//!
//! typed, reconnecting client for the Bluesky Jetstream service.
//! parses commit, identity, and account events into typed structs.
//!
//! see: https://github.com/bluesky-social/jetstream

const std = @import("std");
const websocket = @import("websocket");
const json_helpers = @import("../xrpc/json.zig");
const sync = @import("sync.zig");

const mem = std.mem;
const json = std.json;
const posix = std.posix;
const Allocator = mem.Allocator;
const Io = std.Io;
const log = std.log.scoped(.zat);

/// commit operation kind; re-exported from the sync module.
pub const CommitAction = sync.CommitAction;
/// account status; re-exported from the sync module.
pub const AccountStatus = sync.AccountStatus;

/// hostnames used when `Options.hosts` is left at its default.
pub const default_hosts = [_][]const u8{
    "jetstream1.us-east.bsky.network",
    "jetstream2.us-east.bsky.network",
    "jetstream1.us-west.bsky.network",
    "jetstream2.us-west.bsky.network",
    "jetstream.waow.tech",
    "jetstream.fire.hose.cam",
    "jet.firehose.stream",
    "sfo.firehose.stream",
    "nyc.firehose.stream",
    "london.firehose.stream",
    "frankfurt.firehose.stream",
    "chennai.firehose.stream",
};

/// connection and filtering settings for a `JetstreamClient`.
pub const Options = struct {
    /// hostnames to connect to (wss, port 443), cycled through on reconnect.
    hosts: []const []const u8 = &default_hosts,
    /// each entry becomes a `wantedCollections` query parameter.
    wanted_collections: []const []const u8 = &.{},
    /// each entry becomes a `wantedDids` query parameter.
    wanted_dids: []const []const u8 = &.{},
    /// initial `cursor` query parameter (same clock as `time_us`); null omits it.
    cursor: ?i64 = null,
    /// forwarded to the websocket client as its `max_size` limit.
    max_message_size: usize = 1024 * 1024,
};

/// one decoded Jetstream message, tagged by its `kind` field.
pub const Event = union(enum) {
    commit: CommitEvent,
    identity: IdentityEvent,
    account: AccountEvent,

    /// the `time_us` of whichever variant is active.
    pub fn timeUs(self: Event) i64 {
        switch (self) {
            .commit => |c| return c.time_us,
            .identity => |i| return i.time_us,
            .account => |a| return a.time_us,
        }
    }
};

/// a `kind: "commit"` message; fields under `commit.*` are flattened here.
pub const CommitEvent = struct {
    did: []const u8,
    time_us: i64,
    rev: ?[]const u8 = null,
    operation: CommitAction,
    collection: []const u8,
    rkey: []const u8,
    /// the raw `commit.record` JSON value, or null when the payload has none.
    record: ?json.Value = null,
    cid: ?[]const u8 = null,
};

/// a `kind: "identity"` message; every `identity.*` field is optional.
pub const IdentityEvent = struct {
    did: []const u8,
    time_us: i64,
    handle: ?[]const u8 = null,
    seq: ?i64 = null,
    time: ?[]const u8 = null,
};
/// a `kind: "account"` message; fields under `account.*` are flattened here.
pub const AccountEvent = struct {
    did: []const u8,
    time_us: i64,
    active: bool,
    status: ?AccountStatus = null,
    seq: ?i64 = null,
    time: ?[]const u8 = null,
};

/// parse a raw JSON payload into a typed Event.
///
/// every allocation made while parsing (JSON object maps, arrays, and any
/// strings that needed unescaping) goes into `allocator` and is never freed
/// individually — pass an arena (or similar) and release it once the Event is
/// no longer needed. string slices in the returned Event reference `payload`
/// where possible, so `payload` must outlive the Event as well.
///
/// errors: MissingKind, MissingDid, MissingTimeUs, UnknownKind; for commits
/// also MissingOperation, UnknownOperation, MissingCollection, MissingRkey;
/// plus any std.json parse error (including OutOfMemory).
pub fn parseEvent(allocator: Allocator, payload: []const u8) !Event {
    // the leaky variant allocates directly into `allocator`. the non-leaky
    // `parseFromSlice` wraps it in an extra arena whose `deinit` could never be
    // called here, because the returned Event borrows from it.
    const root = try json.parseFromSliceLeaky(json.Value, allocator, payload, .{});

    const kind_str = json_helpers.getString(root, "kind") orelse return error.MissingKind;
    const did = json_helpers.getString(root, "did") orelse return error.MissingDid;
    const time_us = json_helpers.getInt(root, "time_us") orelse return error.MissingTimeUs;

    if (mem.eql(u8, kind_str, "commit")) {
        const op_str = json_helpers.getString(root, "commit.operation") orelse return error.MissingOperation;
        return .{ .commit = .{
            .did = did,
            .time_us = time_us,
            .operation = CommitAction.parse(op_str) orelse return error.UnknownOperation,
            .collection = json_helpers.getString(root, "commit.collection") orelse return error.MissingCollection,
            .rkey = json_helpers.getString(root, "commit.rkey") orelse return error.MissingRkey,
            .rev = json_helpers.getString(root, "commit.rev"),
            .cid = json_helpers.getString(root, "commit.cid"),
            .record = json_helpers.getPath(root, "commit.record"),
        } };
    } else if (mem.eql(u8, kind_str, "identity")) {
        return .{ .identity = .{
            .did = did,
            .time_us = time_us,
            .handle = json_helpers.getString(root, "identity.handle"),
            .seq = json_helpers.getInt(root, "identity.seq"),
            .time = json_helpers.getString(root, "identity.time"),
        } };
    } else if (mem.eql(u8, kind_str, "account")) {
        const status_str = json_helpers.getString(root, "account.status");
        return .{ .account = .{
            .did = did,
            .time_us = time_us,
            // a missing `account.active` is treated as active
            .active = json_helpers.getBool(root, "account.active") orelse true,
            .status = if (status_str) |s| AccountStatus.parse(s) else null,
            .seq = json_helpers.getInt(root, "account.seq"),
            .time = json_helpers.getString(root, "account.time"),
        } };
    }

    return error.UnknownKind;
}

pub const JetstreamClient = struct {
    io: Io,
    allocator: Allocator,
    options: Options,
    /// `time_us` of the most recently received event (or the rewound value);
    /// sent as the `cursor` query parameter on every (re)connect.
    last_time_us: ?i64 = null,

    pub fn init(io: Io, allocator: Allocator, options: Options) JetstreamClient {
        return .{
            .io = io,
            .allocator = allocator,
            .options = options,
            .last_time_us = options.cursor,
        };
    }

    pub fn deinit(_: *JetstreamClient) void {}

    /// subscribe with a user-provided handler.
    /// `handler` is a pointer `*H`; H must declare: fn onEvent(*H, Event) void
    /// optional: fn onError(*H, anyerror) void
    /// optional: fn onConnect(*H, []const u8) void — called with host on connect
    /// loops until `io.sleep` reports cancellation, reconnecting after every
    /// disconnect with exponential backoff (1s doubling up to 60s; reset to 1s
    /// after a connection that delivered at least one event).
    /// rotates through hosts on each reconnect attempt; an empty `hosts` list
    /// falls back to `default_hosts`. on a host switch the cursor is rewound by
    /// 10s, at most once per batch of received events.
    pub fn subscribe(self: *JetstreamClient, handler: anytype) Io.Cancelable!void {
        const max_backoff: u64 = 60;
        // an empty host list would make the `% hosts.len` below divide by zero.
        const hosts: []const []const u8 = if (self.options.hosts.len != 0) self.options.hosts else &default_hosts;

        var backoff: u64 = 1;
        var index: usize = 0;
        var first_attempt = true;
        // cursor value produced by the most recent rewind. while `last_time_us`
        // still equals it no event has arrived, so rewinding again would only
        // push the cursor further back on every failed attempt.
        var rewound_to: ?i64 = null;

        while (true) {
            const host = hosts[index];

            // rewind cursor by 10s on host switch (different instances may lag)
            if (!first_attempt and hosts.len > 1 and cursorMoved(rewound_to, self.last_time_us)) {
                if (self.last_time_us) |t| {
                    self.last_time_us = t -| 10_000_000;
                    rewound_to = self.last_time_us;
                }
            }

            log.info("connecting to host {d}/{d}: {s}", .{ index + 1, hosts.len, host });

            const cursor_before = self.last_time_us;
            const result = self.connectAndRead(host, handler);
            // a connection that delivered events was healthy: start the backoff
            // over rather than continuing to grow it from earlier failures.
            if (cursorMoved(cursor_before, self.last_time_us)) backoff = 1;

            result catch |err| {
                if (comptime @hasDecl(@TypeOf(handler.*), "onError")) {
                    handler.onError(err);
                } else {
                    log.err("jetstream error: {s}, reconnecting in {d}s...", .{ @errorName(err), backoff });
                }
            };

            first_attempt = false;
            index = (index + 1) % hosts.len;
            try self.io.sleep(Io.Duration.fromSeconds(@intCast(backoff)), .awake);
            backoff = @min(backoff * 2, max_backoff);
        }
    }

    /// true when `after` holds a cursor value that differs from `before`.
    fn cursorMoved(before: ?i64, after: ?i64) bool {
        const now = after orelse return false;
        const was = before orelse return true;
        return now != was;
    }

    /// open one websocket connection to `host`, perform the handshake and
    /// pump messages into `handler` until the read loop ends.
    fn connectAndRead(self: *JetstreamClient, host: []const u8, handler: anytype) !void {
        var path_buf: [2048]u8 = undefined;
        const path = try self.buildSubscribePath(&path_buf);

        // built before connecting, and a failure is an error: the old fallback
        // sent the bare host string, which is not a valid "Name: value\r\n"
        // header line. 512 bytes fits any DNS name (max 253 chars).
        var host_header_buf: [512]u8 = undefined;
        const host_header = try std.fmt.bufPrint(&host_header_buf, "Host: {s}\r\n", .{host});

        log.info("connecting to wss://{s}{s}", .{ host, path });

        var client = try websocket.Client.init(self.io, self.allocator, .{
            .host = host,
            .port = 443,
            .tls = true,
            .max_size = self.options.max_message_size,
        });
        defer client.deinit();

        try client.handshake(path, .{ .headers = host_header });
        configureKeepalive(&client);

        log.info("jetstream connected to {s}", .{host});

        if (comptime @hasDecl(@TypeOf(handler.*), "onConnect")) {
            handler.onConnect(host);
        }

        var ws_handler = WsHandler(@TypeOf(handler.*)){
            .allocator = self.allocator,
            .handler = handler,
            .client_state = self,
        };
        try client.readLoop(&ws_handler);
    }

    /// write "/subscribe" plus wantedCollections, wantedDids and cursor query
    /// parameters into `buf`; returns the written slice of `buf`.
    fn buildSubscribePath(self: *JetstreamClient, buf: *[2048]u8) ![]const u8 {
        var w: std.Io.Writer = .fixed(buf);

        try w.writeAll("/subscribe");

        var has_param = false;

        for (self.options.wanted_collections) |col| {
            try w.writeByte(if (!has_param) '?' else '&');
            try w.writeAll("wantedCollections=");
            try w.writeAll(col);
            has_param = true;
        }

        for (self.options.wanted_dids) |did| {
            try w.writeByte(if (!has_param) '?' else '&');
            try w.writeAll("wantedDids=");
            try w.writeAll(did);
            has_param = true;
        }

        if (self.last_time_us) |cursor| {
            try w.writeByte(if (!has_param) '?' else '&');
            try w.print("cursor={d}", .{cursor});
        }

        return w.buffered();
    }
};

/// adapts a user handler (`*H`) to the websocket client's callback interface:
/// each message is parsed into an `Event`, the client's cursor is advanced to
/// its `time_us`, and the event is passed to `H.onEvent`.
fn WsHandler(comptime H: type) type {
    return struct {
        allocator: Allocator,
        handler: *H,
        client_state: *JetstreamClient,

        const Self = @This();

        /// invoked by the websocket client with each message payload.
        /// messages that fail to parse are logged at debug level and skipped.
        pub fn serverMessage(self: *Self, data: []const u8) !void {
            // per-message arena: the Event's memory is released when this
            // returns, so `onEvent` must copy anything it wants to keep.
            var arena = std.heap.ArenaAllocator.init(self.allocator);
            defer arena.deinit();

            const event = parseEvent(arena.allocator(), data) catch |err| {
                log.debug("message parse error: {s}", .{@errorName(err)});
                return;
            };

            self.client_state.last_time_us = event.timeUs();
            self.handler.onEvent(event);
        }

        pub fn close(_: *Self) void {
            log.info("jetstream connection closed", .{});
        }
    };
}

/// enable TCP keepalive so reads don't block forever when a peer
/// disappears without FIN/RST (network partition, crash, power loss).
/// detection time: 10s idle + 5s × 2 probes = 20s.
fn configureKeepalive(client: *websocket.Client) void {
    // keepalive only speeds up dead-peer detection; the connection works
    // without it, so a failure is reported rather than propagated.
    applyKeepalive(client) catch |err| {
        log.debug("tcp keepalive not fully configured: {s}", .{@errorName(err)});
    };
}

/// set SO_KEEPALIVE and the per-platform TCP keepalive timers on the client's
/// socket, stopping at the first setsockopt failure.
fn applyKeepalive(client: *websocket.Client) !void {
    const fd = client.stream.stream.socket.handle;
    const builtin = @import("builtin");
    try posix.setsockopt(fd, posix.SOL.SOCKET, posix.SO.KEEPALIVE, &std.mem.toBytes(@as(i32, 1)));
    const tcp: i32 = @intCast(posix.IPPROTO.TCP);
    // the idle-time option is named differently on linux and macos
    if (builtin.os.tag == .linux) {
        try posix.setsockopt(fd, tcp, posix.TCP.KEEPIDLE, &std.mem.toBytes(@as(i32, 10)));
    } else if (builtin.os.tag == .macos) {
        try posix.setsockopt(fd, tcp, posix.TCP.KEEPALIVE, &std.mem.toBytes(@as(i32, 10)));
    }
    try posix.setsockopt(fd, tcp, posix.TCP.KEEPINTVL, &std.mem.toBytes(@as(i32, 5)));
    try posix.setsockopt(fd, tcp, posix.TCP.KEEPCNT, &std.mem.toBytes(@as(i32, 2)));
}

// === tests ===

test "parse commit event" {
    const payload =
        \\{
        \\  "did": "did:plc:abc123",
        \\  "time_us": 1700000000000,
        \\  "kind": "commit",
        \\  "commit": {
        \\    "rev": "3mbspmpaidl2a",
        \\    "operation": "create",
        \\    "collection": "app.bsky.feed.post",
        \\    "rkey": "xyz789",
        \\    "cid": "bafyreitest",
        \\    "record": {
        \\      "text": "hello world",
        \\      "$type": "app.bsky.feed.post"
        \\    }
        \\  }
        \\}
    ;

    var arena = std.heap.ArenaAllocator.init(std.testing.allocator);
    defer arena.deinit();

    const event = try parseEvent(arena.allocator(), payload);
    const commit = event.commit;

    try std.testing.expectEqualStrings("did:plc:abc123", commit.did);
    try std.testing.expectEqual(@as(i64, 1700000000000), commit.time_us);
    try std.testing.expectEqualStrings("3mbspmpaidl2a", commit.rev.?);
    try std.testing.expectEqual(CommitAction.create, commit.operation);
    try std.testing.expectEqualStrings("app.bsky.feed.post", commit.collection);
    try std.testing.expectEqualStrings("xyz789", commit.rkey);
    try std.testing.expectEqualStrings("bafyreitest", commit.cid.?);
    try std.testing.expect(commit.record != null);
    try std.testing.expectEqualStrings("hello world", json_helpers.getString(commit.record.?, "text").?);
}

test "parse identity event" {
    const payload =
        \\{
        \\  "did": "did:plc:abc123",
        \\  "time_us": 1700000000000,
        \\  "kind": "identity",
        \\  "identity": {
        \\    "handle": "alice.bsky.social",
        \\    "seq": 42,
        \\    "time": "2024-01-01T00:00:00Z"
        \\  }
        \\}
    ;

    var arena = std.heap.ArenaAllocator.init(std.testing.allocator);
    defer arena.deinit();

    const event = try parseEvent(arena.allocator(), payload);
    const identity = event.identity;

    try std.testing.expectEqualStrings("did:plc:abc123", identity.did);
    try std.testing.expectEqual(@as(i64, 1700000000000), identity.time_us);
    try std.testing.expectEqualStrings("alice.bsky.social", identity.handle.?);
    try std.testing.expectEqual(@as(i64, 42), identity.seq.?);
    try std.testing.expectEqualStrings("2024-01-01T00:00:00Z", identity.time.?);
}

test "parse account event" {
    const payload =
        \\{
        \\  "did": "did:plc:abc123",
        \\  "time_us": 1700000000000,
        \\  "kind": "account",
        \\  "account": {
        \\    "active": false,
        \\    "status": "suspended",
        \\    "seq": 99,
        \\    "time": "2024-01-01T00:00:00Z"
        \\  }
        \\}
    ;

    var arena = std.heap.ArenaAllocator.init(std.testing.allocator);
    defer arena.deinit();

    const event = try parseEvent(arena.allocator(), payload);
    const account = event.account;

    try std.testing.expectEqualStrings("did:plc:abc123", account.did);
    try std.testing.expectEqual(@as(i64, 1700000000000), account.time_us);
    try std.testing.expectEqual(false, account.active);
    try std.testing.expectEqual(AccountStatus.suspended, account.status.?);
    try std.testing.expectEqual(@as(i64, 99), account.seq.?);
    try std.testing.expectEqualStrings("2024-01-01T00:00:00Z", account.time.?);
}

test "parse unknown kind returns error" {
    const payload =
        \\{
        \\  "did": "did:plc:abc123",
        \\  "time_us": 1700000000000,
        \\  "kind": "unknown_kind"
        \\}
    ;

    var arena = std.heap.ArenaAllocator.init(std.testing.allocator);
    defer arena.deinit();

    try std.testing.expectError(error.UnknownKind, parseEvent(arena.allocator(), payload));
}

test "parse commit with unknown operation returns error" {
    const payload =
        \\{
        \\  "did": "did:plc:abc123",
        \\  "time_us": 1700000000000,
        \\  "kind": "commit",
        \\  "commit": {
        \\    "operation": "archive",
        \\    "collection": "app.bsky.feed.post",
        \\    "rkey": "xyz789"
        \\  }
        \\}
    ;

    var arena = std.heap.ArenaAllocator.init(std.testing.allocator);
    defer arena.deinit();

    try std.testing.expectError(error.UnknownOperation, parseEvent(arena.allocator(), payload));
}

test "cursor tracking via time_us" {
    const payloads = [_][]const u8{
        \\{"did":"did:plc:a","time_us":100,"kind":"commit","commit":{"operation":"create","collection":"app.bsky.feed.post","rkey":"1"}}
        ,
        \\{"did":"did:plc:b","time_us":200,"kind":"commit","commit":{"operation":"create","collection":"app.bsky.feed.post","rkey":"2"}}
        ,
    };

    var arena = std.heap.ArenaAllocator.init(std.testing.allocator);
    defer arena.deinit();

    const e1 = try parseEvent(arena.allocator(), payloads[0]);
    const e2 = try parseEvent(arena.allocator(), payloads[1]);

    try std.testing.expect(e1.timeUs() > 0);
    try std.testing.expect(e2.timeUs() > e1.timeUs());
}

test "Event.timeUs works for all variants" {
    var arena = std.heap.ArenaAllocator.init(std.testing.allocator);
    defer arena.deinit();

    const commit = try parseEvent(arena.allocator(),
        \\{"did":"did:plc:a","time_us":100,"kind":"commit","commit":{"operation":"create","collection":"x","rkey":"1"}}
    );
    const identity = try parseEvent(arena.allocator(),
        \\{"did":"did:plc:a","time_us":200,"kind":"identity","identity":{}}
    );
    const account = try parseEvent(arena.allocator(),
        \\{"did":"did:plc:a","time_us":300,"kind":"account","account":{"active":true}}
    );

    try std.testing.expectEqual(@as(i64, 100), commit.timeUs());
    try std.testing.expectEqual(@as(i64, 200), identity.timeUs());
    try std.testing.expectEqual(@as(i64, 300), account.timeUs());
}

test "build subscribe path" {
    var client = JetstreamClient.init(std.Options.debug_io, std.testing.allocator, .{
        .wanted_collections = &.{"app.bsky.feed.post"},
    });

    var buf: [2048]u8 = undefined;
    const path = try client.buildSubscribePath(&buf);
    try std.testing.expectEqualStrings("/subscribe?wantedCollections=app.bsky.feed.post", path);
}

test "build subscribe path with multiple params" {
    var client = JetstreamClient.init(std.Options.debug_io, std.testing.allocator, .{
        .wanted_collections = &.{ "app.bsky.feed.post", "app.bsky.feed.like" },
        .wanted_dids = &.{"did:plc:abc123"},
        .cursor = 1700000000000,
    });

    var buf: [2048]u8 = undefined;
    const path = try client.buildSubscribePath(&buf);
    try std.testing.expectEqualStrings(
        "/subscribe?wantedCollections=app.bsky.feed.post&wantedCollections=app.bsky.feed.like&wantedDids=did:plc:abc123&cursor=1700000000000",
        path,
    );
}

test "build subscribe path no params" {
    var client = JetstreamClient.init(std.Options.debug_io, std.testing.allocator, .{});

    var buf: [2048]u8 = undefined;
    const path = try client.buildSubscribePath(&buf);
    try std.testing.expectEqualStrings("/subscribe", path);
}

test "parse commit event with delete operation" {
    const payload =
        \\{
        \\  "did": "did:plc:abc123",
        \\  "time_us": 1700000000000,
        \\  "kind": "commit",
        \\  "commit": {
        \\    "operation": "delete",
        \\    "collection": "app.bsky.feed.post",
        \\    "rkey": "xyz789"
        \\  }
        \\}
    ;

    var arena = std.heap.ArenaAllocator.init(std.testing.allocator);
    defer arena.deinit();

    const commit = (try parseEvent(arena.allocator(), payload)).commit;

    try std.testing.expectEqual(CommitAction.delete, commit.operation);
    try std.testing.expect(commit.record == null);
    try std.testing.expect(commit.rev == null);
    try std.testing.expect(commit.cid == null);
}

test "parse identity event with minimal fields" {
    const payload =
        \\{
        \\  "did": "did:plc:abc123",
        \\  "time_us": 1700000000000,
        \\  "kind": "identity",
        \\  "identity": {}
        \\}
    ;

    var arena = std.heap.ArenaAllocator.init(std.testing.allocator);
    defer arena.deinit();

    const identity = (try parseEvent(arena.allocator(), payload)).identity;

    try std.testing.expectEqualStrings("did:plc:abc123", identity.did);
    try std.testing.expect(identity.handle == null);
    try std.testing.expect(identity.seq == null);
    try std.testing.expect(identity.time == null);
}

test "parse missing did returns error" {
    const payload =
        \\{
        \\  "time_us": 1700000000000,
        \\  "kind": "commit"
        \\}
    ;

    var arena = std.heap.ArenaAllocator.init(std.testing.allocator);
    defer arena.deinit();

    try std.testing.expectError(error.MissingDid, parseEvent(arena.allocator(), payload));
}

test "default hosts contains known jetstream instances" {
    try std.testing.expectEqual(@as(usize, 12), default_hosts.len);
    try std.testing.expectEqualStrings("jetstream1.us-east.bsky.network", default_hosts[0]);
    try std.testing.expectEqualStrings("jetstream2.us-east.bsky.network", default_hosts[1]);
    try std.testing.expectEqualStrings("jetstream1.us-west.bsky.network", default_hosts[2]);
    try std.testing.expectEqualStrings("jetstream2.us-west.bsky.network", default_hosts[3]);
    try std.testing.expectEqualStrings("jetstream.waow.tech", default_hosts[4]);
    try std.testing.expectEqualStrings("jetstream.fire.hose.cam", default_hosts[5]);
    try std.testing.expectEqualStrings("jet.firehose.stream", default_hosts[6]);
    try std.testing.expectEqualStrings("chennai.firehose.stream", default_hosts[11]);
}

test "round-robin cycles through hosts" {
    const hosts = [_][]const u8{ "host-a", "host-b", "host-c" };
    // simulate the index logic from subscribe()
    for (0..9) |i| {
        const host = hosts[i % hosts.len];
        const expected: []const u8 = switch (i % 3) {
            0 => "host-a",
            1 => "host-b",
            2 => "host-c",
            else => unreachable,
        };
        try std.testing.expectEqualStrings(expected, host);
    }
}

test "options default hosts are used" {
    const opts = Options{};
    try std.testing.expectEqual(@as(usize, 12), opts.hosts.len);
    try std.testing.expectEqualStrings("jetstream1.us-east.bsky.network", opts.hosts[0]);
}

test "options custom single host" {
    const opts = Options{ .hosts = &.{"my-custom-host.example.com"} };
    try std.testing.expectEqual(@as(usize, 1), opts.hosts.len);
    try std.testing.expectEqualStrings("my-custom-host.example.com", opts.hosts[0]);
}