search for standard sites pub-search.waow.tech
search zig blog atproto
11
fork

Configure Feed

Select the types of activity you want to include in your feed.

perf: non-blocking stats writes and local-first similarity cache

- add stats_buffer.zig: atomic counters + background sync to Turso (5s interval)
- stats.zig: recordSearch/recordCacheHit/etc now instant (~1us) via buffer
- LocalDb.zig: add similarity_cache table for local caching
- search.zig: local-first cache lookups + cached doc count (5min refresh)
- sync.zig: sync similarity_cache from Turso on startup

before: search latency 100-7600ms (blocked on Turso stats writes)
after: search latency <10ms (stats buffered, synced in background)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

zzstoatzz c141096d 901ef622

+297 -30
+10
backend/src/db/LocalDb.zig
··· 220 220 \\ count INTEGER DEFAULT 1 221 221 \\) 222 222 , .{}) catch {}; 223 + 224 + // similarity cache (local copy for fast lookups) 225 + c.exec( 226 + \\CREATE TABLE IF NOT EXISTS similarity_cache ( 227 + \\ source_uri TEXT PRIMARY KEY, 228 + \\ results TEXT NOT NULL, 229 + \\ doc_count INTEGER NOT NULL, 230 + \\ computed_at INTEGER NOT NULL 231 + \\) 232 + , .{}) catch {}; 223 233 } 224 234 225 235 /// Row adapter matching result.Row interface (column-indexed access)
+26 -1
backend/src/db/sync.zig
··· 138 138 } 139 139 } 140 140 141 + // sync similarity cache 142 + var cache_count: usize = 0; 143 + { 144 + conn.exec("DELETE FROM similarity_cache", .{}) catch {}; 145 + 146 + if (turso.query( 147 + "SELECT source_uri, results, doc_count, computed_at FROM similarity_cache", 148 + &.{}, 149 + )) |res_val| { 150 + var res = res_val; 151 + defer res.deinit(); 152 + 153 + for (res.rows) |row| { 154 + conn.exec( 155 + "INSERT OR REPLACE INTO similarity_cache (source_uri, results, doc_count, computed_at) VALUES (?, ?, ?, ?)", 156 + .{ row.text(0), row.text(1), row.text(2), row.text(3) }, 157 + ) catch {}; 158 + cache_count += 1; 159 + } 160 + } else |err| { 161 + std.debug.print("sync: turso similarity_cache query failed: {}\n", .{err}); 162 + // continue anyway - cache isn't critical 163 + } 164 + } 165 + 141 166 // record sync time 142 167 var ts_buf: [20]u8 = undefined; 143 168 const ts_str = std.fmt.bufPrint(&ts_buf, "{d}", .{std.time.timestamp()}) catch "0"; ··· 157 182 }; 158 183 159 184 local.setReady(true); 160 - std.debug.print("sync: full sync complete - {d} docs, {d} pubs, {d} tags, {d} popular\n", .{ doc_count, pub_count, tag_count, popular_count }); 185 + std.debug.print("sync: full sync complete - {d} docs, {d} pubs, {d} tags, {d} popular, {d} cached\n", .{ doc_count, pub_count, tag_count, popular_count, cache_count }); 161 186 } 162 187 163 188 /// Incremental sync: fetch documents created since last sync
+4
backend/src/main.zig
··· 5 5 const logfire = @import("logfire"); 6 6 const db = @import("db/mod.zig"); 7 7 const activity = @import("activity.zig"); 8 + const stats_buffer = @import("stats_buffer.zig"); 8 9 const server = @import("server.zig"); 9 10 const tap = @import("tap.zig"); 10 11 const embedder = @import("embedder.zig"); ··· 78 79 79 80 // start activity tracker 80 81 activity.init(); 82 + 83 + // start stats buffer (background sync to Turso) 84 + stats_buffer.init(); 81 85 82 86 // start embedder (generates embeddings for new docs) 83 87 embedder.start(allocator);
+69 -4
backend/src/search.zig
··· 5 5 const db = @import("db/mod.zig"); 6 6 const stats = @import("stats.zig"); 7 7 8 + // cached embedded doc count (refresh every 5 minutes) 9 + var cached_doc_count: std.atomic.Value(i64) = std.atomic.Value(i64).init(0); 10 + var doc_count_updated_at: std.atomic.Value(i64) = std.atomic.Value(i64).init(0); 11 + const DOC_COUNT_CACHE_SECS = 300; // 5 minutes 12 + 8 13 // JSON output type for search results 9 14 const SearchResultJson = struct { 10 15 type: []const u8, ··· 565 570 pub fn findSimilar(alloc: Allocator, uri: []const u8, limit: usize) ![]const u8 { 566 571 const c = db.getClient() orelse return error.NotInitialized; 567 572 568 - // get current doc count (for cache invalidation) 569 - const doc_count = getEmbeddedDocCount(c) orelse return error.QueryFailed; 573 + // get cached doc count (rarely hits Turso - refreshes every 5 min) 574 + const doc_count = getEmbeddedDocCountCached(c) orelse return error.QueryFailed; 575 + 576 + // check LOCAL cache first (instant) 577 + if (db.getLocalDb()) |local| { 578 + if (getCachedSimilarLocal(alloc, local, uri, doc_count)) |cached| { 579 + stats.recordCacheHit(); 580 + return cached; 581 + } 582 + } 570 583 571 - // check cache 584 + // check Turso cache (slower, but needed if local empty) 572 585 if (getCachedSimilar(alloc, c, uri, doc_count)) |cached| { 573 586 stats.recordCacheHit(); 587 + // also write to local cache for next time 588 + if (db.getLocalDb()) |local| { 589 + cacheSimilarResultsLocal(local, uri, cached, doc_count); 590 + } 574 591 return cached; 575 592 } 576 593 stats.recordCacheMiss(); ··· 607 624 608 625 const results = try output.toOwnedSlice(); 609 626 610 - // cache the results (fire and forget) 627 + // cache to LOCAL db (instant) 628 + if (db.getLocalDb()) |local| { 629 + cacheSimilarResultsLocal(local, uri, results, doc_count); 630 + } 631 + 632 + // cache to Turso (fire and forget - still useful for durability) 611 633 cacheSimilarResults(c, uri, results, doc_count); 612 634 613 635 
return results; ··· 620 642 return res.rows[0].int(0); 621 643 } 622 644 645 + fn getEmbeddedDocCountCached(c: *db.Client) ?i64 { 646 + const now = std.time.timestamp(); 647 + const last_update = doc_count_updated_at.load(.acquire); 648 + 649 + // use cached value if fresh enough 650 + if (now - last_update < DOC_COUNT_CACHE_SECS) { 651 + const cached = cached_doc_count.load(.acquire); 652 + if (cached > 0) return cached; 653 + } 654 + 655 + // refresh from Turso 656 + const count = getEmbeddedDocCount(c) orelse return null; 657 + cached_doc_count.store(count, .release); 658 + doc_count_updated_at.store(now, .release); 659 + return count; 660 + } 661 + 623 662 fn getCachedSimilar(alloc: Allocator, c: *db.Client, uri: []const u8, current_doc_count: i64) ?[]const u8 { 624 663 var count_buf: [20]u8 = undefined; 625 664 const count_str = std.fmt.bufPrint(&count_buf, "{d}", .{current_doc_count}) catch return null; ··· 644 683 c.exec( 645 684 "INSERT OR REPLACE INTO similarity_cache (source_uri, results, doc_count, computed_at) VALUES (?, ?, ?, ?)", 646 685 &.{ uri, results, count_str, ts_str }, 686 + ) catch {}; 687 + } 688 + 689 + fn getCachedSimilarLocal(alloc: Allocator, local: *db.LocalDb, uri: []const u8, current_doc_count: i64) ?[]const u8 { 690 + var rows = local.query( 691 + "SELECT results, doc_count FROM similarity_cache WHERE source_uri = ?", 692 + .{uri}, 693 + ) catch return null; 694 + defer rows.deinit(); 695 + 696 + const row = rows.next() orelse return null; 697 + // check doc_count matches for cache validity 698 + if (row.int(1) != current_doc_count) return null; 699 + return alloc.dupe(u8, row.text(0)) catch null; 700 + } 701 + 702 + fn cacheSimilarResultsLocal(local: *db.LocalDb, uri: []const u8, results: []const u8, doc_count: i64) void { 703 + var count_buf: [20]u8 = undefined; 704 + const count_str = std.fmt.bufPrint(&count_buf, "{d}", .{doc_count}) catch return; 705 + 706 + var ts_buf: [20]u8 = undefined; 707 + const ts_str = 
std.fmt.bufPrint(&ts_buf, "{d}", .{std.time.timestamp()}) catch return; 708 + 709 + local.exec( 710 + "INSERT OR REPLACE INTO similarity_cache (source_uri, results, doc_count, computed_at) VALUES (?, ?, ?, ?)", 711 + .{ uri, results, count_str, ts_str }, 647 712 ) catch {}; 648 713 } 649 714
+16 -25
backend/src/stats.zig
··· 4 4 const zql = @import("zql"); 5 5 const db = @import("db/mod.zig"); 6 6 const activity = @import("activity.zig"); 7 + const stats_buffer = @import("stats_buffer.zig"); 7 8 8 9 const TagJson = struct { tag: []const u8, count: i64 }; 9 10 const PopularJson = struct { query: []const u8, count: i64 }; ··· 103 104 defer res.deinit(); 104 105 105 106 const row = res.first() orelse return default_stats; 107 + // include pending deltas from buffer 106 108 return .{ 107 109 .documents = row.int(0), 108 110 .publications = row.int(1), 109 111 .embeddings = row.int(2), 110 - .searches = row.int(3), 111 - .errors = row.int(4), 112 + .searches = stats_buffer.getSearchCount(row.int(3)), 113 + .errors = stats_buffer.getErrorCount(row.int(4)), 112 114 .started_at = row.int(5), 113 - .cache_hits = row.int(6), 114 - .cache_misses = row.int(7), 115 + .cache_hits = stats_buffer.getCacheHitCount(row.int(6)), 116 + .cache_misses = stats_buffer.getCacheMissCount(row.int(7)), 115 117 }; 116 118 } 117 119 ··· 136 138 defer rows.deinit(); 137 139 const row = rows.next() orelse return error.NoRows; 138 140 141 + // include pending deltas from buffer 139 142 return .{ 140 143 .documents = row.int(0), 141 144 .publications = row.int(1), 142 145 .embeddings = row.int(2), 143 - .searches = stats_row.int(0), 144 - .errors = stats_row.int(1), 146 + .searches = stats_buffer.getSearchCount(stats_row.int(0)), 147 + .errors = stats_buffer.getErrorCount(stats_row.int(1)), 145 148 .started_at = stats_row.int(2), 146 - .cache_hits = stats_row.int(3), 147 - .cache_misses = stats_row.int(4), 149 + .cache_hits = stats_buffer.getCacheHitCount(stats_row.int(3)), 150 + .cache_misses = stats_buffer.getCacheMissCount(stats_row.int(4)), 148 151 }; 149 152 } 150 153 151 154 pub fn recordSearch(query: []const u8) void { 152 - const c = db.getClient() orelse return; 153 - 154 155 activity.record(); 155 - c.exec("UPDATE stats SET total_searches = total_searches + 1 WHERE id = 1", &.{}) catch {}; 156 - 157 - // 
track popular searches (skip empty/very short queries) 158 - if (query.len >= 2) { 159 - c.exec( 160 - "INSERT INTO popular_searches (query, count) VALUES (?, 1) ON CONFLICT(query) DO UPDATE SET count = count + 1", 161 - &.{query}, 162 - ) catch {}; 163 - } 156 + stats_buffer.recordSearch(); 157 + stats_buffer.queuePopularSearch(query); 164 158 } 165 159 166 160 pub fn recordError() void { 167 - const c = db.getClient() orelse return; 168 - c.exec("UPDATE stats SET total_errors = total_errors + 1 WHERE id = 1", &.{}) catch {}; 161 + stats_buffer.recordError(); 169 162 } 170 163 171 164 pub fn recordCacheHit() void { 172 - const c = db.getClient() orelse return; 173 - c.exec("UPDATE stats SET cache_hits = COALESCE(cache_hits, 0) + 1 WHERE id = 1", &.{}) catch {}; 165 + stats_buffer.recordCacheHit(); 174 166 } 175 167 176 168 pub fn recordCacheMiss() void { 177 - const c = db.getClient() orelse return; 178 - c.exec("UPDATE stats SET cache_misses = COALESCE(cache_misses, 0) + 1 WHERE id = 1", &.{}) catch {}; 169 + stats_buffer.recordCacheMiss(); 179 170 } 180 171 181 172 const PlatformCount = struct { platform: []const u8, count: i64 };
+172
backend/src/stats_buffer.zig
//! Buffered stats with background sync to Turso
//! Follows activity.zig pattern: instant local writes, periodic remote sync
//!
//! Counters are kept as atomic deltas and flushed by a detached background
//! thread every `SYNC_INTERVAL_MS`. Popular-search queries are copied into a
//! fixed-size ring buffer and flushed by the same thread.

