A fork of attic a self-hostable Nix Binary Cache server
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Merge pull request #258 from zhaofengli/refactor/asyncbufread

server/upload_path: Get rid of necessary double-buffering

authored by

Zhaofeng Li and committed by
GitHub
12cbeca1 7c5d79ad

+526 -415
+1
Cargo.lock
··· 242 242 "hex", 243 243 "lazy_static", 244 244 "nix-base32", 245 + "pin-project", 245 246 "regex", 246 247 "serde", 247 248 "serde_json",
+5 -4
attic/Cargo.toml
··· 16 16 hex = "0.4.3" 17 17 lazy_static = "1.5.0" 18 18 nix-base32 = "0.2.0" 19 + pin-project = "1.1.10" 19 20 regex = "1.11.1" 20 21 serde = { version = "1.0.219", features = ["derive"] } 21 22 serde_with = "3.14.0" ··· 53 54 [features] 54 55 default = [ 55 56 "chunking", 57 + "io", 56 58 "nix_store", 57 - "stream", 58 59 "tokio", 59 60 ] 60 61 61 62 # Chunking. 62 - chunking = ["tokio", "stream", "dep:async-stream"] 63 + chunking = ["tokio", "io", "dep:async-stream"] 63 64 64 65 # Native libnixstore bindings. 65 66 # ··· 73 74 "dep:system-deps", 74 75 ] 75 76 76 - # Stream utilities. 77 - stream = ["tokio", "dep:async-stream"] 77 + # IO utilities. 78 + io = ["tokio", "dep:async-stream"] 78 79 79 80 # Tokio runtime. 80 81 tokio = ["dep:tokio", "tokio/rt", "tokio/time"]
+1 -1
attic/src/chunking/mod.rs
··· 9 9 use futures::stream::Stream; 10 10 use tokio::io::AsyncRead; 11 11 12 - use crate::stream::read_chunk_async; 12 + use crate::io::read_chunk_async; 13 13 14 14 /// Splits a streams into content-defined chunks. 15 15 ///
+234
attic/src/io/hash_reader.rs
··· 1 + use std::marker::Unpin; 2 + use std::pin::Pin; 3 + use std::sync::Arc; 4 + use std::task::{ready, Context, Poll}; 5 + 6 + use digest::{Digest, Output as DigestOutput}; 7 + use pin_project::pin_project; 8 + use tokio::io::{self, AsyncBufRead, AsyncRead, ReadBuf}; 9 + use tokio::sync::OnceCell; 10 + 11 + /// AsyncRead filter that hashes the bytes that have been read. 12 + /// 13 + /// The hash is finalized when EOF is reached. 14 + #[pin_project(project = HashReaderProj)] 15 + pub struct HashReader<R, D> 16 + where 17 + R: AsyncRead + Unpin, 18 + D: Digest + Unpin, 19 + { 20 + #[pin] 21 + inner: R, 22 + state: State<D>, 23 + } 24 + 25 + struct State<D> 26 + where 27 + D: Digest + Unpin, 28 + { 29 + digest: Option<D>, 30 + bytes_hashed: usize, 31 + bytes_consumed: usize, 32 + finalized: Arc<OnceCell<(DigestOutput<D>, usize)>>, 33 + } 34 + 35 + impl<D> State<D> 36 + where 37 + D: Digest + Unpin, 38 + { 39 + fn hash_unconsumed(&mut self, unconsumed: &[u8]) { 40 + let unhashed_offset = self.bytes_hashed - self.bytes_consumed; 41 + 42 + // It's technically possible for the `poll_read`/`poll_fill_buf` implementation 43 + // to return less data than the unconsumed portion returned by a previous 44 + // call to `AsyncBufRead::poll_fill_buf`. 45 + if unhashed_offset < unconsumed.len() { 46 + let unhashed = &unconsumed[unhashed_offset..]; 47 + self.bytes_hashed += unhashed.len(); 48 + 49 + let digest = self.digest.as_mut().expect("Stream has data after EOF"); 50 + digest.update(unhashed); 51 + } 52 + } 53 + 54 + fn eof(&mut self) { 55 + if let Some(digest) = self.digest.take() { 56 + assert!(self.bytes_hashed == self.bytes_consumed, "bytes_hashed != bytes_consumed but EOF - Unconsumed bytes disappeared from buffer??"); 57 + self.finalized 58 + .set((digest.finalize(), self.bytes_hashed)) 59 + .expect("Hash has already been finalized"); 60 + } 61 + } 62 + } 63 + 64 + impl<R, D> HashReader<R, D> 65 + where 66 + R: AsyncRead + Unpin, 67 + D: Digest + Unpin, 68 + { 69 + pub fn new(inner: R, digest: D) -> (Self, Arc<OnceCell<(DigestOutput<D>, usize)>>) { 70 + let finalized = Arc::new(OnceCell::new()); 71 + 72 + ( 73 + Self { 74 + inner, 75 + state: State { 76 + digest: Some(digest), 77 + bytes_hashed: 0, 78 + bytes_consumed: 0, 79 + finalized: finalized.clone(), 80 + }, 81 + }, 82 + finalized, 83 + ) 84 + } 85 + } 86 + 87 + impl<R, D> AsyncRead for HashReader<R, D> 88 + where 89 + R: AsyncRead + Unpin, 90 + D: Digest + Unpin, 91 + { 92 + fn poll_read( 93 + self: Pin<&mut Self>, 94 + cx: &mut Context<'_>, 95 + buf: &mut ReadBuf<'_>, 96 + ) -> Poll<io::Result<()>> { 97 + let this = self.project(); 98 + 99 + let old_filled = buf.filled().len(); 100 + ready!(this.inner.poll_read(cx, buf))?; 101 + 102 + let filled = buf.filled(); 103 + let unconsumed = &filled[old_filled..]; 104 + if unconsumed.len() == 0 { 105 + this.state.eof(); 106 + } else { 107 + this.state.hash_unconsumed(unconsumed); 108 + this.state.bytes_consumed += unconsumed.len(); 109 + } 110 + 111 + debug_assert!(this.state.bytes_consumed <= this.state.bytes_hashed); 112 + Poll::Ready(Ok(())) 113 + } 114 + } 115 + 116 + impl<R, D> AsyncBufRead for HashReader<R, D> 117 + where 118 + R: AsyncBufRead + Unpin, 119 + D: Digest + Unpin, 120 + { 121 + fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> { 122 + let this = self.project(); 123 + let unconsumed = ready!(this.inner.poll_fill_buf(cx))?; 124 + 125 + if unconsumed.len() == 0 { 126 + this.state.eof(); 127 + } else { 128 + this.state.hash_unconsumed(unconsumed); 129 + } 130 + 131 + debug_assert!(this.state.bytes_consumed <= this.state.bytes_hashed); 132 + Poll::Ready(Ok(unconsumed)) 133 + } 134 + 135 + fn consume(self: Pin<&mut Self>, amt: usize) { 136 + let this = self.project(); 137 + this.inner.consume(amt); 138 + this.state.bytes_consumed += amt; 139 + 140 + debug_assert!(this.state.bytes_consumed <= this.state.bytes_hashed); 141 + } 142 + } 143 + 144 + #[cfg(test)] 145 + mod tests { 146 + use super::*; 147 + 148 + use tokio::io::{AsyncBufReadExt, AsyncReadExt}; 149 + 150 + #[tokio::test] 151 + async fn test_hash_reader() { 152 + let expected = b"hello world"; 153 + let expected_sha256 = 154 + hex::decode("b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9") 155 + .unwrap(); 156 + 157 + let (mut read, finalized) = HashReader::new(expected.as_slice(), sha2::Sha256::new()); 158 + assert!(finalized.get().is_none()); 159 + 160 + // force multiple reads 161 + let mut buf = vec![0u8; 100]; 162 + let mut bytes_read = 0; 163 + bytes_read += read 164 + .read(&mut buf[bytes_read..bytes_read + 5]) 165 + .await 166 + .unwrap(); 167 + bytes_read += read 168 + .read(&mut buf[bytes_read..bytes_read + 5]) 169 + .await 170 + .unwrap(); 171 + bytes_read += read 172 + .read(&mut buf[bytes_read..bytes_read + 5]) 173 + .await 174 + .unwrap(); 175 + bytes_read += read 176 + .read(&mut buf[bytes_read..bytes_read + 5]) 177 + .await 178 + .unwrap(); 179 + 180 + assert_eq!(expected.len(), bytes_read); 181 + assert_eq!(expected, &buf[..bytes_read]); 182 + 183 + let (hash, count) = finalized.get().expect("Hash wasn't finalized"); 184 + 185 + assert_eq!(expected_sha256.as_slice(), hash.as_slice()); 186 + assert_eq!(expected.len(), *count); 187 + eprintln!("finalized = {:x?}", finalized); 188 + } 189 + 190 + #[tokio::test] 191 + async fn test_hash_reader_buf() { 192 + let expected = b"hello world"; 193 + let expected_sha256 = 194 + hex::decode("b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9") 195 + .unwrap(); 196 + 197 + let (mut read, finalized) = HashReader::new(expected.as_slice(), sha2::Sha256::new()); 198 + assert!(finalized.get().is_none()); 199 + 200 + let mut buf = vec![0u8; 100]; 201 + let mut bytes_read = 0; 202 + 203 + // Mix AsyncRead::read() and AsyncBufRead::fill_buf() 204 + 205 + bytes_read += read 206 + .read(&mut buf[bytes_read..bytes_read + 1]) 207 + .await 208 + .unwrap(); 209 + 210 + loop { 211 + // Perform multiple AsyncBufRead::fill_buf()s _without_ consuming 212 + let _ = read.fill_buf().await.unwrap(); 213 + let _ = read.fill_buf().await.unwrap(); 214 + let read_buf = read.fill_buf().await.unwrap(); 215 + 216 + if read_buf.is_empty() { 217 + break; 218 + } 219 + 220 + buf[bytes_read] = read_buf[0]; 221 + read.consume(1); 222 + bytes_read += 1; 223 + } 224 + 225 + assert_eq!(expected.len(), bytes_read); 226 + assert_eq!(expected, &buf[..bytes_read]); 227 + 228 + let (hash, count) = finalized.get().expect("Hash wasn't finalized"); 229 + 230 + assert_eq!(expected_sha256.as_slice(), hash.as_slice()); 231 + assert_eq!(expected.len(), *count); 232 + eprintln!("finalized = {:x?}", finalized); 233 + } 234 + }
+153
attic/src/io/mod.rs
··· 1 + //! Stream utilities. 2 + 3 + mod hash_reader; 4 + 5 + use std::collections::VecDeque; 6 + use std::future::Future; 7 + use std::marker::Unpin; 8 + use std::pin::Pin; 9 + 10 + use async_stream::try_stream; 11 + use bytes::{Bytes, BytesMut}; 12 + use futures::stream::{BoxStream, Stream, StreamExt}; 13 + use tokio::io::{AsyncRead, AsyncReadExt}; 14 + use tokio::task::spawn; 15 + 16 + pub use hash_reader::HashReader; 17 + 18 + /// Merge chunks lazily into a continuous stream. 19 + /// 20 + /// For each chunk, a function is called to transform it into a 21 + /// `Stream<Item = Result<Bytes>>`. This function does something like 22 + /// opening the local file or sending a request to S3. 23 + /// 24 + /// We call this function some time before the start of the chunk 25 + /// is reached to eliminate delays between chunks so the merged 26 + /// stream is smooth. We don't want to start streaming all chunks 27 + /// at once as it's a waste of resources. 28 + /// 29 + /// ```text 30 + /// | S3 GET | Chunk | S3 GET | ... | S3 GET | Chunk 31 + /// ``` 32 + /// 33 + /// ```text 34 + /// | S3 GET | Chunk | Chunk | Chunk | Chunk 35 + /// | S3 GET |-----------^ ^ ^ 36 + /// | S3 GET |------| | 37 + /// | S3 GET |--------------| 38 + /// 39 + /// ``` 40 + /// 41 + /// TODO: Support range requests so we can have seekable NARs. 42 + pub fn merge_chunks<C, F, S, Fut, E>( 43 + mut chunks: VecDeque<C>, 44 + streamer: F, 45 + streamer_arg: S, 46 + num_prefetch: usize, 47 + ) -> Pin<Box<impl Stream<Item = Result<Bytes, E>>>> 48 + where 49 + F: Fn(C, S) -> Fut, 50 + S: Clone, 51 + Fut: Future<Output = Result<BoxStream<'static, Result<Bytes, E>>, E>> + Send + 'static, 52 + E: Send + 'static, 53 + { 54 + let s = try_stream! { 55 + let mut streams = VecDeque::new(); // a queue of JoinHandles 56 + 57 + // otherwise type inference gets confused :/ 58 + if false { 59 + let chunk = chunks.pop_front().unwrap(); 60 + let stream = spawn(streamer(chunk, streamer_arg.clone())); 61 + streams.push_back(stream); 62 + } 63 + 64 + loop { 65 + if let Some(stream) = streams.pop_front() { 66 + let mut stream = stream.await.unwrap()?; 67 + while let Some(item) = stream.next().await { 68 + let item = item?; 69 + yield item; 70 + } 71 + } 72 + 73 + while streams.len() < num_prefetch { 74 + if let Some(chunk) = chunks.pop_front() { 75 + let stream = spawn(streamer(chunk, streamer_arg.clone())); 76 + streams.push_back(stream); 77 + } else { 78 + break; 79 + } 80 + } 81 + 82 + if chunks.is_empty() && streams.is_empty() { 83 + // we are done! 84 + break; 85 + } 86 + } 87 + }; 88 + Box::pin(s) 89 + } 90 + 91 + /// Greedily reads from a stream to fill a buffer. 92 + pub async fn read_chunk_async<S: AsyncRead + Unpin + Send>( 93 + stream: &mut S, 94 + mut chunk: BytesMut, 95 + ) -> std::io::Result<Bytes> { 96 + while chunk.len() < chunk.capacity() { 97 + let read = stream.read_buf(&mut chunk).await?; 98 + 99 + if read == 0 { 100 + break; 101 + } 102 + } 103 + 104 + Ok(chunk.freeze()) 105 + } 106 + 107 + #[cfg(test)] 108 + mod tests { 109 + use super::*; 110 + 111 + use async_stream::stream; 112 + use bytes::{BufMut, BytesMut}; 113 + use futures::future; 114 + 115 + #[tokio::test] 116 + async fn test_merge_chunks() { 117 + let chunk_a: BoxStream<Result<Bytes, ()>> = { 118 + let s = stream! { 119 + yield Ok(Bytes::from_static(b"Hello")); 120 + }; 121 + Box::pin(s) 122 + }; 123 + 124 + let chunk_b: BoxStream<Result<Bytes, ()>> = { 125 + let s = stream! { 126 + yield Ok(Bytes::from_static(b", ")); 127 + yield Ok(Bytes::from_static(b"world")); 128 + }; 129 + Box::pin(s) 130 + }; 131 + 132 + let chunk_c: BoxStream<Result<Bytes, ()>> = { 133 + let s = stream! { 134 + yield Ok(Bytes::from_static(b"!")); 135 + }; 136 + Box::pin(s) 137 + }; 138 + 139 + let chunks: VecDeque<BoxStream<'static, Result<Bytes, ()>>> = 140 + [chunk_a, chunk_b, chunk_c].into_iter().collect(); 141 + 142 + let streamer = |c, _| future::ok(c); 143 + let mut merged = merge_chunks(chunks, streamer, (), 2); 144 + 145 + let mut bytes = BytesMut::with_capacity(100); 146 + while let Some(item) = merged.next().await { 147 + bytes.put(item.unwrap()); 148 + } 149 + let bytes = bytes.freeze(); 150 + 151 + assert_eq!(&*bytes, b"Hello, world!"); 152 + } 153 + }
+2 -2
attic/src/lib.rs
··· 21 21 pub mod chunking; 22 22 pub mod error; 23 23 pub mod hash; 24 + #[cfg(feature = "io")] 25 + pub mod io; 24 26 pub mod mime; 25 27 pub mod nix_store; 26 28 pub mod signing; 27 - #[cfg(feature = "stream")] 28 - pub mod stream; 29 29 #[cfg(target_family = "unix")] 30 30 pub mod testing; 31 31 #[cfg(feature = "tokio")]
-258
attic/src/stream.rs
··· 1 - //! Stream utilities. 2 - 3 - use std::collections::VecDeque; 4 - use std::future::Future; 5 - use std::marker::Unpin; 6 - use std::pin::Pin; 7 - use std::sync::Arc; 8 - use std::task::{Context, Poll}; 9 - 10 - use async_stream::try_stream; 11 - use bytes::{Bytes, BytesMut}; 12 - use digest::{Digest, Output as DigestOutput}; 13 - use futures::stream::{BoxStream, Stream, StreamExt}; 14 - use tokio::io::{AsyncRead, AsyncReadExt, ReadBuf}; 15 - use tokio::sync::OnceCell; 16 - use tokio::task::spawn; 17 - 18 - /// Stream filter that hashes the bytes that have been read. 19 - /// 20 - /// The hash is finalized when EOF is reached. 21 - pub struct StreamHasher<R: AsyncRead + Unpin, D: Digest + Unpin> { 22 - inner: R, 23 - digest: Option<D>, 24 - bytes_read: usize, 25 - finalized: Arc<OnceCell<(DigestOutput<D>, usize)>>, 26 - } 27 - 28 - /// Merge chunks lazily into a continuous stream. 29 - /// 30 - /// For each chunk, a function is called to transform it into a 31 - /// `Stream<Item = Result<Bytes>>`. This function does something like 32 - /// opening the local file or sending a request to S3. 33 - /// 34 - /// We call this function some time before the start of the chunk 35 - /// is reached to eliminate delays between chunks so the merged 36 - /// stream is smooth. We don't want to start streaming all chunks 37 - /// at once as it's a waste of resources. 38 - /// 39 - /// ```text 40 - /// | S3 GET | Chunk | S3 GET | ... | S3 GET | Chunk 41 - /// ``` 42 - /// 43 - /// ```text 44 - /// | S3 GET | Chunk | Chunk | Chunk | Chunk 45 - /// | S3 GET |-----------^ ^ ^ 46 - /// | S3 GET |------| | 47 - /// | S3 GET |--------------| 48 - /// 49 - /// ``` 50 - /// 51 - /// TODO: Support range requests so we can have seekable NARs. 52 - pub fn merge_chunks<C, F, S, Fut, E>( 53 - mut chunks: VecDeque<C>, 54 - streamer: F, 55 - streamer_arg: S, 56 - num_prefetch: usize, 57 - ) -> Pin<Box<impl Stream<Item = Result<Bytes, E>>>> 58 - where 59 - F: Fn(C, S) -> Fut, 60 - S: Clone, 61 - Fut: Future<Output = Result<BoxStream<'static, Result<Bytes, E>>, E>> + Send + 'static, 62 - E: Send + 'static, 63 - { 64 - let s = try_stream! { 65 - let mut streams = VecDeque::new(); // a queue of JoinHandles 66 - 67 - // otherwise type inference gets confused :/ 68 - if false { 69 - let chunk = chunks.pop_front().unwrap(); 70 - let stream = spawn(streamer(chunk, streamer_arg.clone())); 71 - streams.push_back(stream); 72 - } 73 - 74 - loop { 75 - if let Some(stream) = streams.pop_front() { 76 - let mut stream = stream.await.unwrap()?; 77 - while let Some(item) = stream.next().await { 78 - let item = item?; 79 - yield item; 80 - } 81 - } 82 - 83 - while streams.len() < num_prefetch { 84 - if let Some(chunk) = chunks.pop_front() { 85 - let stream = spawn(streamer(chunk, streamer_arg.clone())); 86 - streams.push_back(stream); 87 - } else { 88 - break; 89 - } 90 - } 91 - 92 - if chunks.is_empty() && streams.is_empty() { 93 - // we are done! 94 - break; 95 - } 96 - } 97 - }; 98 - Box::pin(s) 99 - } 100 - 101 - impl<R: AsyncRead + Unpin, D: Digest + Unpin> StreamHasher<R, D> { 102 - pub fn new(inner: R, digest: D) -> (Self, Arc<OnceCell<(DigestOutput<D>, usize)>>) { 103 - let finalized = Arc::new(OnceCell::new()); 104 - 105 - ( 106 - Self { 107 - inner, 108 - digest: Some(digest), 109 - bytes_read: 0, 110 - finalized: finalized.clone(), 111 - }, 112 - finalized, 113 - ) 114 - } 115 - } 116 - 117 - impl<R: AsyncRead + Unpin, D: Digest + Unpin> AsyncRead for StreamHasher<R, D> { 118 - fn poll_read( 119 - mut self: Pin<&mut Self>, 120 - cx: &mut Context<'_>, 121 - buf: &mut ReadBuf<'_>, 122 - ) -> Poll<tokio::io::Result<()>> { 123 - let old_filled = buf.filled().len(); 124 - let r = Pin::new(&mut self.inner).poll_read(cx, buf); 125 - let read_len = buf.filled().len() - old_filled; 126 - 127 - match r { 128 - Poll::Ready(Ok(())) => { 129 - if read_len == 0 { 130 - // EOF 131 - if let Some(digest) = self.digest.take() { 132 - self.finalized 133 - .set((digest.finalize(), self.bytes_read)) 134 - .expect("Hash has already been finalized"); 135 - } 136 - } else { 137 - // Read something 138 - let digest = self.digest.as_mut().expect("Stream has data after EOF"); 139 - 140 - let filled = buf.filled(); 141 - digest.update(&filled[filled.len() - read_len..]); 142 - self.bytes_read += read_len; 143 - } 144 - } 145 - Poll::Ready(Err(_)) => { 146 - assert!(read_len == 0); 147 - } 148 - Poll::Pending => {} 149 - } 150 - 151 - r 152 - } 153 - } 154 - 155 - /// Greedily reads from a stream to fill a buffer. 156 - pub async fn read_chunk_async<S: AsyncRead + Unpin + Send>( 157 - stream: &mut S, 158 - mut chunk: BytesMut, 159 - ) -> std::io::Result<Bytes> { 160 - while chunk.len() < chunk.capacity() { 161 - let read = stream.read_buf(&mut chunk).await?; 162 - 163 - if read == 0 { 164 - break; 165 - } 166 - } 167 - 168 - Ok(chunk.freeze()) 169 - } 170 - 171 - #[cfg(test)] 172 - mod tests { 173 - use super::*; 174 - 175 - use async_stream::stream; 176 - use bytes::{BufMut, BytesMut}; 177 - use futures::future; 178 - use tokio::io::AsyncReadExt; 179 - 180 - #[tokio::test] 181 - async fn test_stream_hasher() { 182 - let expected = b"hello world"; 183 - let expected_sha256 = 184 - hex::decode("b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9") 185 - .unwrap(); 186 - 187 - let (mut read, finalized) = StreamHasher::new(expected.as_slice(), sha2::Sha256::new()); 188 - assert!(finalized.get().is_none()); 189 - 190 - // force multiple reads 191 - let mut buf = vec![0u8; 100]; 192 - let mut bytes_read = 0; 193 - bytes_read += read 194 - .read(&mut buf[bytes_read..bytes_read + 5]) 195 - .await 196 - .unwrap(); 197 - bytes_read += read 198 - .read(&mut buf[bytes_read..bytes_read + 5]) 199 - .await 200 - .unwrap(); 201 - bytes_read += read 202 - .read(&mut buf[bytes_read..bytes_read + 5]) 203 - .await 204 - .unwrap(); 205 - bytes_read += read 206 - .read(&mut buf[bytes_read..bytes_read + 5]) 207 - .await 208 - .unwrap(); 209 - 210 - assert_eq!(expected.len(), bytes_read); 211 - assert_eq!(expected, &buf[..bytes_read]); 212 - 213 - let (hash, count) = finalized.get().expect("Hash wasn't finalized"); 214 - 215 - assert_eq!(expected_sha256.as_slice(), hash.as_slice()); 216 - assert_eq!(expected.len(), *count); 217 - eprintln!("finalized = {:x?}", finalized); 218 - } 219 - 220 - #[tokio::test] 221 - async fn test_merge_chunks() { 222 - let chunk_a: BoxStream<Result<Bytes, ()>> = { 223 - let s = stream! { 224 - yield Ok(Bytes::from_static(b"Hello")); 225 - }; 226 - Box::pin(s) 227 - }; 228 - 229 - let chunk_b: BoxStream<Result<Bytes, ()>> = { 230 - let s = stream! { 231 - yield Ok(Bytes::from_static(b", ")); 232 - yield Ok(Bytes::from_static(b"world")); 233 - }; 234 - Box::pin(s) 235 - }; 236 - 237 - let chunk_c: BoxStream<Result<Bytes, ()>> = { 238 - let s = stream! { 239 - yield Ok(Bytes::from_static(b"!")); 240 - }; 241 - Box::pin(s) 242 - }; 243 - 244 - let chunks: VecDeque<BoxStream<'static, Result<Bytes, ()>>> = 245 - [chunk_a, chunk_b, chunk_c].into_iter().collect(); 246 - 247 - let streamer = |c, _| future::ok(c); 248 - let mut merged = merge_chunks(chunks, streamer, (), 2); 249 - 250 - let mut bytes = BytesMut::with_capacity(100); 251 - while let Some(item) = merged.next().await { 252 - bytes.put(item.unwrap()); 253 - } 254 - let bytes = bytes.freeze(); 255 - 256 - assert_eq!(&*bytes, b"Hello, world!"); 257 - } 258 - }
+2 -2
client/src/api/mod.rs
··· 3 3 4 4 use anyhow::Result; 5 5 use bytes::Bytes; 6 - use const_format::concatcp; 6 + use const_format::formatcp; 7 7 use displaydoc::Display; 8 8 use futures::{ 9 9 future, ··· 27 27 28 28 /// The User-Agent string of Attic. 29 29 const ATTIC_USER_AGENT: &str = 30 - concatcp!("Attic/{} ({})", env!("CARGO_PKG_NAME"), ATTIC_DISTRIBUTOR); 30 + formatcp!("Attic/{} ({})", env!("CARGO_PKG_NAME"), ATTIC_DISTRIBUTOR); 31 31 32 32 /// The size threshold to send the upload info as part of the PUT body. 33 33 const NAR_INFO_PREAMBLE_THRESHOLD: usize = 4 * 1024; // 4 KiB
+2 -1
flake/devshells.nix
··· 40 40 ]; 41 41 42 42 rust = [ 43 + cargo-audit 43 44 cargo-expand 44 45 cargo-outdated 45 46 cargo-edit 47 + cargo-udeps 46 48 tokio-console 47 49 ]; 48 50 ··· 62 64 postgresql 63 65 sqlite-interactive 64 66 65 - flyctl 66 67 skopeo 67 68 manifest-tool 68 69 ];
+1 -1
justfile
··· 27 27 export RUST_MIN_STACK=16777216 28 28 29 29 pushd attic 30 - cargo build --target wasm32-unknown-unknown --no-default-features -F chunking -F stream 30 + cargo build --target wasm32-unknown-unknown --no-default-features -F chunking -F io 31 31 popd 32 32 pushd token 33 33 cargo build --target wasm32-unknown-unknown
+1 -1
server/Cargo.toml
··· 19 19 doc = false 20 20 21 21 [dependencies] 22 - attic = { path = "../attic", default-features = false, features = ["chunking", "stream", "tokio"] } 22 + attic = { path = "../attic", default-features = false, features = ["chunking", "io", "tokio"] } 23 23 attic-token = { path = "../token" } 24 24 25 25 anyhow = "1.0.98"
+1 -1
server/src/api/binary_cache.rs
··· 32 32 use crate::storage::{Download, StorageBackend}; 33 33 use crate::{RequestState, State}; 34 34 use attic::cache::CacheName; 35 + use attic::io::merge_chunks; 35 36 use attic::mime; 36 37 use attic::nix_store::StorePathHash; 37 - use attic::stream::merge_chunks; 38 38 39 39 /// Nix cache information. 40 40 ///
+40 -143
server/src/api/v1/upload_path.rs
··· 14 14 }; 15 15 use bytes::{Bytes, BytesMut}; 16 16 use chrono::Utc; 17 - use digest::Output as DigestOutput; 18 17 use futures::future::join_all; 19 18 use futures::StreamExt; 20 19 use sea_orm::entity::prelude::*; ··· 22 21 use sea_orm::ActiveValue::Set; 23 22 use sea_orm::{QuerySelect, TransactionTrait}; 24 23 use sha2::{Digest, Sha256}; 25 - use tokio::io::{AsyncBufRead, AsyncRead, AsyncReadExt, BufReader}; 26 - use tokio::sync::{OnceCell, Semaphore}; 24 + use tokio::io::{AsyncBufRead, AsyncReadExt}; 25 + use tokio::sync::Semaphore; 27 26 use tokio::task::spawn; 28 27 use tokio_util::io::StreamReader; 29 28 use tracing::instrument; 30 29 use uuid::Uuid; 31 30 31 + use crate::compression::{CompressionStream, CompressorFn}; 32 32 use crate::config::CompressionType; 33 33 use crate::error::{ErrorKind, ServerError, ServerResult}; 34 34 use crate::narinfo::Compression; ··· 39 39 }; 40 40 use attic::chunking::chunk_stream; 41 41 use attic::hash::Hash; 42 - use attic::stream::{read_chunk_async, StreamHasher}; 42 + use attic::io::{read_chunk_async, HashReader}; 43 43 use attic::util::Finally; 44 44 45 45 use crate::database::entity::cache; ··· 55 55 /// TODO: Make this configurable 56 56 const CONCURRENT_CHUNK_UPLOADS: usize = 10; 57 57 58 - type CompressorFn<C> = Box<dyn FnOnce(C) -> Box<dyn AsyncRead + Unpin + Send> + Send>; 59 - 60 58 /// Data of a chunk. 61 59 enum ChunkData { 62 60 /// Some bytes in memory. 63 61 Bytes(Bytes), 64 62 65 63 /// A stream with a user-claimed hash and size that are potentially incorrect. 66 - Stream(Box<dyn AsyncRead + Send + Unpin + 'static>, Hash, usize), 64 + Stream(Box<dyn AsyncBufRead + Send + Unpin + 'static>, Hash, usize), 67 65 } 68 66 69 67 /// Result of a chunk upload. ··· 72 70 deduplicated: bool, 73 71 } 74 72 75 - /// Applies compression to a stream, computing hashes along the way. 76 - /// 77 - /// Our strategy is to stream directly onto a UUID-keyed file on the 78 - /// storage backend, performing compression and computing the hashes 79 - /// along the way. We delete the file if the hashes do not match. 80 - /// 81 - /// ```text 82 - /// ┌───────────────────────────────────►NAR Hash 83 - /// │ 84 - /// │ 85 - /// ├───────────────────────────────────►NAR Size 86 - /// │ 87 - /// ┌─────┴────┐ ┌──────────┐ ┌───────────┐ 88 - /// NAR Stream──►│NAR Hasher├─►│Compressor├─►│File Hasher├─►File Stream 89 - /// └──────────┘ └──────────┘ └─────┬─────┘ 90 - /// │ 91 - /// ├───────►File Hash 92 - /// │ 93 - /// │ 94 - /// └───────►File Size 95 - /// ``` 96 - struct CompressionStream { 97 - stream: Box<dyn AsyncRead + Unpin + Send>, 98 - nar_compute: Arc<OnceCell<(DigestOutput<Sha256>, usize)>>, 99 - file_compute: Arc<OnceCell<(DigestOutput<Sha256>, usize)>>, 100 - } 101 - 102 73 trait UploadPathNarInfoExt { 103 74 fn to_active_model(&self) -> object::ActiveModel; 104 75 } ··· 180 151 let username = req_state.auth.username().map(str::to_string); 181 152 182 153 // Try to acquire a lock on an existing NAR 183 - let existing_nar = database.find_and_lock_nar(&upload_info.nar_hash).await?; 184 - match existing_nar { 185 - Some(existing_nar) => { 186 - // Deduplicate? 187 - let missing_chunk = ChunkRef::find() 188 - .filter(chunkref::Column::NarId.eq(existing_nar.id)) 189 - .filter(chunkref::Column::ChunkId.is_null()) 190 - .limit(1) 191 - .one(database) 192 - .await 193 - .map_err(ServerError::database_error)?; 154 + if let Some(existing_nar) = database.find_and_lock_nar(&upload_info.nar_hash).await? { 155 + // Deduplicate? 156 + let missing_chunk = ChunkRef::find() 157 + .filter(chunkref::Column::NarId.eq(existing_nar.id)) 158 + .filter(chunkref::Column::ChunkId.is_null()) 159 + .limit(1) 160 + .one(database) 161 + .await 162 + .map_err(ServerError::database_error)?; 194 163 195 - if missing_chunk.is_some() { 196 - // Need to repair 197 - upload_path_new(username, cache, upload_info, stream, database, &state).await 198 - } else { 199 - // Can actually be deduplicated 200 - upload_path_dedup( 201 - username, 202 - cache, 203 - upload_info, 204 - stream, 205 - database, 206 - &state, 207 - existing_nar, 208 - ) 209 - .await 210 - } 211 - } 212 - None => { 213 - // New NAR 214 - upload_path_new(username, cache, upload_info, stream, database, &state).await 164 + if missing_chunk.is_none() { 165 + // Can actually be deduplicated 166 + return upload_path_dedup( 167 + username, 168 + cache, 169 + upload_info, 170 + stream, 171 + database, 172 + &state, 173 + existing_nar, 174 + ) 175 + .await; 215 176 } 216 177 } 178 + 179 + // New NAR or need to repair 180 + upload_path_new(username, cache, upload_info, stream, database, &state).await 217 181 } 218 182 219 183 /// Uploads a path when there is already a matching NAR in the global cache. ··· 221 185 username: Option<String>, 222 186 cache: cache::Model, 223 187 upload_info: UploadPathNarInfo, 224 - stream: impl AsyncRead + Unpin, 188 + stream: impl AsyncBufRead + Unpin, 225 189 database: &DatabaseConnection, 226 190 state: &State, 227 191 existing_nar: NarGuard, 228 192 ) -> ServerResult<Json<UploadPathResult>> { 229 193 if state.config.require_proof_of_possession { 230 - let (mut stream, nar_compute) = StreamHasher::new(stream, Sha256::new()); 194 + let (mut stream, nar_compute) = HashReader::new(stream, Sha256::new()); 231 195 tokio::io::copy(&mut stream, &mut tokio::io::sink()) 232 196 .await 233 197 .map_err(ServerError::request_error)?; ··· 301 265 username: Option<String>, 302 266 cache: cache::Model, 303 267 upload_info: UploadPathNarInfo, 304 - stream: impl AsyncRead + Send + Unpin + 'static, 268 + stream: impl AsyncBufRead + Send + Unpin + 'static, 305 269 database: &DatabaseConnection, 306 270 state: &State, 307 271 ) -> ServerResult<Json<UploadPathResult>> { ··· 319 283 username: Option<String>, 320 284 cache: cache::Model, 321 285 upload_info: UploadPathNarInfo, 322 - stream: impl AsyncRead + Send + Unpin + 'static, 286 + stream: impl AsyncBufRead + Send + Unpin + 'static, 323 287 database: &DatabaseConnection, 324 288 state: &State, 325 289 ) -> ServerResult<Json<UploadPathResult>> { ··· 371 335 }); 372 336 373 337 let stream = stream.take(upload_info.nar_size as u64); 374 - let (stream, nar_compute) = StreamHasher::new(stream, Sha256::new()); 338 + let (stream, nar_compute) = HashReader::new(stream, Sha256::new()); 375 339 let mut chunks = chunk_stream( 376 340 stream, 377 341 chunking_config.min_size, ··· 510 474 username: Option<String>, 511 475 cache: cache::Model, 512 476 upload_info: UploadPathNarInfo, 513 - stream: impl AsyncRead + Send + Unpin + 'static, 477 + stream: impl AsyncBufRead + Send + Unpin + 'static, 514 478 database: &DatabaseConnection, 515 479 state: &State, 516 480 ) -> ServerResult<Json<UploadPathResult>> { ··· 623 587 { 624 588 // There's an existing chunk matching the hash 625 589 if require_proof_of_possession && !data.is_hash_trusted() { 626 - let stream = data.into_async_read(); 590 + let stream = data.into_async_buf_read(); 627 591 628 - let (mut stream, nar_compute) = StreamHasher::new(stream, Sha256::new()); 592 + let (mut stream, nar_compute) = HashReader::new(stream, Sha256::new()); 629 593 tokio::io::copy(&mut stream, &mut tokio::io::sink()) 630 594 .await 631 595 .map_err(ServerError::request_error)?; ··· 705 669 706 670 // Compress and stream to the storage backend 707 671 let compressor = get_compressor_fn(compression_type, compression_level); 708 - let mut stream = CompressionStream::new(data.into_async_read(), compressor); 672 + let mut stream = CompressionStream::new(data.into_async_buf_read(), compressor); 709 673 710 674 backend 711 675 .upload_file(key, stream.stream()) ··· 809 773 matches!(self, ChunkData::Bytes(_)) 810 774 } 811 775 812 - /// Turns the data into a stream. 813 - fn into_async_read(self) -> Box<dyn AsyncRead + Unpin + Send> { 776 + /// Turns the data into an AsyncBufRead. 777 + fn into_async_buf_read(self) -> Box<dyn AsyncBufRead + Unpin + Send> { 814 778 match self { 815 779 Self::Bytes(bytes) => Box::new(Cursor::new(bytes)), 816 780 Self::Stream(stream, _, _) => stream, 817 781 } 818 - } 819 - } 820 - 821 - impl CompressionStream { 822 - /// Creates a new compression stream. 823 - fn new<R>(stream: R, compressor: CompressorFn<BufReader<StreamHasher<R, Sha256>>>) -> Self 824 - where 825 - R: AsyncRead + Unpin + Send + 'static, 826 - { 827 - // compute NAR hash and size 828 - let (stream, nar_compute) = StreamHasher::new(stream, Sha256::new()); 829 - 830 - // compress NAR 831 - let stream = compressor(BufReader::new(stream)); 832 - 833 - // compute file hash and size 834 - let (stream, file_compute) = StreamHasher::new(stream, Sha256::new()); 835 - 836 - Self { 837 - stream: Box::new(stream), 838 - nar_compute, 839 - file_compute, 840 - } 841 - } 842 - 843 - /* 844 - /// Creates a compression stream without compute the uncompressed hash/size. 845 - /// 846 - /// This is useful if you already know the hash. `nar_hash_and_size` will 847 - /// always return `None`. 848 - fn new_without_nar_hash<R>(stream: R, compressor: CompressorFn<BufReader<R>>) -> Self 849 - where 850 - R: AsyncRead + Unpin + Send + 'static, 851 - { 852 - // compress NAR 853 - let stream = compressor(BufReader::new(stream)); 854 - 855 - // compute file hash and size 856 - let (stream, file_compute) = StreamHasher::new(stream, Sha256::new()); 857 - 858 - Self { 859 - stream: Box::new(stream), 860 - nar_compute: Arc::new(OnceCell::new()), 861 - file_compute, 862 - } 863 - } 864 - */ 865 - 866 - /// Returns the stream of the compressed object. 867 - fn stream(&mut self) -> &mut (impl AsyncRead + Unpin) { 868 - &mut self.stream 869 - } 870 - 871 - /// Returns the NAR hash and size. 872 - /// 873 - /// The hash is only finalized when the stream is fully read. 874 - /// Otherwise, returns `None`. 875 - fn nar_hash_and_size(&self) -> Option<&(DigestOutput<Sha256>, usize)> { 876 - self.nar_compute.get() 877 - } 878 - 879 - /// Returns the file hash and size. 880 - /// 881 - /// The hash is only finalized when the stream is fully read. 882 - /// Otherwise, returns `None`. 883 - fn file_hash_and_size(&self) -> Option<&(DigestOutput<Sha256>, usize)> { 884 - self.file_compute.get() 885 782 } 886 783 } 887 784
+81
server/src/compression.rs
··· 1 + use std::sync::Arc; 2 + 3 + use digest::Output as DigestOutput; 4 + use sha2::{Digest, Sha256}; 5 + use tokio::io::{AsyncBufRead, AsyncRead}; 6 + use tokio::sync::OnceCell; 7 + 8 + use attic::io::HashReader; 9 + 10 + pub type CompressorFn<C> = Box<dyn FnOnce(C) -> Box<dyn AsyncRead + Unpin + Send> + Send>; 11 + 12 + /// Applies compression to a stream, computing hashes along the way. 13 + /// 14 + /// Our strategy is to stream directly onto a UUID-keyed file on the 15 + /// storage backend, performing compression and computing the hashes 16 + /// along the way. We delete the file if the hashes do not match. 17 + /// 18 + /// ```text 19 + /// ┌───────────────────────────────────►NAR Hash 20 + /// │ 21 + /// │ 22 + /// ├───────────────────────────────────►NAR Size 23 + /// │ 24 + /// ┌─────┴────┐ ┌──────────┐ ┌───────────┐ 25 + /// NAR Stream──►│NAR Hasher├─►│Compressor├─►│File Hasher├─►File Stream 26 + /// └──────────┘ └──────────┘ └─────┬─────┘ 27 + /// │ 28 + /// ├───────►File Hash 29 + /// │ 30 + /// │ 31 + /// └───────►File Size 32 + /// ``` 33 + pub struct CompressionStream { 34 + stream: Box<dyn AsyncRead + Unpin + Send>, 35 + nar_compute: Arc<OnceCell<(DigestOutput<Sha256>, usize)>>, 36 + file_compute: Arc<OnceCell<(DigestOutput<Sha256>, usize)>>, 37 + } 38 + 39 + impl CompressionStream { 40 + /// Creates a new compression stream. 41 + pub fn new<R>(stream: R, compressor: CompressorFn<HashReader<R, Sha256>>) -> Self 42 + where 43 + R: AsyncBufRead + Unpin + Send + 'static, 44 + { 45 + // compute NAR hash and size 46 + let (stream, nar_compute) = HashReader::new(stream, Sha256::new()); 47 + 48 + // compress NAR 49 + let stream = compressor(stream); 50 + 51 + // compute file hash and size 52 + let (stream, file_compute) = HashReader::new(stream, Sha256::new()); 53 + 54 + Self { 55 + stream: Box::new(stream), 56 + nar_compute, 57 + file_compute, 58 + } 59 + } 60 + 61 + /// Returns the stream of the compressed object. 62 + pub fn stream(&mut self) -> &mut (impl AsyncRead + Unpin) { 63 + &mut self.stream 64 + } 65 + 66 + /// Returns the NAR hash and size. 67 + /// 68 + /// The hash is only finalized when the stream is fully read. 69 + /// Otherwise, returns `None`. 70 + pub fn nar_hash_and_size(&self) -> Option<&(DigestOutput<Sha256>, usize)> { 71 + self.nar_compute.get() 72 + } 73 + 74 + /// Returns the file hash and size. 75 + /// 76 + /// The hash is only finalized when the stream is fully read. 77 + /// Otherwise, returns `None`. 78 + pub fn file_hash_and_size(&self) -> Option<&(DigestOutput<Sha256>, usize)> { 79 + self.file_compute.get() 80 + } 81 + }
+1
server/src/lib.rs
··· 15 15 16 16 pub mod access; 17 17 mod api; 18 + mod compression; 18 19 pub mod config; 19 20 pub mod database; 20 21 pub mod error;
+1 -1
server/src/storage/s3.rs
··· 19 19 20 20 use super::{Download, RemoteFile, StorageBackend}; 21 21 use crate::error::{ErrorKind, ServerError, ServerResult}; 22 - use attic::stream::read_chunk_async; 22 + use attic::io::read_chunk_async; 23 23 use attic::util::Finally; 24 24 25 25 /// The chunk size for each part in a multipart upload.