A fork of attic a self-hostable Nix Binary Cache server
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

server/upload_path: Switch to AsyncBufRead

+11 -11
+9 -9
server/src/api/v1/upload_path.rs
··· 61 61 Bytes(Bytes), 62 62 63 63 /// A stream with a user-claimed hash and size that are potentially incorrect. 64 - Stream(Box<dyn AsyncRead + Send + Unpin + 'static>, Hash, usize), 64 + Stream(Box<dyn AsyncBufRead + Send + Unpin + 'static>, Hash, usize), 65 65 } 66 66 67 67 /// Result of a chunk upload. ··· 185 185 username: Option<String>, 186 186 cache: cache::Model, 187 187 upload_info: UploadPathNarInfo, 188 - stream: impl AsyncRead + Unpin, 188 + stream: impl AsyncBufRead + Unpin, 189 189 database: &DatabaseConnection, 190 190 state: &State, 191 191 existing_nar: NarGuard, ··· 265 265 username: Option<String>, 266 266 cache: cache::Model, 267 267 upload_info: UploadPathNarInfo, 268 - stream: impl AsyncRead + Send + Unpin + 'static, 268 + stream: impl AsyncBufRead + Send + Unpin + 'static, 269 269 database: &DatabaseConnection, 270 270 state: &State, 271 271 ) -> ServerResult<Json<UploadPathResult>> { ··· 283 283 username: Option<String>, 284 284 cache: cache::Model, 285 285 upload_info: UploadPathNarInfo, 286 - stream: impl AsyncRead + Send + Unpin + 'static, 286 + stream: impl AsyncBufRead + Send + Unpin + 'static, 287 287 database: &DatabaseConnection, 288 288 state: &State, 289 289 ) -> ServerResult<Json<UploadPathResult>> { ··· 474 474 username: Option<String>, 475 475 cache: cache::Model, 476 476 upload_info: UploadPathNarInfo, 477 - stream: impl AsyncRead + Send + Unpin + 'static, 477 + stream: impl AsyncBufRead + Send + Unpin + 'static, 478 478 database: &DatabaseConnection, 479 479 state: &State, 480 480 ) -> ServerResult<Json<UploadPathResult>> { ··· 587 587 { 588 588 // There's an existing chunk matching the hash 589 589 if require_proof_of_possession && !data.is_hash_trusted() { 590 - let stream = data.into_async_read(); 590 + let stream = data.into_async_buf_read(); 591 591 592 592 let (mut stream, nar_compute) = HashReader::new(stream, Sha256::new()); 593 593 tokio::io::copy(&mut stream, &mut tokio::io::sink()) ··· 669 669 670 670 // Compress and stream to the storage backend 671 671 let compressor = get_compressor_fn(compression_type, compression_level); 672 - let mut stream = CompressionStream::new(data.into_async_read(), compressor); 672 + let mut stream = CompressionStream::new(data.into_async_buf_read(), compressor); 673 673 674 674 backend 675 675 .upload_file(key, stream.stream()) ··· 773 773 matches!(self, ChunkData::Bytes(_)) 774 774 } 775 775 776 - /// Turns the data into a stream. 777 - fn into_async_read(self) -> Box<dyn AsyncRead + Unpin + Send> { 776 + /// Turns the data into an AsyncBufRead. 777 + fn into_async_buf_read(self) -> Box<dyn AsyncBufRead + Unpin + Send> { 778 778 match self { 779 779 Self::Bytes(bytes) => Box::new(Cursor::new(bytes)), 780 780 Self::Stream(stream, _, _) => stream,
+2 -2
server/src/compression.rs
··· 2 2 3 3 use digest::Output as DigestOutput; 4 4 use sha2::{Digest, Sha256}; 5 - use tokio::io::{AsyncRead, BufReader}; 5 + use tokio::io::{AsyncBufRead, AsyncRead, BufReader}; 6 6 use tokio::sync::OnceCell; 7 7 8 8 use attic::io::HashReader; ··· 40 40 /// Creates a new compression stream. 41 41 pub fn new<R>(stream: R, compressor: CompressorFn<BufReader<HashReader<R, Sha256>>>) -> Self 42 42 where 43 - R: AsyncRead + Unpin + Send + 'static, 43 + R: AsyncBufRead + Unpin + Send + 'static, 44 44 { 45 45 // compute NAR hash and size 46 46 let (stream, nar_compute) = HashReader::new(stream, Sha256::new());