//! Pipeline: chains multiple compressors (e.g. tar + gzip) into a single
//! `Compressor`, wiring the intermediate stages together with in-memory pipes.
1use crate::utils::{
2 CmprssInput, CmprssOutput, Compressor, ExtractedTarget, ReadWrapper, Result, WriteWrapper,
3};
4use anyhow::{anyhow, bail};
5use std::io::{self, Read, Write};
6use std::path::Path;
7use std::sync::mpsc::{Receiver, Sender, channel};
8use std::thread;
9
/// A pipeline of one or more compressors applied in sequence (e.g., tar.gz)
pub struct Pipeline {
    /// The chain of compressors to apply in order (innermost to outermost):
    /// `compress` runs stages front-to-back, `extract` back-to-front.
    compressors: Vec<Box<dyn Compressor>>,
    /// Preserves the user's original format string (e.g. `tgz`) so default
    /// filenames use it verbatim instead of the dotted composition of each
    /// stage's extension. `None` falls back to joining the per-stage
    /// extensions.
    format_override: Option<String>,
}
20
21impl Clone for Pipeline {
22 fn clone(&self) -> Self {
23 Pipeline {
24 compressors: self.compressors.iter().map(|c| c.clone_boxed()).collect(),
25 format_override: self.format_override.clone(),
26 }
27 }
28}
29
/// Which method intermediate (threaded) stages should invoke. The final stage
/// always runs on the calling thread and is handled by a caller-supplied
/// closure — only the intermediate layers need this dispatch.
#[derive(Clone, Copy)]
enum StageAction {
    /// Intermediate stages call `Compressor::compress`.
    Compress,
    /// Intermediate stages call `Compressor::extract`.
    Extract,
}
38
39impl Pipeline {
40 /// Create a new Pipeline with the given compressors
41 pub fn new(compressors: Vec<Box<dyn Compressor>>) -> Self {
42 Pipeline {
43 compressors,
44 format_override: None,
45 }
46 }
47
48 /// Create a Pipeline that keeps `format` as its canonical format string,
49 /// used for default output filenames. Intended for shortcut forms like
50 /// `tgz` where the user-facing extension differs from the dotted chain.
51 pub fn with_format(compressors: Vec<Box<dyn Compressor>>, format: String) -> Self {
52 Pipeline {
53 compressors,
54 format_override: Some(format),
55 }
56 }
57
58 /// Get a string representation of the chained format (e.g., "tar.gz")
59 fn format_chain(&self) -> String {
60 if let Some(ref f) = self.format_override {
61 return f.clone();
62 }
63 self.compressors
64 .iter()
65 .map(|c| c.extension())
66 .collect::<Vec<&str>>()
67 .join(".")
68 }
69
70 /// Run an ordered chain of compressor stages, with each non-final stage
71 /// in its own thread linked by an in-memory pipe. The final (last) stage
72 /// runs on the calling thread via `finalize`. Intermediate stages all
73 /// invoke the same method — `compress` going outward through a
74 /// compression pipeline, `extract` unwrapping layers on the way in.
75 fn run_threaded<F>(
76 stages: Vec<Box<dyn Compressor>>,
77 initial_input: CmprssInput,
78 intermediate: StageAction,
79 finalize: F,
80 ) -> Result
81 where
82 F: FnOnce(Box<dyn Compressor>, CmprssInput) -> Result,
83 {
84 debug_assert!(!stages.is_empty(), "pipeline is never empty");
85 let mut stages = stages;
86 let last = stages.pop().expect("pipeline is never empty");
87 let buffer_size = 64 * 1024;
88 let mut current_input = initial_input;
89 let mut handles = Vec::new();
90
91 for stage in stages {
92 let (sender, receiver) = channel::<Vec<u8>>();
93 let stage_output =
94 CmprssOutput::Writer(WriteWrapper(Box::new(PipeWriter::new(sender, buffer_size))));
95 let next_input = CmprssInput::Reader(ReadWrapper(Box::new(PipeReader::new(receiver))));
96 let stage_input = std::mem::replace(&mut current_input, next_input);
97
98 let handle = thread::spawn(move || match intermediate {
99 StageAction::Compress => stage.compress(stage_input, stage_output),
100 StageAction::Extract => stage.extract(stage_input, stage_output),
101 });
102 handles.push(handle);
103 }
104
105 finalize(last, current_input)?;
106
107 for handle in handles {
108 handle
109 .join()
110 .map_err(|_| anyhow!("Pipeline stage thread panicked"))??;
111 }
112 Ok(())
113 }
114}
115
/// A reader that pulls byte chunks from a channel and serves them through
/// the `Read` interface. An empty chunk — or a closed channel — means EOF.
struct PipeReader {
    receiver: Receiver<Vec<u8>>,
    buffer: Vec<u8>,
    position: usize,
    eof: bool,
}

