search for standard sites pub-search.waow.tech
search zig blog atproto
11
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix: deduplicate semantic search results + clean up rebuild script

- add URI dedup in searchSemantic() (same doc appeared twice from tpuf)
- rewrite scripts/rebuild-vector-index for tpuf namespace reset workflow

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

zzstoatzz 1467e4b9 efcbca63

+166 -1
+18 -1
backend/src/search.zig
··· 880 880 alloc.free(results); 881 881 } 882 882 883 - // serialize results, filtering by distance + platform, capped at 20 883 + // serialize results, filtering by distance + platform + dedup, capped at 20 884 884 var output: std.Io.Writer.Allocating = .init(alloc); 885 885 errdefer output.deinit(); 886 + 887 + // track seen URIs to deduplicate 888 + var seen: [20][]const u8 = undefined; 889 + var seen_count: usize = 0; 886 890 887 891 var jw: json.Stringify = .{ .writer = &output.writer }; 888 892 try jw.beginArray(); ··· 895 899 if (r.title.len == 0) continue; 896 900 if (platform_filter) |pf| { 897 901 if (!std.mem.eql(u8, r.platform, pf)) continue; 902 + } 903 + // deduplicate by URI 904 + var is_dup = false; 905 + for (seen[0..seen_count]) |s| { 906 + if (std.mem.eql(u8, s, r.uri)) { 907 + is_dup = true; 908 + break; 909 + } 910 + } 911 + if (is_dup) continue; 912 + if (seen_count < 20) { 913 + seen[seen_count] = r.uri; 914 + seen_count += 1; 898 915 } 899 916 count += 1; 900 917 try jw.write(SearchResultJson{
+148
scripts/rebuild-vector-index
#!/usr/bin/env -S uv run --script --quiet
# /// script
# requires-python = ">=3.12"
# dependencies = ["httpx", "pydantic-settings"]
# ///
"""
Reset the turbopuffer vector index and trigger a full re-embedding.

This script:
  1. Deletes the turbopuffer namespace (old vectors with wrong dimensions)
  2. Clears embedded_at in turso so the backend embedder re-processes all docs

The backend embedder will automatically pick up unembedded docs and
re-embed them with the current model on its next poll cycle (~60s).

Usage:
    ./scripts/rebuild-vector-index           # delete namespace + clear embedded_at
    ./scripts/rebuild-vector-index --check   # show current state without changing anything
"""

import argparse
import os
import subprocess
import sys

import httpx
from pydantic_settings import BaseSettings, SettingsConfigDict


TPUF_NAMESPACE = "leaflet-search"
FLY_APP = "leaflet-search-backend"


class Settings(BaseSettings):
    """Turso connection settings, read from the environment or an env file.

    The env file path comes from the ENV_FILE environment variable and
    falls back to ".env"; unrelated keys in that file are ignored.
    """

    model_config = SettingsConfigDict(
        env_file=os.environ.get("ENV_FILE", ".env"), extra="ignore"
    )
    turso_url: str
    turso_token: str

    @property
    def turso_host(self) -> str:
        """Return turso_url without a leading ``libsql://`` scheme, if present."""
        return self.turso_url.removeprefix("libsql://")


def pipeline(settings: Settings, statements: list[str]) -> list[dict]:
    """Execute SQL statements through the turso ``/v2/pipeline`` HTTP endpoint.

    Args:
        settings: Provides the turso host and bearer token.
        statements: SQL strings, sent in order as "execute" requests.

    Returns:
        One result object per statement, in the order given.

    Raises:
        httpx.HTTPStatusError: If the HTTP response status is an error.
        Exception: If any individual statement reports an error.
    """
    requests = [{"type": "execute", "stmt": {"sql": sql}} for sql in statements]
    requests.append({"type": "close"})

    resp = httpx.post(
        f"https://{settings.turso_host}/v2/pipeline",
        headers={
            "Authorization": f"Bearer {settings.turso_token}",
            "Content-Type": "application/json",
        },
        json={"requests": requests},
        timeout=60,
    )
    resp.raise_for_status()
    data = resp.json()

    results = []
    # The last entry answers the trailing "close" request, not a statement.
    for i, result in enumerate(data["results"][:-1]):
        if result["type"] == "error":
            raise Exception(f"statement {i} failed: {result['error']}")
        results.append(result["response"]["result"])
    return results


