ATProto relay implementation in Zig — zlay.waow.tech
9
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 322 lines 10 kB view raw
//! collection index resyncer — updates collection index on #sync events
//!
//! a #sync event means a repo discontinuity (migration, rebase, bulk import).
//! the collection index may be stale for that DID. this worker fetches
//! describeRepo from the PDS to get the current collection list, then
//! replaces the index entries for that DID.
//!
//! runs on pool_io — enqueue() is called from frame worker threads,
//! background worker is a plain std.Thread.

const std = @import("std");
const Io = std.Io;
const http = std.http;
const collection_index_mod = @import("collection_index.zig");

const Allocator = std.mem.Allocator;
const log = std.log.scoped(.resync);

const queue_capacity = 4096;

/// One queued resync request. Fixed-size inline buffers so the ring buffer
/// needs no per-item allocation; lengths are validated in enqueue() before
/// the item is constructed, so the @intCast there cannot truncate.
const ResyncItem = struct {
    did_buf: [128]u8,
    did_len: u8,
    host_buf: [256]u8,
    host_len: u16,

    fn did(self: *const ResyncItem) []const u8 {
        return self.did_buf[0..self.did_len];
    }

    fn hostname(self: *const ResyncItem) []const u8 {
        return self.host_buf[0..self.host_len];
    }
};

