search for standard sites pub-search.waow.tech
search zig blog atproto
11
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: integrate logfire-zig for observability

- add logfire-zig dependency from tangled.sh
- configure logfire in main.zig (reads LOGFIRE_WRITE_TOKEN from env)
- add spans to HTTP handlers (search, tags, popular, similar)
- add spans to embedder batch processing and voyage API calls
- add span to TAP record indexing
- replace std.debug.print with logfire.info/warn/err throughout
- add counters for search requests, documents indexed, publications indexed

when LOGFIRE_WRITE_TOKEN is not set, falls back to console output

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

zzstoatzz f6e1ad58 365fff8f

+90 -38
+6
backend/build.zig
··· 25 25 .sqlite3 = &[_][]const u8{ "-std=c99", "-DSQLITE_ENABLE_FTS5" }, 26 26 }); 27 27 28 + const logfire = b.dependency("logfire", .{ 29 + .target = target, 30 + .optimize = optimize, 31 + }); 32 + 28 33 const exe = b.addExecutable(.{ 29 34 .name = "leaflet-search", 30 35 .root_module = b.createModule(.{ ··· 36 41 .{ .name = "zql", .module = zql.module("zql") }, 37 42 .{ .name = "zat", .module = zat.module("zat") }, 38 43 .{ .name = "zqlite", .module = zqlite.module("zqlite") }, 44 + .{ .name = "logfire", .module = logfire.module("logfire") }, 39 45 }, 40 46 }), 41 47 });
+4
backend/build.zig.zon
··· 20 20 .url = "https://github.com/karlseguin/zqlite.zig/archive/refs/heads/master.tar.gz", 21 21 .hash = "zqlite-0.0.0-RWLaY_y_mADh2LdbDrG_2HT2dBAcsAR8Jig_7-dOJd0B", 22 22 }, 23 + .logfire = .{ 24 + .url = "https://tangled.sh/zzstoatzz.io/logfire-zig/archive/main", 25 + .hash = "logfire_zig-0.1.0-x2yDLpwEAQC7sbmc8_4g5nhfzb75kzVZ1r3jTM_AgO5l", 26 + }, 23 27 }, 24 28 .paths = .{ 25 29 "build.zig",
+20 -10
backend/src/embedder.zig
··· 9 9 const mem = std.mem; 10 10 const posix = std.posix; 11 11 const Allocator = mem.Allocator; 12 + const logfire = @import("logfire"); 12 13 const db = @import("db/mod.zig"); 13 14 14 15 // voyage-3-lite limits ··· 21 22 /// Start the embedder background worker 22 23 pub fn start(allocator: Allocator) void { 23 24 const api_key = posix.getenv("VOYAGE_API_KEY") orelse { 24 - std.debug.print("embedder: VOYAGE_API_KEY not set, embeddings disabled\n", .{}); 25 + logfire.info("embedder: VOYAGE_API_KEY not set, embeddings disabled", .{}); 25 26 return; 26 27 }; 27 28 28 29 const thread = std.Thread.spawn(.{}, worker, .{ allocator, api_key }) catch |err| { 29 - std.debug.print("embedder: failed to start thread: {}\n", .{err}); 30 + logfire.err("embedder: failed to start thread: {}", .{err}); 30 31 return; 31 32 }; 32 33 thread.detach(); 33 - std.debug.print("embedder: background worker started\n", .{}); 34 + logfire.info("embedder: background worker started", .{}); 34 35 } 35 36 36 37 fn worker(allocator: Allocator, api_key: []const u8) void { ··· 43 44 const processed = processNextBatch(allocator, api_key) catch |err| { 44 45 consecutive_errors += 1; 45 46 const backoff: u64 = @min(ERROR_BACKOFF_SECS * consecutive_errors, 3600); 46 - std.debug.print("embedder: error {}, backing off {}s\n", .{ err, backoff }); 47 + logfire.warn("embedder: error {}, backing off {d}s", .{ err, backoff }); 47 48 std.Thread.sleep(backoff * std.time.ns_per_s); 48 49 continue; 49 50 }; 50 51 51 52 if (processed > 0) { 52 53 consecutive_errors = 0; 53 - std.debug.print("embedder: processed {} documents\n", .{processed}); 54 + logfire.debug("embedder: processed {d} documents", .{processed}); 55 + logfire.counter("embedder.documents_processed", @intCast(processed)); 54 56 // immediately check for more 55 57 continue; 56 58 } ··· 67 69 }; 68 70 69 71 fn processNextBatch(allocator: Allocator, api_key: []const u8) !usize { 72 + const span = logfire.span("embedder.process_batch", .{}); 73 + defer span.end(); 74 + 70 75 const client = db.getClient() orelse return error.NoClient; 71 76 72 77 // query for documents needing embeddings ··· 107 112 // update Turso with embeddings 108 113 for (docs.items, embeddings) |doc, embedding| { 109 114 updateDocumentEmbedding(client, doc.uri, embedding) catch |err| { 110 - std.debug.print("embedder: failed to update {s}: {}\n", .{ doc.uri, err }); 115 + logfire.err("embedder: failed to update {s}: {}", .{ doc.uri, err }); 111 116 }; 112 117 } 113 118 ··· 127 132 } 128 133 129 134 fn callVoyageApi(allocator: Allocator, api_key: []const u8, docs: []const DocToEmbed) ![][]f32 { 135 + const span = logfire.span("embedder.voyage_api", .{ 136 + .batch_size = @as(i64, @intCast(docs.len)), 137 + }); 138 + defer span.end(); 139 + 130 140 var http_client: http.Client = .{ .allocator = allocator }; 131 141 defer http_client.deinit(); 132 142 ··· 153 163 .payload = body, 154 164 .response_writer = &response_body.writer, 155 165 }) catch |err| { 156 - std.debug.print("embedder: voyage request failed: {}\n", .{err}); 166 + logfire.err("embedder: voyage request failed: {}", .{err}); 157 167 return error.VoyageRequestFailed; 158 168 }; 159 169 160 170 if (res.status != .ok) { 161 171 const resp_text = response_body.toOwnedSlice() catch ""; 162 172 defer if (resp_text.len > 0) allocator.free(resp_text); 163 - std.debug.print("embedder: voyage error {}: {s}\n", .{ res.status, resp_text[0..@min(resp_text.len, 200)] }); 173 + logfire.err("embedder: voyage error {}: {s}", .{ res.status, resp_text[0..@min(resp_text.len, 200)] }); 164 174 return error.VoyageApiError; 165 175 } 166 176 ··· 197 207 198 208 fn parseVoyageResponse(allocator: Allocator, response: []const u8, expected_count: usize) ![][]f32 { 199 209 const parsed = json.parseFromSlice(json.Value, allocator, response, .{}) catch { 200 - std.debug.print("embedder: failed to parse voyage response\n", .{}); 210 + logfire.err("embedder: failed to parse voyage response", .{}); 201 211 return error.ParseError; 202 212 }; 203 213 defer parsed.deinit(); ··· 206 216 if (data != .array) return error.InvalidData; 207 217 208 218 if (data.array.items.len != expected_count) { 209 - std.debug.print("embedder: expected {} embeddings, got {}\n", .{ expected_count, data.array.items.len }); 219 + logfire.err("embedder: expected {d} embeddings, got {d}", .{ expected_count, data.array.items.len }); 210 220 return error.CountMismatch; 211 221 } 212 222
+14 -4
backend/src/main.zig
··· 2 2 const net = std.net; 3 3 const posix = std.posix; 4 4 const Thread = std.Thread; 5 + const logfire = @import("logfire"); 5 6 const db = @import("db/mod.zig"); 6 7 const activity = @import("activity.zig"); 7 8 const server = @import("server.zig"); ··· 16 17 defer _ = gpa.deinit(); 17 18 const allocator = gpa.allocator(); 18 19 20 + // configure logfire (reads LOGFIRE_WRITE_TOKEN from env) 21 + _ = logfire.configure(.{ 22 + .service_name = "leaflet-search", 23 + .service_version = "0.1.0", 24 + .environment = posix.getenv("FLY_APP_NAME") orelse "development", 25 + }) catch |err| { 26 + std.debug.print("logfire init failed: {}, continuing without observability\n", .{err}); 27 + }; 28 + 19 29 // start http server FIRST so Fly proxy doesn't timeout 20 30 const port: u16 = blk: { 21 31 const port_str = posix.getenv("PORT") orelse "3000"; ··· 27 37 defer listener.deinit(); 28 38 29 39 const app_name = posix.getenv("APP_NAME") orelse "leaflet-search"; 30 - std.debug.print("{s} listening on http://0.0.0.0:{d} (max {} workers)\n", .{ app_name, port, MAX_HTTP_WORKERS }); 40 + logfire.info("{s} listening on port {d} (max {d} workers)", .{ app_name, port, MAX_HTTP_WORKERS }); 31 41 32 42 // init turso client synchronously (fast, needed for search fallback) 33 43 try db.initTurso(); ··· 46 56 47 57 while (true) { 48 58 const conn = listener.accept() catch |err| { 49 - std.debug.print("accept error: {}\n", .{err}); 59 + logfire.err("accept error: {}", .{err}); 50 60 continue; 51 61 }; 52 62 53 63 setSocketTimeout(conn.stream.handle, SOCKET_TIMEOUT_SECS) catch |err| { 54 - std.debug.print("failed to set socket timeout: {}\n", .{err}); 64 + logfire.warn("failed to set socket timeout: {}", .{err}); 55 65 }; 56 66 57 67 pool.spawn(server.handleConnection, .{conn}) catch |err| { 58 - std.debug.print("pool spawn error: {}\n", .{err}); 68 + logfire.err("pool spawn error: {}", .{err}); 59 69 conn.stream.close(); 60 70 }; 61 71 }
+17 -2
backend/src/server.zig
··· 3 3 const http = std.http; 4 4 const mem = std.mem; 5 5 const json = std.json; 6 + const logfire = @import("logfire"); 6 7 const activity = @import("activity.zig"); 7 8 const search = @import("search.zig"); 8 9 const stats = @import("stats.zig"); ··· 26 27 while (true) { 27 28 var request = server.receiveHead() catch |err| { 28 29 if (err != error.HttpConnectionClosing and err != error.EndOfStream) { 29 - std.debug.print("http receive error: {}\n", .{err}); 30 + logfire.debug("http receive error: {}", .{err}); 30 31 } 31 32 return; 32 33 }; 33 34 handleRequest(&server, &request) catch |err| { 34 - std.debug.print("request error: {}\n", .{err}); 35 + logfire.err("request error: {}", .{err}); 35 36 return; 36 37 }; 37 38 if (!request.head.keep_alive) return; ··· 76 77 fn handleSearch(request: *http.Server.Request, target: []const u8) !void { 77 78 const start_time = std.time.microTimestamp(); 78 79 defer timing.record(.search, start_time); 80 + 81 + const span = logfire.span("http.search", .{}); 82 + defer span.end(); 79 83 80 84 var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 81 85 defer arena.deinit(); ··· 94 98 95 99 // perform FTS search - arena handles cleanup 96 100 const results = search.search(alloc, query, tag_filter, platform_filter, since_filter) catch |err| { 101 + logfire.err("search failed: {}", .{err}); 97 102 stats.recordError(); 98 103 return err; 99 104 }; 100 105 stats.recordSearch(query); 106 + logfire.counter("search.requests", 1); 101 107 try sendJson(request, results); 102 108 } 103 109 ··· 105 111 const start_time = std.time.microTimestamp(); 106 112 defer timing.record(.tags, start_time); 107 113 114 + const span = logfire.span("http.tags", .{}); 115 + defer span.end(); 116 + 108 117 var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 109 118 defer arena.deinit(); 110 119 const alloc = arena.allocator(); ··· 116 125 fn handlePopular(request: *http.Server.Request) !void { 117 126 const start_time = std.time.microTimestamp(); 118 127 defer timing.record(.popular, start_time); 128 + 129 + const span = logfire.span("http.popular", .{}); 130 + defer span.end(); 119 131 120 132 var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 121 133 defer arena.deinit(); ··· 281 293 fn handleSimilar(request: *http.Server.Request, target: []const u8) !void { 282 294 const start_time = std.time.microTimestamp(); 283 295 defer timing.record(.similar, start_time); 296 + 297 + const span = logfire.span("http.similar", .{}); 298 + defer span.end(); 284 299 285 300 var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 286 301 defer arena.deinit();
+29 -22
backend/src/tap.zig
··· 5 5 const Allocator = mem.Allocator; 6 6 const websocket = @import("websocket"); 7 7 const zat = @import("zat"); 8 + const logfire = @import("logfire"); 8 9 const indexer = @import("indexer.zig"); 9 10 const extractor = @import("extractor.zig"); 10 11 ··· 48 49 if (connected) |_| { 49 50 // connection succeeded then closed - reset backoff 50 51 backoff = 1; 51 - std.debug.print("tap connection closed, reconnecting immediately...\n", .{}); 52 + logfire.info("tap connection closed, reconnecting immediately", .{}); 52 53 } else |err| { 53 54 // connection failed - backoff 54 - std.debug.print("tap error: {}, reconnecting in {}s...\n", .{ err, backoff }); 55 + logfire.warn("tap error: {}, reconnecting in {d}s", .{ err, backoff }); 55 56 posix.nanosleep(backoff, 0); 56 57 backoff = @min(backoff * 2, max_backoff); 57 58 } ··· 67 68 pub fn serverMessage(self: *Handler, data: []const u8) !void { 68 69 self.msg_count += 1; 69 70 if (self.msg_count % 100 == 1) { 70 - std.debug.print("tap: received {} messages\n", .{self.msg_count}); 71 + logfire.debug("tap: received {d} messages", .{self.msg_count}); 71 72 } 72 73 73 74 // extract message ID for ACK ··· 75 76 76 77 // process the message 77 78 processMessage(self.allocator, data) catch |err| { 78 - std.debug.print("message processing error: {}\n", .{err}); 79 + logfire.err("message processing error: {}", .{err}); 79 80 // still ACK even on error to avoid infinite retries 80 81 }; 81 82 ··· 87 88 88 89 fn sendAck(self: *Handler, msg_id: i64) void { 89 90 const ack_json = std.fmt.bufPrint(&self.ack_buf, "{{\"type\":\"ack\",\"id\":{d}}}", .{msg_id}) catch |err| { 90 - std.debug.print("tap: ACK format error: {}\n", .{err}); 91 + logfire.err("tap: ACK format error: {}", .{err}); 91 92 return; 92 93 }; 93 94 self.client.write(@constCast(ack_json)) catch |err| { 94 - std.debug.print("tap: failed to send ACK: {}\n", .{err}); 95 + logfire.err("tap: failed to send ACK: {}", .{err}); 95 96 }; 96 97 } 97 98 98 99 pub fn close(_: *Handler) void { 99 - std.debug.print("tap connection closed\n", .{}); 100 + logfire.debug("tap connection closed", .{}); 100 101 } 101 102 }; 102 103 ··· 112 113 const tls = useTls(); 113 114 const path = "/channel"; 114 115 115 - std.debug.print("connecting to {s}://{s}:{d}{s}\n", .{ if (tls) "wss" else "ws", host, port, path }); 116 + logfire.info("connecting to {s}://{s}:{d}{s}", .{ if (tls) "wss" else "ws", host, port, path }); 116 117 117 118 var client = websocket.Client.init(allocator, .{ 118 119 .host = host, ··· 120 121 .tls = tls, 121 122 .max_size = 1024 * 1024, // 1MB 122 123 }) catch |err| { 123 - std.debug.print("websocket client init failed: {}\n", .{err}); 124 + logfire.err("websocket client init failed: {}", .{err}); 124 125 return err; 125 126 }; 126 127 defer client.deinit(); ··· 129 130 const host_header = std.fmt.bufPrint(&host_header_buf, "Host: {s}\r\n", .{host}) catch host; 130 131 131 132 client.handshake(path, .{ .headers = host_header }) catch |err| { 132 - std.debug.print("websocket handshake failed: {}\n", .{err}); 133 + logfire.err("websocket handshake failed: {}", .{err}); 133 134 return err; 134 135 }; 135 136 136 - std.debug.print("tap connected!\n", .{}); 137 + logfire.info("tap connected", .{}); 137 138 138 139 var handler = Handler{ .allocator = allocator, .client = &client }; 139 140 client.readLoop(&handler) catch |err| { 140 - std.debug.print("websocket read loop error: {}\n", .{err}); 141 + logfire.err("websocket read loop error: {}", .{err}); 141 142 return err; 142 143 }; 143 144 } ··· 169 170 170 171 fn processMessage(allocator: Allocator, payload: []const u8) !void { 171 172 const parsed = json.parseFromSlice(json.Value, allocator, payload, .{}) catch { 172 - std.debug.print("tap: JSON parse failed, first 100 bytes: {s}\n", .{payload[0..@min(payload.len, 100)]}); 173 + logfire.err("tap: JSON parse failed, first 100 bytes: {s}", .{payload[0..@min(payload.len, 100)]}); 173 174 return; 174 175 }; 175 176 defer parsed.deinit(); 176 177 177 178 // check message type 178 179 const msg_type = zat.json.getString(parsed.value, "type") orelse { 179 - std.debug.print("tap: no type field in message\n", .{}); 180 + logfire.warn("tap: no type field in message", .{}); 180 181 return; 181 182 }; 182 183 ··· 184 185 185 186 // extract record envelope (extractAt ignores extra fields like live, rev, cid) 186 187 const rec = zat.json.extractAt(TapRecord, allocator, parsed.value, .{"record"}) catch |err| { 187 - std.debug.print("tap: failed to extract record: {}\n", .{err}); 188 + logfire.warn("tap: failed to extract record: {}", .{err}); 188 189 return; 189 190 }; 190 191 ··· 195 196 var uri_buf: [256]u8 = undefined; 196 197 const uri = zat.AtUri.format(&uri_buf, did.raw, rec.collection, rec.rkey) orelse return; 197 198 199 + // span for the actual indexing work 200 + const span = logfire.span("tap.index_record", .{}); 201 + defer span.end(); 202 + 198 203 if (rec.isCreate() or rec.isUpdate()) { 199 204 const inner_record = zat.json.getObject(parsed.value, "record.record") orelse return; 200 205 201 206 if (isDocumentCollection(rec.collection)) { 202 207 processDocument(allocator, uri, did.raw, rec.rkey, inner_record, rec.collection) catch |err| { 203 - std.debug.print("document processing error: {}\n", .{err}); 208 + logfire.err("document processing error: {}", .{err}); 204 209 }; 205 210 } else if (isPublicationCollection(rec.collection)) { 206 211 processPublication(allocator, uri, did.raw, rec.rkey, inner_record) catch |err| { 207 - std.debug.print("publication processing error: {}\n", .{err}); 212 + logfire.err("publication processing error: {}", .{err}); 208 213 }; 209 214 } 210 215 } else if (rec.isDelete()) { 211 216 if (isDocumentCollection(rec.collection)) { 212 217 indexer.deleteDocument(uri); 213 - std.debug.print("deleted document: {s}\n", .{uri}); 218 + logfire.debug("deleted document: {s}", .{uri}); 214 219 } else if (isPublicationCollection(rec.collection)) { 215 220 indexer.deletePublication(uri); 216 - std.debug.print("deleted publication: {s}\n", .{uri}); 221 + logfire.debug("deleted publication: {s}", .{uri}); 217 222 } 218 223 } 219 224 } ··· 221 226 fn processDocument(allocator: Allocator, uri: []const u8, did: []const u8, rkey: []const u8, record: json.ObjectMap, collection: []const u8) !void { 222 227 var doc = extractor.extractDocument(allocator, record, collection) catch |err| { 223 228 if (err != error.NoContent and err != error.MissingTitle) { 224 - std.debug.print("extraction error for {s}: {}\n", .{ uri, err }); 229 + logfire.warn("extraction error for {s}: {}", .{ uri, err }); 225 230 } 226 231 return; 227 232 }; ··· 240 245 doc.source_collection, 241 246 doc.path, 242 247 ); 243 - std.debug.print("indexed document: {s} [{s}] ({} chars, {} tags)\n", .{ uri, doc.platformName(), doc.content.len, doc.tags.len }); 248 + logfire.debug("indexed document: {s} [{s}] ({d} chars, {d} tags)", .{ uri, doc.platformName(), doc.content.len, doc.tags.len }); 249 + logfire.counter("tap.documents_indexed", 1); 244 250 } 245 251 246 252 fn processPublication(_: Allocator, uri: []const u8, did: []const u8, rkey: []const u8, record: json.ObjectMap) !void { ··· 256 262 stripUrlScheme(zat.json.getString(record_val, "url")); 257 263 258 264 try indexer.insertPublication(uri, did, rkey, name, description, base_path); 259 - std.debug.print("indexed publication: {s} (base_path: {s})\n", .{ uri, base_path orelse "none" }); 265 + logfire.debug("indexed publication: {s} (base_path: {s})", .{ uri, base_path orelse "none" }); 266 + logfire.counter("tap.publications_indexed", 1); 260 267 } 261 268 262 269 fn stripUrlScheme(url: ?[]const u8) ?[]const u8 {