this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at f26a525327ea65cc8a0ba85bb6825e4691c3f435 334 lines 10 kB view raw
1use clap::Args; 2use indicatif::{HumanBytes, ProgressBar}; 3use std::io::{self, Read, Write}; 4use std::str::FromStr; 5use std::time::Duration; 6use std::time::Instant; 7 8#[derive(clap::ValueEnum, Clone, Copy, Debug, Default)] 9pub enum ProgressDisplay { 10 #[default] 11 Auto, 12 On, 13 Off, 14} 15 16/// How a resolved `CmprssOutput` should be treated by the progress/copy 17/// helpers. Decoupling this from `CmprssOutput` lets us consume the output 18/// into an owned boxed writer up front and still drive progress-bar decisions. 19#[derive(Debug, Clone, Copy, PartialEq, Eq)] 20pub enum OutputTarget { 21 /// Path on disk — show progress when enabled. 22 File, 23 /// Stdout — suppress progress in `Auto` mode to avoid mixing with piped 24 /// bytes. 25 Stdout, 26 /// Pipeline-internal writer (channel between stages) — no progress. 27 InMemory, 28} 29 30#[derive(Debug, Clone, Copy, PartialEq)] 31pub struct ChunkSize { 32 pub size_in_bytes: usize, 33} 34 35impl Default for ChunkSize { 36 fn default() -> Self { 37 ChunkSize { 38 size_in_bytes: 8192, 39 } 40 } 41} 42 43impl FromStr for ChunkSize { 44 type Err = &'static str; 45 46 fn from_str(s: &str) -> Result<Self, Self::Err> { 47 // Try to parse s as just a number 48 if let Ok(num) = s.parse::<usize>() { 49 if num == 0 { 50 return Err("Invalid number"); 51 } 52 return Ok(ChunkSize { size_in_bytes: num }); 53 } 54 // Simplify so that we always assume base 2, regardless of whether we see 55 // 'kb' or 'kib' 56 let mut s = s.to_lowercase(); 57 if s.ends_with("ib") { 58 s.truncate(s.len() - 2); 59 s.push('b'); 60 }; 61 let (num_str, unit) = s.split_at(s.len() - 2); 62 let num = num_str.parse::<usize>().map_err(|_| "Invalid number")?; 63 64 let size_in_bytes = match unit { 65 "kb" => num * 1024, 66 "mb" => num * 1024 * 1024, 67 "gb" => num * 1024 * 1024 * 1024, 68 _ => return Err("Invalid unit"), 69 }; 70 if size_in_bytes == 0 { 71 return Err("Invalid number"); 72 } 73 74 Ok(ChunkSize { size_in_bytes }) 75 } 76} 77 78#[derive(Args, Debug, Default, Clone, Copy)] 79pub struct ProgressArgs { 80 /// Show progress. 81 #[arg(long, value_enum, default_value = "auto")] 82 pub progress: ProgressDisplay, 83 84 /// Chunk size to use during the copy when showing the progress bar. 85 #[arg(long, default_value = "8kib")] 86 pub chunk_size: ChunkSize, 87} 88 89/// Create a progress bar if necessary based on settings 90pub fn create_progress_bar( 91 input_size: Option<u64>, 92 progress: ProgressDisplay, 93 target: OutputTarget, 94) -> Option<ProgressBar> { 95 match (progress, target) { 96 (ProgressDisplay::Auto, OutputTarget::Stdout) => None, 97 (ProgressDisplay::Off, _) => None, 98 (_, _) => { 99 let bar = match input_size { 100 Some(size) => ProgressBar::new(size), 101 None => ProgressBar::new_spinner(), 102 }; 103 bar.set_style( 104 indicatif::ProgressStyle::default_bar() 105 .template("{spinner:.green} [{elapsed_precise}] ({eta}) [{bar:40.cyan/blue}] {bytes}/{total_bytes} => {msg}").unwrap() 106 .progress_chars("#>-"), 107 ); 108 bar.enable_steady_tick(Duration::from_millis(100)); 109 Some(bar) 110 } 111 } 112} 113 114/// A reader that tracks progress of bytes read. Multiple readers may share 115/// the same `ProgressBar` clone to drive a single bar across several input 116/// streams (container formats iterate over many files); the bar is advanced 117/// via `inc`, which is atomic and relative. 118pub struct ProgressReader<R> { 119 inner: R, 120 bar: Option<ProgressBar>, 121 last_update: Instant, 122 bytes_since_update: u64, 123 bytes_per_update: u64, 124} 125 126impl<R: Read> ProgressReader<R> { 127 pub fn new(inner: R, bar: Option<ProgressBar>) -> Self { 128 ProgressReader { 129 inner, 130 bar, 131 last_update: Instant::now(), 132 bytes_since_update: 0, 133 bytes_per_update: 8192, // Start with 8KB, will adjust dynamically 134 } 135 } 136 137 /// Updates the progress bar if enough bytes have been read since the last update. 138 /// Dynamically adjusts the update frequency to target ~100ms between updates by 139 /// tracking the elapsed time and adjusting bytes_per_update accordingly. 140 fn maybe_update_progress(&mut self, bytes_read: u64) { 141 if let Some(ref bar) = self.bar { 142 self.bytes_since_update += bytes_read; 143 144 if self.bytes_since_update >= self.bytes_per_update { 145 let now = Instant::now(); 146 let elapsed = now.duration_since(self.last_update); 147 148 bar.inc(self.bytes_since_update); 149 150 // Adjust bytes_per_update to target ~100ms between updates 151 if elapsed < Duration::from_millis(50) { 152 self.bytes_per_update *= 2; 153 } else if elapsed > Duration::from_millis(150) { 154 self.bytes_per_update = (self.bytes_per_update / 2).max(1024); 155 } 156 157 self.last_update = now; 158 self.bytes_since_update = 0; 159 } 160 } 161 } 162} 163 164impl<R> Drop for ProgressReader<R> { 165 fn drop(&mut self) { 166 if let Some(ref bar) = self.bar 167 && self.bytes_since_update > 0 168 { 169 bar.inc(self.bytes_since_update); 170 self.bytes_since_update = 0; 171 } 172 } 173} 174 175impl<R: Read> Read for ProgressReader<R> { 176 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { 177 let bytes_read = self.inner.read(buf)?; 178 if bytes_read > 0 { 179 self.maybe_update_progress(bytes_read as u64); 180 } 181 Ok(bytes_read) 182 } 183} 184 185/// A writer that tracks progress of bytes written 186pub struct ProgressWriter<W> { 187 inner: W, 188 bar: Option<ProgressBar>, 189 total_written: u64, 190 last_update: Instant, 191 bytes_since_update: u64, 192 bytes_per_update: u64, 193} 194 195impl<W: Write> ProgressWriter<W> { 196 pub fn new(inner: W, bar: Option<ProgressBar>) -> Self { 197 ProgressWriter { 198 inner, 199 bar, 200 total_written: 0, 201 last_update: Instant::now(), 202 bytes_since_update: 0, 203 bytes_per_update: 8192, // Start with 8KB, will adjust dynamically 204 } 205 } 206 207 pub fn finish(self) { 208 if let Some(bar) = self.bar { 209 bar.finish(); 210 } 211 } 212 213 /// Updates the progress bar if enough bytes have been written since the last update. 214 /// Dynamically adjusts the update frequency to target ~100ms between updates by 215 /// tracking the elapsed time and adjusting bytes_per_update accordingly. 216 fn maybe_update_progress(&mut self, bytes_written: u64) { 217 if let Some(ref bar) = self.bar { 218 self.bytes_since_update += bytes_written; 219 220 if self.bytes_since_update >= self.bytes_per_update { 221 let now = Instant::now(); 222 let elapsed = now.duration_since(self.last_update); 223 224 // Update the progress 225 bar.set_message(HumanBytes(self.total_written).to_string()); 226 227 // Adjust bytes_per_update to target ~100ms between updates 228 if elapsed < Duration::from_millis(50) { 229 self.bytes_per_update *= 2; 230 } else if elapsed > Duration::from_millis(150) { 231 self.bytes_per_update = (self.bytes_per_update / 2).max(1024); 232 } 233 234 self.last_update = now; 235 self.bytes_since_update = 0; 236 } 237 } 238 } 239} 240 241impl<W: Write> Write for ProgressWriter<W> { 242 fn write(&mut self, buf: &[u8]) -> io::Result<usize> { 243 let bytes_written = self.inner.write(buf)?; 244 if bytes_written > 0 { 245 self.total_written += bytes_written as u64; 246 self.maybe_update_progress(bytes_written as u64); 247 } 248 Ok(bytes_written) 249 } 250 251 fn flush(&mut self) -> io::Result<()> { 252 self.inner.flush() 253 } 254} 255 256/// Process data with progress bar updates 257pub fn copy_with_progress<R: Read, W: Write>( 258 reader: R, 259 writer: W, 260 chunk_size: usize, 261 input_size: Option<u64>, 262 progress_display: ProgressDisplay, 263 target: OutputTarget, 264) -> io::Result<()> { 265 // Create the progress bar if needed 266 let progress_bar = create_progress_bar(input_size, progress_display, target); 267 268 // Create reader and writer with progress tracking 269 let mut reader = ProgressReader::new(reader, progress_bar.clone()); 270 let mut writer = ProgressWriter::new(writer, progress_bar); 271 272 let mut buffer = vec![0; chunk_size]; 273 loop { 274 let bytes_read = reader.read(&mut buffer)?; 275 if bytes_read == 0 { 276 break; 277 } 278 writer.write_all(&buffer[..bytes_read])?; 279 } 280 writer.flush()?; 281 writer.finish(); 282 Ok(()) 283} 284 285#[cfg(test)] 286mod tests { 287 use super::*; 288 289 #[test] 290 fn chunk_size_parsing() { 291 assert!(ChunkSize::from_str("0").is_err()); 292 assert!(ChunkSize::from_str("0mb").is_err()); 293 assert_eq!( 294 ChunkSize::from_str("1").unwrap(), 295 ChunkSize { size_in_bytes: 1 } 296 ); 297 assert_eq!( 298 ChunkSize::from_str("1kb").unwrap(), 299 ChunkSize { 300 size_in_bytes: 1024 301 } 302 ); 303 assert_eq!( 304 ChunkSize::from_str("16kib").unwrap(), 305 ChunkSize { 306 size_in_bytes: 16 * 1024 307 } 308 ); 309 assert_eq!( 310 ChunkSize::from_str("8mib").unwrap(), 311 ChunkSize { 312 size_in_bytes: 8 * 1024 * 1024 313 } 314 ); 315 assert_eq!( 316 ChunkSize::from_str("16mb").unwrap(), 317 ChunkSize { 318 size_in_bytes: 16 * 1024 * 1024 319 } 320 ); 321 assert_eq!( 322 ChunkSize::from_str("1gb").unwrap(), 323 ChunkSize { 324 size_in_bytes: 1024 * 1024 * 1024 325 } 326 ); 327 assert_eq!( 328 ChunkSize::from_str("16gib").unwrap(), 329 ChunkSize { 330 size_in_bytes: 16 * 1024 * 1024 * 1024 331 } 332 ); 333 } 334}