atproto utils for zig zat.dev
atproto sdk zig
26
fork

Configure Feed

Select the types of activity you want to include in your feed.

refactor: simplify jetstream parsing with json_helpers

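use json_helpers.getString/getInt/getBool consistently instead of
manual switch chains. collapse the duplicated parsing logic in
processMessage into a call to parseEvent. fix a subtle use-after-free
on the commit record field: parseEvent used to wrap the allocator in an
internal arena and free it before returning, so the returned record
pointed at freed memory. parseEvent now allocates directly from the
caller's allocator, making the arena lifetime explicit in the API.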

702 → 499 lines, parsing core 130 → 30 lines.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

zzstoatzz 6fadd46b 7123918c

+122 -325
+122 -325
src/internal/jetstream.zig
··· 31 31 commit: CommitEvent, 32 32 identity: IdentityEvent, 33 33 account: AccountEvent, 34 + 35 + pub fn timeUs(self: Event) i64 { 36 + return switch (self) { 37 + inline else => |e| e.time_us, 38 + }; 39 + } 34 40 }; 35 41 36 42 pub const CommitEvent = struct { ··· 61 67 time: ?[]const u8 = null, 62 68 }; 63 69 70 + /// parse a raw JSON payload into a typed Event. 71 + /// allocator is used for JSON structural data (ObjectMaps for record fields). 72 + /// string slices in the returned Event reference the source `payload` bytes. 73 + /// keep both `payload` and allocator-owned memory alive while using the Event. 74 + pub fn parseEvent(allocator: Allocator, payload: []const u8) !Event { 75 + const parsed = try json.parseFromSlice(json.Value, allocator, payload, .{}); 76 + const root = parsed.value; 77 + 78 + const kind_str = json_helpers.getString(root, "kind") orelse return error.MissingKind; 79 + const did = json_helpers.getString(root, "did") orelse return error.MissingDid; 80 + const time_us = json_helpers.getInt(root, "time_us") orelse return error.MissingTimeUs; 81 + 82 + if (mem.eql(u8, kind_str, "commit")) { 83 + const op_str = json_helpers.getString(root, "commit.operation") orelse return error.MissingOperation; 84 + return .{ .commit = .{ 85 + .did = did, 86 + .time_us = time_us, 87 + .operation = CommitAction.parse(op_str) orelse return error.UnknownOperation, 88 + .collection = json_helpers.getString(root, "commit.collection") orelse return error.MissingCollection, 89 + .rkey = json_helpers.getString(root, "commit.rkey") orelse return error.MissingRkey, 90 + .rev = json_helpers.getString(root, "commit.rev"), 91 + .cid = json_helpers.getString(root, "commit.cid"), 92 + .record = json_helpers.getPath(root, "commit.record"), 93 + } }; 94 + } else if (mem.eql(u8, kind_str, "identity")) { 95 + return .{ .identity = .{ 96 + .did = did, 97 + .time_us = time_us, 98 + .handle = json_helpers.getString(root, "identity.handle"), 99 + .seq = 
json_helpers.getInt(root, "identity.seq"), 100 + .time = json_helpers.getString(root, "identity.time"), 101 + } }; 102 + } else if (mem.eql(u8, kind_str, "account")) { 103 + const status_str = json_helpers.getString(root, "account.status"); 104 + return .{ .account = .{ 105 + .did = did, 106 + .time_us = time_us, 107 + .active = json_helpers.getBool(root, "account.active") orelse true, 108 + .status = if (status_str) |s| AccountStatus.parse(s) else null, 109 + .seq = json_helpers.getInt(root, "account.seq"), 110 + .time = json_helpers.getString(root, "account.time"), 111 + } }; 112 + } 113 + 114 + return error.UnknownKind; 115 + } 116 + 64 117 pub const JetstreamClient = struct { 65 118 allocator: Allocator, 66 119 options: Options, ··· 79 132 /// subscribe with a user-provided handler. 80 133 /// handler must implement: fn onEvent(*@TypeOf(handler), Event) void 81 134 /// optional: fn onError(*@TypeOf(handler), anyerror) void 82 - /// blocks until deinit — reconnects with exponential backoff on disconnect. 135 + /// blocks forever — reconnects with exponential backoff on disconnect. 
83 136 pub fn subscribe(self: *JetstreamClient, handler: anytype) void { 84 137 var backoff: u64 = 1; 85 138 const max_backoff: u64 = 60; 86 139 87 140 while (true) { 88 141 self.connectAndRead(handler) catch |err| { 89 - if (comptime hasOnError(@TypeOf(handler.*))) { 142 + if (comptime @hasDecl(@TypeOf(handler.*), "onError")) { 90 143 handler.onError(err); 91 144 } else { 92 145 log.err("jetstream error: {s}, reconnecting in {d}s...", .{ @errorName(err), backoff }); ··· 155 208 156 209 return stream.getWritten(); 157 210 } 158 - 159 - fn hasOnError(comptime T: type) bool { 160 - return @hasDecl(T, "onError"); 161 - } 162 211 }; 163 212 164 213 fn WsHandler(comptime H: type) type { ··· 170 219 const Self = @This(); 171 220 172 221 pub fn serverMessage(self: *Self, data: []const u8) !void { 173 - self.processMessage(data) catch |err| { 222 + var arena = std.heap.ArenaAllocator.init(self.allocator); 223 + defer arena.deinit(); 224 + 225 + const event = parseEvent(arena.allocator(), data) catch |err| { 174 226 log.debug("message parse error: {s}", .{@errorName(err)}); 227 + return; 175 228 }; 229 + 230 + self.client_state.last_time_us = event.timeUs(); 231 + self.handler.onEvent(event); 176 232 } 177 233 178 234 pub fn close(_: *Self) void { 179 235 log.info("jetstream connection closed", .{}); 180 236 } 181 - 182 - fn processMessage(self: *Self, payload: []const u8) !void { 183 - var arena = std.heap.ArenaAllocator.init(self.allocator); 184 - defer arena.deinit(); 185 - const alloc = arena.allocator(); 186 - 187 - const parsed = try json.parseFromSlice(json.Value, alloc, payload, .{}); 188 - const root = parsed.value; 189 - 190 - const kind_str = json_helpers.getString(root, "kind") orelse return; 191 - const did = json_helpers.getString(root, "did") orelse return; 192 - const time_us = json_helpers.getInt(root, "time_us") orelse return; 193 - 194 - // track cursor 195 - self.client_state.last_time_us = time_us; 196 - 197 - if (mem.eql(u8, kind_str, "commit")) { 198 
- const commit = switch (root) { 199 - .object => |obj| obj.get("commit") orelse return, 200 - else => return, 201 - }; 202 - const commit_obj = switch (commit) { 203 - .object => |obj| obj, 204 - else => return, 205 - }; 206 - 207 - const operation_str = switch (commit_obj.get("operation") orelse return) { 208 - .string => |s| s, 209 - else => return, 210 - }; 211 - const operation = CommitAction.parse(operation_str) orelse return; 212 - 213 - const collection = switch (commit_obj.get("collection") orelse return) { 214 - .string => |s| s, 215 - else => return, 216 - }; 217 - const rkey = switch (commit_obj.get("rkey") orelse return) { 218 - .string => |s| s, 219 - else => return, 220 - }; 221 - 222 - const rev = blk: { 223 - const v = commit_obj.get("rev") orelse break :blk null; 224 - break :blk switch (v) { 225 - .string => |s| @as(?[]const u8, s), 226 - else => null, 227 - }; 228 - }; 229 - 230 - const cid = blk: { 231 - const v = commit_obj.get("cid") orelse break :blk null; 232 - break :blk switch (v) { 233 - .string => |s| @as(?[]const u8, s), 234 - else => null, 235 - }; 236 - }; 237 - 238 - const record = commit_obj.get("record"); 239 - 240 - self.handler.onEvent(.{ .commit = .{ 241 - .did = did, 242 - .time_us = time_us, 243 - .rev = rev, 244 - .operation = operation, 245 - .collection = collection, 246 - .rkey = rkey, 247 - .record = record, 248 - .cid = cid, 249 - } }); 250 - } else if (mem.eql(u8, kind_str, "identity")) { 251 - const identity = switch (root) { 252 - .object => |obj| obj.get("identity"), 253 - else => null, 254 - }; 255 - const identity_obj = if (identity) |id| switch (id) { 256 - .object => |obj| obj, 257 - else => null, 258 - } else null; 259 - 260 - const handle = if (identity_obj) |obj| switch (obj.get("handle") orelse json.Value{ .null = {} }) { 261 - .string => |s| @as(?[]const u8, s), 262 - else => null, 263 - } else null; 264 - 265 - const seq = if (identity_obj) |obj| switch (obj.get("seq") orelse json.Value{ .null = {} }) { 
266 - .integer => |i| @as(?i64, i), 267 - else => null, 268 - } else null; 269 - 270 - const time_val = if (identity_obj) |obj| switch (obj.get("time") orelse json.Value{ .null = {} }) { 271 - .string => |s| @as(?[]const u8, s), 272 - else => null, 273 - } else null; 274 - 275 - self.handler.onEvent(.{ .identity = .{ 276 - .did = did, 277 - .time_us = time_us, 278 - .handle = handle, 279 - .seq = seq, 280 - .time = time_val, 281 - } }); 282 - } else if (mem.eql(u8, kind_str, "account")) { 283 - const account = switch (root) { 284 - .object => |obj| obj.get("account"), 285 - else => null, 286 - }; 287 - const account_obj = if (account) |a| switch (a) { 288 - .object => |obj| obj, 289 - else => null, 290 - } else null; 291 - 292 - const active = if (account_obj) |obj| switch (obj.get("active") orelse json.Value{ .null = {} }) { 293 - .bool => |b| b, 294 - else => true, 295 - } else true; 296 - 297 - const status_val = if (account_obj) |obj| blk: { 298 - const v = obj.get("status") orelse break :blk null; 299 - break :blk switch (v) { 300 - .string => |s| AccountStatus.parse(s), 301 - else => null, 302 - }; 303 - } else null; 304 - 305 - const seq = if (account_obj) |obj| switch (obj.get("seq") orelse json.Value{ .null = {} }) { 306 - .integer => |i| @as(?i64, i), 307 - else => null, 308 - } else null; 309 - 310 - const time_val = if (account_obj) |obj| switch (obj.get("time") orelse json.Value{ .null = {} }) { 311 - .string => |s| @as(?[]const u8, s), 312 - else => null, 313 - } else null; 314 - 315 - self.handler.onEvent(.{ .account = .{ 316 - .did = did, 317 - .time_us = time_us, 318 - .active = active, 319 - .status = status_val, 320 - .seq = seq, 321 - .time = time_val, 322 - } }); 323 - } 324 - // unknown kinds are silently ignored 325 - } 326 237 }; 327 238 } 328 239 329 - // === parsing helpers (used by tests and internal parsing) === 330 - 331 - /// parse a raw JSON message into an Event 332 - pub fn parseEvent(allocator: Allocator, payload: []const u8) 
!Event { 333 - var arena = std.heap.ArenaAllocator.init(allocator); 334 - defer arena.deinit(); 335 - const alloc = arena.allocator(); 336 - 337 - const parsed = try json.parseFromSlice(json.Value, alloc, payload, .{}); 338 - const root = parsed.value; 339 - 340 - const kind_str = json_helpers.getString(root, "kind") orelse return error.MissingKind; 341 - const did = json_helpers.getString(root, "did") orelse return error.MissingDid; 342 - const time_us = json_helpers.getInt(root, "time_us") orelse return error.MissingTimeUs; 343 - 344 - if (mem.eql(u8, kind_str, "commit")) { 345 - const commit = switch (root) { 346 - .object => |obj| obj.get("commit") orelse return error.MissingCommit, 347 - else => return error.InvalidRoot, 348 - }; 349 - const commit_obj = switch (commit) { 350 - .object => |obj| obj, 351 - else => return error.InvalidCommit, 352 - }; 353 - 354 - const operation_str = switch (commit_obj.get("operation") orelse return error.MissingOperation) { 355 - .string => |s| s, 356 - else => return error.InvalidOperation, 357 - }; 358 - const operation = CommitAction.parse(operation_str) orelse return error.UnknownOperation; 359 - 360 - const collection = switch (commit_obj.get("collection") orelse return error.MissingCollection) { 361 - .string => |s| s, 362 - else => return error.InvalidCollection, 363 - }; 364 - const rkey = switch (commit_obj.get("rkey") orelse return error.MissingRkey) { 365 - .string => |s| s, 366 - else => return error.InvalidRkey, 367 - }; 368 - 369 - const rev = blk: { 370 - const v = commit_obj.get("rev") orelse break :blk null; 371 - break :blk switch (v) { 372 - .string => |s| @as(?[]const u8, s), 373 - else => null, 374 - }; 375 - }; 376 - 377 - const cid = blk: { 378 - const v = commit_obj.get("cid") orelse break :blk null; 379 - break :blk switch (v) { 380 - .string => |s| @as(?[]const u8, s), 381 - else => null, 382 - }; 383 - }; 384 - 385 - return .{ .commit = .{ 386 - .did = did, 387 - .time_us = time_us, 388 - .rev = rev, 
389 - .operation = operation, 390 - .collection = collection, 391 - .rkey = rkey, 392 - .record = commit_obj.get("record"), 393 - .cid = cid, 394 - } }; 395 - } else if (mem.eql(u8, kind_str, "identity")) { 396 - const identity = switch (root) { 397 - .object => |obj| obj.get("identity"), 398 - else => null, 399 - }; 400 - const identity_obj = if (identity) |id| switch (id) { 401 - .object => |obj| obj, 402 - else => null, 403 - } else null; 404 - 405 - const handle = if (identity_obj) |obj| switch (obj.get("handle") orelse json.Value{ .null = {} }) { 406 - .string => |s| @as(?[]const u8, s), 407 - else => null, 408 - } else null; 409 - 410 - const seq = if (identity_obj) |obj| switch (obj.get("seq") orelse json.Value{ .null = {} }) { 411 - .integer => |i| @as(?i64, i), 412 - else => null, 413 - } else null; 414 - 415 - const time_val = if (identity_obj) |obj| switch (obj.get("time") orelse json.Value{ .null = {} }) { 416 - .string => |s| @as(?[]const u8, s), 417 - else => null, 418 - } else null; 419 - 420 - return .{ .identity = .{ 421 - .did = did, 422 - .time_us = time_us, 423 - .handle = handle, 424 - .seq = seq, 425 - .time = time_val, 426 - } }; 427 - } else if (mem.eql(u8, kind_str, "account")) { 428 - const account = switch (root) { 429 - .object => |obj| obj.get("account"), 430 - else => null, 431 - }; 432 - const account_obj = if (account) |a| switch (a) { 433 - .object => |obj| obj, 434 - else => null, 435 - } else null; 436 - 437 - const active = if (account_obj) |obj| switch (obj.get("active") orelse json.Value{ .null = {} }) { 438 - .bool => |b| b, 439 - else => true, 440 - } else true; 441 - 442 - const status_val = if (account_obj) |obj| blk: { 443 - const v = obj.get("status") orelse break :blk null; 444 - break :blk switch (v) { 445 - .string => |s| AccountStatus.parse(s), 446 - else => null, 447 - }; 448 - } else null; 449 - 450 - const seq = if (account_obj) |obj| switch (obj.get("seq") orelse json.Value{ .null = {} }) { 451 - .integer => |i| 
@as(?i64, i), 452 - else => null, 453 - } else null; 454 - 455 - const time_val = if (account_obj) |obj| switch (obj.get("time") orelse json.Value{ .null = {} }) { 456 - .string => |s| @as(?[]const u8, s), 457 - else => null, 458 - } else null; 459 - 460 - return .{ .account = .{ 461 - .did = did, 462 - .time_us = time_us, 463 - .active = active, 464 - .status = status_val, 465 - .seq = seq, 466 - .time = time_val, 467 - } }; 468 - } 469 - 470 - return error.UnknownKind; 471 - } 472 - 473 240 // === tests === 474 241 475 242 test "parse commit event" { ··· 492 259 \\} 493 260 ; 494 261 495 - const event = try parseEvent(std.testing.allocator, payload); 262 + var arena = std.heap.ArenaAllocator.init(std.testing.allocator); 263 + defer arena.deinit(); 264 + 265 + const event = try parseEvent(arena.allocator(), payload); 496 266 const commit = event.commit; 497 267 498 268 try std.testing.expectEqualStrings("did:plc:abc123", commit.did); ··· 503 273 try std.testing.expectEqualStrings("xyz789", commit.rkey); 504 274 try std.testing.expectEqualStrings("bafyreitest", commit.cid.?); 505 275 try std.testing.expect(commit.record != null); 506 - 507 - // verify record contents via json helpers 508 - const text = json_helpers.getString(commit.record.?, "text"); 509 - try std.testing.expectEqualStrings("hello world", text.?); 276 + try std.testing.expectEqualStrings("hello world", json_helpers.getString(commit.record.?, "text").?); 510 277 } 511 278 512 279 test "parse identity event" { ··· 523 290 \\} 524 291 ; 525 292 526 - const event = try parseEvent(std.testing.allocator, payload); 293 + var arena = std.heap.ArenaAllocator.init(std.testing.allocator); 294 + defer arena.deinit(); 295 + 296 + const event = try parseEvent(arena.allocator(), payload); 527 297 const identity = event.identity; 528 298 529 299 try std.testing.expectEqualStrings("did:plc:abc123", identity.did); ··· 548 318 \\} 549 319 ; 550 320 551 - const event = try parseEvent(std.testing.allocator, payload); 
321 + var arena = std.heap.ArenaAllocator.init(std.testing.allocator); 322 + defer arena.deinit(); 323 + 324 + const event = try parseEvent(arena.allocator(), payload); 552 325 const account = event.account; 553 326 554 327 try std.testing.expectEqualStrings("did:plc:abc123", account.did); ··· 568 341 \\} 569 342 ; 570 343 571 - const result = parseEvent(std.testing.allocator, payload); 572 - try std.testing.expectError(error.UnknownKind, result); 344 + var arena = std.heap.ArenaAllocator.init(std.testing.allocator); 345 + defer arena.deinit(); 346 + 347 + try std.testing.expectError(error.UnknownKind, parseEvent(arena.allocator(), payload)); 573 348 } 574 349 575 350 test "parse commit with unknown operation returns error" { ··· 586 361 \\} 587 362 ; 588 363 589 - const result = parseEvent(std.testing.allocator, payload); 590 - try std.testing.expectError(error.UnknownOperation, result); 364 + var arena = std.heap.ArenaAllocator.init(std.testing.allocator); 365 + defer arena.deinit(); 366 + 367 + try std.testing.expectError(error.UnknownOperation, parseEvent(arena.allocator(), payload)); 591 368 } 592 369 593 370 test "cursor tracking via time_us" { 594 - // verify that parseEvent extracts time_us correctly for cursor resumption 595 371 const payloads = [_][]const u8{ 596 372 \\{"did":"did:plc:a","time_us":100,"kind":"commit","commit":{"operation":"create","collection":"app.bsky.feed.post","rkey":"1"}} 597 373 , ··· 599 375 , 600 376 }; 601 377 602 - for (payloads) |payload| { 603 - const event = try parseEvent(std.testing.allocator, payload); 604 - switch (event) { 605 - .commit => |c| try std.testing.expect(c.time_us > 0), 606 - else => unreachable, 607 - } 608 - } 378 + var arena = std.heap.ArenaAllocator.init(std.testing.allocator); 379 + defer arena.deinit(); 380 + 381 + const e1 = try parseEvent(arena.allocator(), payloads[0]); 382 + const e2 = try parseEvent(arena.allocator(), payloads[1]); 383 + 384 + try std.testing.expect(e1.timeUs() > 0); 385 + try 
std.testing.expect(e2.timeUs() > e1.timeUs()); 386 + } 387 + 388 + test "Event.timeUs works for all variants" { 389 + var arena = std.heap.ArenaAllocator.init(std.testing.allocator); 390 + defer arena.deinit(); 391 + 392 + const commit = try parseEvent(arena.allocator(), 393 + \\{"did":"did:plc:a","time_us":100,"kind":"commit","commit":{"operation":"create","collection":"x","rkey":"1"}} 394 + ); 395 + const identity = try parseEvent(arena.allocator(), 396 + \\{"did":"did:plc:a","time_us":200,"kind":"identity","identity":{}} 397 + ); 398 + const account = try parseEvent(arena.allocator(), 399 + \\{"did":"did:plc:a","time_us":300,"kind":"account","account":{"active":true}} 400 + ); 609 401 610 - // verify second event has higher time_us 611 - const e1 = try parseEvent(std.testing.allocator, payloads[0]); 612 - const e2 = try parseEvent(std.testing.allocator, payloads[1]); 613 - try std.testing.expect(e2.commit.time_us > e1.commit.time_us); 402 + try std.testing.expectEqual(@as(i64, 100), commit.timeUs()); 403 + try std.testing.expectEqual(@as(i64, 200), identity.timeUs()); 404 + try std.testing.expectEqual(@as(i64, 300), account.timeUs()); 614 405 } 615 406 616 407 test "build subscribe path" { ··· 660 451 \\} 661 452 ; 662 453 663 - const event = try parseEvent(std.testing.allocator, payload); 664 - const commit = event.commit; 454 + var arena = std.heap.ArenaAllocator.init(std.testing.allocator); 455 + defer arena.deinit(); 456 + 457 + const commit = (try parseEvent(arena.allocator(), payload)).commit; 665 458 666 459 try std.testing.expectEqual(CommitAction.delete, commit.operation); 667 460 try std.testing.expect(commit.record == null); ··· 679 472 \\} 680 473 ; 681 474 682 - const event = try parseEvent(std.testing.allocator, payload); 683 - const identity = event.identity; 475 + var arena = std.heap.ArenaAllocator.init(std.testing.allocator); 476 + defer arena.deinit(); 477 + 478 + const identity = (try parseEvent(arena.allocator(), payload)).identity; 684 479 
685 480 try std.testing.expectEqualStrings("did:plc:abc123", identity.did); 686 481 try std.testing.expect(identity.handle == null); ··· 696 491 \\} 697 492 ; 698 493 699 - const result = parseEvent(std.testing.allocator, payload); 700 - try std.testing.expectError(error.MissingDid, result); 494 + var arena = std.heap.ArenaAllocator.init(std.testing.allocator); 495 + defer arena.deinit(); 496 + 497 + try std.testing.expectError(error.MissingDid, parseEvent(arena.allocator(), payload)); 701 498 }