//! Serves: GET /xrpc/app.bsky.actor.searchActorsTypeahead
//! Host: typeahead.waow.tech
1//! Jetstream event handler: buffers actor events, flushes batches to worker
2//! Dual-writes profile/identity events to local SQLite for immediate search
3
4const std = @import("std");
5const io = std.Options.debug_io;
6const mem = std.mem;
7const json = std.json;
8const Allocator = mem.Allocator;
9const zat = @import("zat");
10const HttpTransport = zat.HttpTransport;
11const LocalDb = @import("db/LocalDb.zig");
12const BloomFilter = @import("bloom.zig").BloomFilter;
13
14const log = std.log.scoped(.ingester);
15
/// Look up an environment variable via libc.
/// Returns null when unset; otherwise a slice into the process
/// environment (not owned by the caller — do not free or mutate).
fn cGetenv(name: [*:0]const u8) ?[]const u8 {
    const value = std.c.getenv(name) orelse return null;
    return mem.span(value);
}
20
/// Current wall-clock time in whole unix seconds (floor of the
/// real-time clock's nanosecond reading).
fn timestamp() i64 {
    const ns = std.Io.Timestamp.now(io, .real).nanoseconds;
    const secs = @divFloor(ns, std.time.ns_per_s);
    return @intCast(secs);
}
24
/// Maximum events per HTTP batch sent to the worker.
pub const MAX_BATCH: usize = 100;
/// Bloom filter sizing: bit count and hash count are fixed at startup.
const BLOOM_BITS: usize = 10_000_000; // ~1.2MB fixed
const BLOOM_HASHES: usize = 7;

