this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

refactor(pipeline): share threaded stage driver across compress/extract/list

+86 -130
+86 -130
src/backends/pipeline.rs
@@ -13,6 +13,15 @@
     compressors: Vec<Box<dyn Compressor>>,
 }
 
+/// Which method intermediate (threaded) stages should invoke. The final stage
+/// always runs on the calling thread and is handled by a caller-supplied
+/// closure — only the intermediate layers need this dispatch.
+#[derive(Clone, Copy)]
+enum StageAction {
+    Compress,
+    Extract,
+}
+
 impl Pipeline {
     /// Create a new Pipeline with the given compressors
     pub fn new(compressors: Vec<Box<dyn Compressor>>) -> Self {
@@ -26,6 +35,51 @@
             .map(|c| c.extension())
             .collect::<Vec<&str>>()
             .join(".")
+    }
+
+    /// Run an ordered chain of compressor stages, with each non-final stage
+    /// in its own thread linked by an in-memory pipe. The final (last) stage
+    /// runs on the calling thread via `finalize`. Intermediate stages all
+    /// invoke the same method — `compress` going outward through a
+    /// compression pipeline, `extract` unwrapping layers on the way in.
+    fn run_threaded<F>(
+        stages: Vec<Box<dyn Compressor>>,
+        initial_input: CmprssInput,
+        intermediate: StageAction,
+        finalize: F,
+    ) -> Result
+    where
+        F: FnOnce(Box<dyn Compressor>, CmprssInput) -> Result,
+    {
+        debug_assert!(!stages.is_empty(), "pipeline is never empty");
+        let mut stages = stages;
+        let last = stages.pop().expect("pipeline is never empty");
+        let buffer_size = 64 * 1024;
+        let mut current_input = initial_input;
+        let mut handles = Vec::new();
+
+        for stage in stages {
+            let (sender, receiver) = channel::<Vec<u8>>();
+            let stage_output =
+                CmprssOutput::Writer(WriteWrapper(Box::new(PipeWriter::new(sender, buffer_size))));
+            let next_input = CmprssInput::Reader(ReadWrapper(Box::new(PipeReader::new(receiver))));
+            let stage_input = std::mem::replace(&mut current_input, next_input);
+
+            let handle = thread::spawn(move || match intermediate {
+                StageAction::Compress => stage.compress(stage_input, stage_output),
+                StageAction::Extract => stage.extract(stage_input, stage_output),
+            });
+            handles.push(handle);
+        }
+
+        finalize(last, current_input)?;
+
+        for handle in handles {
+            handle
+                .join()
+                .map_err(|_| anyhow!("Pipeline stage thread panicked"))??;
+        }
+        Ok(())
     }
 }
 
