search for standard sites pub-search.waow.tech
search zig blog atproto
11
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix: index site.standard.document with embedded leaflet content

This commit fixes indexing of documents published via site.standard.document
that contain embedded pub.leaflet.content. This was a multi-layered debugging
saga worth documenting.

## The Problem

Search for "set of shared standards" returned nothing, despite the document
existing at lab.leaflet.pub/3md4qsktbms24.

## Root Causes (in order of discovery)

1. **Fly secret overriding fly.toml** - TAP_SIGNAL_COLLECTION was set as a
Fly secret with wrong value ("pub.leaflet.document"), which overrode the
correct fly.toml value. Secrets take precedence over env vars in fly.toml.
Fix: `fly secrets unset TAP_SIGNAL_COLLECTION -a leaflet-search-tap`

2. **Extractor missing content.pages path** - The extractor only checked
`record.pages` (pub.leaflet.document) but not `record.content.pages`
(site.standard.document with embedded pub.leaflet.content). The content
structure differs between lexicons.

3. **Platform detection from collection only** - site.standard.document
gets platform="other" from collection detection, but the actual platform
(leaflet/pckt/offprint) should be inferred from the publication's basePath.

4. **Use-after-free in indexer** - base_path was read from query result,
but row.text() returns memory that's freed by result.deinit(). Fixed by
copying to stack buffer.

5. **Startup timeout** - HTTP server started after slow initialization,
causing Fly proxy timeouts. Reordered to start HTTP first, then init
services in background.

## The Saga of Failure Modes

- Repeatedly trying to curl from TAP container (which has no curl)
- Hallucinating TAP API endpoints that don't exist
- Falling back to backfill scripts instead of fixing root cause
- Not verifying Fly secrets vs fly.toml precedence
- Writing custom PDS client when zat already has everything needed

## How Content Extraction Now Works

1. TAP receives site.standard.document (signal collection)
2. Extractor tries: textContent → pages → content.pages
3. If content still empty AND content.$type == "pub.leaflet.content",
fetch pub.leaflet.document from PDS as fallback
4. Indexer infers platform from basePath (leaflet.pub → leaflet, etc.)

## Files Changed

- tap/fly.toml: signal collection → site.standard.document
- backend/src/extractor.zig: check content.pages, add test
- backend/src/tap.zig: PDS fetch fallback for empty content
- backend/src/indexer.zig: platform detection from basePath, fix UAF
- backend/src/main.zig: start HTTP first, init services in background
- backend/src/db/mod.zig: split init for staged startup
- scripts/backfill-pds: same content.pages fix
- docs/tap.md: updated documentation

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

zzstoatzz 09e5ebc1 1e29cd8c