pub const Resyncer = struct {
    allocator: Allocator,
    /// pool_io (Threaded) — used for all synchronization AND the HTTP client.
    /// this struct must never touch the Evented io.
    io: Io,
    collection_index: *collection_index_mod.CollectionIndex,

    // bounded ring buffer queue; protected by `mutex`. `head`/`tail`/`len`
    // are only ever read or written with the mutex held.
    queue: [queue_capacity]ResyncItem,
    head: usize,
    tail: usize,
    len: usize,
    mutex: Io.Mutex,
    cond: Io.Condition,

    running: std.atomic.Value(bool),
    thread: ?std.Thread,

    // stats (monotonic counters; read lock-free for reporting)
    processed: std.atomic.Value(u64),
    failed: std.atomic.Value(u64),
    dropped: std.atomic.Value(u64),

    pub fn init(
        allocator: Allocator,
        io: Io,
        collection_index: *collection_index_mod.CollectionIndex,
    ) Resyncer {
        return .{
            .allocator = allocator,
            .io = io,
            .collection_index = collection_index,
            .queue = undefined,
            .head = 0,
            .tail = 0,
            .len = 0,
            .mutex = Io.Mutex.init,
            .cond = Io.Condition.init,
            .running = .{ .raw = false },
            .thread = null,
            .processed = .{ .raw = 0 },
            .failed = .{ .raw = 0 },
            .dropped = .{ .raw = 0 },
        };
    }

    /// start the background worker thread.
    pub fn start(self: *Resyncer) !void {
        self.running.store(true, .release);
        self.thread = try std.Thread.spawn(.{}, run, .{self});
    }

    /// enqueue a DID for resync. non-blocking, drops if queue full or inputs too long.
    /// called from frame worker threads (plain std.Thread).
    /// note: oversized/empty inputs are rejected silently (NOT counted in
    /// `dropped` — that counter tracks queue-full drops only; the tests
    /// depend on this distinction).
    pub fn enqueue(self: *Resyncer, did: []const u8, hostname: []const u8) void {
        if (did.len == 0 or did.len > 128 or hostname.len == 0 or hostname.len > 256) return;

        self.mutex.lockUncancelable(self.io);
        defer self.mutex.unlock(self.io);

        if (self.len >= queue_capacity) {
            _ = self.dropped.fetchAdd(1, .monotonic);
            return;
        }

        var item: ResyncItem = .{
            .did_buf = undefined,
            .did_len = @intCast(did.len),
            .host_buf = undefined,
            .host_len = @intCast(hostname.len),
        };
        @memcpy(item.did_buf[0..did.len], did);
        @memcpy(item.host_buf[0..hostname.len], hostname);

        self.queue[self.tail] = item;
        self.tail = (self.tail + 1) % queue_capacity;
        self.len += 1;
        // signaled while still holding the mutex (defer unlock runs after),
        // so the wakeup cannot be lost.
        self.cond.signal(self.io);
    }

    /// dequeue one item. blocks until available or shutdown.
    /// returns null only when the queue is empty AND stop() was requested.
    fn dequeue(self: *Resyncer) ?ResyncItem {
        self.mutex.lockUncancelable(self.io);
        defer self.mutex.unlock(self.io);

        while (self.len == 0 and self.running.load(.acquire)) {
            self.cond.waitUncancelable(self.io, &self.mutex);
        }

        if (self.len == 0) return null;

        const item = self.queue[self.head];
        self.head = (self.head + 1) % queue_capacity;
        self.len -= 1;
        return item;
    }

    fn run(self: *Resyncer) void {
        log.info("resync worker started", .{});

        var client: http.Client = .{ .allocator = self.allocator, .io = self.io };
        defer client.deinit();

        // note: on shutdown, items still in the queue are intentionally
        // dropped — resync is best-effort; the next #sync re-triggers it.
        while (self.running.load(.acquire)) {
            const item = self.dequeue() orelse continue;
            self.processItem(&client, &item);

            // brief pause between items
            self.io.sleep(Io.Duration.fromMilliseconds(50), .awake) catch {};
        }

        log.info("resync worker stopped (processed={d}, failed={d}, dropped={d})", .{
            self.processed.load(.monotonic),
            self.failed.load(.monotonic),
            self.dropped.load(.monotonic),
        });
    }

    /// fetch describeRepo for one DID and replace its collection index
    /// entries. failures are counted in `failed` and logged at debug level;
    /// this is best-effort and never propagates errors to the worker loop.
    fn processItem(self: *Resyncer, client: *http.Client, item: *const ResyncItem) void {
        const did = item.did();
        const hostname = item.hostname();

        // fetch describeRepo from PDS.
        // buffer sizing: 256 (max host) + 128 (max did) + ~55 fixed < 512.
        var url_buf: [512]u8 = undefined;
        const url = std.fmt.bufPrint(&url_buf, "https://{s}/xrpc/com.atproto.repo.describeRepo?repo={s}", .{ hostname, did }) catch {
            _ = self.failed.fetchAdd(1, .monotonic);
            return;
        };

        var aw: std.Io.Writer.Allocating = .init(self.allocator);
        defer aw.deinit();

        const result = client.fetch(.{
            .location = .{ .url = url },
            .response_writer = &aw.writer,
            .method = .GET,
        }) catch |err| {
            log.debug("resync: describeRepo failed for {s} on {s}: {s}", .{ did, hostname, @errorName(err) });
            _ = self.failed.fetchAdd(1, .monotonic);
            return;
        };

        if (result.status != .ok) {
            log.debug("resync: describeRepo {d} for {s} on {s}", .{ @intFromEnum(result.status), did, hostname });
            _ = self.failed.fetchAdd(1, .monotonic);
            return;
        }

        const body = aw.written();

        // parse {"collections":["app.bsky.feed.post",...], ...}
        const parsed = std.json.parseFromSlice(
            DescribeRepoResponse,
            self.allocator,
            body,
            .{ .ignore_unknown_fields = true },
        ) catch {
            log.debug("resync: JSON parse failed for {s}", .{did});
            _ = self.failed.fetchAdd(1, .monotonic);
            return;
        };
        // collection slices below point into the parse arena — deinit must
        // run only after addCollection copies/consumes them.
        defer parsed.deinit();

        const collections = parsed.value.collections orelse {
            // no collections field — just remove all
            self.collection_index.removeAll(did) catch {};
            _ = self.processed.fetchAdd(1, .monotonic);
            log.debug("resync: {s} — no collections, removed all", .{did});
            return;
        };

        // removeAll then re-add each collection
        self.collection_index.removeAll(did) catch |err| {
            log.debug("resync: removeAll failed for {s}: {s}", .{ did, @errorName(err) });
            _ = self.failed.fetchAdd(1, .monotonic);
            return;
        };

        for (collections) |collection| {
            // was `catch {}` — a silently-swallowed failure here leaves the
            // index missing entries until the next #sync; log it instead.
            self.collection_index.addCollection(did, collection) catch |err| {
                log.debug("resync: addCollection failed for {s}/{s}: {s}", .{ did, collection, @errorName(err) });
            };
        }

        _ = self.processed.fetchAdd(1, .monotonic);
        log.debug("resync: {s} — {d} collections indexed", .{ did, collections.len });
    }

    pub fn queueDepth(self: *Resyncer) usize {
        self.mutex.lockUncancelable(self.io);
        defer self.mutex.unlock(self.io);
        return self.len;
    }

    /// request worker shutdown. safe to call from any thread; idempotent.
    pub fn stop(self: *Resyncer) void {
        self.running.store(false, .release);
        // BUGFIX: signal must be issued while holding the mutex. Without it,
        // the worker can evaluate its wait predicate (len == 0, running ==
        // true) under the mutex, then this signal fires before the worker
        // enters waitUncancelable — the wakeup is lost and the worker blocks
        // forever, deadlocking deinit()'s join(). Holding the mutex here
        // serializes the store+signal against the predicate-check/wait pair.
        self.mutex.lockUncancelable(self.io);
        defer self.mutex.unlock(self.io);
        self.cond.signal(self.io);
    }

    /// stop the worker and join its thread. must not be called concurrently
    /// with start().
    pub fn deinit(self: *Resyncer) void {
        self.stop();
        if (self.thread) |t| t.join();
        self.thread = null;
    }

    const DescribeRepoResponse = struct {
        collections: ?[]const []const u8 = null,
    };
};

