very fast at protocol indexer with flexible filtering, xrpc queries, cursor-backed event stream, and more, built on fjall
rust fjall at-protocol atproto indexer
58
fork

Configure Feed

Select the types of activity you want to include in your feed.

[ingest] support multiple relays

dawn 85528411 2b3fcc56

+186 -121
+1 -1
Cargo.toml
··· 52 52 tempfile = "3.26.0" 53 53 54 54 [profile.dev] 55 - opt-level = 2 55 + opt-level = 2
+10 -9
README.md
··· 20 20 | backfill | backfill events are mixed into the live queue and prioritized (per-repo, acting as synchronization barrier) by the server. | backfill simply inserts historical events (`live: false`) into the global event log. streaming is just reading this log sequentially. synchronization is the same as tap, `live: true` vs `live: false`. | 21 21 | event types | `record`, `identity` (includes status) | `record`, `identity` (handle), `account` (status) | 22 22 23 + ### multiple relay support 24 + 25 + `hydrant` supports connecting to multiple relays simultaneously for both firehose ingestion and crawling. when `RELAY_HOSTS` is configured with multiple URLs: 26 + 27 + - one independent firehose stream loop is spawned per relay 28 + - one independent crawling loop is spawned per relay 29 + - each relay maintains its own firehose / crawler cursor state 30 + - all ingestion loops and crawlers share the same worker pool and database 31 + - all crawlers share the same pending queue for backfill 32 + 23 33 ## configuration 24 34 25 35 `hydrant` is configured via environment variables. all variables are prefixed with `HYDRANT_` (except `RUST_LOG`). ··· 52 62 | `ENABLE_CRAWLER` | `false` (if Filter), `true` (if Full) | whether to actively query the network for unknown repositories. | 53 63 | `CRAWLER_MAX_PENDING_REPOS` | `2000` | max pending repos for crawler. | 54 64 | `CRAWLER_RESUME_PENDING_REPOS` | `1000` | resume threshold for crawler pending repos. | 55 - 56 - ### multi-relay crawling 57 - 58 - the crawler supports querying multiple relays simultaneously. when `RELAY_HOSTS` is configured with multiple URLs: 59 - 60 - - one independent crawling loop is spawned per relay 61 - - each relay maintains its own cursor state 62 - - all crawlers share the same pending queue for backfill 63 - - firehose connection uses the first relay in the list 64 65 65 66 ## api 66 67
+1 -1
src/config.rs
··· 92 92 93 93 let relay_host: Url = cfg!( 94 94 "RELAY_HOST", 95 - Url::parse("wss://relay.fire.hose.cam").unwrap() 95 + Url::parse("wss://relay.fire.hose.cam/").unwrap() 96 96 ); 97 97 let relay_hosts = std::env::var("HYDRANT_RELAY_HOSTS") 98 98 .ok()
+16 -11
src/crawler/mod.rs
··· 3 3 use crate::db::{Db, keys, ser_repo_state}; 4 4 use crate::state::AppState; 5 5 use crate::types::RepoState; 6 - use crate::util::{ErrorForStatus, RetryOutcome, RetryWithBackoff, parse_retry_after}; 6 + use crate::util::{ErrorForStatus, RetryOutcome, RetryWithBackoff, parse_retry_after, relay_id}; 7 7 use chrono::{DateTime, TimeDelta, Utc}; 8 8 use futures::FutureExt; 9 9 use jacquard_api::com_atproto::repo::describe_repo::DescribeRepoOutput; ··· 214 214 } 215 215 216 216 async fn get_cursor(&self, relay_host: &Url) -> Result<Cursor> { 217 - let key = crawler_cursor_key(relay_host); 217 + let key = crawler_cursor_key(&relay_id(relay_host)); 218 218 let cursor_bytes = Db::get(self.state.db.cursors.clone(), &key).await?; 219 219 let cursor: Cursor = cursor_bytes 220 220 .as_deref() ··· 358 358 Ok(()) 359 359 } 360 360 361 - async fn crawl(crawler: Arc<Self>, relay_host: &Url) -> Result<()> { 362 - let mut relay_url = relay_host.clone(); 363 - match relay_url.scheme() { 364 - "wss" => relay_url 361 + fn base_url(url: &Url) -> Result<Url> { 362 + let mut url = url.clone(); 363 + match url.scheme() { 364 + "wss" => url 365 365 .set_scheme("https") 366 - .map_err(|_| miette::miette!("invalid url: {relay_url}"))?, 367 - "ws" => relay_url 366 + .map_err(|_| miette::miette!("invalid url: {url}"))?, 367 + "ws" => url 368 368 .set_scheme("http") 369 - .map_err(|_| miette::miette!("invalid url: {relay_url}"))?, 369 + .map_err(|_| miette::miette!("invalid url: {url}"))?, 370 370 _ => {} 371 371 } 372 + Ok(url) 373 + } 374 + 375 + async fn crawl(crawler: Arc<Self>, relay_host: &Url) -> Result<()> { 376 + let base_url = Self::base_url(relay_host)?; 372 377 373 378 let mut rng: SmallRng = rand::make_rng(); 374 379 let db = &crawler.state.db; ··· 431 436 } 432 437 } 433 438 434 - let mut list_repos_url = relay_url 439 + let mut list_repos_url = base_url 435 440 .join("/xrpc/com.atproto.sync.listRepos") 436 441 .into_diagnostic()?; 437 442 list_repos_url ··· 598 603 } 599 604 batch.insert( 600 605 &db.cursors, 601 - crawler_cursor_key(relay_host), 606 + crawler_cursor_key(&relay_id(relay_host)), 602 607 rmp_serde::to_vec(&cursor) 603 608 .into_diagnostic() 604 609 .wrap_err("cant serialize cursor")?,
+10 -5
src/db/keys.rs
··· 1 1 use jacquard_common::types::string::Did; 2 2 use smol_str::SmolStr; 3 - use url::Url; 4 3 5 4 use crate::db::types::{DbRkey, DbTid, TrimmedDid}; 5 + use crate::util::RelayId; 6 6 7 7 /// separator used for composite keys 8 8 pub const SEP: u8 = b'|'; ··· 163 163 TrimmedDid::try_from(&key[CRAWLER_RETRY_PREFIX.len()..]) 164 164 } 165 165 166 - pub fn crawler_cursor_key(relay_host: &Url) -> Vec<u8> { 167 - let mut key = b"crawler_cursor".to_vec(); 168 - key.push(SEP); 169 - key.extend_from_slice(relay_host.as_str().as_bytes()); 166 + pub fn crawler_cursor_key(relay_id: &RelayId) -> Vec<u8> { 167 + let mut key = b"crawler_cursor|".to_vec(); 168 + key.extend_from_slice(relay_id); 169 + key 170 + } 171 + 172 + pub fn firehose_cursor_key(relay_id: &RelayId) -> Vec<u8> { 173 + let mut key = b"firehose_cursor|".to_vec(); 174 + key.extend_from_slice(relay_id); 170 175 key 171 176 }
+15 -3
src/db/mod.rs
··· 13 13 use std::sync::Arc; 14 14 use std::sync::atomic::{AtomicBool, AtomicU64}; 15 15 16 + use crate::util::RelayId; 17 + 16 18 pub mod compaction; 17 19 pub mod filter; 18 20 pub mod gc; ··· 515 517 } 516 518 } 517 519 518 - pub fn set_firehose_cursor(db: &Db, cursor: i64) -> Result<()> { 520 + pub fn set_firehose_cursor(db: &Db, relay_id: &RelayId, cursor: i64) -> Result<()> { 519 521 db.cursors 520 - .insert(keys::CURSOR_KEY, cursor.to_be_bytes()) 522 + .insert(keys::firehose_cursor_key(relay_id), cursor.to_be_bytes()) 521 523 .into_diagnostic() 522 524 } 523 525 524 - pub async fn get_firehose_cursor(db: &Db) -> Result<Option<i64>> { 526 + pub async fn get_firehose_cursor(db: &Db, relay_id: &RelayId) -> Result<Option<i64>> { 527 + let per_relay_key = keys::firehose_cursor_key(relay_id); 528 + if let Some(v) = Db::get(db.cursors.clone(), per_relay_key).await? { 529 + return Ok(Some(i64::from_be_bytes( 530 + v.as_ref() 531 + .try_into() 532 + .into_diagnostic() 533 + .wrap_err("cursor is not 8 bytes")?, 534 + ))); 535 + } 536 + 525 537 Db::get(db.cursors.clone(), keys::CURSOR_KEY) 526 538 .await? 527 539 .map(|v| {
+28 -32
src/ingest/firehose.rs
··· 3 3 use crate::ingest::stream::{FirehoseStream, SubscribeReposMessage, decode_frame}; 4 4 use crate::ingest::{BufferTx, IngestMessage}; 5 5 use crate::state::AppState; 6 + use crate::util::RelayId; 6 7 use jacquard_common::IntoStatic; 7 8 use jacquard_common::types::did::Did; 8 9 use miette::{IntoDiagnostic, Result}; 9 10 use std::sync::Arc; 10 - use std::sync::atomic::Ordering; 11 11 use std::time::Duration; 12 12 use tracing::{debug, error, info, trace}; 13 13 use url::Url; ··· 16 16 state: Arc<AppState>, 17 17 buffer_tx: BufferTx, 18 18 relay_host: Url, 19 + relay_id: RelayId, 19 20 filter: FilterHandle, 20 21 _verify_signatures: bool, 21 22 } ··· 28 29 filter: FilterHandle, 29 30 verify_signatures: bool, 30 31 ) -> Self { 32 + let relay_id = crate::util::relay_id(&relay_host); 31 33 Self { 32 34 state, 33 35 buffer_tx, 34 36 relay_host, 37 + relay_id, 35 38 filter, 36 39 _verify_signatures: verify_signatures, 37 40 } 38 41 } 39 42 40 - pub async fn run(mut self) -> Result<()> { 43 + pub async fn run(self) -> Result<()> { 41 44 loop { 42 - let current_cursor = self.state.cur_firehose.load(Ordering::SeqCst); 43 - let start_cursor = if current_cursor > 0 { 44 - Some(current_cursor) 45 - } else { 46 - db::get_firehose_cursor(&self.state.db).await? 47 - }; 48 - match start_cursor { 49 - Some(c) => info!(cursor = %c, "resuming from cursor"), 50 - None => info!("no cursor found, live tailing"), 51 - } 45 + let start_cursor = db::get_firehose_cursor(&self.state.db, &self.relay_id).await?; 52 46 53 - if let Some(c) = start_cursor { 54 - self.state.cur_firehose.store(c, Ordering::SeqCst); 47 + match start_cursor { 48 + Some(c) => info!(relay = %self.relay_host, cursor = %c, "resuming from cursor"), 49 + None => info!(relay = %self.relay_host, "no cursor found, live tailing"), 55 50 } 56 51 57 - let mut stream = 58 - match FirehoseStream::connect(self.relay_host.clone(), start_cursor).await { 59 - Ok(s) => s, 60 - Err(e) => { 61 - error!(err = %e, "failed to connect to firehose, retrying in 5s"); 62 - tokio::time::sleep(Duration::from_secs(5)).await; 63 - continue; 64 - } 65 - }; 52 + let mut stream = match FirehoseStream::connect(self.relay_host.clone(), start_cursor) 53 + .await 54 + { 55 + Ok(s) => s, 56 + Err(e) => { 57 + error!(relay = %self.relay_host, err = %e, "failed to connect to firehose, retrying in 5s"); 58 + tokio::time::sleep(Duration::from_secs(5)).await; 59 + continue; 60 + } 61 + }; 66 62 67 - info!("firehose connected"); 63 + info!(relay = %self.relay_host, "firehose connected"); 68 64 69 65 while let Some(bytes_res) = stream.next().await { 70 66 let bytes = match bytes_res { 71 67 Ok(b) => b, 72 68 Err(e) => { 73 - error!(err = %e, "firehose stream error"); 69 + error!(relay = %self.relay_host, err = %e, "firehose stream error"); 74 70 break; 75 71 } 76 72 }; 77 73 match decode_frame(&bytes) { 78 74 Ok(msg) => self.handle_message(msg).await, 79 75 Err(e) => { 80 - error!(err = %e, "firehose stream error"); 76 + error!(relay = %self.relay_host, err = %e, "firehose stream error"); 81 77 break; 82 78 } 83 79 } 84 80 } 85 81 86 - error!("firehose disconnected, reconnecting in 5s..."); 82 + error!(relay = %self.relay_host, "firehose disconnected, reconnecting in 5s..."); 87 83 tokio::time::sleep(Duration::from_secs(5)).await; 88 84 } 89 85 } 90 86 91 - async fn handle_message(&mut self, msg: SubscribeReposMessage<'_>) { 87 + async fn handle_message(&self, msg: SubscribeReposMessage<'_>) { 92 88 let did = match &msg { 93 89 SubscribeReposMessage::Commit(commit) => &commit.repo, 94 90 SubscribeReposMessage::Identity(identity) => &identity.did, ··· 108 104 } 109 105 trace!(did = %did, "forwarding message to ingest buffer"); 110 106 111 - if let Err(e) = self 112 - .buffer_tx 113 - .send(IngestMessage::Firehose(msg.into_static())) 114 - { 107 + if let Err(e) = self.buffer_tx.send(IngestMessage::Firehose { 108 + relay_id: self.relay_id.clone(), 109 + msg: msg.into_static(), 110 + }) { 115 111 error!(err = %e, "failed to send message to buffer processor"); 116 112 } 117 113 }
+5 -1
src/ingest/mod.rs
··· 7 7 use jacquard_common::types::did::Did; 8 8 9 9 use crate::ingest::stream::SubscribeReposMessage; 10 + use crate::util::RelayId; 10 11 11 12 #[derive(Debug)] 12 13 pub enum IngestMessage { 13 - Firehose(SubscribeReposMessage<'static>), 14 + Firehose { 15 + relay_id: RelayId, 16 + msg: SubscribeReposMessage<'static>, 17 + }, 14 18 BackfillFinished(Did<'static>), 15 19 } 16 20
+5 -5
src/ingest/worker.rs
··· 121 121 // dispatch loop 122 122 while let Some(msg) = self.rx.blocking_recv() { 123 123 let did = match &msg { 124 - IngestMessage::Firehose(m) => match m { 124 + IngestMessage::Firehose { msg: m, .. } => match m { 125 125 SubscribeReposMessage::Commit(c) => &c.repo, 126 126 SubscribeReposMessage::Identity(i) => &i.did, 127 127 SubscribeReposMessage::Account(a) => &a.did, ··· 223 223 } 224 224 } 225 225 } 226 - IngestMessage::Firehose(msg) => { 226 + IngestMessage::Firehose { relay_id, msg } => { 227 227 let (did, seq) = match &msg { 228 228 SubscribeReposMessage::Commit(c) => (&c.repo, c.seq), 229 229 SubscribeReposMessage::Identity(i) => (&i.did, i.seq), ··· 261 261 } 262 262 } 263 263 264 - state 265 - .cur_firehose 266 - .store(seq, std::sync::atomic::Ordering::SeqCst); 264 + if let Some((_, cursor)) = state.relay_cursors.get(&relay_id) { 265 + cursor.store(seq, std::sync::atomic::Ordering::SeqCst); 266 + } 267 267 } 268 268 } 269 269
+30 -27
src/main.rs
··· 1 1 use futures::{FutureExt, future::BoxFuture}; 2 2 use hydrant::config::{Config, SignatureVerification}; 3 - use hydrant::db::{self, set_firehose_cursor}; 3 + use hydrant::db; 4 4 use hydrant::ingest::firehose::FirehoseIngestor; 5 5 use hydrant::state::AppState; 6 6 use hydrant::{api, backfill::BackfillWorker, ingest::worker::FirehoseWorker}; ··· 176 176 loop { 177 177 std::thread::sleep(persist_interval); 178 178 179 - // persist firehose cursor 180 - let seq = state.cur_firehose.load(Ordering::SeqCst); 181 - if let Err(e) = set_firehose_cursor(&state.db, seq) { 182 - error!(err = %e, "failed to save cursor"); 183 - db::check_poisoned_report(&e); 179 + // persist firehose cursors 180 + for (relay_id, (relay, cursor)) in &state.relay_cursors { 181 + let seq = cursor.load(Ordering::SeqCst); 182 + if seq > 0 { 183 + if let Err(e) = db::set_firehose_cursor(&state.db, relay_id, seq) { 184 + error!(relay = %relay, err = %e, "failed to save cursor"); 185 + db::check_poisoned_report(&e); 186 + } 187 + } 184 188 } 185 189 186 190 // persist counts ··· 249 253 } 250 254 }); 251 255 252 - let ingestor = FirehoseIngestor::new( 253 - state.clone(), 254 - buffer_tx, 255 - cfg.relays 256 - .first() 257 - .cloned() 258 - .expect("at least one relay host must be configured"), 259 - state.filter.clone(), 260 - matches!(cfg.verify_signatures, SignatureVerification::Full), 261 - ); 256 + let mut t: Vec<BoxFuture<miette::Result<()>>> = vec![Box::pin( 257 + tokio::task::spawn_blocking(move || { 258 + firehose_worker 259 + .join() 260 + .map_err(|e| miette::miette!("buffer processor died: {e:?}")) 261 + }) 262 + .map(|r| r.into_diagnostic().flatten().flatten()), 263 + )]; 264 + 265 + for relay_url in &cfg.relays { 266 + let ingestor = FirehoseIngestor::new( 267 + state.clone(), 268 + buffer_tx.clone(), 269 + relay_url.clone(), 270 + state.filter.clone(), 271 + matches!(cfg.verify_signatures, SignatureVerification::Full), 272 + ); 273 + t.push(Box::pin(ingestor.run())); 274 + } 262 275 263 - vec![ 264 - Box::pin( 265 - tokio::task::spawn_blocking(move || { 266 - firehose_worker 267 - .join() 268 - .map_err(|e| miette::miette!("buffer processor died: {e:?}")) 269 - }) 270 - .map(|r| r.into_diagnostic().flatten().flatten()), 271 - ) as BoxFuture<_>, 272 - Box::pin(ingestor.run()), 273 - ] 276 + t 274 277 } else { 275 278 info!("firehose ingestion disabled by config"); 276 279 // if firehose is disabled, we just wait indefinitely (or until signal)
+11 -2
src/state.rs
··· 1 + use std::collections::HashMap; 1 2 use std::sync::atomic::AtomicI64; 2 3 3 4 use miette::Result; 4 5 use tokio::sync::Notify; 6 + use url::Url; 5 7 6 8 use crate::{ 7 9 config::Config, 8 10 db::Db, 9 11 filter::{FilterHandle, new_handle}, 10 12 resolver::Resolver, 13 + util::{RelayId, relay_id}, 11 14 }; 12 15 13 16 pub struct AppState { 14 17 pub db: Db, 15 18 pub resolver: Resolver, 16 19 pub filter: FilterHandle, 17 - pub cur_firehose: AtomicI64, 20 + pub relay_cursors: HashMap<RelayId, (Url, AtomicI64)>, 18 21 pub backfill_notify: Notify, 19 22 } 20 23 ··· 25 28 let filter_config = crate::db::filter::load(&db.filter)?; 26 29 let filter = new_handle(filter_config); 27 30 31 + let relay_cursors = config 32 + .relays 33 + .iter() 34 + .map(|url| (relay_id(url), (url.clone(), AtomicI64::new(0)))) 35 + .collect(); 36 + 28 37 Ok(Self { 29 38 db, 30 39 resolver, 31 40 filter, 32 - cur_firehose: AtomicI64::new(0), 41 + relay_cursors, 33 42 backfill_notify: Notify::new(), 34 43 }) 35 44 }
+7
src/util.rs
··· 3 3 use rand::RngExt; 4 4 use reqwest::StatusCode; 5 5 use serde::{Deserialize, Deserializer, Serializer}; 6 + use url::Url; 7 + 8 + pub type RelayId = Vec<u8>; 9 + 10 + pub fn relay_id(url: &Url) -> RelayId { 11 + url.as_str().as_bytes().to_vec() 12 + } 6 13 7 14 /// outcome of [`RetryWithBackoff::retry`] when the operation does not succeed. 8 15 pub enum RetryOutcome<E> {
+38 -17
tests/authenticated_stream_test.nu
··· 1 1 #!/usr/bin/env nu 2 2 use common.nu * 3 3 4 - def main [] { 5 - let env_vars = load-env-file 6 - let did = ($env_vars | get --optional TEST_REPO) 7 - let password = ($env_vars | get --optional TEST_PASSWORD) 8 - 9 - if ($did | is-empty) or ($password | is-empty) { 10 - print "error: TEST_REPO and TEST_PASSWORD must be set in .env" 11 - exit 1 12 - } 13 - 14 - let pds_url = resolve-pds $did 15 - 16 - let port = 3005 4 + def run-auth-test [did: string, password: string, pds_url: string, relays: string, port: int] { 17 5 let url = $"http://localhost:($port)" 18 6 let ws_url = $"ws://localhost:($port)/stream" 19 7 let db_path = (mktemp -d -t hydrant_auth_test.XXXXXX) ··· 25 13 print "authentication successful" 26 14 27 15 # 2. start hydrant 28 - print $"starting hydrant on port ($port)..." 29 - let binary = build-hydrant 30 - let instance = start-hydrant $binary $db_path $port 16 + print $"starting hydrant on port ($port) with relays: ($relays)..." 17 + let binary = "target/debug/hydrant" # already built in main 18 + let instance = (with-env { HYDRANT_RELAY_HOSTS: $relays } { 19 + start-hydrant $binary $db_path $port 20 + }) 31 21 32 22 mut test_passed = false 33 23 ··· 175 165 print "cleaning up..." 176 166 try { kill -9 $instance.pid } 177 167 178 - if $test_passed { 168 + $test_passed 169 + } 170 + 171 + def main [] { 172 + let env_vars = load-env-file 173 + let did = ($env_vars | get --optional TEST_REPO) 174 + let password = ($env_vars | get --optional TEST_PASSWORD) 175 + 176 + if ($did | is-empty) or ($password | is-empty) { 177 + print "error: TEST_REPO and TEST_PASSWORD must be set in .env" 178 + exit 1 179 + } 180 + 181 + let pds_url = resolve-pds $did 182 + 183 + # ensure build 184 + build-hydrant | ignore 185 + 186 + print "=== running single-relay test ===" 187 + let relay1 = "wss://relay.fire.hose.cam" 188 + let success1 = run-auth-test $did $password $pds_url $relay1 3005 189 + 190 + print "" 191 + print "=== running multi-relay test ===" 192 + let relay_multi = "wss://relay.fire.hose.cam,wss://relay3.fr.hose.cam,wss://relay1.us-west.bsky.network,wss://relay1.us-east.bsky.network" 193 + let success2 = run-auth-test $did $password $pds_url $relay_multi 3015 194 + 195 + if $success1 and $success2 { 196 + print "" 197 + print "ALL AUTHENTICATED STREAM TESTS PASSED" 179 198 exit 0 180 199 } else { 200 + print "" 201 + print $"TESTS FAILED: single=($success1), multi=($success2)" 181 202 exit 1 182 203 } 183 204 }
+3 -2
tests/throttling_test.nu
··· 45 45 HYDRANT_DISABLE_BACKFILL: "true", # disable backfill so pending count stays up 46 46 HYDRANT_API_PORT: ($port | into string), 47 47 HYDRANT_LOG_LEVEL: "debug", 48 + RUST_LOG: "debug", 48 49 HYDRANT_CRAWLER_MAX_PENDING_REPOS: "2", 49 50 HYDRANT_CRAWLER_RESUME_PENDING_REPOS: "1" 50 51 } { ··· 88 89 sleep 2sec # give logging a moment 89 90 90 91 let logs = (open $log_file | str replace --all "\n" " ") 91 - if ($logs | str contains "crawler throttling: pending repos") { 92 + if ($logs | str contains "throttling: above max pending") { 92 93 print "CONFIRMED: crawler is throttling!" 93 94 94 95 # now testing resumption ··· 108 109 109 110 # check logs for resumption message 110 111 let logs_after = (open $log_file | str replace --all "\n" " ") 111 - if ($logs_after | str contains "crawler resuming") { 112 + if ($logs_after | str contains "throttling released") { 112 113 print "CONFIRMED: crawler resumed!" 113 114 $success = true 114 115 } else {
+6 -5
tests/verify_crawler.nu
··· 89 89 # check cursor persistence 90 90 print "verifying crawler cursor persistence..." 91 91 let cursor_check = try { 92 - # cursor key format: crawler_cursor|{scheme}://{host}:{port} 93 - let cursor_res = (http get $"($debug_url)/debug/get?partition=cursors&key=crawler_cursor|http://localhost:3008").value 94 - print $"cursor value from debug: ($cursor_res)" 92 + # cursor key format is now: crawler_cursor|{relay_id_hash} 93 + let cursor_iter = (http get $"($debug_url)/debug/iter?partition=cursors") 94 + print $"cursors in db: ($cursor_iter | to json)" 95 95 96 - # cursor should be non-empty if crawler successfully fetched repos 97 - if ($cursor_res | is-not-empty) { 96 + let has_cursor = ($cursor_iter.items | any { |it| ($it.0 | str starts-with "crawler_cursor") }) 97 + 98 + if $has_cursor { 98 99 print "cursor verified." 99 100 true 100 101 } else {