this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix(progress): show known total on pipeline compression bar

Pipelines like tar.gz produced a spinner (pos and total ticking up
together) because the outer stage's input came from an in-memory pipe
with unknown size. Route the bar to the innermost stage — the only one
that sees real input bytes — and suppress it on outer stages by
threading a pipeline-inner flag out of open_input.

+113 -33
+11 -3
src/backends/brotli.rs
··· 91 91 /// Compress an input file or pipe to a brotli archive 92 92 fn compress(&self, input: CmprssInput, output: CmprssOutput) -> Result { 93 93 guard_file_output(&output, "Brotli")?; 94 - let (input_stream, file_size) = open_input(input, "Brotli")?; 94 + let (input_stream, file_size, pipeline_inner) = open_input(input, "Brotli")?; 95 95 let (writer, target) = prepare_output(output)?; 96 96 let mut encoder = CompressorWriter::new( 97 97 writer, ··· 103 103 input_stream, 104 104 &mut encoder, 105 105 file_size, 106 + pipeline_inner, 106 107 &self.progress_args, 107 108 target, 108 109 )?; ··· 113 114 /// Extract a brotli archive to an output file or pipe 114 115 fn extract(&self, input: CmprssInput, output: CmprssOutput) -> Result { 115 116 guard_file_output(&output, "Brotli")?; 116 - let (input_stream, file_size) = open_input(input, "Brotli")?; 117 + let (input_stream, file_size, pipeline_inner) = open_input(input, "Brotli")?; 117 118 let decoder = Decompressor::new(input_stream, BROTLI_BUFFER_SIZE); 118 119 let (writer, target) = prepare_output(output)?; 119 - copy_stream(decoder, writer, file_size, &self.progress_args, target)?; 120 + copy_stream( 121 + decoder, 122 + writer, 123 + file_size, 124 + pipeline_inner, 125 + &self.progress_args, 126 + target, 127 + )?; 120 128 Ok(()) 121 129 } 122 130 }
+4 -2
src/backends/bzip2.rs
··· 85 85 /// Compress an input file or pipe to a bz2 archive 86 86 fn compress(&self, input: CmprssInput, output: CmprssOutput) -> Result { 87 87 guard_file_output(&output, "Bzip2")?; 88 - let (input_stream, file_size) = open_input(input, "Bzip2")?; 88 + let (input_stream, file_size, pipeline_inner) = open_input(input, "Bzip2")?; 89 89 let (writer, target) = prepare_output(output)?; 90 90 let mut encoder = BzEncoder::new(writer, Compression::new(self.level as u32)); 91 91 copy_stream( 92 92 input_stream, 93 93 &mut encoder, 94 94 file_size, 95 + pipeline_inner, 95 96 &self.progress_args, 96 97 target, 97 98 )?; ··· 103 104 /// compressed bytes into it. 104 105 fn extract(&self, input: CmprssInput, output: CmprssOutput) -> Result { 105 106 guard_file_output(&output, "Bzip2")?; 106 - let (input_stream, file_size) = open_input(input, "Bzip2")?; 107 + let (input_stream, file_size, pipeline_inner) = open_input(input, "Bzip2")?; 107 108 let (writer, target) = prepare_output(output)?; 108 109 let mut decoder = BzDecoder::new(writer); 109 110 copy_stream( 110 111 input_stream, 111 112 &mut decoder, 112 113 file_size, 114 + pipeline_inner, 113 115 &self.progress_args, 114 116 target, 115 117 )?;
+11 -3
src/backends/gzip.rs
··· 59 59 /// Compress an input file or pipe to a gzip archive 60 60 fn compress(&self, input: CmprssInput, output: CmprssOutput) -> Result { 61 61 guard_file_output(&output, "Gzip")?; 62 - let (input_stream, file_size) = open_input(input, "Gzip")?; 62 + let (input_stream, file_size, pipeline_inner) = open_input(input, "Gzip")?; 63 63 let (writer, target) = prepare_output(output)?; 64 64 let mut encoder = GzEncoder::new(writer, Compression::new(self.compression_level as u32)); 65 65 copy_stream( 66 66 input_stream, 67 67 &mut encoder, 68 68 file_size, 69 + pipeline_inner, 69 70 &self.progress_args, 70 71 target, 71 72 )?; ··· 76 77 /// Extract a gzip archive 77 78 fn extract(&self, input: CmprssInput, output: CmprssOutput) -> Result { 78 79 guard_file_output(&output, "Gzip")?; 79 - let (input_stream, file_size) = open_input(input, "Gzip")?; 80 + let (input_stream, file_size, pipeline_inner) = open_input(input, "Gzip")?; 80 81 let decoder = GzDecoder::new(input_stream); 81 82 let (writer, target) = prepare_output(output)?; 82 - copy_stream(decoder, writer, file_size, &self.progress_args, target)?; 83 + copy_stream( 84 + decoder, 85 + writer, 86 + file_size, 87 + pipeline_inner, 88 + &self.progress_args, 89 + target, 90 + )?; 83 91 Ok(()) 84 92 } 85 93 }
+11 -3
src/backends/lz4.rs
··· 40 40 /// Compress an input file or pipe to a lz4 archive 41 41 fn compress(&self, input: CmprssInput, output: CmprssOutput) -> Result { 42 42 guard_file_output(&output, "LZ4")?; 43 - let (input_stream, file_size) = open_input(input, "LZ4")?; 43 + let (input_stream, file_size, pipeline_inner) = open_input(input, "LZ4")?; 44 44 let (writer, target) = prepare_output(output)?; 45 45 let mut encoder = FrameEncoder::new(writer); 46 46 copy_stream( 47 47 input_stream, 48 48 &mut encoder, 49 49 file_size, 50 + pipeline_inner, 50 51 &self.progress_args, 51 52 target, 52 53 )?; ··· 57 58 /// Extract a lz4 archive to an output file or pipe 58 59 fn extract(&self, input: CmprssInput, output: CmprssOutput) -> Result { 59 60 guard_file_output(&output, "LZ4")?; 60 - let (input_stream, file_size) = open_input(input, "LZ4")?; 61 + let (input_stream, file_size, pipeline_inner) = open_input(input, "LZ4")?; 61 62 let decoder = FrameDecoder::new(input_stream); 62 63 let (writer, target) = prepare_output(output)?; 63 - copy_stream(decoder, writer, file_size, &self.progress_args, target)?; 64 + copy_stream( 65 + decoder, 66 + writer, 67 + file_size, 68 + pipeline_inner, 69 + &self.progress_args, 70 + target, 71 + )?; 64 72 Ok(()) 65 73 } 66 74 }
+11 -3
src/backends/lzma.rs
··· 94 94 95 95 fn compress(&self, input: CmprssInput, output: CmprssOutput) -> Result { 96 96 guard_file_output(&output, "LZMA")?; 97 - let (input_stream, file_size) = open_input(input, "LZMA")?; 97 + let (input_stream, file_size, pipeline_inner) = open_input(input, "LZMA")?; 98 98 let (writer, target) = prepare_output(output)?; 99 99 let mut encoder = XzEncoder::new_stream(writer, self.encoder_stream()?); 100 100 // `copy_stream` flushes the final writer on the path/pipe branch via ··· 105 105 input_stream, 106 106 NoFlush(&mut encoder), 107 107 file_size, 108 + pipeline_inner, 108 109 &self.progress_args, 109 110 target, 110 111 )?; ··· 114 115 115 116 fn extract(&self, input: CmprssInput, output: CmprssOutput) -> Result { 116 117 guard_file_output(&output, "LZMA")?; 117 - let (input_stream, file_size) = open_input(input, "LZMA")?; 118 + let (input_stream, file_size, pipeline_inner) = open_input(input, "LZMA")?; 118 119 let decoder = XzDecoder::new_stream(input_stream, Self::decoder_stream()?); 119 120 let (writer, target) = prepare_output(output)?; 120 - copy_stream(decoder, writer, file_size, &self.progress_args, target)?; 121 + copy_stream( 122 + decoder, 123 + writer, 124 + file_size, 125 + pipeline_inner, 126 + &self.progress_args, 127 + target, 128 + )?; 121 129 Ok(()) 122 130 } 123 131 }
+11 -3
src/backends/snappy.rs
··· 43 43 /// Compress an input file or pipe to a snappy frame-format archive 44 44 fn compress(&self, input: CmprssInput, output: CmprssOutput) -> Result { 45 45 guard_file_output(&output, "Snappy")?; 46 - let (input_stream, file_size) = open_input(input, "Snappy")?; 46 + let (input_stream, file_size, pipeline_inner) = open_input(input, "Snappy")?; 47 47 let (writer, target) = prepare_output(output)?; 48 48 let mut encoder = FrameEncoder::new(writer); 49 49 copy_stream( 50 50 input_stream, 51 51 &mut encoder, 52 52 file_size, 53 + pipeline_inner, 53 54 &self.progress_args, 54 55 target, 55 56 )?; ··· 60 61 /// Extract a snappy frame-format archive to an output file or pipe 61 62 fn extract(&self, input: CmprssInput, output: CmprssOutput) -> Result { 62 63 guard_file_output(&output, "Snappy")?; 63 - let (input_stream, file_size) = open_input(input, "Snappy")?; 64 + let (input_stream, file_size, pipeline_inner) = open_input(input, "Snappy")?; 64 65 let decoder = FrameDecoder::new(input_stream); 65 66 let (writer, target) = prepare_output(output)?; 66 - copy_stream(decoder, writer, file_size, &self.progress_args, target)?; 67 + copy_stream( 68 + decoder, 69 + writer, 70 + file_size, 71 + pipeline_inner, 72 + &self.progress_args, 73 + target, 74 + )?; 67 75 Ok(()) 68 76 } 69 77 }
+18 -9
src/backends/stream.rs
··· 14 14 15 15 /// Resolve a `CmprssInput` into a single boxed `Read` stream for single-stream 16 16 /// codecs. Returns the stream together with the input file's size when known 17 - /// (used to drive progress bars). 17 + /// (used to drive progress bars) and a flag indicating whether the input is a 18 + /// pipeline-internal `Reader` (in which case progress should be suppressed in 19 + /// this stage — the innermost stage owns the bar). 18 20 /// 19 21 /// Bails when multiple input paths are given, or when a path input is a 20 22 /// directory — single-stream codecs operate on exactly one byte stream. 21 - pub fn open_input(input: CmprssInput, name: &str) -> Result<(Box<dyn Read + Send>, Option<u64>)> { 23 + pub fn open_input( 24 + input: CmprssInput, 25 + name: &str, 26 + ) -> Result<(Box<dyn Read + Send>, Option<u64>, bool)> { 22 27 match input { 23 28 CmprssInput::Path(paths) => { 24 29 if paths.len() > 1 { ··· 30 35 } 31 36 let size = std::fs::metadata(path)?.len(); 32 37 let reader: Box<dyn Read + Send> = Box::new(BufReader::new(File::open(path)?)); 33 - Ok((reader, Some(size))) 38 + Ok((reader, Some(size), false)) 34 39 } 35 - CmprssInput::Pipe(stdin) => Ok((Box::new(BufReader::new(stdin)), None)), 36 - CmprssInput::Reader(reader) => Ok((reader.0, None)), 40 + CmprssInput::Pipe(stdin) => Ok((Box::new(BufReader::new(stdin)), None, false)), 41 + CmprssInput::Reader(reader) => Ok((reader.0, None, true)), 37 42 } 38 43 } 39 44 ··· 65 70 } 66 71 } 67 72 68 - /// Copy bytes from `reader` through `writer`, branching on `target`: 69 - /// pipeline-internal stages use a bare `io::copy` (no progress), while 73 + /// Copy bytes from `reader` through `writer`, branching on whether progress 74 + /// is relevant: pipeline-internal stages (either writing to an in-memory pipe 75 + /// or reading from one) use a bare `io::copy` with no progress, while 70 76 /// user-facing outputs go through `copy_with_progress` to show a progress bar 71 - /// when configured. 77 + /// when configured. `pipeline_inner` is set when the input comes from an 78 + /// upstream pipeline stage — in that case we don't know the total size and 79 + /// the innermost stage already owns the progress bar, so we skip ours. 72 80 pub fn copy_stream<R: Read, W: Write>( 73 81 mut reader: R, 74 82 mut writer: W, 75 83 file_size: Option<u64>, 84 + pipeline_inner: bool, 76 85 progress_args: &ProgressArgs, 77 86 target: OutputTarget, 78 87 ) -> Result { 79 - if target == OutputTarget::InMemory { 88 + if pipeline_inner || target == OutputTarget::InMemory { 80 89 io::copy(&mut reader, &mut writer)?; 81 90 } else { 82 91 copy_with_progress(
+14 -1
src/backends/tar.rs
··· 75 75 Ok(()) 76 76 } 77 77 CmprssOutput::Writer(mut writer) => { 78 + // Pipeline-internal: tar is the innermost stage, writing into an 79 + // in-memory pipe feeding the outer codec(s). We still own the 80 + // progress bar because only tar sees the real input bytes; outer 81 + // stages suppress their bar (their input size is unknown). 82 + let total = match &input { 83 + CmprssInput::Path(paths) => Some(total_input_bytes(paths)), 84 + _ => None, 85 + }; 86 + let bar = 87 + create_progress_bar(total, self.progress_args.progress, OutputTarget::File); 78 88 let mut temp_file = tempfile()?; 79 - self.compress_internal(input, Builder::new(&mut temp_file), None)?; 89 + self.compress_internal(input, Builder::new(&mut temp_file), bar.as_ref())?; 80 90 temp_file.seek(SeekFrom::Start(0))?; 81 91 io::copy(&mut temp_file, &mut writer)?; 92 + if let Some(b) = bar { 93 + b.finish(); 94 + } 82 95 Ok(()) 83 96 } 84 97 }
+11 -3
src/backends/xz.rs
··· 60 60 61 61 fn compress(&self, input: CmprssInput, output: CmprssOutput) -> Result { 62 62 guard_file_output(&output, "Xz")?; 63 - let (input_stream, file_size) = open_input(input, "Xz")?; 63 + let (input_stream, file_size, pipeline_inner) = open_input(input, "Xz")?; 64 64 let (writer, target) = prepare_output(output)?; 65 65 let mut encoder = XzEncoder::new(writer, self.level as u32); 66 66 copy_stream( 67 67 input_stream, 68 68 &mut encoder, 69 69 file_size, 70 + pipeline_inner, 70 71 &self.progress_args, 71 72 target, 72 73 )?; ··· 76 77 77 78 fn extract(&self, input: CmprssInput, output: CmprssOutput) -> Result { 78 79 guard_file_output(&output, "Xz")?; 79 - let (input_stream, file_size) = open_input(input, "Xz")?; 80 + let (input_stream, file_size, pipeline_inner) = open_input(input, "Xz")?; 80 81 let decoder = XzDecoder::new(input_stream); 81 82 let (writer, target) = prepare_output(output)?; 82 - copy_stream(decoder, writer, file_size, &self.progress_args, target)?; 83 + copy_stream( 84 + decoder, 85 + writer, 86 + file_size, 87 + pipeline_inner, 88 + &self.progress_args, 89 + target, 90 + )?; 83 91 Ok(()) 84 92 } 85 93 }
+11 -3
src/backends/zstd.rs
··· 82 82 /// Compress an input file or pipe to a zstd archive 83 83 fn compress(&self, input: CmprssInput, output: CmprssOutput) -> Result { 84 84 guard_file_output(&output, "Zstd")?; 85 - let (input_stream, file_size) = open_input(input, "Zstd")?; 85 + let (input_stream, file_size, pipeline_inner) = open_input(input, "Zstd")?; 86 86 let (writer, target) = prepare_output(output)?; 87 87 let mut encoder = Encoder::new(writer, self.compression_level)?; 88 88 copy_stream( 89 89 input_stream, 90 90 &mut encoder, 91 91 file_size, 92 + pipeline_inner, 92 93 &self.progress_args, 93 94 target, 94 95 )?; ··· 99 100 /// Extract a zstd archive to an output file or pipe 100 101 fn extract(&self, input: CmprssInput, output: CmprssOutput) -> Result { 101 102 guard_file_output(&output, "Zstd")?; 102 - let (input_stream, file_size) = open_input(input, "Zstd")?; 103 + let (input_stream, file_size, pipeline_inner) = open_input(input, "Zstd")?; 103 104 let decoder = Decoder::new(input_stream)?; 104 105 let (writer, target) = prepare_output(output)?; 105 - copy_stream(decoder, writer, file_size, &self.progress_args, target)?; 106 + copy_stream( 107 + decoder, 108 + writer, 109 + file_size, 110 + pipeline_inner, 111 + &self.progress_args, 112 + target, 113 + )?; 106 114 Ok(()) 107 115 } 108 116 }