very fast at protocol indexer with flexible filtering, xrpc queries, cursor-backed event stream, and more, built on fjall
rust fjall at-protocol atproto indexer
60
fork

Configure Feed

Select the types of activity you want to include in your feed.

[tests] implement full network crawler verification

- add 'tests/verify_crawler.nu' integration test with backfill disabled
- add 'tests/mock_relay.nu' to mock listRepos endpoint with did:web support
- add 'disable_backfill' and 'disable_firehose' config options
- update 'src/api/debug.rs' to deserialize cursors as strings

dawn b5249e77 2d23aa5b

+261 -46
+1
.gitignore
··· 3 3 .envrc 4 4 result 5 5 .env 6 + mock_debug.log
+1
flake.nix
··· 27 27 go 28 28 cmake 29 29 websocat 30 + http-nu 30 31 ]; 31 32 }; 32 33 };
+3
src/api/debug.rs
··· 106 106 if let Ok(arr) = value.try_into() { 107 107 return Value::Number(u64::from_be_bytes(arr).into()); 108 108 } 109 + if let Ok(s) = String::from_utf8(value.to_vec()) { 110 + return Value::String(s); 111 + } 109 112 } 110 113 "blocks" => { 111 114 if let Ok(val) = serde_ipld_dagcbor::from_slice::<Value>(value) {
+6
src/config.rs
··· 52 52 pub enable_debug: bool, 53 53 pub verify_signatures: SignatureVerification, 54 54 pub identity_cache_size: u64, 55 + pub disable_firehose: bool, 56 + pub disable_backfill: bool, 55 57 } 56 58 57 59 impl Config { ··· 97 99 let debug_port = cfg!("DEBUG_PORT", 3001u16); 98 100 let verify_signatures = cfg!("VERIFY_SIGNATURES", SignatureVerification::Full); 99 101 let identity_cache_size = cfg!("IDENTITY_CACHE_SIZE", 100_000u64); 102 + let disable_firehose = cfg!("DISABLE_FIREHOSE", false); 103 + let disable_backfill = cfg!("DISABLE_BACKFILL", false); 100 104 101 105 Ok(Self { 102 106 database_path, ··· 114 118 enable_debug, 115 119 verify_signatures, 116 120 identity_cache_size, 121 + disable_firehose, 122 + disable_backfill, 117 123 }) 118 124 } 119 125 }
+57 -46
src/main.rs
··· 39 39 ); 40 40 } 41 41 42 - tokio::spawn({ 43 - let state = state.clone(); 44 - let timeout = cfg.repo_fetch_timeout; 45 - BackfillWorker::new( 46 - state, 47 - backfill_rx, 48 - timeout, 49 - cfg.backfill_concurrency_limit, 50 - matches!( 51 - cfg.verify_signatures, 52 - SignatureVerification::Full | SignatureVerification::BackfillOnly 53 - ), 54 - ) 55 - .run() 56 - }); 57 - 58 - let firehose_worker = std::thread::spawn({ 59 - let state = state.clone(); 60 - let handle = tokio::runtime::Handle::current(); 61 - move || { 62 - FirehoseWorker::new( 42 + if !cfg.disable_backfill { 43 + tokio::spawn({ 44 + let state = state.clone(); 45 + let timeout = cfg.repo_fetch_timeout; 46 + BackfillWorker::new( 63 47 state, 64 - buffer_rx, 65 - matches!(cfg.verify_signatures, SignatureVerification::Full), 48 + backfill_rx, 49 + timeout, 50 + cfg.backfill_concurrency_limit, 51 + matches!( 52 + cfg.verify_signatures, 53 + SignatureVerification::Full | SignatureVerification::BackfillOnly 54 + ), 66 55 ) 67 - .run(handle) 68 - } 69 - }); 56 + .run() 57 + }); 58 + } 70 59 71 60 if let Err(e) = spawn_blocking({ 72 61 let state = state.clone(); ··· 167 156 ); 168 157 } 169 158 170 - let ingestor = FirehoseIngestor::new( 171 - state.clone(), 172 - buffer_tx, 173 - cfg.relay_host, 174 - cfg.full_network, 175 - matches!(cfg.verify_signatures, SignatureVerification::Full), 176 - ); 159 + let tasks = if !cfg.disable_firehose { 160 + let firehose_worker = std::thread::spawn({ 161 + let state = state.clone(); 162 + let handle = tokio::runtime::Handle::current(); 163 + move || { 164 + FirehoseWorker::new( 165 + state, 166 + buffer_rx, 167 + matches!(cfg.verify_signatures, SignatureVerification::Full), 168 + ) 169 + .run(handle) 170 + } 171 + }); 177 172 178 - let res = futures::future::try_join_all::<[BoxFuture<_>; _]>([ 179 - Box::pin( 180 - tokio::task::spawn_blocking(move || { 181 - firehose_worker 182 - .join() 183 - .map_err(|e| miette::miette!("buffer processor thread died: {e:?}")) 184 - }) 185 - .map(|r| r.into_diagnostic().flatten().flatten()), 186 - ), 187 - Box::pin(ingestor.run()), 188 - ]); 189 - if let Err(e) = res.await { 190 - error!("ingestor or buffer processor died: {e}"); 173 + let ingestor = FirehoseIngestor::new( 174 + state.clone(), 175 + buffer_tx, 176 + cfg.relay_host, 177 + cfg.full_network, 178 + matches!(cfg.verify_signatures, SignatureVerification::Full), 179 + ); 180 + 181 + vec![ 182 + Box::pin( 183 + tokio::task::spawn_blocking(move || { 184 + firehose_worker 185 + .join() 186 + .map_err(|e| miette::miette!("buffer processor thread died: {e:?}")) 187 + }) 188 + .map(|r| r.into_diagnostic().flatten().flatten()), 189 + ) as BoxFuture<_>, 190 + Box::pin(ingestor.run()), 191 + ] 192 + } else { 193 + info!("firehose ingestion disabled by config"); 194 + // if firehose is disabled, we just wait indefinitely (or until signal) 195 + // essentially we just want to keep the main thread alive for the other components 196 + vec![Box::pin(futures::future::pending::<miette::Result<()>>()) as BoxFuture<_>] 197 + }; 198 + 199 + let res = futures::future::select_all(tasks); 200 + if let (Err(e), _, _) = res.await { 201 + error!("critical worker died: {e}"); 191 202 db::check_poisoned_report(&e); 192 203 } 193 204
+62
tests/mock_relay.nu
··· 1 + # mock_relay.nu 2 + 3 + # A closure that handles HTTP requests 4 + {|req| 5 + 6 + 7 + # check path 8 + if ($req.path | str starts-with "/xrpc/com.atproto.sync.listRepos") { 9 + 10 + # parse query params if any 11 + let query_string = ($req.path | split row "?" | get 1? | default "") 12 + let params = if ($query_string | is-empty) { 13 + [] 14 + } else { 15 + ($query_string | split row "&" | each { |it| $it | split row "=" }) 16 + } 17 + let cursor = ($params | where { |x| $x.0 == "cursor" } | get 0?.1?) 18 + 19 + # define some mock repos 20 + let all_repos = [ 21 + { did: "did:web:mock1.com", head: "bafyreidf747c4x3lps3k4n357l3a3r57k3k465743k573k465743k5", rev: "3j6s746574657" }, 22 + { did: "did:web:mock2.com", head: "bafyreidf747c4x3lps3k4n357l3a3r57k3k465743k573k465743k5", rev: "3j6s746574657" }, 23 + { did: "did:web:mock3.com", head: "bafyreidf747c4x3lps3k4n357l3a3r57k3k465743k573k465743k5", rev: "3j6s746574657" }, 24 + { did: "did:web:mock4.com", head: "bafyreidf747c4x3lps3k4n357l3a3r57k3k465743k573k465743k5", rev: "3j6s746574657" }, 25 + { did: "did:web:mock5.com", head: "bafyreidf747c4x3lps3k4n357l3a3r57k3k465743k573k465743k5", rev: "3j6s746574657" } 26 + ] 27 + 28 + let repos = if ($cursor == "50") { 29 + [] 30 + } else { 31 + $all_repos 32 + } 33 + 34 + let next_cursor = if ($cursor == "50") { 35 + null 36 + } else { 37 + "50" 38 + } 39 + 40 + { 41 + cursor: $next_cursor, 42 + repos: $repos 43 + } 44 + | to json 45 + | metadata set --merge { 46 + http.response: { 47 + headers: { 48 + "Content-Type": "application/json" 49 + } 50 + } 51 + } 52 + 53 + } else { 54 + # 404 55 + "not found" 56 + | metadata set --merge { 57 + http.response: { 58 + status: 404 59 + } 60 + } 61 + } 62 + }
+131
tests/verify_crawler.nu
··· 1 + #!/usr/bin/env nu 2 + use common.nu * 3 + 4 + def main [] { 5 + # 1. ensure http-nu is installed 6 + if (which http-nu | is-empty) { 7 + print "http-nu not found, installing..." 8 + cargo install http-nu 9 + } 10 + 11 + # 2. setup ports and paths 12 + let port = 3006 13 + let mock_port = 3008 14 + let url = $"http://localhost:($port)" 15 + let debug_url = $"http://localhost:($port + 1)" 16 + let mock_url = $"http://localhost:($mock_port)" 17 + let db_path = (mktemp -d -t hydrant_full_net.XXXXXX) 18 + 19 + print $"testing full network crawler..." 20 + print $"database path: ($db_path)" 21 + 22 + # 3. start mock relay 23 + print $"starting mock relay on ($mock_port)..." 24 + let mock_pid = ( 25 + bash -c $"http-nu :($mock_port) tests/mock_relay.nu > ($db_path)/mock.log 2>&1 & echo $!" 26 + | str trim 27 + | into int 28 + ) 29 + print $"mock relay pid: ($mock_pid)" 30 + 31 + # give mock relay a moment 32 + sleep 1sec 33 + 34 + # 4. start hydrant in full network mode, firehose disabled 35 + let binary = build-hydrant 36 + 37 + let log_file = $"($db_path)/hydrant.log" 38 + print $"starting hydrant - logs at ($log_file)..." 39 + 40 + let hydrant_pid = ( 41 + with-env { 42 + HYDRANT_DATABASE_PATH: ($db_path), 43 + HYDRANT_FULL_NETWORK: "true", 44 + HYDRANT_RELAY_HOST: ($mock_url), 45 + HYDRANT_DISABLE_FIREHOSE: "true", 46 + HYDRANT_DISABLE_BACKFILL: "true", 47 + HYDRANT_API_PORT: ($port | into string), 48 + HYDRANT_ENABLE_DEBUG: "true", # for stats checking 49 + HYDRANT_DEBUG_PORT: ($port + 1 | into string), 50 + HYDRANT_LOG_LEVEL: "debug", 51 + HYDRANT_CURSOR_SAVE_INTERVAL: "1" # faster save 52 + } { 53 + sh -c $"($binary) >($log_file) 2>&1 & echo $!" | str trim | into int 54 + } 55 + ) 56 + print $"hydrant started with pid: ($hydrant_pid)" 57 + 58 + mut success = false 59 + 60 + try { 61 + if (wait-for-api $url) { 62 + print "hydrant api is up." 63 + 64 + # wait for crawler to run (it runs on startup) 65 + print "waiting for crawler to fetch repos..." 66 + 67 + # retry check for 30s 68 + for i in 1..30 { 69 + let stats = (http get $"($url)/stats?accurate=true").counts 70 + let pending = ($stats.pending | into int) 71 + let repos = ($stats.repos | default 0 | into int) 72 + 73 + # we expect 5 repos from the mock 74 + print $"[($i)/30] pending: ($pending), known_repos: ($repos)" 75 + 76 + if $repos >= 5 { 77 + print "crawler successfully discovered repos!" 78 + $success = true 79 + break 80 + } 81 + 82 + sleep 1sec 83 + } 84 + 85 + if not $success { 86 + print "timeout waiting for crawler." 87 + } 88 + 89 + # check cursor persistence 90 + print "verifying crawler cursor persistence..." 91 + let cursor_check = try { 92 + let cursor_res = (http get $"($debug_url)/debug/get?partition=cursors&key=crawler_cursor").value 93 + print $"cursor value from debug: ($cursor_res)" 94 + 95 + if $cursor_res == "50" { 96 + print "cursor verified." 97 + true 98 + } else { 99 + print "cursor mismatch or missing." 100 + false 101 + } 102 + } catch { 103 + print "failed to get cursor from debug endpoint" 104 + false 105 + } 106 + if not $cursor_check { $success = false } 107 + 108 + } else { 109 + print "hydrant failed to start." 110 + } 111 + } catch { |e| 112 + print $"test failed with error: ($e)" 113 + } 114 + 115 + # cleanup 116 + print "stopping processes..." 117 + try { kill $hydrant_pid } 118 + try { kill $mock_pid } 119 + 120 + if $success { 121 + print "test passed!" 122 + exit 0 123 + } else { 124 + print "test failed!" 125 + print "hydrant logs:" 126 + open $log_file | tail -n 20 127 + print "mock logs:" 128 + open $"($db_path)/mock.log" 129 + exit 1 130 + } 131 + }