GET /xrpc/app.bsky.actor.searchActorsTypeahead typeahead.waow.tech
16
fork

Configure Feed

Select the types of activity you want to include in your feed.

at 5080912ca9c3d7d4301e4dce83a5b804de56d00a 429 lines 16 kB view raw
//! Jetstream event handler: buffers actor events, flushes batches to worker
//! Dual-writes profile/identity events to local SQLite for immediate search

const std = @import("std");
const mem = std.mem;
const json = std.json;
const Allocator = mem.Allocator;
const zat = @import("zat");
const HttpTransport = zat.HttpTransport;
const LocalDb = @import("db/LocalDb.zig");
const BloomFilter = @import("bloom.zig").BloomFilter;

const log = std.log.scoped(.ingester);

/// Max events per POST to the worker; also paces flush cadence.
pub const MAX_BATCH: usize = 100;
const BLOOM_BITS: usize = 10_000_000; // ~1.2MB fixed
const BLOOM_HASHES: usize = 7;

/// Worker endpoint + shared secret, sourced from the environment.
pub const Config = struct {
    worker_url: []const u8,
    secret: []const u8,
};

/// Reads TYPEAHEAD_URL / TYPEAHEAD_SECRET from the environment.
/// Panics if either is missing: fail fast at startup rather than on
/// the first flush attempt.
pub fn getConfig() Config {
    return .{
        .worker_url = std.posix.getenv("TYPEAHEAD_URL") orelse
            @panic("TYPEAHEAD_URL not set"),
        .secret = std.posix.getenv("TYPEAHEAD_SECRET") orelse
            @panic("TYPEAHEAD_SECRET not set"),
    };
}

/// an actor event to POST to the worker
pub const ActorEvent = struct {
    did: []const u8,
    handle: ?[]const u8 = null,
    display_name: ?[]const u8 = null,
    avatar_cid: ?[]const u8 = null,
};

