GET /xrpc/app.bsky.actor.searchActorsTypeahead typeahead.waow.tech
16
fork

Configure Feed

Select the types of activity you want to include in your feed.

widen firehose to posts, likes, follows for mass actor discovery

previously only subscribed to app.bsky.actor.profile — now also
ingests app.bsky.feed.post, app.bsky.feed.like, app.bsky.graph.follow.
for non-profile commits, only the DID is extracted (bare upsert), with a
bounded dedup set (500k cap) to avoid redundant writes. also bumps hourly
handle resolution from 1000→5000 DIDs per run, resolved in batches of 10 concurrent calls.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

+85 -39
+65 -23
ingester/src/main.zig
··· 8 8 const log = std.log.scoped(.ingester); 9 9 10 10 const MAX_BATCH: usize = 100; 11 + const SEEN_CAP: usize = 500_000; 11 12 12 13 const Config = struct { 13 14 worker_url: []const u8, ··· 39 40 delete_buffer: std.ArrayList([]const u8), 40 41 /// arena owns all string data in buffer/delete_buffer 41 42 arena: std.heap.ArenaAllocator, 43 + /// dedup set: tracks DIDs we've already enqueued from non-profile commits 44 + seen: std.StringHashMapUnmanaged(void) = .{}, 45 + seen_arena: std.heap.ArenaAllocator, 46 + seen_count: usize = 0, 42 47 pending_cursor: i64 = 0, 43 48 flushed_cursor: i64 = 0, 44 49 total_ingested: u64 = 0, ··· 53 58 .buffer = .{}, 54 59 .delete_buffer = .{}, 55 60 .arena = std.heap.ArenaAllocator.init(allocator), 61 + .seen_arena = std.heap.ArenaAllocator.init(allocator), 56 62 .last_flush = std.time.timestamp(), 57 63 }; 58 64 } 59 65 60 66 fn deinit(self: *IngestHandler) void { 67 + self.seen.deinit(self.allocator); 68 + self.seen_arena.deinit(); 61 69 self.arena.deinit(); 62 70 self.buffer.deinit(self.allocator); 63 71 self.delete_buffer.deinit(self.allocator); ··· 67 75 return self.arena.allocator().dupe(u8, s) catch null; 68 76 } 69 77 78 + fn dupeForSeen(self: *IngestHandler, s: []const u8) ?[]const u8 { 79 + return self.seen_arena.allocator().dupe(u8, s) catch null; 80 + } 81 + 82 + fn pruneSeen(self: *IngestHandler) void { 83 + log.info("pruning seen set ({d} entries)", .{self.seen_count}); 84 + self.seen.clearRetainingCapacity(); 85 + _ = self.seen_arena.reset(.retain_capacity); 86 + self.seen_count = 0; 87 + } 88 + 70 89 pub fn onEvent(self: *IngestHandler, event: zat.JetstreamEvent) void { 71 90 switch (event) { 72 91 .commit => |c| self.handleCommit(c), ··· 86 105 self.buffer.clearRetainingCapacity(); 87 106 self.delete_buffer.clearRetainingCapacity(); 88 107 _ = self.arena.reset(.retain_capacity); 108 + self.pruneSeen(); 89 109 self.flushed_cursor = self.pending_cursor; 90 110 self.retry_after = 0; 91 111 } ··· 111 131 } 112 132 
113 133 fn handleCommit(self: *IngestHandler, c: zat.jetstream.CommitEvent) void { 114 - if (!mem.eql(u8, c.collection, "app.bsky.actor.profile")) return; 115 - if (c.operation != .create and c.operation != .update) return; 134 + if (mem.eql(u8, c.collection, "app.bsky.actor.profile")) { 135 + // rich extraction from profile records 136 + if (c.operation != .create and c.operation != .update) return; 116 137 117 - const record = c.record orelse return; 138 + const record = c.record orelse return; 118 139 119 - const did = self.dupe(c.did) orelse return; 120 - var event = ActorEvent{ .did = did }; 140 + const did = self.dupe(c.did) orelse return; 141 + var event = ActorEvent{ .did = did }; 121 142 122 - if (zat.json.getString(record, "displayName")) |name| { 123 - event.display_name = self.dupe(name); 124 - } 143 + if (zat.json.getString(record, "displayName")) |name| { 144 + event.display_name = self.dupe(name); 145 + } 125 146 126 - if (zat.json.getPath(record, "avatar")) |avatar| { 127 - if (zat.json.getString(avatar, "ref.$link")) |cid| { 128 - event.avatar_cid = self.dupe(cid); 147 + if (zat.json.getPath(record, "avatar")) |avatar| { 148 + if (zat.json.getString(avatar, "ref.$link")) |cid| { 149 + event.avatar_cid = self.dupe(cid); 150 + } 129 151 } 130 - } 131 152 132 - // check self-labels for !no-unauthenticated (user opts out of unauthenticated visibility) 133 - event.hidden = blk: { 134 - const values = zat.json.getArray(record, "labels.values") orelse break :blk false; 135 - for (values) |item| { 136 - const val = zat.json.getString(item, "val") orelse continue; 137 - if (mem.eql(u8, val, "!no-unauthenticated")) break :blk true; 138 - } 139 - break :blk false; 140 - }; 153 + // check self-labels for !no-unauthenticated 154 + event.hidden = blk: { 155 + const values = zat.json.getArray(record, "labels.values") orelse break :blk false; 156 + for (values) |item| { 157 + const val = zat.json.getString(item, "val") orelse continue; 158 + if (mem.eql(u8, val, 
"!no-unauthenticated")) break :blk true; 159 + } 160 + break :blk false; 161 + }; 141 162 142 - self.buffer.append(self.allocator, event) catch return; 163 + self.buffer.append(self.allocator, event) catch return; 164 + } else { 165 + // non-profile collections: just discover the DID 166 + if (c.operation == .delete) return; 167 + 168 + // dedup: skip if we've seen this DID recently 169 + if (self.seen.contains(c.did)) return; 170 + 171 + const seen_key = self.dupeForSeen(c.did) orelse return; 172 + self.seen.put(self.allocator, seen_key, {}) catch return; 173 + self.seen_count += 1; 174 + 175 + const did = self.dupe(c.did) orelse return; 176 + self.buffer.append(self.allocator, .{ .did = did }) catch return; 177 + 178 + if (self.seen_count >= SEEN_CAP) self.pruneSeen(); 179 + } 143 180 } 144 181 145 182 fn handleIdentity(self: *IngestHandler, id: zat.jetstream.IdentityEvent) void { ··· 374 411 defer handler.deinit(); 375 412 376 413 var client = zat.JetstreamClient.init(allocator, .{ 377 - .wanted_collections = &.{"app.bsky.actor.profile"}, 414 + .wanted_collections = &.{ 415 + "app.bsky.actor.profile", 416 + "app.bsky.feed.post", 417 + "app.bsky.feed.like", 418 + "app.bsky.graph.follow", 419 + }, 378 420 .cursor = cursor, 379 421 }); 380 422 defer client.deinit();
+20 -16
src/index.ts
··· 224 224 /** resolve handles for actors missing them via slingshot */ 225 225 async function resolveHandles(db: TursoDB): Promise<void> { 226 226 const { results } = await db.prepare( 227 - "SELECT did FROM actors WHERE handle = '' ORDER BY updated_at DESC LIMIT 1000" 227 + "SELECT did FROM actors WHERE handle = '' ORDER BY updated_at DESC LIMIT 5000" 228 228 ).all<{ did: string }>(); 229 229 if (!results || results.length === 0) return; 230 230 231 231 let resolved = 0; 232 - for (const { did } of results) { 233 - try { 234 - const res = await fetch( 235 - `${SLINGSHOT_URL}?identifier=${encodeURIComponent(did)}` 236 - ); 237 - if (!res.ok) continue; 238 - const identity: SlingshotResponse = await res.json(); 239 - if (identity.handle) { 240 - await db.prepare( 241 - "UPDATE actors SET handle = ?1 WHERE did = ?2 AND handle = ''" 242 - ).bind(identity.handle, did).run(); 243 - resolved++; 232 + const BATCH = 10; 233 + for (let i = 0; i < results.length; i += BATCH) { 234 + const batch = results.slice(i, i + BATCH); 235 + await Promise.all(batch.map(async ({ did }) => { 236 + try { 237 + const res = await fetch( 238 + `${SLINGSHOT_URL}?identifier=${encodeURIComponent(did)}` 239 + ); 240 + if (!res.ok) return; 241 + const identity: SlingshotResponse = await res.json(); 242 + if (identity.handle) { 243 + await db.prepare( 244 + "UPDATE actors SET handle = ?1 WHERE did = ?2 AND handle = ''" 245 + ).bind(identity.handle, did).run(); 246 + resolved++; 247 + } 248 + } catch { 249 + // best-effort — skip failures 244 250 } 245 - } catch { 246 - // best-effort — skip failures 247 - } 251 + })); 248 252 } 249 253 if (resolved > 0) { 250 254 console.log(JSON.stringify({ event: "handle_resolve", resolved, checked: results.length }));