GET /xrpc/app.bsky.actor.searchActorsTypeahead typeahead.waow.tech
16
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 428 lines 16 kB view raw
//! Jetstream event handler: buffers actor events, flushes batches to worker
//! Dual-writes profile/identity events to local SQLite for immediate search

const std = @import("std");
const io = std.Options.debug_io;
const mem = std.mem;
const json = std.json;
const Allocator = mem.Allocator;
const zat = @import("zat");
const HttpTransport = zat.HttpTransport;
const LocalDb = @import("db/LocalDb.zig");
const BloomFilter = @import("bloom.zig").BloomFilter;

const log = std.log.scoped(.ingester);

/// Read an environment variable via libc. Returns null when unset.
fn cGetenv(name: [*:0]const u8) ?[]const u8 {
    if (std.c.getenv(name)) |p| return mem.span(p);
    return null;
}

/// Current wall-clock time in whole seconds.
fn timestamp() i64 {
    return @intCast(@divFloor(std.Io.Timestamp.now(io, .real).nanoseconds, std.time.ns_per_s));
}

pub const MAX_BATCH: usize = 100;
const BLOOM_BITS: usize = 10_000_000; // ~1.2MB fixed
const BLOOM_HASHES: usize = 7;

pub const Config = struct {
    worker_url: []const u8,
    secret: []const u8,
};

/// Load worker URL and shared secret from the environment.
/// Panics when either variable is missing — the process cannot run without them.
pub fn getConfig() Config {
    return .{
        .worker_url = cGetenv("TYPEAHEAD_URL") orelse
            @panic("TYPEAHEAD_URL not set"),
        .secret = cGetenv("TYPEAHEAD_SECRET") orelse
            @panic("TYPEAHEAD_SECRET not set"),
    };
}

/// an actor event to POST to the worker
pub const ActorEvent = struct {
    did: []const u8,
    handle: ?[]const u8 = null,
    display_name: ?[]const u8 = null,
    avatar_cid: ?[]const u8 = null,
};

