this repo has no description
1use clap::Args;
2use indicatif::{HumanBytes, ProgressBar};
3use std::io::{self, Read, Seek, SeekFrom, Write};
4use std::str::FromStr;
5use std::time::Duration;
6use std::time::Instant;
7
// User-selectable policy for showing a progress bar. It is the value type of
// the `--progress` flag declared in `ProgressArgs` and is consumed by
// `create_progress_bar`.
//
// NOTE(review): plain `//` comments are used here on purpose. clap's
// `ValueEnum` derive can surface `///` doc comments on variants as help text,
// so adding doc comments could change `--help` output — confirm before
// converting these to `///`.
#[derive(clap::ValueEnum, Clone, Copy, Debug, Default)]
pub enum ProgressDisplay {
    // Default mode: `create_progress_bar` returns no bar when the target is
    // stdout.
    #[default]
    Auto,
    // Explicitly requested: a bar is shown even when the target is stdout.
    On,
    // `create_progress_bar` never returns a bar in this mode.
    Off,
}
15
/// How a resolved `CmprssOutput` should be treated by the progress/copy
/// helpers. Decoupling this from `CmprssOutput` lets us consume the output
/// into an owned boxed writer up front and still drive progress-bar decisions.
///
/// This is a payload-free `Copy` tag; `create_progress_bar` matches on it
/// together with [`ProgressDisplay`] to decide whether a bar is built.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OutputTarget {
    /// Path on disk — show progress when enabled.
    File,
    /// Stdout — suppress progress in `Auto` mode to avoid mixing with piped
    /// bytes.
    Stdout,
    /// Pipeline-internal writer (channel between stages) — intended to show
    /// no progress.
    InMemory,
}
29
/// Size of the buffer used when copying data, expressed in bytes.
///
/// Values are usually produced by the [`FromStr`] implementation, which
/// accepts either a bare byte count (`"8192"`) or a number followed by a
/// two-letter unit: `kb`, `mb` or `gb`. The spellings `kib`, `mib` and `gib`
/// are accepted as synonyms. Units are case-insensitive and always binary
/// (1 kb = 1024 bytes).
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct ChunkSize {
    /// Chunk size in bytes. Never zero when built via `from_str` or `default`.
    pub size_in_bytes: usize,
}

impl Default for ChunkSize {
    /// Returns the default chunk size of 8192 bytes (8 KiB).
    fn default() -> Self {
        ChunkSize {
            size_in_bytes: 8192,
        }
    }
}

impl FromStr for ChunkSize {
    type Err = &'static str;

    /// Parses a chunk size such as `"4096"`, `"8kib"`, `"16MB"` or `"1gb"`.
    ///
    /// # Errors
    ///
    /// * `"Invalid number"` — the numeric part is missing, is not a valid
    ///   `usize`, or the resulting size is zero.
    /// * `"Invalid unit"` — the input is too short to contain a unit, or the
    ///   unit is not one of `kb`/`mb`/`gb` (or their `ib` spellings).
    /// * `"Number too large"` — the size in bytes does not fit in a `usize`.
    ///
    /// Malformed input never panics.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        // Try to parse s as just a number: a bare number is a byte count.
        if let Ok(num) = s.parse::<usize>() {
            if num == 0 {
                return Err("Invalid number");
            }
            return Ok(ChunkSize { size_in_bytes: num });
        }

        // Simplify so that we always assume base 2, regardless of whether we
        // see 'kb' or 'kib'.
        let mut s = s.to_lowercase();
        if s.ends_with("ib") {
            // Safe: the string ends with the two ASCII bytes "ib".
            s.truncate(s.len() - 2);
            s.push('b');
        }

        // The unit is the final two bytes. Reject inputs that are shorter
        // than that, or where the split would land inside a multi-byte
        // character, instead of letting `split_at` panic.
        let split_idx = s
            .len()
            .checked_sub(2)
            .filter(|&idx| s.is_char_boundary(idx))
            .ok_or("Invalid unit")?;
        let (num_str, unit) = s.split_at(split_idx);
        let num = num_str.parse::<usize>().map_err(|_| "Invalid number")?;

        let multiplier: usize = match unit {
            "kb" => 1 << 10,
            "mb" => 1 << 20,
            "gb" => 1 << 30,
            _ => return Err("Invalid unit"),
        };

        // Checked multiplication: an oversized request is reported as an
        // error rather than panicking (debug) or wrapping (release).
        let size_in_bytes = num.checked_mul(multiplier).ok_or("Number too large")?;
        if size_in_bytes == 0 {
            return Err("Invalid number");
        }

