this repo has no description
1//! Shared I/O plumbing for single-stream compressors.
2//!
3//! Every single-file codec (gzip, xz, bzip2, zstd, lz4, brotli, snappy, lzma)
4//! has the same shape: resolve the input into a `Read`, resolve the output
5//! into a `Write`, reject directory inputs/outputs, and forward the in-memory
6//! `Reader`/`Writer` variants untouched for pipeline stages. These helpers
7//! consolidate that plumbing so each backend only expresses its codec choice.
8
9use crate::progress::{OutputTarget, ProgressArgs, copy_with_progress};
10use crate::utils::{CmprssInput, CmprssOutput, Result, WriteWrapper};
11use anyhow::bail;
12use std::fs::File;
13use std::io::{self, BufReader, BufWriter, Read, Write};
14
15/// Resolve a `CmprssInput` into a single boxed `Read` stream for single-stream
16/// codecs. Returns the stream together with the input file's size when known
17/// (used to drive progress bars) and a flag indicating whether the input is a
18/// pipeline-internal `Reader` (in which case progress should be suppressed in
19/// this stage — the innermost stage owns the bar).
20///
21/// Bails when multiple input paths are given, or when a path input is a
22/// directory — single-stream codecs operate on exactly one byte stream.
23pub fn open_input(
24 input: CmprssInput,
25 name: &str,
26) -> Result<(Box<dyn Read + Send>, Option<u64>, bool)> {
27 match input {
28 CmprssInput::Path(paths) => {
29 if paths.len() > 1 {
30 bail!("Multiple input files not supported for {name}");
31 }
32 let path = &paths[0];
33 if path.is_dir() {
34 bail!("{name} does not operate on directories; specify a file instead");
35 }
36 let size = std::fs::metadata(path)?.len();
37 let reader: Box<dyn Read + Send> = Box::new(BufReader::new(File::open(path)?));
38 Ok((reader, Some(size), false))
39 }
40 CmprssInput::Pipe(stdin) => Ok((Box::new(BufReader::new(stdin)), None, false)),
41 CmprssInput::Reader(reader) => Ok((reader.0, None, true)),
42 }
43}
44
45/// Bail if the output path refers to an existing directory. Single-stream
46/// codecs always emit a single byte stream, so they can't write to a
47/// directory.
48pub fn guard_file_output(output: &CmprssOutput, name: &str) -> Result {
49 if let CmprssOutput::Path(path) = output
50 && path.is_dir()
51 {
52 bail!("{name} does not operate on directories; specify an output file instead");
53 }
54 Ok(())
55}
56
57/// Resolve a `CmprssOutput` into an owned boxed writer plus an `OutputTarget`
58/// describing how it should be treated by the copy path (progress bar vs. no
59/// progress, etc.). This consumes the output, so callers that still need to
60/// inspect the `CmprssOutput` variant should capture what they need before
61/// calling.
62pub fn prepare_output(output: CmprssOutput) -> Result<(Box<dyn Write + Send>, OutputTarget)> {
63 match output {
64 CmprssOutput::Writer(WriteWrapper(w)) => Ok((w, OutputTarget::InMemory)),
65 CmprssOutput::Pipe(stdout) => Ok((Box::new(BufWriter::new(stdout)), OutputTarget::Stdout)),
66 CmprssOutput::Path(path) => Ok((
67 Box::new(BufWriter::new(File::create(path)?)),
68 OutputTarget::File,
69 )),
70 }
71}
72
73/// Copy bytes from `reader` through `writer`, branching on whether progress
74/// is relevant: pipeline-internal stages (either writing to an in-memory pipe
75/// or reading from one) use a bare `io::copy` with no progress, while
76/// user-facing outputs go through `copy_with_progress` to show a progress bar
77/// when configured. `pipeline_inner` is set when the input comes from an
78/// upstream pipeline stage — in that case we don't know the total size and
79/// the innermost stage already owns the progress bar, so we skip ours.
80pub fn copy_stream<R: Read, W: Write>(
81 mut reader: R,
82 mut writer: W,
83 file_size: Option<u64>,
84 pipeline_inner: bool,
85 progress_args: &ProgressArgs,
86 target: OutputTarget,
87) -> Result {
88 if pipeline_inner || target == OutputTarget::InMemory {
89 io::copy(&mut reader, &mut writer)?;
90 } else {
91 copy_with_progress(
92 reader,
93 writer,
94 progress_args.chunk_size.size_in_bytes,
95 file_size,
96 progress_args.progress,
97 target,
98 )?;
99 }
100 Ok(())
101}