def scalar(settings: Settings, sql: str) -> int:
    """Run a single-value query and return the first cell of the first row as an int."""
    results = pipeline(settings, [sql])
    cell = results[0]["rows"][0][0]
    # A cell may arrive wrapped as {"value": ...} or as a bare value.
    return int(cell["value"] if isinstance(cell, dict) else cell)


def get_tpuf_key() -> str:
    """Read TURBOPUFFER_API_KEY from the fly app via ``fly ssh console``.

    Returns:
        The API key, taken from the last line of the command's output.

    Raises:
        Exception: If the fly CLI is missing, the command fails, it prints
            nothing, or the value does not start with ``tpuf_``.
    """
    try:
        result = subprocess.run(
            ["fly", "-a", FLY_APP, "ssh", "console", "-C", "printenv TURBOPUFFER_API_KEY"],
            capture_output=True,
            text=True,
        )
    except FileNotFoundError as err:
        raise Exception("fly ssh failed: `fly` CLI not found on PATH") from err
    if result.returncode != 0:
        raise Exception(f"fly ssh failed: {result.stderr.strip()}")

    lines = result.stdout.strip().splitlines()
    if not lines:
        # Without this guard an empty stdout would raise a bare IndexError.
        raise Exception("fly ssh failed: command produced no output")
    # Only the last line is used; any earlier output lines are ignored.
    key = lines[-1].strip()
    if not key.startswith("tpuf_"):
        raise Exception(f"unexpected key format: {key[:10]}...")
    return key


def delete_tpuf_namespace(api_key: str) -> str:
    """Delete the turbopuffer namespace named by TPUF_NAMESPACE.

    Args:
        api_key: turbopuffer API key used as the bearer token.

    Returns:
        "deleted" on HTTP 200, or "already gone" when the error message
        in the response says the namespace was not found.

    Raises:
        Exception: For any other response, including non-JSON error bodies.
    """
    resp = httpx.delete(
        f"https://api.turbopuffer.com/v2/namespaces/{TPUF_NAMESPACE}",
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        },
        timeout=30,
    )
    if resp.status_code == 200:
        return "deleted"

    # Error bodies are not guaranteed to be JSON (e.g. a proxy error page);
    # a decode failure must not mask the real HTTP failure.
    try:
        data = resp.json()
    except ValueError:
        data = None
    error = data.get("error", "") if isinstance(data, dict) else ""
    if "not found" in str(error):
        return "already gone"
    detail = data if data is not None else resp.text
    raise Exception(f"tpuf delete failed: HTTP {resp.status_code}: {detail}")


def main() -> None:
    """Report embedding state and, unless --check is given, reset the index."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--check", action="store_true", help="show state without changing anything")
    args = parser.parse_args()

    try:
        settings = Settings()  # type: ignore
    except Exception as e:
        print(f"error: {e}", file=sys.stderr)
        print("required: TURSO_URL, TURSO_TOKEN (or .env file)", file=sys.stderr)
        sys.exit(1)

    total = scalar(settings, "SELECT COUNT(*) FROM documents")
    embedded = scalar(settings, "SELECT COUNT(*) FROM documents WHERE embedded_at IS NOT NULL")
    print(f"documents: {total}, embedded: {embedded}")

    if args.check:
        return

    # step 1: get tpuf key
    print("getting tpuf key from fly...", end="", flush=True)
    tpuf_key = get_tpuf_key()
    print(f" ok ({tpuf_key[:10]}...)")

    # step 2: delete namespace
    print(f"deleting tpuf namespace '{TPUF_NAMESPACE}'...", end="", flush=True)
    status = delete_tpuf_namespace(tpuf_key)
    print(f" ok ({status})")

    # step 3: clear embedded_at
    print("clearing embedded_at...", end="", flush=True)
    pipeline(settings, ["UPDATE documents SET embedded_at = NULL"])
    remaining = scalar(settings, "SELECT COUNT(*) FROM documents WHERE embedded_at IS NOT NULL")
    print(f" ok ({remaining} remaining)")

    print(f"\ndone. embedder will re-embed {total} docs on next poll (~60s).")


if __name__ == "__main__":
    main()