+173 -41
+10 -2
backend/src/db/mod.zig
··· 17 17 var client: ?Client = null; 18 18 var local_db: ?LocalDb = null; 19 19 20 - pub fn init() !void { 20 + /// Initialize Turso client only (fast, call synchronously at startup) 21 + pub fn initTurso() !void { 21 22 client = try Client.init(gpa.allocator()); 22 23 try schema.init(&client.?); 24 + } 23 25 24 - // initialize local replica (non-fatal if it fails) 26 + /// Initialize local SQLite replica (slow, call in background thread) 27 + pub fn initLocalDb() void { 25 28 initLocal() catch |err| { 26 29 std.debug.print("local db init failed (will use turso only): {}\n", .{err}); 27 30 }; 31 + } 32 + 33 + pub fn init() !void { 34 + try initTurso(); 35 + initLocalDb(); 28 36 } 29 37 30 38 fn initLocal() !void {
+25 -2
backend/src/extractor.zig
··· 150 150 try buf.appendSlice(allocator, desc); 151 151 } 152 152 153 - if (zat.json.getArray(record, "pages")) |pages| { 154 - for (pages) |page| { 153 + // check for pages at top level (pub.leaflet.document) 154 + // or nested in content object (site.standard.document with pub.leaflet.content) 155 + const pages = zat.json.getArray(record, "pages") orelse 156 + zat.json.getArray(record, "content.pages"); 157 + 158 + if (pages) |p| { 159 + for (p) |page| { 155 160 if (page == .object) { 156 161 try extractPageContent(allocator, &buf, page.object); 157 162 } ··· 261 266 try std.testing.expectEqualStrings("leaflet", Platform.leaflet.displayName()); 262 267 try std.testing.expectEqualStrings("other", Platform.other.displayName()); 263 268 } 269 + 270 + test "extractDocument: site.standard.document with pub.leaflet.content" { 271 + const allocator = std.testing.allocator; 272 + 273 + // minimal site.standard.document with embedded pub.leaflet.content 274 + const test_json = 275 + \\{"title":"Test Post","content":{"$type":"pub.leaflet.content","pages":[{"id":"page1","$type":"pub.leaflet.pages.linearDocument","blocks":[{"$type":"pub.leaflet.pages.linearDocument#block","block":{"$type":"pub.leaflet.blocks.text","plaintext":"Hello world"}}]}]}} 276 + ; 277 + 278 + const parsed = try json.parseFromSlice(json.Value, allocator, test_json, .{}); 279 + defer parsed.deinit(); 280 + 281 + var doc = try extractDocument(allocator, parsed.value.object, "site.standard.document"); 282 + defer doc.deinit(); 283 + 284 + try std.testing.expectEqualStrings("Test Post", doc.title); 285 + try std.testing.expectEqualStrings("Hello world", doc.content); 286 + }
+35 -4
backend/src/indexer.zig
··· 36 36 const has_pub: []const u8 = if (pub_uri.len > 0) "1" else "0"; 37 37 38 38 // look up base_path from publication (or fallback to DID lookup) 39 + // use a stack buffer because row.text() returns a slice into result memory 40 + // which gets freed by result.deinit() 41 + var base_path_buf: [256]u8 = undefined; 39 42 var base_path: []const u8 = ""; 43 + 40 44 if (pub_uri.len > 0) { 41 45 if (c.query("SELECT base_path FROM publications WHERE uri = ?", &.{pub_uri})) |res| { 42 46 var result = res; 43 47 defer result.deinit(); 44 48 if (result.first()) |row| { 45 - base_path = row.text(0); 49 + const val = row.text(0); 50 + if (val.len > 0 and val.len <= base_path_buf.len) { 51 + @memcpy(base_path_buf[0..val.len], val); 52 + base_path = base_path_buf[0..val.len]; 53 + } 46 54 } 47 55 } else |_| {} 48 56 } ··· 64 72 var result = res; 65 73 defer result.deinit(); 66 74 if (result.first()) |row| { 67 - base_path = row.text(0); 75 + const val = row.text(0); 76 + if (val.len > 0 and val.len <= base_path_buf.len) { 77 + @memcpy(base_path_buf[0..val.len], val); 78 + base_path = base_path_buf[0..val.len]; 79 + } 68 80 } 69 81 } else |_| {} 70 82 ··· 74 86 var result = res; 75 87 defer result.deinit(); 76 88 if (result.first()) |row| { 77 - base_path = row.text(0); 89 + const val = row.text(0); 90 + if (val.len > 0 and val.len <= base_path_buf.len) { 91 + @memcpy(base_path_buf[0..val.len], val); 92 + base_path = base_path_buf[0..val.len]; 93 + } 78 94 } 79 95 } else |_| {} 80 96 } 81 97 } 82 98 99 + // detect platform from basePath if platform is unknown/other 100 + // this handles site.standard.* documents where collection doesn't indicate platform 101 + var actual_platform = platform; 102 + if (std.mem.eql(u8, platform, "unknown") or std.mem.eql(u8, platform, "other")) { 103 + if (std.mem.indexOf(u8, base_path, "leaflet.pub") != null) { 104 + actual_platform = "leaflet"; 105 + } else if (std.mem.indexOf(u8, base_path, "pckt.blog") != null) { 106 + actual_platform = "pckt"; 107 + } else if (std.mem.indexOf(u8, base_path, "offprint.app") != null) { 108 + actual_platform = "offprint"; 109 + } else if (std.mem.indexOf(u8, base_path, "greengale.app") != null) { 110 + actual_platform = "greengale"; 111 + } 112 + } 113 + 83 114 try c.exec( 84 115 "INSERT OR REPLACE INTO documents (uri, did, rkey, title, content, created_at, publication_uri, platform, source_collection, path, base_path, has_publication) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", 85 - &.{ uri, did, rkey, title, content, created_at orelse "", pub_uri, platform, source_collection, path orelse "", base_path, has_pub }, 116 + &.{ uri, did, rkey, title, content, created_at orelse "", pub_uri, actual_platform, source_collection, path orelse "", base_path, has_pub }, 86 117 ); 87 118 88 119 // update FTS index
+28 -20
backend/src/main.zig
··· 15 15 defer _ = gpa.deinit(); 16 16 const allocator = gpa.allocator(); 17 17 18 - // init db (turso + local replica) 19 - try db.init(); 20 - db.startSync(); 21 - 22 - // start activity tracker 23 - activity.init(); 24 - 25 - // start tap consumer in background 26 - const tap_thread = try Thread.spawn(.{}, tap.consumer, .{allocator}); 27 - defer tap_thread.join(); 28 - 29 - // init thread pool for http connections 30 - var pool: Thread.Pool = undefined; 31 - try pool.init(.{ 32 - .allocator = allocator, 33 - .n_jobs = MAX_HTTP_WORKERS, 34 - }); 35 - defer pool.deinit(); 36 - 37 - // start http server 18 + // start http server FIRST so Fly proxy doesn't timeout 38 19 const port: u16 = blk: { 39 20 const port_str = posix.getenv("PORT") orelse "3000"; 40 21 break :blk std.fmt.parseInt(u16, port_str, 10) catch 3000; ··· 47 28 const app_name = posix.getenv("APP_NAME") orelse "leaflet-search"; 48 29 std.debug.print("{s} listening on http://0.0.0.0:{d} (max {} workers)\n", .{ app_name, port, MAX_HTTP_WORKERS }); 49 30 31 + // init turso client synchronously (fast, needed for search fallback) 32 + try db.initTurso(); 33 + 34 + // init thread pool for http connections 35 + var pool: Thread.Pool = undefined; 36 + try pool.init(.{ 37 + .allocator = allocator, 38 + .n_jobs = MAX_HTTP_WORKERS, 39 + }); 40 + defer pool.deinit(); 41 + 42 + // init local db and other services in background (slow) 43 + const init_thread = try Thread.spawn(.{}, initServices, .{allocator}); 44 + init_thread.detach(); 45 + 50 46 while (true) { 51 47 const conn = listener.accept() catch |err| { 52 48 std.debug.print("accept error: {}\n", .{err}); ··· 62 58 conn.stream.close(); 63 59 }; 64 60 } 61 + } 62 + 63 + fn initServices(allocator: std.mem.Allocator) void { 64 + // init local db (slow - turso already initialized) 65 + db.initLocalDb(); 66 + db.startSync(); 67 + 68 + // start activity tracker 69 + activity.init(); 70 + 71 + // start tap consumer 72 + tap.consumer(allocator); 65 73 } 66 74 67 75 fn setSocketTimeout(fd: posix.fd_t, secs: u32) !void {
+61
backend/src/tap.zig
··· 228 228 }; 229 229 defer doc.deinit(); 230 230 231 + // if content is empty and this is a site.standard.document with pub.leaflet.content, 232 + // fetch the pub.leaflet.document from PDS to get actual content 233 + if (doc.content.len == 0 and mem.eql(u8, collection, STANDARD_DOCUMENT)) { 234 + const record_val: json.Value = .{ .object = record }; 235 + const content_type = zat.json.getString(record_val, "content.$type"); 236 + if (content_type != null and mem.eql(u8, content_type.?, "pub.leaflet.content")) { 237 + if (fetchLeafletContent(allocator, did, rkey)) |leaflet_content| { 238 + allocator.free(doc.content); 239 + doc.content = leaflet_content; 240 + std.debug.print("fetched leaflet content for {s}: {} chars\n", .{ uri, leaflet_content.len }); 241 + } else |err| { 242 + std.debug.print("failed to fetch leaflet content for {s}: {}\n", .{ uri, err }); 243 + } 244 + } 245 + } 246 + 231 247 try indexer.insertDocument( 232 248 uri, 233 249 did, ··· 242 258 doc.path, 243 259 ); 244 260 std.debug.print("indexed document: {s} [{s}] ({} chars, {} tags)\n", .{ uri, doc.platformName(), doc.content.len, doc.tags.len }); 261 + } 262 + 263 + /// fetch content from pub.leaflet.document for a given DID/rkey 264 + fn fetchLeafletContent(allocator: Allocator, did: []const u8, rkey: []const u8) ![]u8 { 265 + // resolve DID to get PDS endpoint 266 + const did_parsed = zat.Did.parse(did) orelse return error.InvalidDid; 267 + var resolver = zat.DidResolver.init(allocator); 268 + defer resolver.deinit(); 269 + 270 + var did_doc = try resolver.resolve(did_parsed); 271 + defer did_doc.deinit(); 272 + 273 + const pds = did_doc.pdsEndpoint() orelse return error.NoPdsEndpoint; 274 + 275 + // fetch pub.leaflet.document from PDS 276 + var client = zat.XrpcClient.init(allocator, pds); 277 + defer client.deinit(); 278 + 279 + const nsid = zat.Nsid.parse("com.atproto.repo.getRecord") orelse return error.InvalidNsid; 280 + 281 + var params = std.StringHashMap([]const u8).init(allocator); 282 + defer params.deinit(); 283 + try params.put("repo", did); 284 + try params.put("collection", LEAFLET_DOCUMENT); 285 + try params.put("rkey", rkey); 286 + 287 + var response = try client.query(nsid, params); 288 + defer response.deinit(); 289 + 290 + if (!response.ok()) return error.PdsRequestFailed; 291 + 292 + // parse response and extract content 293 + var parsed = try response.json(); 294 + defer parsed.deinit(); 295 + 296 + const record_obj = zat.json.getObject(parsed.value, "value") orelse return error.NoValueInResponse; 297 + 298 + // use extractor to get content from pub.leaflet.document 299 + var leaflet_doc = try extractor.extractDocument(allocator, record_obj, LEAFLET_DOCUMENT); 300 + defer { 301 + // don't free content - we're returning it 302 + leaflet_doc.allocator.free(leaflet_doc.tags); 303 + } 304 + 305 + return leaflet_doc.content; 245 306 } 246 307 247 308 fn processPublication(_: Allocator, uri: []const u8, did: []const u8, rkey: []const u8, record: json.ObjectMap) !void {
+5 -5
docs/tap.md
··· 4 4 5 5 ## what is tap? 6 6 7 - tap subscribes to the ATProto firehose, filters for specific collections (e.g., `pub.leaflet.document`), and broadcasts matching events to websocket clients. it also does initial crawling/backfilling of existing records. 7 + tap subscribes to the ATProto firehose, filters for specific collections (e.g., `site.standard.document`), and broadcasts matching events to websocket clients. it also does initial crawling/backfilling of existing records. 8 8 9 9 key behavior: **tap backfills historical data when repos are added**. when a repo is added to tracking: 10 10 1. tap fetches the full repo from the account's PDS using `com.atproto.sync.getRepo` ··· 26 26 "live": true, 27 27 "did": "did:plc:abc123...", 28 28 "rev": "3mbspmpaidl2a", 29 - "collection": "pub.leaflet.document", 29 + "collection": "site.standard.document", 30 30 "rkey": "3lzyrj6q6gs27", 31 31 "action": "create", 32 32 "record": { ... }, ··· 42 42 | type | string | "record", "identity", "account" | message type | 43 43 | action | **string** | "create", "update", "delete" | NOT an enum! | 44 44 | live | bool | true/false | true = firehose, false = resync | 45 - | collection | string | e.g., "pub.leaflet.document" | lexicon collection | 45 + | collection | string | e.g., "site.standard.document" | lexicon collection | 46 46 47 47 ## gotchas 48 48 49 49 1. **action is a string, not an enum** - tap sends `"action": "create"` as a JSON string. if your parser expects an enum type, extraction will silently fail. use string comparison. 50 50 51 - 2. **collection filters apply to output** - `TAP_COLLECTION_FILTERS` controls which records tap sends to clients. records from other collections are fetched but not forwarded. 51 + 2. **collection filters apply during processing** - `TAP_COLLECTION_FILTERS` controls which records tap processes and sends to clients, both during live commits and resync CAR walks. records from other collections are skipped entirely. 52 52 53 53 3. **signal collection vs collection filters** - `TAP_SIGNAL_COLLECTION` controls auto-discovery of repos (which repos to track), while `TAP_COLLECTION_FILTERS` controls which records from those repos to output. a repo must either be auto-discovered via signal collection OR manually added via `/repos/add`. 54 54 ··· 178 178 | `/stats/repo-count` | number of tracked repos | 179 179 | `/stats/record-count` | total records processed | 180 180 | `/stats/outbox-buffer` | events waiting to be sent | 181 - | `/stats/resync-buffer` | DIDs waiting to be resynced | 181 + | `/stats/resync-buffer` | buffered commits for repos currently resyncing (NOT the resync queue) | 182 182 | `/stats/cursors` | firehose cursor position | 183 183 | `/info/:did` | repo status: `{"did":"...","state":"active","records":N}` | 184 184 | `/repos/add` | POST with `{"dids":["did:plc:..."]}` to add repos |
+8 -7
scripts/backfill-pds
··· 161 161 if not title: 162 162 return None 163 163 164 - # Get content - try textContent (site.standard), then leaflet blocks, then content/text 164 + # Get content - try textContent (site.standard), then leaflet blocks 165 165 content = value.get("textContent") or "" 166 166 if not content: 167 - # Try leaflet-style pages/blocks 167 + # Try leaflet-style pages/blocks at top level (pub.leaflet.document) 168 168 pages = value.get("pages", []) 169 169 if pages: 170 170 content = extract_leaflet_blocks(pages) 171 171 if not content: 172 - # Fall back to simple content/text fields 173 - content = value.get("content") or value.get("text") or "" 174 - if isinstance(content, dict): 175 - # Handle richtext format 176 - content = content.get("text", "") 172 + # Try content.pages (site.standard.document with pub.leaflet.content) 173 + content_obj = value.get("content") 174 + if isinstance(content_obj, dict): 175 + pages = content_obj.get("pages", []) 176 + if pages: 177 + content = extract_leaflet_blocks(pages) 177 178 178 179 # Get created_at 179 180 created_at = value.get("createdAt", "")
+1 -1
tap/fly.toml
··· 8 8 TAP_DATABASE_URL = 'sqlite:///data/tap.db' 9 9 TAP_BIND = ':2480' 10 10 TAP_RELAY_URL = 'https://relay1.us-east.bsky.network' 11 - TAP_SIGNAL_COLLECTION = 'pub.leaflet.document' 11 + TAP_SIGNAL_COLLECTION = 'site.standard.document' 12 12 TAP_COLLECTION_FILTERS = 'pub.leaflet.document,pub.leaflet.publication,site.standard.document,site.standard.publication' 13 13 TAP_LOG_LEVEL = 'info' 14 14 TAP_RESYNC_PARALLELISM = '1'