GET /xrpc/app.bsky.actor.searchActorsTypeahead typeahead.waow.tech
16
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix cursor resume and remove slingshot from ingest hot path

the ingester now fetches the last cursor from the worker on startup
and passes it to jetstream, so restarts resume where they left off
instead of starting from live.

removed synchronous slingshot call from profile commit handling —
identity events already carry handles, and the backfill/request-indexing
paths cover any gaps. this eliminates an external dependency from the
hot path and unblocks processing during slingshot downtime.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

+47 -35
+47 -35
ingester/src/main.zig
··· 8 8 const log = std.log.scoped(.ingester); 9 9 10 10 const MAX_BATCH: usize = 500; 11 - const SLINGSHOT_URL = "https://slingshot.microcosm.blue/xrpc/blue.microcosm.identity.resolveMiniDoc?identifier="; 12 11 13 12 const Config = struct { 14 13 worker_url: []const u8, ··· 97 96 if (c.operation != .create and c.operation != .update) return; 98 97 99 98 const record = c.record orelse return; 100 - const aa = self.arena.allocator(); 101 99 102 100 const did = self.dupe(c.did) orelse return; 103 101 var event = ActorEvent{ .did = did }; ··· 112 110 } 113 111 } 114 112 115 - // resolve handle via slingshot 116 - event.handle = resolveHandle(aa, did); 117 - 118 - if (event.handle != null or event.display_name != null or event.avatar_cid != null) { 113 + if (event.display_name != null or event.avatar_cid != null) { 119 114 self.buffer.append(self.allocator, event) catch return; 120 115 } 121 116 } ··· 179 174 } 180 175 }; 181 176 182 - /// resolve DID → handle via slingshot (microcosm community service) 183 - fn resolveHandle(allocator: Allocator, did: []const u8) ?[]const u8 { 184 - var client = http.Client{ .allocator = allocator }; 185 - defer client.deinit(); 186 - 187 - var url_buf: [256]u8 = undefined; 188 - const url = std.fmt.bufPrint(&url_buf, SLINGSHOT_URL ++ "{s}", .{did}) catch return null; 189 - 190 - var aw: std.Io.Writer.Allocating = .init(allocator); 191 - defer aw.deinit(); 192 - 193 - const result = client.fetch(.{ 194 - .location = .{ .url = url }, 195 - .method = .GET, 196 - .response_writer = &aw.writer, 197 - }) catch return null; 198 - 199 - if (result.status != .ok) return null; 200 - 201 - var resp = aw.toArrayList(); 202 - defer resp.deinit(allocator); 203 - 204 - const parsed = json.parseFromSlice(json.Value, allocator, resp.items, .{}) catch return null; 205 - defer parsed.deinit(); 206 - 207 - const handle = zat.json.getString(parsed.value, "handle") orelse return null; 208 - return allocator.dupe(u8, handle) catch null; 209 - } 210 - 211 177 fn writeJsonEscaped(w: anytype, s: []const u8) !void { 212 178 for (s) |c| { 213 179 switch (c) { ··· 326 292 return result.status == .ok; 327 293 } 328 294 295 + fn fetchCursor(allocator: Allocator, config: Config) ?i64 { 296 + var client = http.Client{ .allocator = allocator }; 297 + defer client.deinit(); 298 + 299 + var url_buf: [512]u8 = undefined; 300 + const url = std.fmt.bufPrint(&url_buf, "{s}/admin/cursor", .{config.worker_url}) catch return null; 301 + 302 + var auth_buf: [256]u8 = undefined; 303 + const auth = std.fmt.bufPrint(&auth_buf, "Bearer {s}", .{config.secret}) catch return null; 304 + 305 + var aw: std.Io.Writer.Allocating = .init(allocator); 306 + defer aw.deinit(); 307 + 308 + const result = client.fetch(.{ 309 + .location = .{ .url = url }, 310 + .method = .GET, 311 + .headers = .{ 312 + .authorization = .{ .override = auth }, 313 + }, 314 + .response_writer = &aw.writer, 315 + }) catch return null; 316 + 317 + if (result.status != .ok) return null; 318 + 319 + var resp = aw.toArrayList(); 320 + defer resp.deinit(allocator); 321 + 322 + const parsed = json.parseFromSlice(json.Value, allocator, resp.items, .{}) catch return null; 323 + defer parsed.deinit(); 324 + 325 + const cursor_val = parsed.value.object.get("cursor") orelse return null; 326 + return switch (cursor_val) { 327 + .integer => |n| n, 328 + .float => |f| @intFromFloat(f), 329 + else => null, 330 + }; 331 + } 332 + 329 333 pub fn main() !void { 330 334 var gpa = std.heap.GeneralPurposeAllocator(.{}){}; 331 335 defer _ = gpa.deinit(); ··· 334 338 const config = getConfig(); 335 339 log.info("typeahead ingester → {s}", .{config.worker_url}); 336 340 341 + const cursor = fetchCursor(allocator, config); 342 + if (cursor) |c| { 343 + log.info("resuming from cursor {d}", .{c}); 344 + } else { 345 + log.info("no cursor found, starting from live", .{}); 346 + } 347 + 337 348 var handler = IngestHandler.init(allocator, config); 338 349 defer handler.deinit(); 339 350 340 351 var client = zat.JetstreamClient.init(allocator, .{ 341 352 .wanted_collections = &.{"app.bsky.actor.profile"}, 353 + .cursor = cursor, 342 354 }); 343 355 defer client.deinit(); 344 356