search for standard sites pub-search.waow.tech
search zig blog atproto
11
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: add standardsite support and improve TAP debugging

- add /platforms endpoint to show document counts by platform
- add standardsite to frontend PLATFORM_CONFIG with badges
- fix TAP docs: clarify backfill behavior (TAP does send historical data)
- add TAP API endpoints documentation
- increase TAP memory to 4GB and parallelism to 5 workers
- add backfill-pds script for direct PDS fetching

馃 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

zzstoatzz af4a0dde 1176b0e3

+406 -13
+11
backend/src/server.zig
··· 56 56 try sendJson(request, "{\"status\":\"ok\"}"); 57 57 } else if (mem.eql(u8, target, "/popular")) { 58 58 try handlePopular(request); 59 + } else if (mem.eql(u8, target, "/platforms")) { 60 + try handlePlatforms(request); 59 61 } else if (mem.eql(u8, target, "/dashboard")) { 60 62 try handleDashboard(request); 61 63 } else if (mem.eql(u8, target, "/api/dashboard")) { ··· 109 111 110 112 const popular = try stats.getPopular(alloc, 5); 111 113 try sendJson(request, popular); 114 + } 115 + 116 + fn handlePlatforms(request: *http.Server.Request) !void { 117 + var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 118 + defer arena.deinit(); 119 + const alloc = arena.allocator(); 120 + 121 + const data = try stats.getPlatformCounts(alloc); 122 + try sendJson(request, data); 112 123 } 113 124 114 125 fn parseQueryParam(alloc: std.mem.Allocator, target: []const u8, param: []const u8) ![]const u8 {
+51
backend/src/stats.zig
··· 102 102 c.exec("UPDATE stats SET cache_misses = COALESCE(cache_misses, 0) + 1 WHERE id = 1", &.{}) catch {}; 103 103 } 104 104 105 + const PlatformCount = struct { platform: []const u8, count: i64 }; 106 + 107 + pub fn getPlatformCounts(alloc: Allocator) ![]const u8 { 108 + const c = db.getClient() orelse return error.NotInitialized; 109 + 110 + var output: std.Io.Writer.Allocating = .init(alloc); 111 + errdefer output.deinit(); 112 + 113 + var jw: json.Stringify = .{ .writer = &output.writer }; 114 + try jw.beginObject(); 115 + 116 + // documents by platform 117 + try jw.objectField("documents"); 118 + if (c.query("SELECT platform, COUNT(*) as count FROM documents GROUP BY platform ORDER BY count DESC", &.{})) |res_val| { 119 + var res = res_val; 120 + defer res.deinit(); 121 + try jw.beginArray(); 122 + for (res.rows) |row| try jw.write(PlatformCount{ .platform = row.text(0), .count = row.int(1) }); 123 + try jw.endArray(); 124 + } else |_| { 125 + try jw.beginArray(); 126 + try jw.endArray(); 127 + } 128 + 129 + // FTS document count 130 + try jw.objectField("fts_count"); 131 + if (c.query("SELECT COUNT(*) FROM documents_fts", &.{})) |res_val| { 132 + var res = res_val; 133 + defer res.deinit(); 134 + if (res.first()) |row| { 135 + try jw.write(row.int(0)); 136 + } else try jw.write(0); 137 + } else |_| try jw.write(0); 138 + 139 + // sample URIs from each platform (for debugging) 140 + try jw.objectField("sample_standardsite"); 141 + if (c.query("SELECT uri FROM documents WHERE platform = 'standardsite' LIMIT 3", &.{})) |res_val| { 142 + var res = res_val; 143 + defer res.deinit(); 144 + try jw.beginArray(); 145 + for (res.rows) |row| try jw.write(row.text(0)); 146 + try jw.endArray(); 147 + } else |_| { 148 + try jw.beginArray(); 149 + try jw.endArray(); 150 + } 151 + 152 + try jw.endObject(); 153 + return try output.toOwnedSlice(); 154 + } 155 + 105 156 pub fn getPopular(alloc: Allocator, limit: usize) ![]const u8 { 106 157 const c = db.getClient() orelse return error.NotInitialized; 107 158
+40 -4
docs/tap.md
··· 6 6 7 7 tap subscribes to the ATProto firehose, filters for specific collections (e.g., `pub.leaflet.document`), and broadcasts matching events to websocket clients. it also does initial crawling/backfilling of existing records. 8 8 9 - key behavior: **TAP only broadcasts live firehose events, not resynced historical data**. the resyncer marks records as `Live: false` and does not pass them to the websocket outbox. this means the backend only receives new/updated records after TAP starts - historical data must be fetched separately. 9 + key behavior: **TAP backfills historical data when repos are added**. when a repo is added to tracking: 10 + 1. TAP fetches the full repo from the account's PDS using `com.atproto.sync.getRepo` 11 + 2. live firehose events during backfill are buffered in memory 12 + 3. historical events (marked `live: false`) are delivered first 13 + 4. after historical events complete, buffered live events are released 14 + 5. subsequent firehose events arrive immediately marked as `live: true` 15 + 16 + TAP enforces strict per-repo ordering - live events are synchronization barriers that require all prior events to complete first. 10 17 11 18 ## message format 12 19 ··· 41 48 42 49 1. **action is a string, not an enum** - TAP sends `"action": "create"` as a JSON string. if your parser expects an enum type, extraction will silently fail. use string comparison. 43 50 44 - 2. **resynced records don't broadcast** - TAP's resyncer fetches historical data but marks it `Live: false` and doesn't send it to websocket clients. only live firehose events are broadcast. 51 + 2. **collection filters apply to output** - `TAP_COLLECTION_FILTERS` controls which records TAP sends to clients. records from other collections are fetched but not forwarded. 45 52 46 - 3. **silent extraction failures** - if using zat's `extractAt`, enable debug logging to see why parsing fails: 53 + 3. **signal collection vs collection filters** - `TAP_SIGNAL_COLLECTION` controls auto-discovery of repos (which repos to track), while `TAP_COLLECTION_FILTERS` controls which records from those repos to output. a repo must either be auto-discovered via signal collection OR manually added via `/repos/add`. 54 + 55 + 4. **silent extraction failures** - if using zat's `extractAt`, enable debug logging to see why parsing fails: 47 56 ```zig 48 57 pub const std_options = .{ 49 58 .log_scope_levels = &.{.{ .scope = .zat, .level = .debug }}, ··· 83 92 | `websocket handshake failed: error.Timeout` | TAP not running or network issue | restart TAP, check regions match | 84 93 | `dialing failed: lookup ... i/o timeout` | DNS issues reaching bsky relay | restart TAP, transient network issue | 85 94 | messages received but not indexed | extraction failing (type mismatch) | enable zat debug logging, check field types | 86 - | document count not increasing | TAP only sends live events, not historical | this is expected behavior | 95 + | repo shows `records: 0` after adding | resync failed or collection not in filters | check TAP logs for resync errors, verify `TAP_COLLECTION_FILTERS` | 96 + | new platform records not appearing | platform's collection not in `TAP_COLLECTION_FILTERS` | add collection to filters, restart TAP | 97 + 98 + ## TAP API endpoints 99 + 100 + TAP exposes HTTP endpoints for monitoring and control: 101 + 102 + | endpoint | description | 103 + |----------|-------------| 104 + | `/health` | health check | 105 + | `/stats/repo-count` | number of tracked repos | 106 + | `/stats/record-count` | total records processed | 107 + | `/stats/outbox-buffer` | events waiting to be sent | 108 + | `/stats/resync-buffer` | DIDs waiting to be resynced | 109 + | `/stats/cursors` | firehose cursor position | 110 + | `/info/:did` | repo status: `{"did":"...","state":"active","records":N}` | 111 + | `/repos/add` | POST with `{"dids":["did:plc:..."]}` to add repos | 112 + | `/repos/remove` | POST with `{"dids":["did:plc:..."]}` to remove repos | 113 + 114 + example: check repo status 115 + ```bash 116 + fly ssh console -a leaflet-search-tap -C "curl -s localhost:2480/info/did:plc:abc123" 117 + ``` 118 + 119 + example: manually add a repo for backfill 120 + ```bash 121 + fly ssh console -a leaflet-search-tap -C 'curl -X POST -H "Content-Type: application/json" -d "{\"dids\":[\"did:plc:abc123\"]}" localhost:2480/repos/add' 122 + ``` 87 123 88 124 ## fly.io deployment 89 125
+286
scripts/backfill-pds
··· 1 + #!/usr/bin/env -S uv run --script --quiet 2 + # /// script 3 + # requires-python = ">=3.12" 4 + # dependencies = ["httpx", "pydantic-settings"] 5 + # /// 6 + """ 7 + Backfill records directly from a PDS. 8 + 9 + Usage: 10 + ./scripts/backfill-pds did:plc:mkqt76xvfgxuemlwlx6ruc3w 11 + ./scripts/backfill-pds zat.dev 12 + """ 13 + 14 + import argparse 15 + import json 16 + import os 17 + import sys 18 + 19 + import httpx 20 + from pydantic_settings import BaseSettings, SettingsConfigDict 21 + 22 + 23 + class Settings(BaseSettings): 24 + model_config = SettingsConfigDict( 25 + env_file=os.environ.get("ENV_FILE", ".env"), extra="ignore" 26 + ) 27 + 28 + turso_url: str 29 + turso_token: str 30 + 31 + @property 32 + def turso_host(self) -> str: 33 + url = self.turso_url 34 + if url.startswith("libsql://"): 35 + url = url[len("libsql://") :] 36 + return url 37 + 38 + 39 + def resolve_handle(handle: str) -> str: 40 + """Resolve a handle to a DID.""" 41 + resp = httpx.get( 42 + f"https://bsky.social/xrpc/com.atproto.identity.resolveHandle", 43 + params={"handle": handle}, 44 + timeout=30, 45 + ) 46 + resp.raise_for_status() 47 + return resp.json()["did"] 48 + 49 + 50 + def get_pds_endpoint(did: str) -> str: 51 + """Get PDS endpoint from PLC directory.""" 52 + resp = httpx.get(f"https://plc.directory/{did}", timeout=30) 53 + resp.raise_for_status() 54 + data = resp.json() 55 + for service in data.get("service", []): 56 + if service.get("type") == "AtprotoPersonalDataServer": 57 + return service["serviceEndpoint"] 58 + raise ValueError(f"No PDS endpoint found for {did}") 59 + 60 + 61 + def list_records(pds: str, did: str, collection: str) -> list[dict]: 62 + """List all records from a collection.""" 63 + records = [] 64 + cursor = None 65 + while True: 66 + params = {"repo": did, "collection": collection, "limit": 100} 67 + if cursor: 68 + params["cursor"] = cursor 69 + resp = httpx.get( 70 + f"{pds}/xrpc/com.atproto.repo.listRecords", params=params, timeout=30 71 + ) 72 + resp.raise_for_status() 73 + data = resp.json() 74 + records.extend(data.get("records", [])) 75 + cursor = data.get("cursor") 76 + if not cursor: 77 + break 78 + return records 79 + 80 + 81 + def turso_exec(settings: Settings, sql: str, args: list | None = None) -> None: 82 + """Execute a statement against Turso.""" 83 + stmt = {"sql": sql} 84 + if args: 85 + # Handle None values properly - use null type 86 + stmt["args"] = [] 87 + for a in args: 88 + if a is None: 89 + stmt["args"].append({"type": "null"}) 90 + else: 91 + stmt["args"].append({"type": "text", "value": str(a)}) 92 + 93 + response = httpx.post( 94 + f"https://{settings.turso_host}/v2/pipeline", 95 + headers={ 96 + "Authorization": f"Bearer {settings.turso_token}", 97 + "Content-Type": "application/json", 98 + }, 99 + json={"requests": [{"type": "execute", "stmt": stmt}, {"type": "close"}]}, 100 + timeout=30, 101 + ) 102 + if response.status_code != 200: 103 + print(f"Turso error: {response.text}", file=sys.stderr) 104 + response.raise_for_status() 105 + 106 + 107 + def extract_document(record: dict, collection: str) -> dict | None: 108 + """Extract document fields from a record.""" 109 + value = record.get("value", {}) 110 + 111 + # Get title 112 + title = value.get("title") 113 + if not title: 114 + return None 115 + 116 + # Get content - try multiple field names 117 + content = value.get("content") or value.get("text") or "" 118 + if isinstance(content, dict): 119 + # Handle richtext format 120 + content = content.get("text", "") 121 + 122 + # Get created_at 123 + created_at = value.get("createdAt", "") 124 + 125 + # Get publication reference 126 + publication = value.get("publication") 127 + publication_uri = None 128 + if publication and isinstance(publication, dict): 129 + publication_uri = publication.get("uri") 130 + 131 + # Get tags 132 + tags = value.get("tags", []) 133 + if not isinstance(tags, list): 134 + tags = [] 135 + 136 + # Determine platform from collection 137 + if collection.startswith("pub.leaflet"): 138 + platform = "leaflet" 139 + elif collection.startswith("site.standard"): 140 + platform = "standardsite" 141 + else: 142 + platform = "unknown" 143 + 144 + return { 145 + "title": title, 146 + "content": content, 147 + "created_at": created_at, 148 + "publication_uri": publication_uri, 149 + "tags": tags, 150 + "platform": platform, 151 + "collection": collection, 152 + } 153 + 154 + 155 + def main(): 156 + parser = argparse.ArgumentParser(description="Backfill records from a PDS") 157 + parser.add_argument("identifier", help="DID or handle to backfill") 158 + parser.add_argument("--dry-run", action="store_true", help="Show what would be done") 159 + args = parser.parse_args() 160 + 161 + try: 162 + settings = Settings() # type: ignore 163 + except Exception as e: 164 + print(f"error loading settings: {e}", file=sys.stderr) 165 + print("required env vars: TURSO_URL, TURSO_TOKEN", file=sys.stderr) 166 + sys.exit(1) 167 + 168 + # Resolve identifier to DID 169 + identifier = args.identifier 170 + if identifier.startswith("did:"): 171 + did = identifier 172 + else: 173 + print(f"resolving handle {identifier}...") 174 + did = resolve_handle(identifier) 175 + print(f" -> {did}") 176 + 177 + # Get PDS endpoint 178 + print(f"looking up PDS for {did}...") 179 + pds = get_pds_endpoint(did) 180 + print(f" -> {pds}") 181 + 182 + # Collections to fetch 183 + collections = [ 184 + "pub.leaflet.document", 185 + "pub.leaflet.publication", 186 + "site.standard.document", 187 + "site.standard.publication", 188 + ] 189 + 190 + total_docs = 0 191 + total_pubs = 0 192 + 193 + for collection in collections: 194 + print(f"fetching {collection}...") 195 + try: 196 + records = list_records(pds, did, collection) 197 + except httpx.HTTPStatusError as e: 198 + if e.response.status_code == 400: 199 + print(f" (no records)") 200 + continue 201 + raise 202 + 203 + if not records: 204 + print(f" (no records)") 205 + continue 206 + 207 + print(f" found {len(records)} records") 208 + 209 + for record in records: 210 + uri = record["uri"] 211 + # Parse rkey from URI: at://did/collection/rkey 212 + parts = uri.split("/") 213 + rkey = parts[-1] 214 + 215 + if collection.endswith(".document"): 216 + doc = extract_document(record, collection) 217 + if not doc: 218 + print(f" skip {uri} (no title)") 219 + continue 220 + 221 + if args.dry_run: 222 + print(f" would insert: {doc['title'][:50]}...") 223 + else: 224 + # Insert document 225 + turso_exec( 226 + settings, 227 + """ 228 + INSERT INTO documents (uri, did, rkey, title, content, created_at, publication_uri, platform, source_collection) 229 + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) 230 + ON CONFLICT(did, rkey) DO UPDATE SET 231 + uri = excluded.uri, 232 + title = excluded.title, 233 + content = excluded.content, 234 + created_at = excluded.created_at, 235 + publication_uri = excluded.publication_uri, 236 + platform = excluded.platform, 237 + source_collection = excluded.source_collection 238 + """, 239 + [uri, did, rkey, doc["title"], doc["content"], doc["created_at"], doc["publication_uri"], doc["platform"], doc["collection"]], 240 + ) 241 + # Insert tags 242 + for tag in doc["tags"]: 243 + turso_exec( 244 + settings, 245 + "INSERT OR IGNORE INTO document_tags (document_uri, tag) VALUES (?, ?)", 246 + [uri, tag], 247 + ) 248 + # Update FTS index (delete then insert, FTS5 doesn't support ON CONFLICT) 249 + turso_exec(settings, "DELETE FROM documents_fts WHERE uri = ?", [uri]) 250 + turso_exec( 251 + settings, 252 + "INSERT INTO documents_fts (uri, title, content) VALUES (?, ?, ?)", 253 + [uri, doc["title"], doc["content"]], 254 + ) 255 + print(f" indexed: {doc['title'][:50]}...") 256 + total_docs += 1 257 + 258 + elif collection.endswith(".publication"): 259 + value = record["value"] 260 + name = value.get("name", "") 261 + description = value.get("description") 262 + base_path = value.get("base_path") 263 + 264 + if args.dry_run: 265 + print(f" would insert pub: {name}") 266 + else: 267 + turso_exec( 268 + settings, 269 + """ 270 + INSERT INTO publications (uri, did, rkey, name, description, base_path) 271 + VALUES (?, ?, ?, ?, ?, ?) 272 + ON CONFLICT(uri) DO UPDATE SET 273 + name = excluded.name, 274 + description = excluded.description, 275 + base_path = excluded.base_path 276 + """, 277 + [uri, did, rkey, name, description, base_path], 278 + ) 279 + print(f" indexed pub: {name}") 280 + total_pubs += 1 281 + 282 + print(f"\ndone! {total_docs} documents, {total_pubs} publications") 283 + 284 + 285 + if __name__ == "__main__": 286 + main()
+15 -6
site/index.html
··· 5 5 <meta name="viewport" content="width=device-width, initial-scale=1.0"> 6 6 <link rel="icon" type="image/svg+xml" href="/favicon.svg"> 7 7 <title>leaflet search</title> 8 - <meta name="description" content="search for leaflet"> 8 + <meta name="description" content="search atproto publishing platforms"> 9 9 <meta property="og:title" content="leaflet search"> 10 - <meta property="og:description" content="search for leaflet"> 10 + <meta property="og:description" content="search atproto publishing platforms"> 11 11 <meta property="og:type" content="website"> 12 12 <meta name="twitter:card" content="summary"> 13 13 <meta name="twitter:title" content="leaflet search"> 14 - <meta name="twitter:description" content="search for leaflet"> 14 + <meta name="twitter:description" content="search atproto publishing platforms"> 15 15 <style> 16 16 * { box-sizing: border-box; margin: 0; padding: 0; } 17 17 ··· 365 365 366 366 <div id="results" class="results"> 367 367 <div class="empty-state"> 368 - <p>search for <a href="https://leaflet.pub" target="_blank">leaflet.pub</a></p> 368 + <p>search atproto publishing platforms</p> 369 + <p style="font-size:11px;margin-top:0.5rem"><a href="https://leaflet.pub" target="_blank">leaflet</a> 路 <a href="https://pckt.blog" target="_blank">pckt</a> 路 <a href="https://standard.site" target="_blank">standard.site</a></p> 369 370 </div> 370 371 </div> 371 372 ··· 433 434 434 435 // build URL based on entity type and platform 435 436 const docUrl = buildDocUrl(doc, entityType, platform); 436 - const platformBadge = platform !== 'leaflet' ? `<span class="platform-badge">${escapeHtml(platform)}</span>` : ''; 437 + const platformLabel = PLATFORM_CONFIG[platform]?.label || platform; 438 + const platformBadge = `<span class="platform-badge">${escapeHtml(platformLabel)}</span>`; 437 439 const date = doc.createdAt ? new Date(doc.createdAt).toLocaleDateString() : ''; 438 440 439 441 // platform home URL for meta link ··· 499 501 home: 'https://offprint.app', 500 502 label: 'offprint.app', 501 503 // offprint is in early beta, URL pattern unknown 504 + docUrl: null 505 + }, 506 + standardsite: { 507 + home: 'https://standard.site', 508 + label: 'standard.site', 509 + // standard.site uses basePath from publication 502 510 docUrl: null 503 511 }, 504 512 }; ··· 645 653 function renderEmptyState() { 646 654 resultsDiv.innerHTML = ` 647 655 <div class="empty-state"> 648 - <p>search for <a href="https://leaflet.pub" target="_blank">leaflet.pub</a></p> 656 + <p>search atproto publishing platforms</p> 657 + <p style="font-size:11px;margin-top:0.5rem"><a href="https://leaflet.pub" target="_blank">leaflet</a> 路 <a href="https://pckt.blog" target="_blank">pckt</a> 路 <a href="https://standard.site" target="_blank">standard.site</a></p> 649 658 </div> 650 659 `; 651 660 }
+3 -3
tap/fly.toml
··· 11 11 TAP_SIGNAL_COLLECTION = 'pub.leaflet.document' 12 12 TAP_COLLECTION_FILTERS = 'pub.leaflet.document,pub.leaflet.publication,site.standard.document,site.standard.publication' 13 13 TAP_LOG_LEVEL = 'info' 14 - TAP_RESYNC_PARALLELISM = '2' 14 + TAP_RESYNC_PARALLELISM = '5' 15 15 TAP_IDENT_CACHE_SIZE = '10000' 16 16 TAP_CURSOR_SAVE_INTERVAL = '5s' 17 17 TAP_REPO_FETCH_TIMEOUT = '600s' ··· 24 24 min_machines_running = 1 25 25 26 26 [[vm]] 27 - memory = '2gb' 27 + memory = '4gb' 28 28 cpu_kind = 'shared' 29 - cpus = 1 29 + cpus = 2 30 30 31 31 [mounts] 32 32 source = 'leaflet_tap_data'