A better Rust ATProto crate
103
fork

Configure Feed

Select the types of activity you want to include in your feed.

at pretty-codegen 130 lines 4.1 kB view raw
1//! Example: Subscribe to Jetstream firehose 2//! 3//! Jetstream is a JSON-based alternative to the standard DAG-CBOR firehose. 4//! It streams all public network updates in a simplified format. 5//! 6//! Usage: 7//! cargo run --example subscribe_jetstream 8//! cargo run --example subscribe_jetstream -- jetstream2.us-west.bsky.network 9 10use clap::Parser; 11use jacquard_common::deps::fluent_uri::{ParseError, Uri}; 12use jacquard_common::jetstream::{CommitOperation, JetstreamMessage, JetstreamParams}; 13use jacquard_common::xrpc::{SubscriptionClient, TungsteniteSubscriptionClient}; 14use miette::IntoDiagnostic; 15use n0_future::StreamExt; 16 17#[derive(Parser, Debug)] 18#[command(author, version, about = "Subscribe to Jetstream firehose")] 19struct Args { 20 /// Jetstream URL (e.g., jetstream1.us-east.fire.hose.cam) 21 #[arg(default_value = "jetstream1.us-east.fire.hose.cam")] 22 jetstream_url: String, 23} 24 25fn normalize_uri(input: &str) -> Result<Uri<String>, ParseError> { 26 let without_scheme = input 27 .trim_start_matches("https://") 28 .trim_start_matches("http://") 29 .trim_start_matches("wss://") 30 .trim_start_matches("ws://"); 31 32 let full = format!("wss://{}", without_scheme); 33 Uri::parse(full).map_err(|(e, _)| e) 34} 35 36fn print_message(msg: &JetstreamMessage) { 37 match msg { 38 JetstreamMessage::Commit { 39 did, 40 time_us, 41 commit, 42 } => { 43 let op = match commit.operation { 44 CommitOperation::Create => "create", 45 CommitOperation::Update => "update", 46 CommitOperation::Delete => "delete", 47 }; 48 println!( 49 "Commit | did={} time_us={} op={} collection={} rkey={} cid={:?}", 50 did, time_us, op, commit.collection, commit.rkey, commit.cid 51 ); 52 } 53 JetstreamMessage::Identity { 54 did, 55 time_us, 56 identity, 57 } => { 58 println!( 59 "Identity | did={} time_us={} handle={:?} seq={} time={}", 60 did, time_us, identity.handle, identity.seq, identity.time 61 ); 62 } 63 JetstreamMessage::Account { 64 did, 65 time_us, 66 account, 67 } => { 68 println!( 69 "Account | did={} time_us={} active={} seq={} time={} status={:?}", 70 did, time_us, account.active, account.seq, account.time, account.status 71 ); 72 } 73 } 74} 75 76#[tokio::main] 77async fn main() -> miette::Result<()> { 78 let args = Args::parse(); 79 80 let base_url = normalize_uri(&args.jetstream_url).into_diagnostic()?; 81 println!("Connecting to {}", base_url); 82 83 // Create subscription client 84 let client = TungsteniteSubscriptionClient::from_base_uri(base_url); 85 86 // Subscribe with no filters (firehose mode) 87 // Enable compression if zstd feature is available 88 #[cfg(feature = "zstd")] 89 let params = { JetstreamParams::new().compress(true).build() }; 90 91 #[cfg(not(feature = "zstd"))] 92 let params = { JetstreamParams::new().build() }; 93 94 let stream = client.subscribe(&params).await.into_diagnostic()?; 95 96 println!("Connected! Streaming messages (Ctrl-C to stop)...\n"); 97 98 // Set up Ctrl-C handler 99 let (tx, mut rx) = tokio::sync::oneshot::channel(); 100 tokio::spawn(async move { 101 tokio::signal::ctrl_c().await.ok(); 102 let _ = tx.send(()); 103 }); 104 105 // Convert to typed message stream 106 let (_sink, mut messages) = stream.into_stream(); 107 108 let mut count = 0u64; 109 110 loop { 111 tokio::select! { 112 Some(result) = messages.next() => { 113 match result { 114 Ok(msg) => { 115 count += 1; 116 print_message(&msg); 117 } 118 Err(e) => eprintln!("Error: {}", e), 119 } 120 } 121 _ = &mut rx => { 122 println!("\nReceived {} messages", count); 123 println!("Shutting down..."); 124 break; 125 } 126 } 127 } 128 129 Ok(()) 130}