A better Rust ATProto crate
//! Example: Subscribe to Jetstream firehose
//!
//! Jetstream is a JSON-based alternative to the standard DAG-CBOR firehose.
//! It streams all public network updates in a simplified format.
//!
//! Usage:
//!   cargo run --example subscribe_jetstream
//!   cargo run --example subscribe_jetstream -- jetstream2.us-west.bsky.network
9
10use clap::Parser;
11use jacquard_common::deps::fluent_uri::{ParseError, Uri};
12use jacquard_common::jetstream::{CommitOperation, JetstreamMessage, JetstreamParams};
13use jacquard_common::xrpc::{SubscriptionClient, TungsteniteSubscriptionClient};
14use miette::IntoDiagnostic;
15use n0_future::StreamExt;
16
17#[derive(Parser, Debug)]
18#[command(author, version, about = "Subscribe to Jetstream firehose")]
19struct Args {
20 /// Jetstream URL (e.g., jetstream1.us-east.fire.hose.cam)
21 #[arg(default_value = "jetstream1.us-east.fire.hose.cam")]
22 jetstream_url: String,
23}
24
25fn normalize_uri(input: &str) -> Result<Uri<String>, ParseError> {
26 let without_scheme = input
27 .trim_start_matches("https://")
28 .trim_start_matches("http://")
29 .trim_start_matches("wss://")
30 .trim_start_matches("ws://");
31
32 let full = format!("wss://{}", without_scheme);
33 Uri::parse(full).map_err(|(e, _)| e)
34}
35
36fn print_message(msg: &JetstreamMessage) {
37 match msg {
38 JetstreamMessage::Commit {
39 did,
40 time_us,
41 commit,
42 } => {
43 let op = match commit.operation {
44 CommitOperation::Create => "create",
45 CommitOperation::Update => "update",
46 CommitOperation::Delete => "delete",
47 };
48 println!(
49 "Commit | did={} time_us={} op={} collection={} rkey={} cid={:?}",
50 did, time_us, op, commit.collection, commit.rkey, commit.cid
51 );
52 }
53 JetstreamMessage::Identity {
54 did,
55 time_us,
56 identity,
57 } => {
58 println!(
59 "Identity | did={} time_us={} handle={:?} seq={} time={}",
60 did, time_us, identity.handle, identity.seq, identity.time
61 );
62 }
63 JetstreamMessage::Account {
64 did,
65 time_us,
66 account,
67 } => {
68 println!(
69 "Account | did={} time_us={} active={} seq={} time={} status={:?}",
70 did, time_us, account.active, account.seq, account.time, account.status
71 );
72 }
73 }
74}
75
76#[tokio::main]
77async fn main() -> miette::Result<()> {
78 let args = Args::parse();
79
80 let base_url = normalize_uri(&args.jetstream_url).into_diagnostic()?;
81 println!("Connecting to {}", base_url);
82
83 // Create subscription client
84 let client = TungsteniteSubscriptionClient::from_base_uri(base_url);
85
86 // Subscribe with no filters (firehose mode)
87 // Enable compression if zstd feature is available
88 #[cfg(feature = "zstd")]
89 let params = { JetstreamParams::new().compress(true).build() };
90
91 #[cfg(not(feature = "zstd"))]
92 let params = { JetstreamParams::new().build() };
93
94 let stream = client.subscribe(¶ms).await.into_diagnostic()?;
95
96 println!("Connected! Streaming messages (Ctrl-C to stop)...\n");
97
98 // Set up Ctrl-C handler
99 let (tx, mut rx) = tokio::sync::oneshot::channel();
100 tokio::spawn(async move {
101 tokio::signal::ctrl_c().await.ok();
102 let _ = tx.send(());
103 });
104
105 // Convert to typed message stream
106 let (_sink, mut messages) = stream.into_stream();
107
108 let mut count = 0u64;
109
110 loop {
111 tokio::select! {
112 Some(result) = messages.next() => {
113 match result {
114 Ok(msg) => {
115 count += 1;
116 print_message(&msg);
117 }
118 Err(e) => eprintln!("Error: {}", e),
119 }
120 }
121 _ = &mut rx => {
122 println!("\nReceived {} messages", count);
123 println!("Shutting down...");
124 break;
125 }
126 }
127 }
128
129 Ok(())
130}