search for standard sites pub-search.waow.tech
search zig blog atproto
11
fork

Configure Feed

Select the types of activity you want to include in your feed.

make turbopuffer sole vector store, remove turso embeddings

- findSimilar now uses tpuf.getVectorById + tpuf.query (ANN search, ~100ms total, vs. 2-3s for the previous brute-force cosine scan in Turso)
- embedder writes only to turbopuffer, marks docs with embedded_at timestamp
- remove similarity_cache table creation (existing tables are not dropped), cosine queries, and all cache machinery
- add embedded_at column migration (supersedes the turso embedding column, which is left in place rather than dropped)
- preserve embedded_at in indexer ON CONFLICT clause

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

zzstoatzz f01cc250 be9734be

+182 -223
+1 -11
backend/src/db/LocalDb.zig
··· 125 125 fn createSchema(self: *LocalDb) !void { 126 126 const c = self.conn orelse return error.NotOpen; 127 127 128 - // documents table (no embedding column - vectors stay on Turso) 128 + // documents table (no embedding/embedded_at — vectors are in turbopuffer) 129 129 c.exec( 130 130 \\CREATE TABLE IF NOT EXISTS documents ( 131 131 \\ uri TEXT PRIMARY KEY, ··· 236 236 \\CREATE TABLE IF NOT EXISTS popular_searches ( 237 237 \\ query TEXT PRIMARY KEY, 238 238 \\ count INTEGER DEFAULT 1 239 - \\) 240 - , .{}) catch {}; 241 - 242 - // similarity cache (local copy for fast lookups) 243 - c.exec( 244 - \\CREATE TABLE IF NOT EXISTS similarity_cache ( 245 - \\ source_uri TEXT PRIMARY KEY, 246 - \\ results TEXT NOT NULL, 247 - \\ doc_count INTEGER NOT NULL, 248 - \\ computed_at INTEGER NOT NULL 249 239 \\) 250 240 , .{}) catch {}; 251 241
+3 -11
backend/src/db/schema.zig
··· 100 100 \\) 101 101 , &.{}); 102 102 103 - // similarity cache: stores precomputed similar documents 104 - // invalidated when doc_count changes (new docs added/removed) 105 - try client.exec( 106 - \\CREATE TABLE IF NOT EXISTS similarity_cache ( 107 - \\ source_uri TEXT PRIMARY KEY, 108 - \\ results TEXT NOT NULL, 109 - \\ doc_count INTEGER NOT NULL, 110 - \\ computed_at INTEGER NOT NULL 111 - \\) 112 - , &.{}); 113 103 } 114 104 115 105 fn runMigrations(client: *Client) !void { ··· 134 124 client.exec("UPDATE publications SET platform = 'leaflet' WHERE platform IS NULL", &.{}) catch {}; 135 125 client.exec("UPDATE publications SET source_collection = 'pub.leaflet.publication' WHERE source_collection IS NULL", &.{}) catch {}; 136 126 137 - // vector embeddings column already added by backfill script 127 + // embedded_at: tracks when a document was embedded into turbopuffer 128 + // (replaces the old turso embedding column — can't DROP COLUMN in SQLite, but NULL = no space) 129 + client.exec("ALTER TABLE documents ADD COLUMN embedded_at TEXT", &.{}) catch {}; 138 130 139 131 // dedupe index: same (did, rkey) across collections = same document 140 132 // e.g., pub.leaflet.document/abc and site.standard.document/abc are the same content
+32 -58
backend/src/ingest/embedder.zig
··· 26 26 const DocsNeedingEmbeddings = zql.Query( 27 27 \\SELECT uri, title, content, did, created_at, rkey, 28 28 \\ base_path, has_publication, platform, COALESCE(path, '') as path 29 - \\FROM documents WHERE embedding IS NULL LIMIT :limit 29 + \\FROM documents WHERE embedded_at IS NULL LIMIT :limit 30 30 ); 31 31 32 32 /// Start the embedder background worker ··· 136 136 allocator.free(embeddings); 137 137 } 138 138 139 - // update Turso with embeddings 140 - for (docs.items, embeddings) |doc, embedding| { 141 - updateDocumentEmbedding(client, doc.uri, embedding) catch |err| { 142 - logfire.err("embedder: failed to update {s}: {}", .{ doc.uri, err }); 143 - }; 139 + // upsert to turbopuffer (sole vector store) 140 + if (!tpuf.isEnabled()) { 141 + logfire.warn("embedder: tpuf not configured, skipping batch", .{}); 142 + return 0; 144 143 } 145 144 146 - // sync to turbopuffer (fire-and-forget — search works without it) 147 - if (tpuf.isEnabled()) { 148 - var tpuf_docs = allocator.alloc(tpuf.VectorDoc, docs.items.len) catch { 149 - logfire.warn("embedder: failed to alloc tpuf batch", .{}); 150 - return docs.items.len; 145 + var tpuf_docs = try allocator.alloc(tpuf.VectorDoc, docs.items.len); 146 + defer allocator.free(tpuf_docs); 147 + 148 + for (docs.items, embeddings, 0..) |doc, embedding, i| { 149 + tpuf_docs[i] = .{ 150 + .id = doc.uri, 151 + .vector = embedding, 152 + .title = doc.title, 153 + .did = doc.did, 154 + .created_at = doc.created_at, 155 + .rkey = doc.rkey, 156 + .base_path = doc.base_path, 157 + .platform = doc.platform, 158 + .path = doc.path, 159 + .has_publication = doc.has_publication, 151 160 }; 152 - defer allocator.free(tpuf_docs); 161 + } 153 162 154 - for (docs.items, embeddings, 0..) 
|doc, embedding, i| { 155 - tpuf_docs[i] = .{ 156 - .id = doc.uri, 157 - .vector = embedding, 158 - .title = doc.title, 159 - .did = doc.did, 160 - .created_at = doc.created_at, 161 - .rkey = doc.rkey, 162 - .base_path = doc.base_path, 163 - .platform = doc.platform, 164 - .path = doc.path, 165 - .has_publication = doc.has_publication, 166 - }; 167 - } 163 + tpuf.upsert(allocator, tpuf_docs) catch |err| { 164 + logfire.warn("embedder: tpuf upsert failed: {}, will retry", .{err}); 165 + return error.TpufUpsertFailed; 166 + }; 168 167 169 - tpuf.upsert(allocator, tpuf_docs) catch |err| { 170 - logfire.warn("embedder: tpuf upsert failed: {}, continuing", .{err}); 168 + // mark docs as embedded in turso (so they won't be re-processed) 169 + for (docs.items) |doc| { 170 + client.exec( 171 + "UPDATE documents SET embedded_at = strftime('%Y-%m-%dT%H:%M:%S', 'now') WHERE uri = ?", 172 + &.{doc.uri}, 173 + ) catch |err| { 174 + logfire.err("embedder: failed to mark {s} as embedded: {}", .{ doc.uri, err }); 171 175 }; 172 176 } 173 177 ··· 335 339 return embeddings; 336 340 } 337 341 338 - fn updateDocumentEmbedding(client: *db.Client, uri: []const u8, embedding: []f32) !void { 339 - const allocator = client.allocator; 340 - 341 - // serialize embedding to JSON array string for vector32() 342 - var embedding_json: std.ArrayList(u8) = .empty; 343 - defer embedding_json.deinit(allocator); 344 - 345 - try embedding_json.append(allocator, '['); 346 - for (embedding, 0..) |val, i| { 347 - if (i > 0) try embedding_json.append(allocator, ','); 348 - var buf: [32]u8 = undefined; 349 - const str = std.fmt.bufPrint(&buf, "{d:.6}", .{val}) catch continue; 350 - try embedding_json.appendSlice(allocator, str); 351 - } 352 - try embedding_json.append(allocator, ']'); 353 - 354 - // use batch API to execute dynamic SQL 355 - const statements = [_]db.Client.Statement{ 356 - .{ 357 - .sql = "UPDATE documents SET embedding = vector32(?) 
WHERE uri = ?", 358 - .args = &.{ embedding_json.items, uri }, 359 - }, 360 - }; 361 - 362 - var result = client.queryBatch(&statements) catch |err| { 363 - std.debug.print("embedder: update failed for {s}: {}\n", .{ uri, err }); 364 - return err; 365 - }; 366 - defer result.deinit(); 367 - }
+3 -2
backend/src/ingest/indexer.zig
··· 118 118 } 119 119 } 120 120 121 - // use ON CONFLICT to preserve embedding column (INSERT OR REPLACE would nuke it) 121 + // use ON CONFLICT to preserve embedded_at (INSERT OR REPLACE would nuke it) 122 122 // indexed_at uses strftime to record when this row was inserted/updated in Turso 123 123 // (created_at is the document's publication date, which can be old for resynced docs) 124 124 try c.exec( ··· 136 136 \\ path = excluded.path, 137 137 \\ base_path = excluded.base_path, 138 138 \\ has_publication = excluded.has_publication, 139 - \\ indexed_at = strftime('%Y-%m-%dT%H:%M:%S', 'now') 139 + \\ indexed_at = strftime('%Y-%m-%dT%H:%M:%S', 'now'), 140 + \\ embedded_at = documents.embedded_at 140 141 , 141 142 &.{ uri, did, rkey, title, content, created_at orelse "", pub_uri, actual_platform, source_collection, path orelse "", base_path, has_pub }, 142 143 );
+2 -2
backend/src/metrics/stats.zig
··· 31 31 \\SELECT 32 32 \\ (SELECT COUNT(*) FROM documents) as docs, 33 33 \\ (SELECT COUNT(*) FROM publications) as pubs, 34 - \\ (SELECT COUNT(*) FROM documents WHERE embedding IS NOT NULL) as embeddings, 34 + \\ (SELECT COUNT(*) FROM documents WHERE embedded_at IS NOT NULL) as embeddings, 35 35 \\ (SELECT total_searches FROM stats WHERE id = 1) as searches, 36 36 \\ (SELECT total_errors FROM stats WHERE id = 1) as errors, 37 37 \\ (SELECT service_started_at FROM stats WHERE id = 1) as started_at, ··· 63 63 \\SELECT 64 64 \\ (SELECT COUNT(*) FROM documents) as docs, 65 65 \\ (SELECT COUNT(*) FROM publications) as pubs, 66 - \\ (SELECT COUNT(*) FROM documents WHERE embedding IS NOT NULL) as embeddings 66 + \\ (SELECT COUNT(*) FROM documents WHERE embedded_at IS NOT NULL) as embeddings 67 67 , .{}); 68 68 defer rows.deinit(); 69 69 const row = rows.next() orelse return error.NoRows;
+47 -139
backend/src/search.zig
··· 4 4 const zql = @import("zql"); 5 5 const logfire = @import("logfire"); 6 6 const db = @import("db/mod.zig"); 7 - const metrics = @import("metrics.zig"); 8 - 9 - // cached embedded doc count (refresh every 5 minutes) 10 - var cached_doc_count: std.atomic.Value(i64) = std.atomic.Value(i64).init(0); 11 - var doc_count_updated_at: std.atomic.Value(i64) = std.atomic.Value(i64).init(0); 12 - const DOC_COUNT_CACHE_SECS = 300; // 5 minutes 7 + const tpuf = @import("tpuf.zig"); 13 8 14 9 // JSON output type for search results 15 10 const SearchResultJson = struct { ··· 591 586 return &.{}; 592 587 } 593 588 594 - /// Find documents similar to a given document using vector similarity 595 - /// Uses brute-force cosine distance with caching (cache invalidated when doc count changes) 589 + /// Find documents similar to a given document via turbopuffer ANN search. 590 + /// 1. Fetch source doc's vector from tpuf (~50ms) 591 + /// 2. ANN nearest-neighbor query (~50ms) 592 + /// 3. Filter out source URI, serialize results 596 593 pub fn findSimilar(alloc: Allocator, uri: []const u8, limit: usize) ![]const u8 { 597 - const c = db.getClient() orelse return error.NotInitialized; 594 + // get source document's vector 595 + const vector = tpuf.getVectorById(alloc, uri) catch |err| { 596 + logfire.warn("similar: getVectorById failed for {s}: {}", .{ uri, err }); 597 + return error.VectorNotFound; 598 + }; 599 + defer alloc.free(vector); 598 600 599 - // get cached doc count (rarely hits Turso - refreshes every 5 min) 600 - const doc_count = getEmbeddedDocCountCached(c) orelse return error.QueryFailed; 601 - 602 - // check LOCAL cache first (instant) 603 - if (db.getLocalDb()) |local| { 604 - if (getCachedSimilarLocal(alloc, local, uri, doc_count)) |cached| { 605 - metrics.stats.recordCacheHit(); 606 - return cached; 607 - } 608 - } 609 - 610 - // check Turso cache (slower, but needed if local empty) 611 - if (getCachedSimilar(alloc, c, uri, doc_count)) |cached| { 612 - 
metrics.stats.recordCacheHit(); 613 - // also write to local cache for next time 614 - if (db.getLocalDb()) |local| { 615 - cacheSimilarResultsLocal(local, uri, cached, doc_count); 601 + // ANN query (request limit+1 so we can filter out the source doc) 602 + const results = tpuf.query(alloc, vector, limit + 1) catch |err| { 603 + logfire.warn("similar: tpuf query failed: {}", .{err}); 604 + return error.QueryFailed; 605 + }; 606 + defer { 607 + for (results) |r| { 608 + alloc.free(r.id); 609 + alloc.free(r.title); 610 + alloc.free(r.did); 611 + alloc.free(r.created_at); 612 + alloc.free(r.rkey); 613 + alloc.free(r.base_path); 614 + alloc.free(r.platform); 615 + alloc.free(r.path); 616 616 } 617 - return cached; 617 + alloc.free(results); 618 618 } 619 - metrics.stats.recordCacheMiss(); 620 619 621 - // cache miss - compute similarity 620 + // serialize, filtering out the source URI 622 621 var output: std.Io.Writer.Allocating = .init(alloc); 623 622 errdefer output.deinit(); 624 623 625 - var limit_buf: [8]u8 = undefined; 626 - const limit_str = std.fmt.bufPrint(&limit_buf, "{d}", .{limit}) catch "5"; 627 - 628 - // brute-force cosine similarity search (no vector index needed) 629 - var res = c.query( 630 - \\SELECT d2.uri, d2.did, d2.title, '' as snippet, 631 - \\ d2.created_at, d2.rkey, d2.base_path, d2.has_publication, 632 - \\ d2.platform, COALESCE(d2.path, '') as path 633 - \\FROM documents d1, documents d2 634 - \\WHERE d1.uri = ? 635 - \\ AND d2.uri != d1.uri 636 - \\ AND d1.embedding IS NOT NULL 637 - \\ AND d2.embedding IS NOT NULL 638 - \\ORDER BY vector_distance_cos(d1.embedding, d2.embedding) 639 - \\LIMIT ? 
640 - , &.{ uri, limit_str }) catch { 641 - try output.writer.writeAll("[]"); 642 - return try output.toOwnedSlice(); 643 - }; 644 - defer res.deinit(); 645 - 646 624 var jw: json.Stringify = .{ .writer = &output.writer }; 647 625 try jw.beginArray(); 648 - for (res.rows) |row| try jw.write(Doc.fromRow(row).toJson()); 626 + var count: usize = 0; 627 + for (results) |r| { 628 + if (std.mem.eql(u8, r.id, uri)) continue; 629 + if (count >= limit) break; 630 + try jw.write(SearchResultJson{ 631 + .type = if (r.has_publication) "article" else "looseleaf", 632 + .uri = r.id, 633 + .did = r.did, 634 + .title = r.title, 635 + .snippet = "", 636 + .createdAt = r.created_at, 637 + .rkey = r.rkey, 638 + .basePath = r.base_path, 639 + .platform = r.platform, 640 + .path = r.path, 641 + }); 642 + count += 1; 643 + } 649 644 try jw.endArray(); 650 645 651 - const results = try output.toOwnedSlice(); 652 - 653 - // cache to LOCAL db (instant) 654 - if (db.getLocalDb()) |local| { 655 - cacheSimilarResultsLocal(local, uri, results, doc_count); 656 - } 657 - 658 - // cache to Turso (fire and forget - still useful for durability) 659 - cacheSimilarResults(c, uri, results, doc_count); 660 - 661 - return results; 662 - } 663 - 664 - fn getEmbeddedDocCount(c: *db.Client) ?i64 { 665 - var res = c.query("SELECT COUNT(*) FROM documents WHERE embedding IS NOT NULL", &.{}) catch return null; 666 - defer res.deinit(); 667 - if (res.rows.len == 0) return null; 668 - return res.rows[0].int(0); 669 - } 670 - 671 - fn getEmbeddedDocCountCached(c: *db.Client) ?i64 { 672 - const now = std.time.timestamp(); 673 - const last_update = doc_count_updated_at.load(.acquire); 674 - 675 - // use cached value if fresh enough 676 - if (now - last_update < DOC_COUNT_CACHE_SECS) { 677 - const cached = cached_doc_count.load(.acquire); 678 - if (cached > 0) return cached; 679 - } 680 - 681 - // refresh from Turso 682 - const count = getEmbeddedDocCount(c) orelse return null; 683 - cached_doc_count.store(count, 
.release); 684 - doc_count_updated_at.store(now, .release); 685 - return count; 686 - } 687 - 688 - fn getCachedSimilar(alloc: Allocator, c: *db.Client, uri: []const u8, current_doc_count: i64) ?[]const u8 { 689 - var count_buf: [20]u8 = undefined; 690 - const count_str = std.fmt.bufPrint(&count_buf, "{d}", .{current_doc_count}) catch return null; 691 - 692 - var res = c.query( 693 - "SELECT results FROM similarity_cache WHERE source_uri = ? AND doc_count = ?", 694 - &.{ uri, count_str }, 695 - ) catch return null; 696 - defer res.deinit(); 697 - 698 - if (res.rows.len == 0) return null; 699 - return alloc.dupe(u8, res.rows[0].text(0)) catch null; 700 - } 701 - 702 - fn cacheSimilarResults(c: *db.Client, uri: []const u8, results: []const u8, doc_count: i64) void { 703 - var count_buf: [20]u8 = undefined; 704 - const count_str = std.fmt.bufPrint(&count_buf, "{d}", .{doc_count}) catch return; 705 - 706 - var ts_buf: [20]u8 = undefined; 707 - const ts_str = std.fmt.bufPrint(&ts_buf, "{d}", .{std.time.timestamp()}) catch return; 708 - 709 - c.exec( 710 - "INSERT OR REPLACE INTO similarity_cache (source_uri, results, doc_count, computed_at) VALUES (?, ?, ?, ?)", 711 - &.{ uri, results, count_str, ts_str }, 712 - ) catch {}; 713 - } 714 - 715 - fn getCachedSimilarLocal(alloc: Allocator, local: *db.LocalDb, uri: []const u8, current_doc_count: i64) ?[]const u8 { 716 - var rows = local.query( 717 - "SELECT results, doc_count FROM similarity_cache WHERE source_uri = ?", 718 - .{uri}, 719 - ) catch return null; 720 - defer rows.deinit(); 721 - 722 - const row = rows.next() orelse return null; 723 - // check doc_count matches for cache validity 724 - if (row.int(1) != current_doc_count) return null; 725 - return alloc.dupe(u8, row.text(0)) catch null; 726 - } 727 - 728 - fn cacheSimilarResultsLocal(local: *db.LocalDb, uri: []const u8, results: []const u8, doc_count: i64) void { 729 - var count_buf: [20]u8 = undefined; 730 - const count_str = std.fmt.bufPrint(&count_buf, "{d}", 
.{doc_count}) catch return; 731 - 732 - var ts_buf: [20]u8 = undefined; 733 - const ts_str = std.fmt.bufPrint(&ts_buf, "{d}", .{std.time.timestamp()}) catch return; 734 - 735 - local.exec( 736 - "INSERT OR REPLACE INTO similarity_cache (source_uri, results, doc_count, computed_at) VALUES (?, ?, ?, ?)", 737 - .{ uri, results, count_str, ts_str }, 738 - ) catch {}; 646 + return try output.toOwnedSlice(); 739 647 } 740 648 741 649 /// Build FTS5 query with OR between terms: "cat dog" -> "cat OR dog*"
+94
backend/src/tpuf.zig
··· 123 123 return parseQueryResponse(allocator, response); 124 124 } 125 125 126 + /// Retrieve a document's vector by its ID (AT-URI). 127 + /// Used to get the source vector for ANN similarity queries. 128 + pub fn getVectorById(allocator: Allocator, id: []const u8) ![]f32 { 129 + const key = api_key orelse return error.NotConfigured; 130 + 131 + const span = logfire.span("tpuf.get_vector", .{}); 132 + defer span.end(); 133 + 134 + const body = try buildGetVectorBody(allocator, id); 135 + defer allocator.free(body); 136 + 137 + const response = try doRequest(allocator, key, query_url, body); 138 + defer allocator.free(response); 139 + 140 + return parseVectorResponse(allocator, response); 141 + } 142 + 126 143 /// Delete vectors by ID. Can be batched into a single write call. 127 144 pub fn delete(allocator: Allocator, ids: []const []const u8) !void { 128 145 const key = api_key orelse return error.NotConfigured; ··· 233 250 return try output.toOwnedSlice(); 234 251 } 235 252 253 + fn buildGetVectorBody(allocator: Allocator, id: []const u8) ![]const u8 { 254 + var output: std.Io.Writer.Allocating = .init(allocator); 255 + errdefer output.deinit(); 256 + 257 + var jw: json.Stringify = .{ .writer = &output.writer }; 258 + try jw.beginObject(); 259 + 260 + // rank_by: ["id", "asc"] — return by ID order (we only want 1) 261 + try jw.objectField("rank_by"); 262 + try jw.beginArray(); 263 + try jw.write("id"); 264 + try jw.write("asc"); 265 + try jw.endArray(); 266 + 267 + // filters: ["id", "Eq", "<id>"] 268 + try jw.objectField("filters"); 269 + try jw.beginArray(); 270 + try jw.write("id"); 271 + try jw.write("Eq"); 272 + try jw.write(id); 273 + try jw.endArray(); 274 + 275 + try jw.objectField("top_k"); 276 + try jw.write(1); 277 + 278 + try jw.objectField("include_vectors"); 279 + try jw.write(true); 280 + 281 + try jw.endObject(); 282 + 283 + return try output.toOwnedSlice(); 284 + } 285 + 236 286 fn buildDeleteBody(allocator: Allocator, ids: []const []const u8) 
![]const u8 { 237 287 var output: std.Io.Writer.Allocating = .init(allocator); 238 288 errdefer output.deinit(); ··· 300 350 return allocator.realloc(results, count) catch results[0..count]; 301 351 } 302 352 return results; 353 + } 354 + 355 + fn parseVectorResponse(allocator: Allocator, response: []const u8) ![]f32 { 356 + const parsed = json.parseFromSlice(json.Value, allocator, response, .{}) catch { 357 + logfire.err("tpuf: failed to parse vector response", .{}); 358 + return error.ParseError; 359 + }; 360 + defer parsed.deinit(); 361 + 362 + const rows = switch (parsed.value) { 363 + .object => |obj| obj.get("rows") orelse return error.NoRows, 364 + else => return error.InvalidResponse, 365 + }; 366 + 367 + const items = switch (rows) { 368 + .array => |arr| arr.items, 369 + else => return error.InvalidResponse, 370 + }; 371 + 372 + if (items.len == 0) return error.NotFound; 373 + 374 + const row = switch (items[0]) { 375 + .object => |o| o, 376 + else => return error.InvalidResponse, 377 + }; 378 + 379 + const vec_val = row.get("vector") orelse return error.NoVector; 380 + const vec_items = switch (vec_val) { 381 + .array => |arr| arr.items, 382 + else => return error.InvalidResponse, 383 + }; 384 + 385 + const vector = try allocator.alloc(f32, vec_items.len); 386 + errdefer allocator.free(vector); 387 + 388 + for (vec_items, 0..) |val, i| { 389 + vector[i] = switch (val) { 390 + .float => @floatCast(val.float), 391 + .integer => @floatFromInt(val.integer), 392 + else => return error.InvalidValue, 393 + }; 394 + } 395 + 396 + return vector; 303 397 } 304 398 305 399 // --- JSON helpers ---