this repo has no description
1use clap::Args;
2use indicatif::{HumanBytes, ProgressBar};
3use std::io::{self, Read, Write};
4use std::str::FromStr;
5use std::time::Duration;
6use std::time::Instant;
7
// User-selectable progress mode; used as the type of `ProgressArgs::progress`
// and matched on by `create_progress_bar`.
//
// NOTE(review): these notes are plain `//` comments rather than `///` because
// clap's derive macros may surface doc comments as CLI help text — confirm
// before converting them.
#[derive(clap::ValueEnum, Clone, Copy, Debug, Default)]
pub enum ProgressDisplay {
    // Default mode: `create_progress_bar` builds a bar unless the target is
    // `OutputTarget::Stdout`.
    #[default]
    Auto,
    // `create_progress_bar` builds a bar regardless of the output target.
    On,
    // `create_progress_bar` never builds a bar.
    Off,
}
15
/// How a resolved `CmprssOutput` should be treated by the progress/copy
/// helpers. Decoupling this from `CmprssOutput` lets us consume the output
/// into an owned boxed writer up front and still drive progress-bar decisions.
///
/// `create_progress_bar` matches on this value together with the requested
/// `ProgressDisplay` to decide whether a bar is built.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OutputTarget {
    /// Path on disk — show progress when enabled.
    File,
    /// Stdout — suppress progress in `Auto` mode to avoid mixing with piped
    /// bytes.
    Stdout,
    /// Pipeline-internal writer (channel between stages) — no progress.
    // NOTE(review): `create_progress_bar` has no arm specific to this variant,
    // so it only yields `None` here when the mode is `Off`; presumably callers
    // are expected to pass `Off` for in-memory stages — confirm.
    InMemory,
}
29
/// Size of the copy buffer, in bytes.
///
/// Values produced by [`FromStr`] are never zero.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct ChunkSize {
    /// Buffer size in bytes.
    pub size_in_bytes: usize,
}

impl Default for ChunkSize {
    /// Defaults to 8192 bytes (8 KiB).
    fn default() -> Self {
        ChunkSize {
            size_in_bytes: 8192,
        }
    }
}

impl FromStr for ChunkSize {
    type Err = &'static str;

    /// Parses a chunk size from text.
    ///
    /// Accepted forms:
    /// - a bare positive integer, interpreted as a byte count (`"4096"`);
    /// - an integer followed by a case-insensitive unit `kb`/`kib`, `mb`/`mib`
    ///   or `gb`/`gib`. Units are always powers of two, so `1kb == 1kib == 1024`.
    ///
    /// # Errors
    ///
    /// - `"Invalid number"` when the numeric part is missing, unparsable or
    ///   zero, or when the input is too short to contain a unit.
    /// - `"Invalid unit"` when the trailing two characters are not a known unit.
    /// - `"Chunk size too large"` when the byte count does not fit in `usize`.
    ///
    /// Malformed input never panics.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        // A bare integer is taken as a byte count.
        if let Ok(num) = s.parse::<usize>() {
            if num == 0 {
                return Err("Invalid number");
            }
            return Ok(ChunkSize { size_in_bytes: num });
        }

        // Simplify so that base 2 is always assumed, regardless of whether the
        // input says 'kb' or 'kib': fold a trailing "ib" into "b".
        let mut normalized = s.to_lowercase();
        if normalized.ends_with("ib") {
            // `ends_with` guarantees at least two trailing ASCII bytes.
            normalized.truncate(normalized.len() - 2);
            normalized.push('b');
        }

        // The unit is the final two bytes. `checked_sub` rejects inputs that are
        // too short to hold a unit, and the boundary check rejects a multi-byte
        // character straddling the split point, so `split_at` cannot panic.
        let split = normalized.len().checked_sub(2).ok_or("Invalid number")?;
        if !normalized.is_char_boundary(split) {
            return Err("Invalid unit");
        }
        let (num_str, unit) = normalized.split_at(split);
        let num = num_str.parse::<usize>().map_err(|_| "Invalid number")?;

        let multiplier: usize = match unit {
            "kb" => 1 << 10,
            "mb" => 1 << 20,
            "gb" => 1 << 30,
            _ => return Err("Invalid unit"),
        };

