Tap drinker
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: make crawling DIDs optional

Signed-off-by: tjh <x@tjh.dev>

tjh 09150613 ae586b81

+30 -13
+2 -2
README.md
··· 4 4 5 5 ## Example Usage 6 6 7 - In this example we'll *tap into* everything in the "sh.tangled.*" NSID starting from the @tangled.org AT repo. 7 + In this example we'll crawl all DIDs from the "sh.tangled.*" NSID, starting from the @tangled.org AT repo. 8 8 9 9 ### 1. Setup a PostgreSQL cluster and create a database 10 10 ··· 27 27 Run `trap`, seeding from the DID of @tangled.org: 28 28 29 29 ```bash 30 - RUST_LOG=debug,sqlx=warn TRAP_DATABASE_URL=postgresql:///trap_tangled trap --seed did:plc:wshs7t2adsemcrrd4snkeqli 30 + RUST_LOG=debug,sqlx=warn TRAP_DATABASE_URL=postgresql:///trap_tangled trap --crawl --seed did:plc:wshs7t2adsemcrrd4snkeqli 31 31 ``` 32 32 33 33 Trap will submit the seed DID to the Tap service. Each record returned by Tap is scanned, and any DIDs found will also be added to the Tap service.
+4
src/cli.rs
··· 24 24 /// DIDs to seed from. 25 25 #[arg(long, value_delimiter = ',', env = "TAP_DUMP_SEED")] 26 26 pub seed: Vec<String>, 27 + 28 + /// Crawl records for DIDs and add to the Tap service. 29 + #[arg(long)] 30 + pub crawl: bool, 27 31 } 28 32 29 33 pub fn parse() -> Arguments {
+24 -11
src/main.rs
··· 55 55 pool.clone(), 56 56 did_tx.clone(), 57 57 shutdown.child_token(), 58 + arguments.crawl, 58 59 )); 59 60 60 - tasks.spawn(did_task(tap, pool, did_rx, shutdown.child_token())); 61 + tasks.spawn(did_task( 62 + tap, 63 + pool, 64 + did_rx, 65 + shutdown.child_token(), 66 + arguments.crawl, 67 + )); 61 68 tasks.spawn(shutdown_task(shutdown.clone())); 62 69 63 70 // Submit seed DIDs to the Tap service. ··· 99 106 pool: PgPool, 100 107 tx: mpsc::UnboundedSender<(String, DidSource)>, 101 108 shutdown: CancellationToken, 109 + crawl: bool, 102 110 ) -> anyhow::Result<()> { 103 111 while let Some(Some((span, event, ack))) = shutdown.run_until_cancelled(channel.recv()).await { 104 112 async { ··· 107 115 TapEvent::Record(record) => { 108 116 let (record, parsed_record) = handle_record(record, &mut transaction).await?; 109 117 110 - // Expand the network of tracked DIDs. 111 - let nsid = record.collection.into_boxed_str(); 112 - for did in extract_dids(&parsed_record) { 113 - tx.send((did, DidSource::Record(nsid.clone())))?; 118 + if crawl { 119 + // Expand the network of tracked DIDs. 120 + let nsid = record.collection.into_boxed_str(); 121 + for did in extract_dids(&parsed_record) { 122 + tx.send((did, DidSource::Record(nsid.clone())))?; 123 + } 114 124 } 115 125 } 116 126 TapEvent::Identity(identity) => { ··· 202 212 pool: PgPool, 203 213 mut did_rx: mpsc::UnboundedReceiver<(String, DidSource)>, 204 214 shutdown: CancellationToken, 215 + crawl: bool, 205 216 ) -> anyhow::Result<()> { 206 217 const BATCH: usize = 64; 207 218 208 219 let mut seen: HashSet<String> = HashSet::with_capacity(10_000); 209 220 let mut dids = Vec::new(); 210 221 211 - // Query known DIDs from the database. 212 - let mut query = sqlx::query!("SELECT did FROM identity").fetch(&pool); 213 - while let Some(Ok(row)) = query.next().await { 214 - seen.insert(row.did); 222 + if crawl { 223 + // Query known DIDs from the database. 224 + let mut query = sqlx::query!("SELECT did FROM identity").fetch(&pool); 225 + while let Some(Ok(row)) = query.next().await { 226 + seen.insert(row.did); 227 + } 228 + 229 + tracing::debug!(count = seen.len(), "loaded tracked DIDs from database"); 215 230 } 216 - 217 - tracing::debug!(count = seen.len(), "loaded tracked DIDs from database"); 218 231 219 232 loop { 220 233 tokio::time::sleep(Duration::from_millis(200)).await;