A better Rust ATProto crate
1//! Example: Subscribe to a PDS's subscribeRepos endpoint
2//!
3//! This demonstrates consuming the repo event stream directly from a PDS,
4//! which is what a Relay does to ingest updates from PDSes.
5//!
6//! Usage:
7//! cargo run --example subscribe_repos -- atproto.systems
8
9use clap::Parser;
10use jacquard::api::com_atproto::sync::subscribe_repos::{SubscribeRepos, SubscribeReposMessage};
11use jacquard_common::deps::fluent_uri::{ParseError, Uri};
12use jacquard_common::xrpc::{SubscriptionClient, TungsteniteSubscriptionClient};
13use miette::IntoDiagnostic;
14use n0_future::StreamExt;
15use smol_str::ToSmolStr;
16
17#[derive(Parser, Debug)]
18#[command(
19 author,
20 version,
21 about = "Subscribe to a PDS's subscribeRepos endpoint"
22)]
23struct Args {
24 /// PDS URL (e.g., atproto.systems or https://atproto.systems)
25 pds_url: String,
26
27 /// Starting cursor position
28 #[arg(short, long)]
29 cursor: Option<i64>,
30}
31
32fn normalize_uri(input: &str) -> Result<Uri<String>, ParseError> {
33 let without_scheme = input
34 .trim_start_matches("https://")
35 .trim_start_matches("http://")
36 .trim_start_matches("wss://")
37 .trim_start_matches("ws://");
38
39 let full = format!("wss://{}", without_scheme);
40 Uri::parse(full).map_err(|(e, _)| e)
41}
42
43fn print_message(msg: &SubscribeReposMessage) {
44 match msg {
45 SubscribeReposMessage::Commit(commit) => {
46 println!(
47 "Commit | repo={} seq={} time={} rev={} commit={} prev={}\n | ops={:?} ",
48 commit.repo,
49 commit.seq,
50 commit.time,
51 commit.rev,
52 commit.commit,
53 commit
54 .since
55 .as_ref()
56 .map(|ts| ts.to_smolstr())
57 .unwrap_or_default(),
58 commit.ops,
59 );
60 }
61 SubscribeReposMessage::Identity(identity) => {
62 println!(
63 "Identity | did={} seq={} time={} handle={:?}",
64 identity.did, identity.seq, identity.time, identity.handle
65 );
66 }
67 SubscribeReposMessage::Account(account) => {
68 println!(
69 "Account | did={} seq={} time={} active={} status={:?}",
70 account.did, account.seq, account.time, account.active, account.status
71 );
72 }
73 SubscribeReposMessage::Sync(sync) => {
74 println!(
75 "Sync | did={} seq={} time={} rev={} blocks={}b",
76 sync.did,
77 sync.seq,
78 sync.time,
79 sync.rev,
80 sync.blocks.len()
81 );
82 }
83 SubscribeReposMessage::Info(info) => {
84 println!("Info | name={} message={:?}", info.name, info.message);
85 }
86 SubscribeReposMessage::Unknown(data) => {
87 println!("Unknown message: {:?}", data);
88 }
89 }
90}
91
92#[tokio::main]
93async fn main() -> miette::Result<()> {
94 let args = Args::parse();
95
96 let base_url = normalize_uri(&args.pds_url).into_diagnostic()?;
97 println!("Connecting to {}", base_url);
98
99 // Create subscription client
100 let client = TungsteniteSubscriptionClient::from_base_uri(base_url);
101
102 // Subscribe with optional cursor
103 let params = if let Some(cursor) = args.cursor {
104 SubscribeRepos::new().cursor(cursor).build()
105 } else {
106 SubscribeRepos::new().build()
107 };
108 let stream = client.subscribe(¶ms).await.into_diagnostic()?;
109
110 println!("Connected! Streaming messages (Ctrl-C to stop)...\n");
111
112 // Set up Ctrl-C handler
113 let (tx, mut rx) = tokio::sync::oneshot::channel();
114 tokio::spawn(async move {
115 tokio::signal::ctrl_c().await.ok();
116 let _ = tx.send(());
117 });
118
119 // Convert to typed message stream
120 let (_sink, mut messages) = stream.into_stream();
121
122 loop {
123 tokio::select! {
124 Some(result) = messages.next() => {
125 match result {
126 Ok(msg) => print_message(&msg),
127 Err(e) => eprintln!("--- ERROR: {} ---", e),
128 }
129 }
130 _ = &mut rx => {
131 println!("\nShutting down...");
132 break;
133 }
134 }
135 }
136
137 Ok(())
138}