···11+//! Chunking.
22+//!
33+//! We perform chunking on uncompressed NARs using the FastCDC
44+//! algorithm.
55+66+use async_stream::try_stream;
77+use bytes::{BufMut, Bytes, BytesMut};
88+use fastcdc::ronomon::FastCDC;
99+use futures::stream::Stream;
1010+use tokio::io::AsyncRead;
1111+1212+use crate::stream::read_chunk_async;
1313+1414+/// Splits a streams into content-defined chunks.
1515+///
1616+/// This is a wrapper over fastcdc-rs that takes an `AsyncRead` and
1717+/// returns a `Stream` of chunks as `Bytes`s.
1818+pub fn chunk_stream<R>(
1919+ mut stream: R,
2020+ min_size: usize,
2121+ avg_size: usize,
2222+ max_size: usize,
2323+) -> impl Stream<Item = std::io::Result<Bytes>>
2424+where
2525+ R: AsyncRead + Unpin + Send,
2626+{
2727+ let s = try_stream! {
2828+ let mut buf = BytesMut::with_capacity(max_size);
2929+3030+ loop {
3131+ let read = read_chunk_async(&mut stream, buf).await?;
3232+3333+ let mut eof = false;
3434+ if read.is_empty() {
3535+ // Already EOF
3636+ break;
3737+ } else if read.len() < max_size {
3838+ // Last read
3939+ eof = true;
4040+ }
4141+4242+ let chunks = FastCDC::with_eof(&read, min_size, avg_size, max_size, eof);
4343+ let mut consumed = 0;
4444+4545+ for chunk in chunks {
4646+ consumed += chunk.length;
4747+4848+ let slice = read.slice(chunk.offset..chunk.offset + chunk.length);
4949+ yield slice;
5050+ }
5151+5252+ if eof {
5353+ break;
5454+ }
5555+5656+ buf = BytesMut::with_capacity(max_size);
5757+5858+ if consumed < read.len() {
5959+ // remaining bytes for the next read
6060+ buf.put_slice(&read[consumed..]);
6161+ }
6262+ }
6363+ };
6464+6565+ Box::pin(s)
6666+}
6767+6868+#[cfg(test)]
6969+mod tests {
7070+ use super::*;
7171+7272+ use std::io::Cursor;
7373+7474+ use futures::StreamExt;
7575+7676+ use crate::testing::get_fake_data;
7777+7878+ /// Chunks and reconstructs a file.
7979+ #[tokio::test]
8080+ async fn test_chunking_basic() {
8181+ async fn case(size: usize) {
8282+ let test_file = get_fake_data(size); // 32 MiB
8383+ let mut reconstructed_file = Vec::new();
8484+8585+ let cursor = Cursor::new(&test_file);
8686+ let mut chunks = chunk_stream(cursor, 8 * 1024, 16 * 1024, 32 * 1024);
8787+8888+ while let Some(chunk) = chunks.next().await {
8989+ let chunk = chunk.unwrap();
9090+ eprintln!("Got a {}-byte chunk", chunk.len());
9191+ reconstructed_file.extend(chunk);
9292+ }
9393+9494+ assert_eq!(reconstructed_file, test_file);
9595+ }
9696+9797+ case(32 * 1024 * 1024 - 1).await;
9898+ case(32 * 1024 * 1024).await;
9999+ case(32 * 1024 * 1024 + 1).await;
100100+ }
101101+}
+1-1
attic/src/hash/mod.rs
···118118}
119119120120/// Decodes a base16 or base32 encoded hash containing a specified number of bytes.
121121-fn decode_hash<'s>(s: &'s str, typ: &'static str, expected_bytes: usize) -> AtticResult<Vec<u8>> {
121121+fn decode_hash(s: &str, typ: &'static str, expected_bytes: usize) -> AtticResult<Vec<u8>> {
122122 let base16_len = expected_bytes * 2;
123123 let base32_len = (expected_bytes * 8 - 1) / 5 + 1;
124124
+3-1
attic/src/lib.rs
···17171818pub mod api;
1919pub mod cache;
2020+#[cfg(feature = "chunking")]
2121+pub mod chunking;
2022pub mod error;
2123pub mod hash;
2224pub mod mime;
2325pub mod nix_store;
2426pub mod signing;
2525-#[cfg(feature = "tokio")]
2727+#[cfg(feature = "stream")]
2628pub mod stream;
2729#[cfg(target_family = "unix")]
2830pub mod testing;
···11-//! Chunking.
22-//!
33-//! We perform chunking on uncompressed NARs using the FastCDC
44-//! algorithm.
55-66-use async_stream::try_stream;
77-use bytes::{BufMut, Bytes, BytesMut};
88-use fastcdc::ronomon::FastCDC;
99-use futures::stream::Stream;
1010-use tokio::io::AsyncRead;
1111-1212-use attic::stream::read_chunk_async;
1313-1414-/// Splits a streams into content-defined chunks.
1515-///
1616-/// This is a wrapper over fastcdc-rs that takes an `AsyncRead` and
1717-/// returns a `Stream` of chunks as `Bytes`s.
1818-pub fn chunk_stream<R>(
1919- mut stream: R,
2020- min_size: usize,
2121- avg_size: usize,
2222- max_size: usize,
2323-) -> impl Stream<Item = std::io::Result<Bytes>>
2424-where
2525- R: AsyncRead + Unpin + Send,
2626-{
2727- let s = try_stream! {
2828- let mut buf = BytesMut::with_capacity(max_size);
2929-3030- loop {
3131- let read = read_chunk_async(&mut stream, buf).await?;
3232-3333- let mut eof = false;
3434- if read.is_empty() {
3535- // Already EOF
3636- break;
3737- } else if read.len() < max_size {
3838- // Last read
3939- eof = true;
4040- }
4141-4242- let chunks = FastCDC::with_eof(&read, min_size, avg_size, max_size, eof);
4343- let mut consumed = 0;
4444-4545- for chunk in chunks {
4646- consumed += chunk.length;
4747-4848- let slice = read.slice(chunk.offset..chunk.offset + chunk.length);
4949- yield slice;
5050- }
5151-5252- if eof {
5353- break;
5454- }
5555-5656- buf = BytesMut::with_capacity(max_size);
5757-5858- if consumed < read.len() {
5959- // remaining bytes for the next read
6060- buf.put_slice(&read[consumed..]);
6161- }
6262- }
6363- };
6464-6565- Box::pin(s)
6666-}
6767-6868-#[cfg(test)]
6969-mod tests {
7070- use super::*;
7171-7272- use std::io::Cursor;
7373-7474- use futures::StreamExt;
7575- use tokio_test::block_on;
7676-7777- /// Chunks and reconstructs a file.
7878- #[test]
7979- fn test_chunking_basic() {
8080- fn case(size: usize) {
8181- block_on(async move {
8282- let test_file = get_data(size); // 32 MiB
8383- let mut reconstructed_file = Vec::new();
8484-8585- let cursor = Cursor::new(&test_file);
8686- let mut chunks = chunk_stream(cursor, 8 * 1024, 16 * 1024, 32 * 1024);
8787-8888- while let Some(chunk) = chunks.next().await {
8989- let chunk = chunk.unwrap();
9090- eprintln!("Got a {}-byte chunk", chunk.len());
9191- reconstructed_file.extend(chunk);
9292- }
9393-9494- assert_eq!(reconstructed_file, test_file);
9595- });
9696- }
9797-9898- case(32 * 1024 * 1024 - 1);
9999- case(32 * 1024 * 1024);
100100- case(32 * 1024 * 1024 + 1);
101101- }
102102-103103- /// Returns some fake data.
104104- fn get_data(len: usize) -> Vec<u8> {
105105- let mut state = 42u32;
106106- let mut data = vec![0u8; len];
107107-108108- for i in 0..data.len() {
109109- (state, _) = state.overflowing_mul(1664525u32);
110110- (state, _) = state.overflowing_add(1013904223u32);
111111- data[i] = ((state >> (i % 24)) & 0xff) as u8;
112112- }
113113-114114- data
115115- }
116116-}
+4-4
server/src/gc.rs
···159159 let storage = state.storage().await?;
160160161161 let orphan_chunk_limit = match db.get_database_backend() {
162162- // Arbitrarily chosen sensible value since there's no good default to choose from for MySQL
163163- sea_orm::DatabaseBackend::MySql => 1000,
162162+ // Arbitrarily chosen sensible value since there's no good default to choose from for MySQL
163163+ sea_orm::DatabaseBackend::MySql => 1000,
164164 // Panic limit set by sqlx for postgresql: https://github.com/launchbadge/sqlx/issues/671#issuecomment-687043510
165165 sea_orm::DatabaseBackend::Postgres => u64::from(u16::MAX),
166166- // Default statement limit imposed by sqlite: https://www.sqlite.org/limits.html#max_variable_number
167167- sea_orm::DatabaseBackend::Sqlite => 500,
166166+ // Default statement limit imposed by sqlite: https://www.sqlite.org/limits.html#max_variable_number
167167+ sea_orm::DatabaseBackend::Sqlite => 500,
168168 };
169169170170 // find all orphan chunks...
-1
server/src/lib.rs
···15151616pub mod access;
1717mod api;
1818-mod chunking;
1918pub mod config;
2019pub mod database;
2120pub mod error;
+2-2
server/src/storage/local.rs
···7979 let name = file.file_name();
8080 let name_bytes = name.as_os_str().as_bytes();
8181 let parents = storage_path
8282- .join(OsStr::from_bytes(&name_bytes[0..1]))
8383- .join(OsStr::from_bytes(&name_bytes[0..2]));
8282+ .join(OsStr::from_bytes(&name_bytes[0..1]))
8383+ .join(OsStr::from_bytes(&name_bytes[0..2]));
8484 let new_path = parents.join(name);
8585 fs::create_dir_all(&parents).await.map_err(|e| {
8686 ErrorKind::StorageError(anyhow::anyhow!("Failed to create directory {}", e))