impl PipeReader {
    fn new(receiver: Receiver<Vec<u8>>) -> Self {
        Self {
            receiver,
            buffer: Vec::new(),
            position: 0,
            eof: false,
        }
    }
}

impl Read for PipeReader {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let exhausted = self.position >= self.buffer.len();

        // Once EOF has been observed and the final chunk fully served,
        // keep reporting end-of-stream.
        if self.eof && exhausted {
            return Ok(0);
        }

        // Pull the next chunk when the current one is fully consumed. Both
        // an explicit empty chunk and a disconnected sender signal EOF.
        if exhausted {
            let chunk = match self.receiver.recv() {
                Ok(data) if !data.is_empty() => data,
                _ => {
                    self.eof = true;
                    return Ok(0);
                }
            };
            self.position = 0;
            self.buffer = chunk;
        }

        // Serve as much of the buffered chunk as fits in `buf`.
        let pending = &self.buffer[self.position..];
        let count = pending.len().min(buf.len());
        buf[..count].copy_from_slice(&pending[..count]);
        self.position += count;
        Ok(count)
    }
}
170
/// A writer that forwards its bytes over a channel in chunks of at most
/// `buffer_size`. Dropping the writer sends an empty chunk as the EOF marker.
struct PipeWriter {
    sender: Sender<Vec<u8>>,
    buffer_size: usize,
}

impl PipeWriter {
    fn new(sender: Sender<Vec<u8>>, buffer_size: usize) -> Self {
        Self {
            sender,
            buffer_size,
        }
    }
}

impl Write for PipeWriter {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        // Forward the data in chunks no larger than `buffer_size`. A chunk
        // produced here is never empty, so it cannot be mistaken for the
        // EOF marker that `Drop` sends.
        for chunk in buf.chunks(self.buffer_size) {
            self.sender.send(chunk.to_vec()).map_err(|_| {
                io::Error::new(
                    io::ErrorKind::BrokenPipe,
                    "Pipe receiver has been closed",
                )
            })?;
        }
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        // Every write is pushed through the channel immediately; nothing to do.
        Ok(())
    }
}

impl Drop for PipeWriter {
    fn drop(&mut self) {
        // An empty buffer is the EOF marker; ignore the error if the reader
        // side is already gone.
        let _ = self.sender.send(Vec::new());
    }
}
219
impl Compressor for Pipeline {
    /// Delegates to the outermost (last) stage of the chain.
    fn name(&self) -> &str {
        self.compressors
            .last()
            .expect("pipeline is never empty")
            .name()
    }

    /// Delegates to the outermost (last) stage; the full dotted chain is
    /// produced by `format_chain` instead.
    fn extension(&self) -> &str {
        self.compressors
            .last()
            .expect("pipeline is never empty")
            .extension()
    }

    /// The extraction target kind comes from the innermost (first) stage —
    /// the container format decides whether output is a file or directory.
    fn default_extracted_target(&self) -> ExtractedTarget {
        self.compressors
            .first()
            .expect("pipeline is never empty")
            .default_extracted_target()
    }