pub const IngestHandler = struct {
    allocator: Allocator,
    config: Config,
    transport: *HttpTransport,
    local_db: ?*LocalDb,
    buffer: std.ArrayList(ActorEvent),
    delete_buffer: std.ArrayList([]const u8),
    /// arena owns all string data in buffer/delete_buffer
    arena: std.heap.ArenaAllocator,
    /// bloom filter: fixed-size dedup for non-profile DIDs (~1.2MB)
    bloom: BloomFilter,
    pending_cursor: i64 = 0,
    flushed_cursor: i64 = 0,
    total_ingested: u64 = 0,
    total_deleted: u64 = 0,
    last_flush: i64 = 0,
    retry_after: i64 = 0, // timestamp before which we skip flush attempts

    /// Allocates the bloom filter eagerly; all other buffers start empty.
    /// `local_db` may be null when dual-write is disabled.
    pub fn init(allocator: Allocator, config: Config, transport: *HttpTransport, local_db: ?*LocalDb) !IngestHandler {
        return .{
            .allocator = allocator,
            .config = config,
            .transport = transport,
            .local_db = local_db,
            .buffer = .empty,
            .delete_buffer = .empty,
            .arena = std.heap.ArenaAllocator.init(allocator),
            .bloom = try BloomFilter.init(allocator, BLOOM_BITS, BLOOM_HASHES),
            .last_flush = timestamp(),
        };
    }

    pub fn deinit(self: *IngestHandler) void {
        self.bloom.deinit(self.allocator);
        self.arena.deinit();
        self.buffer.deinit(self.allocator);
        self.delete_buffer.deinit(self.allocator);
    }

    /// Copy `s` into the arena. Returns null on OOM — callers treat that as
    /// "drop this event" rather than propagating the error.
    fn dupe(self: *IngestHandler, s: []const u8) ?[]const u8 {
        return self.arena.allocator().dupe(u8, s) catch null;
    }

    fn resetBloom(self: *IngestHandler) void {
        log.info("resetting bloom filter ({d} insertions)", .{self.bloom.count});
        self.bloom.reset();
    }

    /// Jetstream callback: dispatch the event, then flush when a batch is full
    /// or a 5s interval elapsed with anything pending. On persistent flush
    /// failure (backlog >= 10 batches) ALL buffered events are dropped so
    /// memory stays bounded.
    pub fn onEvent(self: *IngestHandler, event: zat.JetstreamEvent) void {
        switch (event) {
            .commit => |c| self.handleCommit(c),
            .identity => |id| self.handleIdentity(id),
            .account => |acct| self.handleAccount(acct),
        }

        self.pending_cursor = event.timeUs();

        const now = timestamp();
        // drop incoming events if buffers are backed up (persistent failure)
        const MAX_BACKLOG = MAX_BATCH * 10;
        if (self.buffer.items.len >= MAX_BACKLOG or self.delete_buffer.items.len >= MAX_BACKLOG) {
            log.err("backlog overflow ({d} ingest, {d} delete), dropping all buffered events", .{
                self.buffer.items.len, self.delete_buffer.items.len,
            });
            self.buffer.clearRetainingCapacity();
            self.delete_buffer.clearRetainingCapacity();
            // buffers no longer reference arena strings, so the arena can be reset too
            _ = self.arena.reset(.retain_capacity);
            self.resetBloom();
            self.flushed_cursor = self.pending_cursor;
            self.retry_after = 0;
        }

        // respect backoff cooldown after failures
        if (now < self.retry_after) return;

        const should_flush = self.buffer.items.len >= MAX_BATCH or
            self.delete_buffer.items.len >= MAX_BATCH or
            (now - self.last_flush >= 5 and (self.buffer.items.len > 0 or self.delete_buffer.items.len > 0));

        if (should_flush) {
            self.flush();
        }
    }

    pub fn onError(_: *IngestHandler, err: anyerror) void {
        log.err("jetstream: {s}", .{@errorName(err)});
    }

    pub fn onConnect(_: *IngestHandler, host: []const u8) void {
        log.info("connected to {s}", .{host});
    }

    /// Commit events: profile records get rich extraction (display name,
    /// avatar) and a local dual-write; any other collection just discovers
    /// the DID, deduplicated through the bloom filter.
    fn handleCommit(self: *IngestHandler, c: zat.jetstream.CommitEvent) void {
        if (mem.eql(u8, c.collection, "app.bsky.actor.profile") or
            mem.eql(u8, c.collection, "sh.tangled.actor.profile"))
        {
            // rich extraction from profile records (bsky and tangled)
            if (c.operation != .create and c.operation != .update) return;

            const record = c.record orelse return;

            const did = self.dupe(c.did) orelse return;
            var event = ActorEvent{ .did = did };

            // bsky profiles have displayName; tangled profiles don't
            if (zat.json.getString(record, "displayName")) |name| {
                event.display_name = self.dupe(name);
            }

            if (zat.json.getPath(record, "avatar")) |avatar| {
                if (zat.json.getString(avatar, "ref.$link")) |cid| {
                    event.avatar_cid = self.dupe(cid);
                }
            }

            // dual-write to local SQLite (profile events have searchable data)
            self.writeToLocal(event);

            // ingester only sees self-labels from profile records — it can never
            // correctly determine hidden. let refreshModeration cron handle it.

            self.buffer.append(self.allocator, event) catch return;
        } else {
            // non-profile collections: just discover the DID
            if (c.operation == .delete) return;

            // dedup: skip if bloom filter says we've seen this DID recently
            if (self.bloom.contains(c.did)) return;
            self.bloom.insert(c.did);

            const did = self.dupe(c.did) orelse return;
            self.buffer.append(self.allocator, .{ .did = did }) catch return;
        }
    }

    /// Identity events carry the handle (searchable), so they are both
    /// dual-written locally and buffered for the worker.
    fn handleIdentity(self: *IngestHandler, id: zat.jetstream.IdentityEvent) void {
        const handle = id.handle orelse return;
        const event = ActorEvent{
            .did = self.dupe(id.did) orelse return,
            .handle = self.dupe(handle),
        };

        // dual-write: identity events have handles (searchable)
        self.writeToLocal(event);

        self.buffer.append(self.allocator, event) catch return;
    }

    /// Deactivated accounts are deleted locally (best-effort) and queued for
    /// deletion at the worker. Activations are ignored.
    fn handleAccount(self: *IngestHandler, acct: zat.jetstream.AccountEvent) void {
        if (!acct.active) {
            const did = self.dupe(acct.did) orelse return;

            // dual-write: delete from local (skip if sync holds the lock)
            if (self.local_db) |db| {
                if (db.mutex.tryLock()) {
                    defer db.mutex.unlock(io);
                    if (db.getConn()) |c| {
                        c.exec("DELETE FROM actors WHERE did = ?", .{did}) catch {};
                    }
                }
            }

            self.delete_buffer.append(self.allocator, did) catch return;
        }
    }

    /// Dual-write to local SQLite for immediate search availability.
    /// Uses tryLock to avoid blocking ingester when sync holds the write lock.
    /// FTS is rebuilt by sync at cycle boundaries — no per-row FTS maintenance here.
    fn writeToLocal(self: *IngestHandler, event: ActorEvent) void {
        // only write if we have searchable data (handle or display_name);
        // checked before touching the db so empty events never contend for the lock
        const has_handle = event.handle != null and event.handle.?.len > 0;
        const has_name = event.display_name != null and event.display_name.?.len > 0;
        if (!has_handle and !has_name) return;

        const db = self.local_db orelse return;
        if (!db.isReady()) return;

        const conn = db.getConn() orelse return;

        if (!db.mutex.tryLock()) return; // sync thread has the lock, skip
        defer db.mutex.unlock(io);

        // upsert actor (FTS will be rebuilt by next sync cycle);
        // empty strings are NULLIFed so they never clobber existing values
        conn.exec(
            \\INSERT INTO actors (did, handle, display_name, avatar_url)
            \\VALUES (?, ?, ?, ?)
            \\ON CONFLICT(did) DO UPDATE SET
            \\  handle = COALESCE(NULLIF(excluded.handle, ''), actors.handle),
            \\  display_name = COALESCE(NULLIF(excluded.display_name, ''), actors.display_name),
            \\  avatar_url = COALESCE(NULLIF(excluded.avatar_url, ''), actors.avatar_url)
        , .{
            event.did,
            event.handle orelse "",
            event.display_name orelse "",
            event.avatar_cid orelse "",
        }) catch return;
    }

    /// Send up to one batch from each buffer to the worker. Successful sends
    /// compact the buffer; failures set a 5s cooldown. The cursor and arena
    /// are only advanced/reset once both buffers drain completely, so retried
    /// events keep their arena-owned strings alive.
    fn flush(self: *IngestHandler) void {
        self.last_flush = timestamp();
        var any_failure = false;

        if (self.buffer.items.len > 0) {
            const batch_end = @min(self.buffer.items.len, MAX_BATCH);
            const batch = self.buffer.items[0..batch_end];
            const ok = postBatch(
                self.transport,
                self.config,
                batch,
                self.flushed_cursor,
            );
            if (ok) {
                self.total_ingested += batch_end;
                const rss = getRssKB();
                log.info("+{d} actors (total: {d}, pending: {d}) cursor={d} rss={d}KB", .{
                    batch_end, self.total_ingested,
                    self.buffer.items.len - batch_end, self.pending_cursor, rss,
                });
                if (batch_end < self.buffer.items.len) {
                    std.mem.copyForwards(
                        ActorEvent,
                        self.buffer.items[0 .. self.buffer.items.len - batch_end],
                        self.buffer.items[batch_end..],
                    );
                }
                self.buffer.shrinkRetainingCapacity(self.buffer.items.len - batch_end);
            } else {
                log.err("ingest batch failed ({d} events, {d} pending), will retry in 5s", .{
                    batch_end, self.buffer.items.len,
                });
                any_failure = true;
            }
        }

        if (self.delete_buffer.items.len > 0) {
            const batch_end = @min(self.delete_buffer.items.len, MAX_BATCH);
            const batch = self.delete_buffer.items[0..batch_end];
            const ok = deleteActors(
                self.transport,
                self.config,
                batch,
            );
            if (ok) {
                self.total_deleted += batch_end;
                log.info("-{d} moderated (total: {d})", .{
                    batch_end, self.total_deleted,
                });
                if (batch_end < self.delete_buffer.items.len) {
                    const remaining = self.delete_buffer.items.len - batch_end;
                    std.mem.copyForwards(
                        []const u8,
                        self.delete_buffer.items[0..remaining],
                        self.delete_buffer.items[batch_end..],
                    );
                    self.delete_buffer.shrinkRetainingCapacity(remaining);
                } else {
                    self.delete_buffer.clearRetainingCapacity();
                }
            } else {
                log.err("delete batch failed ({d} dids), will retry in 5s", .{batch_end});
                any_failure = true;
            }
        }

        if (any_failure) {
            self.retry_after = timestamp() + 5;
        } else if (self.buffer.items.len == 0 and self.delete_buffer.items.len == 0) {
            self.flushed_cursor = self.pending_cursor;
            _ = self.arena.reset(.retain_capacity);
        }
    }
};

