this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at c03f7c5b5e969e84c16545ec93851f2fa78da9d8 340 lines 10 kB view raw
1use clap::Args; 2use indicatif::{HumanBytes, ProgressBar}; 3use std::io::{self, Read, Seek, SeekFrom, Write}; 4use std::str::FromStr; 5use std::time::Duration; 6use std::time::Instant; 7 8#[derive(clap::ValueEnum, Clone, Copy, Debug, Default)] 9pub enum ProgressDisplay { 10 #[default] 11 Auto, 12 On, 13 Off, 14} 15 16/// How a resolved `CmprssOutput` should be treated by the progress/copy 17/// helpers. Decoupling this from `CmprssOutput` lets us consume the output 18/// into an owned boxed writer up front and still drive progress-bar decisions. 19#[derive(Debug, Clone, Copy, PartialEq, Eq)] 20pub enum OutputTarget { 21 /// Path on disk — show progress when enabled. 22 File, 23 /// Stdout — suppress progress in `Auto` mode to avoid mixing with piped 24 /// bytes. 25 Stdout, 26 /// Pipeline-internal writer (channel between stages) — no progress. 27 InMemory, 28} 29 30#[derive(Debug, Clone, Copy, PartialEq)] 31pub struct ChunkSize { 32 pub size_in_bytes: usize, 33} 34 35impl Default for ChunkSize { 36 fn default() -> Self { 37 ChunkSize { 38 size_in_bytes: 8192, 39 } 40 } 41} 42 43impl FromStr for ChunkSize { 44 type Err = &'static str; 45 46 fn from_str(s: &str) -> Result<Self, Self::Err> { 47 // Try to parse s as just a number 48 if let Ok(num) = s.parse::<usize>() { 49 if num == 0 { 50 return Err("Invalid number"); 51 } 52 return Ok(ChunkSize { size_in_bytes: num }); 53 } 54 // Simplify so that we always assume base 2, regardless of whether we see 55 // 'kb' or 'kib' 56 let mut s = s.to_lowercase(); 57 if s.ends_with("ib") { 58 s.truncate(s.len() - 2); 59 s.push('b'); 60 }; 61 let (num_str, unit) = s.split_at(s.len() - 2); 62 let num = num_str.parse::<usize>().map_err(|_| "Invalid number")?; 63 64 let size_in_bytes = match unit { 65 "kb" => num * 1024, 66 "mb" => num * 1024 * 1024, 67 "gb" => num * 1024 * 1024 * 1024, 68 _ => return Err("Invalid unit"), 69 }; 70 if size_in_bytes == 0 { 71 return Err("Invalid number"); 72 } 73 74 Ok(ChunkSize { size_in_bytes }) 75 } 76} 77 78#[derive(Args, Debug, Default, Clone, Copy)] 79pub struct ProgressArgs { 80 /// Show progress. 81 #[arg(long, value_enum, default_value = "auto")] 82 pub progress: ProgressDisplay, 83 84 /// Chunk size to use during the copy when showing the progress bar. 85 #[arg(long, default_value = "8kib")] 86 pub chunk_size: ChunkSize, 87} 88 89/// Create a progress bar if necessary based on settings 90pub fn create_progress_bar( 91 input_size: Option<u64>, 92 progress: ProgressDisplay, 93 target: OutputTarget, 94) -> Option<ProgressBar> { 95 match (progress, target) { 96 (ProgressDisplay::Auto, OutputTarget::Stdout) => None, 97 (ProgressDisplay::Off, _) => None, 98 (_, _) => { 99 let bar = match input_size { 100 Some(size) => ProgressBar::new(size), 101 None => ProgressBar::new_spinner(), 102 }; 103 bar.set_style( 104 indicatif::ProgressStyle::default_bar() 105 .template("{spinner:.green} [{elapsed_precise}] ({eta}) [{bar:40.cyan/blue}] {bytes}/{total_bytes} => {msg}").expect("progress bar template literal is valid") 106 .progress_chars("#>-"), 107 ); 108 bar.enable_steady_tick(Duration::from_millis(100)); 109 Some(bar) 110 } 111 } 112} 113 114/// A reader that tracks progress of bytes read. Multiple readers may share 115/// the same `ProgressBar` clone to drive a single bar across several input 116/// streams (container formats iterate over many files); the bar is advanced 117/// via `inc`, which is atomic and relative. 118pub struct ProgressReader<R> { 119 inner: R, 120 bar: Option<ProgressBar>, 121 last_update: Instant, 122 bytes_since_update: u64, 123 bytes_per_update: u64, 124} 125 126impl<R: Read> ProgressReader<R> { 127 pub fn new(inner: R, bar: Option<ProgressBar>) -> Self { 128 ProgressReader { 129 inner, 130 bar, 131 last_update: Instant::now(), 132 bytes_since_update: 0, 133 bytes_per_update: 8192, // Start with 8KB, will adjust dynamically 134 } 135 } 136 137 /// Updates the progress bar if enough bytes have been read since the last update. 138 /// Dynamically adjusts the update frequency to target ~100ms between updates by 139 /// tracking the elapsed time and adjusting bytes_per_update accordingly. 140 fn maybe_update_progress(&mut self, bytes_read: u64) { 141 if let Some(ref bar) = self.bar { 142 self.bytes_since_update += bytes_read; 143 144 if self.bytes_since_update >= self.bytes_per_update { 145 let now = Instant::now(); 146 let elapsed = now.duration_since(self.last_update); 147 148 bar.inc(self.bytes_since_update); 149 150 // Adjust bytes_per_update to target ~100ms between updates 151 if elapsed < Duration::from_millis(50) { 152 self.bytes_per_update *= 2; 153 } else if elapsed > Duration::from_millis(150) { 154 self.bytes_per_update = (self.bytes_per_update / 2).max(1024); 155 } 156 157 self.last_update = now; 158 self.bytes_since_update = 0; 159 } 160 } 161 } 162} 163 164impl<R> Drop for ProgressReader<R> { 165 fn drop(&mut self) { 166 if let Some(ref bar) = self.bar 167 && self.bytes_since_update > 0 168 { 169 bar.inc(self.bytes_since_update); 170 self.bytes_since_update = 0; 171 } 172 } 173} 174 175impl<R: Read> Read for ProgressReader<R> { 176 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { 177 let bytes_read = self.inner.read(buf)?; 178 if bytes_read > 0 { 179 self.maybe_update_progress(bytes_read as u64); 180 } 181 Ok(bytes_read) 182 } 183} 184 185impl<R: Seek> Seek for ProgressReader<R> { 186 fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> { 187 self.inner.seek(pos) 188 } 189} 190 191/// A writer that tracks progress of bytes written 192pub struct ProgressWriter<W> { 193 inner: W, 194 bar: Option<ProgressBar>, 195 total_written: u64, 196 last_update: Instant, 197 bytes_since_update: u64, 198 bytes_per_update: u64, 199} 200 201impl<W: Write> ProgressWriter<W> { 202 pub fn new(inner: W, bar: Option<ProgressBar>) -> Self { 203 ProgressWriter { 204 inner, 205 bar, 206 total_written: 0, 207 last_update: Instant::now(), 208 bytes_since_update: 0, 209 bytes_per_update: 8192, // Start with 8KB, will adjust dynamically 210 } 211 } 212 213 pub fn finish(self) { 214 if let Some(bar) = self.bar { 215 bar.finish(); 216 } 217 } 218 219 /// Updates the progress bar if enough bytes have been written since the last update. 220 /// Dynamically adjusts the update frequency to target ~100ms between updates by 221 /// tracking the elapsed time and adjusting bytes_per_update accordingly. 222 fn maybe_update_progress(&mut self, bytes_written: u64) { 223 if let Some(ref bar) = self.bar { 224 self.bytes_since_update += bytes_written; 225 226 if self.bytes_since_update >= self.bytes_per_update { 227 let now = Instant::now(); 228 let elapsed = now.duration_since(self.last_update); 229 230 // Update the progress 231 bar.set_message(HumanBytes(self.total_written).to_string()); 232 233 // Adjust bytes_per_update to target ~100ms between updates 234 if elapsed < Duration::from_millis(50) { 235 self.bytes_per_update *= 2; 236 } else if elapsed > Duration::from_millis(150) { 237 self.bytes_per_update = (self.bytes_per_update / 2).max(1024); 238 } 239 240 self.last_update = now; 241 self.bytes_since_update = 0; 242 } 243 } 244 } 245} 246 247impl<W: Write> Write for ProgressWriter<W> { 248 fn write(&mut self, buf: &[u8]) -> io::Result<usize> { 249 let bytes_written = self.inner.write(buf)?; 250 if bytes_written > 0 { 251 self.total_written += bytes_written as u64; 252 self.maybe_update_progress(bytes_written as u64); 253 } 254 Ok(bytes_written) 255 } 256 257 fn flush(&mut self) -> io::Result<()> { 258 self.inner.flush() 259 } 260} 261 262/// Process data with progress bar updates 263pub fn copy_with_progress<R: Read, W: Write>( 264 reader: R, 265 writer: W, 266 chunk_size: usize, 267 input_size: Option<u64>, 268 progress_display: ProgressDisplay, 269 target: OutputTarget, 270) -> io::Result<()> { 271 // Create the progress bar if needed 272 let progress_bar = create_progress_bar(input_size, progress_display, target); 273 274 // Create reader and writer with progress tracking 275 let mut reader = ProgressReader::new(reader, progress_bar.clone()); 276 let mut writer = ProgressWriter::new(writer, progress_bar); 277 278 let mut buffer = vec![0; chunk_size]; 279 loop { 280 let bytes_read = reader.read(&mut buffer)?; 281 if bytes_read == 0 { 282 break; 283 } 284 writer.write_all(&buffer[..bytes_read])?; 285 } 286 writer.flush()?; 287 writer.finish(); 288 Ok(()) 289} 290 291#[cfg(test)] 292mod tests { 293 use super::*; 294 295 #[test] 296 fn chunk_size_parsing() { 297 assert!(ChunkSize::from_str("0").is_err()); 298 assert!(ChunkSize::from_str("0mb").is_err()); 299 assert_eq!( 300 ChunkSize::from_str("1").unwrap(), 301 ChunkSize { size_in_bytes: 1 } 302 ); 303 assert_eq!( 304 ChunkSize::from_str("1kb").unwrap(), 305 ChunkSize { 306 size_in_bytes: 1024 307 } 308 ); 309 assert_eq!( 310 ChunkSize::from_str("16kib").unwrap(), 311 ChunkSize { 312 size_in_bytes: 16 * 1024 313 } 314 ); 315 assert_eq!( 316 ChunkSize::from_str("8mib").unwrap(), 317 ChunkSize { 318 size_in_bytes: 8 * 1024 * 1024 319 } 320 ); 321 assert_eq!( 322 ChunkSize::from_str("16mb").unwrap(), 323 ChunkSize { 324 size_in_bytes: 16 * 1024 * 1024 325 } 326 ); 327 assert_eq!( 328 ChunkSize::from_str("1gb").unwrap(), 329 ChunkSize { 330 size_in_bytes: 1024 * 1024 * 1024 331 } 332 ); 333 assert_eq!( 334 ChunkSize::from_str("16gib").unwrap(), 335 ChunkSize { 336 size_in_bytes: 16 * 1024 * 1024 * 1024 337 } 338 ); 339 } 340}