Rust wrapper for the ATProto tap utility
# tapped

[![Crates.io Version](https://img.shields.io/crates/v/tapped)](https://crates.io/crates/tapped)
[![docs.rs](https://img.shields.io/docsrs/tapped)](https://docs.rs/tapped)

A Rust wrapper library for the [`tap`](https://github.com/bluesky-social/indigo/tree/main/cmd/tap) ATProto sync utility.

`tapped` provides an idiomatic async Rust interface for spawning and communicating with a `tap` subprocess. It makes it easy to build applications that sync data from the ATProto network.

## Features

- Spawn and manage `tap` subprocesses with graceful shutdown
- Strongly typed configuration for all of tap's environment variables
- Strongly typed async Rust functions covering all of tap's HTTP API endpoints
- WebSocket-based event channel with explicit acknowledgment

## Installation

Add to your `Cargo.toml`:

```toml
[dependencies]
tapped = "0.2"
```

You'll also need the `tap` binary. Build it from the root of a checkout of the [indigo repository](https://github.com/bluesky-social/indigo):

```bash
cd cmd/tap && go build
```

`tapped` was most recently tested against:

```
tap version v0.0.0-20260120225912-12d69fa4d209-rev-12d69fa
```

## Quick Start

```rust
use tapped::{TapHandle, TapConfig, Event};

#[tokio::main]
async fn main() -> tapped::Result<()> {
    let config = TapConfig::builder()
        .database_url("sqlite://tap.db")
        .collection_filter("app.bsky.feed.post")
        .build();

    // Spawn tap and connect
    let handle = TapHandle::spawn_default(config).await?;

    // Subscribe to events
    let (mut receiver, mut ack_sender) = handle.channel().await?;

    while let Ok((event, ack_id)) = receiver.recv().await {
        match event {
            Event::Record(record) => {
                println!("[{:?}] {}/{}",
                    record.action,
                    record.collection,
                    record.rkey
                );
            }
            Event::Identity(identity) => {
                println!("Identity: {} -> {}", identity.did, identity.handle);
            }
        }
        // Manual acknowledgment required
        ack_sender.ack(ack_id).await?;
    }

    Ok(())
}
```

## Usage Patterns

### Connect to Existing Instance

If a `tap` instance is already running:

```rust
use tapped::TapClient;

let client = TapClient::new("http://localhost:2480")?;
client.health().await?;
```

### Spawn with Custom Binary Path

```rust
use tapped::{TapProcess, TapConfig};

let config = TapConfig::builder()
    .database_url("sqlite://my-app.db")
    .build();

let mut process = TapProcess::spawn("/path/to/tap", config).await?;
let client = process.client()?;

// Use the client...

process.shutdown().await?;
```

### Using TapHandle (Recommended)

`TapHandle` combines process management and client access:

```rust
use tapped::{TapHandle, TapConfig};

let config = TapConfig::builder()
    .database_url("sqlite://app.db")
    .full_network(false)
    .build();

let handle = TapHandle::spawn_default(config).await?;

// TapHandle derefs to TapClient, so you can call client methods directly
handle.health().await?;
let count = handle.repo_count().await?;
println!("Tracking {} repos", count);
```

### Configuration Options

```rust
use tapped::{TapConfig, LogLevel};
use std::time::Duration;

let config = TapConfig::builder()
    // Database
    .database_url("sqlite://tap.db")
    .max_db_conns(10)

    // Network
    .bind("127.0.0.1:2480")
    .relay_url("wss://bsky.network".parse().unwrap())
    .plc_url("https://plc.directory".parse().unwrap())

    // Filtering
    .signal_collection("app.bsky.feed.post")
    .collection_filter("app.bsky.feed.post")
    .collection_filter("app.bsky.feed.like")
    .full_network(false)

    // Performance
    .firehose_parallelism(10)
    .resync_parallelism(5)
    .outbox_parallelism(10)
    .outbox_capacity(10000)

    // Timeouts
    .repo_fetch_timeout(Duration::from_secs(30))
    .startup_timeout(Duration::from_secs(60))
    .shutdown_timeout(Duration::from_secs(10))

    // Logging
    .log_level(LogLevel::Info)

    .build();
```

### Working with Events

Events must be acknowledged manually:

```rust
use tapped::{Event, RecordAction};

let (mut receiver, mut ack_sender) = client.channel().await?;

while let Ok((event, ack_id)) = receiver.recv().await {
    match event {
        Event::Record(record) => {
            match record.action {
                RecordAction::Create => {
                    // Access the raw JSON as a string
                    if let Some(json) = record.record_as_str() {
                        println!("Raw JSON: {}", json);
                    }

                    // Or deserialize to a specific type
                    // let post: MyPostType = record.deserialize_as()?;
                }
                RecordAction::Update => { /* ... */ }
                RecordAction::Delete => { /* ... */ }
                _ => {}
            }
        }
        Event::Identity(identity) => {
            println!("{} is now @{}", identity.did, identity.handle);
        }
    }
    // Ack must be sent manually
    ack_sender.ack(ack_id).await?;
}
```

### Managing Repositories

```rust
// Add repos to track
client.add_repos(&["did:plc:abc123", "did:plc:def456"]).await?;

// Remove repos
client.remove_repos(&["did:plc:abc123"]).await?;

// Get info about a specific repo
let info = client.repo_info("did:plc:def456").await?;
println!("State: {:?}, Records: {}", info.state, info.records);

// Resolve a DID to its document
let doc = client.resolve_did("did:plc:def456").await?;
println!("Handles: {:?}", doc.also_known_as);
```

### Checking Stats

```rust
let repos = client.repo_count().await?;
let records = client.record_count().await?;
let outbox = client.outbox_buffer().await?;
let resync = client.resync_buffer().await?;
let cursors = client.cursors().await?;

println!("Tracking {} repos with {} records", repos, records);
println!("Outbox buffer: {}, Resync buffer: {}", outbox, resync);
println!("Firehose cursor: {:?}", cursors.firehose);
```

## Example: Syncing standard.site Records with Schema Generation and Validation

The repository includes a complete example that syncs and validates ATProto records using `tapped` together with the [jacquard](https://crates.io/crates/jacquard) crates.

The jacquard ecosystem can generate Rust structs from lexicon JSON files and validate records against their lexicon constraints at runtime.

```
tapped/
├── tapped/                 # The main tapped library
├── lexicons-example/       # Generated types from lexicon schemas
│   ├── lexicons/           # Source lexicon JSON files
│   │   ├── site.standard.publication.json
│   │   ├── site.standard.document.json
│   │   └── ...
│   └── src/                # Generated Rust code
└── standard-site-sync/     # Example binary using both packages
```

The generated code was produced as follows:

```bash
# Install the code generator
cargo install jacquard-lexgen

# Generate Rust types from the lexicon JSON files
jacquard-codegen -i lexicons-example/lexicons -o lexicons-example/src
```

This produces strongly typed structs with built-in validation. For example, the `site.standard.publication` lexicon becomes:

```rust
use lexicons_example::site_standard::publication::Publication;

// Deserialize from JSON
let publication: Publication = serde_json::from_str(json)?;

// Validate against lexicon constraints (max length, grapheme limits, etc.)
publication.validate()?;

// Access typed fields
println!("Name: {}", publication.name.as_str());
println!("URL: {}", publication.url.as_str());
```

For more detail, see `process_record_event` [in `main.rs`](https://tangled.org/octet-stream.net/tapped/blob/main/standard-site-sync/src/main.rs#L140-201).

## License

MIT