// -- HTTP client functions (worker API) --

/// POST a batch of actor events (plus the last fully-flushed cursor) to the
/// worker's /admin/ingest endpoint. Returns true on HTTP 200.
fn postBatch(transport: *HttpTransport, config: Config, events: []const ActorEvent, cursor: i64) bool {
    var output: std.Io.Writer.Allocating = .init(transport.allocator);
    defer output.deinit();

    var jw: json.Stringify = .{
        .writer = &output.writer,
        .options = .{ .emit_null_optional_fields = false },
    };
    jw.write(.{ .events = events, .cursor = cursor }) catch return false;

    var url_buf: [512]u8 = undefined;
    const url = std.fmt.bufPrint(&url_buf, "{s}/admin/ingest", .{config.worker_url}) catch return false;

    var auth_buf: [256]u8 = undefined;
    const auth = std.fmt.bufPrint(&auth_buf, "Bearer {s}", .{config.secret}) catch return false;

    const result = transport.fetch(.{
        .url = url,
        .method = .POST,
        .payload = output.written(),
        .authorization = auth,
    }) catch return false;
    defer transport.allocator.free(result.body);

    if (result.status != .ok) {
        log.err("ingest HTTP {d}: {s}", .{ @intFromEnum(result.status), result.body });
    }

    return result.status == .ok;
}

