atproto utils for zig zat.dev
atproto sdk zig
26
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: add jetstream WebSocket client for AT Protocol event streams

typed, reconnecting client that parses commit/identity/account events.
replaces ~200 lines of boilerplate in downstream projects (find-bufo,
music-atmosphere-feed) with ~15 lines of zat API usage.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

zzstoatzz 31d2d328 89fc63ec

+720
+8
build.zig
··· 4 4 const target = b.standardTargetOptions(.{}); 5 5 const optimize = b.standardOptimizeOption(.{}); 6 6 7 + const websocket = b.dependency("websocket", .{ 8 + .target = target, 9 + .optimize = optimize, 10 + }); 11 + 7 12 const mod = b.addModule("zat", .{ 8 13 .root_source_file = b.path("src/root.zig"), 9 14 .target = target, 10 15 .optimize = optimize, 16 + .imports = &.{ 17 + .{ .name = "websocket", .module = websocket.module("websocket") }, 18 + }, 11 19 }); 12 20 13 21 const tests = b.addTest(.{ .root_module = mod });
+6
build.zig.zon
··· 3 3 .version = "0.1.0", 4 4 .fingerprint = 0x8da9db57ee82fbe4, 5 5 .minimum_zig_version = "0.15.0", 6 + .dependencies = .{ 7 + .websocket = .{ 8 + .url = "https://github.com/karlseguin/websocket.zig/archive/refs/heads/master.tar.gz", 9 + .hash = "websocket-0.1.0-ZPISdRNzAwAGszh62EpRtoQxu8wb1MSMVI6Ow0o2dmyJ", 10 + }, 11 + }, 6 12 .paths = .{ 7 13 "build.zig", 8 14 "build.zig.zon",
+701
src/internal/jetstream.zig
··· 1 + //! jetstream client - AT Protocol event stream via WebSocket 2 + //! 3 + //! typed, reconnecting client for the Bluesky Jetstream service. 4 + //! parses commit, identity, and account events into typed structs. 5 + //! 6 + //! see: https://github.com/bluesky-social/jetstream 7 + 8 + const std = @import("std"); 9 + const websocket = @import("websocket"); 10 + const json_helpers = @import("json.zig"); 11 + const sync = @import("sync.zig"); 12 + 13 + const mem = std.mem; 14 + const json = std.json; 15 + const posix = std.posix; 16 + const Allocator = mem.Allocator; 17 + const log = std.log.scoped(.zat); 18 + 19 + pub const CommitAction = sync.CommitAction; 20 + pub const AccountStatus = sync.AccountStatus; 21 + 22 + pub const Options = struct { 23 + host: []const u8 = "jetstream2.us-east.bsky.network", 24 + wanted_collections: []const []const u8 = &.{}, 25 + wanted_dids: []const []const u8 = &.{}, 26 + cursor: ?i64 = null, 27 + max_message_size: usize = 1024 * 1024, 28 + }; 29 + 30 + pub const Event = union(enum) { 31 + commit: CommitEvent, 32 + identity: IdentityEvent, 33 + account: AccountEvent, 34 + }; 35 + 36 + pub const CommitEvent = struct { 37 + did: []const u8, 38 + time_us: i64, 39 + rev: ?[]const u8 = null, 40 + operation: CommitAction, 41 + collection: []const u8, 42 + rkey: []const u8, 43 + record: ?json.Value = null, 44 + cid: ?[]const u8 = null, 45 + }; 46 + 47 + pub const IdentityEvent = struct { 48 + did: []const u8, 49 + time_us: i64, 50 + handle: ?[]const u8 = null, 51 + seq: ?i64 = null, 52 + time: ?[]const u8 = null, 53 + }; 54 + 55 + pub const AccountEvent = struct { 56 + did: []const u8, 57 + time_us: i64, 58 + active: bool, 59 + status: ?AccountStatus = null, 60 + seq: ?i64 = null, 61 + time: ?[]const u8 = null, 62 + }; 63 + 64 + pub const JetstreamClient = struct { 65 + allocator: Allocator, 66 + options: Options, 67 + last_time_us: ?i64 = null, 68 + 69 + pub fn init(allocator: Allocator, options: Options) JetstreamClient { 70 + return .{ 71 + .allocator = allocator, 72 + .options = options, 73 + .last_time_us = options.cursor, 74 + }; 75 + } 76 + 77 + pub fn deinit(_: *JetstreamClient) void {} 78 + 79 + /// subscribe with a user-provided handler. 80 + /// handler must implement: fn onEvent(*@TypeOf(handler), Event) void 81 + /// optional: fn onError(*@TypeOf(handler), anyerror) void 82 + /// blocks until deinit — reconnects with exponential backoff on disconnect. 83 + pub fn subscribe(self: *JetstreamClient, handler: anytype) void { 84 + var backoff: u64 = 1; 85 + const max_backoff: u64 = 60; 86 + 87 + while (true) { 88 + self.connectAndRead(handler) catch |err| { 89 + if (comptime hasOnError(@TypeOf(handler.*))) { 90 + handler.onError(err); 91 + } else { 92 + log.err("jetstream error: {s}, reconnecting in {d}s...", .{ @errorName(err), backoff }); 93 + } 94 + }; 95 + posix.nanosleep(backoff, 0); 96 + backoff = @min(backoff * 2, max_backoff); 97 + } 98 + } 99 + 100 + fn connectAndRead(self: *JetstreamClient, handler: anytype) !void { 101 + var path_buf: [2048]u8 = undefined; 102 + const path = try self.buildSubscribePath(&path_buf); 103 + 104 + log.info("connecting to wss://{s}{s}", .{ self.options.host, path }); 105 + 106 + var client = try websocket.Client.init(self.allocator, .{ 107 + .host = self.options.host, 108 + .port = 443, 109 + .tls = true, 110 + .max_size = self.options.max_message_size, 111 + }); 112 + defer client.deinit(); 113 + 114 + var host_header_buf: [256]u8 = undefined; 115 + const host_header = std.fmt.bufPrint(&host_header_buf, "Host: {s}\r\n", .{self.options.host}) catch self.options.host; 116 + 117 + try client.handshake(path, .{ .headers = host_header }); 118 + 119 + log.info("jetstream connected", .{}); 120 + 121 + var ws_handler = WsHandler(@TypeOf(handler.*)){ 122 + .allocator = self.allocator, 123 + .handler = handler, 124 + .client_state = self, 125 + }; 126 + try client.readLoop(&ws_handler); 127 + } 128 + 129 + fn buildSubscribePath(self: *JetstreamClient, buf: *[2048]u8) ![]const u8 { 130 + var stream = std.io.fixedBufferStream(buf); 131 + const writer = stream.writer(); 132 + 133 + try writer.writeAll("/subscribe"); 134 + 135 + var has_param = false; 136 + 137 + for (self.options.wanted_collections) |col| { 138 + try writer.writeByte(if (!has_param) '?' else '&'); 139 + try writer.writeAll("wantedCollections="); 140 + try writer.writeAll(col); 141 + has_param = true; 142 + } 143 + 144 + for (self.options.wanted_dids) |did| { 145 + try writer.writeByte(if (!has_param) '?' else '&'); 146 + try writer.writeAll("wantedDids="); 147 + try writer.writeAll(did); 148 + has_param = true; 149 + } 150 + 151 + if (self.last_time_us) |cursor| { 152 + try writer.writeByte(if (!has_param) '?' else '&'); 153 + try writer.print("cursor={d}", .{cursor}); 154 + } 155 + 156 + return stream.getWritten(); 157 + } 158 + 159 + fn hasOnError(comptime T: type) bool { 160 + return @hasDecl(T, "onError"); 161 + } 162 + }; 163 + 164 + fn WsHandler(comptime H: type) type { 165 + return struct { 166 + allocator: Allocator, 167 + handler: *H, 168 + client_state: *JetstreamClient, 169 + 170 + const Self = @This(); 171 + 172 + pub fn serverMessage(self: *Self, data: []const u8) !void { 173 + self.processMessage(data) catch |err| { 174 + log.debug("message parse error: {s}", .{@errorName(err)}); 175 + }; 176 + } 177 + 178 + pub fn close(_: *Self) void { 179 + log.info("jetstream connection closed", .{}); 180 + } 181 + 182 + fn processMessage(self: *Self, payload: []const u8) !void { 183 + var arena = std.heap.ArenaAllocator.init(self.allocator); 184 + defer arena.deinit(); 185 + const alloc = arena.allocator(); 186 + 187 + const parsed = try json.parseFromSlice(json.Value, alloc, payload, .{}); 188 + const root = parsed.value; 189 + 190 + const kind_str = json_helpers.getString(root, "kind") orelse return; 191 + const did = json_helpers.getString(root, "did") orelse return; 192 + const time_us = json_helpers.getInt(root, "time_us") orelse return; 193 + 194 + // track cursor 195 + self.client_state.last_time_us = time_us; 196 + 197 + if (mem.eql(u8, kind_str, "commit")) { 198 + const commit = switch (root) { 199 + .object => |obj| obj.get("commit") orelse return, 200 + else => return, 201 + }; 202 + const commit_obj = switch (commit) { 203 + .object => |obj| obj, 204 + else => return, 205 + }; 206 + 207 + const operation_str = switch (commit_obj.get("operation") orelse return) { 208 + .string => |s| s, 209 + else => return, 210 + }; 211 + const operation = CommitAction.parse(operation_str) orelse return; 212 + 213 + const collection = switch (commit_obj.get("collection") orelse return) { 214 + .string => |s| s, 215 + else => return, 216 + }; 217 + const rkey = switch (commit_obj.get("rkey") orelse return) { 218 + .string => |s| s, 219 + else => return, 220 + }; 221 + 222 + const rev = blk: { 223 + const v = commit_obj.get("rev") orelse break :blk null; 224 + break :blk switch (v) { 225 + .string => |s| @as(?[]const u8, s), 226 + else => null, 227 + }; 228 + }; 229 + 230 + const cid = blk: { 231 + const v = commit_obj.get("cid") orelse break :blk null; 232 + break :blk switch (v) { 233 + .string => |s| @as(?[]const u8, s), 234 + else => null, 235 + }; 236 + }; 237 + 238 + const record = commit_obj.get("record"); 239 + 240 + self.handler.onEvent(.{ .commit = .{ 241 + .did = did, 242 + .time_us = time_us, 243 + .rev = rev, 244 + .operation = operation, 245 + .collection = collection, 246 + .rkey = rkey, 247 + .record = record, 248 + .cid = cid, 249 + } }); 250 + } else if (mem.eql(u8, kind_str, "identity")) { 251 + const identity = switch (root) { 252 + .object => |obj| obj.get("identity"), 253 + else => null, 254 + }; 255 + const identity_obj = if (identity) |id| switch (id) { 256 + .object => |obj| obj, 257 + else => null, 258 + } else null; 259 + 260 + const handle = if (identity_obj) |obj| switch (obj.get("handle") orelse json.Value{ .null = {} }) { 261 + .string => |s| @as(?[]const u8, s), 262 + else => null, 263 + } else null; 264 + 265 + const seq = if (identity_obj) |obj| switch (obj.get("seq") orelse json.Value{ .null = {} }) { 266 + .integer => |i| @as(?i64, i), 267 + else => null, 268 + } else null; 269 + 270 + const time_val = if (identity_obj) |obj| switch (obj.get("time") orelse json.Value{ .null = {} }) { 271 + .string => |s| @as(?[]const u8, s), 272 + else => null, 273 + } else null; 274 + 275 + self.handler.onEvent(.{ .identity = .{ 276 + .did = did, 277 + .time_us = time_us, 278 + .handle = handle, 279 + .seq = seq, 280 + .time = time_val, 281 + } }); 282 + } else if (mem.eql(u8, kind_str, "account")) { 283 + const account = switch (root) { 284 + .object => |obj| obj.get("account"), 285 + else => null, 286 + }; 287 + const account_obj = if (account) |a| switch (a) { 288 + .object => |obj| obj, 289 + else => null, 290 + } else null; 291 + 292 + const active = if (account_obj) |obj| switch (obj.get("active") orelse json.Value{ .null = {} }) { 293 + .bool => |b| b, 294 + else => true, 295 + } else true; 296 + 297 + const status_val = if (account_obj) |obj| blk: { 298 + const v = obj.get("status") orelse break :blk null; 299 + break :blk switch (v) { 300 + .string => |s| AccountStatus.parse(s), 301 + else => null, 302 + }; 303 + } else null; 304 + 305 + const seq = if (account_obj) |obj| switch (obj.get("seq") orelse json.Value{ .null = {} }) { 306 + .integer => |i| @as(?i64, i), 307 + else => null, 308 + } else null; 309 + 310 + const time_val = if (account_obj) |obj| switch (obj.get("time") orelse json.Value{ .null = {} }) { 311 + .string => |s| @as(?[]const u8, s), 312 + else => null, 313 + } else null; 314 + 315 + self.handler.onEvent(.{ .account = .{ 316 + .did = did, 317 + .time_us = time_us, 318 + .active = active, 319 + .status = status_val, 320 + .seq = seq, 321 + .time = time_val, 322 + } }); 323 + } 324 + // unknown kinds are silently ignored 325 + } 326 + }; 327 + } 328 + 329 + // === parsing helpers (used by tests and internal parsing) === 330 + 331 + /// parse a raw JSON message into an Event 332 + pub fn parseEvent(allocator: Allocator, payload: []const u8) !Event { 333 + var arena = std.heap.ArenaAllocator.init(allocator); 334 + defer arena.deinit(); 335 + const alloc = arena.allocator(); 336 + 337 + const parsed = try json.parseFromSlice(json.Value, alloc, payload, .{}); 338 + const root = parsed.value; 339 + 340 + const kind_str = json_helpers.getString(root, "kind") orelse return error.MissingKind; 341 + const did = json_helpers.getString(root, "did") orelse return error.MissingDid; 342 + const time_us = json_helpers.getInt(root, "time_us") orelse return error.MissingTimeUs; 343 + 344 + if (mem.eql(u8, kind_str, "commit")) { 345 + const commit = switch (root) { 346 + .object => |obj| obj.get("commit") orelse return error.MissingCommit, 347 + else => return error.InvalidRoot, 348 + }; 349 + const commit_obj = switch (commit) { 350 + .object => |obj| obj, 351 + else => return error.InvalidCommit, 352 + }; 353 + 354 + const operation_str = switch (commit_obj.get("operation") orelse return error.MissingOperation) { 355 + .string => |s| s, 356 + else => return error.InvalidOperation, 357 + }; 358 + const operation = CommitAction.parse(operation_str) orelse return error.UnknownOperation; 359 + 360 + const collection = switch (commit_obj.get("collection") orelse return error.MissingCollection) { 361 + .string => |s| s, 362 + else => return error.InvalidCollection, 363 + }; 364 + const rkey = switch (commit_obj.get("rkey") orelse return error.MissingRkey) { 365 + .string => |s| s, 366 + else => return error.InvalidRkey, 367 + }; 368 + 369 + const rev = blk: { 370 + const v = commit_obj.get("rev") orelse break :blk null; 371 + break :blk switch (v) { 372 + .string => |s| @as(?[]const u8, s), 373 + else => null, 374 + }; 375 + }; 376 + 377 + const cid = blk: { 378 + const v = commit_obj.get("cid") orelse break :blk null; 379 + break :blk switch (v) { 380 + .string => |s| @as(?[]const u8, s), 381 + else => null, 382 + }; 383 + }; 384 + 385 + return .{ .commit = .{ 386 + .did = did, 387 + .time_us = time_us, 388 + .rev = rev, 389 + .operation = operation, 390 + .collection = collection, 391 + .rkey = rkey, 392 + .record = commit_obj.get("record"), 393 + .cid = cid, 394 + } }; 395 + } else if (mem.eql(u8, kind_str, "identity")) { 396 + const identity = switch (root) { 397 + .object => |obj| obj.get("identity"), 398 + else => null, 399 + }; 400 + const identity_obj = if (identity) |id| switch (id) { 401 + .object => |obj| obj, 402 + else => null, 403 + } else null; 404 + 405 + const handle = if (identity_obj) |obj| switch (obj.get("handle") orelse json.Value{ .null = {} }) { 406 + .string => |s| @as(?[]const u8, s), 407 + else => null, 408 + } else null; 409 + 410 + const seq = if (identity_obj) |obj| switch (obj.get("seq") orelse json.Value{ .null = {} }) { 411 + .integer => |i| @as(?i64, i), 412 + else => null, 413 + } else null; 414 + 415 + const time_val = if (identity_obj) |obj| switch (obj.get("time") orelse json.Value{ .null = {} }) { 416 + .string => |s| @as(?[]const u8, s), 417 + else => null, 418 + } else null; 419 + 420 + return .{ .identity = .{ 421 + .did = did, 422 + .time_us = time_us, 423 + .handle = handle, 424 + .seq = seq, 425 + .time = time_val, 426 + } }; 427 + } else if (mem.eql(u8, kind_str, "account")) { 428 + const account = switch (root) { 429 + .object => |obj| obj.get("account"), 430 + else => null, 431 + }; 432 + const account_obj = if (account) |a| switch (a) { 433 + .object => |obj| obj, 434 + else => null, 435 + } else null; 436 + 437 + const active = if (account_obj) |obj| switch (obj.get("active") orelse json.Value{ .null = {} }) { 438 + .bool => |b| b, 439 + else => true, 440 + } else true; 441 + 442 + const status_val = if (account_obj) |obj| blk: { 443 + const v = obj.get("status") orelse break :blk null; 444 + break :blk switch (v) { 445 + .string => |s| AccountStatus.parse(s), 446 + else => null, 447 + }; 448 + } else null; 449 + 450 + const seq = if (account_obj) |obj| switch (obj.get("seq") orelse json.Value{ .null = {} }) { 451 + .integer => |i| @as(?i64, i), 452 + else => null, 453 + } else null; 454 + 455 + const time_val = if (account_obj) |obj| switch (obj.get("time") orelse json.Value{ .null = {} }) { 456 + .string => |s| @as(?[]const u8, s), 457 + else => null, 458 + } else null; 459 + 460 + return .{ .account = .{ 461 + .did = did, 462 + .time_us = time_us, 463 + .active = active, 464 + .status = status_val, 465 + .seq = seq, 466 + .time = time_val, 467 + } }; 468 + } 469 + 470 + return error.UnknownKind; 471 + } 472 + 473 + // === tests === 474 + 475 + test "parse commit event" { 476 + const payload = 477 + \\{ 478 + \\ "did": "did:plc:abc123", 479 + \\ "time_us": 1700000000000, 480 + \\ "kind": "commit", 481 + \\ "commit": { 482 + \\ "rev": "3mbspmpaidl2a", 483 + \\ "operation": "create", 484 + \\ "collection": "app.bsky.feed.post", 485 + \\ "rkey": "xyz789", 486 + \\ "cid": "bafyreitest", 487 + \\ "record": { 488 + \\ "text": "hello world", 489 + \\ "$type": "app.bsky.feed.post" 490 + \\ } 491 + \\ } 492 + \\} 493 + ; 494 + 495 + const event = try parseEvent(std.testing.allocator, payload); 496 + const commit = event.commit; 497 + 498 + try std.testing.expectEqualStrings("did:plc:abc123", commit.did); 499 + try std.testing.expectEqual(@as(i64, 1700000000000), commit.time_us); 500 + try std.testing.expectEqualStrings("3mbspmpaidl2a", commit.rev.?); 501 + try std.testing.expectEqual(CommitAction.create, commit.operation); 502 + try std.testing.expectEqualStrings("app.bsky.feed.post", commit.collection); 503 + try std.testing.expectEqualStrings("xyz789", commit.rkey); 504 + try std.testing.expectEqualStrings("bafyreitest", commit.cid.?); 505 + try std.testing.expect(commit.record != null); 506 + 507 + // verify record contents via json helpers 508 + const text = json_helpers.getString(commit.record.?, "text"); 509 + try std.testing.expectEqualStrings("hello world", text.?); 510 + } 511 + 512 + test "parse identity event" { 513 + const payload = 514 + \\{ 515 + \\ "did": "did:plc:abc123", 516 + \\ "time_us": 1700000000000, 517 + \\ "kind": "identity", 518 + \\ "identity": { 519 + \\ "handle": "alice.bsky.social", 520 + \\ "seq": 42, 521 + \\ "time": "2024-01-01T00:00:00Z" 522 + \\ } 523 + \\} 524 + ; 525 + 526 + const event = try parseEvent(std.testing.allocator, payload); 527 + const identity = event.identity; 528 + 529 + try std.testing.expectEqualStrings("did:plc:abc123", identity.did); 530 + try std.testing.expectEqual(@as(i64, 1700000000000), identity.time_us); 531 + try std.testing.expectEqualStrings("alice.bsky.social", identity.handle.?); 532 + try std.testing.expectEqual(@as(i64, 42), identity.seq.?); 533 + try std.testing.expectEqualStrings("2024-01-01T00:00:00Z", identity.time.?); 534 + } 535 + 536 + test "parse account event" { 537 + const payload = 538 + \\{ 539 + \\ "did": "did:plc:abc123", 540 + \\ "time_us": 1700000000000, 541 + \\ "kind": "account", 542 + \\ "account": { 543 + \\ "active": false, 544 + \\ "status": "suspended", 545 + \\ "seq": 99, 546 + \\ "time": "2024-01-01T00:00:00Z" 547 + \\ } 548 + \\} 549 + ; 550 + 551 + const event = try parseEvent(std.testing.allocator, payload); 552 + const account = event.account; 553 + 554 + try std.testing.expectEqualStrings("did:plc:abc123", account.did); 555 + try std.testing.expectEqual(@as(i64, 1700000000000), account.time_us); 556 + try std.testing.expectEqual(false, account.active); 557 + try std.testing.expectEqual(AccountStatus.suspended, account.status.?); 558 + try std.testing.expectEqual(@as(i64, 99), account.seq.?); 559 + try std.testing.expectEqualStrings("2024-01-01T00:00:00Z", account.time.?); 560 + } 561 + 562 + test "parse unknown kind returns error" { 563 + const payload = 564 + \\{ 565 + \\ "did": "did:plc:abc123", 566 + \\ "time_us": 1700000000000, 567 + \\ "kind": "unknown_kind" 568 + \\} 569 + ; 570 + 571 + const result = parseEvent(std.testing.allocator, payload); 572 + try std.testing.expectError(error.UnknownKind, result); 573 + } 574 + 575 + test "parse commit with unknown operation returns error" { 576 + const payload = 577 + \\{ 578 + \\ "did": "did:plc:abc123", 579 + \\ "time_us": 1700000000000, 580 + \\ "kind": "commit", 581 + \\ "commit": { 582 + \\ "operation": "archive", 583 + \\ "collection": "app.bsky.feed.post", 584 + \\ "rkey": "xyz789" 585 + \\ } 586 + \\} 587 + ; 588 + 589 + const result = parseEvent(std.testing.allocator, payload); 590 + try std.testing.expectError(error.UnknownOperation, result); 591 + } 592 + 593 + test "cursor tracking via time_us" { 594 + // verify that parseEvent extracts time_us correctly for cursor resumption 595 + const payloads = [_][]const u8{ 596 + \\{"did":"did:plc:a","time_us":100,"kind":"commit","commit":{"operation":"create","collection":"app.bsky.feed.post","rkey":"1"}} 597 + , 598 + \\{"did":"did:plc:b","time_us":200,"kind":"commit","commit":{"operation":"create","collection":"app.bsky.feed.post","rkey":"2"}} 599 + , 600 + }; 601 + 602 + for (payloads) |payload| { 603 + const event = try parseEvent(std.testing.allocator, payload); 604 + switch (event) { 605 + .commit => |c| try std.testing.expect(c.time_us > 0), 606 + else => unreachable, 607 + } 608 + } 609 + 610 + // verify second event has higher time_us 611 + const e1 = try parseEvent(std.testing.allocator, payloads[0]); 612 + const e2 = try parseEvent(std.testing.allocator, payloads[1]); 613 + try std.testing.expect(e2.commit.time_us > e1.commit.time_us); 614 + } 615 + 616 + test "build subscribe path" { 617 + var client = JetstreamClient.init(std.testing.allocator, .{ 618 + .wanted_collections = &.{"app.bsky.feed.post"}, 619 + }); 620 + 621 + var buf: [2048]u8 = undefined; 622 + const path = try client.buildSubscribePath(&buf); 623 + try std.testing.expectEqualStrings("/subscribe?wantedCollections=app.bsky.feed.post", path); 624 + } 625 + 626 + test "build subscribe path with multiple params" { 627 + var client = JetstreamClient.init(std.testing.allocator, .{ 628 + .wanted_collections = &.{ "app.bsky.feed.post", "app.bsky.feed.like" }, 629 + .wanted_dids = &.{"did:plc:abc123"}, 630 + .cursor = 1700000000000, 631 + }); 632 + 633 + var buf: [2048]u8 = undefined; 634 + const path = try client.buildSubscribePath(&buf); 635 + try std.testing.expectEqualStrings( 636 + "/subscribe?wantedCollections=app.bsky.feed.post&wantedCollections=app.bsky.feed.like&wantedDids=did:plc:abc123&cursor=1700000000000", 637 + path, 638 + ); 639 + } 640 + 641 + test "build subscribe path no params" { 642 + var client = JetstreamClient.init(std.testing.allocator, .{}); 643 + 644 + var buf: [2048]u8 = undefined; 645 + const path = try client.buildSubscribePath(&buf); 646 + try std.testing.expectEqualStrings("/subscribe", path); 647 + } 648 + 649 + test "parse commit event with delete operation" { 650 + const payload = 651 + \\{ 652 + \\ "did": "did:plc:abc123", 653 + \\ "time_us": 1700000000000, 654 + \\ "kind": "commit", 655 + \\ "commit": { 656 + \\ "operation": "delete", 657 + \\ "collection": "app.bsky.feed.post", 658 + \\ "rkey": "xyz789" 659 + \\ } 660 + \\} 661 + ; 662 + 663 + const event = try parseEvent(std.testing.allocator, payload); 664 + const commit = event.commit; 665 + 666 + try std.testing.expectEqual(CommitAction.delete, commit.operation); 667 + try std.testing.expect(commit.record == null); 668 + try std.testing.expect(commit.rev == null); 669 + try std.testing.expect(commit.cid == null); 670 + } 671 + 672 + test "parse identity event with minimal fields" { 673 + const payload = 674 + \\{ 675 + \\ "did": "did:plc:abc123", 676 + \\ "time_us": 1700000000000, 677 + \\ "kind": "identity", 678 + \\ "identity": {} 679 + \\} 680 + ; 681 + 682 + const event = try parseEvent(std.testing.allocator, payload); 683 + const identity = event.identity; 684 + 685 + try std.testing.expectEqualStrings("did:plc:abc123", identity.did); 686 + try std.testing.expect(identity.handle == null); 687 + try std.testing.expect(identity.seq == null); 688 + try std.testing.expect(identity.time == null); 689 + } 690 + 691 + test "parse missing did returns error" { 692 + const payload = 693 + \\{ 694 + \\ "time_us": 1700000000000, 695 + \\ "kind": "commit" 696 + \\} 697 + ; 698 + 699 + const result = parseEvent(std.testing.allocator, payload); 700 + try std.testing.expectError(error.MissingDid, result); 701 + }
+5
src/root.zig
··· 32 32 pub const CommitAction = sync.CommitAction; 33 33 pub const EventKind = sync.EventKind; 34 34 pub const AccountStatus = sync.AccountStatus; 35 + 36 + // jetstream 37 + pub const jetstream = @import("internal/jetstream.zig"); 38 + pub const JetstreamClient = jetstream.JetstreamClient; 39 + pub const JetstreamEvent = jetstream.Event;