search for standard sites pub-search.waow.tech
search zig blog atproto
11
fork

Configure Feed

Select the types of activity you want to include in your feed.

add turbopuffer vector store client, wire into embedder

tpuf.zig: standalone client for turbopuffer REST API (upsert, query, delete).
stores full document metadata alongside vectors so query results can be
returned directly without a DB roundtrip.

embedder: after writing embeddings to turso, also upserts to turbopuffer
when TURBOPUFFER_API_KEY is set. uses zql.Query + fromRow pattern for
the expanded SELECT (metadata needed for tpuf sync).

to backfill: clear turso embeddings, let the embedder re-process everything.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

zzstoatzz be9734be e57e4805

+441 -13
+69 -13
backend/src/ingest/embedder.zig
··· 10 10 const posix = std.posix; 11 11 const Allocator = mem.Allocator; 12 12 const logfire = @import("logfire"); 13 + const zql = @import("zql"); 13 14 const db = @import("../db/mod.zig"); 15 + const tpuf = @import("../tpuf.zig"); 14 16 15 17 // voyage-3-lite limits 16 18 const MAX_BATCH_SIZE = 20; // conservative batch size for reliability ··· 18 20 const EMBEDDING_DIM = 512; 19 21 const POLL_INTERVAL_SECS: u64 = 60; // check for new docs every minute 20 22 const ERROR_BACKOFF_SECS: u64 = 300; // 5 min backoff on errors 23 + 24 + // columns: uri(0) title(1) content(2) did(3) created_at(4) rkey(5) 25 + // base_path(6) has_publication(7) platform(8) path(9) 26 + const DocsNeedingEmbeddings = zql.Query( 27 + \\SELECT uri, title, content, did, created_at, rkey, 28 + \\ base_path, has_publication, platform, COALESCE(path, '') as path 29 + \\FROM documents WHERE embedding IS NULL LIMIT :limit 30 + ); 21 31 22 32 /// Start the embedder background worker 23 33 pub fn start(allocator: Allocator) void { ··· 64 74 65 75 const DocToEmbed = struct { 66 76 uri: []const u8, 67 - text: []const u8, // title + " " + content (truncated) 77 + text: []const u8, // title + " " + content (truncated, owned by caller) 78 + // metadata for tpuf — valid while DocsNeedingEmbeddings result rows are alive 79 + title: []const u8, 80 + did: []const u8, 81 + created_at: []const u8, 82 + rkey: []const u8, 83 + base_path: []const u8, 84 + platform: []const u8, 85 + path: []const u8, 86 + has_publication: bool, 87 + 88 + /// Map from DocsNeedingEmbeddings row. Caller must separately build `text`. 89 + fn fromRow(row: db.Row) DocToEmbed { 90 + return .{ 91 + .uri = row.text(0), 92 + .text = "", // set by caller after buildEmbeddingText 93 + .title = row.text(1), 94 + .did = row.text(3), 95 + .created_at = row.text(4), 96 + .rkey = row.text(5), 97 + .base_path = row.text(6), 98 + .has_publication = row.int(7) != 0, 99 + .platform = row.text(8), 100 + .path = row.text(9), 101 + }; 102 + } 68 103 }; 69 104 70 105 fn processNextBatch(allocator: Allocator, api_key: []const u8) !usize { ··· 73 108 74 109 const client = db.getClient() orelse return error.NoClient; 75 110 76 - // query for documents needing embeddings 77 111 var result = try client.query( 78 - "SELECT uri, title, content FROM documents WHERE embedding IS NULL LIMIT ?", 112 + DocsNeedingEmbeddings.positional, 79 113 &.{std.fmt.comptimePrint("{}", .{MAX_BATCH_SIZE})}, 80 114 ); 81 115 defer result.deinit(); ··· 83 117 // collect documents 84 118 var docs: std.ArrayList(DocToEmbed) = .empty; 85 119 defer { 86 - for (docs.items) |doc| { 87 - allocator.free(doc.text); 88 - } 120 + for (docs.items) |doc| allocator.free(doc.text); 89 121 docs.deinit(allocator); 90 122 } 91 123 92 124 for (result.rows) |row| { 93 - const uri = row.text(0); 94 - const title = row.text(1); 95 - const content = row.text(2); 96 - 97 - // build text for embedding: title + content, truncated 98 - const text = try buildEmbeddingText(allocator, title, content); 99 - try docs.append(allocator, .{ .uri = uri, .text = text }); 125 + var doc = DocToEmbed.fromRow(row); 126 + doc.text = try buildEmbeddingText(allocator, row.text(1), row.text(2)); 127 + try docs.append(allocator, doc); 100 128 } 101 129 102 130 if (docs.items.len == 0) return 0; ··· 112 140 for (docs.items, embeddings) |doc, embedding| { 113 141 updateDocumentEmbedding(client, doc.uri, embedding) catch |err| { 114 142 logfire.err("embedder: failed to update {s}: {}", .{ doc.uri, err }); 143 + }; 144 + } 145 + 146 + // sync to turbopuffer (fire-and-forget — search works without it) 147 + if (tpuf.isEnabled()) { 148 + var tpuf_docs = allocator.alloc(tpuf.VectorDoc, docs.items.len) catch { 149 + logfire.warn("embedder: failed to alloc tpuf batch", .{}); 150 + return docs.items.len; 151 + }; 152 + defer allocator.free(tpuf_docs); 153 + 154 + for (docs.items, embeddings, 0..) |doc, embedding, i| { 155 + tpuf_docs[i] = .{ 156 + .id = doc.uri, 157 + .vector = embedding, 158 + .title = doc.title, 159 + .did = doc.did, 160 + .created_at = doc.created_at, 161 + .rkey = doc.rkey, 162 + .base_path = doc.base_path, 163 + .platform = doc.platform, 164 + .path = doc.path, 165 + .has_publication = doc.has_publication, 166 + }; 167 + } 168 + 169 + tpuf.upsert(allocator, tpuf_docs) catch |err| { 170 + logfire.warn("embedder: tpuf upsert failed: {}, continuing", .{err}); 115 171 }; 116 172 } 117 173
+4
backend/src/main.zig
··· 4 4 const Thread = std.Thread; 5 5 const logfire = @import("logfire"); 6 6 const db = @import("db/mod.zig"); 7 + const tpuf = @import("tpuf.zig"); 7 8 const metrics = @import("metrics.zig"); 8 9 const server = @import("server.zig"); 9 10 const ingest = @import("ingest.zig"); ··· 81 82 82 83 // start stats buffer (background sync to Turso) 83 84 metrics.buffer.init(); 85 + 86 + // init vector store (reads TURBOPUFFER_API_KEY from env) 87 + tpuf.init(); 84 88 85 89 // start embedder (voyage-3-lite, 512 dims, 1 worker) 86 90 ingest.embedder.start(allocator);
+368
backend/src/tpuf.zig
··· 1 + //! Turbopuffer vector store client. 2 + //! 3 + //! Manages vector upserts and ANN queries against a turbopuffer namespace. 4 + //! Used by: 5 + //! - ingest/embedder.zig: upsert document vectors after embedding 6 + //! - search.zig: ANN query for semantic search + /similar replacement 7 + //! 8 + //! All operations are fire-and-forget safe — callers should handle errors 9 + //! gracefully since the system works without vector search. 10 + 11 + const std = @import("std"); 12 + const json = std.json; 13 + const http = std.http; 14 + const mem = std.mem; 15 + const posix = std.posix; 16 + const Allocator = mem.Allocator; 17 + const logfire = @import("logfire"); 18 + 19 + const API_BASE = "https://api.turbopuffer.com/v2/namespaces/"; 20 + 21 + var api_key: ?[]const u8 = null; 22 + var namespace: []const u8 = "leaflet-search"; 23 + 24 + // pre-formatted URL paths (built at init) 25 + var upsert_url_buf: [256]u8 = undefined; 26 + var upsert_url: []const u8 = ""; 27 + var query_url_buf: [256]u8 = undefined; 28 + var query_url: []const u8 = ""; 29 + 30 + /// Document metadata stored alongside vectors in turbopuffer. 31 + /// Fields mirror the SearchResultJson output so query results 32 + /// can be returned directly without a DB roundtrip. 33 + pub const VectorDoc = struct { 34 + id: []const u8, // AT-URI (used as turbopuffer document ID) 35 + vector: []const f32, // embedding (voyage-3-lite, 512 dims) 36 + title: []const u8, 37 + did: []const u8, 38 + created_at: []const u8, 39 + rkey: []const u8, 40 + base_path: []const u8, 41 + platform: []const u8, 42 + path: []const u8, 43 + has_publication: bool, 44 + }; 45 + 46 + /// Result from an ANN query. 47 + pub const QueryResult = struct { 48 + id: []const u8, 49 + dist: f64, 50 + title: []const u8, 51 + did: []const u8, 52 + created_at: []const u8, 53 + rkey: []const u8, 54 + base_path: []const u8, 55 + platform: []const u8, 56 + path: []const u8, 57 + has_publication: bool, 58 + }; 59 + 60 + /// Read config from environment. Call once at startup. 61 + pub fn init() void { 62 + api_key = posix.getenv("TURBOPUFFER_API_KEY"); 63 + if (posix.getenv("TURBOPUFFER_NAMESPACE")) |ns| { 64 + namespace = ns; 65 + } 66 + 67 + if (api_key != null) { 68 + // pre-format URL paths 69 + upsert_url = std.fmt.bufPrint(&upsert_url_buf, "{s}{s}", .{ API_BASE, namespace }) catch { 70 + logfire.err("tpuf: namespace too long", .{}); 71 + api_key = null; 72 + return; 73 + }; 74 + query_url = std.fmt.bufPrint(&query_url_buf, "{s}{s}/query", .{ API_BASE, namespace }) catch { 75 + logfire.err("tpuf: namespace too long", .{}); 76 + api_key = null; 77 + return; 78 + }; 79 + logfire.info("tpuf: initialized (namespace={s})", .{namespace}); 80 + } else { 81 + logfire.info("tpuf: TURBOPUFFER_API_KEY not set, vector store disabled", .{}); 82 + } 83 + } 84 + 85 + pub fn isEnabled() bool { 86 + return api_key != null; 87 + } 88 + 89 + /// Upsert document vectors with metadata. Creates the namespace on first write. 90 + /// Errors are logged but should not be fatal — the system works without vector search. 91 + pub fn upsert(allocator: Allocator, docs: []const VectorDoc) !void { 92 + const key = api_key orelse return error.NotConfigured; 93 + if (docs.len == 0) return; 94 + 95 + const span = logfire.span("tpuf.upsert", .{ 96 + .count = @as(i64, @intCast(docs.len)), 97 + }); 98 + defer span.end(); 99 + 100 + const body = try buildUpsertBody(allocator, docs); 101 + defer allocator.free(body); 102 + 103 + const response = try doRequest(allocator, key, upsert_url, body); 104 + defer allocator.free(response); 105 + } 106 + 107 + /// ANN query: find the top_k most similar vectors to the given query vector. 108 + /// Returns results with full document metadata for direct use in search responses. 109 + pub fn query(allocator: Allocator, vector: []const f32, top_k: usize) ![]QueryResult { 110 + const key = api_key orelse return error.NotConfigured; 111 + 112 + const span = logfire.span("tpuf.query", .{ 113 + .top_k = @as(i64, @intCast(top_k)), 114 + }); 115 + defer span.end(); 116 + 117 + const body = try buildQueryBody(allocator, vector, top_k); 118 + defer allocator.free(body); 119 + 120 + const response = try doRequest(allocator, key, query_url, body); 121 + defer allocator.free(response); 122 + 123 + return parseQueryResponse(allocator, response); 124 + } 125 + 126 + /// Delete vectors by ID. Can be batched into a single write call. 127 + pub fn delete(allocator: Allocator, ids: []const []const u8) !void { 128 + const key = api_key orelse return error.NotConfigured; 129 + if (ids.len == 0) return; 130 + 131 + const span = logfire.span("tpuf.delete", .{ 132 + .count = @as(i64, @intCast(ids.len)), 133 + }); 134 + defer span.end(); 135 + 136 + const body = try buildDeleteBody(allocator, ids); 137 + defer allocator.free(body); 138 + 139 + const response = try doRequest(allocator, key, upsert_url, body); 140 + defer allocator.free(response); 141 + } 142 + 143 + // --- request builders --- 144 + 145 + fn buildUpsertBody(allocator: Allocator, docs: []const VectorDoc) ![]const u8 { 146 + var output: std.Io.Writer.Allocating = .init(allocator); 147 + errdefer output.deinit(); 148 + 149 + var jw: json.Stringify = .{ .writer = &output.writer }; 150 + try jw.beginObject(); 151 + 152 + try jw.objectField("distance_metric"); 153 + try jw.write("cosine_distance"); 154 + 155 + try jw.objectField("upsert_rows"); 156 + try jw.beginArray(); 157 + 158 + for (docs) |doc| { 159 + try jw.beginObject(); 160 + 161 + try jw.objectField("id"); 162 + try jw.write(doc.id); 163 + 164 + try jw.objectField("vector"); 165 + try jw.beginArray(); 166 + for (doc.vector) |v| try jw.write(v); 167 + try jw.endArray(); 168 + 169 + try jw.objectField("title"); 170 + try jw.write(doc.title); 171 + try jw.objectField("did"); 172 + try jw.write(doc.did); 173 + try jw.objectField("created_at"); 174 + try jw.write(doc.created_at); 175 + try jw.objectField("rkey"); 176 + try jw.write(doc.rkey); 177 + try jw.objectField("base_path"); 178 + try jw.write(doc.base_path); 179 + try jw.objectField("platform"); 180 + try jw.write(doc.platform); 181 + try jw.objectField("path"); 182 + try jw.write(doc.path); 183 + try jw.objectField("has_publication"); 184 + try jw.write(@as(u64, if (doc.has_publication) 1 else 0)); 185 + 186 + try jw.endObject(); 187 + } 188 + 189 + try jw.endArray(); 190 + try jw.endObject(); 191 + 192 + return try output.toOwnedSlice(); 193 + } 194 + 195 + fn buildQueryBody(allocator: Allocator, vector: []const f32, top_k: usize) ![]const u8 { 196 + var output: std.Io.Writer.Allocating = .init(allocator); 197 + errdefer output.deinit(); 198 + 199 + var jw: json.Stringify = .{ .writer = &output.writer }; 200 + try jw.beginObject(); 201 + 202 + // rank_by: ["vector", "ANN", [0.1, 0.2, ...]] 203 + try jw.objectField("rank_by"); 204 + try jw.beginArray(); 205 + try jw.write("vector"); 206 + try jw.write("ANN"); 207 + try jw.beginArray(); 208 + for (vector) |v| try jw.write(v); 209 + try jw.endArray(); 210 + try jw.endArray(); 211 + 212 + try jw.objectField("top_k"); 213 + try jw.write(top_k); 214 + 215 + try jw.objectField("include_attributes"); 216 + try jw.beginArray(); 217 + for ([_][]const u8{ 218 + "title", 219 + "did", 220 + "created_at", 221 + "rkey", 222 + "base_path", 223 + "platform", 224 + "path", 225 + "has_publication", 226 + }) |attr| { 227 + try jw.write(attr); 228 + } 229 + try jw.endArray(); 230 + 231 + try jw.endObject(); 232 + 233 + return try output.toOwnedSlice(); 234 + } 235 + 236 + fn buildDeleteBody(allocator: Allocator, ids: []const []const u8) ![]const u8 { 237 + var output: std.Io.Writer.Allocating = .init(allocator); 238 + errdefer output.deinit(); 239 + 240 + var jw: json.Stringify = .{ .writer = &output.writer }; 241 + try jw.beginObject(); 242 + 243 + try jw.objectField("deletes"); 244 + try jw.beginArray(); 245 + for (ids) |id| try jw.write(id); 246 + try jw.endArray(); 247 + 248 + try jw.endObject(); 249 + 250 + return try output.toOwnedSlice(); 251 + } 252 + 253 + // --- response parsing --- 254 + 255 + fn parseQueryResponse(allocator: Allocator, response: []const u8) ![]QueryResult { 256 + const parsed = json.parseFromSlice(json.Value, allocator, response, .{}) catch { 257 + logfire.err("tpuf: failed to parse query response", .{}); 258 + return error.ParseError; 259 + }; 260 + defer parsed.deinit(); 261 + 262 + const rows = switch (parsed.value) { 263 + .object => |obj| obj.get("rows") orelse return &.{}, 264 + else => return &.{}, 265 + }; 266 + 267 + const items = switch (rows) { 268 + .array => |arr| arr.items, 269 + else => return &.{}, 270 + }; 271 + 272 + if (items.len == 0) return &.{}; 273 + 274 + var results = try allocator.alloc(QueryResult, items.len); 275 + var count: usize = 0; 276 + 277 + for (items) |item| { 278 + const obj = switch (item) { 279 + .object => |o| o, 280 + else => continue, 281 + }; 282 + 283 + results[count] = .{ 284 + .id = try allocator.dupe(u8, jsonStr(obj, "id")), 285 + .dist = jsonFloat(obj, "$dist"), 286 + .title = try allocator.dupe(u8, jsonStr(obj, "title")), 287 + .did = try allocator.dupe(u8, jsonStr(obj, "did")), 288 + .created_at = try allocator.dupe(u8, jsonStr(obj, "created_at")), 289 + .rkey = try allocator.dupe(u8, jsonStr(obj, "rkey")), 290 + .base_path = try allocator.dupe(u8, jsonStr(obj, "base_path")), 291 + .platform = try allocator.dupe(u8, jsonStr(obj, "platform")), 292 + .path = try allocator.dupe(u8, jsonStr(obj, "path")), 293 + .has_publication = jsonUint(obj, "has_publication") != 0, 294 + }; 295 + count += 1; 296 + } 297 + 298 + // shrink to actual count if any rows were skipped 299 + if (count < items.len) { 300 + return allocator.realloc(results, count) catch results[0..count]; 301 + } 302 + return results; 303 + } 304 + 305 + // --- JSON helpers --- 306 + 307 + fn jsonStr(obj: json.ObjectMap, key: []const u8) []const u8 { 308 + const val = obj.get(key) orelse return ""; 309 + return switch (val) { 310 + .string => |s| s, 311 + else => "", 312 + }; 313 + } 314 + 315 + fn jsonFloat(obj: json.ObjectMap, key: []const u8) f64 { 316 + const val = obj.get(key) orelse return 0; 317 + return switch (val) { 318 + .float => |f| f, 319 + .integer => |i| @floatFromInt(i), 320 + else => 0, 321 + }; 322 + } 323 + 324 + fn jsonUint(obj: json.ObjectMap, key: []const u8) u64 { 325 + const val = obj.get(key) orelse return 0; 326 + return switch (val) { 327 + .integer => |i| @intCast(i), 328 + .float => |f| @intFromFloat(f), 329 + else => 0, 330 + }; 331 + } 332 + 333 + // --- HTTP --- 334 + 335 + fn doRequest(allocator: Allocator, key: []const u8, url: []const u8, body: []const u8) ![]const u8 { 336 + var client: http.Client = .{ .allocator = allocator }; 337 + defer client.deinit(); 338 + 339 + var auth_buf: [256]u8 = undefined; 340 + const auth = std.fmt.bufPrint(&auth_buf, "Bearer {s}", .{key}) catch 341 + return error.AuthTooLong; 342 + 343 + var response_body: std.Io.Writer.Allocating = .init(allocator); 344 + errdefer response_body.deinit(); 345 + 346 + const res = client.fetch(.{ 347 + .location = .{ .url = url }, 348 + .method = .POST, 349 + .headers = .{ 350 + .content_type = .{ .override = "application/json" }, 351 + .authorization = .{ .override = auth }, 352 + }, 353 + .payload = body, 354 + .response_writer = &response_body.writer, 355 + }) catch |err| { 356 + logfire.err("tpuf: request failed: {}", .{err}); 357 + return error.RequestFailed; 358 + }; 359 + 360 + if (res.status != .ok) { 361 + const resp_text = response_body.toOwnedSlice() catch ""; 362 + defer if (resp_text.len > 0) allocator.free(resp_text); 363 + logfire.err("tpuf: API error {}: {s}", .{ res.status, resp_text[0..@min(resp_text.len, 200)] }); 364 + return error.ApiError; 365 + } 366 + 367 + return try response_body.toOwnedSlice(); 368 + }