const std = @import("std");
const db = @import("db/mod.zig");
const logfire = @import("logfire");

const SYNC_INTERVAL_MS = 5000; // 5 seconds
const MAX_PENDING_SEARCHES = 256;

// atomic deltas (since last sync)
var delta_searches: std.atomic.Value(i64) = std.atomic.Value(i64).init(0);
var delta_errors: std.atomic.Value(i64) = std.atomic.Value(i64).init(0);
var delta_cache_hits: std.atomic.Value(i64) = std.atomic.Value(i64).init(0);
var delta_cache_misses: std.atomic.Value(i64) = std.atomic.Value(i64).init(0);

// popular searches ring buffer; one slot is always left empty so that
// `search_read_idx == search_write_idx` unambiguously means "empty".
// All three variables are guarded by `search_mutex`.
var pending_searches: [MAX_PENDING_SEARCHES]?[]const u8 = .{null} ** MAX_PENDING_SEARCHES;
var search_write_idx: usize = 0;
var search_read_idx: usize = 0;
var search_mutex: std.Thread.Mutex = .{};

// allocator for search string copies. Explicitly thread-safe because copies
// are allocated on request threads and freed on the sync thread, both
// outside `search_mutex`.
var gpa: std.heap.GeneralPurposeAllocator(.{ .thread_safe = true }) = .{};

