search for standard sites pub-search.waow.tech
search zig blog atproto
11
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: exclude bridgy fed content from search results

Add is_bridgyfed column to documents table. Mark at ingest time via
HTTP URL heuristic (only bridgy fed puts HTTP URLs in the site field).
Exclude from all search paths: keyword (turso + local SQLite),
semantic (local DB check in filter loop), and author browse queries.

Includes scripts/mark-bridgyfed for backfilling existing rows via
PLC directory resolution.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

zzstoatzz 2b61f7a8 df72037c

+303 -11
+1
backend/src/db/LocalDb.zig
··· 244 244 c.exec("ALTER TABLE documents ADD COLUMN indexed_at TEXT", .{}) catch {}; 245 245 c.exec("ALTER TABLE documents ADD COLUMN embedded_at TEXT", .{}) catch {}; 246 246 c.exec("ALTER TABLE documents ADD COLUMN cover_image TEXT DEFAULT ''", .{}) catch {}; 247 + c.exec("ALTER TABLE documents ADD COLUMN is_bridgyfed INTEGER DEFAULT 0", .{}) catch {}; 247 248 } 248 249 249 250 /// Row adapter matching result.Row interface (column-indexed access)
+4
backend/src/db/schema.zig
··· 246 246 "UPDATE documents SET indexed_at = created_at WHERE indexed_at IS NULL", 247 247 &.{}, 248 248 ) catch {}; 249 + 250 + // is_bridgyfed: marks documents from bridgy fed (brid.gy PDS) 251 + // 0 = normal, 1 = bridgy fed (excluded from search by default) 252 + client.exec("ALTER TABLE documents ADD COLUMN is_bridgyfed INTEGER DEFAULT 0", &.{}) catch {}; 249 253 }
+5 -4
backend/src/db/sync.zig
··· 67 67 var result = turso.query( 68 68 \\SELECT uri, did, rkey, title, content, created_at, publication_uri, 69 69 \\ platform, source_collection, path, base_path, has_publication, indexed_at, embedded_at, 70 - \\ COALESCE(cover_image, '') as cover_image 70 + \\ COALESCE(cover_image, '') as cover_image, COALESCE(is_bridgyfed, 0) as is_bridgyfed 71 71 \\FROM documents 72 72 \\ORDER BY uri 73 73 \\LIMIT 500 OFFSET ? ··· 275 275 var result = turso.query( 276 276 \\SELECT uri, did, rkey, title, content, created_at, publication_uri, 277 277 \\ platform, source_collection, path, base_path, has_publication, indexed_at, embedded_at, 278 - \\ COALESCE(cover_image, '') as cover_image 278 + \\ COALESCE(cover_image, '') as cover_image, COALESCE(is_bridgyfed, 0) as is_bridgyfed 279 279 \\FROM documents 280 280 \\WHERE indexed_at >= ? 281 281 \\ORDER BY indexed_at ··· 351 351 conn.exec( 352 352 \\INSERT OR REPLACE INTO documents 353 353 \\(uri, did, rkey, title, content, created_at, publication_uri, 354 - \\ platform, source_collection, path, base_path, has_publication, indexed_at, embedded_at, cover_image) 355 - \\VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 354 + \\ platform, source_collection, path, base_path, has_publication, indexed_at, embedded_at, cover_image, is_bridgyfed) 355 + \\VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 356 356 , .{ 357 357 row.text(0), // uri 358 358 row.text(1), // did ··· 369 369 row.text(12), // indexed_at 370 370 row.text(13), // embedded_at 371 371 row.text(14), // cover_image 372 + row.int(15), // is_bridgyfed 372 373 }) catch |err| { 373 374 return err; 374 375 };
+12 -3
backend/src/ingest/indexer.zig
··· 173 173 } 174 174 } 175 175 176 + // bridgy fed detection: if platform is "other" and pub_uri is an HTTP(S) URL, 177 + // this is likely bridgy fed content (only bridgy fed puts HTTP URLs in the "site" field) 178 + const is_bridgyfed: []const u8 = if (std.mem.eql(u8, actual_platform, "other") and 179 + (std.mem.startsWith(u8, pub_uri, "http://") or std.mem.startsWith(u8, pub_uri, "https://"))) 180 + "1" 181 + else 182 + "0"; 183 + 176 184 // use ON CONFLICT to preserve embedded_at (INSERT OR REPLACE would nuke it) 177 185 // indexed_at uses strftime to record when this row was inserted/updated in Turso 178 186 // (created_at is the document's publication date, which can be old for resynced docs) 179 187 try c.exec( 180 - \\INSERT INTO documents (uri, did, rkey, title, content, created_at, publication_uri, platform, source_collection, path, base_path, has_publication, content_hash, cover_image, indexed_at) 181 - \\VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, strftime('%Y-%m-%dT%H:%M:%S', 'now')) 188 + \\INSERT INTO documents (uri, did, rkey, title, content, created_at, publication_uri, platform, source_collection, path, base_path, has_publication, content_hash, cover_image, indexed_at, is_bridgyfed) 189 + \\VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, strftime('%Y-%m-%dT%H:%M:%S', 'now'), ?) 182 190 \\ON CONFLICT(uri) DO UPDATE SET 183 191 \\ did = excluded.did, 184 192 \\ rkey = excluded.rkey, ··· 194 202 \\ content_hash = excluded.content_hash, 195 203 \\ cover_image = excluded.cover_image, 196 204 \\ indexed_at = strftime('%Y-%m-%dT%H:%M:%S', 'now'), 205 + \\ is_bridgyfed = excluded.is_bridgyfed, 197 206 \\ embedded_at = documents.embedded_at 198 207 , 199 - &.{ uri, did, rkey, title, content, created_at orelse "", pub_uri, actual_platform, source_collection, path orelse "", base_path, has_pub, &content_hash, cover_image orelse "" }, 208 + &.{ uri, did, rkey, title, content, created_at orelse "", pub_uri, actual_platform, source_collection, path orelse "", base_path, has_pub, &content_hash, cover_image orelse "", is_bridgyfed }, 200 209 ); 201 210 202 211 // update FTS index
+31 -4
backend/src/server/search.zig
··· 265 265 \\ COALESCE(p.name, '') as publication_name 266 266 \\FROM documents d 267 267 \\LEFT JOIN publications p ON d.publication_uri = p.uri 268 - \\WHERE d.did = :author 268 + \\WHERE d.did = :author AND (d.is_bridgyfed IS NULL OR d.is_bridgyfed = 0) 269 269 \\ORDER BY d.created_at DESC LIMIT 40 270 270 ); 271 271 ··· 277 277 \\ COALESCE(p.name, '') as publication_name 278 278 \\FROM documents d 279 279 \\LEFT JOIN publications p ON d.publication_uri = p.uri 280 - \\WHERE d.did = :author AND d.platform = :platform 280 + \\WHERE d.did = :author AND d.platform = :platform AND (d.is_bridgyfed IS NULL OR d.is_bridgyfed = 0) 281 281 \\ORDER BY d.created_at DESC LIMIT 40 282 282 ); 283 283 ··· 434 434 return .{ .sql = new_sql, .args = new_args }; 435 435 } 436 436 437 + /// Inject bridgy fed exclusion into a SQL query's WHERE clause before ORDER BY. 438 + /// Excludes documents where is_bridgyfed = 1 (bridgy fed content). 439 + fn addBridgyFedExclusion(alloc: Allocator, stmt: db.Client.Statement) !db.Client.Statement { 440 + const order_idx = std.mem.indexOf(u8, stmt.sql, "ORDER BY") orelse return stmt; 441 + const new_sql = try std.fmt.allocPrint(alloc, "{s}AND (d.is_bridgyfed IS NULL OR d.is_bridgyfed = 0) {s}", .{ stmt.sql[0..order_idx], stmt.sql[order_idx..] }); 442 + return .{ .sql = new_sql, .args = stmt.args }; 443 + } 444 + 445 + /// Check if a URI is from bridgy fed by looking up is_bridgyfed in local SQLite. 446 + fn isBridgyFed(uri: []const u8) bool { 447 + const local = db.getLocalDb() orelse return false; 448 + var rows = local.query( 449 + "SELECT is_bridgyfed FROM documents WHERE uri = ?", 450 + .{uri}, 451 + ) catch return false; 452 + defer rows.deinit(); 453 + if (rows.next()) |row| { 454 + return row.int(0) != 0; 455 + } 456 + return false; 457 + } 458 + 437 459 /// Keyword search: FTS5 via local SQLite or Turso fallback. 438 460 fn searchKeyword(alloc: Allocator, query: []const u8, tag_filter: ?[]const u8, platform_filter: ?[]const u8, since_filter: ?[]const u8, author_filter: ?[]const u8) ![]const u8 { 439 461 // try local SQLite first (faster for FTS queries) ··· 512 534 const doc_sql = getDocQuerySql(has_query, has_tag, has_platform, has_since); 513 535 const doc_args = try getDocQueryArgs(alloc, fts_query, tag_filter, platform_filter, since_filter, has_query, has_tag, has_platform, has_since); 514 536 if (doc_sql) |sql| { 515 - statements[stmt_count] = try addAuthorCondition(alloc, .{ .sql = sql, .args = doc_args }, "d.did", author_val); 537 + statements[stmt_count] = try addBridgyFedExclusion(alloc, try addAuthorCondition(alloc, .{ .sql = sql, .args = doc_args }, "d.did", author_val)); 516 538 stmt_count += 1; 517 539 } 518 540 ··· 529 551 } else { 530 552 base_stmt = .{ .sql = DocsByPubBasePath.positional, .args = &.{fts_query} }; 531 553 } 532 - statements[stmt_count] = try addAuthorCondition(alloc, base_stmt, "d.did", author_val); 554 + statements[stmt_count] = try addBridgyFedExclusion(alloc, try addAuthorCondition(alloc, base_stmt, "d.did", author_val)); 533 555 stmt_count += 1; 534 556 } 535 557 ··· 638 660 \\JOIN documents d ON f.uri = d.uri 639 661 \\LEFT JOIN publications p ON d.publication_uri = p.uri 640 662 \\WHERE documents_fts MATCH ? AND d.platform = ? AND (? = '' OR d.did = ?) 663 + \\AND (d.is_bridgyfed IS NULL OR d.is_bridgyfed = 0) 641 664 \\ORDER BY rank LIMIT 40 642 665 , .{ fts_query, platform, author_val, author_val }); 643 666 defer rows.deinit(); ··· 667 690 \\JOIN publications p ON d.publication_uri = p.uri 668 691 \\JOIN publications_fts pf ON p.uri = pf.uri 669 692 \\WHERE publications_fts MATCH ? AND d.platform = ? AND (? = '' OR d.did = ?) 693 + \\AND (d.is_bridgyfed IS NULL OR d.is_bridgyfed = 0) 670 694 \\ORDER BY rank LIMIT 40 671 695 , .{ fts_query, platform, author_val, author_val }); 672 696 defer bp_rows.deinit(); ··· 696 720 \\JOIN documents d ON f.uri = d.uri 697 721 \\LEFT JOIN publications p ON d.publication_uri = p.uri 698 722 \\WHERE documents_fts MATCH ? AND (? = '' OR d.did = ?) 723 + \\AND (d.is_bridgyfed IS NULL OR d.is_bridgyfed = 0) 699 724 \\ORDER BY rank LIMIT 40 700 725 , .{ fts_query, author_val, author_val }); 701 726 defer rows.deinit(); ··· 732 757 \\JOIN publications p ON d.publication_uri = p.uri 733 758 \\JOIN publications_fts pf ON p.uri = pf.uri 734 759 \\WHERE publications_fts MATCH ? AND (? = '' OR d.did = ?) 760 + \\AND (d.is_bridgyfed IS NULL OR d.is_bridgyfed = 0) 735 761 \\ORDER BY rank LIMIT 40 736 762 , .{ fts_query, author_val, author_val }); 737 763 defer bp_rows.deinit(); ··· 1229 1255 for (results, 0..) |r, idx| { 1230 1256 if (filtered_count >= 20) break; 1231 1257 if (r.title.len == 0) continue; 1258 + if (isBridgyFed(r.uri)) continue; 1232 1259 if (platform_filter) |pf| { 1233 1260 if (!std.mem.eql(u8, r.platform, pf)) continue; 1234 1261 }
+250
scripts/mark-bridgyfed
··· 1 + #!/usr/bin/env -S uv run --script --quiet 2 + # /// script 3 + # requires-python = ">=3.12" 4 + # dependencies = ["httpx", "pydantic-settings"] 5 + # /// 6 + """ 7 + Mark bridgy fed content in turso (set is_bridgyfed = 1). 8 + 9 + Finds all DIDs with platform='other', resolves their PDS via plc.directory, 10 + and UPDATE is_bridgyfed = 1 for any DID hosted on brid.gy. 11 + 12 + Usage: 13 + ./scripts/mark-bridgyfed # dry run (default) 14 + ./scripts/mark-bridgyfed --apply # actually update 15 + """ 16 + 17 + import argparse 18 + import asyncio 19 + import os 20 + import sys 21 + import time 22 + 23 + import httpx 24 + from pydantic_settings import BaseSettings, SettingsConfigDict 25 + 26 + 27 + def log(msg: str) -> None: 28 + print(msg, flush=True) 29 + 30 + 31 + class Settings(BaseSettings): 32 + model_config = SettingsConfigDict( 33 + env_file=os.environ.get("ENV_FILE", ".env"), extra="ignore" 34 + ) 35 + 36 + turso_url: str 37 + turso_token: str 38 + 39 + @property 40 + def turso_host(self) -> str: 41 + url = self.turso_url 42 + if url.startswith("libsql://"): 43 + url = url[len("libsql://"):] 44 + return url 45 + 46 + 47 + def _make_stmt(sql: str, args: list | None = None) -> dict: 48 + stmt: dict = {"sql": sql} 49 + if args: 50 + stmt["args"] = [ 51 + {"type": "null"} if a is None else {"type": "text", "value": str(a)} 52 + for a in args 53 + ] 54 + return stmt 55 + 56 + 57 + async def _turso_request( 58 + client: httpx.AsyncClient, 59 + settings: Settings, 60 + stmt: dict, 61 + semaphore: asyncio.Semaphore, 62 + retries: int = 5, 63 + ) -> dict: 64 + """Send a pipeline request to Turso with retries on transient errors.""" 65 + async with semaphore: 66 + for attempt in range(retries): 67 + try: 68 + response = await client.post( 69 + f"https://{settings.turso_host}/v2/pipeline", 70 + headers={ 71 + "Authorization": f"Bearer {settings.turso_token}", 72 + "Content-Type": "application/json", 73 + }, 74 + json={ 75 + "requests": [ 76 + {"type": "execute", "stmt": stmt}, 77 + {"type": "close"}, 78 + ] 79 + }, 80 + timeout=120, 81 + ) 82 + if response.status_code in (400, 502, 503, 504) and attempt < retries - 1: 83 + wait = 2 ** (attempt + 1) 84 + log(f" turso {response.status_code}, retry in {wait}s...") 85 + await asyncio.sleep(wait) 86 + continue 87 + if response.status_code != 200: 88 + print(f"turso error: {response.text}", file=sys.stderr, flush=True) 89 + response.raise_for_status() 90 + return response.json() 91 + except httpx.TimeoutException: 92 + if attempt < retries - 1: 93 + wait = 2 ** (attempt + 1) 94 + log(f" turso timeout, retry in {wait}s...") 95 + await asyncio.sleep(wait) 96 + continue 97 + raise 98 + raise RuntimeError("unreachable") 99 + 100 + 101 + async def turso_query( 102 + client: httpx.AsyncClient, 103 + settings: Settings, 104 + sql: str, 105 + semaphore: asyncio.Semaphore, 106 + args: list | None = None, 107 + ) -> list[dict]: 108 + data = await _turso_request(client, settings, _make_stmt(sql, args), semaphore) 109 + result = data["results"][0]["response"]["result"] 110 + cols = [c["name"] for c in result["cols"]] 111 + return [{col: cell["value"] for col, cell in zip(cols, row)} for row in result["rows"]] 112 + 113 + 114 + async def turso_exec( 115 + client: httpx.AsyncClient, 116 + settings: Settings, 117 + sql: str, 118 + semaphore: asyncio.Semaphore, 119 + args: list | None = None, 120 + ) -> int: 121 + data = await _turso_request(client, settings, _make_stmt(sql, args), semaphore) 122 + return data["results"][0]["response"]["result"]["affected_row_count"] 123 + 124 + 125 + async def resolve_pds(client: httpx.AsyncClient, did: str, semaphore: asyncio.Semaphore) -> str | None: 126 + """Get PDS endpoint from PLC directory.""" 127 + async with semaphore: 128 + try: 129 + resp = await client.get(f"https://plc.directory/{did}", timeout=30) 130 + resp.raise_for_status() 131 + data = resp.json() 132 + for service in data.get("service", []): 133 + if service.get("type") == "AtprotoPersonalDataServer": 134 + return service["serviceEndpoint"] 135 + except Exception as e: 136 + print(f" plc lookup failed for {did}: {e}", file=sys.stderr, flush=True) 137 + return None 138 + 139 + 140 + async def mark_did( 141 + client: httpx.AsyncClient, 142 + settings: Settings, 143 + did: str, 144 + semaphore: asyncio.Semaphore, 145 + ) -> int: 146 + """Mark all documents for a DID as bridgy fed. Returns affected row count.""" 147 + return await turso_exec( 148 + client, settings, 149 + "UPDATE documents SET is_bridgyfed = 1 WHERE did = ? AND (is_bridgyfed IS NULL OR is_bridgyfed = 0)", 150 + semaphore, [did], 151 + ) 152 + 153 + 154 + async def main(): 155 + parser = argparse.ArgumentParser(description="Mark bridgy fed content in turso") 156 + parser.add_argument("--apply", action="store_true", help="Actually update (default is dry run)") 157 + args = parser.parse_args() 158 + 159 + try: 160 + settings = Settings() # type: ignore 161 + except Exception as e: 162 + print(f"error loading settings: {e}", file=sys.stderr) 163 + print("required env vars: TURSO_URL, TURSO_TOKEN", file=sys.stderr) 164 + sys.exit(1) 165 + 166 + if not args.apply: 167 + log("=== DRY RUN (pass --apply to update) ===\n") 168 + 169 + plc_sem = asyncio.Semaphore(20) # concurrent PLC lookups 170 + turso_sem = asyncio.Semaphore(10) # concurrent turso requests 171 + 172 + async with httpx.AsyncClient() as client: 173 + # step 1: get distinct DIDs with platform='other' 174 + log("querying distinct DIDs with platform='other'...") 175 + rows = await turso_query( 176 + client, settings, 177 + "SELECT DISTINCT did FROM documents WHERE platform = 'other'", 178 + turso_sem, 179 + ) 180 + dids = [r["did"] for r in rows] 181 + log(f" found {len(dids)} distinct DIDs\n") 182 + 183 + # step 2: resolve PDS concurrently 184 + log("resolving PDS endpoints (20 concurrent)...") 185 + t0 = time.monotonic() 186 + 187 + async def resolve_one(did: str) -> tuple[str, str | None]: 188 + pds = await resolve_pds(client, did, plc_sem) 189 + return did, pds 190 + 191 + results = await asyncio.gather(*[resolve_one(d) for d in dids]) 192 + elapsed = time.monotonic() - t0 193 + 194 + bridgy_dids = [] 195 + non_bridgy_dids = [] 196 + for did, pds in results: 197 + if pds and "brid.gy" in pds: 198 + bridgy_dids.append(did) 199 + else: 200 + non_bridgy_dids.append(did) 201 + 202 + log(f" resolved {len(dids)} DIDs in {elapsed:.1f}s") 203 + log(f" bridgy fed: {len(bridgy_dids)}") 204 + log(f" non-bridgy: {len(non_bridgy_dids)}\n") 205 + 206 + if not bridgy_dids: 207 + log("nothing to mark!") 208 + return 209 + 210 + if not args.apply: 211 + # show counts per DID 212 + log("querying document counts per bridgy fed DID...") 213 + total = 0 214 + for did in bridgy_dids[:10]: 215 + count_rows = await turso_query( 216 + client, settings, 217 + "SELECT count(*) as n FROM documents WHERE did = ?", 218 + turso_sem, [did], 219 + ) 220 + n = int(count_rows[0]["n"]) 221 + total += n 222 + log(f" {did}: {n} docs") 223 + if len(bridgy_dids) > 10: 224 + log(f" ... and {len(bridgy_dids) - 10} more DIDs") 225 + log(f"\npass --apply to mark these documents (estimated ~{total}+ rows)") 226 + return 227 + 228 + # step 3: mark concurrently (10 concurrent turso connections) 229 + log(f"marking {len(bridgy_dids)} bridgy fed DIDs (10 concurrent)...") 230 + t0 = time.monotonic() 231 + marked = 0 232 + done = 0 233 + 234 + async def mark_one(did: str) -> int: 235 + return await mark_did(client, settings, did, turso_sem) 236 + 237 + # process in chunks of 50 for progress reporting 238 + for chunk_start in range(0, len(bridgy_dids), 50): 239 + chunk = bridgy_dids[chunk_start : chunk_start + 50] 240 + counts = await asyncio.gather(*[mark_one(d) for d in chunk]) 241 + marked += sum(counts) 242 + done += len(chunk) 243 + log(f" [{done}/{len(bridgy_dids)}] marked {marked} docs so far") 244 + 245 + elapsed = time.monotonic() - t0 246 + log(f"\ndone! marked {marked} documents from {len(bridgy_dids)} DIDs in {elapsed:.1f}s") 247 + 248 + 249 + if __name__ == "__main__": 250 + asyncio.run(main())