// --- tests ---

test "Resyncer init and enqueue" {
    // just test the queue mechanics, not the HTTP/RocksDB parts
    var r: Resyncer = .{
        .allocator = std.testing.allocator,
        .io = std.testing.io,
        .collection_index = undefined, // not used in this test
        .queue = undefined,
        .head = 0,
        .tail = 0,
        .len = 0,
        .mutex = Io.Mutex.init,
        .cond = Io.Condition.init,
        .running = .{ .raw = true },
        .thread = null,
        .processed = .{ .raw = 0 },
        .failed = .{ .raw = 0 },
        .dropped = .{ .raw = 0 },
    };

    // enqueue and dequeue
    r.enqueue("did:plc:test123", "pds.example.com");
    try std.testing.expectEqual(@as(usize, 1), r.queueDepth());

    const item = r.dequeue().?;
    try std.testing.expectEqualStrings("did:plc:test123", item.did());
    try std.testing.expectEqualStrings("pds.example.com", item.hostname());
    try std.testing.expectEqual(@as(usize, 0), r.queueDepth());
}

test "Resyncer drops when full" {
    var r: Resyncer = .{
        .allocator = std.testing.allocator,
        .io = std.testing.io,
        .collection_index = undefined,
        .queue = undefined,
        .head = 0,
        .tail = 0,
        .len = queue_capacity, // pretend full
        .mutex = Io.Mutex.init,
        .cond = Io.Condition.init,
        .running = .{ .raw = true },
        .thread = null,
        .processed = .{ .raw = 0 },
        .failed = .{ .raw = 0 },
        .dropped = .{ .raw = 0 },
    };

    r.enqueue("did:plc:test", "pds.example.com");
    try std.testing.expectEqual(@as(u64, 1), r.dropped.load(.monotonic));
}

test "Resyncer rejects oversized inputs" {
    var r: Resyncer = .{
        .allocator = std.testing.allocator,
        .io = std.testing.io,
        .collection_index = undefined,
        .queue = undefined,
        .head = 0,
        .tail = 0,
        .len = 0,
        .mutex = Io.Mutex.init,
        .cond = Io.Condition.init,
        .running = .{ .raw = true },
        .thread = null,
        .processed = .{ .raw = 0 },
        .failed = .{ .raw = 0 },
        .dropped = .{ .raw = 0 },
    };

    // empty DID
    r.enqueue("", "pds.example.com");
    try std.testing.expectEqual(@as(usize, 0), r.queueDepth());

    // empty hostname
    r.enqueue("did:plc:test", "");
    try std.testing.expectEqual(@as(usize, 0), r.queueDepth());
}