var sync_thread: ?std.Thread = null;

/// Spawn the detached background sync thread.
/// On spawn failure a warning is logged and stats simply stay buffered.
pub fn init() void {
    sync_thread = std.Thread.spawn(.{}, syncLoop, .{}) catch |err| {
        logfire.warn("stats_buffer: failed to start sync thread: {}", .{err});
        return;
    };
    if (sync_thread) |t| t.detach();
    logfire.info("stats_buffer: initialized with {d}ms sync interval", .{SYNC_INTERVAL_MS});
}

// instant, non-blocking increments

/// Count one search in the pending delta.
pub fn recordSearch() void {
    _ = delta_searches.fetchAdd(1, .monotonic);
}

/// Count one error in the pending delta.
pub fn recordError() void {
    _ = delta_errors.fetchAdd(1, .monotonic);
}

/// Count one cache hit in the pending delta.
pub fn recordCacheHit() void {
    _ = delta_cache_hits.fetchAdd(1, .monotonic);
}

/// Count one cache miss in the pending delta.
pub fn recordCacheMiss() void {
    _ = delta_cache_misses.fetchAdd(1, .monotonic);
}

/// Queue a popular-search query for the next sync (best effort).
/// Queries shorter than 2 bytes are ignored. When the ring is full the
/// oldest pending entry is dropped to make room. The query is copied, so
/// the caller keeps ownership of `query`.
pub fn queuePopularSearch(query: []const u8) void {
    if (query.len < 2) return;

    // Copy first: if allocation fails nothing has been evicted yet, and the
    // allocation does not lengthen the critical section.
    const copy = gpa.allocator().dupe(u8, query) catch return;

    var evicted: ?[]const u8 = null;
    {
        search_mutex.lock();
        defer search_mutex.unlock();

        const next_write = (search_write_idx + 1) % MAX_PENDING_SEARCHES;
        if (next_write == search_read_idx) {
            // buffer full, drop oldest
            evicted = pending_searches[search_read_idx];
            pending_searches[search_read_idx] = null;
            search_read_idx = (search_read_idx + 1) % MAX_PENDING_SEARCHES;
        }

        pending_searches[search_write_idx] = copy;
        search_write_idx = next_write;
    }

    if (evicted) |old| gpa.allocator().free(old);
}

