this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Resync with a PDS by comparing revs instead of commit CIDs (#373)

When resyncing with a PDS, if users are active then the CID we get from
`ListRepos` can become out of date by the time we start comparing the
CIDs against our local repo heads (i.e. new events come in for some
users, we update the repo head in our local Carstore, and our CID is
fresher than the one in-memory from `ListRepos`).

Since the Repo Rev is monotonically increasing, we can use the Rev we
get back from `ListRepos` as a "low-water-mark" such that, if our local
Rev for that repo is >= the one we get from the PDS, we know we're
current. This should reduce the number of unnecessary repos being
recrawled during a resync.

authored by

Jaz and committed by
GitHub
c2be5868 8a63a7a3

+21 -26
+1
api/atproto/synclistRepos.go
··· 20 20 type SyncListRepos_Repo struct { 21 21 Did string `json:"did" cborgen:"did"` 22 22 Head string `json:"head" cborgen:"head"` 23 + Rev string `json:"rev" cborgen:"rev"` 23 24 } 24 25 25 26 // SyncListRepos calls the XRPC method "com.atproto.sync.listRepos".
+20 -26
bgs/bgs.go
··· 1250 1250 }, nil 1251 1251 } 1252 1252 1253 - type repoHead struct { 1254 - Did string 1255 - Head string 1256 - } 1257 - 1258 - type headCheckResult struct { 1253 + type revCheckResult struct { 1259 1254 ai *models.ActorInfo 1260 1255 err error 1261 1256 } ··· 1343 1338 cursor := "" 1344 1339 limit := int64(500) 1345 1340 1346 - repos := []repoHead{} 1341 + repos := []comatproto.SyncListRepos_Repo{} 1347 1342 1348 1343 pages := 0 1349 1344 ··· 1367 1362 } 1368 1363 1369 1364 for _, r := range repoList.Repos { 1370 - repos = append(repos, repoHead{ 1371 - Did: r.Did, 1372 - Head: r.Head, 1373 - }) 1365 + if r != nil { 1366 + repos = append(repos, *r) 1367 + } 1374 1368 } 1375 1369 1376 1370 if repoList.Cursor == nil || *repoList.Cursor == "" { ··· 1386 1380 repolistDone := time.Now() 1387 1381 1388 1382 log.Warnw("listed all repos, checking roots", "num_repos", len(repos), "took", repolistDone.Sub(start)) 1389 - resync = bgs.SetResyncStatus(pds.ID, "checking heads") 1383 + resync = bgs.SetResyncStatus(pds.ID, "checking revs") 1390 1384 1391 1385 // Create a buffered channel for collecting results 1392 - results := make(chan headCheckResult, len(repos)) 1386 + results := make(chan revCheckResult, len(repos)) 1393 1387 sem := semaphore.NewWeighted(40) 1394 1388 1395 - // Check repo heads against our local copy and enqueue crawls for any that are out of date 1389 + // Check repo revs against our local copy and enqueue crawls for any that are out of date 1396 1390 for _, r := range repos { 1397 - go func(r repoHead) { 1391 + go func(r comatproto.SyncListRepos_Repo) { 1398 1392 if err := sem.Acquire(ctx, 1); err != nil { 1399 1393 log.Errorw("failed to acquire semaphore", "error", err) 1400 - results <- headCheckResult{err: err} 1394 + results <- revCheckResult{err: err} 1401 1395 return 1402 1396 } 1403 1397 defer sem.Release(1) 1404 1398 1405 - log := log.With("did", r.Did, "head", r.Head) 1399 + log := log.With("did", r.Did, "remote_rev", r.Rev) 1406 1400 // 
Fetches the user if we have it, otherwise automatically enqueues it for crawling 1407 1401 ai, err := bgs.Index.GetUserOrMissing(ctx, r.Did) 1408 1402 if err != nil { 1409 1403 log.Errorw("failed to get user while resyncing PDS, we can't recrawl it", "error", err) 1410 - results <- headCheckResult{err: err} 1404 + results <- revCheckResult{err: err} 1411 1405 return 1412 1406 } 1413 1407 1414 - head, err := bgs.repoman.GetRepoRoot(ctx, ai.Uid) 1408 + rev, err := bgs.repoman.GetRepoRev(ctx, ai.Uid) 1415 1409 if err != nil { 1416 1410 log.Warnw("recrawling because we failed to get the local repo root", "err", err, "uid", ai.Uid) 1417 - results <- headCheckResult{ai: ai} 1411 + results <- revCheckResult{ai: ai} 1418 1412 return 1419 1413 } 1420 1414 1421 - if head.String() != r.Head { 1422 - log.Warnw("recrawling because the repo head from the PDS is different from our local repo root", "local_head", head.String()) 1423 - results <- headCheckResult{ai: ai} 1415 + if rev == "" || rev < r.Rev { 1416 + log.Warnw("recrawling because the repo rev from the PDS is newer than our local repo rev", "local_rev", rev) 1417 + results <- revCheckResult{ai: ai} 1424 1418 return 1425 1419 } 1426 1420 1427 - results <- headCheckResult{} 1421 + results <- revCheckResult{} 1428 1422 }(r) 1429 1423 } 1430 1424 ··· 1442 1436 log.Errorw("failed to enqueue crawl for repo during resync", "error", err, "uid", res.ai.Uid, "did", res.ai.Did) 1443 1437 } 1444 1438 } 1445 - if i%10_000 == 0 { 1446 - log.Warnw("checked heads during resync", "num_repos_checked", i, "num_repos_to_crawl", numReposToResync, "took", time.Now().Sub(resync.StatusChangedAt)) 1439 + if i%100_000 == 0 { 1440 + log.Warnw("checked revs during resync", "num_repos_checked", i, "num_repos_to_crawl", numReposToResync, "took", time.Now().Sub(resync.StatusChangedAt)) 1447 1441 resync.NumReposChecked = i 1448 1442 resync.NumReposToResync = numReposToResync 1449 1443 bgs.UpdateResync(resync)