        // Checked multiplication: an oversized request is reported instead of
        // overflowing (panic in debug builds, silent wrap-around in release).
        match num.checked_mul(multiplier) {
            Some(0) => Err("Invalid number"),
            Some(size_in_bytes) => Ok(ChunkSize { size_in_bytes }),
            None => Err("Chunk size too large"),
        }
    }
}
77
// Command-line options that control progress reporting and the copy buffer
// size.
//
// The derived `Default` agrees with the CLI defaults below: `ProgressDisplay`
// defaults to `Auto`, and `ChunkSize::default()` is 8192 bytes, the same value
// "8kib" parses to.
//
// NOTE(review): struct-level notes are plain `//` comments because clap's
// derive macros may surface `///` doc comments as help text — confirm before
// converting them. The field doc comments are left exactly as they were for
// the same reason.
#[derive(Args, Debug, Default, Clone, Copy)]
pub struct ProgressArgs {
    /// Show progress.
    #[arg(long, value_enum, default_value = "auto")]
    pub progress: ProgressDisplay,

    /// Chunk size to use during the copy when showing the progress bar.
    // The textual default is parsed through `ChunkSize`'s `FromStr` impl.
    #[arg(long, default_value = "8kib")]
    pub chunk_size: ChunkSize,
}
88
89/// Create a progress bar if necessary based on settings
90pub fn create_progress_bar(
91 input_size: Option<u64>,
92 progress: ProgressDisplay,
93 target: OutputTarget,
94) -> Option<ProgressBar> {
95 match (progress, target) {
96 (ProgressDisplay::Auto, OutputTarget::Stdout) => None,
97 (ProgressDisplay::Off, _) => None,
98 (_, _) => {
99 let bar = match input_size {
100 Some(size) => ProgressBar::new(size),
101 None => ProgressBar::new_spinner(),
102 };
103 bar.set_style(
104 indicatif::ProgressStyle::default_bar()
105 .template("{spinner:.green} [{elapsed_precise}] ({eta}) [{bar:40.cyan/blue}] {bytes}/{total_bytes} => {msg}").unwrap()
106 .progress_chars("#>-"),
107 );
108 bar.enable_steady_tick(Duration::from_millis(100));
109 Some(bar)
110 }
111 }
112}
113
/// A reader that tracks progress of bytes read. Multiple readers may share
/// the same `ProgressBar` clone to drive a single bar across several input
/// streams (container formats iterate over many files); the bar is advanced
/// via `inc`, which is atomic and relative.
///
/// Reads are batched: bytes accumulate in `bytes_since_update` and are only
/// reported to the bar once `bytes_per_update` is reached. Any remainder is
/// reported when the reader is dropped.
pub struct ProgressReader<R> {
    /// Wrapped reader that supplies the data.
    inner: R,
    /// Bar to advance; `None` disables progress tracking entirely.
    bar: Option<ProgressBar>,
    /// When the bar was last advanced (initially, when the reader was built).
    last_update: Instant,
    /// Bytes read since the bar was last advanced, i.e. not yet reported.
    bytes_since_update: u64,
    /// Number of unreported bytes that triggers the next bar update; adjusted
    /// at every update based on the time since the previous one.
    bytes_per_update: u64,
}
125
126impl<R: Read> ProgressReader<R> {
127 pub fn new(inner: R, bar: Option<ProgressBar>) -> Self {
128 ProgressReader {
129 inner,
130 bar,
131 last_update: Instant::now(),
132 bytes_since_update: 0,
133 bytes_per_update: 8192, // Start with 8KB, will adjust dynamically
134 }
135 }
136
137 /// Updates the progress bar if enough bytes have been read since the last update.
138 /// Dynamically adjusts the update frequency to target ~100ms between updates by
139 /// tracking the elapsed time and adjusting bytes_per_update accordingly.
140 fn maybe_update_progress(&mut self, bytes_read: u64) {
141 if let Some(ref bar) = self.bar {
142 self.bytes_since_update += bytes_read;
143
144 if self.bytes_since_update >= self.bytes_per_update {
145 let now = Instant::now();
146 let elapsed = now.duration_since(self.last_update);
147
148 bar.inc(self.bytes_since_update);
149
150 // Adjust bytes_per_update to target ~100ms between updates
151 if elapsed < Duration::from_millis(50) {
152 self.bytes_per_update *= 2;
153 } else if elapsed > Duration::from_millis(150) {
154 self.bytes_per_update = (self.bytes_per_update / 2).max(1024);
155 }
156
157 self.last_update = now;
158 self.bytes_since_update = 0;
159 }
160 }
161 }
162}
163
164impl<R> Drop for ProgressReader<R> {
165 fn drop(&mut self) {
166 if let Some(ref bar) = self.bar
167 && self.bytes_since_update > 0
168 {
169 bar.inc(self.bytes_since_update);
170 self.bytes_since_update = 0;
171 }
172 }
173}
174
175impl<R: Read> Read for ProgressReader<R> {
176 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
177 let bytes_read = self.inner.read(buf)?;
178 if bytes_read > 0 {
179 self.maybe_update_progress(bytes_read as u64);
180 }
181 Ok(bytes_read)
182 }
183}
184
/// A writer that tracks progress of bytes written
///
/// Unlike the reader side, the writer does not move the bar's position; it
/// publishes the running output total as the bar's message.
pub struct ProgressWriter<W> {
    /// Wrapped writer that receives the data.
    inner: W,
    /// Bar whose message is updated; `None` disables message updates.
    bar: Option<ProgressBar>,
    /// Total number of bytes the inner writer has accepted so far.
    total_written: u64,
    /// When the message was last refreshed (initially, when the writer was built).
    last_update: Instant,
    /// Bytes written since the message was last refreshed.
    bytes_since_update: u64,
    /// Number of bytes that triggers the next message refresh; adjusted at
    /// every refresh based on the time since the previous one.
    bytes_per_update: u64,
}
194
195impl<W: Write> ProgressWriter<W> {
196 pub fn new(inner: W, bar: Option<ProgressBar>) -> Self {
197 ProgressWriter {
198 inner,
199 bar,
200 total_written: 0,
201 last_update: Instant::now(),
202 bytes_since_update: 0,
203 bytes_per_update: 8192, // Start with 8KB, will adjust dynamically
204 }
205 }
206
207 pub fn finish(self) {
208 if let Some(bar) = self.bar {
209 bar.finish();
210 }
211 }
212
213 /// Updates the progress bar if enough bytes have been written since the last update.
214 /// Dynamically adjusts the update frequency to target ~100ms between updates by
215 /// tracking the elapsed time and adjusting bytes_per_update accordingly.
216 fn maybe_update_progress(&mut self, bytes_written: u64) {
217 if let Some(ref bar) = self.bar {
218 self.bytes_since_update += bytes_written;
219
220 if self.bytes_since_update >= self.bytes_per_update {
221 let now = Instant::now();
222 let elapsed = now.duration_since(self.last_update);
223
224 // Update the progress
225 bar.set_message(HumanBytes(self.total_written).to_string());
226
227 // Adjust bytes_per_update to target ~100ms between updates
228 if elapsed < Duration::from_millis(50) {
229 self.bytes_per_update *= 2;
230 } else if elapsed > Duration::from_millis(150) {
231 self.bytes_per_update = (self.bytes_per_update / 2).max(1024);
232 }
233
234 self.last_update = now;
235 self.bytes_since_update = 0;
236 }
237 }
238 }
239}
240
241impl<W: Write> Write for ProgressWriter<W> {
242 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
243 let bytes_written = self.inner.write(buf)?;
244 if bytes_written > 0 {
245 self.total_written += bytes_written as u64;
246 self.maybe_update_progress(bytes_written as u64);
247 }
248 Ok(bytes_written)
249 }
250
251 fn flush(&mut self) -> io::Result<()> {
252 self.inner.flush()
253 }
254}
255
256/// Process data with progress bar updates
257pub fn copy_with_progress<R: Read, W: Write>(
258 reader: R,
259 writer: W,
260 chunk_size: usize,
261 input_size: Option<u64>,
262 progress_display: ProgressDisplay,
263 target: OutputTarget,
264) -> io::Result<()> {
265 // Create the progress bar if needed
266 let progress_bar = create_progress_bar(input_size, progress_display, target);
267
268 // Create reader and writer with progress tracking
269 let mut reader = ProgressReader::new(reader, progress_bar.clone());
270 let mut writer = ProgressWriter::new(writer, progress_bar);
271
272 let mut buffer = vec![0; chunk_size];
273 loop {
274 let bytes_read = reader.read(&mut buffer)?;
275 if bytes_read == 0 {
276 break;
277 }
278 writer.write_all(&buffer[..bytes_read])?;
279 }
280 writer.flush()?;
281 writer.finish();
282 Ok(())
283}
284
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks `ChunkSize` parsing: zero is rejected, bare numbers are byte
    /// counts, and both the `kb`/`mb`/`gb` and `kib`/`mib`/`gib` spellings are
    /// interpreted as powers of two.
    #[test]
    fn chunk_size_parsing() {
        // Zero is invalid with or without a unit.
        for rejected in ["0", "0mb"] {
            assert!(ChunkSize::from_str(rejected).is_err());
        }

        let accepted = [
            ("1", 1),
            ("1kb", 1024),
            ("16kib", 16 * 1024),
            ("8mib", 8 * 1024 * 1024),
            ("16mb", 16 * 1024 * 1024),
            ("1gb", 1024 * 1024 * 1024),
            ("16gib", 16 * 1024 * 1024 * 1024),
        ];
        for (text, size_in_bytes) in accepted {
            assert_eq!(
                ChunkSize::from_str(text).unwrap(),
                ChunkSize { size_in_bytes }
            );
        }
    }
}