this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

refactor(backends): consolidate stream-codec scaffolding via prepare_output/copy_stream

+208 -333
+22 -42
src/backends/brotli.rs
··· 1 - use super::stream::{guard_file_output, open_input, open_output}; 2 - use crate::progress::{ProgressArgs, copy_with_progress}; 1 + use super::stream::{copy_stream, guard_file_output, open_input, prepare_output}; 2 + use crate::progress::ProgressArgs; 3 3 use crate::utils::{ 4 4 CmprssInput, CmprssOutput, CommonArgs, CompressionLevelValidator, Compressor, LevelArgs, Result, 5 5 }; 6 6 use brotli::{CompressorWriter, Decompressor}; 7 7 use clap::Args; 8 - use std::io::{self, Write}; 8 + use std::io::Write; 9 9 10 10 /// Brotli buffer size used when constructing the encoder/decoder. 11 11 const BROTLI_BUFFER_SIZE: usize = 4096; ··· 90 90 /// Compress an input file or pipe to a brotli archive 91 91 fn compress(&self, input: CmprssInput, output: CmprssOutput) -> Result { 92 92 guard_file_output(&output, "Brotli")?; 93 - let (mut input_stream, file_size) = open_input(input, "Brotli")?; 94 - let quality = self.compression_level as u32; 95 - 96 - if let CmprssOutput::Writer(writer) = output { 97 - let mut encoder = 98 - CompressorWriter::new(writer, BROTLI_BUFFER_SIZE, quality, BROTLI_LGWIN); 99 - io::copy(&mut input_stream, &mut encoder)?; 100 - encoder.flush()?; 101 - } else { 102 - let output_stream = open_output(&output)?; 103 - let mut encoder = 104 - CompressorWriter::new(output_stream, BROTLI_BUFFER_SIZE, quality, BROTLI_LGWIN); 105 - copy_with_progress( 106 - &mut input_stream, 107 - &mut encoder, 108 - self.progress_args.chunk_size.size_in_bytes, 109 - file_size, 110 - self.progress_args.progress, 111 - &output, 112 - )?; 113 - encoder.flush()?; 114 - } 115 - 93 + let (input_stream, file_size) = open_input(input, "Brotli")?; 94 + let (writer, target) = prepare_output(output)?; 95 + let mut encoder = CompressorWriter::new( 96 + writer, 97 + BROTLI_BUFFER_SIZE, 98 + self.compression_level as u32, 99 + BROTLI_LGWIN, 100 + ); 101 + copy_stream( 102 + input_stream, 103 + &mut encoder, 104 + file_size, 105 + &self.progress_args, 106 + target, 107 + )?; 108 + encoder.flush()?; 116 109 Ok(()) 117 110 } 118 111 ··· 120 113 fn extract(&self, input: CmprssInput, output: CmprssOutput) -> Result { 121 114 guard_file_output(&output, "Brotli")?; 122 115 let (input_stream, file_size) = open_input(input, "Brotli")?; 123 - let mut decoder = Decompressor::new(input_stream, BROTLI_BUFFER_SIZE); 124 - 125 - if let CmprssOutput::Writer(mut writer) = output { 126 - io::copy(&mut decoder, &mut writer)?; 127 - } else { 128 - let mut output_stream = open_output(&output)?; 129 - copy_with_progress( 130 - &mut decoder, 131 - &mut output_stream, 132 - self.progress_args.chunk_size.size_in_bytes, 133 - file_size, 134 - self.progress_args.progress, 135 - &output, 136 - )?; 137 - } 138 - 116 + let decoder = Decompressor::new(input_stream, BROTLI_BUFFER_SIZE); 117 + let (writer, target) = prepare_output(output)?; 118 + copy_stream(decoder, writer, file_size, &self.progress_args, target)?; 139 119 Ok(()) 140 120 } 141 121 }
+25 -41
src/backends/bzip2.rs
··· 1 - use super::stream::{guard_file_output, open_input, open_output}; 1 + use super::stream::{copy_stream, guard_file_output, open_input, prepare_output}; 2 2 use crate::{ 3 - progress::{ProgressArgs, copy_with_progress}, 3 + progress::ProgressArgs, 4 4 utils::{ 5 5 CmprssInput, CmprssOutput, CommonArgs, CompressionLevelValidator, Compressor, LevelArgs, 6 6 Result, ··· 9 9 use bzip2::Compression; 10 10 use bzip2::write::{BzDecoder, BzEncoder}; 11 11 use clap::Args; 12 - use std::io; 13 12 14 13 /// BZip2-specific compression validator (1-9 range) 15 14 #[derive(Debug, Clone, Copy)] ··· 85 84 /// Compress an input file or pipe to a bz2 archive 86 85 fn compress(&self, input: CmprssInput, output: CmprssOutput) -> Result { 87 86 guard_file_output(&output, "Bzip2")?; 88 - let (mut input_stream, file_size) = open_input(input, "Bzip2")?; 89 - let level = Compression::new(self.level as u32); 90 - 91 - if let CmprssOutput::Writer(writer) = output { 92 - let mut encoder = BzEncoder::new(writer, level); 93 - io::copy(&mut input_stream, &mut encoder)?; 94 - } else { 95 - let output_stream = open_output(&output)?; 96 - let mut encoder = BzEncoder::new(output_stream, level); 97 - copy_with_progress( 98 - &mut input_stream, 99 - &mut encoder, 100 - self.progress_args.chunk_size.size_in_bytes, 101 - file_size, 102 - self.progress_args.progress, 103 - &output, 104 - )?; 105 - } 106 - 87 + let (input_stream, file_size) = open_input(input, "Bzip2")?; 88 + let (writer, target) = prepare_output(output)?; 89 + let mut encoder = BzEncoder::new(writer, Compression::new(self.level as u32)); 90 + copy_stream( 91 + input_stream, 92 + &mut encoder, 93 + file_size, 94 + &self.progress_args, 95 + target, 96 + )?; 107 97 Ok(()) 108 98 } 109 99 110 - /// Extract a bz2 archive to a file or pipe 100 + /// Extract a bz2 archive to a file or pipe. Unlike most decoders, 101 + /// `BzDecoder` is write-driven: it wraps the output writer and we feed 102 + /// compressed bytes into it. 111 103 fn extract(&self, input: CmprssInput, output: CmprssOutput) -> Result { 112 104 guard_file_output(&output, "Bzip2")?; 113 - let (mut input_stream, file_size) = open_input(input, "Bzip2")?; 114 - 115 - if let CmprssOutput::Writer(writer) = output { 116 - let mut decoder = BzDecoder::new(writer); 117 - io::copy(&mut input_stream, &mut decoder)?; 118 - } else { 119 - let output_stream = open_output(&output)?; 120 - let mut decoder = BzDecoder::new(output_stream); 121 - copy_with_progress( 122 - &mut input_stream, 123 - &mut decoder, 124 - self.progress_args.chunk_size.size_in_bytes, 125 - file_size, 126 - self.progress_args.progress, 127 - &output, 128 - )?; 129 - } 130 - 105 + let (input_stream, file_size) = open_input(input, "Bzip2")?; 106 + let (writer, target) = prepare_output(output)?; 107 + let mut decoder = BzDecoder::new(writer); 108 + copy_stream( 109 + input_stream, 110 + &mut decoder, 111 + file_size, 112 + &self.progress_args, 113 + target, 114 + )?; 131 115 Ok(()) 132 116 } 133 117 }
+16 -38
src/backends/gzip.rs
··· 1 - use super::stream::{guard_file_output, open_input, open_output}; 2 - use crate::progress::{ProgressArgs, copy_with_progress}; 1 + use super::stream::{copy_stream, guard_file_output, open_input, prepare_output}; 2 + use crate::progress::ProgressArgs; 3 3 use crate::utils::{ 4 4 CmprssInput, CmprssOutput, CommonArgs, CompressionLevelValidator, Compressor, 5 5 DefaultCompressionValidator, LevelArgs, Result, ··· 7 7 use clap::Args; 8 8 use flate2::write::GzEncoder; 9 9 use flate2::{Compression, read::GzDecoder}; 10 - use std::io; 11 10 12 11 #[derive(Args, Debug)] 13 12 pub struct GzipArgs { ··· 59 58 /// Compress an input file or pipe to a gzip archive 60 59 fn compress(&self, input: CmprssInput, output: CmprssOutput) -> Result { 61 60 guard_file_output(&output, "Gzip")?; 62 - let (mut input_stream, file_size) = open_input(input, "Gzip")?; 63 - let level = Compression::new(self.compression_level as u32); 64 - 65 - if let CmprssOutput::Writer(writer) = output { 66 - let mut encoder = GzEncoder::new(writer, level); 67 - io::copy(&mut input_stream, &mut encoder)?; 68 - encoder.finish()?; 69 - } else { 70 - let output_stream = open_output(&output)?; 71 - let mut encoder = GzEncoder::new(output_stream, level); 72 - copy_with_progress( 73 - &mut input_stream, 74 - &mut encoder, 75 - self.progress_args.chunk_size.size_in_bytes, 76 - file_size, 77 - self.progress_args.progress, 78 - &output, 79 - )?; 80 - encoder.finish()?; 81 - } 61 + let (input_stream, file_size) = open_input(input, "Gzip")?; 62 + let (writer, target) = prepare_output(output)?; 63 + let mut encoder = GzEncoder::new(writer, Compression::new(self.compression_level as u32)); 64 + copy_stream( 65 + input_stream, 66 + &mut encoder, 67 + file_size, 68 + &self.progress_args, 69 + target, 70 + )?; 71 + encoder.finish()?; 82 72 Ok(()) 83 73 } 84 74 ··· 86 76 fn extract(&self, input: CmprssInput, output: CmprssOutput) -> Result { 87 77 guard_file_output(&output, "Gzip")?; 88 78 let (input_stream, file_size) = open_input(input, "Gzip")?; 89 - let mut decoder = GzDecoder::new(input_stream); 90 - 91 - if let CmprssOutput::Writer(mut writer) = output { 92 - io::copy(&mut decoder, &mut writer)?; 93 - } else { 94 - let mut output_stream = open_output(&output)?; 95 - copy_with_progress( 96 - &mut decoder, 97 - &mut output_stream, 98 - self.progress_args.chunk_size.size_in_bytes, 99 - file_size, 100 - self.progress_args.progress, 101 - &output, 102 - )?; 103 - } 79 + let decoder = GzDecoder::new(input_stream); 80 + let (writer, target) = prepare_output(output)?; 81 + copy_stream(decoder, writer, file_size, &self.progress_args, target)?; 104 82 Ok(()) 105 83 } 106 84 }
+16 -39
src/backends/lz4.rs
··· 1 - use super::stream::{guard_file_output, open_input, open_output}; 2 - use crate::progress::{ProgressArgs, copy_with_progress}; 1 + use super::stream::{copy_stream, guard_file_output, open_input, prepare_output}; 2 + use crate::progress::ProgressArgs; 3 3 use crate::utils::{CmprssInput, CmprssOutput, CommonArgs, Compressor, Result}; 4 4 use clap::Args; 5 5 use lz4_flex::frame::{FrameDecoder, FrameEncoder}; 6 - use std::io; 7 6 8 7 #[derive(Args, Debug)] 9 8 pub struct Lz4Args { ··· 41 40 /// Compress an input file or pipe to a lz4 archive 42 41 fn compress(&self, input: CmprssInput, output: CmprssOutput) -> Result { 43 42 guard_file_output(&output, "LZ4")?; 44 - let (mut input_stream, file_size) = open_input(input, "LZ4")?; 45 - 46 - if let CmprssOutput::Writer(writer) = output { 47 - let mut encoder = FrameEncoder::new(writer); 48 - io::copy(&mut input_stream, &mut encoder)?; 49 - encoder.finish()?; 50 - } else { 51 - let output_stream = open_output(&output)?; 52 - let mut encoder = FrameEncoder::new(output_stream); 53 - copy_with_progress( 54 - &mut input_stream, 55 - &mut encoder, 56 - self.progress_args.chunk_size.size_in_bytes, 57 - file_size, 58 - self.progress_args.progress, 59 - &output, 60 - )?; 61 - encoder.finish()?; 62 - } 63 - 43 + let (input_stream, file_size) = open_input(input, "LZ4")?; 44 + let (writer, target) = prepare_output(output)?; 45 + let mut encoder = FrameEncoder::new(writer); 46 + copy_stream( 47 + input_stream, 48 + &mut encoder, 49 + file_size, 50 + &self.progress_args, 51 + target, 52 + )?; 53 + encoder.finish()?; 64 54 Ok(()) 65 55 } 66 56 ··· 68 58 fn extract(&self, input: CmprssInput, output: CmprssOutput) -> Result { 69 59 guard_file_output(&output, "LZ4")?; 70 60 let (input_stream, file_size) = open_input(input, "LZ4")?; 71 - let mut decoder = FrameDecoder::new(input_stream); 72 - 73 - if let CmprssOutput::Writer(mut writer) = output { 74 - io::copy(&mut decoder, &mut writer)?; 75 - } else { 76 - let mut output_stream = open_output(&output)?; 77 - copy_with_progress( 78 - &mut decoder, 79 - &mut output_stream, 80 - self.progress_args.chunk_size.size_in_bytes, 81 - file_size, 82 - self.progress_args.progress, 83 - &output, 84 - )?; 85 - } 86 - 61 + let decoder = FrameDecoder::new(input_stream); 62 + let (writer, target) = prepare_output(output)?; 63 + copy_stream(decoder, writer, file_size, &self.progress_args, target)?; 87 64 Ok(()) 88 65 } 89 66 }
+20 -38
src/backends/lzma.rs
··· 1 - use super::stream::{guard_file_output, open_input, open_output}; 1 + use super::stream::{copy_stream, guard_file_output, open_input, prepare_output}; 2 2 use crate::{ 3 - progress::{ProgressArgs, copy_with_progress}, 3 + progress::ProgressArgs, 4 4 utils::{ 5 5 CmprssInput, CmprssOutput, CommonArgs, CompressionLevelValidator, Compressor, 6 6 DefaultCompressionValidator, LevelArgs, Result, ··· 93 93 94 94 fn compress(&self, input: CmprssInput, output: CmprssOutput) -> Result { 95 95 guard_file_output(&output, "LZMA")?; 96 - let (mut input_stream, file_size) = open_input(input, "LZMA")?; 97 - 98 - if let CmprssOutput::Writer(writer) = output { 99 - let mut encoder = XzEncoder::new_stream(writer, self.encoder_stream()?); 100 - io::copy(&mut input_stream, &mut encoder)?; 101 - encoder.try_finish()?; 102 - } else { 103 - let output_stream = open_output(&output)?; 104 - let mut encoder = XzEncoder::new_stream(output_stream, self.encoder_stream()?); 105 - copy_with_progress( 106 - &mut input_stream, 107 - NoFlush(&mut encoder), 108 - self.progress_args.chunk_size.size_in_bytes, 109 - file_size, 110 - self.progress_args.progress, 111 - &output, 112 - )?; 113 - encoder.try_finish()?; 114 - } 115 - 96 + let (input_stream, file_size) = open_input(input, "LZMA")?; 97 + let (writer, target) = prepare_output(output)?; 98 + let mut encoder = XzEncoder::new_stream(writer, self.encoder_stream()?); 99 + // `copy_stream` flushes the final writer on the path/pipe branch via 100 + // `copy_with_progress`; LZMA1 (`lzma_alone`) rejects mid-stream flush, 101 + // so wrap the encoder to swallow those calls and let `try_finish` 102 + // finalize the stream. 103 + copy_stream( 104 + input_stream, 105 + NoFlush(&mut encoder), 106 + file_size, 107 + &self.progress_args, 108 + target, 109 + )?; 110 + encoder.try_finish()?; 116 111 Ok(()) 117 112 } 118 113 119 114 fn extract(&self, input: CmprssInput, output: CmprssOutput) -> Result { 120 115 guard_file_output(&output, "LZMA")?; 121 116 let (input_stream, file_size) = open_input(input, "LZMA")?; 122 - let mut decoder = XzDecoder::new_stream(input_stream, Self::decoder_stream()?); 123 - 124 - if let CmprssOutput::Writer(mut writer) = output { 125 - io::copy(&mut decoder, &mut writer)?; 126 - } else { 127 - let mut output_stream = open_output(&output)?; 128 - copy_with_progress( 129 - &mut decoder, 130 - &mut output_stream, 131 - self.progress_args.chunk_size.size_in_bytes, 132 - file_size, 133 - self.progress_args.progress, 134 - &output, 135 - )?; 136 - } 137 - 117 + let decoder = XzDecoder::new_stream(input_stream, Self::decoder_stream()?); 118 + let (writer, target) = prepare_output(output)?; 119 + copy_stream(decoder, writer, file_size, &self.progress_args, target)?; 138 120 Ok(()) 139 121 } 140 122 }
+17 -39
src/backends/snappy.rs
··· 1 - use super::stream::{guard_file_output, open_input, open_output}; 2 - use crate::progress::{ProgressArgs, copy_with_progress}; 1 + use super::stream::{copy_stream, guard_file_output, open_input, prepare_output}; 2 + use crate::progress::ProgressArgs; 3 3 use crate::utils::{CmprssInput, CmprssOutput, CommonArgs, Compressor, Result}; 4 4 use clap::Args; 5 5 use snap::read::FrameDecoder; 6 6 use snap::write::FrameEncoder; 7 - use std::io::{self, Write}; 7 + use std::io::Write; 8 8 9 9 #[derive(Args, Debug)] 10 10 pub struct SnappyArgs { ··· 43 43 /// Compress an input file or pipe to a snappy frame-format archive 44 44 fn compress(&self, input: CmprssInput, output: CmprssOutput) -> Result { 45 45 guard_file_output(&output, "Snappy")?; 46 - let (mut input_stream, file_size) = open_input(input, "Snappy")?; 47 - 48 - if let CmprssOutput::Writer(writer) = output { 49 - let mut encoder = FrameEncoder::new(writer); 50 - io::copy(&mut input_stream, &mut encoder)?; 51 - encoder.flush()?; 52 - } else { 53 - let output_stream = open_output(&output)?; 54 - let mut encoder = FrameEncoder::new(output_stream); 55 - copy_with_progress( 56 - &mut input_stream, 57 - &mut encoder, 58 - self.progress_args.chunk_size.size_in_bytes, 59 - file_size, 60 - self.progress_args.progress, 61 - &output, 62 - )?; 63 - encoder.flush()?; 64 - } 65 - 46 + let (input_stream, file_size) = open_input(input, "Snappy")?; 47 + let (writer, target) = prepare_output(output)?; 48 + let mut encoder = FrameEncoder::new(writer); 49 + copy_stream( 50 + input_stream, 51 + &mut encoder, 52 + file_size, 53 + &self.progress_args, 54 + target, 55 + )?; 56 + encoder.flush()?; 66 57 Ok(()) 67 58 } 68 59 ··· 70 61 fn extract(&self, input: CmprssInput, output: CmprssOutput) -> Result { 71 62 guard_file_output(&output, "Snappy")?; 72 63 let (input_stream, file_size) = open_input(input, "Snappy")?; 73 - let mut decoder = FrameDecoder::new(input_stream); 74 - 75 - if let CmprssOutput::Writer(mut writer) = output { 76 - io::copy(&mut decoder, &mut writer)?; 77 - } else { 78 - let mut output_stream = open_output(&output)?; 79 - copy_with_progress( 80 - &mut decoder, 81 - &mut output_stream, 82 - self.progress_args.chunk_size.size_in_bytes, 83 - file_size, 84 - self.progress_args.progress, 85 - &output, 86 - )?; 87 - } 88 - 64 + let decoder = FrameDecoder::new(input_stream); 65 + let (writer, target) = prepare_output(output)?; 66 + copy_stream(decoder, writer, file_size, &self.progress_args, target)?; 89 67 Ok(()) 90 68 } 91 69 }
+41 -13
src/backends/stream.rs
··· 6 6 //! `Reader`/`Writer` variants untouched for pipeline stages. These helpers 7 7 //! consolidate that plumbing so each backend only expresses its codec choice. 8 8 9 - use crate::utils::{CmprssInput, CmprssOutput, Result}; 9 + use crate::progress::{OutputTarget, ProgressArgs, copy_with_progress}; 10 + use crate::utils::{CmprssInput, CmprssOutput, Result, WriteWrapper}; 10 11 use anyhow::bail; 11 12 use std::fs::File; 12 - use std::io::{BufReader, BufWriter, Read, Write}; 13 + use std::io::{self, BufReader, BufWriter, Read, Write}; 13 14 14 15 /// Resolve a `CmprssInput` into a single boxed `Read` stream for single-stream 15 16 /// codecs. Returns the stream together with the input file's size when known ··· 48 49 Ok(()) 49 50 } 50 51 51 - /// Open a `CmprssOutput` as a boxed `Write`. 52 - /// 53 - /// Callers must destructure `CmprssOutput::Writer` themselves before calling 54 - /// this — the in-memory `Writer` is already a boxed `Write` and doesn't need 55 - /// an additional buffering layer. 56 - pub fn open_output(output: &CmprssOutput) -> Result<Box<dyn Write + Send + '_>> { 52 + /// Resolve a `CmprssOutput` into an owned boxed writer plus an `OutputTarget` 53 + /// describing how it should be treated by the copy path (progress bar vs. no 54 + /// progress, etc.). This consumes the output, so callers that still need to 55 + /// inspect the `CmprssOutput` variant should capture what they need before 56 + /// calling. 57 + pub fn prepare_output(output: CmprssOutput) -> Result<(Box<dyn Write + Send>, OutputTarget)> { 57 58 match output { 58 - CmprssOutput::Path(path) => Ok(Box::new(BufWriter::new(File::create(path)?))), 59 - CmprssOutput::Pipe(stdout) => Ok(Box::new(BufWriter::new(stdout))), 60 - CmprssOutput::Writer(_) => { 61 - unreachable!("open_output called with CmprssOutput::Writer; destructure it first") 62 - } 59 + CmprssOutput::Writer(WriteWrapper(w)) => Ok((w, OutputTarget::InMemory)), 60 + CmprssOutput::Pipe(stdout) => Ok((Box::new(BufWriter::new(stdout)), OutputTarget::Stdout)), 61 + CmprssOutput::Path(path) => Ok(( 62 + Box::new(BufWriter::new(File::create(path)?)), 63 + OutputTarget::File, 64 + )), 63 65 } 64 66 } 67 + 68 + /// Copy bytes from `reader` through `writer`, branching on `target`: 69 + /// pipeline-internal stages use a bare `io::copy` (no progress), while 70 + /// user-facing outputs go through `copy_with_progress` to show a progress bar 71 + /// when configured. 72 + pub fn copy_stream<R: Read, W: Write>( 73 + mut reader: R, 74 + mut writer: W, 75 + file_size: Option<u64>, 76 + progress_args: &ProgressArgs, 77 + target: OutputTarget, 78 + ) -> Result { 79 + if target == OutputTarget::InMemory { 80 + io::copy(&mut reader, &mut writer)?; 81 + } else { 82 + copy_with_progress( 83 + reader, 84 + writer, 85 + progress_args.chunk_size.size_in_bytes, 86 + file_size, 87 + progress_args.progress, 88 + target, 89 + )?; 90 + } 91 + Ok(()) 92 + }
+16 -38
src/backends/xz.rs
··· 1 - use super::stream::{guard_file_output, open_input, open_output}; 1 + use super::stream::{copy_stream, guard_file_output, open_input, prepare_output}; 2 2 use crate::{ 3 - progress::{ProgressArgs, copy_with_progress}, 3 + progress::ProgressArgs, 4 4 utils::{ 5 5 CmprssInput, CmprssOutput, CommonArgs, CompressionLevelValidator, Compressor, 6 6 DefaultCompressionValidator, LevelArgs, Result, 7 7 }, 8 8 }; 9 9 use clap::Args; 10 - use std::io; 11 10 use xz2::read::XzDecoder; 12 11 use xz2::write::XzEncoder; 13 12 ··· 60 59 61 60 fn compress(&self, input: CmprssInput, output: CmprssOutput) -> Result { 62 61 guard_file_output(&output, "Xz")?; 63 - let (mut input_stream, file_size) = open_input(input, "Xz")?; 64 - 65 - if let CmprssOutput::Writer(writer) = output { 66 - let mut encoder = XzEncoder::new(writer, self.level as u32); 67 - io::copy(&mut input_stream, &mut encoder)?; 68 - encoder.finish()?; 69 - } else { 70 - let output_stream = open_output(&output)?; 71 - let mut encoder = XzEncoder::new(output_stream, self.level as u32); 72 - copy_with_progress( 73 - &mut input_stream, 74 - &mut encoder, 75 - self.progress_args.chunk_size.size_in_bytes, 76 - file_size, 77 - self.progress_args.progress, 78 - &output, 79 - )?; 80 - } 81 - 62 + let (input_stream, file_size) = open_input(input, "Xz")?; 63 + let (writer, target) = prepare_output(output)?; 64 + let mut encoder = XzEncoder::new(writer, self.level as u32); 65 + copy_stream( 66 + input_stream, 67 + &mut encoder, 68 + file_size, 69 + &self.progress_args, 70 + target, 71 + )?; 72 + encoder.finish()?; 82 73 Ok(()) 83 74 } 84 75 85 76 fn extract(&self, input: CmprssInput, output: CmprssOutput) -> Result { 86 77 guard_file_output(&output, "Xz")?; 87 78 let (input_stream, file_size) = open_input(input, "Xz")?; 88 - let mut decoder = XzDecoder::new(input_stream); 89 - 90 - if let CmprssOutput::Writer(mut writer) = output { 91 - io::copy(&mut decoder, &mut writer)?; 92 - } else { 93 - let mut output_stream = open_output(&output)?; 94 - copy_with_progress( 95 - &mut decoder, 96 - &mut output_stream, 97 - self.progress_args.chunk_size.size_in_bytes, 98 - file_size, 99 - self.progress_args.progress, 100 - &output, 101 - )?; 102 - } 103 - 79 + let decoder = XzDecoder::new(input_stream); 80 + let (writer, target) = prepare_output(output)?; 81 + copy_stream(decoder, writer, file_size, &self.progress_args, target)?; 104 82 Ok(()) 105 83 } 106 84 }
+16 -39
src/backends/zstd.rs
··· 1 - use super::stream::{guard_file_output, open_input, open_output}; 2 - use crate::progress::{ProgressArgs, copy_with_progress}; 1 + use super::stream::{copy_stream, guard_file_output, open_input, prepare_output}; 2 + use crate::progress::ProgressArgs; 3 3 use crate::utils::{ 4 4 CmprssInput, CmprssOutput, CommonArgs, CompressionLevelValidator, Compressor, LevelArgs, Result, 5 5 }; 6 6 use clap::Args; 7 - use std::io; 8 7 use zstd::stream::{read::Decoder, write::Encoder}; 9 8 10 9 /// Zstd-specific compression validator (-7 to 22 range) ··· 82 81 /// Compress an input file or pipe to a zstd archive 83 82 fn compress(&self, input: CmprssInput, output: CmprssOutput) -> Result { 84 83 guard_file_output(&output, "Zstd")?; 85 - let (mut input_stream, file_size) = open_input(input, "Zstd")?; 86 - 87 - if let CmprssOutput::Writer(writer) = output { 88 - let mut encoder = Encoder::new(writer, self.compression_level)?; 89 - io::copy(&mut input_stream, &mut encoder)?; 90 - encoder.finish()?; 91 - } else { 92 - let output_stream = open_output(&output)?; 93 - let mut encoder = Encoder::new(output_stream, self.compression_level)?; 94 - copy_with_progress( 95 - &mut input_stream, 96 - &mut encoder, 97 - self.progress_args.chunk_size.size_in_bytes, 98 - file_size, 99 - self.progress_args.progress, 100 - &output, 101 - )?; 102 - encoder.finish()?; 103 - } 104 - 84 + let (input_stream, file_size) = open_input(input, "Zstd")?; 85 + let (writer, target) = prepare_output(output)?; 86 + let mut encoder = Encoder::new(writer, self.compression_level)?; 87 + copy_stream( 88 + input_stream, 89 + &mut encoder, 90 + file_size, 91 + &self.progress_args, 92 + target, 93 + )?; 94 + encoder.finish()?; 105 95 Ok(()) 106 96 } 107 97 ··· 109 99 fn extract(&self, input: CmprssInput, output: CmprssOutput) -> Result { 110 100 guard_file_output(&output, "Zstd")?; 111 101 let (input_stream, file_size) = open_input(input, "Zstd")?; 112 - let mut decoder = Decoder::new(input_stream)?; 113 - 114 - if let CmprssOutput::Writer(mut writer) = output { 115 - io::copy(&mut decoder, &mut writer)?; 116 - } else { 117 - let mut output_stream = open_output(&output)?; 118 - copy_with_progress( 119 - &mut decoder, 120 - &mut output_stream, 121 - self.progress_args.chunk_size.size_in_bytes, 122 - file_size, 123 - self.progress_args.progress, 124 - &output, 125 - )?; 126 - } 127 - 102 + let decoder = Decoder::new(input_stream)?; 103 + let (writer, target) = prepare_output(output)?; 104 + copy_stream(decoder, writer, file_size, &self.progress_args, target)?; 128 105 Ok(()) 129 106 } 130 107 }
+19 -6
src/progress.rs
··· 1 - use crate::utils::CmprssOutput; 2 1 use clap::Args; 3 2 use indicatif::{HumanBytes, ProgressBar}; 4 3 use std::io::{self, Read, Write}; ··· 12 11 Auto, 13 12 On, 14 13 Off, 14 + } 15 + 16 + /// How a resolved `CmprssOutput` should be treated by the progress/copy 17 + /// helpers. Decoupling this from `CmprssOutput` lets us consume the output 18 + /// into an owned boxed writer up front and still drive progress-bar decisions. 19 + #[derive(Debug, Clone, Copy, PartialEq, Eq)] 20 + pub enum OutputTarget { 21 + /// Path on disk — show progress when enabled. 22 + File, 23 + /// Stdout — suppress progress in `Auto` mode to avoid mixing with piped 24 + /// bytes. 25 + Stdout, 26 + /// Pipeline-internal writer (channel between stages) — no progress. 27 + InMemory, 15 28 } 16 29 17 30 #[derive(Debug, Clone, Copy, PartialEq)] ··· 77 90 pub fn create_progress_bar( 78 91 input_size: Option<u64>, 79 92 progress: ProgressDisplay, 80 - output: &CmprssOutput, 93 + target: OutputTarget, 81 94 ) -> Option<ProgressBar> { 82 - match (progress, output) { 83 - (ProgressDisplay::Auto, CmprssOutput::Pipe(_)) => None, 95 + match (progress, target) { 96 + (ProgressDisplay::Auto, OutputTarget::Stdout) => None, 84 97 (ProgressDisplay::Off, _) => None, 85 98 (_, _) => { 86 99 let bar = match input_size { ··· 237 250 chunk_size: usize, 238 251 input_size: Option<u64>, 239 252 progress_display: ProgressDisplay, 240 - output: &CmprssOutput, 253 + target: OutputTarget, 241 254 ) -> io::Result<()> { 242 255 // Create the progress bar if needed 243 - let progress_bar = create_progress_bar(input_size, progress_display, output); 256 + let progress_bar = create_progress_bar(input_size, progress_display, target); 244 257 245 258 // Create reader and writer with progress tracking 246 259 let mut reader = ProgressReader::new(reader, progress_bar.clone());