A better Rust ATProto crate
103
fork

Configure Feed

Select the types of activity you want to include in your feed.

add ByteStream and ByteSink abstractions

Orual cce91535 5bf84829

+111
+29
Cargo.lock
··· 1146 1146 ] 1147 1147 1148 1148 [[package]] 1149 + name = "futures" 1150 + version = "0.3.31" 1151 + source = "registry+https://github.com/rust-lang/crates.io-index" 1152 + checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" 1153 + dependencies = [ 1154 + "futures-channel", 1155 + "futures-core", 1156 + "futures-executor", 1157 + "futures-io", 1158 + "futures-sink", 1159 + "futures-task", 1160 + "futures-util", 1161 + ] 1162 + 1163 + [[package]] 1149 1164 name = "futures-buffered" 1150 1165 version = "0.2.12" 1151 1166 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1165 1180 checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" 1166 1181 dependencies = [ 1167 1182 "futures-core", 1183 + "futures-sink", 1168 1184 ] 1169 1185 1170 1186 [[package]] ··· 1172 1188 version = "0.3.31" 1173 1189 source = "registry+https://github.com/rust-lang/crates.io-index" 1174 1190 checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" 1191 + 1192 + [[package]] 1193 + name = "futures-executor" 1194 + version = "0.3.31" 1195 + source = "registry+https://github.com/rust-lang/crates.io-index" 1196 + checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f" 1197 + dependencies = [ 1198 + "futures-core", 1199 + "futures-task", 1200 + "futures-util", 1201 + ] 1175 1202 1176 1203 [[package]] 1177 1204 name = "futures-io" ··· 1221 1248 source = "registry+https://github.com/rust-lang/crates.io-index" 1222 1249 checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" 1223 1250 dependencies = [ 1251 + "futures-channel", 1224 1252 "futures-core", 1225 1253 "futures-io", 1226 1254 "futures-macro", ··· 1953 1981 "chrono", 1954 1982 "cid", 1955 1983 "ed25519-dalek", 1984 + "futures", 1956 1985 "getrandom 0.3.4", 1957 1986 "http", 1958 1987 "ipld-core",
+4
crates/jacquard-common/Cargo.toml
··· 78 78 optional = true 79 79 features = ["arithmetic"] 80 80 81 + [dev-dependencies] 82 + tokio = { version = "1", features = ["macros", "rt"] } 83 + futures = "0.3" 84 + 81 85 [package.metadata.docs.rs] 82 86 features = [ "crypto-k256", "crypto-k256", "crypto-p256"]
+78
crates/jacquard-common/src/stream.rs
··· 87 87 } 88 88 } 89 89 90 + use bytes::Bytes; 91 + 92 + /// Platform-agnostic byte stream abstraction 93 + pub struct ByteStream { 94 + inner: Box<dyn n0_future::Stream<Item = Result<Bytes, StreamError>>>, 95 + } 96 + 97 + impl ByteStream { 98 + /// Create a new byte stream from any compatible stream 99 + pub fn new<S>(stream: S) -> Self 100 + where 101 + S: n0_future::Stream<Item = Result<Bytes, StreamError>> + 'static, 102 + { 103 + Self { 104 + inner: Box::new(stream), 105 + } 106 + } 107 + 108 + /// Check if stream is known to be empty (always false for dynamic streams) 109 + pub fn is_empty(&self) -> bool { 110 + false 111 + } 112 + 113 + /// Convert into the inner boxed stream 114 + pub fn into_inner(self) -> Box<dyn n0_future::Stream<Item = Result<Bytes, StreamError>>> { 115 + self.inner 116 + } 117 + } 118 + 119 + impl fmt::Debug for ByteStream { 120 + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 121 + f.debug_struct("ByteStream").finish_non_exhaustive() 122 + } 123 + } 124 + 125 + /// Platform-agnostic byte sink abstraction 126 + pub struct ByteSink { 127 + inner: Box<dyn n0_future::Sink<Bytes, Error = StreamError>>, 128 + } 129 + 130 + impl ByteSink { 131 + /// Create a new byte sink from any compatible sink 132 + pub fn new<S>(sink: S) -> Self 133 + where 134 + S: n0_future::Sink<Bytes, Error = StreamError> + 'static, 135 + { 136 + Self { 137 + inner: Box::new(sink), 138 + } 139 + } 140 + 141 + /// Convert into the inner boxed sink 142 + pub fn into_inner(self) -> Box<dyn n0_future::Sink<Bytes, Error = StreamError>> { 143 + self.inner 144 + } 145 + } 146 + 147 + impl fmt::Debug for ByteSink { 148 + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 149 + f.debug_struct("ByteSink").finish_non_exhaustive() 150 + } 151 + } 152 + 90 153 #[cfg(test)] 91 154 mod tests { 92 155 use super::*; 156 + use bytes::Bytes; 93 157 94 158 #[test] 95 159 fn stream_error_carries_kind_and_source() { ··· 107 171 108 172 assert_eq!(err.kind(), &StreamErrorKind::Closed); 109 173 assert!(err.source().is_none()); 174 + } 175 + 176 + #[tokio::test] 177 + async fn byte_stream_can_be_created() { 178 + use futures::stream; 179 + 180 + let data = vec![ 181 + Ok(Bytes::from("hello")), 182 + Ok(Bytes::from(" world")), 183 + ]; 184 + let stream = stream::iter(data); 185 + 186 + let byte_stream = ByteStream::new(stream); 187 + assert!(!byte_stream.is_empty()); 110 188 } 111 189 }