A better Rust ATProto crate
103
fork

Configure Feed

Select the types of activity you want to include in your feed.

at pretty-codegen 138 lines 4.3 kB view raw
1//! Example: Subscribe to a PDS's subscribeRepos endpoint 2//! 3//! This demonstrates consuming the repo event stream directly from a PDS, 4//! which is what a Relay does to ingest updates from PDSes. 5//! 6//! Usage: 7//! cargo run --example subscribe_repos -- atproto.systems 8 9use clap::Parser; 10use jacquard::api::com_atproto::sync::subscribe_repos::{SubscribeRepos, SubscribeReposMessage}; 11use jacquard_common::deps::fluent_uri::{ParseError, Uri}; 12use jacquard_common::xrpc::{SubscriptionClient, TungsteniteSubscriptionClient}; 13use miette::IntoDiagnostic; 14use n0_future::StreamExt; 15use smol_str::ToSmolStr; 16 17#[derive(Parser, Debug)] 18#[command( 19 author, 20 version, 21 about = "Subscribe to a PDS's subscribeRepos endpoint" 22)] 23struct Args { 24 /// PDS URL (e.g., atproto.systems or https://atproto.systems) 25 pds_url: String, 26 27 /// Starting cursor position 28 #[arg(short, long)] 29 cursor: Option<i64>, 30} 31 32fn normalize_uri(input: &str) -> Result<Uri<String>, ParseError> { 33 let without_scheme = input 34 .trim_start_matches("https://") 35 .trim_start_matches("http://") 36 .trim_start_matches("wss://") 37 .trim_start_matches("ws://"); 38 39 let full = format!("wss://{}", without_scheme); 40 Uri::parse(full).map_err(|(e, _)| e) 41} 42 43fn print_message(msg: &SubscribeReposMessage) { 44 match msg { 45 SubscribeReposMessage::Commit(commit) => { 46 println!( 47 "Commit | repo={} seq={} time={} rev={} commit={} prev={}\n | ops={:?} ", 48 commit.repo, 49 commit.seq, 50 commit.time, 51 commit.rev, 52 commit.commit, 53 commit 54 .since 55 .as_ref() 56 .map(|ts| ts.to_smolstr()) 57 .unwrap_or_default(), 58 commit.ops, 59 ); 60 } 61 SubscribeReposMessage::Identity(identity) => { 62 println!( 63 "Identity | did={} seq={} time={} handle={:?}", 64 identity.did, identity.seq, identity.time, identity.handle 65 ); 66 } 67 SubscribeReposMessage::Account(account) => { 68 println!( 69 "Account | did={} seq={} time={} active={} status={:?}", 70 account.did, account.seq, account.time, account.active, account.status 71 ); 72 } 73 SubscribeReposMessage::Sync(sync) => { 74 println!( 75 "Sync | did={} seq={} time={} rev={} blocks={}b", 76 sync.did, 77 sync.seq, 78 sync.time, 79 sync.rev, 80 sync.blocks.len() 81 ); 82 } 83 SubscribeReposMessage::Info(info) => { 84 println!("Info | name={} message={:?}", info.name, info.message); 85 } 86 SubscribeReposMessage::Unknown(data) => { 87 println!("Unknown message: {:?}", data); 88 } 89 } 90} 91 92#[tokio::main] 93async fn main() -> miette::Result<()> { 94 let args = Args::parse(); 95 96 let base_url = normalize_uri(&args.pds_url).into_diagnostic()?; 97 println!("Connecting to {}", base_url); 98 99 // Create subscription client 100 let client = TungsteniteSubscriptionClient::from_base_uri(base_url); 101 102 // Subscribe with optional cursor 103 let params = if let Some(cursor) = args.cursor { 104 SubscribeRepos::new().cursor(cursor).build() 105 } else { 106 SubscribeRepos::new().build() 107 }; 108 let stream = client.subscribe(&params).await.into_diagnostic()?; 109 110 println!("Connected! Streaming messages (Ctrl-C to stop)...\n"); 111 112 // Set up Ctrl-C handler 113 let (tx, mut rx) = tokio::sync::oneshot::channel(); 114 tokio::spawn(async move { 115 tokio::signal::ctrl_c().await.ok(); 116 let _ = tx.send(()); 117 }); 118 119 // Convert to typed message stream 120 let (_sink, mut messages) = stream.into_stream(); 121 122 loop { 123 tokio::select! { 124 Some(result) = messages.next() => { 125 match result { 126 Ok(msg) => print_message(&msg), 127 Err(e) => eprintln!("--- ERROR: {} ---", e), 128 } 129 } 130 _ = &mut rx => { 131 println!("\nShutting down..."); 132 break; 133 } 134 } 135 } 136 137 Ok(()) 138}