A better Rust ATProto crate
102
fork

Configure Feed

Select the types of activity you want to include in your feed.

crate docs, changelog updates, release info in readme

Orual 6f4a1089 1639c6bd

+62 -109
+4 -12
CHANGELOG.md
··· 57 57 - `AtUri::from_parts()` constructor for building URIs from components 58 58 - Proper Display and FromStr implementations 59 59 60 - **Memory-based credential session** (`jacquard`) 61 - - `MemoryCredentialSession` for in-memory session storage 62 - - Useful for short-lived applications or testing 63 - - No file I/O required 64 - 65 - **Collection record fetching improvements** (`jacquard-api`, `jacquard-lexicon`) 66 - - Generated `fetch_record()` convenience method on all record types 67 - - Fetches owned record without turbofish syntax: `Post::fetch_record(agent, uri).await` 68 - - Simplifies common pattern of fetching + converting to owned 60 + **Memory-based credential session helpers** (`jacquard`) (ty [@vielle.dev](https://tangled.org/@vielle.dev)) 69 61 70 62 **Axum improvements** (`jacquard-axum`) 71 63 - `XrpcError` now implements `IntoResponse` for better error handling ··· 76 68 - `subscribe_repos.rs`: Subscribe to PDS firehose with typed DAG-CBOR messages 77 69 - `subscribe_jetstream.rs`: Subscribe to Jetstream with typed JSON messages and optional compression 78 70 - `stream_get_blob.rs`: Download blobs using HTTP streaming 79 - - `app_password_example.rs`: App password authentication example 71 + - `app_password_example.rs`: App password authentication example (ty [@vielle.dev](https://tangled.org/@vielle.dev)) 80 72 81 73 **CID deserialization improvements** (`jacquard-common`) 82 74 - Fixed `Cid` type to properly deserialize CBOR tag 42 via `IpldCid::deserialize` ··· 95 87 - Override `decode_message()` in trait impls to use framed decoding 96 88 - All record types now have `fetch_uri()` and `fetch_record()` methods generated 97 89 98 - **Dependencies** (`jacquard-axum`) 90 + **Dependencies** (`jacquard-axum`) (ty [@thoth.ptnote.dev](https://tangled.org/@thoth.ptnote.dev)) 99 91 - Disabled default features for `jacquard` dependency to reduce bloat 100 92 101 93 ### Fixed 102 94 103 - **Blob upload** (`jacquard`) 95 + **Blob upload** (`jacquard`) (ty [@vielle.dev](https://tangled.org/@vielle.dev) for reporting this one) 104 96 - Fixed `upload_blob()` authentication issues 105 97 - Properly authenticates while allowing custom Content-Type headers 106 98
+19 -39
README.md
··· 8 8 9 9 It is also designed around zero-copy/borrowed deserialization: types like [`Post<'_>`](https://tangled.org/@nonbinary.computer/jacquard/blob/main/crates/jacquard-api/src/app_bsky/feed/post.rs) can borrow data (via the [`CowStr<'_>`](https://docs.rs/jacquard/latest/jacquard/cowstr/enum.CowStr.html) type and a host of other types built on top of it) directly from the response buffer instead of allocating owned copies. Owned versions are themselves mostly inlined or reference-counted pointers and are therefore still quite efficient. The `IntoStatic` trait (which is derivable) makes it easy to get an owned version and avoid worrying about lifetimes. 10 10 11 + ## 0.6.0 Release Highlights: 12 + 13 + - **WebSocket streaming** (gated behind feature: "streaming" in `jacquard` and "websocket" in `jacquard-common`) 14 + - Base level HTTP streamed responses and (on non-wasm platforms) request support (gated behind feature: "streaming" in `jacquard-common`) 15 + - **Support for atproto event stream endpoints** (e.g. subscribeRepos, subscribeLabels, firehose) 16 + - **Jetstream subscriber support and implementation** 17 + - **zstd compression support** for JSON websocket endpoints 18 + - **XRPC streaming procedure traits** for endpoints with large payloads, experimental manual implementations in `jacquard` 19 + - Fixed blob upload and download bugs, CID link deserialization issues. 20 + 21 + ### WARNING 22 + 23 + A lot of the streaming code is still pretty experimental. The examples work, though.\ 24 + The modules are also less well-documented, and don't have code examples. There are also a lot of utility functions for conveniently working with the streams and transforming them which are lacking. Use [`n0-future`](https://docs.rs/n0-future/latest/n0_future/index.html) to work with them, that is what Jacquard uses internally as much as possible. 25 + 26 + ### Changelog 27 + 28 + [./CHANGELOG.md] 29 + 11 30 ## Goals and Features 12 31 13 32 - Validated, spec-compliant, easy to work with, and performant baseline types ··· 20 39 - Lexicon Data value type for working with unknown atproto data (dag-cbor or json) 21 40 - An order of magnitude less boilerplate than some existing crates 22 41 - Use as much or as little from the crates as you need 23 - 24 42 25 43 ## Example 26 44 ··· 97 115 | `jacquard-identity` | Identity resolution | [![Crates.io](https://img.shields.io/crates/v/jacquard-identity.svg)](https://crates.io/crates/jacquard-identity) [![Documentation](https://docs.rs/jacquard-identity/badge.svg)](https://docs.rs/jacquard-identity) | 98 116 | `jacquard-lexicon` | Lexicon parsing and code generation | [![Crates.io](https://img.shields.io/crates/v/jacquard-lexicon.svg)](https://crates.io/crates/jacquard-lexicon) [![Documentation](https://docs.rs/jacquard-lexicon/badge.svg)](https://docs.rs/jacquard-lexicon) | 99 117 | `jacquard-derive` | Macros for lexicon types | [![Crates.io](https://img.shields.io/crates/v/jacquard-derive.svg)](https://crates.io/crates/jacquard-derive) [![Documentation](https://docs.rs/jacquard-derive/badge.svg)](https://docs.rs/jacquard-derive) | 100 - 101 - ## Changelog 102 - 103 - [CHANGELOG.md](./CHANGELOG.md) 104 - 105 - Highlights: 106 - 107 - - initial streaming support 108 - - experimental WASM support 109 - - better value type deserialization helpers 110 - - service auth implementation 111 - - XrpcRequest derive Macros 112 - - more builders in generated api to make constructing things easier (lmk if compile time is awful) 113 - - `AgentSessionExt` trait with a host of convenience methods for working with records and preferences 114 - - Improvements to the `Collection` trait, code generation, and addition of the `VecUpdate` trait to enable that 115 - 116 - 117 - ## Experimental WASM Support 118 - 119 - Core crates (`jacquard-common`, `jacquard-api`, `jacquard-identity`, `jacquard-oauth`) compile for `wasm32-unknown-unknown`. Traits use [`trait-variant`](https://docs.rs/trait-variant) to conditionally exclude `Send` bounds on WASM targets. DNS-based handle resolution is gated behind the `dns` feature and unavailable on WASM (HTTPS well-known and PDS resolution still work). 120 - 121 - Test WASM compilation: 122 - ```bash 123 - just check-wasm 124 - # or: cargo build --target wasm32-unknown-unknown -p jacquard-common --no-default-features 125 - ``` 126 - 127 - 128 - ### Initial Streaming Support 129 - 130 - Jacquard is building out support for efficient streaming for large payloads: 131 - 132 - - **Blob uploads/downloads**: Stream media without loading into memory 133 - - **CAR file streaming**: Efficient repo sync operations 134 - - **Thin forwarding**: Pipe data between endpoints 135 - - **WebSocket support**: Bidirectional streaming connections 136 - 137 - Enable with the `streaming` feature flag. See `jacquard-common` documentation for details. 138 118 139 119 ## Development 140 120
+1
crates/jacquard-common/src/error.rs
··· 98 98 #[source] 99 99 serde_ipld_dagcbor::DecodeError<std::convert::Infallible>, 100 100 ), 101 + /// CBOR header deserialization failed (framed WebSocket messages) 101 102 #[cfg(feature = "websocket")] 102 103 #[error("Failed to deserialize cbor header: {0}")] 103 104 CborHeader(
+27
crates/jacquard-common/src/xrpc/streaming.rs
··· 9 9 use std::path::Path; 10 10 use std::{marker::PhantomData, pin::Pin}; 11 11 12 + /// Trait for streaming XRPC procedures (bidirectional streaming). 13 + /// 14 + /// Defines frame encoding/decoding for procedures that send/receive streams of data. 12 15 pub trait XrpcProcedureStream { 13 16 /// The NSID for this XRPC method 14 17 const NSID: &'static str; 15 18 /// The upload encoding 16 19 const ENCODING: &'static str; 17 20 21 + /// Frame type for this streaming procedure 18 22 type Frame<'de>; 19 23 24 + /// Associated request type 20 25 type Request: XrpcRequest; 21 26 22 27 /// Response type returned from the XRPC call (marker struct) 23 28 type Response: XrpcStreamResp; 24 29 30 + /// Encode a frame into bytes for transmission. 31 + /// 32 + /// Default implementation uses DAG-CBOR encoding. 25 33 fn encode_frame<'de>(data: Self::Frame<'de>) -> Result<Bytes, StreamError> 26 34 where 27 35 Self::Frame<'de>: Serialize, ··· 55 63 /// Response output type 56 64 type Frame<'de>: IntoStatic; 57 65 66 + /// Encode a frame into bytes for transmission. 67 + /// 68 + /// Default implementation uses DAG-CBOR encoding. 58 69 fn encode_frame<'de>(data: Self::Frame<'de>) -> Result<Bytes, StreamError> 59 70 where 60 71 Self::Frame<'de>: Serialize, ··· 77 88 } 78 89 } 79 90 91 + /// A single frame in a streaming XRPC request or response. 92 + /// 93 + /// Wraps a buffer of bytes with optional type tagging via the phantom parameter. 80 94 #[repr(transparent)] 81 95 pub struct XrpcStreamFrame<F = ()> { 96 + /// The frame data 82 97 pub buffer: Bytes, 83 98 _marker: PhantomData<F>, 84 99 } 85 100 86 101 impl XrpcStreamFrame { 102 + /// Create a new untyped stream frame 87 103 pub fn new(buffer: Bytes) -> Self { 88 104 Self { 89 105 buffer, ··· 93 109 } 94 110 95 111 impl<F> XrpcStreamFrame<F> { 112 + /// Create a new typed stream frame 96 113 pub fn new_typed<G>(buffer: Bytes) -> Self { 97 114 Self { 98 115 buffer, ··· 142 159 pub Pin<Box<dyn n0_future::Sink<XrpcStreamFrame<F>, Error = StreamError> + Send>>, 143 160 ); 144 161 162 + /// Typed streaming XRPC response. 163 + /// 164 + /// Similar to `StreamingResponse` but with optional type-level frame tagging. 145 165 pub struct XrpcResponseStream<F = ()> { 146 166 parts: http::response::Parts, 147 167 body: Boxed<Result<XrpcStreamFrame<F>, StreamError>>, 148 168 } 149 169 150 170 impl XrpcResponseStream { 171 + /// Create from a `StreamingResponse` 151 172 pub fn from_bytestream(StreamingResponse { parts, body }: StreamingResponse) -> Self { 152 173 Self { 153 174 parts, ··· 158 179 } 159 180 } 160 181 182 + /// Create from response parts and a byte stream 161 183 pub fn from_parts(parts: http::response::Parts, body: ByteStream) -> Self { 162 184 Self { 163 185 parts, ··· 168 190 } 169 191 } 170 192 193 + /// Consume and return parts and body separately 171 194 pub fn into_parts(self) -> (http::response::Parts, ByteStream) { 172 195 ( 173 196 self.parts, ··· 175 198 ) 176 199 } 177 200 201 + /// Consume and return just the body stream 178 202 pub fn into_bytestream(self) -> ByteStream { 179 203 ByteStream::new(self.body.map_ok(|f| f.buffer).boxed()) 180 204 } 181 205 } 182 206 183 207 impl<F: XrpcStreamResp> XrpcResponseStream<F> { 208 + /// Create a typed response stream from a `StreamingResponse` 184 209 pub fn from_stream(StreamingResponse { parts, body }: StreamingResponse) -> Self { 185 210 Self { 186 211 parts, ··· 191 216 } 192 217 } 193 218 219 + /// Create a typed response stream from parts and body 194 220 pub fn from_typed_parts(parts: http::response::Parts, body: ByteStream) -> Self { 195 221 Self { 196 222 parts, ··· 203 229 } 204 230 205 231 impl<F: XrpcStreamResp + 'static> XrpcResponseStream<F> { 232 + /// Consume the typed stream and return just the raw byte stream 206 233 pub fn into_bytestream(self) -> ByteStream { 207 234 ByteStream::new(self.body.map_ok(|f| f.buffer).boxed()) 208 235 }
+11 -1
crates/jacquard-common/src/xrpc/subscription.rs
··· 98 98 } 99 99 } 100 100 101 + /// Header for framed DAG-CBOR subscription messages. 102 + /// 103 + /// Used in ATProto subscription streams where each message has a CBOR-encoded header 104 + /// followed by the message body. 101 105 #[derive(Debug, serde::Deserialize)] 102 106 pub struct EventHeader { 107 + /// Operation code 103 108 pub op: i64, 104 - pub t: smol_str::SmolStr, // type discriminator like "#commit" 109 + /// Event type discriminator (e.g., "#commit", "#identity") 110 + pub t: smol_str::SmolStr, 105 111 } 106 112 113 + /// Parse a framed DAG-CBOR message header and return the header plus remaining body bytes. 114 + /// 115 + /// Used for two-stage deserialization of subscription messages in formats like 116 + /// `com.atproto.sync.subscribeRepos`. 107 117 pub fn parse_event_header<'a>(bytes: &'a [u8]) -> Result<(EventHeader, &'a [u8]), DecodeError> { 108 118 let mut cursor = std::io::Cursor::new(bytes); 109 119 let header: EventHeader = ciborium::de::from_reader(&mut cursor)?;
-25
examples/streaming_download.rs
··· 1 - //! Example: Download large file using streaming 2 - use jacquard_common::http_client::HttpClientExt; 3 - 4 - #[tokio::main] 5 - async fn main() -> Result<(), Box<dyn std::error::Error>> { 6 - let client = reqwest::Client::new(); 7 - 8 - let request = http::Request::builder() 9 - .uri("https://httpbin.org/bytes/1024") 10 - .body(vec![]) 11 - .unwrap(); 12 - 13 - let response = client.send_http_streaming(request).await?; 14 - println!("Status: {}", response.status()); 15 - println!("Headers: {:?}", response.headers()); 16 - 17 - let (_parts, _body) = response.into_parts(); 18 - println!("Received streaming response body (ByteStream)"); 19 - 20 - // Note: To iterate over chunks, use futures_lite::StreamExt on the pinned inner stream: 21 - // let mut stream = Box::pin(body.into_inner()); 22 - // while let Some(chunk) = stream.as_mut().try_next().await? { ... } 23 - 24 - Ok(()) 25 - }
-32
examples/streaming_upload.rs
··· 1 - //! Example: Upload data using streaming request body 2 - 3 - use bytes::Bytes; 4 - use futures::stream; 5 - use jacquard_common::http_client::HttpClientExt; 6 - 7 - #[tokio::main] 8 - async fn main() -> Result<(), Box<dyn std::error::Error>> { 9 - let client = reqwest::Client::new(); 10 - 11 - // Create a stream of data chunks 12 - let chunks = vec![ 13 - Bytes::from("Hello, "), 14 - Bytes::from("streaming "), 15 - Bytes::from("world!"), 16 - ]; 17 - let body_stream = stream::iter(chunks); 18 - 19 - // Build request and split into parts 20 - let request = http::Request::builder() 21 - .method(http::Method::POST) 22 - .uri("https://httpbin.org/post") 23 - .body(()) 24 - .unwrap(); 25 - 26 - let (parts, _) = request.into_parts(); 27 - 28 - let response = client.send_http_bidirectional(parts, body_stream).await?; 29 - println!("Status: {}", response.status()); 30 - 31 - Ok(()) 32 - }