A fork of attic a self-hostable Nix Binary Cache server
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

attic/io: Rename StreamHasher to HashReader

Makes all naming consistent.

+146 -120
+131
attic/src/io/hash_reader.rs
··· 1 + use std::marker::Unpin; 2 + use std::pin::Pin; 3 + use std::sync::Arc; 4 + use std::task::{Context, Poll}; 5 + 6 + use digest::{Digest, Output as DigestOutput}; 7 + use tokio::io::{AsyncRead, ReadBuf}; 8 + use tokio::sync::OnceCell; 9 + 10 + /// AsyncRead filter that hashes the bytes that have been read. 11 + /// 12 + /// The hash is finalized when EOF is reached. 13 + pub struct HashReader<R, D> 14 + where 15 + R: AsyncRead + Unpin, 16 + D: Digest + Unpin, 17 + { 18 + inner: R, 19 + digest: Option<D>, 20 + bytes_read: usize, 21 + finalized: Arc<OnceCell<(DigestOutput<D>, usize)>>, 22 + } 23 + 24 + impl<R, D> HashReader<R, D> 25 + where 26 + R: AsyncRead + Unpin, 27 + D: Digest + Unpin, 28 + { 29 + pub fn new(inner: R, digest: D) -> (Self, Arc<OnceCell<(DigestOutput<D>, usize)>>) { 30 + let finalized = Arc::new(OnceCell::new()); 31 + 32 + ( 33 + Self { 34 + inner, 35 + digest: Some(digest), 36 + bytes_read: 0, 37 + finalized: finalized.clone(), 38 + }, 39 + finalized, 40 + ) 41 + } 42 + } 43 + 44 + impl<R, D> AsyncRead for HashReader<R, D> 45 + where 46 + R: AsyncRead + Unpin, 47 + D: Digest + Unpin, 48 + { 49 + fn poll_read( 50 + mut self: Pin<&mut Self>, 51 + cx: &mut Context<'_>, 52 + buf: &mut ReadBuf<'_>, 53 + ) -> Poll<tokio::io::Result<()>> { 54 + let old_filled = buf.filled().len(); 55 + let r = Pin::new(&mut self.inner).poll_read(cx, buf); 56 + let read_len = buf.filled().len() - old_filled; 57 + 58 + match r { 59 + Poll::Ready(Ok(())) => { 60 + if read_len == 0 { 61 + // EOF 62 + if let Some(digest) = self.digest.take() { 63 + self.finalized 64 + .set((digest.finalize(), self.bytes_read)) 65 + .expect("Hash has already been finalized"); 66 + } 67 + } else { 68 + // Read something 69 + let digest = self.digest.as_mut().expect("Stream has data after EOF"); 70 + 71 + let filled = buf.filled(); 72 + digest.update(&filled[filled.len() - read_len..]); 73 + self.bytes_read += read_len; 74 + } 75 + } 76 + Poll::Ready(Err(_)) => { 77 + assert!(read_len == 0); 78 + } 79 + Poll::Pending => {} 80 + } 81 + 82 + r 83 + } 84 + } 85 + 86 + #[cfg(test)] 87 + mod tests { 88 + use super::*; 89 + 90 + use tokio::io::AsyncReadExt; 91 + 92 + #[tokio::test] 93 + async fn test_hash_reader() { 94 + let expected = b"hello world"; 95 + let expected_sha256 = 96 + hex::decode("b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9") 97 + .unwrap(); 98 + 99 + let (mut read, finalized) = HashReader::new(expected.as_slice(), sha2::Sha256::new()); 100 + assert!(finalized.get().is_none()); 101 + 102 + // force multiple reads 103 + let mut buf = vec![0u8; 100]; 104 + let mut bytes_read = 0; 105 + bytes_read += read 106 + .read(&mut buf[bytes_read..bytes_read + 5]) 107 + .await 108 + .unwrap(); 109 + bytes_read += read 110 + .read(&mut buf[bytes_read..bytes_read + 5]) 111 + .await 112 + .unwrap(); 113 + bytes_read += read 114 + .read(&mut buf[bytes_read..bytes_read + 5]) 115 + .await 116 + .unwrap(); 117 + bytes_read += read 118 + .read(&mut buf[bytes_read..bytes_read + 5]) 119 + .await 120 + .unwrap(); 121 + 122 + assert_eq!(expected.len(), bytes_read); 123 + assert_eq!(expected, &buf[..bytes_read]); 124 + 125 + let (hash, count) = finalized.get().expect("Hash wasn't finalized"); 126 + 127 + assert_eq!(expected_sha256.as_slice(), hash.as_slice()); 128 + assert_eq!(expected.len(), *count); 129 + eprintln!("finalized = {:x?}", finalized); 130 + } 131 + }
+4 -109
attic/src/io/mod.rs
··· 1 1 //! Stream utilities. 2 2 3 + mod hash_reader; 4 + 3 5 use std::collections::VecDeque; 4 6 use std::future::Future; 5 7 use std::marker::Unpin; 6 8 use std::pin::Pin; 7 - use std::sync::Arc; 8 - use std::task::{Context, Poll}; 9 9 10 10 use async_stream::try_stream; 11 11 use bytes::{Bytes, BytesMut}; 12 - use digest::{Digest, Output as DigestOutput}; 13 12 use futures::stream::{BoxStream, Stream, StreamExt}; 14 - use tokio::io::{AsyncRead, AsyncReadExt, ReadBuf}; 15 - use tokio::sync::OnceCell; 13 + use tokio::io::{AsyncRead, AsyncReadExt}; 16 14 use tokio::task::spawn; 17 15 18 - /// Stream filter that hashes the bytes that have been read. 19 - /// 20 - /// The hash is finalized when EOF is reached. 21 - pub struct StreamHasher<R: AsyncRead + Unpin, D: Digest + Unpin> { 22 - inner: R, 23 - digest: Option<D>, 24 - bytes_read: usize, 25 - finalized: Arc<OnceCell<(DigestOutput<D>, usize)>>, 26 - } 16 + pub use hash_reader::HashReader; 27 17 28 18 /// Merge chunks lazily into a continuous stream. 29 19 /// ··· 98 88 Box::pin(s) 99 89 } 100 90 101 - impl<R: AsyncRead + Unpin, D: Digest + Unpin> StreamHasher<R, D> { 102 - pub fn new(inner: R, digest: D) -> (Self, Arc<OnceCell<(DigestOutput<D>, usize)>>) { 103 - let finalized = Arc::new(OnceCell::new()); 104 - 105 - ( 106 - Self { 107 - inner, 108 - digest: Some(digest), 109 - bytes_read: 0, 110 - finalized: finalized.clone(), 111 - }, 112 - finalized, 113 - ) 114 - } 115 - } 116 - 117 - impl<R: AsyncRead + Unpin, D: Digest + Unpin> AsyncRead for StreamHasher<R, D> { 118 - fn poll_read( 119 - mut self: Pin<&mut Self>, 120 - cx: &mut Context<'_>, 121 - buf: &mut ReadBuf<'_>, 122 - ) -> Poll<tokio::io::Result<()>> { 123 - let old_filled = buf.filled().len(); 124 - let r = Pin::new(&mut self.inner).poll_read(cx, buf); 125 - let read_len = buf.filled().len() - old_filled; 126 - 127 - match r { 128 - Poll::Ready(Ok(())) => { 129 - if read_len == 0 { 130 - // EOF 131 - if let Some(digest) = self.digest.take() { 132 - self.finalized 133 - .set((digest.finalize(), self.bytes_read)) 134 - .expect("Hash has already been finalized"); 135 - } 136 - } else { 137 - // Read something 138 - let digest = self.digest.as_mut().expect("Stream has data after EOF"); 139 - 140 - let filled = buf.filled(); 141 - digest.update(&filled[filled.len() - read_len..]); 142 - self.bytes_read += read_len; 143 - } 144 - } 145 - Poll::Ready(Err(_)) => { 146 - assert!(read_len == 0); 147 - } 148 - Poll::Pending => {} 149 - } 150 - 151 - r 152 - } 153 - } 154 - 155 91 /// Greedily reads from a stream to fill a buffer. 156 92 pub async fn read_chunk_async<S: AsyncRead + Unpin + Send>( 157 93 stream: &mut S, ··· 175 111 use async_stream::stream; 176 112 use bytes::{BufMut, BytesMut}; 177 113 use futures::future; 178 - use tokio::io::AsyncReadExt; 179 - 180 - #[tokio::test] 181 - async fn test_stream_hasher() { 182 - let expected = b"hello world"; 183 - let expected_sha256 = 184 - hex::decode("b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9") 185 - .unwrap(); 186 - 187 - let (mut read, finalized) = StreamHasher::new(expected.as_slice(), sha2::Sha256::new()); 188 - assert!(finalized.get().is_none()); 189 - 190 - // force multiple reads 191 - let mut buf = vec![0u8; 100]; 192 - let mut bytes_read = 0; 193 - bytes_read += read 194 - .read(&mut buf[bytes_read..bytes_read + 5]) 195 - .await 196 - .unwrap(); 197 - bytes_read += read 198 - .read(&mut buf[bytes_read..bytes_read + 5]) 199 - .await 200 - .unwrap(); 201 - bytes_read += read 202 - .read(&mut buf[bytes_read..bytes_read + 5]) 203 - .await 204 - .unwrap(); 205 - bytes_read += read 206 - .read(&mut buf[bytes_read..bytes_read + 5]) 207 - .await 208 - .unwrap(); 209 - 210 - assert_eq!(expected.len(), bytes_read); 211 - assert_eq!(expected, &buf[..bytes_read]); 212 - 213 - let (hash, count) = finalized.get().expect("Hash wasn't finalized"); 214 - 215 - assert_eq!(expected_sha256.as_slice(), hash.as_slice()); 216 - assert_eq!(expected.len(), *count); 217 - eprintln!("finalized = {:x?}", finalized); 218 - } 219 114 220 115 #[tokio::test] 221 116 async fn test_merge_chunks() {
+2 -2
attic/src/lib.rs
··· 21 21 pub mod chunking; 22 22 pub mod error; 23 23 pub mod hash; 24 + #[cfg(feature = "io")] 25 + pub mod io; 24 26 pub mod mime; 25 27 pub mod nix_store; 26 28 pub mod signing; 27 - #[cfg(feature = "io")] 28 - pub mod io; 29 29 #[cfg(target_family = "unix")] 30 30 pub mod testing; 31 31 #[cfg(feature = "tokio")]
+1 -1
server/src/api/binary_cache.rs
··· 32 32 use crate::storage::{Download, StorageBackend}; 33 33 use crate::{RequestState, State}; 34 34 use attic::cache::CacheName; 35 + use attic::io::merge_chunks; 35 36 use attic::mime; 36 37 use attic::nix_store::StorePathHash; 37 - use attic::io::merge_chunks; 38 38 39 39 /// Nix cache information. 40 40 ///
+8 -8
server/src/api/v1/upload_path.rs
··· 39 39 }; 40 40 use attic::chunking::chunk_stream; 41 41 use attic::hash::Hash; 42 - use attic::io::{read_chunk_async, StreamHasher}; 42 + use attic::io::{read_chunk_async, HashReader}; 43 43 use attic::util::Finally; 44 44 45 45 use crate::database::entity::cache; ··· 227 227 existing_nar: NarGuard, 228 228 ) -> ServerResult<Json<UploadPathResult>> { 229 229 if state.config.require_proof_of_possession { 230 - let (mut stream, nar_compute) = StreamHasher::new(stream, Sha256::new()); 230 + let (mut stream, nar_compute) = HashReader::new(stream, Sha256::new()); 231 231 tokio::io::copy(&mut stream, &mut tokio::io::sink()) 232 232 .await 233 233 .map_err(ServerError::request_error)?; ··· 371 371 }); 372 372 373 373 let stream = stream.take(upload_info.nar_size as u64); 374 - let (stream, nar_compute) = StreamHasher::new(stream, Sha256::new()); 374 + let (stream, nar_compute) = HashReader::new(stream, Sha256::new()); 375 375 let mut chunks = chunk_stream( 376 376 stream, 377 377 chunking_config.min_size, ··· 625 625 if require_proof_of_possession && !data.is_hash_trusted() { 626 626 let stream = data.into_async_read(); 627 627 628 - let (mut stream, nar_compute) = StreamHasher::new(stream, Sha256::new()); 628 + let (mut stream, nar_compute) = HashReader::new(stream, Sha256::new()); 629 629 tokio::io::copy(&mut stream, &mut tokio::io::sink()) 630 630 .await 631 631 .map_err(ServerError::request_error)?; ··· 820 820 821 821 impl CompressionStream { 822 822 /// Creates a new compression stream. 823 - fn new<R>(stream: R, compressor: CompressorFn<BufReader<StreamHasher<R, Sha256>>>) -> Self 823 + fn new<R>(stream: R, compressor: CompressorFn<BufReader<HashReader<R, Sha256>>>) -> Self 824 824 where 825 825 R: AsyncRead + Unpin + Send + 'static, 826 826 { 827 827 // compute NAR hash and size 828 - let (stream, nar_compute) = StreamHasher::new(stream, Sha256::new()); 828 + let (stream, nar_compute) = HashReader::new(stream, Sha256::new()); 829 829 830 830 // compress NAR 831 831 let stream = compressor(BufReader::new(stream)); 832 832 833 833 // compute file hash and size 834 - let (stream, file_compute) = StreamHasher::new(stream, Sha256::new()); 834 + let (stream, file_compute) = HashReader::new(stream, Sha256::new()); 835 835 836 836 Self { 837 837 stream: Box::new(stream), ··· 853 853 let stream = compressor(BufReader::new(stream)); 854 854 855 855 // compute file hash and size 856 - let (stream, file_compute) = StreamHasher::new(stream, Sha256::new()); 856 + let (stream, file_compute) = HashReader::new(stream, Sha256::new()); 857 857 858 858 Self { 859 859 stream: Box::new(stream),