this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add Netsync, a tool to download all repos in the network for replay

+441
+441
cmd/netsync/main.go
··· 1 + package main 2 + 3 + import ( 4 + "bufio" 5 + "context" 6 + "encoding/json" 7 + "fmt" 8 + "io" 9 + "net/http" 10 + "os" 11 + "os/signal" 12 + "sync" 13 + "syscall" 14 + "time" 15 + 16 + logging "github.com/ipfs/go-log" 17 + "github.com/prometheus/client_golang/prometheus" 18 + "github.com/prometheus/client_golang/prometheus/promauto" 19 + "github.com/prometheus/client_golang/prometheus/promhttp" 20 + "golang.org/x/time/rate" 21 + 22 + "github.com/bluesky-social/indigo/util/version" 23 + "github.com/urfave/cli/v2" 24 + ) 25 + 26 + var log = logging.Logger("netsync") 27 + 28 + func main() { 29 + app := cli.App{ 30 + Name: "netsync", 31 + Usage: "atproto network cloning tool", 32 + Version: version.Version, 33 + } 34 + 35 + app.Flags = []cli.Flag{ 36 + &cli.IntFlag{ 37 + Name: "port", 38 + Usage: "listen port for metrics server", 39 + Value: 8753, 40 + }, 41 + &cli.IntFlag{ 42 + Name: "worker-count", 43 + Usage: "number of workers to run concurrently", 44 + Value: 10, 45 + }, 46 + &cli.Float64Flag{ 47 + Name: "checkout-limit", 48 + Usage: "maximum number of repos per second to checkout", 49 + Value: 4, 50 + }, 51 + &cli.StringFlag{ 52 + Name: "out-dir", 53 + Usage: "directory to write cloned repos to", 54 + Value: "netsync-out", 55 + }, 56 + &cli.StringFlag{ 57 + Name: "repo-list", 58 + Usage: "path to file containing list of repos to clone", 59 + Value: "repos.txt", 60 + }, 61 + &cli.StringFlag{ 62 + Name: "state-file", 63 + Usage: "path to file to write state to", 64 + Value: "state.json", 65 + }, 66 + &cli.StringFlag{ 67 + Name: "checkout-path", 68 + Usage: "path to checkout endpoint", 69 + Value: "https://bsky.social/xrpc/com.atproto.sync.getCheckout", 70 + }, 71 + } 72 + 73 + app.Action = Netsync 74 + 75 + err := app.Run(os.Args) 76 + if err != nil { 77 + log.Fatal(err) 78 + } 79 + } 80 + 81 + type RepoState struct { 82 + Repo string 83 + State string 84 + FinishedAt time.Time 85 + } 86 + 87 + type NetsyncState struct { 88 + EnqueuedRepos map[string]*RepoState 89 + FinishedRepos map[string]*RepoState 90 + StatePath string 91 + OutDir string 92 + CheckoutPath string 93 + 94 + lk sync.RWMutex 95 + wg sync.WaitGroup 96 + exit chan struct{} 97 + limiter *rate.Limiter 98 + workerCount int 99 + client *http.Client 100 + } 101 + 102 + type instrumentedReader struct { 103 + source io.ReadCloser 104 + counter prometheus.Counter 105 + } 106 + 107 + func (r instrumentedReader) Read(b []byte) (int, error) { 108 + n, err := r.source.Read(b) 109 + r.counter.Add(float64(n)) 110 + return n, err 111 + } 112 + 113 + func (r instrumentedReader) Close() error { 114 + var buf [32]byte 115 + var n int 116 + var err error 117 + for err == nil { 118 + n, err = r.source.Read(buf[:]) 119 + r.counter.Add(float64(n)) 120 + } 121 + closeerr := r.source.Close() 122 + if err != nil && err != io.EOF { 123 + return err 124 + } 125 + return closeerr 126 + } 127 + 128 + func (s *NetsyncState) Save() error { 129 + s.lk.RLock() 130 + defer s.lk.RUnlock() 131 + 132 + stateFile, err := os.OpenFile(s.StatePath, os.O_CREATE|os.O_WRONLY, 0644) 133 + if err != nil { 134 + return err 135 + } 136 + defer stateFile.Close() 137 + 138 + stateBytes, err := json.Marshal(s) 139 + if err != nil { 140 + return err 141 + } 142 + 143 + _, err = stateFile.Write(stateBytes) 144 + return err 145 + } 146 + 147 + func (s *NetsyncState) Resume() error { 148 + stateFile, err := os.Open(s.StatePath) 149 + if err != nil { 150 + return err 151 + } 152 + 153 + stateBytes, err := io.ReadAll(stateFile) 154 + if err != nil { 155 + return err 156 + } 157 + 158 + err = json.Unmarshal(stateBytes, s) 159 + if err != nil { 160 + return err 161 + } 162 + 163 + return nil 164 + } 165 + 166 + func (s *NetsyncState) Dequeue() string { 167 + s.lk.Lock() 168 + defer s.lk.Unlock() 169 + 170 + for repo, state := range s.EnqueuedRepos { 171 + if state.State == "enqueued" { 172 + state.State = "dequeued" 173 + return repo 174 + } 175 + } 176 + 177 + return "" 178 + } 179 + 180 + func (s *NetsyncState) Finish(repo string, state string) { 181 + s.lk.Lock() 182 + defer s.lk.Unlock() 183 + 184 + s.FinishedRepos[repo] = &RepoState{ 185 + Repo: repo, 186 + State: state, 187 + FinishedAt: time.Now(), 188 + } 189 + 190 + delete(s.EnqueuedRepos, repo) 191 + } 192 + 193 + func Netsync(cctx *cli.Context) error { 194 + ctx := cctx.Context 195 + ctx, cancel := context.WithCancel(ctx) 196 + defer cancel() 197 + 198 + // Try to resume from state file 199 + state := &NetsyncState{ 200 + StatePath: cctx.String("state-file"), 201 + CheckoutPath: cctx.String("checkout-path"), 202 + OutDir: cctx.String("out-dir"), 203 + workerCount: cctx.Int("worker-count"), 204 + limiter: rate.NewLimiter(rate.Limit(cctx.Float64("checkout-limit")), 1), 205 + exit: make(chan struct{}), 206 + wg: sync.WaitGroup{}, 207 + client: &http.Client{ 208 + Timeout: 180 * time.Second, 209 + }, 210 + } 211 + 212 + // Create out dir 213 + err := os.MkdirAll(state.OutDir, 0755) 214 + if err != nil { 215 + return err 216 + } 217 + 218 + err = state.Resume() 219 + if state.EnqueuedRepos == nil { 220 + state.EnqueuedRepos = make(map[string]*RepoState) 221 + } 222 + 223 + if state.FinishedRepos == nil { 224 + state.FinishedRepos = make(map[string]*RepoState) 225 + } 226 + 227 + if err != nil { 228 + // Read repo list 229 + repoListFile, err := os.Open(cctx.String("repo-list")) 230 + if err != nil { 231 + return err 232 + } 233 + 234 + fileScanner := bufio.NewScanner(repoListFile) 235 + fileScanner.Split(bufio.ScanLines) 236 + 237 + for fileScanner.Scan() { 238 + repo := fileScanner.Text() 239 + state.EnqueuedRepos[repo] = &RepoState{ 240 + Repo: repo, 241 + State: "enqueued", 242 + } 243 + } 244 + } else { 245 + log.Info("Resuming from state file") 246 + } 247 + 248 + // Start metrics server 249 + mux := http.NewServeMux() 250 + mux.Handle("/metrics", promhttp.Handler()) 251 + 252 + metricsServer := &http.Server{ 253 + Addr: fmt.Sprintf(":%d", cctx.Int("port")), 254 + Handler: mux, 255 + } 256 + 257 + go func() { 258 + state.wg.Add(1) 259 + defer state.wg.Done() 260 + if err := metricsServer.ListenAndServe(); err != http.ErrServerClosed { 261 + log.Fatalf("failed to start metrics server: %+v", err) 262 + } 263 + log.Info("metrics server shut down successfully") 264 + }() 265 + 266 + // Start workers 267 + for i := 0; i < state.workerCount; i++ { 268 + state.wg.Add(1) 269 + go func(id int) { 270 + defer state.wg.Done() 271 + err := state.worker(id) 272 + if err != nil { 273 + log.Errorw("worker failed", "err", err) 274 + } 275 + }(i) 276 + } 277 + 278 + // Check for empty queue 279 + go func() { 280 + state.wg.Add(1) 281 + defer state.wg.Done() 282 + t := time.NewTicker(30 * time.Second) 283 + for { 284 + select { 285 + case <-ctx.Done(): 286 + err := state.Save() 287 + if err != nil { 288 + log.Errorw("failed to save state", "err", err) 289 + } 290 + return 291 + case <-t.C: 292 + err := state.Save() 293 + if err != nil { 294 + log.Errorw("failed to save state", "err", err) 295 + } 296 + state.lk.RLock() 297 + if len(state.EnqueuedRepos) == 0 { 298 + log.Info("no more repos to clone, shutting down") 299 + close(state.exit) 300 + return 301 + } 302 + state.lk.RUnlock() 303 + } 304 + } 305 + }() 306 + 307 + // Trap SIGINT to trigger a shutdown. 308 + log.Info("listening for signals") 309 + signals := make(chan os.Signal, 1) 310 + signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM) 311 + 312 + select { 313 + case <-signals: 314 + cancel() 315 + close(state.exit) 316 + log.Info("shutting down on signal") 317 + case <-ctx.Done(): 318 + cancel() 319 + close(state.exit) 320 + log.Info("shutting down on context done") 321 + case <-state.exit: 322 + cancel() 323 + log.Info("shutting down on exit signal") 324 + } 325 + 326 + log.Info("shutting down, waiting for workers to clean up...") 327 + 328 + if err := metricsServer.Shutdown(ctx); err != nil { 329 + log.Errorf("failed to shut down metrics server: %+v", err) 330 + } 331 + 332 + state.wg.Wait() 333 + 334 + log.Info("shut down successfully") 335 + 336 + return nil 337 + 338 + } 339 + 340 + func (s *NetsyncState) worker(id int) error { 341 + log := log.With("worker", id) 342 + log.Infow("starting worker") 343 + defer log.Infow("worker stopped") 344 + for { 345 + select { 346 + case <-s.exit: 347 + log.Info("worker exiting due to exit signal") 348 + return nil 349 + default: 350 + ctx := context.Background() 351 + // Dequeue repo 352 + repo := s.Dequeue() 353 + if repo == "" { 354 + // No more repos to clone 355 + return nil 356 + } 357 + 358 + // Wait for rate limiter 359 + s.limiter.Wait(ctx) 360 + 361 + // Clone repo 362 + cloneState, err := s.cloneRepo(ctx, repo) 363 + if err != nil { 364 + log.Errorw("failed to clone repo", "repo", repo, "err", err) 365 + } 366 + 367 + // Update state 368 + s.Finish(repo, cloneState) 369 + log.Infow("worker finished", "repo", repo, "status", cloneState) 370 + } 371 + } 372 + } 373 + 374 + var repoCloneDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ 375 + Name: "netsync_repo_clone_duration_seconds", 376 + Help: "Duration of repo clone operations", 377 + }, []string{"status"}) 378 + 379 + var bytesProcessed = promauto.NewCounter(prometheus.CounterOpts{ 380 + Name: "netsync_bytes_processed", 381 + Help: "Number of bytes processed", 382 + }) 383 + 384 + func (s *NetsyncState) cloneRepo(ctx context.Context, repo string) (cloneState string, err error) { 385 + log := log.With("repo", repo, "source", "cloneRepo") 386 + log.Infow("cloning repo") 387 + 388 + start := time.Now() 389 + defer func() { 390 + duration := time.Since(start) 391 + repoCloneDuration.WithLabelValues(cloneState).Observe(duration.Seconds()) 392 + }() 393 + 394 + var url = fmt.Sprintf("%s?did=%s", s.CheckoutPath, repo) 395 + 396 + // Clone repo 397 + req, err := http.NewRequestWithContext(ctx, "GET", url, nil) 398 + if err != nil { 399 + cloneState = "failed (request-creation)" 400 + return cloneState, fmt.Errorf("failed to create request: %w", err) 401 + } 402 + 403 + resp, err := s.client.Do(req) 404 + if err != nil { 405 + cloneState = "failed (client.do)" 406 + return cloneState, fmt.Errorf("failed to get repo: %w", err) 407 + } 408 + 409 + if resp.StatusCode != http.StatusOK { 410 + cloneState = fmt.Sprintf("failed (status: %d)", resp.StatusCode) 411 + return cloneState, fmt.Errorf("failed to get repo: %s", resp.Status) 412 + } 413 + 414 + instrumentedReader := instrumentedReader{ 415 + source: resp.Body, 416 + counter: bytesProcessed, 417 + } 418 + defer instrumentedReader.Close() 419 + 420 + // Write to file 421 + outPath := fmt.Sprintf("%s/%s", s.OutDir, repo) 422 + outFile, err := os.OpenFile(outPath, os.O_CREATE|os.O_WRONLY, 0644) 423 + if err != nil { 424 + cloneState = "failed (file.open)" 425 + return cloneState, fmt.Errorf("failed to open file: %w", err) 426 + } 427 + 428 + _, err = io.Copy(outFile, instrumentedReader) 429 + if err != nil { 430 + cloneState = "failed (file.copy)" 431 + return cloneState, fmt.Errorf("failed to copy file: %w", err) 432 + } 433 + 434 + err = outFile.Close() 435 + if err != nil { 436 + cloneState = "failed (file.close)" 437 + return cloneState, fmt.Errorf("failed to close file: %w", err) 438 + } 439 + cloneState = "success" 440 + return cloneState, nil 441 + }