🧱 Chunk is a download manager for slow and unstable servers
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Merge pull request #27 from cuducos/parallel_downloads

Parallel downloads

authored by

Daniel Fireman and committed by
GitHub
9dd74ab5 6a7bf292

+50 -34
+50 -31
downloader.go
··· 69 69 // this is the total of parallel requests. If the user is downloading files 70 70 // from different servers (including different subdomains), this limit is 71 71 // applied to each server idependently. 72 - MaxParallelDownloadsPerServer uint 72 + MaxParallelDownloadsPerServer int 73 73 74 74 // MaxRetriesPerChunk is the maximum amount of retries for each HTTP request 75 75 // using the content range header that fails. ··· 143 143 } 144 144 145 145 func (d *Downloader) getDownloadSize(ctx context.Context, u string) (int64, error) { 146 - req, err := http.NewRequestWithContext(ctx, http.MethodHead, u, nil) 147 - if err != nil { 148 - return 0, fmt.Errorf("creating the request for %s: %w", u, err) 149 - } 150 146 ch := make(chan *http.Response, 1) 151 147 defer close(ch) 152 - err = retry.Do( 148 + err := retry.Do( 153 149 func() error { 150 + req, err := http.NewRequestWithContext(ctx, http.MethodHead, u, nil) 151 + if err != nil { 152 + return fmt.Errorf("creating the request for %s: %w", u, err) 153 + } 154 154 resp, err := d.client.Do(req) 155 155 if err != nil { 156 - return err 156 + return fmt.Errorf("dispatching the request for %s: %w", u, err) 157 157 } 158 158 if resp.StatusCode != 200 { 159 159 return fmt.Errorf("got unexpected http response status for %s: %s", u, resp.Status) ··· 161 161 ch <- resp 162 162 return nil 163 163 }, 164 - retry.Attempts(d.MaxParallelDownloadsPerServer), 164 + retry.Attempts(d.MaxRetriesPerChunk), 165 165 retry.MaxDelay(d.WaitBetweenRetries), 166 166 ) 167 167 if err != nil { ··· 222 222 return c 223 223 } 224 224 225 - func (d *Downloader) prepareAndStartDownload(ctx context.Context, url string, wg *sync.WaitGroup, ch chan<- DownloadStatus) { 226 - defer wg.Done() 225 + func (d *Downloader) prepareAndStartDownload(ctx context.Context, url string, ch chan<- DownloadStatus) { 227 226 path := filepath.Join(os.TempDir(), filepath.Base(url)) 228 227 s := DownloadStatus{URL: url, DownloadedFilePath: path} 229 228 t, err := d.getDownloadSize(ctx, url) ··· 240 239 ch <- s 241 240 return 242 241 } 243 - defer f.Close() 242 + var urlDownload sync.WaitGroup 243 + defer func() { 244 + urlDownload.Wait() 245 + f.Close() 246 + }() 244 247 if err := f.Truncate(int64(t)); err != nil { 245 248 s.Error = fmt.Errorf("error truncating %s to %d: %w", path, t, err) 246 249 ch <- s 247 250 return 248 251 } 249 252 for _, c := range d.chunks(t) { 250 - b, err := d.downloadChunk(ctx, url, c) 251 - if err != nil { 252 - s.Error = err 253 + urlDownload.Add(1) 254 + go func(c chunk) { 255 + defer urlDownload.Done() 256 + b, err := d.downloadChunk(ctx, url, c) 257 + if err != nil { 258 + s.Error = err 259 + ch <- s 260 + return 261 + } 262 + n, err := f.WriteAt(b, c.start) 263 + if err != nil { 264 + s.Error = fmt.Errorf("error writing to %s: %w", path, err) 265 + ch <- s 266 + return 267 + } 268 + s.DownloadedFileBytes += int64(n) 253 269 ch <- s 254 - return 255 - } 256 - _, err = f.WriteAt(b, int64(c.start)) 257 - if err != nil { 258 - s.Error = fmt.Errorf("error writing to %s: %w", path, err) 259 - ch <- s 260 - return 261 - } 262 - s.DownloadedFileBytes += int64(len(b)) 263 - ch <- s 270 + }(c) 264 271 } 265 272 } 266 273 ··· 268 275 // context can be used to stop all downloads in progress. 269 276 func (d *Downloader) DownloadWithContext(ctx context.Context, urls ...string) <-chan DownloadStatus { 270 277 if d.client == nil { 271 - d.client = &http.Client{Timeout: d.TimeoutPerChunk} 278 + d.client = newClient(d.MaxParallelDownloadsPerServer, d.TimeoutPerChunk) 272 279 } 273 - ch := make(chan DownloadStatus) 274 - var wg sync.WaitGroup 280 + ch := make(chan DownloadStatus, 2*len(urls)) // the first status will be the total file size (and or an error creating/trucating the file). 281 + var wg sync.WaitGroup // this wait group is used to wait for all chunks (from all downloads) to finish. 275 282 for _, u := range urls { 276 283 wg.Add(1) 277 - go d.prepareAndStartDownload(ctx, u, &wg, ch) 284 + go func(u string) { 285 + d.prepareAndStartDownload(ctx, u, ch) 286 + wg.Done() 287 + }(u) 288 + 278 289 } 279 290 go func() { 280 291 wg.Wait() ··· 292 303 // NewDownloader creates a downloader with the defalt configuration. Check 293 304 // the constants in this package for their values. 294 305 func DefaultDownloader() *Downloader { 295 - d := Downloader{ 306 + return &Downloader{ 296 307 TimeoutPerChunk: DefaultTimeoutPerChunk, 297 308 MaxParallelDownloadsPerServer: DefaultMaxParallelDownloadsPerServer, 298 309 MaxRetriesPerChunk: DefaultMaxRetriesPerChunk, 299 310 ChunkSize: DefaultChunkSize, 300 311 WaitBetweenRetries: DefaultWaitBetweenRetries, 312 + client: newClient(DefaultMaxRetriesPerChunk, DefaultTimeoutPerChunk), 301 313 } 302 - d.client = &http.Client{Timeout: d.TimeoutPerChunk} 314 + } 303 315 304 - return &d 316 + func newClient(maxParallelDownloadsPerServer int, timeoutPerChunk time.Duration) *http.Client { 317 + t := http.DefaultTransport.(*http.Transport).Clone() 318 + t.MaxConnsPerHost = maxParallelDownloadsPerServer 319 + t.MaxIdleConnsPerHost = maxParallelDownloadsPerServer 320 + return &http.Client{ 321 + Timeout: timeoutPerChunk, 322 + Transport: t, 323 + } 305 324 }
-3
downloader_test.go
··· 316 316 attempts := int32(0) 317 317 s := httptest.NewServer(http.HandlerFunc( 318 318 func(w http.ResponseWriter, r *http.Request) { 319 - fmt.Printf("attempts = %d\n", atomic.LoadInt32(&attempts)) // TODO: remove 320 319 if atomic.CompareAndSwapInt32(&attempts, 0, 1) { 321 320 w.WriteHeader(http.StatusTooManyRequests) 322 321 return ··· 327 326 defer s.Close() 328 327 329 328 d := DefaultDownloader() 330 - fmt.Printf("d.MaxRetriesPerChunk = %d\n", d.MaxRetriesPerChunk) // TODO: remove 331 - fmt.Printf("d.WaitBetweenRetries = %v\n", d.WaitBetweenRetries) // TODO: remove 332 329 got, err := d.getDownloadSize(context.Background(), s.URL) 333 330 334 331 if err != nil {