// Serves: GET /xrpc/app.bsky.actor.searchActorsTypeahead
// Deployment host: typeahead.waow.tech
1//! Jetstream event handler: buffers actor events, flushes batches to worker
2//! Dual-writes profile/identity events to local SQLite for immediate search
3
4const std = @import("std");
5const mem = std.mem;
6const json = std.json;
7const Allocator = mem.Allocator;
8const zat = @import("zat");
9const HttpTransport = zat.HttpTransport;
10const LocalDb = @import("db/LocalDb.zig");
11const BloomFilter = @import("bloom.zig").BloomFilter;
12
13const log = std.log.scoped(.ingester);
14
15pub const MAX_BATCH: usize = 100;
16const BLOOM_BITS: usize = 10_000_000; // ~1.2MB fixed
17const BLOOM_HASHES: usize = 7;
18
/// Worker connection settings, read from the environment by getConfig().
pub const Config = struct {
    /// Base URL of the typeahead worker; admin paths such as
    /// "/admin/ingest" and "/admin/cursor" are appended directly,
    /// so no trailing slash is expected.
    worker_url: []const u8,
    /// Shared secret sent as "Authorization: Bearer <secret>" on
    /// every worker request.
    secret: []const u8,
};
23
/// Load worker settings from the environment. Panics when either
/// variable is missing — the process is useless without them, so
/// failing fast at startup is deliberate.
pub fn getConfig() Config {
    const url = std.posix.getenv("TYPEAHEAD_URL") orelse
        @panic("TYPEAHEAD_URL not set");
    const token = std.posix.getenv("TYPEAHEAD_SECRET") orelse
        @panic("TYPEAHEAD_SECRET not set");
    return Config{ .worker_url = url, .secret = token };
}
32
/// an actor event to POST to the worker
pub const ActorEvent = struct {
    /// Actor DID; always present.
    did: []const u8,
    /// Handle; populated by identity events only.
    handle: ?[]const u8 = null,
    /// Profile displayName; populated by profile commit events only.
    display_name: ?[]const u8 = null,
    /// CID of the avatar blob (record path "avatar.ref.$link").
    avatar_cid: ?[]const u8 = null,
};
40
/// Receives jetstream events, buffers them, and flushes batches of at most
/// MAX_BATCH to the remote worker. Profile and identity events (which carry
/// searchable text) are also dual-written to the local SQLite mirror so they
/// are searchable before the worker round-trip completes.
///
/// Memory model: every string queued in `buffer`/`delete_buffer` is duped
/// into `arena`; the arena is reset only once both buffers have fully
/// drained (or on backlog overflow), so queued slices stay valid.
///
/// NOTE(review): no internal locking except around the SQLite handle in
/// writeToLocal — appears to assume all callbacks arrive on a single
/// jetstream thread; confirm against the zat client's threading model.
pub const IngestHandler = struct {
    allocator: Allocator,
    config: Config,
    transport: *HttpTransport,
    // optional local SQLite mirror; null disables all dual-writes
    local_db: ?*LocalDb,
    // queued upserts awaiting POST to the worker
    buffer: std.ArrayList(ActorEvent),
    // queued DIDs of deactivated accounts awaiting worker-side deletion
    delete_buffer: std.ArrayList([]const u8),
    /// arena owns all string data in buffer/delete_buffer
    arena: std.heap.ArenaAllocator,
    /// bloom filter: fixed-size dedup for non-profile DIDs (~1.2MB)
    bloom: BloomFilter,
    // cursor (event time) of the latest event seen, flushed or not
    pending_cursor: i64 = 0,
    // cursor advanced only when both buffers drain completely; this is
    // what gets reported to the worker (at-least-once replay on restart)
    flushed_cursor: i64 = 0,
    total_ingested: u64 = 0,
    total_deleted: u64 = 0,
    // unix seconds of the last flush attempt (successful or not)
    last_flush: i64 = 0,
    retry_after: i64 = 0, // timestamp before which we skip flush attempts

    /// Create a handler. Fails only if the bloom filter allocation fails.
    /// Caller owns the result and must call deinit().
    pub fn init(allocator: Allocator, config: Config, transport: *HttpTransport, local_db: ?*LocalDb) !IngestHandler {
        return .{
            .allocator = allocator,
            .config = config,
            .transport = transport,
            .local_db = local_db,
            .buffer = .{},
            .delete_buffer = .{},
            .arena = std.heap.ArenaAllocator.init(allocator),
            .bloom = try BloomFilter.init(allocator, BLOOM_BITS, BLOOM_HASHES),
            .last_flush = std.time.timestamp(),
        };
    }

    /// Release all owned memory. Buffered events are discarded, not flushed.
    pub fn deinit(self: *IngestHandler) void {
        self.bloom.deinit(self.allocator);
        // arena frees the string payloads; the lists only hold slices into it
        self.arena.deinit();
        self.buffer.deinit(self.allocator);
        self.delete_buffer.deinit(self.allocator);
    }

    /// Copy `s` into the arena. Returns null on OOM (callers treat that as
    /// "drop this event" rather than propagating the error).
    fn dupe(self: *IngestHandler, s: []const u8) ?[]const u8 {
        return self.arena.allocator().dupe(u8, s) catch null;
    }

    /// Clear the bloom filter; non-profile DIDs become discoverable again.
    fn resetBloom(self: *IngestHandler) void {
        log.info("resetting bloom filter ({d} insertions)", .{self.bloom.count});
        self.bloom.reset();
    }

    /// Jetstream callback: dispatch by event type, record the cursor, then
    /// decide whether to drop backlog, honor backoff, or flush.
    pub fn onEvent(self: *IngestHandler, event: zat.JetstreamEvent) void {
        switch (event) {
            .commit => |c| self.handleCommit(c),
            .identity => |id| self.handleIdentity(id),
            .account => |acct| self.handleAccount(acct),
        }

        self.pending_cursor = event.timeUs();

        const now = std.time.timestamp();
        // drop incoming events if buffers are backed up (persistent failure)
        const MAX_BACKLOG = MAX_BATCH * 10;
        if (self.buffer.items.len >= MAX_BACKLOG or self.delete_buffer.items.len >= MAX_BACKLOG) {
            log.err("backlog overflow ({d} ingest, {d} delete), dropping oldest events", .{
                self.buffer.items.len, self.delete_buffer.items.len,
            });
            // discard everything: lists, their arena-backed strings, and the
            // bloom state (so dropped DIDs can be rediscovered later)
            self.buffer.clearRetainingCapacity();
            self.delete_buffer.clearRetainingCapacity();
            _ = self.arena.reset(.retain_capacity);
            self.resetBloom();
            // skip past the dropped range so we don't replay it on restart
            self.flushed_cursor = self.pending_cursor;
            self.retry_after = 0;
        }

        // respect backoff cooldown after failures
        if (now < self.retry_after) return;

        // flush when a batch is full, or every 5s if anything is pending
        const should_flush = self.buffer.items.len >= MAX_BATCH or
            self.delete_buffer.items.len >= MAX_BATCH or
            (now - self.last_flush >= 5 and (self.buffer.items.len > 0 or self.delete_buffer.items.len > 0));

        if (should_flush) {
            self.flush();
        }
    }

    /// Jetstream error callback: log only; the client handles reconnects.
    pub fn onError(_: *IngestHandler, err: anyerror) void {
        log.err("jetstream: {s}", .{@errorName(err)});
    }

    /// Jetstream connect callback: log the host we attached to.
    pub fn onConnect(_: *IngestHandler, host: []const u8) void {
        log.info("connected to {s}", .{host});
    }

    /// Commit events: profile records get full field extraction + local
    /// dual-write; any other collection just discovers the DID (bloom-deduped).
    fn handleCommit(self: *IngestHandler, c: zat.jetstream.CommitEvent) void {
        if (mem.eql(u8, c.collection, "app.bsky.actor.profile")) {
            // rich extraction from profile records
            if (c.operation != .create and c.operation != .update) return;

            const record = c.record orelse return;

            const did = self.dupe(c.did) orelse return;
            var event = ActorEvent{ .did = did };

            if (zat.json.getString(record, "displayName")) |name| {
                event.display_name = self.dupe(name);
            }

            if (zat.json.getPath(record, "avatar")) |avatar| {
                if (zat.json.getString(avatar, "ref.$link")) |cid| {
                    event.avatar_cid = self.dupe(cid);
                }
            }

            // dual-write to local SQLite (profile events have searchable data)
            self.writeToLocal(event);

            // ingester only sees self-labels from profile records — it can never
            // correctly determine hidden. let refreshModeration cron handle it.

            self.buffer.append(self.allocator, event) catch return;
        } else {
            // non-profile collections: just discover the DID
            if (c.operation == .delete) return;

            // dedup: skip if bloom filter says we've seen this DID recently
            if (self.bloom.contains(c.did)) return;
            self.bloom.insert(c.did);

            const did = self.dupe(c.did) orelse return;
            self.buffer.append(self.allocator, .{ .did = did }) catch return;
        }
    }

    /// Identity events carry a handle: dual-write locally and queue for the
    /// worker. Events without a handle are ignored.
    fn handleIdentity(self: *IngestHandler, id: zat.jetstream.IdentityEvent) void {
        const handle = id.handle orelse return;
        const event = ActorEvent{
            .did = self.dupe(id.did) orelse return,
            .handle = self.dupe(handle),
        };

        // dual-write: identity events have handles (searchable)
        self.writeToLocal(event);

        self.buffer.append(self.allocator, event) catch return;
    }

    /// Account events: a deactivated account is deleted locally right away
    /// and queued for worker-side deletion. Reactivations are ignored here
    /// (the actor is rediscovered via later commits).
    fn handleAccount(self: *IngestHandler, acct: zat.jetstream.AccountEvent) void {
        if (!acct.active) {
            const did = self.dupe(acct.did) orelse return;

            // dual-write: delete from local
            if (self.local_db) |db| {
                db.exec("DELETE FROM actors WHERE did = ?", .{did}) catch {};
                db.exec("DELETE FROM actors_fts WHERE did = ?", .{did}) catch {};
            }

            self.delete_buffer.append(self.allocator, did) catch return;
        }
    }

    /// Dual-write to local SQLite for immediate search availability
    /// Best-effort: any failure is silently dropped — the worker path is the
    /// source of truth; this mirror only narrows the visibility gap.
    fn writeToLocal(self: *IngestHandler, event: ActorEvent) void {
        const db = self.local_db orelse return;
        if (!db.isReady()) return;

        const conn = db.getConn() orelse return;

        db.lock();
        defer db.unlock();

        // only write if we have searchable data (handle or display_name)
        const has_handle = event.handle != null and event.handle.?.len > 0;
        const has_name = event.display_name != null and event.display_name.?.len > 0;
        if (!has_handle and !has_name) return;

        // upsert actor: empty strings never overwrite existing values, so a
        // handle-only identity event won't clobber a stored display name
        conn.exec(
            \\INSERT INTO actors (did, handle, display_name, avatar_url)
            \\VALUES (?, ?, ?, ?)
            \\ON CONFLICT(did) DO UPDATE SET
            \\  handle = COALESCE(NULLIF(excluded.handle, ''), actors.handle),
            \\  display_name = COALESCE(NULLIF(excluded.display_name, ''), actors.display_name),
            \\  avatar_url = COALESCE(NULLIF(excluded.avatar_url, ''), actors.avatar_url)
        , .{
            event.did,
            event.handle orelse "",
            event.display_name orelse "",
            event.avatar_cid orelse "",
        }) catch return;

        // update FTS
        conn.exec("DELETE FROM actors_fts WHERE did = ?", .{event.did}) catch {};

        // read back current handle + display_name for FTS, since the upsert
        // may have merged this event with previously stored fields
        const row = conn.row(
            "SELECT handle, display_name FROM actors WHERE did = ?",
            .{event.did},
        ) catch return;
        if (row) |r| {
            defer r.deinit();
            const h = r.text(0);
            const dn = r.text(1);
            if (h.len > 0 or dn.len > 0) {
                conn.exec(
                    "INSERT INTO actors_fts (did, handle, display_name) VALUES (?, ?, ?)",
                    .{ event.did, h, dn },
                ) catch {};
            }
        }
    }

    /// Send one batch (up to MAX_BATCH) from each buffer to the worker.
    /// On success the sent prefix is compacted out of the list; on failure
    /// events stay queued and a 5s backoff is armed. The cursor and arena
    /// are only advanced/reset once BOTH buffers are completely empty.
    fn flush(self: *IngestHandler) void {
        self.last_flush = std.time.timestamp();
        var any_failure = false;

        if (self.buffer.items.len > 0) {
            const batch_end = @min(self.buffer.items.len, MAX_BATCH);
            const batch = self.buffer.items[0..batch_end];
            // NOTE(review): the cursor POSTed here is the pre-batch
            // flushed_cursor, so a restart replays this batch — confirm the
            // worker ingest is idempotent (upsert semantics suggest it is).
            const ok = postBatch(
                self.transport,
                self.config,
                batch,
                self.flushed_cursor,
            );
            if (ok) {
                self.total_ingested += batch_end;
                const rss = getRssKB();
                log.info("+{d} actors (total: {d}, pending: {d}) cursor={d} rss={d}KB", .{
                    batch_end, self.total_ingested,
                    self.buffer.items.len - batch_end, self.pending_cursor, rss,
                });
                // shift the unsent tail to the front (overlapping copy)
                if (batch_end < self.buffer.items.len) {
                    std.mem.copyForwards(
                        ActorEvent,
                        self.buffer.items[0 .. self.buffer.items.len - batch_end],
                        self.buffer.items[batch_end..],
                    );
                }
                self.buffer.shrinkRetainingCapacity(self.buffer.items.len - batch_end);
            } else {
                log.err("ingest batch failed ({d} events, {d} pending), will retry in 5s", .{
                    batch_end, self.buffer.items.len,
                });
                any_failure = true;
            }
        }

        if (self.delete_buffer.items.len > 0) {
            const batch_end = @min(self.delete_buffer.items.len, MAX_BATCH);
            const batch = self.delete_buffer.items[0..batch_end];
            const ok = deleteActors(
                self.transport,
                self.config,
                batch,
            );
            if (ok) {
                self.total_deleted += batch_end;
                log.info("-{d} moderated (total: {d})", .{
                    batch_end, self.total_deleted,
                });
                // shift the unsent tail to the front
                if (batch_end < self.delete_buffer.items.len) {
                    const remaining = self.delete_buffer.items.len - batch_end;
                    var i: usize = 0;
                    while (i < remaining) : (i += 1) {
                        self.delete_buffer.items[i] = self.delete_buffer.items[batch_end + i];
                    }
                    self.delete_buffer.shrinkRetainingCapacity(remaining);
                } else {
                    self.delete_buffer.clearRetainingCapacity();
                }
            } else {
                log.err("delete batch failed ({d} dids), will retry in 5s", .{batch_end});
                any_failure = true;
            }
        }

        if (any_failure) {
            self.retry_after = std.time.timestamp() + 5;
        } else if (self.buffer.items.len == 0 and self.delete_buffer.items.len == 0) {
            // fully drained: safe to advance the cursor and drop all strings
            self.flushed_cursor = self.pending_cursor;
            _ = self.arena.reset(.retain_capacity);
        }
    }
};
324
325// -- HTTP client functions (worker API) --
326
/// POST a batch of actor events (plus the current cursor) to the worker's
/// /admin/ingest endpoint. Returns true only on HTTP 200; every failure
/// mode (serialization, formatting, transport, non-200) yields false.
fn postBatch(transport: *HttpTransport, config: Config, events: []const ActorEvent, cursor: i64) bool {
    var payload: std.Io.Writer.Allocating = .init(transport.allocator);
    defer payload.deinit();

    // null optionals are omitted so the worker sees only populated fields
    var stringify: json.Stringify = .{
        .writer = &payload.writer,
        .options = .{ .emit_null_optional_fields = false },
    };
    stringify.write(.{ .events = events, .cursor = cursor }) catch return false;

    var url_storage: [512]u8 = undefined;
    var auth_storage: [256]u8 = undefined;
    const endpoint = std.fmt.bufPrint(&url_storage, "{s}/admin/ingest", .{config.worker_url}) catch return false;
    const bearer = std.fmt.bufPrint(&auth_storage, "Bearer {s}", .{config.secret}) catch return false;

    const response = transport.fetch(.{
        .url = endpoint,
        .method = .POST,
        .payload = payload.written(),
        .authorization = bearer,
    }) catch return false;
    defer transport.allocator.free(response.body);

    const succeeded = response.status == .ok;
    if (!succeeded) {
        log.err("ingest HTTP {d}: {s}", .{ @intFromEnum(response.status), response.body });
    }
    return succeeded;
}
357
/// POST a list of DIDs to the worker's /admin/delete endpoint for removal.
/// Returns true only on HTTP 200; all failures collapse to false.
fn deleteActors(transport: *HttpTransport, config: Config, dids: []const []const u8) bool {
    var payload: std.Io.Writer.Allocating = .init(transport.allocator);
    defer payload.deinit();

    var stringify: json.Stringify = .{ .writer = &payload.writer };
    stringify.write(.{ .dids = dids }) catch return false;

    var url_storage: [512]u8 = undefined;
    var auth_storage: [256]u8 = undefined;
    const endpoint = std.fmt.bufPrint(&url_storage, "{s}/admin/delete", .{config.worker_url}) catch return false;
    const bearer = std.fmt.bufPrint(&auth_storage, "Bearer {s}", .{config.secret}) catch return false;

    const response = transport.fetch(.{
        .url = endpoint,
        .method = .POST,
        .payload = payload.written(),
        .authorization = bearer,
    }) catch return false;
    defer transport.allocator.free(response.body);

    return response.status == .ok;
}
381
/// Single attempt to read the worker's persisted jetstream cursor from
/// GET /admin/cursor. Expects a JSON object with a numeric "cursor" field.
/// Returns null on any formatting, transport, HTTP, or parse failure.
fn fetchCursorOnce(transport: *HttpTransport, config: Config) ?i64 {
    var url_buf: [512]u8 = undefined;
    const url = std.fmt.bufPrint(&url_buf, "{s}/admin/cursor", .{config.worker_url}) catch return null;

    var auth_buf: [256]u8 = undefined;
    const auth = std.fmt.bufPrint(&auth_buf, "Bearer {s}", .{config.secret}) catch return null;

    const result = transport.fetch(.{
        .url = url,
        .authorization = auth,
    }) catch return null;
    defer transport.allocator.free(result.body);

    if (result.status != .ok) return null;

    const parsed = json.parseFromSlice(json.Value, transport.allocator, result.body, .{}) catch return null;
    defer parsed.deinit();

    // guard: a malformed response (non-object top-level JSON) would otherwise
    // trip the tagged-union safety check on `.object` below
    if (parsed.value != .object) return null;

    const cursor_val = parsed.value.object.get("cursor") orelse return null;
    return switch (cursor_val) {
        .integer => |n| n,
        // @intFromFloat is checked UB on out-of-range input, so reject
        // non-finite or absurd values instead of crashing on bad data
        .float => |f| blk: {
            if (!std.math.isFinite(f) or f < -9.2e18 or f > 9.2e18) break :blk null;
            const as_int: i64 = @intFromFloat(f);
            break :blk as_int;
        },
        else => null,
    };
}
407
/// Fetch the worker's cursor with up to three attempts, sleeping 1s, 3s,
/// then 10s between failures. Returns null once all attempts fail.
/// Blocks the calling thread while sleeping.
pub fn fetchCursor(transport: *HttpTransport, config: Config) ?i64 {
    const delays = [_]u64{ 1, 3, 10 };
    var attempt: usize = 0;
    while (attempt < delays.len) : (attempt += 1) {
        if (fetchCursorOnce(transport, config)) |cursor| return cursor;
        const delay = delays[attempt];
        log.warn("cursor fetch failed (attempt {d}/3), retrying in {d}s", .{ attempt + 1, delay });
        std.Thread.sleep(delay * std.time.ns_per_s);
    }
    return null;
}
417
418// -- utilities --
419
/// Resident set size of the current process in KiB, read from Linux
/// /proc/self/statm (second field, in pages). Returns 0 on any failure,
/// including non-Linux platforms where the file does not exist.
pub fn getRssKB() u64 {
    const f = std.fs.openFileAbsolute("/proc/self/statm", .{}) catch return 0;
    defer f.close();
    var buf: [128]u8 = undefined;
    const n = f.read(&buf) catch return 0;
    var it = std.mem.splitScalar(u8, buf[0..n], ' ');
    _ = it.next(); // size (total program pages) — not what we want
    const rss_pages = std.fmt.parseInt(u64, it.next() orelse return 0, 10) catch return 0;
    // statm reports pages; use the real page size rather than assuming 4 KiB
    // (aarch64 Linux commonly runs 16K or 64K pages)
    return rss_pages * (std.heap.pageSize() / 1024);
}