this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Support magic headers

+42 -13
+42 -13
cmd/netsync/main.go
··· 14 14 "time" 15 15 16 16 logging "github.com/ipfs/go-log" 17 + _ "github.com/joho/godotenv/autoload" 17 18 "github.com/prometheus/client_golang/prometheus" 18 19 "github.com/prometheus/client_golang/prometheus/promauto" 19 20 "github.com/prometheus/client_golang/prometheus/promhttp" 21 + 20 22 "golang.org/x/time/rate" 21 23 22 24 "github.com/bluesky-social/indigo/util/version" ··· 68 70 Usage: "path to checkout endpoint", 69 71 Value: "https://bsky.social/xrpc/com.atproto.sync.getCheckout", 70 72 }, 73 + &cli.StringFlag{ 74 + Name: "magic-header-key", 75 + Usage: "header key to send with checkout request", 76 + Value: "", 77 + EnvVars: []string{"MAGIC_HEADER_KEY"}, 78 + }, 79 + &cli.StringFlag{ 80 + Name: "magic-header-val", 81 + Usage: "header value to send with checkout request", 82 + Value: "", 83 + EnvVars: []string{"MAGIC_HEADER_VAL"}, 84 + }, 71 85 } 72 86 73 87 app.Action = Netsync ··· 88 102 EnqueuedRepos map[string]*RepoState 89 103 FinishedRepos map[string]*RepoState 90 104 StatePath string 91 - OutDir string 92 105 CheckoutPath string 106 + 107 + outDir string 108 + magicHeaderKey string 109 + magicHeaderVal string 93 110 94 111 lk sync.RWMutex 95 112 wg sync.WaitGroup ··· 209 226 ctx, cancel := context.WithCancel(ctx) 210 227 defer cancel() 211 228 212 - // Try to resume from state file 213 229 state := &NetsyncState{ 214 230 StatePath: cctx.String("state-file"), 215 231 CheckoutPath: cctx.String("checkout-path"), 216 - OutDir: cctx.String("out-dir"), 217 - workerCount: cctx.Int("worker-count"), 218 - limiter: rate.NewLimiter(rate.Limit(cctx.Float64("checkout-limit")), 1), 219 - exit: make(chan struct{}), 220 - wg: sync.WaitGroup{}, 232 + 233 + outDir: cctx.String("out-dir"), 234 + workerCount: cctx.Int("worker-count"), 235 + limiter: rate.NewLimiter(rate.Limit(cctx.Float64("checkout-limit")), 1), 236 + magicHeaderKey: cctx.String("magic-header-key"), 237 + magicHeaderVal: cctx.String("magic-header-val"), 238 + 239 + exit: make(chan struct{}), 240 + wg: sync.WaitGroup{}, 221 241 client: &http.Client{ 222 242 Timeout: 180 * time.Second, 223 243 }, 224 244 } 225 245 246 + if state.magicHeaderKey != "" && state.magicHeaderVal != "" { 247 + log.Info("using magic header") 248 + } 249 + 226 250 // Create out dir 227 - err := os.MkdirAll(state.OutDir, 0755) 251 + err := os.MkdirAll(state.outDir, 0755) 228 252 if err != nil { 229 253 return err 230 254 } 231 255 256 + // Try to resume from state file 232 257 err = state.Resume() 233 258 if state.EnqueuedRepos == nil { 234 259 state.EnqueuedRepos = make(map[string]*RepoState) ··· 238 263 state.FinishedRepos = make(map[string]*RepoState) 239 264 } 240 265 241 - state.OutDir = cctx.String("out-dir") 242 - 243 266 if err != nil { 244 267 // Read repo list 245 268 repoListFile, err := os.Open(cctx.String("repo-list")) ··· 326 349 signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM) 327 350 328 351 select { 329 - case <-signals: 352 + case sig := <-signals: 330 353 cancel() 331 354 close(state.exit) 332 - log.Info("shutting down on signal") 355 + log.Infof("shutting down on signal: %+v", sig) 333 356 case <-ctx.Done(): 334 357 cancel() 335 358 close(state.exit) ··· 416 439 return cloneState, fmt.Errorf("failed to create request: %w", err) 417 440 } 418 441 442 + req.Header.Set("Accept", "application/vnd.ipld.car") 443 + req.Header.Set("User-Agent", "jaz-atproto-netsync/0.0.1") 444 + if s.magicHeaderKey != "" && s.magicHeaderVal != "" { 445 + req.Header.Set(s.magicHeaderKey, s.magicHeaderVal) 446 + } 447 + 419 448 resp, err := s.client.Do(req) 420 449 if err != nil { 421 450 cloneState = "failed (client.do)" ··· 434 463 defer instrumentedReader.Close() 435 464 436 465 // Write to file 437 - outPath := fmt.Sprintf("%s/%s", s.OutDir, repo) 466 + outPath := fmt.Sprintf("%s/%s", s.outDir, repo) 438 467 outFile, err := os.OpenFile(outPath, os.O_CREATE|os.O_WRONLY, 0644) 439 468 if err != nil { 440 469 cloneState = "failed (file.open)"