        Ok(ChunkSize { size_in_bytes })
    }
}
77
// Command-line options that control progress reporting and the copy buffer
// size. Deriving `Args` makes this a reusable clap argument group.
//
// The `///` comments on the fields below are picked up by clap as the help
// text shown to the user, i.e. they are runtime strings; the explanatory notes
// added here are therefore plain `//` comments.
#[derive(Args, Debug, Default, Clone, Copy)]
pub struct ProgressArgs {
    /// Show progress.
    // Parsed as a `ProgressDisplay` value; `auto` when the flag is omitted.
    #[arg(long, value_enum, default_value = "auto")]
    pub progress: ProgressDisplay,

    /// Chunk size to use during the copy when showing the progress bar.
    // Parsed through `ChunkSize::from_str`; the "8kib" default is 8192 bytes,
    // the same value `ChunkSize::default()` yields.
    #[arg(long, default_value = "8kib")]
    pub chunk_size: ChunkSize,
}
88
89/// Create a progress bar if necessary based on settings
90pub fn create_progress_bar(
91 input_size: Option<u64>,
92 progress: ProgressDisplay,
93 target: OutputTarget,
94) -> Option<ProgressBar> {
95 match (progress, target) {
96 (ProgressDisplay::Auto, OutputTarget::Stdout) => None,
97 (ProgressDisplay::Off, _) => None,
98 (_, _) => {
99 let bar = match input_size {
100 Some(size) => ProgressBar::new(size),
101 None => ProgressBar::new_spinner(),
102 };
103 bar.set_style(
104 indicatif::ProgressStyle::default_bar()
105 .template("{spinner:.green} [{elapsed_precise}] ({eta}) [{bar:40.cyan/blue}] {bytes}/{total_bytes} => {msg}").expect("progress bar template literal is valid")
106 .progress_chars("#>-"),
107 );
108 bar.enable_steady_tick(Duration::from_millis(100));
109 Some(bar)
110 }
111 }
112}
113
/// A reader that tracks progress of bytes read. Multiple readers may share
/// the same `ProgressBar` clone to drive a single bar across several input
/// streams (container formats iterate over many files); the bar is advanced
/// via `inc`, which is atomic and relative.
///
/// Updates are batched: bytes accumulate in `bytes_since_update` and are only
/// pushed to the bar once `bytes_per_update` is reached, or when the reader is
/// dropped.
pub struct ProgressReader<R> {
    /// Wrapped reader; all reads and seeks are delegated to it.
    inner: R,
    /// Bar to advance. `None` disables progress accounting.
    bar: Option<ProgressBar>,
    /// Instant at which the bar was last advanced (construction time at first).
    last_update: Instant,
    /// Bytes read since the last bar update that have not been reported yet.
    bytes_since_update: u64,
    /// Number of pending bytes that triggers the next bar update; adapted at
    /// runtime by `maybe_update_progress`.
    bytes_per_update: u64,
}
125
126impl<R: Read> ProgressReader<R> {
127 pub fn new(inner: R, bar: Option<ProgressBar>) -> Self {
128 ProgressReader {
129 inner,
130 bar,
131 last_update: Instant::now(),
132 bytes_since_update: 0,
133 bytes_per_update: 8192, // Start with 8KB, will adjust dynamically
134 }
135 }
136
137 /// Updates the progress bar if enough bytes have been read since the last update.
138 /// Dynamically adjusts the update frequency to target ~100ms between updates by
139 /// tracking the elapsed time and adjusting bytes_per_update accordingly.
140 fn maybe_update_progress(&mut self, bytes_read: u64) {
141 if let Some(ref bar) = self.bar {
142 self.bytes_since_update += bytes_read;
143
144 if self.bytes_since_update >= self.bytes_per_update {
145 let now = Instant::now();
146 let elapsed = now.duration_since(self.last_update);
147
148 bar.inc(self.bytes_since_update);
149
150 // Adjust bytes_per_update to target ~100ms between updates
151 if elapsed < Duration::from_millis(50) {
152 self.bytes_per_update *= 2;
153 } else if elapsed > Duration::from_millis(150) {
154 self.bytes_per_update = (self.bytes_per_update / 2).max(1024);
155 }
156
157 self.last_update = now;
158 self.bytes_since_update = 0;
159 }
160 }
161 }
162}
163
164impl<R> Drop for ProgressReader<R> {
165 fn drop(&mut self) {
166 if let Some(ref bar) = self.bar
167 && self.bytes_since_update > 0
168 {
169 bar.inc(self.bytes_since_update);
170 self.bytes_since_update = 0;
171 }
172 }
173}
174
175impl<R: Read> Read for ProgressReader<R> {
176 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
177 let bytes_read = self.inner.read(buf)?;
178 if bytes_read > 0 {
179 self.maybe_update_progress(bytes_read as u64);
180 }
181 Ok(bytes_read)
182 }
183}
184
185impl<R: Seek> Seek for ProgressReader<R> {
186 fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
187 self.inner.seek(pos)
188 }
189}
190
/// A writer that tracks progress of bytes written.
///
/// Unlike `ProgressReader`, the writer does not advance the bar position; it
/// publishes the running total of written bytes as the bar's message.
pub struct ProgressWriter<W> {
    /// Wrapped writer; all writes and flushes are delegated to it.
    inner: W,
    /// Bar whose message is updated. `None` disables progress reporting.
    bar: Option<ProgressBar>,
    /// Total number of bytes accepted by `inner` so far.
    total_written: u64,
    /// Instant at which the bar message was last refreshed.
    last_update: Instant,
    /// Bytes written since the message was last refreshed.
    bytes_since_update: u64,
    /// Number of pending bytes that triggers the next message refresh; adapted
    /// at runtime by `maybe_update_progress`.
    bytes_per_update: u64,
}
200
201impl<W: Write> ProgressWriter<W> {
202 pub fn new(inner: W, bar: Option<ProgressBar>) -> Self {
203 ProgressWriter {
204 inner,
205 bar,
206 total_written: 0,
207 last_update: Instant::now(),
208 bytes_since_update: 0,
209 bytes_per_update: 8192, // Start with 8KB, will adjust dynamically
210 }
211 }
212
213 pub fn finish(self) {
214 if let Some(bar) = self.bar {
215 bar.finish();
216 }
217 }
218
219 /// Updates the progress bar if enough bytes have been written since the last update.
220 /// Dynamically adjusts the update frequency to target ~100ms between updates by
221 /// tracking the elapsed time and adjusting bytes_per_update accordingly.
222 fn maybe_update_progress(&mut self, bytes_written: u64) {
223 if let Some(ref bar) = self.bar {
224 self.bytes_since_update += bytes_written;
225
226 if self.bytes_since_update >= self.bytes_per_update {
227 let now = Instant::now();
228 let elapsed = now.duration_since(self.last_update);
229
230 // Update the progress
231 bar.set_message(HumanBytes(self.total_written).to_string());
232
233 // Adjust bytes_per_update to target ~100ms between updates
234 if elapsed < Duration::from_millis(50) {
235 self.bytes_per_update *= 2;
236 } else if elapsed > Duration::from_millis(150) {
237 self.bytes_per_update = (self.bytes_per_update / 2).max(1024);
238 }
239
240 self.last_update = now;
241 self.bytes_since_update = 0;
242 }
243 }
244 }
245}
246
247impl<W: Write> Write for ProgressWriter<W> {
248 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
249 let bytes_written = self.inner.write(buf)?;
250 if bytes_written > 0 {
251 self.total_written += bytes_written as u64;
252 self.maybe_update_progress(bytes_written as u64);
253 }
254 Ok(bytes_written)
255 }
256
257 fn flush(&mut self) -> io::Result<()> {
258 self.inner.flush()
259 }
260}
261
262/// Process data with progress bar updates
263pub fn copy_with_progress<R: Read, W: Write>(
264 reader: R,
265 writer: W,
266 chunk_size: usize,
267 input_size: Option<u64>,
268 progress_display: ProgressDisplay,
269 target: OutputTarget,
270) -> io::Result<()> {
271 // Create the progress bar if needed
272 let progress_bar = create_progress_bar(input_size, progress_display, target);
273
274 // Create reader and writer with progress tracking
275 let mut reader = ProgressReader::new(reader, progress_bar.clone());
276 let mut writer = ProgressWriter::new(writer, progress_bar);
277
278 let mut buffer = vec![0; chunk_size];
279 loop {
280 let bytes_read = reader.read(&mut buffer)?;
281 if bytes_read == 0 {
282 break;
283 }
284 writer.write_all(&buffer[..bytes_read])?;
285 }
286 writer.flush()?;
287 writer.finish();
288 Ok(())
289}
290
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks that zero sizes are rejected and that bare numbers and every
    /// supported unit spelling parse to the expected byte count.
    #[test]
    fn chunk_size_parsing() {
        for rejected in ["0", "0mb"] {
            assert!(ChunkSize::from_str(rejected).is_err());
        }

        let accepted: [(&str, usize); 7] = [
            ("1", 1),
            ("1kb", 1024),
            ("16kib", 16 * 1024),
            ("8mib", 8 * 1024 * 1024),
            ("16mb", 16 * 1024 * 1024),
            ("1gb", 1024 * 1024 * 1024),
            ("16gib", 16 * 1024 * 1024 * 1024),
        ];
        for (input, size_in_bytes) in accepted {
            assert_eq!(
                ChunkSize::from_str(input).unwrap(),
                ChunkSize { size_in_bytes }
            );
        }
    }
}