# tapped

[crates.io](https://crates.io/crates/tapped)
[docs.rs](https://docs.rs/tapped)

A Rust wrapper library for the [`tap`](https://github.com/bluesky-social/indigo/tree/main/cmd/tap) ATProto sync utility.

`tapped` provides an idiomatic async Rust interface for spawning and communicating with a `tap` subprocess, making it easy to build applications that sync data from the ATProto network.

## Features

- Spawn and manage `tap` subprocesses with graceful shutdown
- Strongly-typed configuration for all tap envvars
- Strongly-typed async Rust functions covering all of tap's HTTP API endpoints
- WebSocket-based event channel with manual acknowledgment

## Installation

Add to your `Cargo.toml`:

```toml
[dependencies]
tapped = "0.2"
```

You'll also need the `tap` binary. Build it from the [indigo repository](https://github.com/bluesky-social/indigo):

```bash
cd cmd/tap && go build
```

`tapped` has been most recently tested against:

```
tap version v0.0.0-20260120225912-12d69fa4d209-rev-12d69fa
```

## Quick Start

```rust
use tapped::{TapHandle, TapConfig, Event};

#[tokio::main]
async fn main() -> tapped::Result<()> {
    let config = TapConfig::builder()
        .database_url("sqlite://tap.db")
        .collection_filter("app.bsky.feed.post")
        .build();

    // Spawn tap and connect
    let handle = TapHandle::spawn_default(config).await?;

    // Subscribe to events
    let (mut receiver, mut ack_sender) = handle.channel().await?;

    while let Ok((event, ack_id)) = receiver.recv().await {
        match event {
            Event::Record(record) => {
                println!("[{:?}] {}/{}",
                    record.action,
                    record.collection,
                    record.rkey
                );
            }
            Event::Identity(identity) => {
                println!("Identity: {} -> {}", identity.did, identity.handle);
            }
        }
        // Manual acknowledgment required
        ack_sender.ack(ack_id).await?;
    }

    Ok(())
}
```

## Usage Patterns

### Connect to Existing Instance

If you have a tap instance already running:

```rust
use tapped::TapClient;

let client = TapClient::new("http://localhost:2480")?;
client.health().await?;
```

### Spawn with Custom Binary Path

```rust
use tapped::{TapProcess, TapConfig};

let config = TapConfig::builder()
    .database_url("sqlite://my-app.db")
    .build();

let mut process = TapProcess::spawn("/path/to/tap", config).await?;
let client = process.client()?;

// Use the client...

process.shutdown().await?;
```

### Using TapHandle (Recommended)

`TapHandle` combines process management and client access:

```rust
use tapped::{TapHandle, TapConfig};

let config = TapConfig::builder()
    .database_url("sqlite://app.db")
    .full_network(false)
    .build();

let handle = TapHandle::spawn_default(config).await?;

// TapHandle derefs to TapClient, so you can call client methods directly
handle.health().await?;
let count = handle.repo_count().await?;
println!("Tracking {} repos", count);
```

### Configuration Options

```rust
use tapped::{TapConfig, LogLevel};
use std::time::Duration;

let config = TapConfig::builder()
    // Database
    .database_url("sqlite://tap.db")
    .max_db_conns(10)

    // Network
    .bind("127.0.0.1:2480")
    .relay_url("wss://bsky.network".parse().unwrap())
    .plc_url("https://plc.directory".parse().unwrap())

    // Filtering
    .signal_collection("app.bsky.feed.post")
    .collection_filter("app.bsky.feed.post")
    .collection_filter("app.bsky.feed.like")
    .full_network(false)

    // Performance
    .firehose_parallelism(10)
    .resync_parallelism(5)
    .outbox_parallelism(10)
    .outbox_capacity(10000)

    // Timeouts
    .repo_fetch_timeout(Duration::from_secs(30))
    .startup_timeout(Duration::from_secs(60))
    .shutdown_timeout(Duration::from_secs(10))

    // Logging
    .log_level(LogLevel::Info)

    .build();
```

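Since `tap` is configured through environment variables, a typed configuration eventually has to be flattened into key/value pairs for the subprocess. The following is only a rough sketch of that idea using the standard library, not `tapped`'s actual implementation. `SketchConfig` and `to_env` are hypothetical names, and the `TAP_*` variable names are assumptions that should be checked against tap's own documentation.

```rust
use std::collections::BTreeMap;

/// Hypothetical, heavily simplified stand-in for a typed tap configuration.
struct SketchConfig {
    database_url: String,
    bind: String,
    collection_filters: Vec<String>,
    full_network: bool,
}

/// Flatten the typed config into environment variable pairs.
/// The variable names here are assumptions, not confirmed tap settings.
fn to_env(config: &SketchConfig) -> BTreeMap<String, String> {
    let mut env = BTreeMap::new();
    env.insert("TAP_DATABASE_URL".to_string(), config.database_url.clone());
    env.insert("TAP_BIND".to_string(), config.bind.clone());
    // Repeated filters are joined into one comma-separated value;
    // the variable is omitted entirely when no filter is set.
    if !config.collection_filters.is_empty() {
        env.insert(
            "TAP_COLLECTION_FILTERS".to_string(),
            config.collection_filters.join(","),
        );
    }
    env.insert(
        "TAP_FULL_NETWORK".to_string(),
        config.full_network.to_string(),
    );
    env
}
```

A map like this could be handed to `std::process::Command::envs` when spawning the subprocess. Keeping the flattening in one function makes it easy to test without starting a process.
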
### Working with Events

Events must be manually acknowledged:

```rust
use tapped::{Event, RecordAction};

let (mut receiver, mut ack_sender) = client.channel().await?;

while let Ok((event, ack_id)) = receiver.recv().await {
    match event {
        Event::Record(record) => {
            match record.action {
                RecordAction::Create => {
                    // Access the raw JSON as a string
                    if let Some(json) = record.record_as_str() {
                        println!("Raw JSON: {}", json);
                    }

                    // Or deserialize to a specific type
                    // let post: MyPostType = record.deserialize_as()?;
                }
                RecordAction::Update => { /* ... */ }
                RecordAction::Delete => { /* ... */ }
                _ => {}
            }
        }
        Event::Identity(identity) => {
            println!("{} is now @{}", identity.did, identity.handle);
        }
    }
    // Ack must be sent manually
    ack_sender.ack(ack_id).await?;
}
```

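One common reason for manual acknowledgment is to acknowledge an event only after it has been processed successfully, so that a failed event is not marked as done. The sketch below illustrates that ordering with plain standard-library types. `ids_to_ack` is a hypothetical helper, the `u64` ack id is an assumed stand-in for whatever type `tapped` uses, and whether tap redelivers unacknowledged events is an assumption to verify against tap's documentation.

```rust
/// Run `handler` on every (event, ack_id) pair and return only the ids
/// whose event was handled without error. Failed events are left unacked.
/// The `u64` id type is an assumption made for this sketch.
fn ids_to_ack<T, E>(
    events: Vec<(T, u64)>,
    mut handler: impl FnMut(&T) -> Result<(), E>,
) -> Vec<u64> {
    let mut acked = Vec::new();
    for (event, ack_id) in events {
        // Acknowledge strictly after successful processing.
        if handler(&event).is_ok() {
            acked.push(ack_id);
        }
    }
    acked
}
```

In a real event loop the same ordering applies: run the processing step first, and call the ack sender only on the success path.
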
### Managing Repositories

```rust
// Add repos to track
client.add_repos(&["did:plc:abc123", "did:plc:def456"]).await?;

// Remove repos
client.remove_repos(&["did:plc:abc123"]).await?;

// Get info about a specific repo
let info = client.repo_info("did:plc:def456").await?;
println!("State: {:?}, Records: {}", info.state, info.records);

// Resolve a DID to its document
let doc = client.resolve_did("did:plc:def456").await?;
println!("Handles: {:?}", doc.also_known_as);
```

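When the list of DIDs comes from user input or another system, it can help to filter out malformed or duplicate entries before adding repos. This is a minimal sketch with hypothetical helper names (`looks_like_did`, `dedup_valid_dids`). It only checks the `did:plc:` and `did:web:` prefixes and the characters that follow, and is not a full DID validator.

```rust
use std::collections::HashSet;

/// Loose syntactic check: a `did:plc:` or `did:web:` prefix followed by
/// at least one character from a conservative allowed set.
fn looks_like_did(s: &str) -> bool {
    match s
        .strip_prefix("did:plc:")
        .or_else(|| s.strip_prefix("did:web:"))
    {
        Some(rest) => {
            !rest.is_empty()
                && rest.chars().all(|c| {
                    c.is_ascii_alphanumeric() || matches!(c, '.' | '-' | '_' | ':' | '%')
                })
        }
        None => false,
    }
}

/// Keep plausible DIDs only, dropping duplicates while preserving order.
fn dedup_valid_dids<'a>(input: &[&'a str]) -> Vec<&'a str> {
    let mut seen = HashSet::new();
    input
        .iter()
        .copied()
        // `insert` returns false for values already seen.
        .filter(|d| looks_like_did(d) && seen.insert(*d))
        .collect()
}
```

The filtered list could then be passed on to `add_repos`, which keeps obviously invalid identifiers from reaching tap's HTTP API.
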
### Checking Stats

```rust
let repos = client.repo_count().await?;
let records = client.record_count().await?;
let outbox = client.outbox_buffer().await?;
let resync = client.resync_buffer().await?;
let cursors = client.cursors().await?;

println!("Tracking {} repos with {} records", repos, records);
println!("Outbox buffer: {}, Resync buffer: {}", outbox, resync);
println!("Firehose cursor: {:?}", cursors.firehose);
```

## Example: Syncing standard.site Records with Schema Generation and Validation

The repository includes a complete example demonstrating how to sync and validate ATProto records using `tapped` together with the [jacquard](https://crates.io/crates/jacquard) crates.

The jacquard ecosystem provides runtime validation of records against their lexicon constraints, and the ability to generate Rust structs from lexicon JSON files.

```
tapped/
├── tapped/                 # The main tapped library
├── lexicons-example/       # Generated types from lexicon schemas
│   ├── lexicons/           # Source lexicon JSON files
│   │   ├── site.standard.publication.json
│   │   ├── site.standard.document.json
│   │   └── ...
│   └── src/                # Generated Rust code
└── standard-site-sync/     # Example binary using both packages
```

These files were generated like so:

```bash
# Install the code generator
cargo install jacquard-lexgen

jacquard-codegen -i lexicons-example/lexicons -o lexicons-example/src
```

This produces strongly-typed structs with built-in validation. For example, the `site.standard.publication` lexicon becomes:

```rust
use lexicons_example::site_standard::publication::Publication;

// Deserialize from JSON
let publication: Publication = serde_json::from_str(json)?;

// Validate against lexicon constraints (max length, grapheme limits, etc.)
publication.validate()?;

// Access typed fields
println!("Name: {}", publication.name.as_str());
println!("URL: {}", publication.url.as_str());
```

For more detail see `process_record_event` [in `main.rs`](https://tangled.org/octet-stream.net/tapped/blob/main/standard-site-sync/src/main.rs#L140-201).

## License

MIT