This repository has no description.
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at tangled-ci 438 lines 15 kB view raw
1use crate::utils::{ 2 CmprssInput, CmprssOutput, Compressor, ExtractedTarget, ReadWrapper, Result, WriteWrapper, 3}; 4use anyhow::{anyhow, bail}; 5use std::io::{self, Read, Write}; 6use std::path::Path; 7use std::sync::mpsc::{Receiver, Sender, channel}; 8use std::thread; 9 10/// A pipeline of one or more compressors applied in sequence (e.g., tar.gz) 11pub struct Pipeline { 12 // The chain of compressors to apply in order (innermost to outermost) 13 compressors: Vec<Box<dyn Compressor>>, 14 /// Preserves the user's original format string (e.g. `tgz`) so default 15 /// filenames use it verbatim instead of the dotted composition of each 16 /// stage's extension. `None` falls back to joining the per-stage 17 /// extensions. 18 format_override: Option<String>, 19} 20 21impl Clone for Pipeline { 22 fn clone(&self) -> Self { 23 Pipeline { 24 compressors: self.compressors.iter().map(|c| c.clone_boxed()).collect(), 25 format_override: self.format_override.clone(), 26 } 27 } 28} 29 30/// Which method intermediate (threaded) stages should invoke. The final stage 31/// always runs on the calling thread and is handled by a caller-supplied 32/// closure — only the intermediate layers need this dispatch. 33#[derive(Clone, Copy)] 34enum StageAction { 35 Compress, 36 Extract, 37} 38 39impl Pipeline { 40 /// Create a new Pipeline with the given compressors 41 pub fn new(compressors: Vec<Box<dyn Compressor>>) -> Self { 42 Pipeline { 43 compressors, 44 format_override: None, 45 } 46 } 47 48 /// Create a Pipeline that keeps `format` as its canonical format string, 49 /// used for default output filenames. Intended for shortcut forms like 50 /// `tgz` where the user-facing extension differs from the dotted chain. 
51 pub fn with_format(compressors: Vec<Box<dyn Compressor>>, format: String) -> Self { 52 Pipeline { 53 compressors, 54 format_override: Some(format), 55 } 56 } 57 58 /// Get a string representation of the chained format (e.g., "tar.gz") 59 fn format_chain(&self) -> String { 60 if let Some(ref f) = self.format_override { 61 return f.clone(); 62 } 63 self.compressors 64 .iter() 65 .map(|c| c.extension()) 66 .collect::<Vec<&str>>() 67 .join(".") 68 } 69 70 /// Run an ordered chain of compressor stages, with each non-final stage 71 /// in its own thread linked by an in-memory pipe. The final (last) stage 72 /// runs on the calling thread via `finalize`. Intermediate stages all 73 /// invoke the same method — `compress` going outward through a 74 /// compression pipeline, `extract` unwrapping layers on the way in. 75 fn run_threaded<F>( 76 stages: Vec<Box<dyn Compressor>>, 77 initial_input: CmprssInput, 78 intermediate: StageAction, 79 finalize: F, 80 ) -> Result 81 where 82 F: FnOnce(Box<dyn Compressor>, CmprssInput) -> Result, 83 { 84 debug_assert!(!stages.is_empty(), "pipeline is never empty"); 85 let mut stages = stages; 86 let last = stages.pop().expect("pipeline is never empty"); 87 let buffer_size = 64 * 1024; 88 let mut current_input = initial_input; 89 let mut handles = Vec::new(); 90 91 for stage in stages { 92 let (sender, receiver) = channel::<Vec<u8>>(); 93 let stage_output = 94 CmprssOutput::Writer(WriteWrapper(Box::new(PipeWriter::new(sender, buffer_size)))); 95 let next_input = CmprssInput::Reader(ReadWrapper(Box::new(PipeReader::new(receiver)))); 96 let stage_input = std::mem::replace(&mut current_input, next_input); 97 98 let handle = thread::spawn(move || match intermediate { 99 StageAction::Compress => stage.compress(stage_input, stage_output), 100 StageAction::Extract => stage.extract(stage_input, stage_output), 101 }); 102 handles.push(handle); 103 } 104 105 finalize(last, current_input)?; 106 107 for handle in handles { 108 handle 109 .join() 110 
.map_err(|_| anyhow!("Pipeline stage thread panicked"))??; 111 } 112 Ok(()) 113 } 114} 115 116/// A reader that reads from a receiver channel 117struct PipeReader { 118 receiver: Receiver<Vec<u8>>, 119 buffer: Vec<u8>, 120 position: usize, 121 eof: bool, 122} 123 124impl PipeReader { 125 fn new(receiver: Receiver<Vec<u8>>) -> Self { 126 PipeReader { 127 receiver, 128 buffer: Vec::new(), 129 position: 0, 130 eof: false, 131 } 132 } 133} 134 135impl Read for PipeReader { 136 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { 137 // If we've reached EOF, return 0 to signal that 138 if self.eof && self.position >= self.buffer.len() { 139 return Ok(0); 140 } 141 142 // If we've consumed the current buffer, try to get a new one 143 if self.position >= self.buffer.len() { 144 match self.receiver.recv() { 145 Ok(data) => { 146 // Empty data signals EOF from the writer 147 if data.is_empty() { 148 self.eof = true; 149 return Ok(0); 150 } 151 self.buffer = data; 152 self.position = 0; 153 } 154 Err(_) => { 155 // Channel closed, this means EOF 156 self.eof = true; 157 return Ok(0); 158 } 159 } 160 } 161 162 // Copy data from our buffer to the output buffer 163 let available = self.buffer.len() - self.position; 164 let to_copy = available.min(buf.len()); 165 buf[..to_copy].copy_from_slice(&self.buffer[self.position..self.position + to_copy]); 166 self.position += to_copy; 167 Ok(to_copy) 168 } 169} 170 171/// A writer that writes to a sender channel 172struct PipeWriter { 173 sender: Sender<Vec<u8>>, 174 buffer_size: usize, 175} 176 177impl PipeWriter { 178 fn new(sender: Sender<Vec<u8>>, buffer_size: usize) -> Self { 179 PipeWriter { 180 sender, 181 buffer_size, 182 } 183 } 184} 185 186impl Write for PipeWriter { 187 fn write(&mut self, buf: &[u8]) -> io::Result<usize> { 188 // Split the input into chunks of buffer_size 189 let mut start = 0; 190 while start < buf.len() { 191 let end = (start + self.buffer_size).min(buf.len()); 192 let chunk = 
Vec::from(&buf[start..end]); 193 194 // Send the chunk through the channel 195 if self.sender.send(chunk).is_err() { 196 // If the receiver is gone, report an error 197 return Err(io::Error::new( 198 io::ErrorKind::BrokenPipe, 199 "Pipe receiver has been closed", 200 )); 201 } 202 start = end; 203 } 204 Ok(buf.len()) 205 } 206 207 fn flush(&mut self) -> io::Result<()> { 208 // No need to flush, the channel sends immediately 209 Ok(()) 210 } 211} 212 213impl Drop for PipeWriter { 214 fn drop(&mut self) { 215 // Send an empty buffer to signal EOF 216 let _ = self.sender.send(Vec::new()); 217 } 218} 219 220impl Compressor for Pipeline { 221 fn name(&self) -> &str { 222 self.compressors 223 .last() 224 .expect("pipeline is never empty") 225 .name() 226 } 227 228 fn extension(&self) -> &str { 229 self.compressors 230 .last() 231 .expect("pipeline is never empty") 232 .extension() 233 } 234 235 fn default_extracted_target(&self) -> ExtractedTarget { 236 self.compressors 237 .first() 238 .expect("pipeline is never empty") 239 .default_extracted_target() 240 } 241 242 fn default_compressed_filename(&self, in_path: &Path) -> String { 243 // Add all extensions: input.txt → input.txt.tar.gz 244 let base = in_path 245 .file_name() 246 .map(|n| n.to_string_lossy().into_owned()) 247 .unwrap_or_else(|| "archive".to_string()); 248 format!("{}.{}", base, self.format_chain()) 249 } 250 251 fn default_extracted_filename(&self, in_path: &Path) -> String { 252 if self.default_extracted_target() == ExtractedTarget::Directory { 253 return ".".to_string(); 254 } 255 // Strip all known extensions: input.tar.gz → input 256 let mut name = in_path 257 .file_name() 258 .map(|n| n.to_string_lossy().into_owned()) 259 .unwrap_or_else(|| "archive".to_string()); 260 for comp in self.compressors.iter().rev() { 261 let ext = format!(".{}", comp.extension()); 262 if let Some(stripped) = name.strip_suffix(&ext) { 263 name = stripped.to_string(); 264 } 265 } 266 name 267 } 268 269 fn is_archive(&self, 
in_path: &Path) -> bool { 270 let file_name = match in_path.file_name().and_then(|f| f.to_str()) { 271 Some(f) => f, 272 None => return false, 273 }; 274 file_name.ends_with(&format!(".{}", self.format_chain())) 275 } 276 277 fn compress(&self, input: CmprssInput, output: CmprssOutput) -> Result { 278 debug_assert!(!self.compressors.is_empty(), "pipeline is never empty"); 279 if self.compressors.len() == 1 { 280 return self.compressors[0].compress(input, output); 281 } 282 // Innermost → outermost: the outermost compressor runs on the main 283 // thread and writes to the user-supplied output. 284 let stages = self.compressors.iter().map(|c| c.clone_boxed()).collect(); 285 Self::run_threaded(stages, input, StageAction::Compress, |last, input| { 286 last.compress(input, output) 287 }) 288 } 289 290 fn extract(&self, input: CmprssInput, output: CmprssOutput) -> Result { 291 debug_assert!(!self.compressors.is_empty(), "pipeline is never empty"); 292 if self.compressors.len() == 1 { 293 return self.compressors[0].extract(input, output); 294 } 295 // Outermost → innermost: the innermost extractor (typically the 296 // container format like tar/zip) runs on the main thread so it can 297 // unpack into the user-supplied output. 298 let stages = self 299 .compressors 300 .iter() 301 .rev() 302 .map(|c| c.clone_boxed()) 303 .collect(); 304 Self::run_threaded(stages, input, StageAction::Extract, |last, input| { 305 let final_output = match output { 306 CmprssOutput::Path(ref p) => { 307 // If the innermost extractor wants a directory and the 308 // user's output path doesn't exist yet, create it so 309 // e.g. tar::unpack has somewhere to write. 
310 if last.default_extracted_target() == ExtractedTarget::Directory && !p.exists() 311 { 312 std::fs::create_dir_all(p)?; 313 } 314 CmprssOutput::Path(p.clone()) 315 } 316 CmprssOutput::Pipe(_) | CmprssOutput::Writer(_) => output, 317 }; 318 last.extract(input, final_output) 319 }) 320 } 321 322 fn append(&self, input: CmprssInput, output: CmprssOutput) -> Result { 323 debug_assert!(!self.compressors.is_empty(), "pipeline is never empty"); 324 if self.compressors.len() == 1 { 325 // Single-stage pipelines are just a wrapper; delegate so tar/zip 326 // reached via positional-path inference still support --append. 327 return self.compressors[0].append(input, output); 328 } 329 bail!( 330 "cannot --append to a compound archive ({}); it would require decompressing and recompressing the whole archive", 331 self.format_chain() 332 ) 333 } 334 335 fn list(&self, input: CmprssInput) -> Result { 336 debug_assert!(!self.compressors.is_empty(), "pipeline is never empty"); 337 if self.compressors.len() == 1 { 338 return self.compressors[0].list(input); 339 } 340 // Same plumbing as `extract`, except the innermost compressor lists 341 // its entries to stdout instead of unpacking. Outer layers still 342 // decompress into the in-memory pipe so the innermost container sees 343 // plain archive bytes. 
344 let stages = self 345 .compressors 346 .iter() 347 .rev() 348 .map(|c| c.clone_boxed()) 349 .collect(); 350 Self::run_threaded(stages, input, StageAction::Extract, |innermost, input| { 351 innermost.list(input) 352 }) 353 } 354} 355 356#[cfg(test)] 357mod tests { 358 use super::*; 359 use std::fs; 360 use tempfile::tempdir; 361 362 #[test] 363 fn test_pipeline_compression() -> Result { 364 let temp_dir = tempdir()?; 365 366 let test_content = "This is a test file for pipeline compression"; 367 let test_file_path = temp_dir.path().join("test.txt"); 368 fs::write(&test_file_path, test_content)?; 369 370 let pipeline = Pipeline::new(vec![ 371 Box::new(crate::backends::Tar::default()), 372 Box::new(crate::backends::Gzip::default()), 373 ]); 374 375 let archive_path = temp_dir.path().join("test.tar.gz"); 376 pipeline.compress( 377 CmprssInput::Path(vec![test_file_path.clone()]), 378 CmprssOutput::Path(archive_path.clone()), 379 )?; 380 381 assert!(archive_path.exists()); 382 383 let output_dir = temp_dir.path().join("extracted"); 384 fs::create_dir(&output_dir)?; 385 pipeline.extract( 386 CmprssInput::Path(vec![archive_path.clone()]), 387 CmprssOutput::Path(output_dir.clone()), 388 )?; 389 390 let extracted_file = output_dir.join("test.txt"); 391 assert!(extracted_file.exists()); 392 393 let extracted_content = fs::read_to_string(extracted_file)?; 394 assert_eq!(extracted_content, test_content); 395 396 Ok(()) 397 } 398 399 /// Regression test: per-stage configuration (e.g. `--level 1` vs 400 /// `--level 9` on the outer gzip of a `.tar.gz`) must survive the 401 /// thread-dispatch in `Pipeline::compress`. Previously the pipeline 402 /// reconstructed each stage from its *name* alone, producing a default 403 /// Gzip regardless of the level the user requested. 
404 #[test] 405 fn test_pipeline_preserves_stage_config() -> Result { 406 use crate::progress::ProgressArgs; 407 408 let temp_dir = tempdir()?; 409 let input = temp_dir.path().join("input.txt"); 410 // Repetitive content amplifies the level difference in output size. 411 fs::write(&input, "0123456789abcdef".repeat(1024))?; 412 413 let run = |level: i32, suffix: &str| -> Result<u64> { 414 let fast = Pipeline::new(vec![ 415 Box::new(crate::backends::Tar::default()), 416 Box::new(crate::backends::Gzip { 417 compression_level: level, 418 progress_args: ProgressArgs::default(), 419 }), 420 ]); 421 let out = temp_dir.path().join(format!("out.{suffix}.tar.gz")); 422 fast.compress( 423 CmprssInput::Path(vec![input.clone()]), 424 CmprssOutput::Path(out.clone()), 425 )?; 426 Ok(fs::metadata(&out)?.len()) 427 }; 428 429 let fast_size = run(1, "fast")?; 430 let best_size = run(9, "best")?; 431 assert!( 432 best_size < fast_size, 433 "expected best (level 9) to be smaller than fast (level 1), got {best_size} >= {fast_size}", 434 ); 435 436 Ok(()) 437 } 438}