this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

PR feedback

authored by

Brian Olson and committed by
Brian Olson
6a3ec135 2a3ff8e1

+36 -25
+3 -5
cmd/collectiondir/crawl.go
··· 4 4 "context" 5 5 "encoding/csv" 6 6 "fmt" 7 + "github.com/bluesky-social/indigo/util" 7 8 "golang.org/x/time/rate" 8 9 "io" 9 10 "log/slog" 10 - "net/http" 11 11 "net/url" 12 12 "os" 13 13 "strings" ··· 69 69 hosturl.Scheme = "https" 70 70 hosturl.Host = hostname 71 71 } 72 - httpClient := http.Client{} 73 72 rpcClient := xrpc.Client{ 74 73 Host: hosturl.String(), 75 - Client: &httpClient, 74 + Client: util.RobustHTTPClient(), 76 75 } 77 76 if cctx.IsSet("ratelimit-header") { 78 77 rpcClient.Headers = map[string]string{ ··· 134 133 limiter.Wait(cr.Ctx) 135 134 repos, err := atproto.SyncListRepos(cr.Ctx, cr.RpcClient, cursor, 1000) 136 135 if err != nil { 137 - // TODO: wait N seconds, retry M times 138 136 return fmt.Errorf("%s: sync repos: %w", cr.RpcClient.Host, err) 139 137 } 140 - slog.Info("got repo list", "count", len(repos.Repos)) 138 + slog.Debug("got repo list", "count", len(repos.Repos)) 141 139 for _, xr := range repos.Repos { 142 140 limiter.Wait(cr.Ctx) 143 141 desc, err := atproto.RepoDescribeRepo(cr.Ctx, cr.RpcClient, xr.Did)
+4 -2
cmd/collectiondir/firehose.go
··· 56 56 } 57 57 58 58 header := http.Header{ 59 - "User-Agent": []string{"bgs-rainbow-v0"}, 59 + "User-Agent": []string{"collectiondir"}, 60 60 } 61 61 62 62 if fh.Seq >= 0 { ··· 66 66 con, res, err := d.DialContext(ctx, url, header) 67 67 if err != nil { 68 68 fh.Log.Warn("dialing failed", "url", url, "err", err, "backoff", backoff) 69 - time.Sleep(5 * time.Second) 69 + time.Sleep(time.Duration(5+backoff) * time.Second) 70 70 backoff++ 71 71 72 72 continue 73 + } else { 74 + backoff = 0 73 75 } 74 76 75 77 fh.Log.Info("event subscription response", "code", res.StatusCode)
-5
cmd/collectiondir/metrics.go
··· 60 60 } 61 61 62 62 start := time.Now() 63 - //requestSize := computeApproximateRequestSize(c.Request()) 64 63 65 64 err := next(c) 66 65 ··· 80 79 statusStr := strconv.Itoa(status) 81 80 method := c.Request().Method 82 81 83 - //responseSize := float64(c.Response().Size) 84 - 85 82 reqDur.WithLabelValues(statusStr, method, path).Observe(elapsed) 86 83 reqCnt.WithLabelValues(statusStr, method, path).Inc() 87 - //reqSz.WithLabelValues(statusStr, method, path).Observe(float64(requestSize)) 88 - //resSz.WithLabelValues(statusStr, method, path).Observe(responseSize) 89 84 90 85 return err 91 86 }
+18 -5
cmd/collectiondir/pebble.go
··· 8 8 "fmt" 9 9 "github.com/cockroachdb/pebble" 10 10 "log/slog" 11 + "sync" 11 12 "time" 12 13 ) 13 14 ··· 78 79 collections map[string]uint32 79 80 collectionNames map[uint32]string // TODO: B-tree would be nice 80 81 maxCollectionId uint32 82 + collectionsLock sync.Mutex 81 83 82 84 log *slog.Logger 83 85 } ··· 93 95 if pcd.log == nil { 94 96 pcd.log = slog.Default() 95 97 } 96 - return pcd.ReadAllCollectionInterns(context.Background()) 98 + return pcd.readAllCollectionInterns(context.Background()) 97 99 } 98 100 99 101 func (pcd *PebbleCollectionDirectory) Close() error { ··· 108 110 return err 109 111 } 110 112 111 - func (pcd *PebbleCollectionDirectory) ReadAllCollectionInterns(ctx context.Context) error { 113 + // readAllCollectionInterns should only be run at setup time inside Open() when locking against threads is not needed 114 + func (pcd *PebbleCollectionDirectory) readAllCollectionInterns(ctx context.Context) error { 112 115 lower := []byte{'C'} 113 116 upper := []byte{'D'} 114 117 iter, err := pcd.db.NewIterWithContext(ctx, &pebble.IterOptions{ ··· 182 185 183 186 func (pcd *PebbleCollectionDirectory) ReadCollection(ctx context.Context, collection, cursor string, limit int) (result []CollectionDidTime, nextCursor string, err error) { 184 187 var lower []byte 185 - collectionId, err := pcd.CollectionToId(collection) 188 + collectionId, err := pcd.CollectionToId(collection, false) 186 189 if err != nil { 190 + if err == ErrNotFound { 191 + return nil, "", nil 192 + } 187 193 return nil, "", fmt.Errorf("collection id err, %w", err) 188 194 } 189 195 if cursor != "" { ··· 243 249 return result, nextCursor, nil 244 250 } 245 251 246 - func (pcd *PebbleCollectionDirectory) CollectionToId(collection string) (uint32, error) { 252 + var ErrNotFound = errors.New("not found") 253 + 254 + func (pcd *PebbleCollectionDirectory) CollectionToId(collection string, create bool) (uint32, error) { 255 + pcd.collectionsLock.Lock() 256 + defer pcd.collectionsLock.Unlock() 247 257 // easy mode: in cache 248 258 collectionId, ok := pcd.collections[collection] 249 259 if ok { ··· 261 271 return collectionId, nil 262 272 } 263 273 274 + if !create { 275 + return 0, ErrNotFound 276 + } 264 277 // make new id, write to db 265 278 if errors.Is(err, pebble.ErrNotFound) { 266 279 // ok, fall through ··· 308 321 } 309 322 310 323 func (pcd *PebbleCollectionDirectory) MaybeSetCollection(did, collection string) error { 311 - collectionId, err := pcd.CollectionToId(collection) 324 + collectionId, err := pcd.CollectionToId(collection, true) 312 325 if err != nil { 313 326 return err 314 327 }
+11 -8
cmd/collectiondir/serve.go
··· 6 6 "encoding/csv" 7 7 "encoding/json" 8 8 "fmt" 9 + "github.com/bluesky-social/indigo/atproto/syntax" 9 10 lru "github.com/hashicorp/golang-lru/v2" 10 11 "log/slog" 11 12 "net" ··· 145 146 146 147 ratelimitHeader string 147 148 148 - apiServer *http.Server 149 - //esrv *echo.Echo 149 + apiServer *http.Server 150 150 metricsServer *http.Server 151 151 152 152 MinDidsForCollectionList uint64 ··· 368 368 func (cs *collectionServer) handleCommit(commit *comatproto.SyncSubscribeRepos_Commit) { 369 369 for _, op := range commit.Ops { 370 370 // op.Path is collection/rkey 371 - slash := strings.IndexRune(op.Path, '/') 372 - if slash == -1 { 373 - cs.log.Warn("bad op path", "repo", commit.Repo) 371 + nsid, _, err := syntax.ParseRepoPath(op.Path) 372 + if err != nil { 373 + cs.log.Warn("bad op path", "repo", commit.Repo, "err", err) 374 374 return 375 375 } 376 - collection := op.Path[:slash] 377 376 firehoseCommitOps.WithLabelValues(op.Action).Inc() 378 377 if op.Action == "create" || op.Action == "update" { 379 378 firehoseDidcSet.Inc() 380 379 cs.ingestFirehose <- DidCollection{ 381 380 Did: commit.Repo, 382 - Collection: collection, 381 + Collection: nsid.String(), 383 382 } 384 383 } 385 384 } ··· 423 422 424 423 // admin auth heador required 425 424 e.POST("/admin/pds/requestCrawl", cs.crawlPds) // same as relay 426 - e.GET("/v1/crawlStatus", cs.crawlStatus) 425 + e.GET("/admin/crawlStatus", cs.crawlStatus) 427 426 428 427 e.Listener = li 429 428 srv := &http.Server{ ··· 466 465 func (cs *collectionServer) getDidsForCollection(c echo.Context) error { 467 466 ctx := c.Request().Context() 468 467 collection := c.QueryParam("collection") 468 + _, err := syntax.ParseNSID(collection) 469 + if err != nil { 470 + return c.String(http.StatusBadRequest, fmt.Sprintf("bad collection nsid, %s", err.Error())) 471 + } 469 472 cursor := c.QueryParam("cursor") 470 473 limit := getLimit(c, 50, 500, 1000) 471 474 they, nextCursor, err := cs.pcd.ReadCollection(ctx, collection, cursor, limit)