🧱 Chunk is a download manager for slow and unstable servers
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Merge pull request #21 from cuducos/range

Uses HTTP Content-Range for downloading

authored by

Eduardo Cuducos and committed by
GitHub
2b5b262d aa0553d7

+67 -37
+63 -37
main.go
··· 87 87 WaitBetweenRetries time.Duration 88 88 } 89 89 90 - func (d *Downloader) downloadFileWithContext(ctx context.Context, u string) ([]byte, error) { 90 + type chunk struct { 91 + start int64 92 + end int64 93 + } 94 + 95 + func (c chunk) size() int64 { return (c.end + 1) - c.start } 96 + func (c chunk) rangeHeader() string { return fmt.Sprintf("bytes=%d-%d", c.start, c.end) } 97 + func (c chunk) last() bool { return c.end+1 == c.size() } 98 + 99 + func (d *Downloader) downloadChunkWithContext(ctx context.Context, u string, c chunk) ([]byte, error) { 91 100 req, err := http.NewRequestWithContext(ctx, http.MethodGet, u, nil) 92 101 if err != nil { 93 102 return nil, fmt.Errorf("error creating the request for %s: %w", u, err) 94 103 } 104 + req.Header.Set("Range", c.rangeHeader()) 95 105 resp, err := d.client.Do(req) 96 106 if err != nil { 97 107 return nil, fmt.Errorf("error sending a get http request to %s: %w", u, err) 98 108 } 99 109 defer resp.Body.Close() 100 - if resp.StatusCode != http.StatusOK { 110 + if resp.StatusCode != http.StatusPartialContent && resp.StatusCode != http.StatusOK { 101 111 return nil, fmt.Errorf("got http response %s from %s: %w", resp.Status, u, err) 102 112 } 103 113 var b bytes.Buffer ··· 108 118 return b.Bytes(), nil 109 119 } 110 120 111 - func (d *Downloader) downloadFileWithTimeout(userCtx context.Context, u string) ([]byte, error) { 121 + func (d *Downloader) downloadChunkWithTimeout(userCtx context.Context, u string, c chunk) ([]byte, error) { 112 122 ctx, cancel := context.WithTimeout(userCtx, d.TimeoutPerChunk) // need to propagate context, which might contain app-specific data. 113 123 defer cancel() 114 124 ch := make(chan []byte) 115 125 errs := make(chan error) 116 126 go func() { 117 - b, err := d.downloadFileWithContext(ctx, u) 127 + b, err := d.downloadChunkWithContext(ctx, u, c) 118 128 if err != nil { 119 129 errs <- err 120 130 return ··· 174 184 return resp.ContentLength, nil 175 185 } 176 186 177 - func (d *Downloader) downloadFile(ctx context.Context, u string) ([]byte, error) { 187 + func (d *Downloader) downloadChunk(ctx context.Context, u string, c chunk) ([]byte, error) { 178 188 ch := make(chan []byte, 1) 179 189 defer close(ch) 180 190 err := retry.Do( 181 191 func() error { 182 - b, err := d.downloadFileWithTimeout(ctx, u) 192 + b, err := d.downloadChunkWithTimeout(ctx, u, c) 183 193 if err != nil { 184 194 return err 185 195 } ··· 196 206 return b, nil 197 207 } 198 208 199 - type chunk struct { 200 - start int64 201 - end int64 202 - } 203 - 204 - func (c chunk) size() int64 { return (c.end + 1) - c.start } 205 - func (c chunk) rangeHeader() string { return fmt.Sprintf("bytes=%d-%d", c.start, c.end) } 206 - 207 209 func (d *Downloader) chunks(t int64) []chunk { 208 210 var start int64 209 211 last := t - 1 ··· 222 224 return c 223 225 } 224 226 227 + func (d *Downloader) prepareAndStartDownload(ctx context.Context, url string, wg *sync.WaitGroup, ch chan<- DownloadStatus) { 228 + defer wg.Done() 229 + path := filepath.Join(os.TempDir(), filepath.Base(url)) 230 + s := DownloadStatus{URL: url, DownloadedFilePath: path} 231 + t, err := d.getDownloadSize(ctx, url) 232 + if err != nil { 233 + s.Error = fmt.Errorf("error getting file size: %w", err) 234 + ch <- s 235 + return 236 + } 237 + s.FileSizeBytes = t 238 + ch <- s // send total file size to the user 239 + f, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY, 0644) 240 + if err != nil { 241 + s.Error = fmt.Errorf("error creating %s: %w", path, err) 242 + ch <- s 243 + return 244 + } 245 + defer f.Close() 246 + if err := f.Truncate(int64(t)); err != nil { 247 + s.Error = fmt.Errorf("error truncating %s to %d: %w", path, t, err) 248 + ch <- s 249 + return 250 + } 251 + for _, c := range d.chunks(t) { 252 + b, err := d.downloadChunk(ctx, url, c) 253 + if err != nil { 254 + s.Error = err 255 + ch <- s 256 + return 257 + } 258 + _, err = f.WriteAt(b, int64(c.start)) 259 + if err != nil { 260 + s.Error = fmt.Errorf("error writing to %s: %w", path, err) 261 + ch <- s 262 + return 263 + } 264 + a := int64(len(b)) 265 + if !c.last() { 266 + a -= 1 // adjust for extra EOF bytes 267 + } 268 + s.DownloadedFileBytes += a 269 + ch <- s 270 + } 271 + } 272 + 225 273 // DownloadWithContext is a version of Download that takes a context. The 226 274 // context can be used to stop all downloads in progress. 227 275 func (d *Downloader) DownloadWithContext(ctx context.Context, urls ...string) <-chan DownloadStatus { ··· 232 280 var wg sync.WaitGroup 233 281 for _, u := range urls { 234 282 wg.Add(1) 235 - go func(u string) { 236 - defer wg.Done() 237 - path := filepath.Join(os.TempDir(), filepath.Base(u)) 238 - s := DownloadStatus{URL: u, DownloadedFilePath: path} 239 - defer func() { ch <- s }() 240 - t, err := d.getDownloadSize(ctx, u) 241 - if err != nil { 242 - s.Error = fmt.Errorf("error getting file size: %w", err) 243 - return 244 - } 245 - s.FileSizeBytes = t 246 - ch <- s // send total file size to the user 247 - b, err := d.downloadFile(ctx, u) 248 - if err != nil { 249 - s.Error = err 250 - return 251 - } 252 - if err := os.WriteFile(s.DownloadedFilePath, b, 0655); err != nil { 253 - s.Error = err 254 - return 255 - } 256 - s.DownloadedFileBytes = int64(len(b)) 257 - }(u) 283 + go d.prepareAndStartDownload(ctx, u, &wg, ch) 258 284 } 259 285 go func() { 260 286 wg.Wait()
+4
main_test.go
··· 58 58 func TestDownload_OkWithDefaultDownloader(t *testing.T) { 59 59 s := httptest.NewServer(http.HandlerFunc( 60 60 func(w http.ResponseWriter, r *http.Request) { 61 + if r.Method == http.MethodHead { 62 + w.Header().Add("Content-Length", "2") 63 + return 64 + } 61 65 fmt.Fprint(w, "42") 62 66 }, 63 67 ))