/// POST a list of DIDs to delete to the worker's /admin/delete endpoint.
/// Returns true on HTTP 200.
fn deleteActors(transport: *HttpTransport, config: Config, dids: []const []const u8) bool {
    var output: std.Io.Writer.Allocating = .init(transport.allocator);
    defer output.deinit();

    var jw: json.Stringify = .{ .writer = &output.writer };
    jw.write(.{ .dids = dids }) catch return false;

    var url_buf: [512]u8 = undefined;
    const url = std.fmt.bufPrint(&url_buf, "{s}/admin/delete", .{config.worker_url}) catch return false;

    var auth_buf: [256]u8 = undefined;
    const auth = std.fmt.bufPrint(&auth_buf, "Bearer {s}", .{config.secret}) catch return false;

    const result = transport.fetch(.{
        .url = url,
        .method = .POST,
        .payload = output.written(),
        .authorization = auth,
    }) catch return false;
    defer transport.allocator.free(result.body);

    return result.status == .ok;
}

/// Single attempt to fetch the worker's persisted jetstream cursor.
/// Accepts integer or float JSON values; null on any failure.
fn fetchCursorOnce(transport: *HttpTransport, config: Config) ?i64 {
    var url_buf: [512]u8 = undefined;
    const url = std.fmt.bufPrint(&url_buf, "{s}/admin/cursor", .{config.worker_url}) catch return null;

    var auth_buf: [256]u8 = undefined;
    const auth = std.fmt.bufPrint(&auth_buf, "Bearer {s}", .{config.secret}) catch return null;

    const result = transport.fetch(.{
        .url = url,
        .authorization = auth,
    }) catch return null;
    defer transport.allocator.free(result.body);

    if (result.status != .ok) return null;

    const parsed = json.parseFromSlice(json.Value, transport.allocator, result.body, .{}) catch return null;
    defer parsed.deinit();

    const cursor_val = parsed.value.object.get("cursor") orelse return null;
    return switch (cursor_val) {
        .integer => |n| n,
        .float => |f| @intFromFloat(f),
        else => null,
    };
}

/// Fetch the worker's cursor with up to three attempts (1s/3s/10s backoff).
/// Returns null when all attempts fail.
pub fn fetchCursor(transport: *HttpTransport, config: Config) ?i64 {
    const backoff = [_]u64{ 1, 3, 10 };
    for (backoff, 0..) |delay, attempt| {
        if (fetchCursorOnce(transport, config)) |cursor| return cursor;
        log.warn("cursor fetch failed (attempt {d}/3), retrying in {d}s", .{ attempt + 1, delay });
        io.sleep(.{ .nanoseconds = delay * std.time.ns_per_s }, .real) catch {};
    }
    return null;
}

// -- utilities --

/// Resident set size in KB from /proc/self/statm (Linux only).
/// Returns 0 on any failure. NOTE(review): assumes 4KB pages — statm reports
/// pages, and the multiplier here is hardcoded rather than queried.
pub fn getRssKB() u64 {
    const f = std.Io.Dir.openFileAbsolute(io, "/proc/self/statm", .{}) catch return 0;
    defer f.close(io);
    var buf: [128]u8 = undefined;
    const n = f.readStreaming(io, &.{&buf}) catch return 0;
    var it = std.mem.splitScalar(u8, buf[0..n], ' ');
    _ = it.next(); // size (total program pages) — skipped; second field is resident
    const rss_pages = std.fmt.parseInt(u64, it.next() orelse return 0, 10) catch return 0;
    return rss_pages * 4;
}