// get current totals (base from db + pending deltas)

/// Return `base` plus the searches not yet synced.
pub fn getSearchCount(base: i64) i64 {
    return base + delta_searches.load(.acquire);
}

/// Return `base` plus the errors not yet synced.
pub fn getErrorCount(base: i64) i64 {
    return base + delta_errors.load(.acquire);
}

/// Return `base` plus the cache hits not yet synced.
pub fn getCacheHitCount(base: i64) i64 {
    return base + delta_cache_hits.load(.acquire);
}

/// Return `base` plus the cache misses not yet synced.
pub fn getCacheMissCount(base: i64) i64 {
    return base + delta_cache_misses.load(.acquire);
}

/// Background thread body: sleep for the sync interval, flush, repeat forever.
fn syncLoop() void {
    while (true) {
        std.Thread.sleep(SYNC_INTERVAL_MS * std.time.ns_per_ms);
        syncToTurso();
    }
}

/// Flush pending counter deltas and queued popular searches.
/// Does nothing (and keeps everything buffered) while no client is available.
fn syncToTurso() void {
    const c = db.getClient() orelse return;

    // swap deltas to zero and get values
    // NOTE: between this swap and the UPDATE landing, readers of the
    // get*Count helpers see neither the delta nor the updated base.
    const searches = delta_searches.swap(0, .acq_rel);
    const errors = delta_errors.swap(0, .acq_rel);
    const cache_hits = delta_cache_hits.swap(0, .acq_rel);
    const cache_misses = delta_cache_misses.swap(0, .acq_rel);

    // sync stats if any changed
    if (searches != 0 or errors != 0 or cache_hits != 0 or cache_misses != 0) {
        syncStatsDelta(c, searches, errors, cache_hits, cache_misses);
    }

    // sync popular searches
    syncPopularSearches(c);
}

