search for standard sites pub-search.waow.tech
search zig blog atproto
11
fork

Configure Feed

Select the types of activity you want to include in your feed.

similarity search: switch to brute-force, improve backfill

- use vector_distance_cos() instead of vector_top_k() with DiskANN index
(Turso's index has ~60s write latency per row, brute-force is ~0.15s)
- add retry logic and batched writes to backfill script
- increase concurrency to 8 workers for faster embedding backfill
- document embeddings workflow in README

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

zzstoatzz 009ecffc 1394d094

+109 -45
+31 -2
README.md
··· 25 25 ## api 26 26 27 27 ``` 28 - GET /search?q=<query>&tag=<tag> # search with query, tag, or both 28 + GET /search?q=<query>&tag=<tag> # full-text search with query, tag, or both 29 + GET /similar?uri=<at-uri> # find similar documents via vector embeddings 29 30 GET /tags # list all tags with counts 31 + GET /popular # popular search queries 30 32 GET /stats # document/publication counts 31 33 GET /health # health check 32 34 ``` 33 35 34 36 search returns three entity types: `article` (document in a publication), `looseleaf` (standalone document), `publication` (newsletter itself). tag filtering applies to documents only. 35 37 38 + `/similar` uses [Voyage AI](https://voyageai.com) embeddings with brute-force cosine similarity (~0.15s for 3500 docs). 39 + 36 40 ## [stack](https://bsky.app/profile/zzstoatzz.io/post/3mbij5ip4ws2a) 37 41 38 42 - [Fly.io](https://fly.io) hosts backend + tap 39 - - [Turso](https://turso.tech) cloud SQLite 43 + - [Turso](https://turso.tech) cloud SQLite with vector support 44 + - [Voyage AI](https://voyageai.com) embeddings (voyage-3-lite) 40 45 - [Tap](https://github.com/bluesky-social/indigo/tree/main/cmd/tap) syncs leaflet content from ATProto firehose 41 46 - [Zig](https://ziglang.org) HTTP server, search API, content indexing 42 47 - [Cloudflare Pages](https://pages.cloudflare.com) static frontend 48 + 49 + ## embeddings 50 + 51 + documents are embedded using Voyage AI's `voyage-3-lite` model (512 dimensions). new documents from the firehose don't automatically get embeddings - they need to be backfilled periodically. 52 + 53 + ### backfill embeddings 54 + 55 + requires `TURSO_URL`, `TURSO_TOKEN`, and `VOYAGE_API_KEY` in `.env`: 56 + 57 + ```bash 58 + # check how many docs need embeddings 59 + ./scripts/backfill-embeddings --dry-run 60 + 61 + # run the backfill (uses batching + concurrency) 62 + ./scripts/backfill-embeddings --batch-size 50 63 + ``` 64 + 65 + the script: 66 + - fetches docs where `embedding IS NULL` 67 + - batches them to Voyage API (50 docs/batch default) 68 + - writes embeddings to Turso in batched transactions 69 + - runs 8 concurrent workers 70 + 71 + **note:** we use brute-force cosine similarity instead of a vector index. Turso's DiskANN index has ~60s write latency per row, making it impractical for incremental updates. brute-force on 3500 vectors runs in ~0.15s which is fine for this scale.
+15 -12
backend/src/search.zig
··· 175 175 } 176 176 177 177 /// Find documents similar to a given document using vector similarity 178 + /// Uses brute-force cosine distance (no index required, ~7s for 3500 docs) 178 179 pub fn findSimilar(alloc: Allocator, uri: []const u8, limit: usize) ![]const u8 { 179 180 const c = db.getClient() orelse return error.NotInitialized; 180 181 ··· 182 183 errdefer output.deinit(); 183 184 184 185 var limit_buf: [8]u8 = undefined; 185 - const limit_str = std.fmt.bufPrint(&limit_buf, "{d}", .{limit + 1}) catch "6"; // +1 to exclude self 186 + const limit_str = std.fmt.bufPrint(&limit_buf, "{d}", .{limit}) catch "5"; 186 187 187 - // vector similarity search using the document's embedding 188 - // note: CAST required because Hrana sends all values as text 188 + // brute-force cosine similarity search (no vector index needed) 189 189 var res = c.query( 190 - \\SELECT d.uri, d.did, d.title, '' as snippet, 191 - \\ d.created_at, d.rkey, COALESCE(p.base_path, '') as base_path, 192 - \\ CASE WHEN d.publication_uri != '' THEN 1 ELSE 0 END as has_publication 193 - \\FROM vector_top_k('documents_embedding_idx', 194 - \\ (SELECT embedding FROM documents WHERE uri = ?), CAST(? AS INTEGER)) AS v 195 - \\JOIN documents d ON d.rowid = v.id 196 - \\LEFT JOIN publications p ON d.publication_uri = p.uri 197 - \\WHERE d.uri != ? 198 - , &.{ uri, limit_str, uri }) catch { 190 + \\SELECT d2.uri, d2.did, d2.title, '' as snippet, 191 + \\ d2.created_at, d2.rkey, COALESCE(p.base_path, '') as base_path, 192 + \\ CASE WHEN d2.publication_uri != '' THEN 1 ELSE 0 END as has_publication 193 + \\FROM documents d1, documents d2 194 + \\LEFT JOIN publications p ON d2.publication_uri = p.uri 195 + \\WHERE d1.uri = ? 196 + \\ AND d2.uri != d1.uri 197 + \\ AND d1.embedding IS NOT NULL 198 + \\ AND d2.embedding IS NOT NULL 199 + \\ORDER BY vector_distance_cos(d1.embedding, d2.embedding) 200 + \\LIMIT ? 201 + , &.{ uri, limit_str }) catch { 199 202 try output.writer.writeAll("[]"); 200 203 return try output.toOwnedSlice(); 201 204 };
+63 -31
scripts/backfill-embeddings
··· 75 75 return [dict(zip(cols, [extract_value(cell) for cell in row])) for row in rows] 76 76 77 77 78 - def turso_exec(settings: Settings, sql: str, args: list | None = None) -> None: 79 - """Execute a statement against Turso.""" 80 - stmt = {"sql": sql} 81 - if args: 82 - stmt["args"] = [{"type": "text", "value": str(a)} for a in args] 78 + def turso_exec(settings: Settings, sql: str, args: list | None = None, retries: int = 3) -> None: 79 + """Execute a statement against Turso with retry logic.""" 80 + turso_batch_exec(settings, [(sql, args)], retries) 83 81 84 - response = httpx.post( 85 - f"https://{settings.turso_host}/v2/pipeline", 86 - headers={ 87 - "Authorization": f"Bearer {settings.turso_token}", 88 - "Content-Type": "application/json", 89 - }, 90 - json={"requests": [{"type": "execute", "stmt": stmt}, {"type": "close"}]}, 91 - timeout=30, 92 - ) 93 - response.raise_for_status() 94 - data = response.json() 95 - result = data["results"][0] 96 - if result["type"] == "error": 97 - raise Exception(f"Turso error: {result['error']}") 82 + 83 + def turso_batch_exec(settings: Settings, statements: list[tuple[str, list | None]], retries: int = 3) -> None: 84 + """Execute multiple statements in a single pipeline request.""" 85 + import time 86 + 87 + requests = [] 88 + for sql, args in statements: 89 + stmt = {"sql": sql} 90 + if args: 91 + stmt["args"] = [{"type": "text", "value": str(a)} for a in args] 92 + requests.append({"type": "execute", "stmt": stmt}) 93 + requests.append({"type": "close"}) 94 + 95 + for attempt in range(retries): 96 + try: 97 + response = httpx.post( 98 + f"https://{settings.turso_host}/v2/pipeline", 99 + headers={ 100 + "Authorization": f"Bearer {settings.turso_token}", 101 + "Content-Type": "application/json", 102 + }, 103 + json={"requests": requests}, 104 + timeout=120, 105 + ) 106 + response.raise_for_status() 107 + data = response.json() 108 + for i, result in enumerate(data["results"][:-1]): # skip the close result 109 + if result["type"] == "error": 110 + raise Exception(f"Turso error on statement {i}: {result['error']}") 111 + return 112 + except (httpx.ReadTimeout, httpx.ConnectTimeout, httpx.ConnectError) as e: 113 + if attempt < retries - 1: 114 + wait = 2 ** (attempt + 1) 115 + print(f" {type(e).__name__}, retrying in {wait}s...") 116 + time.sleep(wait) 117 + else: 118 + raise 98 119 99 120 100 121 def voyage_embed(settings: Settings, texts: list[str]) -> list[list[float]]: ··· 162 183 print(f" ... and {len(docs) - 10} more") 163 184 return 164 185 165 - # process in batches 166 - processed = 0 167 - for i in range(0, len(docs), args.batch_size): 168 - batch = docs[i : i + args.batch_size] 169 - texts = [f"{doc['title']} {doc['content']}" for doc in batch] 186 + # process in batches with concurrency 187 + from concurrent.futures import ThreadPoolExecutor, as_completed 170 188 171 - print(f"embedding batch {i // args.batch_size + 1} ({len(batch)} docs)...") 189 + def process_batch(batch_info): 190 + batch_num, batch = batch_info 191 + texts = [f"{doc['title']} {doc['content']}" for doc in batch] 172 192 embeddings = voyage_embed(settings, texts) 173 - 193 + statements = [] 174 194 for doc, embedding in zip(batch, embeddings): 175 195 embedding_json = json.dumps(embedding) 176 - turso_exec( 177 - settings, 196 + statements.append(( 178 197 "UPDATE documents SET embedding = vector32(?) WHERE uri = ?", 179 198 [embedding_json, doc["uri"]], 180 - ) 181 - processed += 1 199 + )) 200 + turso_batch_exec(settings, statements) 201 + return batch_num, len(batch) 202 + 203 + batches = [(i // args.batch_size + 1, docs[i : i + args.batch_size]) 204 + for i in range(0, len(docs), args.batch_size)] 205 + 206 + processed = 0 207 + workers = min(8, len(batches)) # more workers now that index is dropped 208 + print(f"processing {len(batches)} batches with {workers} workers...") 182 209 183 - print(f" updated {processed}/{len(docs)}") 210 + with ThreadPoolExecutor(max_workers=workers) as executor: 211 + futures = {executor.submit(process_batch, b): b[0] for b in batches} 212 + for future in as_completed(futures): 213 + batch_num, count = future.result() 214 + processed += count 215 + print(f"batch {batch_num} done ({processed}/{len(docs)})", flush=True) 184 216 185 217 print(f"done! processed {processed} documents") 186 218