search for standard sites pub-search.waow.tech
search zig blog atproto
11
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: automatic embedding generation via background worker

- add embedder.zig: background worker that generates embeddings via Voyage API
- polls for documents missing embeddings, batches them, updates Turso
- content truncation to stay under Voyage token limits
- fix indexer to use ON CONFLICT instead of INSERT OR REPLACE
(INSERT OR REPLACE was nuking embeddings on document updates)

no more dependency on manual backfill scripts for new documents.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

zzstoatzz a4509a48 0469d2f4

+293 -1
+273
backend/src/embedder.zig
··· 1 + //! Background worker for generating document embeddings via Voyage AI. 2 + //! 3 + //! Periodically queries for documents missing embeddings, batches them, 4 + //! calls the Voyage API, and updates Turso with the results. 5 + 6 + const std = @import("std"); 7 + const http = std.http; 8 + const json = std.json; 9 + const mem = std.mem; 10 + const posix = std.posix; 11 + const Allocator = mem.Allocator; 12 + const db = @import("db/mod.zig"); 13 + 14 + // voyage-3-lite limits 15 + const MAX_BATCH_SIZE = 20; // conservative batch size for reliability 16 + const MAX_CONTENT_CHARS = 8000; // ~2000 tokens, well under 32K limit 17 + const EMBEDDING_DIM = 512; 18 + const POLL_INTERVAL_SECS: u64 = 60; // check for new docs every minute 19 + const ERROR_BACKOFF_SECS: u64 = 300; // 5 min backoff on errors 20 + 21 + /// Start the embedder background worker 22 + pub fn start(allocator: Allocator) void { 23 + const api_key = posix.getenv("VOYAGE_API_KEY") orelse { 24 + std.debug.print("embedder: VOYAGE_API_KEY not set, embeddings disabled\n", .{}); 25 + return; 26 + }; 27 + 28 + const thread = std.Thread.spawn(.{}, worker, .{ allocator, api_key }) catch |err| { 29 + std.debug.print("embedder: failed to start thread: {}\n", .{err}); 30 + return; 31 + }; 32 + thread.detach(); 33 + std.debug.print("embedder: background worker started\n", .{}); 34 + } 35 + 36 + fn worker(allocator: Allocator, api_key: []const u8) void { 37 + // wait for db to be ready 38 + std.Thread.sleep(5 * std.time.ns_per_s); 39 + 40 + var consecutive_errors: u32 = 0; 41 + 42 + while (true) { 43 + const processed = processNextBatch(allocator, api_key) catch |err| { 44 + consecutive_errors += 1; 45 + const backoff: u64 = @min(ERROR_BACKOFF_SECS * consecutive_errors, 3600); 46 + std.debug.print("embedder: error {}, backing off {}s\n", .{ err, backoff }); 47 + std.Thread.sleep(backoff * std.time.ns_per_s); 48 + continue; 49 + }; 50 + 51 + if (processed > 0) { 52 + consecutive_errors = 0; 53 + std.debug.print("embedder: processed {} documents\n", .{processed}); 54 + // immediately check for more 55 + continue; 56 + } 57 + 58 + // no work, sleep 59 + consecutive_errors = 0; 60 + std.Thread.sleep(POLL_INTERVAL_SECS * std.time.ns_per_s); 61 + } 62 + } 63 + 64 + const DocToEmbed = struct { 65 + uri: []const u8, 66 + text: []const u8, // title + " " + content (truncated) 67 + }; 68 + 69 + fn processNextBatch(allocator: Allocator, api_key: []const u8) !usize { 70 + const client = db.getClient() orelse return error.NoClient; 71 + 72 + // query for documents needing embeddings 73 + var result = try client.query( 74 + "SELECT uri, title, content FROM documents WHERE embedding IS NULL LIMIT ?", 75 + &.{std.fmt.comptimePrint("{}", .{MAX_BATCH_SIZE})}, 76 + ); 77 + defer result.deinit(); 78 + 79 + // collect documents 80 + var docs: std.ArrayList(DocToEmbed) = .empty; 81 + defer { 82 + for (docs.items) |doc| { 83 + allocator.free(doc.text); 84 + } 85 + docs.deinit(allocator); 86 + } 87 + 88 + for (result.rows) |row| { 89 + const uri = row.text(0); 90 + const title = row.text(1); 91 + const content = row.text(2); 92 + 93 + // build text for embedding: title + content, truncated 94 + const text = try buildEmbeddingText(allocator, title, content); 95 + try docs.append(allocator, .{ .uri = uri, .text = text }); 96 + } 97 + 98 + if (docs.items.len == 0) return 0; 99 + 100 + // call Voyage API 101 + const embeddings = try callVoyageApi(allocator, api_key, docs.items); 102 + defer { 103 + for (embeddings) |e| allocator.free(e); 104 + allocator.free(embeddings); 105 + } 106 + 107 + // update Turso with embeddings 108 + for (docs.items, embeddings) |doc, embedding| { 109 + updateDocumentEmbedding(client, doc.uri, embedding) catch |err| { 110 + std.debug.print("embedder: failed to update {s}: {}\n", .{ doc.uri, err }); 111 + }; 112 + } 113 + 114 + return docs.items.len; 115 + } 116 + 117 + fn buildEmbeddingText(allocator: Allocator, title: []const u8, content: []const u8) ![]u8 { 118 + // truncate content if needed 119 + const max_content = MAX_CONTENT_CHARS -| title.len -| 1; 120 + const truncated_content = if (content.len > max_content) content[0..max_content] else content; 121 + 122 + const text = try allocator.alloc(u8, title.len + 1 + truncated_content.len); 123 + @memcpy(text[0..title.len], title); 124 + text[title.len] = ' '; 125 + @memcpy(text[title.len + 1 ..], truncated_content); 126 + return text; 127 + } 128 + 129 + fn callVoyageApi(allocator: Allocator, api_key: []const u8, docs: []const DocToEmbed) ![][]f32 { 130 + var http_client: http.Client = .{ .allocator = allocator }; 131 + defer http_client.deinit(); 132 + 133 + // build request body 134 + const body = try buildVoyageRequest(allocator, docs); 135 + defer allocator.free(body); 136 + 137 + // prepare auth header 138 + var auth_buf: [256]u8 = undefined; 139 + const auth = std.fmt.bufPrint(&auth_buf, "Bearer {s}", .{api_key}) catch 140 + return error.AuthTooLong; 141 + 142 + // make request 143 + var response_body: std.Io.Writer.Allocating = .init(allocator); 144 + errdefer response_body.deinit(); 145 + 146 + const res = http_client.fetch(.{ 147 + .location = .{ .url = "https://api.voyageai.com/v1/embeddings" }, 148 + .method = .POST, 149 + .headers = .{ 150 + .content_type = .{ .override = "application/json" }, 151 + .authorization = .{ .override = auth }, 152 + }, 153 + .payload = body, 154 + .response_writer = &response_body.writer, 155 + }) catch |err| { 156 + std.debug.print("embedder: voyage request failed: {}\n", .{err}); 157 + return error.VoyageRequestFailed; 158 + }; 159 + 160 + if (res.status != .ok) { 161 + const resp_text = response_body.toOwnedSlice() catch ""; 162 + defer if (resp_text.len > 0) allocator.free(resp_text); 163 + std.debug.print("embedder: voyage error {}: {s}\n", .{ res.status, resp_text[0..@min(resp_text.len, 200)] }); 164 + return error.VoyageApiError; 165 + } 166 + 167 + const response_text = try response_body.toOwnedSlice(); 168 + defer allocator.free(response_text); 169 + 170 + return parseVoyageResponse(allocator, response_text, docs.len); 171 + } 172 + 173 + fn buildVoyageRequest(allocator: Allocator, docs: []const DocToEmbed) ![]const u8 { 174 + var body: std.Io.Writer.Allocating = .init(allocator); 175 + errdefer body.deinit(); 176 + var jw: json.Stringify = .{ .writer = &body.writer, .options = .{} }; 177 + 178 + try jw.beginObject(); 179 + 180 + try jw.objectField("model"); 181 + try jw.write("voyage-3-lite"); 182 + 183 + try jw.objectField("input_type"); 184 + try jw.write("document"); 185 + 186 + try jw.objectField("input"); 187 + try jw.beginArray(); 188 + for (docs) |doc| { 189 + try jw.write(doc.text); 190 + } 191 + try jw.endArray(); 192 + 193 + try jw.endObject(); 194 + 195 + return try body.toOwnedSlice(); 196 + } 197 + 198 + fn parseVoyageResponse(allocator: Allocator, response: []const u8, expected_count: usize) ![][]f32 { 199 + const parsed = json.parseFromSlice(json.Value, allocator, response, .{}) catch { 200 + std.debug.print("embedder: failed to parse voyage response\n", .{}); 201 + return error.ParseError; 202 + }; 203 + defer parsed.deinit(); 204 + 205 + const data = parsed.value.object.get("data") orelse return error.MissingData; 206 + if (data != .array) return error.InvalidData; 207 + 208 + if (data.array.items.len != expected_count) { 209 + std.debug.print("embedder: expected {} embeddings, got {}\n", .{ expected_count, data.array.items.len }); 210 + return error.CountMismatch; 211 + } 212 + 213 + const embeddings = try allocator.alloc([]f32, expected_count); 214 + errdefer { 215 + for (embeddings) |e| allocator.free(e); 216 + allocator.free(embeddings); 217 + } 218 + 219 + for (data.array.items, 0..) |item, i| { 220 + const embedding_val = item.object.get("embedding") orelse return error.MissingEmbedding; 221 + if (embedding_val != .array) return error.InvalidEmbedding; 222 + 223 + const embedding = try allocator.alloc(f32, EMBEDDING_DIM); 224 + errdefer allocator.free(embedding); 225 + 226 + if (embedding_val.array.items.len != EMBEDDING_DIM) { 227 + std.debug.print("embedder: expected {} dims, got {}\n", .{ EMBEDDING_DIM, embedding_val.array.items.len }); 228 + return error.DimensionMismatch; 229 + } 230 + 231 + for (embedding_val.array.items, 0..) |val, j| { 232 + embedding[j] = switch (val) { 233 + .float => @floatCast(val.float), 234 + .integer => @floatFromInt(val.integer), 235 + else => return error.InvalidValue, 236 + }; 237 + } 238 + embeddings[i] = embedding; 239 + } 240 + 241 + return embeddings; 242 + } 243 + 244 + fn updateDocumentEmbedding(client: *db.Client, uri: []const u8, embedding: []f32) !void { 245 + const allocator = client.allocator; 246 + 247 + // serialize embedding to JSON array string for vector32() 248 + var embedding_json: std.ArrayList(u8) = .empty; 249 + defer embedding_json.deinit(allocator); 250 + 251 + try embedding_json.append(allocator, '['); 252 + for (embedding, 0..) |val, i| { 253 + if (i > 0) try embedding_json.append(allocator, ','); 254 + var buf: [32]u8 = undefined; 255 + const str = std.fmt.bufPrint(&buf, "{d:.6}", .{val}) catch continue; 256 + try embedding_json.appendSlice(allocator, str); 257 + } 258 + try embedding_json.append(allocator, ']'); 259 + 260 + // use batch API to execute dynamic SQL 261 + const statements = [_]db.Client.Statement{ 262 + .{ 263 + .sql = "UPDATE documents SET embedding = vector32(?) WHERE uri = ?", 264 + .args = &.{ embedding_json.items, uri }, 265 + }, 266 + }; 267 + 268 + var result = client.queryBatch(&statements) catch |err| { 269 + std.debug.print("embedder: update failed for {s}: {}\n", .{ uri, err }); 270 + return err; 271 + }; 272 + defer result.deinit(); 273 + }
+16 -1
backend/src/indexer.zig
··· 111 111 } 112 112 } 113 113 114 + // use ON CONFLICT to preserve embedding column (INSERT OR REPLACE would nuke it) 114 115 try c.exec( 115 - "INSERT OR REPLACE INTO documents (uri, did, rkey, title, content, created_at, publication_uri, platform, source_collection, path, base_path, has_publication) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", 116 + \\INSERT INTO documents (uri, did, rkey, title, content, created_at, publication_uri, platform, source_collection, path, base_path, has_publication) 117 + \\VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 118 + \\ON CONFLICT(uri) DO UPDATE SET 119 + \\ did = excluded.did, 120 + \\ rkey = excluded.rkey, 121 + \\ title = excluded.title, 122 + \\ content = excluded.content, 123 + \\ created_at = excluded.created_at, 124 + \\ publication_uri = excluded.publication_uri, 125 + \\ platform = excluded.platform, 126 + \\ source_collection = excluded.source_collection, 127 + \\ path = excluded.path, 128 + \\ base_path = excluded.base_path, 129 + \\ has_publication = excluded.has_publication 130 + , 116 131 &.{ uri, did, rkey, title, content, created_at orelse "", pub_uri, actual_platform, source_collection, path orelse "", base_path, has_pub }, 117 132 ); 118 133
+4
backend/src/main.zig
··· 6 6 const activity = @import("activity.zig"); 7 7 const server = @import("server.zig"); 8 8 const tap = @import("tap.zig"); 9 + const embedder = @import("embedder.zig"); 9 10 10 11 const MAX_HTTP_WORKERS = 16; 11 12 const SOCKET_TIMEOUT_SECS = 30; ··· 67 68 68 69 // start activity tracker 69 70 activity.init(); 71 + 72 + // start embedder (generates embeddings for new docs) 73 + embedder.start(allocator); 70 74 71 75 // start tap consumer 72 76 tap.consumer(allocator);