    /// Default archive name: the input filename plus the full chained
    /// extension (or the user-supplied override).
    fn default_compressed_filename(&self, in_path: &Path) -> String {
        // Add all extensions: input.txt → input.txt.tar.gz
        let base = in_path
            .file_name()
            .map(|n| n.to_string_lossy().into_owned())
            .unwrap_or_else(|| "archive".to_string());
        format!("{}.{}", base, self.format_chain())
    }

    /// Default extraction name: the archive name with every stage extension
    /// stripped, or "." when the innermost stage unpacks to a directory.
    fn default_extracted_filename(&self, in_path: &Path) -> String {
        if self.default_extracted_target() == ExtractedTarget::Directory {
            return ".".to_string();
        }
        // Strip all known extensions: input.tar.gz → input
        // Iterate outermost-first so the suffixes come off in order.
        let mut name = in_path
            .file_name()
            .map(|n| n.to_string_lossy().into_owned())
            .unwrap_or_else(|| "archive".to_string());
        for comp in self.compressors.iter().rev() {
            let ext = format!(".{}", comp.extension());
            if let Some(stripped) = name.strip_suffix(&ext) {
                name = stripped.to_string();
            }
        }
        name
    }

    /// Purely name-based check: the path counts as this pipeline's archive
    /// when its filename ends with the full chained extension (or the
    /// user-supplied override, e.g. `.tgz`). Content is not inspected.
    fn is_archive(&self, in_path: &Path) -> bool {
        let file_name = match in_path.file_name().and_then(|f| f.to_str()) {
            Some(f) => f,
            None => return false,
        };
        file_name.ends_with(&format!(".{}", self.format_chain()))
    }

    fn compress(&self, input: CmprssInput, output: CmprssOutput) -> Result {
        debug_assert!(!self.compressors.is_empty(), "pipeline is never empty");
        // A single stage needs no threading; delegate directly.
        if self.compressors.len() == 1 {
            return self.compressors[0].compress(input, output);
        }
        // Innermost → outermost: the outermost compressor runs on the main
        // thread and writes to the user-supplied output.
        let stages = self.compressors.iter().map(|c| c.clone_boxed()).collect();
        Self::run_threaded(stages, input, StageAction::Compress, |last, input| {
            last.compress(input, output)
        })
    }

    fn extract(&self, input: CmprssInput, output: CmprssOutput) -> Result {
        debug_assert!(!self.compressors.is_empty(), "pipeline is never empty");
        // A single stage needs no threading; delegate directly.
        if self.compressors.len() == 1 {
            return self.compressors[0].extract(input, output);
        }
        // Outermost → innermost: the innermost extractor (typically the
        // container format like tar/zip) runs on the main thread so it can
        // unpack into the user-supplied output.
        let stages = self
            .compressors
            .iter()
            .rev()
            .map(|c| c.clone_boxed())
            .collect();
        Self::run_threaded(stages, input, StageAction::Extract, |last, input| {
            // `output` is moved into this closure; the Path arm rebuilds it
            // from a clone so the directory check can borrow `p` first.
            let final_output = match output {
                CmprssOutput::Path(ref p) => {
                    // If the innermost extractor wants a directory and the
                    // user's output path doesn't exist yet, create it so
                    // e.g. tar::unpack has somewhere to write.
                    if last.default_extracted_target() == ExtractedTarget::Directory && !p.exists()
                    {
                        std::fs::create_dir_all(p)?;
                    }
                    CmprssOutput::Path(p.clone())
                }
                CmprssOutput::Pipe(_) | CmprssOutput::Writer(_) => output,
            };
            last.extract(input, final_output)
        })
    }

    fn append(&self, input: CmprssInput, output: CmprssOutput) -> Result {
        debug_assert!(!self.compressors.is_empty(), "pipeline is never empty");
        if self.compressors.len() == 1 {
            // Single-stage pipelines are just a wrapper; delegate so tar/zip
            // reached via positional-path inference still support --append.
            return self.compressors[0].append(input, output);
        }
        bail!(
            "cannot --append to a compound archive ({}); it would require decompressing and recompressing the whole archive",
            self.format_chain()
        )
    }