pub const IngestHandler = struct {
    allocator: Allocator,
    config: Config,
    transport: *HttpTransport,
    local_db: ?*LocalDb,
    buffer: std.ArrayList(ActorEvent),
    delete_buffer: std.ArrayList([]const u8),
    /// arena owns all string data in buffer/delete_buffer
    arena: std.heap.ArenaAllocator,
    /// bloom filter: fixed-size dedup for non-profile DIDs (~1.2MB)
    bloom: BloomFilter,
    /// time_us of the latest event received (not yet confirmed by worker)
    pending_cursor: i64 = 0,
    /// cursor sent with batches; only advances once both buffers fully drain
    flushed_cursor: i64 = 0,
    total_ingested: u64 = 0,
    total_deleted: u64 = 0,
    last_flush: i64 = 0,
    retry_after: i64 = 0, // timestamp before which we skip flush attempts

    /// Caller owns `transport` and `local_db`; deinit() releases the rest.
    pub fn init(allocator: Allocator, config: Config, transport: *HttpTransport, local_db: ?*LocalDb) !IngestHandler {
        return .{
            .allocator = allocator,
            .config = config,
            .transport = transport,
            .local_db = local_db,
            .buffer = .{},
            .delete_buffer = .{},
            .arena = std.heap.ArenaAllocator.init(allocator),
            .bloom = try BloomFilter.init(allocator, BLOOM_BITS, BLOOM_HASHES),
            .last_flush = std.time.timestamp(),
        };
    }

    pub fn deinit(self: *IngestHandler) void {
        self.bloom.deinit(self.allocator);
        self.arena.deinit();
        self.buffer.deinit(self.allocator);
        self.delete_buffer.deinit(self.allocator);
    }

    /// Copy `s` into the arena. Returns null on OOM; callers treat that as
    /// "drop this event" (best-effort ingestion, never crash the firehose).
    fn dupe(self: *IngestHandler, s: []const u8) ?[]const u8 {
        return self.arena.allocator().dupe(u8, s) catch null;
    }

    fn resetBloom(self: *IngestHandler) void {
        log.info("resetting bloom filter ({d} insertions)", .{self.bloom.count});
        self.bloom.reset();
    }

    /// Jetstream callback: dispatch the event, then decide whether to flush.
    /// Flush triggers: either buffer at MAX_BATCH, or >=5s since last flush
    /// with anything pending. A failed flush sets retry_after for backoff.
    pub fn onEvent(self: *IngestHandler, event: zat.JetstreamEvent) void {
        switch (event) {
            .commit => |c| self.handleCommit(c),
            .identity => |id| self.handleIdentity(id),
            .account => |acct| self.handleAccount(acct),
        }

        self.pending_cursor = event.timeUs();

        const now = std.time.timestamp();
        // drop incoming events if buffers are backed up (persistent failure)
        const MAX_BACKLOG = MAX_BATCH * 10;
        if (self.buffer.items.len >= MAX_BACKLOG or self.delete_buffer.items.len >= MAX_BACKLOG) {
            // FIX: message previously said "dropping oldest events", but we
            // clear BOTH buffers entirely and skip the cursor forward.
            log.err("backlog overflow ({d} ingest, {d} delete), dropping all buffered events", .{
                self.buffer.items.len, self.delete_buffer.items.len,
            });
            self.buffer.clearRetainingCapacity();
            self.delete_buffer.clearRetainingCapacity();
            _ = self.arena.reset(.retain_capacity);
            self.resetBloom();
            // accept the data loss: advance cursor past the dropped window
            self.flushed_cursor = self.pending_cursor;
            self.retry_after = 0;
        }

        // respect backoff cooldown after failures
        if (now < self.retry_after) return;

        const should_flush = self.buffer.items.len >= MAX_BATCH or
            self.delete_buffer.items.len >= MAX_BATCH or
            (now - self.last_flush >= 5 and (self.buffer.items.len > 0 or self.delete_buffer.items.len > 0));

        if (should_flush) {
            self.flush();
        }
    }
pub fn onError(_: *IngestHandler, err: anyerror) void { 126 log.err("jetstream: {s}", .{@errorName(err)}); 127 } 128 129 pub fn onConnect(_: *IngestHandler, host: []const u8) void { 130 log.info("connected to {s}", .{host}); 131 } 132 133 fn handleCommit(self: *IngestHandler, c: zat.jetstream.CommitEvent) void { 134 if (mem.eql(u8, c.collection, "app.bsky.actor.profile")) { 135 // rich extraction from profile records 136 if (c.operation != .create and c.operation != .update) return; 137 138 const record = c.record orelse return; 139 140 const did = self.dupe(c.did) orelse return; 141 var event = ActorEvent{ .did = did }; 142 143 if (zat.json.getString(record, "displayName")) |name| { 144 event.display_name = self.dupe(name); 145 } 146 147 if (zat.json.getPath(record, "avatar")) |avatar| { 148 if (zat.json.getString(avatar, "ref.$link")) |cid| { 149 event.avatar_cid = self.dupe(cid); 150 } 151 } 152 153 // dual-write to local SQLite (profile events have searchable data) 154 self.writeToLocal(event); 155 156 // ingester only sees self-labels from profile records — it can never 157 // correctly determine hidden. let refreshModeration cron handle it. 
158 159 self.buffer.append(self.allocator, event) catch return; 160 } else { 161 // non-profile collections: just discover the DID 162 if (c.operation == .delete) return; 163 164 // dedup: skip if bloom filter says we've seen this DID recently 165 if (self.bloom.contains(c.did)) return; 166 self.bloom.insert(c.did); 167 168 const did = self.dupe(c.did) orelse return; 169 self.buffer.append(self.allocator, .{ .did = did }) catch return; 170 } 171 } 172 173 fn handleIdentity(self: *IngestHandler, id: zat.jetstream.IdentityEvent) void { 174 const handle = id.handle orelse return; 175 const event = ActorEvent{ 176 .did = self.dupe(id.did) orelse return, 177 .handle = self.dupe(handle), 178 }; 179 180 // dual-write: identity events have handles (searchable) 181 self.writeToLocal(event); 182 183 self.buffer.append(self.allocator, event) catch return; 184 } 185 186 fn handleAccount(self: *IngestHandler, acct: zat.jetstream.AccountEvent) void { 187 if (!acct.active) { 188 const did = self.dupe(acct.did) orelse return; 189 190 // dual-write: delete from local 191 if (self.local_db) |db| { 192 db.exec("DELETE FROM actors WHERE did = ?", .{did}) catch {}; 193 db.exec("DELETE FROM actors_fts WHERE did = ?", .{did}) catch {}; 194 } 195 196 self.delete_buffer.append(self.allocator, did) catch return; 197 } 198 } 199 200 /// Dual-write to local SQLite for immediate search availability 201 fn writeToLocal(self: *IngestHandler, event: ActorEvent) void { 202 const db = self.local_db orelse return; 203 if (!db.isReady()) return; 204 205 const conn = db.getConn() orelse return; 206 207 db.lock(); 208 defer db.unlock(); 209 210 // only write if we have searchable data (handle or display_name) 211 const has_handle = event.handle != null and event.handle.?.len > 0; 212 const has_name = event.display_name != null and event.display_name.?.len > 0; 213 if (!has_handle and !has_name) return; 214 215 // upsert actor 216 conn.exec( 217 \\INSERT INTO actors (did, handle, display_name, 
avatar_url) 218 \\VALUES (?, ?, ?, ?) 219 \\ON CONFLICT(did) DO UPDATE SET 220 \\ handle = COALESCE(NULLIF(excluded.handle, ''), actors.handle), 221 \\ display_name = COALESCE(NULLIF(excluded.display_name, ''), actors.display_name), 222 \\ avatar_url = COALESCE(NULLIF(excluded.avatar_url, ''), actors.avatar_url) 223 , .{ 224 event.did, 225 event.handle orelse "", 226 event.display_name orelse "", 227 event.avatar_cid orelse "", 228 }) catch return; 229 230 // update FTS 231 conn.exec("DELETE FROM actors_fts WHERE did = ?", .{event.did}) catch {}; 232 233 // read back current handle + display_name for FTS 234 const row = conn.row( 235 "SELECT handle, display_name FROM actors WHERE did = ?", 236 .{event.did}, 237 ) catch return; 238 if (row) |r| { 239 defer r.deinit(); 240 const h = r.text(0); 241 const dn = r.text(1); 242 if (h.len > 0 or dn.len > 0) { 243 conn.exec( 244 "INSERT INTO actors_fts (did, handle, display_name) VALUES (?, ?, ?)", 245 .{ event.did, h, dn }, 246 ) catch {}; 247 } 248 } 249 } 250 251 fn flush(self: *IngestHandler) void { 252 self.last_flush = std.time.timestamp(); 253 var any_failure = false; 254 255 if (self.buffer.items.len > 0) { 256 const batch_end = @min(self.buffer.items.len, MAX_BATCH); 257 const batch = self.buffer.items[0..batch_end]; 258 const ok = postBatch( 259 self.transport, 260 self.config, 261 batch, 262 self.flushed_cursor, 263 ); 264 if (ok) { 265 self.total_ingested += batch_end; 266 const rss = getRssKB(); 267 log.info("+{d} actors (total: {d}, pending: {d}) cursor={d} rss={d}KB", .{ 268 batch_end, self.total_ingested, 269 self.buffer.items.len - batch_end, self.pending_cursor, rss, 270 }); 271 if (batch_end < self.buffer.items.len) { 272 std.mem.copyForwards( 273 ActorEvent, 274 self.buffer.items[0 .. 
self.buffer.items.len - batch_end], 275 self.buffer.items[batch_end..], 276 ); 277 } 278 self.buffer.shrinkRetainingCapacity(self.buffer.items.len - batch_end); 279 } else { 280 log.err("ingest batch failed ({d} events, {d} pending), will retry in 5s", .{ 281 batch_end, self.buffer.items.len, 282 }); 283 any_failure = true; 284 } 285 } 286 287 if (self.delete_buffer.items.len > 0) { 288 const batch_end = @min(self.delete_buffer.items.len, MAX_BATCH); 289 const batch = self.delete_buffer.items[0..batch_end]; 290 const ok = deleteActors( 291 self.transport, 292 self.config, 293 batch, 294 ); 295 if (ok) { 296 self.total_deleted += batch_end; 297 log.info("-{d} moderated (total: {d})", .{ 298 batch_end, self.total_deleted, 299 }); 300 if (batch_end < self.delete_buffer.items.len) { 301 const remaining = self.delete_buffer.items.len - batch_end; 302 var i: usize = 0; 303 while (i < remaining) : (i += 1) { 304 self.delete_buffer.items[i] = self.delete_buffer.items[batch_end + i]; 305 } 306 self.delete_buffer.shrinkRetainingCapacity(remaining); 307 } else { 308 self.delete_buffer.clearRetainingCapacity(); 309 } 310 } else { 311 log.err("delete batch failed ({d} dids), will retry in 5s", .{batch_end}); 312 any_failure = true; 313 } 314 } 315 316 if (any_failure) { 317 self.retry_after = std.time.timestamp() + 5; 318 } else if (self.buffer.items.len == 0 and self.delete_buffer.items.len == 0) { 319 self.flushed_cursor = self.pending_cursor; 320 _ = self.arena.reset(.retain_capacity); 321 } 322 } 323}; 324 325// -- HTTP client functions (worker API) -- 326 327fn postBatch(transport: *HttpTransport, config: Config, events: []const ActorEvent, cursor: i64) bool { 328 var output: std.Io.Writer.Allocating = .init(transport.allocator); 329 defer output.deinit(); 330 331 var jw: json.Stringify = .{ 332 .writer = &output.writer, 333 .options = .{ .emit_null_optional_fields = false }, 334 }; 335 jw.write(.{ .events = events, .cursor = cursor }) catch return false; 336 337 var 
url_buf: [512]u8 = undefined; 338 const url = std.fmt.bufPrint(&url_buf, "{s}/admin/ingest", .{config.worker_url}) catch return false; 339 340 var auth_buf: [256]u8 = undefined; 341 const auth = std.fmt.bufPrint(&auth_buf, "Bearer {s}", .{config.secret}) catch return false; 342 343 const result = transport.fetch(.{ 344 .url = url, 345 .method = .POST, 346 .payload = output.written(), 347 .authorization = auth, 348 }) catch return false; 349 defer transport.allocator.free(result.body); 350 351 if (result.status != .ok) { 352 log.err("ingest HTTP {d}: {s}", .{ @intFromEnum(result.status), result.body }); 353 } 354 355 return result.status == .ok; 356} 357 358fn deleteActors(transport: *HttpTransport, config: Config, dids: []const []const u8) bool { 359 var output: std.Io.Writer.Allocating = .init(transport.allocator); 360 defer output.deinit(); 361 362 var jw: json.Stringify = .{ .writer = &output.writer }; 363 jw.write(.{ .dids = dids }) catch return false; 364 365 var url_buf: [512]u8 = undefined; 366 const url = std.fmt.bufPrint(&url_buf, "{s}/admin/delete", .{config.worker_url}) catch return false; 367 368 var auth_buf: [256]u8 = undefined; 369 const auth = std.fmt.bufPrint(&auth_buf, "Bearer {s}", .{config.secret}) catch return false; 370 371 const result = transport.fetch(.{ 372 .url = url, 373 .method = .POST, 374 .payload = output.written(), 375 .authorization = auth, 376 }) catch return false; 377 defer transport.allocator.free(result.body); 378 379 return result.status == .ok; 380} 381 382fn fetchCursorOnce(transport: *HttpTransport, config: Config) ?i64 { 383 var url_buf: [512]u8 = undefined; 384 const url = std.fmt.bufPrint(&url_buf, "{s}/admin/cursor", .{config.worker_url}) catch return null; 385 386 var auth_buf: [256]u8 = undefined; 387 const auth = std.fmt.bufPrint(&auth_buf, "Bearer {s}", .{config.secret}) catch return null; 388 389 const result = transport.fetch(.{ 390 .url = url, 391 .authorization = auth, 392 }) catch return null; 393 defer 
transport.allocator.free(result.body); 394 395 if (result.status != .ok) return null; 396 397 const parsed = json.parseFromSlice(json.Value, transport.allocator, result.body, .{}) catch return null; 398 defer parsed.deinit(); 399 400 const cursor_val = parsed.value.object.get("cursor") orelse return null; 401 return switch (cursor_val) { 402 .integer => |n| n, 403 .float => |f| @intFromFloat(f), 404 else => null, 405 }; 406} 407 408pub fn fetchCursor(transport: *HttpTransport, config: Config) ?i64 { 409 const backoff = [_]u64{ 1, 3, 10 }; 410 for (backoff, 0..) |delay, attempt| { 411 if (fetchCursorOnce(transport, config)) |cursor| return cursor; 412 log.warn("cursor fetch failed (attempt {d}/3), retrying in {d}s", .{ attempt + 1, delay }); 413 std.Thread.sleep(delay * std.time.ns_per_s); 414 } 415 return null; 416} 417 418// -- utilities -- 419 420pub fn getRssKB() u64 { 421 const f = std.fs.openFileAbsolute("/proc/self/statm", .{}) catch return 0; 422 defer f.close(); 423 var buf: [128]u8 = undefined; 424 const n = f.read(&buf) catch return 0; 425 var it = std.mem.splitScalar(u8, buf[0..n], ' '); 426 _ = it.next(); // size 427 const rss_pages = std.fmt.parseInt(u64, it.next() orelse return 0, 10) catch return 0; 428 return rss_pages * 4; 429}