🧱 Chunk is a download manager for slow and unstable servers
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Merge pull request #40 from cuducos/use-progress-tracking

Uses progress tracker

authored by

Daniel Fireman and committed by
GitHub
da68b1ea 810efdc6

+59 -22
+4 -2
cmd/chunk/main.go
··· 46 46 chunk.MaxRetries = maxRetriesChunk 47 47 chunk.WaitRetry = waitBetweenRetries 48 48 chunk.ChunkSize = chunkSize 49 + chunk.RestartDownloads = restartDownloads 49 50 prog := newProgress() 50 - for status := range chunk.Download(os.Args[1:len(os.Args)]...) { 51 + for status := range chunk.Download(args...) { 51 52 if status.Error != nil { 52 53 log.Fatal(status.Error) 53 54 } 54 55 prog.update(status) 55 56 } 56 - fmt.Printf("\r%s\nDownloaded to: %s", prog.String(), os.TempDir()) 57 57 return nil 58 58 }, 59 59 } ··· 66 66 maxRetriesChunk uint 67 67 chunkSize int64 68 68 waitBetweenRetries time.Duration 69 + restartDownloads bool 69 70 ) 70 71 71 72 func init() { ··· 75 76 rootCmd.Flags().DurationVarP(&waitBetweenRetries, "wait-retry", "w", chunk.DefaultWaitRetry, "pause before retrying an HTTP request that has failed.") 76 77 rootCmd.Flags().Int64VarP(&chunkSize, "chunk-size", "s", chunk.DefaultChunkSize, "maximum size of each HTTP request done using the content range header.") 77 78 rootCmd.Flags().IntVarP(&concurrencyPerServer, "concurrency-per-server", "c", chunk.DefaultConcurrencyPerServer, "controls the max number of concurrent connections opened to the same server.") 79 + rootCmd.Flags().BoolVarP(&restartDownloads, "force-restart", "f", chunk.DefaultRestartDownload, "restart previous downloads, ignoring where they were stopped") 78 80 } 79 81 80 82 func main() {
+37 -11
downloader.go
··· 9 9 "path/filepath" 10 10 "strings" 11 11 "sync" 12 + "sync/atomic" 12 13 "time" 13 14 14 15 "github.com/avast/retry-go" ··· 20 21 DefaultMaxRetries = 5 21 22 DefaultChunkSize = 8192 22 23 DefaultWaitRetry = 1 * time.Second 24 + DefaultRestartDownload = false 23 25 ) 24 26 25 27 // DownloadStatus is the data propagated via the channel sent back to the user ··· 89 91 // WaitBetweenRetries is an optional pause before retrying an HTTP request 90 92 // that has failed. 91 93 WaitRetry time.Duration 94 + 95 + // RestartDownloads controls whether or not to continue the download of 96 + // previous download attempts, skipping chunks already downloaded. 97 + RestartDownloads bool 92 98 } 93 99 94 - type chunk struct { 95 - start int64 96 - end int64 97 - } 100 + type chunk struct{ start, end int64 } 98 101 99 102 func (c chunk) size() int64 { return (c.end + 1) - c.start } 100 103 func (c chunk) rangeHeader() string { return fmt.Sprintf("bytes=%d-%d", c.start, c.end) } ··· 244 247 ch <- s 245 248 return 246 249 } 250 + chunks := d.chunks(t) 251 + p, err := newProgress(s.DownloadedFilePath, s.URL, d.ChunkSize, len(chunks), d.RestartDownloads) 252 + if err != nil { 253 + s.Error = fmt.Errorf("could not creat a progress file: %w", err) 254 + ch <- s 255 + return 256 + } 247 257 var urlDownload sync.WaitGroup 248 258 defer func() { 249 259 urlDownload.Wait() 260 + p.close() 250 261 f.Close() 251 262 }() 252 263 if err := f.Truncate(int64(t)); err != nil { ··· 254 265 ch <- s 255 266 return 256 267 } 257 - for _, c := range d.chunks(t) { 268 + downloadedBytes := p.downloadedBytes() 269 + for idx, c := range chunks { 270 + pending, err := p.shouldDownload(idx) 271 + if err != nil { 272 + s.Error = fmt.Errorf("could not determine whether chunk #%d is pending: %w", idx+1, err) 273 + ch <- s 274 + return 275 + } 276 + if !pending { 277 + continue 278 + } 258 279 urlDownload.Add(1) 259 - go func(c chunk, s DownloadStatus) { 280 + go func(c chunk, idx int, s DownloadStatus) { 260 
281 defer urlDownload.Done() 261 282 b, err := d.downloadChunk(ctx, url, c) 262 283 if err != nil { 263 - s.Error = err 284 + s.Error = fmt.Errorf("error downloadinf chunk #%d: %w", idx+1, err) 264 285 ch <- s 265 286 return 266 287 } 267 - n, err := f.WriteAt(b, c.start) 268 - if err != nil { 288 + written := int64(len(b)) 289 + if _, err := f.WriteAt(b, c.start); err != nil { 269 290 s.Error = fmt.Errorf("error writing to %s: %w", path, err) 270 291 ch <- s 271 292 return 272 293 } 273 - s.DownloadedFileBytes += int64(n) 294 + if err := p.done(idx, written); err != nil { 295 + s.Error = fmt.Errorf("error checking chunk #%d as done: %w", idx+1, err) 296 + ch <- s 297 + return 298 + } 299 + s.DownloadedFileBytes = atomic.AddInt64(&downloadedBytes, written) 274 300 ch <- s 275 - }(c, s) 301 + }(c, idx, s) 276 302 } 277 303 } 278 304
+13 -4
progress.go
··· 36 36 URL string 37 37 Path string 38 38 ChunkSize int64 39 - Chunks []uint32 39 + Chunks []int64 40 40 } 41 41 42 42 // tries to load a download progress from a file ··· 91 91 } 92 92 93 93 // marks the chunk number `idx` as done (ie successfully downloaded) 94 - func (p *progress) done(idx int) error { 94 + func (p *progress) done(idx int, bytes int64) error { 95 95 if !p.isValidIndex(idx) { 96 96 return fmt.Errorf("%s does not have chunk #%d", p.Path, idx+1) 97 97 } 98 98 p.lock.Lock() 99 99 defer p.lock.Unlock() 100 - atomic.StoreUint32(&p.Chunks[idx], 1) 100 + atomic.StoreInt64(&p.Chunks[idx], bytes) 101 101 f, err := os.Create(p.path) 102 102 if err != nil { 103 103 return fmt.Errorf("error opening progress file %s: %w", p.path, err) ··· 138 138 return nil // Either not empty or error, suits both cases 139 139 } 140 140 141 + // calculates the number of bytes downloaded 142 + func (p *progress) downloadedBytes() int64 { 143 + var downloaded int64 144 + for _, c := range p.Chunks { 145 + downloaded += c 146 + } 147 + return downloaded 148 + } 149 + 141 150 func newProgress(path, url string, chunkSize int64, chunks int, restart bool) (*progress, error) { 142 151 dir, err := getChunkDirectory() 143 152 if err != nil { ··· 157 166 URL: url, 158 167 Path: abs, 159 168 ChunkSize: chunkSize, 160 - Chunks: make([]uint32, chunks), 169 + Chunks: make([]int64, chunks), 161 170 } 162 171 if err := p.load(restart); err != nil { 163 172 return nil, fmt.Errorf("error loading existing progress file: %w", err)
+5 -5
progress_test.go
··· 14 14 if err != nil { 15 15 t.Errorf("expected no error creating the progress, got %s", err) 16 16 } 17 - if err := p.done(1); err != nil { 17 + if err := p.done(1, 3); err != nil { 18 18 t.Errorf("expected no error marking chunk as done, got %s", err) 19 19 } 20 20 for i := 0; i < 3; i++ { ··· 53 53 wg.Add(1) 54 54 go func(i int) { 55 55 defer wg.Done() 56 - errs <- p.done(i) 56 + errs <- p.done(i, 2048) 57 57 }(i) 58 58 } 59 59 for i := 0; i < 2048; i++ { ··· 78 78 if err != nil { 79 79 t.Errorf("expected no error creating the old progress, got %s", err) 80 80 } 81 - old.done(1) 81 + old.done(1, 3) 82 82 old.close() 83 83 84 84 p, err := newProgress(name, "https://test.etc/chunk.zip", 5, 3, false) ··· 115 115 if err != nil { 116 116 t.Errorf("expected no error creating the old progress, got %s", err) 117 117 } 118 - old.done(1) 118 + old.done(1, 3) 119 119 old.close() 120 120 121 121 if _, err := newProgress(name, "https://test.etc/chunk.zip", 10, 3, false); err == nil { ··· 130 130 if err != nil { 131 131 t.Errorf("expected no error creating the old progress, got %s", err) 132 132 } 133 - old.done(1) 133 + old.done(1, 3) 134 134 old.close() 135 135 136 136 p, err := newProgress(name, "https://test.etc/chunk.zip", 10, 3, true)