GET /xrpc/app.bsky.actor.searchActorsTypeahead typeahead.waow.tech
16
fork

Configure Feed

Select the types of activity you want to include in your feed.

at 5080912ca9c3d7d4301e4dce83a5b804de56d00a 155 lines 4.9 kB view raw
1const std = @import("std"); 2const mem = std.mem; 3const net = std.net; 4const posix = std.posix; 5const Allocator = mem.Allocator; 6const Thread = std.Thread; 7const zat = @import("zat"); 8const HttpTransport = zat.HttpTransport; 9const LocalDb = @import("db/LocalDb.zig"); 10const sync = @import("db/sync.zig"); 11const server = @import("server.zig"); 12const ingest = @import("ingest.zig"); 13 14const log = std.log.scoped(.ingester); 15 16const MAX_HTTP_WORKERS = 8; 17const SOCKET_TIMEOUT_SECS = 5; 18 19const JetstreamArgs = struct { 20 allocator: Allocator, 21 config: ingest.Config, 22 transport: *HttpTransport, 23 local_db: ?*LocalDb, 24}; 25 26fn jetstreamThread(args: JetstreamArgs) void { 27 var handler = ingest.IngestHandler.init( 28 args.allocator, 29 args.config, 30 args.transport, 31 args.local_db, 32 ) catch |err| { 33 log.err("handler init failed: {}", .{err}); 34 return; 35 }; 36 defer handler.deinit(); 37 38 var client = zat.JetstreamClient.init(args.allocator, .{ 39 .wanted_collections = &.{ 40 "app.bsky.actor.profile", 41 "app.bsky.feed.post", 42 "app.bsky.feed.like", 43 "app.bsky.graph.follow", 44 }, 45 .cursor = ingest.fetchCursor(args.transport, args.config), 46 }); 47 defer client.deinit(); 48 49 client.subscribe(&handler); 50} 51 52fn setSocketTimeout(fd: posix.fd_t, secs: u32) !void { 53 const timeout = mem.toBytes(posix.timeval{ 54 .sec = @intCast(secs), 55 .usec = 0, 56 }); 57 try posix.setsockopt(fd, posix.SOL.SOCKET, posix.SO.RCVTIMEO, &timeout); 58 try posix.setsockopt(fd, posix.SOL.SOCKET, posix.SO.SNDTIMEO, &timeout); 59} 60 61pub fn main() !void { 62 var gpa = std.heap.GeneralPurposeAllocator(.{}){}; 63 defer _ = gpa.deinit(); 64 const allocator = gpa.allocator(); 65 66 const config = ingest.getConfig(); 67 log.info("typeahead ingester → {s}", .{config.worker_url}); 68 69 // open local SQLite (optional — graceful degradation if no volume) 70 var local_db = LocalDb.init(allocator); 71 var local_db_ptr: ?*LocalDb = null; 72 local_db.open() catch |err| { 73 log.warn("local db failed to open: {}, search will be unavailable", .{err}); 74 }; 75 if (local_db.conn != null) { 76 local_db_ptr = &local_db; 77 } 78 defer local_db.deinit(); 79 80 // start HTTP server FIRST so Fly proxy doesn't timeout 81 const port: u16 = blk: { 82 const port_str = posix.getenv("PORT") orelse "8080"; 83 break :blk std.fmt.parseInt(u16, port_str, 10) catch 8080; 84 }; 85 86 const address = try net.Address.parseIp("0.0.0.0", port); 87 var listener = try address.listen(.{ .reuse_address = true }); 88 defer listener.deinit(); 89 90 log.info("listening on port {d}", .{port}); 91 92 // init thread pool for HTTP connections 93 var pool: Thread.Pool = undefined; 94 try pool.init(.{ 95 .allocator = allocator, 96 .n_jobs = MAX_HTTP_WORKERS, 97 }); 98 defer pool.deinit(); 99 100 // start sync thread (background — turso → local SQLite) 101 if (local_db_ptr) |db| { 102 const sync_thread = try Thread.spawn(.{}, sync.syncLoop, .{ allocator, db }); 103 sync_thread.detach(); 104 } 105 106 // start jetstream thread (background — ingestion) 107 var transport = HttpTransport.init(allocator); 108 transport.keep_alive = false; 109 defer transport.deinit(); 110 111 const js_thread = try Thread.spawn(.{}, jetstreamThread, .{JetstreamArgs{ 112 .allocator = allocator, 113 .config = config, 114 .transport = &transport, 115 .local_db = local_db_ptr, 116 }}); 117 js_thread.detach(); 118 119 // main thread: HTTP accept loop 120 while (true) { 121 const conn = listener.accept() catch |err| { 122 log.err("accept error: {}", .{err}); 123 continue; 124 }; 125 126 setSocketTimeout(conn.stream.handle, SOCKET_TIMEOUT_SECS) catch |err| { 127 log.warn("failed to set socket timeout: {}", .{err}); 128 }; 129 130 if (local_db_ptr) |db| { 131 pool.spawn(server.handleConnection, .{ conn, db }) catch |err| { 132 log.err("pool spawn error: {}", .{err}); 133 conn.stream.close(); 134 }; 135 } else { 136 // no local DB — just serve health 137 var read_buffer: [1024]u8 = undefined; 138 var write_buffer: [1024]u8 = undefined; 139 var reader = conn.stream.reader(&read_buffer); 140 var writer = conn.stream.writer(&write_buffer); 141 var srv = std.http.Server.init(reader.interface(), &writer.interface); 142 var request = srv.receiveHead() catch { 143 conn.stream.close(); 144 continue; 145 }; 146 request.respond("{\"error\":\"search unavailable\"}", .{ 147 .status = .service_unavailable, 148 .extra_headers = &.{ 149 .{ .name = "content-type", .value = "application/json" }, 150 }, 151 }) catch {}; 152 conn.stream.close(); 153 } 154 } 155}