    fn list(&self, input: CmprssInput) -> Result {
        debug_assert!(!self.compressors.is_empty(), "pipeline is never empty");
        if self.compressors.len() == 1 {
            return self.compressors[0].list(input);
        }
        // Same plumbing as `extract`, except the innermost compressor lists
        // its entries to stdout instead of unpacking. Outer layers still
        // decompress into the in-memory pipe so the innermost container sees
        // plain archive bytes.
        let stages = self
            .compressors
            .iter()
            .rev()
            .map(|c| c.clone_boxed())
            .collect();
        Self::run_threaded(stages, input, StageAction::Extract, |innermost, input| {
            innermost.list(input)
        })
    }
}
355
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use tempfile::tempdir;

    /// End-to-end round trip through a two-stage (tar + gzip) pipeline:
    /// compress a file, extract the archive, and verify the content
    /// survives unchanged.
    #[test]
    fn test_pipeline_compression() -> Result {
        let temp_dir = tempdir()?;

        let test_content = "This is a test file for pipeline compression";
        let test_file_path = temp_dir.path().join("test.txt");
        fs::write(&test_file_path, test_content)?;

        let pipeline = Pipeline::new(vec![
            Box::new(crate::backends::Tar::default()),
            Box::new(crate::backends::Gzip::default()),
        ]);

        let archive_path = temp_dir.path().join("test.tar.gz");
        pipeline.compress(
            CmprssInput::Path(vec![test_file_path.clone()]),
            CmprssOutput::Path(archive_path.clone()),
        )?;

        assert!(archive_path.exists());

        let output_dir = temp_dir.path().join("extracted");
        fs::create_dir(&output_dir)?;
        pipeline.extract(
            CmprssInput::Path(vec![archive_path.clone()]),
            CmprssOutput::Path(output_dir.clone()),
        )?;

        let extracted_file = output_dir.join("test.txt");
        assert!(extracted_file.exists());

        let extracted_content = fs::read_to_string(extracted_file)?;
        assert_eq!(extracted_content, test_content);

        Ok(())
    }

    /// Regression test: per-stage configuration (e.g. `--level 1` vs
    /// `--level 9` on the outer gzip of a `.tar.gz`) must survive the
    /// thread-dispatch in `Pipeline::compress`. Previously the pipeline
    /// reconstructed each stage from its *name* alone, producing a default
    /// Gzip regardless of the level the user requested.
    #[test]
    fn test_pipeline_preserves_stage_config() -> Result {
        use crate::progress::ProgressArgs;

        let temp_dir = tempdir()?;
        let input = temp_dir.path().join("input.txt");
        // Repetitive content amplifies the level difference in output size.
        fs::write(&input, "0123456789abcdef".repeat(1024))?;

        // Build and run a tar.gz pipeline at the given gzip level, returning
        // the resulting archive size in bytes.
        let run = |level: i32, suffix: &str| -> Result<u64> {
            let fast = Pipeline::new(vec![
                Box::new(crate::backends::Tar::default()),
                Box::new(crate::backends::Gzip {
                    compression_level: level,
                    progress_args: ProgressArgs::default(),
                }),
            ]);
            let out = temp_dir.path().join(format!("out.{suffix}.tar.gz"));
            fast.compress(
                CmprssInput::Path(vec![input.clone()]),
                CmprssOutput::Path(out.clone()),
            )?;
            Ok(fs::metadata(&out)?.len())
        };

        let fast_size = run(1, "fast")?;
        let best_size = run(9, "best")?;
        assert!(
            best_size < fast_size,
            "expected best (level 9) to be smaller than fast (level 1), got {best_size} >= {fast_size}",
        );

        Ok(())
    }
}