A fork of attic, a self-hostable Nix Binary Cache server
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

server: Factor out CompressionStream

+85 -99
+3 -99
server/src/api/v1/upload_path.rs
··· 14 14 }; 15 15 use bytes::{Bytes, BytesMut}; 16 16 use chrono::Utc; 17 - use digest::Output as DigestOutput; 18 17 use futures::future::join_all; 19 18 use futures::StreamExt; 20 19 use sea_orm::entity::prelude::*; ··· 22 21 use sea_orm::ActiveValue::Set; 23 22 use sea_orm::{QuerySelect, TransactionTrait}; 24 23 use sha2::{Digest, Sha256}; 25 - use tokio::io::{AsyncBufRead, AsyncRead, AsyncReadExt, BufReader}; 26 - use tokio::sync::{OnceCell, Semaphore}; 24 + use tokio::io::{AsyncBufRead, AsyncReadExt}; 25 + use tokio::sync::Semaphore; 27 26 use tokio::task::spawn; 28 27 use tokio_util::io::StreamReader; 29 28 use tracing::instrument; 30 29 use uuid::Uuid; 31 30 31 + use crate::compression::{CompressionStream, CompressorFn}; 32 32 use crate::config::CompressionType; 33 33 use crate::error::{ErrorKind, ServerError, ServerResult}; 34 34 use crate::narinfo::Compression; ··· 55 55 /// TODO: Make this configurable 56 56 const CONCURRENT_CHUNK_UPLOADS: usize = 10; 57 57 58 - type CompressorFn<C> = Box<dyn FnOnce(C) -> Box<dyn AsyncRead + Unpin + Send> + Send>; 59 - 60 58 /// Data of a chunk. 61 59 enum ChunkData { 62 60 /// Some bytes in memory. ··· 70 68 struct UploadChunkResult { 71 69 guard: ChunkGuard, 72 70 deduplicated: bool, 73 - } 74 - 75 - /// Applies compression to a stream, computing hashes along the way. 76 - /// 77 - /// Our strategy is to stream directly onto a UUID-keyed file on the 78 - /// storage backend, performing compression and computing the hashes 79 - /// along the way. We delete the file if the hashes do not match. 
80 - /// 81 - /// ```text 82 - /// ┌───────────────────────────────────►NAR Hash 83 - /// │ 84 - /// │ 85 - /// ├───────────────────────────────────►NAR Size 86 - /// │ 87 - /// ┌─────┴────┐ ┌──────────┐ ┌───────────┐ 88 - /// NAR Stream──►│NAR Hasher├─►│Compressor├─►│File Hasher├─►File Stream 89 - /// └──────────┘ └──────────┘ └─────┬─────┘ 90 - /// │ 91 - /// ├───────►File Hash 92 - /// │ 93 - /// │ 94 - /// └───────►File Size 95 - /// ``` 96 - struct CompressionStream { 97 - stream: Box<dyn AsyncRead + Unpin + Send>, 98 - nar_compute: Arc<OnceCell<(DigestOutput<Sha256>, usize)>>, 99 - file_compute: Arc<OnceCell<(DigestOutput<Sha256>, usize)>>, 100 71 } 101 72 102 73 trait UploadPathNarInfoExt { ··· 808 779 Self::Bytes(bytes) => Box::new(Cursor::new(bytes)), 809 780 Self::Stream(stream, _, _) => stream, 810 781 } 811 - } 812 - } 813 - 814 - impl CompressionStream { 815 - /// Creates a new compression stream. 816 - fn new<R>(stream: R, compressor: CompressorFn<BufReader<HashReader<R, Sha256>>>) -> Self 817 - where 818 - R: AsyncRead + Unpin + Send + 'static, 819 - { 820 - // compute NAR hash and size 821 - let (stream, nar_compute) = HashReader::new(stream, Sha256::new()); 822 - 823 - // compress NAR 824 - let stream = compressor(BufReader::new(stream)); 825 - 826 - // compute file hash and size 827 - let (stream, file_compute) = HashReader::new(stream, Sha256::new()); 828 - 829 - Self { 830 - stream: Box::new(stream), 831 - nar_compute, 832 - file_compute, 833 - } 834 - } 835 - 836 - /* 837 - /// Creates a compression stream without compute the uncompressed hash/size. 838 - /// 839 - /// This is useful if you already know the hash. `nar_hash_and_size` will 840 - /// always return `None`. 
841 - fn new_without_nar_hash<R>(stream: R, compressor: CompressorFn<BufReader<R>>) -> Self 842 - where 843 - R: AsyncRead + Unpin + Send + 'static, 844 - { 845 - // compress NAR 846 - let stream = compressor(BufReader::new(stream)); 847 - 848 - // compute file hash and size 849 - let (stream, file_compute) = HashReader::new(stream, Sha256::new()); 850 - 851 - Self { 852 - stream: Box::new(stream), 853 - nar_compute: Arc::new(OnceCell::new()), 854 - file_compute, 855 - } 856 - } 857 - */ 858 - 859 - /// Returns the stream of the compressed object. 860 - fn stream(&mut self) -> &mut (impl AsyncRead + Unpin) { 861 - &mut self.stream 862 - } 863 - 864 - /// Returns the NAR hash and size. 865 - /// 866 - /// The hash is only finalized when the stream is fully read. 867 - /// Otherwise, returns `None`. 868 - fn nar_hash_and_size(&self) -> Option<&(DigestOutput<Sha256>, usize)> { 869 - self.nar_compute.get() 870 - } 871 - 872 - /// Returns the file hash and size. 873 - /// 874 - /// The hash is only finalized when the stream is fully read. 875 - /// Otherwise, returns `None`. 876 - fn file_hash_and_size(&self) -> Option<&(DigestOutput<Sha256>, usize)> { 877 - self.file_compute.get() 878 782 } 879 783 } 880 784
+81
server/src/compression.rs
··· 1 + use std::sync::Arc; 2 + 3 + use digest::Output as DigestOutput; 4 + use sha2::{Digest, Sha256}; 5 + use tokio::io::{AsyncRead, BufReader}; 6 + use tokio::sync::OnceCell; 7 + 8 + use attic::io::HashReader; 9 + 10 + pub type CompressorFn<C> = Box<dyn FnOnce(C) -> Box<dyn AsyncRead + Unpin + Send> + Send>; 11 + 12 + /// Applies compression to a stream, computing hashes along the way. 13 + /// 14 + /// Our strategy is to stream directly onto a UUID-keyed file on the 15 + /// storage backend, performing compression and computing the hashes 16 + /// along the way. We delete the file if the hashes do not match. 17 + /// 18 + /// ```text 19 + /// ┌───────────────────────────────────►NAR Hash 20 + /// │ 21 + /// │ 22 + /// ├───────────────────────────────────►NAR Size 23 + /// │ 24 + /// ┌─────┴────┐ ┌──────────┐ ┌───────────┐ 25 + /// NAR Stream──►│NAR Hasher├─►│Compressor├─►│File Hasher├─►File Stream 26 + /// └──────────┘ └──────────┘ └─────┬─────┘ 27 + /// │ 28 + /// ├───────►File Hash 29 + /// │ 30 + /// │ 31 + /// └───────►File Size 32 + /// ``` 33 + pub struct CompressionStream { 34 + stream: Box<dyn AsyncRead + Unpin + Send>, 35 + nar_compute: Arc<OnceCell<(DigestOutput<Sha256>, usize)>>, 36 + file_compute: Arc<OnceCell<(DigestOutput<Sha256>, usize)>>, 37 + } 38 + 39 + impl CompressionStream { 40 + /// Creates a new compression stream. 41 + pub fn new<R>(stream: R, compressor: CompressorFn<BufReader<HashReader<R, Sha256>>>) -> Self 42 + where 43 + R: AsyncRead + Unpin + Send + 'static, 44 + { 45 + // compute NAR hash and size 46 + let (stream, nar_compute) = HashReader::new(stream, Sha256::new()); 47 + 48 + // compress NAR 49 + let stream = compressor(BufReader::new(stream)); 50 + 51 + // compute file hash and size 52 + let (stream, file_compute) = HashReader::new(stream, Sha256::new()); 53 + 54 + Self { 55 + stream: Box::new(stream), 56 + nar_compute, 57 + file_compute, 58 + } 59 + } 60 + 61 + /// Returns the stream of the compressed object. 
62 + pub fn stream(&mut self) -> &mut (impl AsyncRead + Unpin) { 63 + &mut self.stream 64 + } 65 + 66 + /// Returns the NAR hash and size. 67 + /// 68 + /// The hash is only finalized when the stream is fully read. 69 + /// Otherwise, returns `None`. 70 + pub fn nar_hash_and_size(&self) -> Option<&(DigestOutput<Sha256>, usize)> { 71 + self.nar_compute.get() 72 + } 73 + 74 + /// Returns the file hash and size. 75 + /// 76 + /// The hash is only finalized when the stream is fully read. 77 + /// Otherwise, returns `None`. 78 + pub fn file_hash_and_size(&self) -> Option<&(DigestOutput<Sha256>, usize)> { 79 + self.file_compute.get() 80 + } 81 + }
+1
server/src/lib.rs
··· 15 15 16 16 pub mod access; 17 17 mod api; 18 + mod compression; 18 19 pub mod config; 19 20 pub mod database; 20 21 pub mod error;