atproto relay implementation in zig — zlay.waow.tech
9
fork

Configure Feed

Select the types of activity you want to include in your feed.

subscriber: extract prepareFrameWork + add UAF regression test

follow-up to 1eec324 (fix UAF: dupe FrameWork.hostname per submit).

the dupe-at-submit logic was inline in FrameHandler.onMessage, which
made it hard to regression-test the invariant. extracted a small
Subscriber method that returns a FrameWork with heap-owned data +
hostname, and added a unit test that:

- builds a FrameWork from a to-be-freed hostname buffer
- asserts the returned slices have distinct pointers from the inputs
- simulates slurper.runWorker teardown by freeing the source hostname
- reads FrameWork.hostname again after the free — this would trip the
  testing allocator's use-after-free detection if the dupe were elided

no behavior change at the submit site. tested via zig build test.

zzstoatzz 31825b25 1eec3241

+92 -21
+92 -21
src/subscriber.zig
··· 256 256 return self.shutdown.load(.acquire) or self.host_shutdown.load(.acquire); 257 257 } 258 258 259 + /// build a FrameWork that owns its data + hostname, decoupled from the 260 + /// subscriber lifetime. returns null on OOM. 261 + /// 262 + /// UAF-safe contract: after this returns, the caller (or the slurper) 263 + /// may free `self.options.hostname` and the input `data` immediately 264 + /// without affecting the work item. the worker will free both through 265 + /// `work.allocator` when it finishes processing. 266 + /// 267 + /// see `test "prepareFrameWork dupes hostname and data (UAF regression)"` 268 + fn prepareFrameWork(self: *Subscriber, data: []const u8) ?frame_worker_mod.FrameWork { 269 + const duped = self.allocator.dupe(u8, data) catch return null; 270 + const hostname_dup = self.allocator.dupe(u8, self.options.hostname) catch { 271 + self.allocator.free(duped); 272 + return null; 273 + }; 274 + return .{ 275 + .data = duped, 276 + .host_id = self.options.host_id, 277 + .hostname = hostname_dup, 278 + .allocator = self.allocator, 279 + .io = self.pool_io orelse self.io, 280 + .bc = self.bc, 281 + .validator = self.validator, 282 + .persist = self.persist, 283 + .collection_index = self.collection_index, 284 + .resyncer = self.resyncer, 285 + }; 286 + } 287 + 259 288 /// run the subscriber loop. reconnects with exponential backoff. 260 289 /// blocks until shutdown flag is set or host is exhausted. 
261 290 pub fn run(self: *Subscriber) void { ··· 477 506 const d = payload.getString("repo") orelse payload.getString("did"); 478 507 break :blk if (d) |s| std.hash.Wyhash.hash(0, s) else sub.options.host_id; 479 508 }; 480 - const duped = sub.allocator.dupe(u8, data) catch return; 481 - // dupe hostname per-frame: subscriber teardown (slurper.runWorker) 509 + // dupe data + hostname per-frame: subscriber teardown (slurper.runWorker) 482 510 // frees sub.options.hostname after sub.run() returns, but FrameWorks 483 511 // can still be queued in the pool. borrowing the slice would be a 484 512 // use-after-free (corrupt hostnames in chain-break logs, etc.). 485 - const hostname_dup = sub.allocator.dupe(u8, sub.options.hostname) catch { 486 - sub.allocator.free(duped); 487 - return; 488 - }; 513 + const work = sub.prepareFrameWork(data) orelse return; 489 514 const t0 = nanoTimestamp(io); 490 - if (pool.submit(did_key, .{ 491 - .data = duped, 492 - .host_id = sub.options.host_id, 493 - .hostname = hostname_dup, 494 - .allocator = sub.allocator, 495 - .io = sub.pool_io orelse sub.io, // pool_io (Threaded) for worker-safe ops 496 - .bc = sub.bc, 497 - .validator = sub.validator, 498 - .persist = sub.persist, 499 - .collection_index = sub.collection_index, 500 - .resyncer = sub.resyncer, 501 - }, sub.shutdown)) { 515 + if (pool.submit(did_key, work, sub.shutdown)) { 502 516 // pool accepted — advance cursor past this frame 503 - _ = sub.bc.stats.pool_queued_bytes.fetchAdd(duped.len, .monotonic); 517 + _ = sub.bc.stats.pool_queued_bytes.fetchAdd(work.data.len, .monotonic); 504 518 if (upstream_seq) |s| sub.last_upstream_seq = s; 505 519 if (nanoTimestamp(io) - t0 > 1_000_000) { // >1ms = had to wait 506 520 _ = sub.bc.stats.pool_backpressure.fetchAdd(1, .monotonic); 507 521 } 508 522 } else { 509 523 // shutdown requested — don't advance cursor so reconnect replays this frame 510 - sub.allocator.free(duped); 511 - sub.allocator.free(hostname_dup); 524 + 
sub.allocator.free(work.data); 525 + sub.allocator.free(work.hostname); 512 526 } 513 527 return; 514 528 } ··· 778 792 }; 779 793 780 794 // --- tests --- 795 + 796 + test "prepareFrameWork dupes hostname and data (UAF regression)" { 797 + // regression test for UAF: slurper.runWorker frees sub.options.hostname 798 + // after sub.run() returns, but FrameWorks can still be queued in the 799 + // frame pool with that hostname slice. prepareFrameWork must heap-dupe 800 + // both `data` and `hostname` so the work item is independent of the 801 + // subscriber's lifetime. 802 + // 803 + // the test simulates subscriber teardown by freeing the source hostname 804 + // after the FrameWork is built, then asserts the work item is still 805 + // intact (would trip the testing allocator's use-after-free detection 806 + // if the dupe was skipped). 807 + const alloc = std.testing.allocator; 808 + 809 + const orig_hostname = try alloc.dupe(u8, "example.pds.host"); 810 + const data_input = try alloc.dupe(u8, "raw frame bytes"); 811 + defer alloc.free(data_input); 812 + 813 + var shutdown: std.atomic.Value(bool) = .{ .raw = false }; 814 + var sub: Subscriber = .{ 815 + .allocator = alloc, 816 + .io = std.testing.io, 817 + .options = .{ .hostname = orig_hostname, .host_id = 42 }, 818 + .bc = undefined, // not dereferenced by prepareFrameWork 819 + .validator = undefined, 820 + .persist = null, 821 + .collection_index = null, 822 + .resyncer = null, 823 + .pool = null, 824 + .pool_io = null, 825 + .shutdown = &shutdown, 826 + }; 827 + 828 + const work = sub.prepareFrameWork(data_input) orelse return error.OutOfMemory; 829 + defer alloc.free(work.data); 830 + defer alloc.free(work.hostname); 831 + 832 + // content is correct 833 + try std.testing.expectEqualStrings("example.pds.host", work.hostname); 834 + try std.testing.expectEqualStrings("raw frame bytes", work.data); 835 + 836 + // pointers must be distinct from caller's buffers — this is the core 837 + // UAF invariant. 
if the dupe was elided, these would alias. 838 + try std.testing.expect(work.hostname.ptr != orig_hostname.ptr); 839 + try std.testing.expect(work.data.ptr != data_input.ptr); 840 + 841 + // scalar fields propagate 842 + try std.testing.expectEqual(@as(u64, 42), work.host_id); 843 + 844 + // simulate subscriber teardown (slurper.runWorker frees hostname) 845 + alloc.free(orig_hostname); 846 + 847 + // work item must still be readable and correct — if hostname was 848 + // borrowed instead of duped, the testing allocator would catch the 849 + // read-after-free above when expectEqualStrings is called again here. 850 + try std.testing.expectEqualStrings("example.pds.host", work.hostname); 851 + } 781 852 782 853 test "decode frame via SDK and extract fields" { 783 854 const cbor = zat.cbor;