@@ -201,165 +255,67 @@
 
     fn compress(&self, input: CmprssInput, output: CmprssOutput) -> Result {
         debug_assert!(!self.compressors.is_empty(), "pipeline is never empty");
-
         if self.compressors.len() == 1 {
             return self.compressors[0].compress(input, output);
         }
-
-        let mut op_compressors: Vec<Box<dyn Compressor>> =
-            self.compressors.iter().map(|c| c.clone_boxed()).collect();
-
-        let mut handles = Vec::new();
-        let mut current_thread_input = input; // Consumed by the first (innermost) compressor
-        let buffer_size = 64 * 1024;
-
-        // Process all but the last (outermost) compressor in separate threads
-        for _ in 0..op_compressors.len() - 1 {
-            let compressor_for_this_stage = op_compressors.remove(0);
-            let (sender, receiver) = channel::<Vec<u8>>();
-            let pipe_writer = PipeWriter::new(sender, buffer_size);
-            let input_for_next_stage =
-                CmprssInput::Reader(ReadWrapper(Box::new(PipeReader::new(receiver))));
-
-            let actual_input_for_thread = current_thread_input; // Move current input to thread
-            current_thread_input = input_for_next_stage; // Set up input for the *next* stage or final compressor
-
-            let handle = thread::spawn(move || {
-                compressor_for_this_stage.compress(
-                    actual_input_for_thread,
-                    CmprssOutput::Writer(WriteWrapper(Box::new(pipe_writer))),
-                )
-            });
-            handles.push(handle);
-        }
-
-        // The last (outermost) compressor runs in the current thread and writes to the final output
-        let last_compressor = op_compressors.remove(0);
-        // current_thread_input here is the Reader from the penultimate stage
-        last_compressor.compress(current_thread_input, output)?;
-
-        for handle in handles {
-            handle
-                .join()
-                .map_err(|_| anyhow!("Compression thread panicked"))??;
-        }
-        Ok(())
+        // Innermost → outermost: the outermost compressor runs on the main
+        // thread and writes to the user-supplied output.
+        let stages = self.compressors.iter().map(|c| c.clone_boxed()).collect();
+        Self::run_threaded(stages, input, StageAction::Compress, |last, input| {
+            last.compress(input, output)
+        })
     }
 
     fn extract(&self, input: CmprssInput, output: CmprssOutput) -> Result {
         debug_assert!(!self.compressors.is_empty(), "pipeline is never empty");
-
         if self.compressors.len() == 1 {
             return self.compressors[0].extract(input, output);
         }
-
-        let mut op_extractors: Vec<Box<dyn Compressor>> = self
+        // Outermost → innermost: the innermost extractor (typically the
+        // container format like tar/zip) runs on the main thread so it can
+        // unpack into the user-supplied output.
+        let stages = self
             .compressors
             .iter()
             .rev()
             .map(|c| c.clone_boxed())
             .collect();
-
-        let mut handles = Vec::new();
-        let mut current_thread_input = input; // Consumed by the first (outermost) extractor
-        let buffer_size = 64 * 1024;
-
-        // Process all but the last (innermost) extractor in separate threads.
-        for _ in 0..op_extractors.len() - 1 {
-            let extractor_for_this_stage = op_extractors.remove(0);
-            let (sender, receiver) = channel::<Vec<u8>>();
-            let pipe_writer = PipeWriter::new(sender, buffer_size);
-            let intermediate_output_for_thread =
-                CmprssOutput::Writer(WriteWrapper(Box::new(pipe_writer)));
-            let input_for_next_stage =
-                CmprssInput::Reader(ReadWrapper(Box::new(PipeReader::new(receiver))));
-
-            let actual_input_for_thread = current_thread_input; // Move current input to thread
-            current_thread_input = input_for_next_stage; // Set up input for the *next* stage or final extractor
-
-            let handle = thread::spawn(move || {
-                extractor_for_this_stage
-                    .extract(actual_input_for_thread, intermediate_output_for_thread)
-            });
-            handles.push(handle);
-        }
-
-        // The last (innermost) extractor runs in the current thread and writes to the final output
-        let last_extractor = op_extractors.remove(0);
-        // current_thread_input here is the Reader from the penultimate stage
-
-        let final_output = match output {
-            CmprssOutput::Path(ref p) => {
-                if last_extractor.default_extracted_target() == ExtractedTarget::Directory
-                    && !p.exists()
-                {
-                    std::fs::create_dir_all(p)?;
+        Self::run_threaded(stages, input, StageAction::Extract, |last, input| {
+            let final_output = match output {
+                CmprssOutput::Path(ref p) => {
+                    // If the innermost extractor wants a directory and the
+                    // user's output path doesn't exist yet, create it so
+                    // e.g. tar::unpack has somewhere to write.
+                    if last.default_extracted_target() == ExtractedTarget::Directory && !p.exists()
+                    {
+                        std::fs::create_dir_all(p)?;
+                    }
+                    CmprssOutput::Path(p.clone())
                 }
-                // If it's a directory, the tar extractor (usually innermost) will handle it.
-                // The path provided should be the target directory.
-                // Always pass the path; the backend decides how to use it.
-                CmprssOutput::Path(p.clone())
-            }
-            CmprssOutput::Pipe(_) => output,
-            CmprssOutput::Writer(_) => output,
-        };
-
-        last_extractor.extract(current_thread_input, final_output)?;
-
-        for handle in handles {
-            handle
-                .join()
-                .map_err(|_| anyhow!("Extraction thread panicked"))??;
-        }
-        Ok(())
+                CmprssOutput::Pipe(_) | CmprssOutput::Writer(_) => output,
+            };
+            last.extract(input, final_output)
+        })
     }
 
     fn list(&self, input: CmprssInput) -> Result {
         debug_assert!(!self.compressors.is_empty(), "pipeline is never empty");
-
         if self.compressors.len() == 1 {
             return self.compressors[0].list(input);
         }
-
         // Same plumbing as `extract`, except the innermost compressor lists
-        // its entries to stdout instead of unpacking to an output path. Outer
-        // layers still need to decompress into an in-memory pipe so that the
-        // innermost container format sees plain archive bytes.
-        let mut op_extractors: Vec<Box<dyn Compressor>> = self
+        // its entries to stdout instead of unpacking. Outer layers still
+        // decompress into the in-memory pipe so the innermost container sees
+        // plain archive bytes.
+        let stages = self
             .compressors
             .iter()
             .rev()
             .map(|c| c.clone_boxed())
             .collect();
-
-        let mut handles = Vec::new();
-        let mut current_thread_input = input;
-        let buffer_size = 64 * 1024;
-
-        for _ in 0..op_extractors.len() - 1 {
-            let extractor = op_extractors.remove(0);
-            let (sender, receiver) = channel::<Vec<u8>>();
-            let pipe_writer = PipeWriter::new(sender, buffer_size);
-            let stage_output = CmprssOutput::Writer(WriteWrapper(Box::new(pipe_writer)));
-            let next_stage_input =
-                CmprssInput::Reader(ReadWrapper(Box::new(PipeReader::new(receiver))));
-
-            let stage_input = current_thread_input;
-            current_thread_input = next_stage_input;
-
-            let handle = thread::spawn(move || extractor.extract(stage_input, stage_output));
-            handles.push(handle);
-        }
-
-        let innermost = op_extractors.remove(0);
-        innermost.list(current_thread_input)?;
-
-        for handle in handles {
-            handle
-                .join()
-                .map_err(|_| anyhow!("Extraction thread panicked"))??;
-        }
-        Ok(())
+        Self::run_threaded(stages, input, StageAction::Extract, |innermost, input| {
+            innermost.list(input)
+        })
     }
 }
 