atproto relay implementation in zig
zlay.waow.tech
//! collection index resyncer — updates collection index on #sync events
//!
//! a #sync event means a repo discontinuity (migration, rebase, bulk import).
//! the collection index may be stale for that DID. this worker fetches
//! describeRepo from the PDS to get the current collection list, then
//! replaces the index entries for that DID.
//!
//! runs on pool_io — enqueue() is called from frame worker threads;
//! the background worker is a plain std.Thread.
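//!
//! hypothetical wiring sketch (alloc, pool_io, collection_index, and evt
//! are placeholder names, not defined in this file):
//!
//!     var resyncer = Resyncer.init(alloc, pool_io, &collection_index);
//!     try resyncer.start();
//!     defer resyncer.deinit();
//!
//!     // from a frame worker thread, on a #sync event:
//!     resyncer.enqueue(evt.did, evt.hostname);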

const std = @import("std");
const Io = std.Io;
const http = std.http;
const collection_index_mod = @import("collection_index.zig");

const Allocator = std.mem.Allocator;
const log = std.log.scoped(.resync);

const queue_capacity = 4096;

const ResyncItem = struct {
    did_buf: [128]u8,
    did_len: u8,
    host_buf: [256]u8,
    host_len: u16,

    fn did(self: *const ResyncItem) []const u8 {
        return self.did_buf[0..self.did_len];
    }

    fn hostname(self: *const ResyncItem) []const u8 {
        return self.host_buf[0..self.host_len];
    }
};

pub const Resyncer = struct {
    allocator: Allocator,
    /// pool_io (Threaded) — used for all synchronization AND the HTTP client.
    /// this struct must never touch the Evented io.
    io: Io,
    collection_index: *collection_index_mod.CollectionIndex,

    // bounded ring buffer queue
    queue: [queue_capacity]ResyncItem,
    head: usize,
    tail: usize,
    len: usize,
    mutex: Io.Mutex,
    cond: Io.Condition,

    running: std.atomic.Value(bool),
    thread: ?std.Thread,

    // stats
    processed: std.atomic.Value(u64),
    failed: std.atomic.Value(u64),
    dropped: std.atomic.Value(u64),

    pub fn init(
        allocator: Allocator,
        io: Io,
        collection_index: *collection_index_mod.CollectionIndex,
    ) Resyncer {
        return .{
            .allocator = allocator,
            .io = io,
            .collection_index = collection_index,
            .queue = undefined,
            .head = 0,
            .tail = 0,
            .len = 0,
            .mutex = Io.Mutex.init,
            .cond = Io.Condition.init,
            .running = .{ .raw = false },
            .thread = null,
            .processed = .{ .raw = 0 },
            .failed = .{ .raw = 0 },
            .dropped = .{ .raw = 0 },
        };
    }

    /// start the background worker thread.
    pub fn start(self: *Resyncer) !void {
        self.running.store(true, .release);
        self.thread = try std.Thread.spawn(.{}, run, .{self});
    }

    /// enqueue a DID for resync. non-blocking, drops if queue full or inputs too long.
    /// called from frame worker threads (plain std.Thread).
    pub fn enqueue(self: *Resyncer, did: []const u8, hostname: []const u8) void {
        if (did.len == 0 or did.len > 128 or hostname.len == 0 or hostname.len > 256) return;

        self.mutex.lockUncancelable(self.io);
        defer self.mutex.unlock(self.io);

        if (self.len >= queue_capacity) {
            _ = self.dropped.fetchAdd(1, .monotonic);
            return;
        }

        var item: ResyncItem = .{
            .did_buf = undefined,
            .did_len = @intCast(did.len),
            .host_buf = undefined,
            .host_len = @intCast(hostname.len),
        };
        @memcpy(item.did_buf[0..did.len], did);
        @memcpy(item.host_buf[0..hostname.len], hostname);

        self.queue[self.tail] = item;
        self.tail = (self.tail + 1) % queue_capacity;
        self.len += 1;
        self.cond.signal(self.io);
    }

    /// dequeue one item. blocks until available or shutdown.
    fn dequeue(self: *Resyncer) ?ResyncItem {
        self.mutex.lockUncancelable(self.io);
        defer self.mutex.unlock(self.io);

        while (self.len == 0 and self.running.load(.acquire)) {
            self.cond.waitUncancelable(self.io, &self.mutex);
        }

        if (self.len == 0) return null;

        const item = self.queue[self.head];
        self.head = (self.head + 1) % queue_capacity;
        self.len -= 1;
        return item;
    }

    fn run(self: *Resyncer) void {
        log.info("resync worker started", .{});

        var client: http.Client = .{ .allocator = self.allocator, .io = self.io };
        defer client.deinit();

        while (self.running.load(.acquire)) {
            const item = self.dequeue() orelse continue;
            self.processItem(&client, &item);

            // brief pause between items
            self.io.sleep(Io.Duration.fromMilliseconds(50), .awake) catch {};
        }

        log.info("resync worker stopped (processed={d}, failed={d}, dropped={d})", .{
            self.processed.load(.monotonic),
            self.failed.load(.monotonic),
            self.dropped.load(.monotonic),
        });
    }

    fn processItem(self: *Resyncer, client: *http.Client, item: *const ResyncItem) void {
        const did = item.did();
        const hostname = item.hostname();

        // fetch describeRepo from PDS
        var url_buf: [512]u8 = undefined;
        const url = std.fmt.bufPrint(&url_buf, "https://{s}/xrpc/com.atproto.repo.describeRepo?repo={s}", .{ hostname, did }) catch {
            _ = self.failed.fetchAdd(1, .monotonic);
            return;
        };

        var aw: Io.Writer.Allocating = .init(self.allocator);
        defer aw.deinit();

        const result = client.fetch(.{
            .location = .{ .url = url },
            .response_writer = &aw.writer,
            .method = .GET,
        }) catch |err| {
            log.debug("resync: describeRepo failed for {s} on {s}: {s}", .{ did, hostname, @errorName(err) });
            _ = self.failed.fetchAdd(1, .monotonic);
            return;
        };

        if (result.status != .ok) {
            log.debug("resync: describeRepo {d} for {s} on {s}", .{ @intFromEnum(result.status), did, hostname });
            _ = self.failed.fetchAdd(1, .monotonic);
            return;
        }

        const body = aw.written();

        // parse {"collections":["app.bsky.feed.post",...], ...}
        const parsed = std.json.parseFromSlice(
            DescribeRepoResponse,
            self.allocator,
            body,
            .{ .ignore_unknown_fields = true },
        ) catch {
            log.debug("resync: JSON parse failed for {s}", .{did});
            _ = self.failed.fetchAdd(1, .monotonic);
            return;
        };
        defer parsed.deinit();

        const collections = parsed.value.collections orelse {
            // no collections field — just remove all
            self.collection_index.removeAll(did) catch {};
            _ = self.processed.fetchAdd(1, .monotonic);
            log.debug("resync: {s} — no collections, removed all", .{did});
            return;
        };

        // removeAll then re-add each collection
        self.collection_index.removeAll(did) catch |err| {
            log.debug("resync: removeAll failed for {s}: {s}", .{ did, @errorName(err) });
            _ = self.failed.fetchAdd(1, .monotonic);
            return;
        };

        for (collections) |collection| {
            self.collection_index.addCollection(did, collection) catch {};
        }

        _ = self.processed.fetchAdd(1, .monotonic);
        log.debug("resync: {s} — {d} collections indexed", .{ did, collections.len });
    }

    pub fn queueDepth(self: *Resyncer) usize {
        self.mutex.lockUncancelable(self.io);
        defer self.mutex.unlock(self.io);
        return self.len;
    }

    pub fn stop(self: *Resyncer) void {
        self.running.store(false, .release);
        // take the mutex before signaling so the worker cannot observe
        // `running` as true and then block in wait after this signal fires
        self.mutex.lockUncancelable(self.io);
        defer self.mutex.unlock(self.io);
        self.cond.signal(self.io);
    }

    pub fn deinit(self: *Resyncer) void {
        self.stop();
        if (self.thread) |t| t.join();
        self.thread = null;
    }

    const DescribeRepoResponse = struct {
        collections: ?[]const []const u8 = null,
    };
};

// --- tests ---

test "Resyncer init and enqueue" {
    // just test the queue mechanics, not the HTTP/RocksDB parts;
    // collection_index is never touched here, so undefined is fine
    var r = Resyncer.init(std.testing.allocator, std.testing.io, undefined);

    // enqueue and dequeue
    r.enqueue("did:plc:test123", "pds.example.com");
    try std.testing.expectEqual(@as(usize, 1), r.queueDepth());

    const item = r.dequeue().?;
    try std.testing.expectEqualStrings("did:plc:test123", item.did());
    try std.testing.expectEqualStrings("pds.example.com", item.hostname());
    try std.testing.expectEqual(@as(usize, 0), r.queueDepth());
}
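
// a minimal sketch exercising the ring buffer wraparound: start head and
// tail near the end of the ring so two enqueues cross the modulo boundary,
// then verify FIFO order is preserved. the DIDs are synthetic test data.
test "Resyncer ring buffer wraps around" {
    var r = Resyncer.init(std.testing.allocator, std.testing.io, undefined);
    r.head = queue_capacity - 1;
    r.tail = queue_capacity - 1;

    r.enqueue("did:plc:first", "pds.example.com");
    r.enqueue("did:plc:second", "pds.example.com");
    try std.testing.expectEqual(@as(usize, 2), r.queueDepth());

    try std.testing.expectEqualStrings("did:plc:first", r.dequeue().?.did());
    try std.testing.expectEqualStrings("did:plc:second", r.dequeue().?.did());
    try std.testing.expectEqual(@as(usize, 0), r.queueDepth());
}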

test "Resyncer drops when full" {
    var r = Resyncer.init(std.testing.allocator, std.testing.io, undefined);
    r.len = queue_capacity; // pretend full

    r.enqueue("did:plc:test", "pds.example.com");
    try std.testing.expectEqual(@as(u64, 1), r.dropped.load(.monotonic));
}
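
// a small boundary check: enqueue only rejects lengths strictly greater
// than the fixed buffer sizes, so inputs that exactly fill the buffers
// should round-trip intact. the values here are synthetic.
test "Resyncer accepts inputs at the buffer limits" {
    var r = Resyncer.init(std.testing.allocator, std.testing.io, undefined);

    const did_128 = "did:plc:" ++ ("a" ** 120); // exactly 128 bytes
    const host_256 = "h" ** 256; // exactly 256 bytes
    r.enqueue(did_128, host_256);
    try std.testing.expectEqual(@as(usize, 1), r.queueDepth());

    const item = r.dequeue().?;
    try std.testing.expectEqualStrings(did_128, item.did());
    try std.testing.expectEqualStrings(host_256, item.hostname());
}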

test "Resyncer rejects empty and oversized inputs" {
    var r = Resyncer.init(std.testing.allocator, std.testing.io, undefined);

    // empty DID
    r.enqueue("", "pds.example.com");
    try std.testing.expectEqual(@as(usize, 0), r.queueDepth());

    // empty hostname
    r.enqueue("did:plc:test", "");
    try std.testing.expectEqual(@as(usize, 0), r.queueDepth());

    // DID longer than the 128-byte buffer
    r.enqueue("did:plc:" ++ ("a" ** 128), "pds.example.com");
    try std.testing.expectEqual(@as(usize, 0), r.queueDepth());

    // hostname longer than the 256-byte buffer
    r.enqueue("did:plc:test", "h" ** 257);
    try std.testing.expectEqual(@as(usize, 0), r.queueDepth());
}
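
// a small sketch of the JSON contract processItem relies on. the sample
// body below is illustrative, not a captured PDS response; the extra
// fields stand in for the rest of describeRepo's output and must be
// skipped via ignore_unknown_fields.
test "DescribeRepoResponse parses a collections list" {
    const body =
        \\{"did":"did:plc:test123",
        \\ "collections":["app.bsky.feed.post","app.bsky.graph.follow"],
        \\ "handleIsCorrect":true}
    ;
    const parsed = try std.json.parseFromSlice(
        Resyncer.DescribeRepoResponse,
        std.testing.allocator,
        body,
        .{ .ignore_unknown_fields = true },
    );
    defer parsed.deinit();

    const collections = parsed.value.collections.?;
    try std.testing.expectEqual(@as(usize, 2), collections.len);
    try std.testing.expectEqualStrings("app.bsky.feed.post", collections[0]);
    try std.testing.expectEqualStrings("app.bsky.graph.follow", collections[1]);
}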