/// Worker endpoint configuration, read from the environment by getConfig().
/// Both slices point into the process environment and are never freed.
pub const Config = struct {
    /// base URL of the typeahead worker (no trailing slash expected;
    /// paths like "/admin/ingest" are appended directly)
    worker_url: []const u8,
    /// bearer token sent as "Bearer <secret>" on every admin request
    secret: []const u8,
};
33
/// Build a Config from the TYPEAHEAD_URL and TYPEAHEAD_SECRET
/// environment variables. Panics if either is missing — this runs
/// once at startup where failing fast is the right behavior.
pub fn getConfig() Config {
    const worker_url = cGetenv("TYPEAHEAD_URL") orelse
        @panic("TYPEAHEAD_URL not set");
    const secret = cGetenv("TYPEAHEAD_SECRET") orelse
        @panic("TYPEAHEAD_SECRET not set");
    return .{ .worker_url = worker_url, .secret = secret };
}
42
/// an actor event to POST to the worker
/// Null fields are omitted from the JSON payload
/// (see postBatch's emit_null_optional_fields = false).
/// All slices are owned by IngestHandler's arena.
pub const ActorEvent = struct {
    did: []const u8,
    /// set by identity events only
    handle: ?[]const u8 = null,
    /// set by bsky profile records ("displayName")
    display_name: ?[]const u8 = null,
    /// CID extracted from the profile record's avatar blob ref
    avatar_cid: ?[]const u8 = null,
};
50
/// Receives Jetstream events, buffers actor upserts and deletes, and
/// flushes them in batches to the remote worker over HTTP. Profile and
/// identity events are additionally dual-written to local SQLite (when
/// available) so they become searchable before the next full sync.
/// Designed for a single-threaded event loop; only the SQLite writes
/// coordinate (via tryLock) with the sync thread.
pub const IngestHandler = struct {
    allocator: Allocator,
    config: Config,
    transport: *HttpTransport,
    /// optional local SQLite mirror; null disables dual-writes
    local_db: ?*LocalDb,
    /// pending upsert events awaiting flush to the worker
    buffer: std.ArrayList(ActorEvent),
    /// pending DIDs to delete on the worker (deactivated accounts)
    delete_buffer: std.ArrayList([]const u8),
    /// arena owns all string data in buffer/delete_buffer
    arena: std.heap.ArenaAllocator,
    /// bloom filter: fixed-size dedup for non-profile DIDs (~1.2MB)
    bloom: BloomFilter,
    /// time_us of the most recently received Jetstream event
    pending_cursor: i64 = 0,
    /// cursor last reported to the worker; only advanced to
    /// pending_cursor once both buffers fully drain (see flush)
    flushed_cursor: i64 = 0,
    total_ingested: u64 = 0,
    total_deleted: u64 = 0,
    /// unix seconds of the last flush attempt (successful or not)
    last_flush: i64 = 0,
    retry_after: i64 = 0, // timestamp before which we skip flush attempts

    /// Allocates the bloom filter eagerly; buffers and arena start empty.
    /// Errors only if the bloom filter allocation fails.
    pub fn init(allocator: Allocator, config: Config, transport: *HttpTransport, local_db: ?*LocalDb) !IngestHandler {
        return .{
            .allocator = allocator,
            .config = config,
            .transport = transport,
            .local_db = local_db,
            .buffer = .empty,
            .delete_buffer = .empty,
            .arena = std.heap.ArenaAllocator.init(allocator),
            .bloom = try BloomFilter.init(allocator, BLOOM_BITS, BLOOM_HASHES),
            .last_flush = timestamp(),
        };
    }

    /// Releases all owned memory. The arena frees every string the
    /// buffers reference, so the buffers themselves free only their
    /// backing arrays.
    pub fn deinit(self: *IngestHandler) void {
        self.bloom.deinit(self.allocator);
        self.arena.deinit();
        self.buffer.deinit(self.allocator);
        self.delete_buffer.deinit(self.allocator);
    }

    /// Copy `s` into the arena. Returns null on OOM — callers treat
    /// that as "drop this event" rather than propagating an error.
    fn dupe(self: *IngestHandler, s: []const u8) ?[]const u8 {
        return self.arena.allocator().dupe(u8, s) catch null;
    }

    /// Clear the bloom filter (called on backlog overflow). Clearing
    /// re-allows previously-seen DIDs, trading duplicate sends for
    /// bounded memory.
    fn resetBloom(self: *IngestHandler) void {
        log.info("resetting bloom filter ({d} insertions)", .{self.bloom.count});
        self.bloom.reset();
    }

    /// Main Jetstream callback: dispatch by event kind, then decide
    /// whether to flush. Flush triggers: either buffer at MAX_BATCH,
    /// or >=5s since last flush with anything pending. A persistent
    /// failure backlog (10x MAX_BATCH) drops ALL buffered events and
    /// skips the cursor forward past them.
    pub fn onEvent(self: *IngestHandler, event: zat.JetstreamEvent) void {
        switch (event) {
            .commit => |c| self.handleCommit(c),
            .identity => |id| self.handleIdentity(id),
            .account => |acct| self.handleAccount(acct),
        }

        self.pending_cursor = event.timeUs();

        const now = timestamp();
        // drop incoming events if buffers are backed up (persistent failure)
        const MAX_BACKLOG = MAX_BATCH * 10;
        if (self.buffer.items.len >= MAX_BACKLOG or self.delete_buffer.items.len >= MAX_BACKLOG) {
            log.err("backlog overflow ({d} ingest, {d} delete), dropping oldest events", .{
                self.buffer.items.len, self.delete_buffer.items.len,
            });
            self.buffer.clearRetainingCapacity();
            self.delete_buffer.clearRetainingCapacity();
            // safe to reset the arena: the buffers that referenced its
            // strings were just cleared
            _ = self.arena.reset(.retain_capacity);
            self.resetBloom();
            // skip cursor past the dropped events so we don't re-fetch them
            self.flushed_cursor = self.pending_cursor;
            self.retry_after = 0;
        }

        // respect backoff cooldown after failures
        if (now < self.retry_after) return;

        const should_flush = self.buffer.items.len >= MAX_BATCH or
            self.delete_buffer.items.len >= MAX_BATCH or
            (now - self.last_flush >= 5 and (self.buffer.items.len > 0 or self.delete_buffer.items.len > 0));

        if (should_flush) {
            self.flush();
        }
    }

    /// Jetstream error callback: log and keep going.
    pub fn onError(_: *IngestHandler, err: anyerror) void {
        log.err("jetstream: {s}", .{@errorName(err)});
    }

    /// Jetstream connect callback: log the host we connected to.
    pub fn onConnect(_: *IngestHandler, host: []const u8) void {
        log.info("connected to {s}", .{host});
    }

    /// Commit (record) events. Profile records (bsky/tangled) get rich
    /// extraction and a local dual-write; any other collection only
    /// "discovers" the DID, deduped through the bloom filter. OOM on any
    /// string copy silently drops the event (dupe returns null).
    fn handleCommit(self: *IngestHandler, c: zat.jetstream.CommitEvent) void {
        if (mem.eql(u8, c.collection, "app.bsky.actor.profile") or
            mem.eql(u8, c.collection, "sh.tangled.actor.profile"))
        {
            // rich extraction from profile records (bsky and tangled)
            if (c.operation != .create and c.operation != .update) return;

            const record = c.record orelse return;

            const did = self.dupe(c.did) orelse return;
            var event = ActorEvent{ .did = did };

            // bsky profiles have displayName; tangled profiles don't
            if (zat.json.getString(record, "displayName")) |name| {
                event.display_name = self.dupe(name);
            }

            // avatar CID lives at record.avatar.ref.$link (blob ref)
            if (zat.json.getPath(record, "avatar")) |avatar| {
                if (zat.json.getString(avatar, "ref.$link")) |cid| {
                    event.avatar_cid = self.dupe(cid);
                }
            }

            // dual-write to local SQLite (profile events have searchable data)
            self.writeToLocal(event);

            // ingester only sees self-labels from profile records — it can never
            // correctly determine hidden. let refreshModeration cron handle it.

            self.buffer.append(self.allocator, event) catch return;
        } else {
            // non-profile collections: just discover the DID
            if (c.operation == .delete) return;

            // dedup: skip if bloom filter says we've seen this DID recently
            if (self.bloom.contains(c.did)) return;
            self.bloom.insert(c.did);

            const did = self.dupe(c.did) orelse return;
            self.buffer.append(self.allocator, .{ .did = did }) catch return;
        }
    }

    /// Identity events carry handle changes. Events without a handle
    /// are ignored.
    fn handleIdentity(self: *IngestHandler, id: zat.jetstream.IdentityEvent) void {
        const handle = id.handle orelse return;
        const event = ActorEvent{
            .did = self.dupe(id.did) orelse return,
            .handle = self.dupe(handle),
        };

        // dual-write: identity events have handles (searchable)
        self.writeToLocal(event);

        self.buffer.append(self.allocator, event) catch return;
    }

    /// Account events: a deactivated account is queued for deletion on
    /// the worker and removed from local SQLite immediately (best-effort).
    fn handleAccount(self: *IngestHandler, acct: zat.jetstream.AccountEvent) void {
        if (!acct.active) {
            const did = self.dupe(acct.did) orelse return;

            // dual-write: delete from local (skip if sync holds the lock)
            if (self.local_db) |db| {
                if (db.mutex.tryLock()) {
                    defer db.mutex.unlock(io);
                    if (db.getConn()) |c| {
                        c.exec("DELETE FROM actors WHERE did = ?", .{did}) catch {};
                    }
                }
            }

            self.delete_buffer.append(self.allocator, did) catch return;
        }
    }

    /// Dual-write to local SQLite for immediate search availability.
    /// Uses tryLock to avoid blocking ingester when sync holds the write lock.
    /// FTS is rebuilt by sync at cycle boundaries — no per-row FTS maintenance here.
    fn writeToLocal(self: *IngestHandler, event: ActorEvent) void {
        const db = self.local_db orelse return;
        if (!db.isReady()) return;

        // NOTE(review): getConn() is read before acquiring the mutex here,
        // but inside the lock in handleAccount — confirm which is intended.
        const conn = db.getConn() orelse return;

        if (!db.mutex.tryLock()) return; // sync thread has the lock, skip
        defer db.mutex.unlock(io);

        // only write if we have searchable data (handle or display_name)
        const has_handle = event.handle != null and event.handle.?.len > 0;
        const has_name = event.display_name != null and event.display_name.?.len > 0;
        if (!has_handle and !has_name) return;

        // upsert actor (FTS will be rebuilt by next sync cycle)
        // NULLIF(x, '') in the UPDATE clause means an empty incoming field
        // never clobbers existing data
        conn.exec(
            \\INSERT INTO actors (did, handle, display_name, avatar_url)
            \\VALUES (?, ?, ?, ?)
            \\ON CONFLICT(did) DO UPDATE SET
            \\  handle = COALESCE(NULLIF(excluded.handle, ''), actors.handle),
            \\  display_name = COALESCE(NULLIF(excluded.display_name, ''), actors.display_name),
            \\  avatar_url = COALESCE(NULLIF(excluded.avatar_url, ''), actors.avatar_url)
        , .{
            event.did,
            event.handle orelse "",
            event.display_name orelse "",
            event.avatar_cid orelse "",
        }) catch return;
    }

    /// Send up to MAX_BATCH events from each buffer to the worker. On
    /// success the sent prefix is compacted out of the buffer; on any
    /// failure a 5s cooldown is set (retry_after) and the events stay
    /// queued. flushed_cursor advances — and the arena is reset — only
    /// when BOTH buffers are fully drained, since arena strings are
    /// still referenced by any remaining buffered events.
    fn flush(self: *IngestHandler) void {
        self.last_flush = timestamp();
        var any_failure = false;

        if (self.buffer.items.len > 0) {
            const batch_end = @min(self.buffer.items.len, MAX_BATCH);
            const batch = self.buffer.items[0..batch_end];
            // reports flushed_cursor (not pending_cursor): presumably so the
            // worker never persists a cursor ahead of undelivered events
            const ok = postBatch(
                self.transport,
                self.config,
                batch,
                self.flushed_cursor,
            );
            if (ok) {
                self.total_ingested += batch_end;
                const rss = getRssKB();
                log.info("+{d} actors (total: {d}, pending: {d}) cursor={d} rss={d}KB", .{
                    batch_end, self.total_ingested,
                    self.buffer.items.len - batch_end, self.pending_cursor, rss,
                });
                // compact: shift the unsent tail to the front
                if (batch_end < self.buffer.items.len) {
                    std.mem.copyForwards(
                        ActorEvent,
                        self.buffer.items[0 .. self.buffer.items.len - batch_end],
                        self.buffer.items[batch_end..],
                    );
                }
                self.buffer.shrinkRetainingCapacity(self.buffer.items.len - batch_end);
            } else {
                log.err("ingest batch failed ({d} events, {d} pending), will retry in 5s", .{
                    batch_end, self.buffer.items.len,
                });
                any_failure = true;
            }
        }

        if (self.delete_buffer.items.len > 0) {
            const batch_end = @min(self.delete_buffer.items.len, MAX_BATCH);
            const batch = self.delete_buffer.items[0..batch_end];
            const ok = deleteActors(
                self.transport,
                self.config,
                batch,
            );
            if (ok) {
                self.total_deleted += batch_end;
                log.info("-{d} moderated (total: {d})", .{
                    batch_end, self.total_deleted,
                });
                // compact: shift the unsent tail to the front
                if (batch_end < self.delete_buffer.items.len) {
                    const remaining = self.delete_buffer.items.len - batch_end;
                    var i: usize = 0;
                    while (i < remaining) : (i += 1) {
                        self.delete_buffer.items[i] = self.delete_buffer.items[batch_end + i];
                    }
                    self.delete_buffer.shrinkRetainingCapacity(remaining);
                } else {
                    self.delete_buffer.clearRetainingCapacity();
                }
            } else {
                log.err("delete batch failed ({d} dids), will retry in 5s", .{batch_end});
                any_failure = true;
            }
        }

        if (any_failure) {
            self.retry_after = timestamp() + 5;
        } else if (self.buffer.items.len == 0 and self.delete_buffer.items.len == 0) {
            self.flushed_cursor = self.pending_cursor;
            _ = self.arena.reset(.retain_capacity);
        }
    }
};
323
324// -- HTTP client functions (worker API) --
325
/// POST a batch of actor events (plus the current cursor) to the
/// worker's /admin/ingest endpoint. Returns true on HTTP 200; any
/// build, network, or non-200 failure returns false (non-200 bodies
/// are logged).
fn postBatch(transport: *HttpTransport, config: Config, events: []const ActorEvent, cursor: i64) bool {
    var url_buf: [512]u8 = undefined;
    const endpoint = std.fmt.bufPrint(&url_buf, "{s}/admin/ingest", .{config.worker_url}) catch return false;

    var auth_buf: [256]u8 = undefined;
    const bearer = std.fmt.bufPrint(&auth_buf, "Bearer {s}", .{config.secret}) catch return false;

    // serialize {"events": [...], "cursor": N}; null optional fields
    // are omitted entirely
    var payload: std.Io.Writer.Allocating = .init(transport.allocator);
    defer payload.deinit();
    var stringify: json.Stringify = .{
        .writer = &payload.writer,
        .options = .{ .emit_null_optional_fields = false },
    };
    stringify.write(.{ .events = events, .cursor = cursor }) catch return false;

    const response = transport.fetch(.{
        .url = endpoint,
        .method = .POST,
        .payload = payload.written(),
        .authorization = bearer,
    }) catch return false;
    defer transport.allocator.free(response.body);

    const succeeded = response.status == .ok;
    if (!succeeded) {
        log.err("ingest HTTP {d}: {s}", .{ @intFromEnum(response.status), response.body });
    }
    return succeeded;
}
356
/// POST a batch of DIDs to the worker's /admin/delete endpoint.
/// Returns true on HTTP 200. Non-200 responses are logged with status
/// and body (matching postBatch) so delete failures are diagnosable.
fn deleteActors(transport: *HttpTransport, config: Config, dids: []const []const u8) bool {
    var output: std.Io.Writer.Allocating = .init(transport.allocator);
    defer output.deinit();

    var jw: json.Stringify = .{ .writer = &output.writer };
    jw.write(.{ .dids = dids }) catch return false;

    var url_buf: [512]u8 = undefined;
    const url = std.fmt.bufPrint(&url_buf, "{s}/admin/delete", .{config.worker_url}) catch return false;

    var auth_buf: [256]u8 = undefined;
    const auth = std.fmt.bufPrint(&auth_buf, "Bearer {s}", .{config.secret}) catch return false;

    const result = transport.fetch(.{
        .url = url,
        .method = .POST,
        .payload = output.written(),
        .authorization = auth,
    }) catch return false;
    defer transport.allocator.free(result.body);

    // log failures like postBatch does, so delete errors aren't silent
    if (result.status != .ok) {
        log.err("delete HTTP {d}: {s}", .{ @intFromEnum(result.status), result.body });
    }

    return result.status == .ok;
}
380
/// Single attempt to GET the persisted cursor from the worker's
/// /admin/cursor endpoint. Returns null on any failure: request build,
/// network error, non-200 status, unparseable JSON, a non-object
/// response body, or a missing/non-numeric "cursor" field.
fn fetchCursorOnce(transport: *HttpTransport, config: Config) ?i64 {
    var url_buf: [512]u8 = undefined;
    const url = std.fmt.bufPrint(&url_buf, "{s}/admin/cursor", .{config.worker_url}) catch return null;

    var auth_buf: [256]u8 = undefined;
    const auth = std.fmt.bufPrint(&auth_buf, "Bearer {s}", .{config.secret}) catch return null;

    const result = transport.fetch(.{
        .url = url,
        .authorization = auth,
    }) catch return null;
    defer transport.allocator.free(result.body);

    if (result.status != .ok) return null;

    const parsed = json.parseFromSlice(json.Value, transport.allocator, result.body, .{}) catch return null;
    defer parsed.deinit();

    // guard: accessing .object on a non-object Value is checked illegal
    // behavior — a malformed response (e.g. a bare string) must not crash
    if (parsed.value != .object) return null;

    const cursor_val = parsed.value.object.get("cursor") orelse return null;
    return switch (cursor_val) {
        .integer => |n| n,
        // lossyCast saturates and handles NaN, unlike @intFromFloat which
        // is illegal behavior for out-of-range or NaN inputs
        .float => |f| std.math.lossyCast(i64, f),
        else => null,
    };
}
406
/// Fetch the worker's persisted cursor, retrying up to three times with
/// increasing delays (1s, 3s, 10s). Returns null if every attempt fails.
pub fn fetchCursor(transport: *HttpTransport, config: Config) ?i64 {
    const delays_s = [_]u64{ 1, 3, 10 };
    var attempt: usize = 0;
    while (attempt < delays_s.len) : (attempt += 1) {
        if (fetchCursorOnce(transport, config)) |cursor| return cursor;
        const delay = delays_s[attempt];
        log.warn("cursor fetch failed (attempt {d}/3), retrying in {d}s", .{ attempt + 1, delay });
        io.sleep(.{ .nanoseconds = delay * std.time.ns_per_s }, .real) catch {};
    }
    return null;
}
416
417// -- utilities --
418
/// Resident set size of this process in KB, read from
/// /proc/self/statm (second field, in pages). Returns 0 on any
/// failure, so it is safe to call from logging paths. Linux-only.
pub fn getRssKB() u64 {
    var buf: [128]u8 = undefined;
    const file = std.Io.Dir.openFileAbsolute(io, "/proc/self/statm", .{}) catch return 0;
    defer file.close(io);
    const len = file.readStreaming(io, &.{&buf}) catch return 0;

    var fields = std.mem.splitScalar(u8, buf[0..len], ' ');
    _ = fields.next(); // first field: total program size — not wanted
    const rss_field = fields.next() orelse return 0;
    const rss_pages = std.fmt.parseInt(u64, rss_field, 10) catch return 0;
    // assumes 4KB pages — TODO confirm on non-4K-page hosts
    return rss_pages * 4;
}