/// Apply the given deltas to the `stats` row in a single UPDATE.
/// On failure the deltas are added back so the next sync retries them.
fn syncStatsDelta(c: *db.Client, searches: i64, errors: i64, cache_hits: i64, cache_misses: i64) void {
    // build SQL with values embedded (safe - these are i64, not user input)
    var sql_buf: [512]u8 = undefined;
    const sql = std.fmt.bufPrint(&sql_buf,
        \\UPDATE stats SET
        \\ total_searches = total_searches + {d},
        \\ total_errors = total_errors + {d},
        \\ cache_hits = COALESCE(cache_hits, 0) + {d},
        \\ cache_misses = COALESCE(cache_misses, 0) + {d}
        \\WHERE id = 1
    , .{ searches, errors, cache_hits, cache_misses }) catch return;

    // use queryBatch which accepts runtime SQL
    var statements = [_]db.Client.Statement{.{ .sql = sql, .args = &.{} }};
    var batch = c.queryBatch(&statements) catch |err| {
        logfire.warn("stats_buffer: sync failed: {}, restoring deltas", .{err});
        // restore deltas on failure
        _ = delta_searches.fetchAdd(searches, .monotonic);
        _ = delta_errors.fetchAdd(errors, .monotonic);
        _ = delta_cache_hits.fetchAdd(cache_hits, .monotonic);
        _ = delta_cache_misses.fetchAdd(cache_misses, .monotonic);
        return;
    };
    batch.deinit();

    logfire.debug("stats_buffer: synced deltas (searches={d}, errors={d}, hits={d}, misses={d})", .{ searches, errors, cache_hits, cache_misses });
}

/// Upsert every queued popular search, then free the copies.
/// The ring is drained under `search_mutex`, but the remote writes happen
/// after the lock is released so `queuePopularSearch` never waits on I/O.
/// Failed upserts are dropped (best effort) and reported in one warning.
fn syncPopularSearches(c: *db.Client) void {
    // The ring holds at most MAX_PENDING_SEARCHES - 1 entries, so this fits.
    var drained: [MAX_PENDING_SEARCHES][]const u8 = undefined;
    var drained_len: usize = 0;

    {
        search_mutex.lock();
        defer search_mutex.unlock();

        while (search_read_idx != search_write_idx) {
            if (pending_searches[search_read_idx]) |query| {
                drained[drained_len] = query;
                drained_len += 1;
                pending_searches[search_read_idx] = null;
            }
            search_read_idx = (search_read_idx + 1) % MAX_PENDING_SEARCHES;
        }
    }

    var synced: usize = 0;
    var failed: usize = 0;
    for (drained[0..drained_len]) |query| {
        // the copy is released whether or not the upsert succeeds
        defer gpa.allocator().free(query);

        c.exec(
            "INSERT INTO popular_searches (query, count) VALUES (?, 1) ON CONFLICT(query) DO UPDATE SET count = count + 1",
            &.{query},
        ) catch {
            failed += 1;
            continue;
        };
        synced += 1;
    }

    if (synced > 0) {
        logfire.debug("stats_buffer: synced {d} popular searches", .{synced});
    }
    if (failed > 0) {
        logfire.warn("stats_buffer: dropped {d} popular searches (sync failed)", .{failed});
    }
}