this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

cleanups, renames

+44 -396
+2 -24
archiver/archiver.go
··· 43 43 // pieces that abstract the need for explicit ssl checks 44 44 ssl bool 45 45 46 - crawlOnly bool 47 - 48 46 // TODO: at some point we will want to lock specific DIDs, this lock as is 49 47 // is overly broad, but i dont expect it to be a bottleneck for now 50 48 extUserLk sync.Mutex ··· 75 73 ConcurrencyPerPDS int64 76 74 MaxQueuePerPDS int64 77 75 NumCompactionWorkers int 78 - 79 - // NextCrawlers gets forwarded POST /xrpc/com.atproto.sync.requestCrawl 80 - NextCrawlers []*url.URL 81 76 } 82 77 83 78 func DefaultArchiverConfig() *ArchiverConfig { ··· 153 148 154 149 type User struct { 155 150 gorm.Model 156 - ID models.Uid `gorm:"primarykey;index:idx_user_id_active,where:taken_down = false AND tombstoned = false"` 151 + ID models.Uid `gorm:"primarykey"` 157 152 Did string `gorm:"uniqueindex"` 158 153 PDS uint 159 154 ··· 201 196 durl.Scheme = "http" 202 197 } 203 198 204 - // TODO: the PDS's DID should also be in the service, we could use that to look up? 205 199 var peering models.PDS 206 200 if err := s.db.Find(&peering, "host = ?", durl.Host).Error; err != nil { 207 201 s.log.Error("failed to find pds", "host", durl.Host) ··· 268 262 s.extUserLk.Lock() 269 263 defer s.extUserLk.Unlock() 270 264 271 - exu, err := s.LookupUserByDid(ctx, did) 265 + exu, err := s.lookupUserByDid(ctx, did) 272 266 if err == nil { 273 267 s.log.Debug("lost the race to create a new user", "did", did) 274 268 if exu.PDS != peering.ID { ··· 561 555 } 562 556 563 557 s.userCache.Add(did, &u) 564 - 565 - return &u, nil 566 - } 567 - 568 - func (s *Archiver) lookupUserByUID(ctx context.Context, uid models.Uid) (*User, error) { 569 - ctx, span := tracer.Start(ctx, "lookupUserByUID") 570 - defer span.End() 571 - 572 - var u User 573 - if err := s.db.Find(&u, "id = ?", uid).Error; err != nil { 574 - return nil, err 575 - } 576 - 577 - if u.ID == 0 { 578 - return nil, gorm.ErrRecordNotFound 579 - } 580 558 581 559 return &u, nil 582 560 }
+1 -14
archiver/loader.go
··· 15 15 ctx, span := otel.Tracer("indexer").Start(ctx, "getUserOrMissing") 16 16 defer span.End() 17 17 18 - ai, err := s.LookupUserByDid(ctx, did) 18 + ai, err := s.lookupUserByDid(ctx, did) 19 19 if err == nil { 20 20 return ai, nil 21 21 } ··· 68 68 var ai User 69 69 if err := s.db.First(&ai, "id = ?", id).Error; err != nil { 70 70 return nil, err 71 - } 72 - 73 - return &ai, nil 74 - } 75 - 76 - func (s *Archiver) LookupUserByDid(ctx context.Context, did string) (*User, error) { 77 - var ai User 78 - if err := s.db.Find(&ai, "did = ?", did).Error; err != nil { 79 - return nil, err 80 - } 81 - 82 - if ai.ID == 0 { 83 - return nil, gorm.ErrRecordNotFound 84 71 } 85 72 86 73 return &ai, nil
+1 -15
cmd/archit/main.go
··· 5 5 "fmt" 6 6 "log/slog" 7 7 _ "net/http/pprof" 8 - "net/url" 9 8 "os" 10 9 "os/signal" 11 10 "path/filepath" ··· 288 287 } 289 288 } 290 289 291 - cstore, err := repostore.NewCarStore(db, csdirs) 290 + cstore, err := repostore.NewRepoStore(db, csdirs) 292 291 if err != nil { 293 292 return err 294 293 } ··· 353 352 archiverConfig.MaxQueuePerPDS = cctx.Int64("max-queue-per-pds") 354 353 archiverConfig.DefaultRepoLimit = cctx.Int64("default-repo-limit") 355 354 archiverConfig.NumCompactionWorkers = cctx.Int("num-compaction-workers") 356 - nextCrawlers := cctx.StringSlice("next-crawler") 357 - if len(nextCrawlers) != 0 { 358 - nextCrawlerUrls := make([]*url.URL, len(nextCrawlers)) 359 - for i, tu := range nextCrawlers { 360 - var err error 361 - nextCrawlerUrls[i], err = url.Parse(tu) 362 - if err != nil { 363 - return fmt.Errorf("failed to parse next-crawler url: %w", err) 364 - } 365 - slog.Info("configuring relay for requestCrawl", "host", nextCrawlerUrls[i]) 366 - } 367 - archiverConfig.NextCrawlers = nextCrawlerUrls 368 - } 369 355 370 356 arc, err := archiver.NewArchiver(db, repoman, cachedidr, rf, archiverConfig) 371 357 if err != nil {
+35 -63
repostore/bs.go
··· 13 13 "time" 14 14 15 15 carstore "github.com/bluesky-social/indigo/carstore" 16 - carstore1 "github.com/bluesky-social/indigo/carstore" 17 16 "github.com/bluesky-social/indigo/models" 18 - "github.com/prometheus/client_golang/prometheus" 19 - "github.com/prometheus/client_golang/prometheus/promauto" 20 17 21 18 blockformat "github.com/ipfs/go-block-format" 22 19 "github.com/ipfs/go-cid" ··· 32 29 "gorm.io/gorm" 33 30 ) 34 31 35 - var blockGetTotalCounter = promauto.NewCounterVec(prometheus.CounterOpts{ 36 - Name: "carstore2_block_get_total", 37 - Help: "carstore get queries", 38 - }, []string{"usrskip", "cache"}) 39 - 40 - const MaxSliceLength = 2 << 20 41 - 42 - const BigShardThreshold = 2 << 20 43 - 44 - type CarStore interface { 45 - // TODO: not really part of general interface 46 - CompactUserShards(ctx context.Context, user models.Uid, skipBigShards bool) (*carstore1.CompactionStats, error) 47 - // TODO: not really part of general interface 48 - GetCompactionTargets(ctx context.Context, shardCount int) ([]carstore1.CompactionTarget, error) 49 - 50 - GetUserRepoHead(ctx context.Context, user models.Uid) (cid.Cid, error) 51 - GetUserRepoRev(ctx context.Context, user models.Uid) (string, error) 52 - ImportSlice(ctx context.Context, uid models.Uid, since *string, carslice []byte) (cid.Cid, carstore1.BlockStorage, error) 53 - NewDeltaSession(ctx context.Context, user models.Uid, since *string) (carstore1.BlockStorage, error) 54 - ReadOnlySession(user models.Uid) (carstore1.BlockStorage, error) 55 - ReadUserCar(ctx context.Context, user models.Uid, sinceRev string, incremental bool, w io.Writer) error 56 - Stat(ctx context.Context, usr models.Uid) ([]carstore1.UserStat, error) 57 - WipeUserData(ctx context.Context, user models.Uid) error 58 - } 59 - 60 - type FileCarStore struct { 32 + type FileRepoStore struct { 61 33 meta *CarStoreGormMeta 62 34 rootDirs []string 63 35 ··· 66 38 log *slog.Logger 67 39 } 68 40 69 - func NewCarStore(meta *gorm.DB, roots []string) (CarStore, error) { 41 + func NewRepoStore(meta *gorm.DB, roots []string) (carstore.CarStore, error) { 70 42 for _, root := range roots { 71 43 if _, err := os.Stat(root); err != nil { 72 44 if !os.IsNotExist(err) { ··· 86 58 } 87 59 88 60 gormMeta := &CarStoreGormMeta{meta: meta} 89 - out := &FileCarStore{ 61 + out := &FileRepoStore{ 90 62 meta: gormMeta, 91 63 rootDirs: roots, 92 64 lastShardCache: lastShardCache{ ··· 288 260 lastRev string 289 261 } 290 262 291 - func (cs *FileCarStore) checkLastShardCache(user models.Uid) *CarShard { 263 + func (cs *FileRepoStore) checkLastShardCache(user models.Uid) *CarShard { 292 264 return cs.lastShardCache.check(user) 293 265 } 294 266 295 - func (cs *FileCarStore) removeLastShardCache(user models.Uid) { 267 + func (cs *FileRepoStore) removeLastShardCache(user models.Uid) { 296 268 cs.lastShardCache.remove(user) 297 269 } 298 270 299 - func (cs *FileCarStore) putLastShardCache(ls *CarShard) { 271 + func (cs *FileRepoStore) putLastShardCache(ls *CarShard) { 300 272 cs.lastShardCache.put(ls) 301 273 } 302 274 303 - func (cs *FileCarStore) getLastShard(ctx context.Context, user models.Uid) (*CarShard, error) { 275 + func (cs *FileRepoStore) getLastShard(ctx context.Context, user models.Uid) (*CarShard, error) { 304 276 return cs.lastShardCache.get(ctx, user) 305 277 } 306 278 307 - func (cs *FileCarStore) NewDeltaSession(ctx context.Context, user models.Uid, since *string) (carstore1.BlockStorage, error) { 279 + func (cs *FileRepoStore) NewDeltaSession(ctx context.Context, user models.Uid, since *string) (carstore.BlockStorage, error) { 308 280 ctx, span := otel.Tracer("carstore2").Start(ctx, "NewSession") 309 281 defer span.End() 310 282 ··· 316 288 } 317 289 318 290 if since != nil && *since != lastShard.Rev { 319 - return nil, fmt.Errorf("revision mismatch: %s != %s: %w", *since, lastShard.Rev, carstore1.ErrRepoBaseMismatch) 291 + return nil, fmt.Errorf("revision mismatch: %s != %s: %w", *since, lastShard.Rev, carstore.ErrRepoBaseMismatch) 320 292 } 321 293 uv := &userView{ 322 294 user: user, ··· 341 313 }, nil 342 314 } 343 315 344 - func (cs *FileCarStore) ReadOnlySession(user models.Uid) (carstore1.BlockStorage, error) { 316 + func (cs *FileRepoStore) ReadOnlySession(user models.Uid) (carstore.BlockStorage, error) { 345 317 return &DeltaSession{ 346 318 base: &userView{ 347 319 user: user, ··· 355 327 } 356 328 357 329 // TODO: incremental is only ever called true, remove the param 358 - func (cs *FileCarStore) ReadUserCar(ctx context.Context, user models.Uid, sinceRev string, incremental bool, shardOut io.Writer) error { 330 + func (cs *FileRepoStore) ReadUserCar(ctx context.Context, user models.Uid, sinceRev string, incremental bool, shardOut io.Writer) error { 359 331 ctx, span := otel.Tracer("carstore").Start(ctx, "ReadUserCar") 360 332 defer span.End() 361 333 ··· 402 374 403 375 // inner loop part of ReadUserCar 404 376 // copy shard blocks from disk to Writer 405 - func (cs *FileCarStore) writeShardBlocks(ctx context.Context, sh *CarShard, shardOut io.Writer) error { 377 + func (cs *FileRepoStore) writeShardBlocks(ctx context.Context, sh *CarShard, shardOut io.Writer) error { 406 378 ctx, span := otel.Tracer("carstore").Start(ctx, "writeShardBlocks") 407 379 defer span.End() 408 380 ··· 426 398 } 427 399 428 400 // inner loop part of compactBucket 429 - func (cs *FileCarStore) iterateShardBlocks(ctx context.Context, sh *CarShard, cb func(blk blockformat.Block) error) error { 401 + func (cs *FileRepoStore) iterateShardBlocks(ctx context.Context, sh *CarShard, cb func(blk blockformat.Block) error) error { 430 402 fi, err := os.Open(sh.Path) 431 403 if err != nil { 432 404 return err ··· 530 502 return fmt.Sprintf("sh-%d-%d", user, seq) 531 503 } 532 504 533 - func (cs *FileCarStore) dirForUser(user models.Uid) string { 505 + func (cs *FileRepoStore) dirForUser(user models.Uid) string { 534 506 return cs.rootDirs[int(user)%len(cs.rootDirs)] 535 507 } 536 508 537 - func (cs *FileCarStore) openNewShardFile(ctx context.Context, user models.Uid, seq int) (*os.File, string, error) { 509 + func (cs *FileRepoStore) openNewShardFile(ctx context.Context, user models.Uid, seq int) (*os.File, string, error) { 538 510 // TODO: some overwrite protections 539 511 fname := filepath.Join(cs.dirForUser(user), fnameForShard(user, seq)) 540 512 fi, err := os.Create(fname) ··· 545 517 return fi, fname, nil 546 518 } 547 519 548 - func (cs *FileCarStore) writeNewShardFile(ctx context.Context, user models.Uid, seq int, data []byte) (string, error) { 520 + func (cs *FileRepoStore) writeNewShardFile(ctx context.Context, user models.Uid, seq int, data []byte) (string, error) { 549 521 _, span := otel.Tracer("carstore").Start(ctx, "writeNewShardFile") 550 522 defer span.End() 551 523 ··· 558 530 return fname, nil 559 531 } 560 532 561 - func (cs *FileCarStore) deleteShardFile(ctx context.Context, sh *CarShard) error { 533 + func (cs *FileRepoStore) deleteShardFile(ctx context.Context, sh *CarShard) error { 562 534 return os.Remove(sh.Path) 563 535 } 564 536 ··· 616 588 return buf.Bytes(), nil 617 589 } 618 590 619 - func (cs *FileCarStore) writeNewShard(ctx context.Context, root cid.Cid, rev string, user models.Uid, seq int, blks map[cid.Cid]blockformat.Block, rmcids map[cid.Cid]bool) ([]byte, error) { 591 + func (cs *FileRepoStore) writeNewShard(ctx context.Context, root cid.Cid, rev string, user models.Uid, seq int, blks map[cid.Cid]blockformat.Block, rmcids map[cid.Cid]bool) ([]byte, error) { 620 592 621 593 buf := new(bytes.Buffer) 622 594 hnw, err := WriteCarHeader(buf, root) ··· 679 651 return buf.Bytes(), nil 680 652 } 681 653 682 - func (cs *FileCarStore) putShard(ctx context.Context, shard *CarShard, brefs []map[string]any, rmcids map[cid.Cid]bool, nocache bool) error { 654 + func (cs *FileRepoStore) putShard(ctx context.Context, shard *CarShard, brefs []map[string]any, rmcids map[cid.Cid]bool, nocache bool) error { 683 655 ctx, span := otel.Tracer("carstore").Start(ctx, "putShard") 684 656 defer span.End() 685 657 ··· 759 731 return dropset, nil 760 732 } 761 733 762 - func (cs *FileCarStore) ImportSlice(ctx context.Context, uid models.Uid, since *string, carslice []byte) (cid.Cid, carstore1.BlockStorage, error) { 734 + func (cs *FileRepoStore) ImportSlice(ctx context.Context, uid models.Uid, since *string, carslice []byte) (cid.Cid, carstore.BlockStorage, error) { 763 735 ctx, span := otel.Tracer("carstore").Start(ctx, "ImportSlice") 764 736 defer span.End() 765 737 ··· 807 779 return nil 808 780 } 809 781 810 - func (cs *FileCarStore) GetUserRepoHead(ctx context.Context, user models.Uid) (cid.Cid, error) { 782 + func (cs *FileRepoStore) GetUserRepoHead(ctx context.Context, user models.Uid) (cid.Cid, error) { 811 783 lastShard, err := cs.getLastShard(ctx, user) 812 784 if err != nil { 813 785 return cid.Undef, err ··· 819 791 return lastShard.Root.CID, nil 820 792 } 821 793 822 - func (cs *FileCarStore) GetUserRepoRev(ctx context.Context, user models.Uid) (string, error) { 794 + func (cs *FileRepoStore) GetUserRepoRev(ctx context.Context, user models.Uid) (string, error) { 823 795 lastShard, err := cs.getLastShard(ctx, user) 824 796 if err != nil { 825 797 return "", err ··· 831 803 return lastShard.Rev, nil 832 804 } 833 805 834 - func (cs *FileCarStore) Stat(ctx context.Context, usr models.Uid) ([]carstore1.UserStat, error) { 806 + func (cs *FileRepoStore) Stat(ctx context.Context, usr models.Uid) ([]carstore.UserStat, error) { 835 807 shards, err := cs.meta.GetUserShards(ctx, usr) 836 808 if err != nil { 837 809 return nil, err 838 810 } 839 811 840 - var out []carstore1.UserStat 812 + var out []carstore.UserStat 841 813 for _, s := range shards { 842 - out = append(out, carstore1.UserStat{ 814 + out = append(out, carstore.UserStat{ 843 815 Seq: s.Seq, 844 816 Root: s.Root.CID.String(), 845 817 Created: s.CreatedAt, ··· 849 821 return out, nil 850 822 } 851 823 852 - func (cs *FileCarStore) WipeUserData(ctx context.Context, user models.Uid) error { 824 + func (cs *FileRepoStore) WipeUserData(ctx context.Context, user models.Uid) error { 853 825 shards, err := cs.meta.GetUserShards(ctx, user) 854 826 if err != nil { 855 827 return err ··· 866 838 return nil 867 839 } 868 840 869 - func (cs *FileCarStore) deleteShards(ctx context.Context, shs []CarShard) error { 841 + func (cs *FileRepoStore) deleteShards(ctx context.Context, shs []CarShard) error { 870 842 ctx, span := otel.Tracer("carstore").Start(ctx, "deleteShards") 871 843 defer span.End() 872 844 ··· 992 964 return len(cb.shards) == 0 993 965 } 994 966 995 - func (cs *FileCarStore) openNewCompactedShardFile(ctx context.Context, user models.Uid, seq int) (*os.File, string, error) { 967 + func (cs *FileRepoStore) openNewCompactedShardFile(ctx context.Context, user models.Uid, seq int) (*os.File, string, error) { 996 968 // TODO: some overwrite protections 997 969 // NOTE CreateTemp is used for creating a non-colliding file, but we keep it and don't delete it so don't think of it as "temporary". 998 970 // This creates "sh-%d-%d%s" with some random stuff in the last position ··· 1009 981 NumShards int 1010 982 } 1011 983 1012 - func (cs *FileCarStore) GetCompactionTargets(ctx context.Context, shardCount int) ([]carstore1.CompactionTarget, error) { 984 + func (cs *FileRepoStore) GetCompactionTargets(ctx context.Context, shardCount int) ([]carstore.CompactionTarget, error) { 1013 985 ctx, span := otel.Tracer("carstore").Start(ctx, "GetCompactionTargets") 1014 986 defer span.End() 1015 987 ··· 1029 1001 return st.Size(), nil 1030 1002 } 1031 1003 1032 - func (cs *FileCarStore) CompactUserShards(ctx context.Context, user models.Uid, skipBigShards bool) (*carstore1.CompactionStats, error) { 1033 - ctx, span := otel.Tracer("carstore").Start(ctx, "CompactUserShards") 1004 + func (cs *FileRepoStore) CompactUserShards(ctx context.Context, user models.Uid, skipBigShards bool) (*carstore.CompactionStats, error) { 1005 + ctx, span := otel.Tracer("repostore").Start(ctx, "CompactUserShards") 1034 1006 defer span.End() 1035 1007 1036 1008 span.SetAttributes(attribute.Int64("user", int64(user))) ··· 1044 1016 return nil, fmt.Errorf("TODO: have to redo all of compaction") 1045 1017 } 1046 1018 1047 - func (cs *FileCarStore) deleteStaleRefs(ctx context.Context, uid models.Uid, brefs []blockRef, staleRefs []staleRef, removedShards map[uint]bool) error { 1048 - ctx, span := otel.Tracer("carstore").Start(ctx, "deleteStaleRefs") 1019 + func (cs *FileRepoStore) deleteStaleRefs(ctx context.Context, uid models.Uid, brefs []blockRef, staleRefs []staleRef, removedShards map[uint]bool) error { 1020 + ctx, span := otel.Tracer("repostore").Start(ctx, "deleteStaleRefs") 1049 1021 defer span.End() 1050 1022 1051 1023 brByCid := make(map[cid.Cid][]blockRef) ··· 1079 1051 return cs.meta.SetStaleRef(ctx, uid, staleToKeep) 1080 1052 } 1081 1053 1082 - func (cs *FileCarStore) compactBucket(ctx context.Context, user models.Uid, b *compBucket, shardsById map[uint]CarShard, keep map[cid.Cid]bool) error { 1083 - ctx, span := otel.Tracer("carstore").Start(ctx, "compactBucket") 1054 + func (cs *FileRepoStore) compactBucket(ctx context.Context, user models.Uid, b *compBucket, shardsById map[uint]CarShard, keep map[cid.Cid]bool) error { 1055 + ctx, span := otel.Tracer("repostore").Start(ctx, "compactBucket") 1084 1056 defer span.End() 1085 1057 1086 1058 span.SetAttributes(attribute.Int("shards", len(b.shards)))
-43
repostore/meta_gorm.go
··· 12 12 carstore "github.com/bluesky-social/indigo/carstore" 13 13 "github.com/bluesky-social/indigo/models" 14 14 "github.com/ipfs/go-cid" 15 - "go.opentelemetry.io/otel" 16 15 "gorm.io/gorm" 17 16 ) 18 17 ··· 218 217 219 218 return buf.Bytes() 220 219 } 221 - 222 - func createBlockRefs(ctx context.Context, tx *gorm.DB, brefs []map[string]any) error { 223 - ctx, span := otel.Tracer("carstore").Start(ctx, "createBlockRefs") 224 - defer span.End() 225 - 226 - if err := createInBatches(ctx, tx, brefs, 2000); err != nil { 227 - return err 228 - } 229 - 230 - return nil 231 - } 232 - 233 - // Function to create in batches 234 - func createInBatches(ctx context.Context, tx *gorm.DB, brefs []map[string]any, batchSize int) error { 235 - for i := 0; i < len(brefs); i += batchSize { 236 - batch := brefs[i:] 237 - if len(batch) > batchSize { 238 - batch = batch[:batchSize] 239 - } 240 - 241 - query, values := generateInsertQuery(batch) 242 - 243 - if err := tx.WithContext(ctx).Exec(query, values...).Error; err != nil { 244 - return err 245 - } 246 - } 247 - return nil 248 - } 249 - 250 - func generateInsertQuery(brefs []map[string]any) (string, []any) { 251 - placeholders := strings.Repeat("(?, ?, ?),", len(brefs)) 252 - placeholders = placeholders[:len(placeholders)-1] // trim trailing comma 253 - 254 - query := "INSERT INTO block_refs (\"cid\", \"offset\", \"shard\") VALUES " + placeholders 255 - 256 - values := make([]any, 0, 3*len(brefs)) 257 - for _, entry := range brefs { 258 - values = append(values, entry["cid"], entry["offset"], entry["shard"]) 259 - } 260 - 261 - return query, values 262 - }
+5 -237
repostore/repo_test.go
··· 27 27 "gorm.io/gorm" 28 28 ) 29 29 30 - func testCarStore(t testing.TB) (CarStore, func(), error) { 30 + func testCarStore(t testing.TB) (carstore.CarStore, func(), error) { 31 31 tempdir, err := os.MkdirTemp("", "msttest-") 32 32 if err != nil { 33 33 return nil, nil, err ··· 53 53 return nil, nil, err 54 54 } 55 55 56 - cs, err := NewCarStore(db, []string{sharddir1, sharddir2}) 56 + cs, err := NewRepoStore(db, []string{sharddir1, sharddir2}) 57 57 if err != nil { 58 58 return nil, nil, err 59 59 } ··· 63 63 }, nil 64 64 } 65 65 66 - type testFactory func(t testing.TB) (CarStore, func(), error) 66 + type testFactory func(t testing.TB) (carstore.CarStore, func(), error) 67 67 68 68 var backends = map[string]testFactory{ 69 69 "cartore": testCarStore, ··· 175 175 } 176 176 } 177 177 178 - func TestRepeatedCompactions(t *testing.T) { 179 - ctx := context.TODO() 180 - 181 - cs, cleanup, err := testCarStore(t) 182 - if err != nil { 183 - t.Fatal(err) 184 - } 185 - defer cleanup() 186 - 187 - ds, err := cs.NewDeltaSession(ctx, 1, nil) 188 - if err != nil { 189 - t.Fatal(err) 190 - } 191 - 192 - ncid, rev, err := setupRepo(ctx, ds, false) 193 - if err != nil { 194 - t.Fatal(err) 195 - } 196 - 197 - if _, err := ds.CloseWithRoot(ctx, ncid, rev); err != nil { 198 - t.Fatal(err) 199 - } 200 - 201 - var recs []cid.Cid 202 - head := ncid 203 - 204 - var lastRec string 205 - 206 - for loop := 0; loop < 50; loop++ { 207 - for i := 0; i < 20; i++ { 208 - ds, err := cs.NewDeltaSession(ctx, 1, &rev) 209 - if err != nil { 210 - t.Fatal(err) 211 - } 212 - 213 - rr, err := repo.OpenRepo(ctx, ds, head) 214 - if err != nil { 215 - t.Fatal(err) 216 - } 217 - if i%4 == 3 { 218 - if err := rr.DeleteRecord(ctx, lastRec); err != nil { 219 - t.Fatal(err) 220 - } 221 - recs = recs[:len(recs)-1] 222 - } else { 223 - rc, tid, err := rr.CreateRecord(ctx, "app.bsky.feed.post", &appbsky.FeedPost{ 224 - Text: fmt.Sprintf("hey look its a tweet %d", time.Now().UnixNano()), 225 - }) 226 - if err != nil { 227 - t.Fatal(err) 228 - } 229 - 230 - recs = append(recs, rc) 231 - lastRec = "app.bsky.feed.post/" + tid 232 - } 233 - 234 - kmgr := &util.FakeKeyManager{} 235 - nroot, nrev, err := rr.Commit(ctx, kmgr.SignForUser) 236 - if err != nil { 237 - t.Fatal(err) 238 - } 239 - 240 - rev = nrev 241 - 242 - if err := ds.CalcDiff(ctx, nil); err != nil { 243 - t.Fatal(err) 244 - } 245 - 246 - if _, err := ds.CloseWithRoot(ctx, nroot, rev); err != nil { 247 - t.Fatal(err) 248 - } 249 - 250 - head = nroot 251 - } 252 - fmt.Println("Run compaction", loop) 253 - st, err := cs.CompactUserShards(ctx, 1, false) 254 - if err != nil { 255 - t.Log(err) 256 - // TODO: 257 - //t.Fatal(err) 258 - } 259 - 260 - fmt.Printf("%#v\n", st) 261 - 262 - buf := new(bytes.Buffer) 263 - if err := cs.ReadUserCar(ctx, 1, "", true, buf); err != nil { 264 - t.Fatal(err) 265 - } 266 - checkRepo(t, cs, buf, recs) 267 - } 268 - 269 - buf := new(bytes.Buffer) 270 - if err := cs.ReadUserCar(ctx, 1, "", true, buf); err != nil { 271 - t.Fatal(err) 272 - } 273 - checkRepo(t, cs, buf, recs) 274 - } 275 - 276 - func checkRepo(t *testing.T, cs CarStore, r io.Reader, expRecs []cid.Cid) { 178 + func checkRepo(t *testing.T, cs carstore.CarStore, r io.Reader, expRecs []cid.Cid) { 277 179 t.Helper() 278 180 rep, err := repo.ReadRepoFromCar(context.TODO(), r) 279 181 if err != nil { ··· 345 247 innerBenchmarkRepoWritesCarstore(b, ctx, cs, cleanup, err) 346 248 } 347 249 348 - func innerBenchmarkRepoWritesCarstore(b *testing.B, ctx context.Context, cs CarStore, cleanup func(), err error) { 250 + func innerBenchmarkRepoWritesCarstore(b *testing.B, ctx context.Context, cs carstore.CarStore, cleanup func(), err error) { 349 251 if err != nil { 350 252 b.Fatal(err) 351 253 } ··· 481 383 } 482 384 } 483 385 */ 484 - 485 - func TestDuplicateBlockAcrossShards(ot *testing.T) { 486 - ctx := context.TODO() 487 - 488 - for fname, tf := range backends { 489 - ot.Run(fname, func(t *testing.T) { 490 - 491 - cs, cleanup, err := tf(t) 492 - if err != nil { 493 - t.Fatal(err) 494 - } 495 - defer cleanup() 496 - 497 - ds1, err := cs.NewDeltaSession(ctx, 1, nil) 498 - if err != nil { 499 - t.Fatal(err) 500 - } 501 - 502 - ds2, err := cs.NewDeltaSession(ctx, 2, nil) 503 - if err != nil { 504 - t.Fatal(err) 505 - } 506 - 507 - ds3, err := cs.NewDeltaSession(ctx, 3, nil) 508 - if err != nil { 509 - t.Fatal(err) 510 - } 511 - 512 - var cids []cid.Cid 513 - var revs []string 514 - for _, ds := range []carstore.BlockStorage{ds1, ds2, ds3} { 515 - ncid, rev, err := setupRepo(ctx, ds, true) 516 - if err != nil { 517 - t.Fatal(err) 518 - } 519 - 520 - if _, err := ds.CloseWithRoot(ctx, ncid, rev); err != nil { 521 - t.Fatal(err) 522 - } 523 - cids = append(cids, ncid) 524 - revs = append(revs, rev) 525 - } 526 - 527 - var recs []cid.Cid 528 - head := cids[1] 529 - rev := revs[1] 530 - for i := 0; i < 10; i++ { 531 - ds, err := cs.NewDeltaSession(ctx, 2, &rev) 532 - if err != nil { 533 - t.Fatal(err) 534 - } 535 - 536 - rr, err := repo.OpenRepo(ctx, ds, head) 537 - if err != nil { 538 - t.Fatal(err) 539 - } 540 - 541 - rc, _, err := rr.CreateRecord(ctx, "app.bsky.feed.post", &appbsky.FeedPost{ 542 - Text: fmt.Sprintf("hey look its a tweet %d", time.Now().UnixNano()), 543 - }) 544 - if err != nil { 545 - t.Fatal(err) 546 - } 547 - 548 - recs = append(recs, rc) 549 - 550 - kmgr := &util.FakeKeyManager{} 551 - nroot, nrev, err := rr.Commit(ctx, kmgr.SignForUser) 552 - if err != nil { 553 - t.Fatal(err) 554 - } 555 - 556 - rev = nrev 557 - 558 - if err := ds.CalcDiff(ctx, nil); err != nil { 559 - t.Fatal(err) 560 - } 561 - 562 - if _, err := ds.CloseWithRoot(ctx, nroot, rev); err != nil { 563 - t.Fatal(err) 564 - } 565 - 566 - head = nroot 567 - } 568 - 569 - // explicitly update the profile object 570 - { 571 - ds, err := cs.NewDeltaSession(ctx, 2, &rev) 572 - if err != nil { 573 - t.Fatal(err) 574 - } 575 - 576 - rr, err := repo.OpenRepo(ctx, ds, head) 577 - if err != nil { 578 - t.Fatal(err) 579 - } 580 - 581 - desc := "this is so unique" 582 - rc, err := rr.UpdateRecord(ctx, "app.bsky.actor.profile/self", &appbsky.ActorProfile{ 583 - Description: &desc, 584 - }) 585 - if err != nil { 586 - t.Fatal(err) 587 - } 588 - 589 - recs = append(recs, rc) 590 - 591 - kmgr := &util.FakeKeyManager{} 592 - nroot, nrev, err := rr.Commit(ctx, kmgr.SignForUser) 593 - if err != nil { 594 - t.Fatal(err) 595 - } 596 - 597 - rev = nrev 598 - 599 - if err := ds.CalcDiff(ctx, nil); err != nil { 600 - t.Fatal(err) 601 - } 602 - 603 - if _, err := ds.CloseWithRoot(ctx, nroot, rev); err != nil { 604 - t.Fatal(err) 605 - } 606 - 607 - head = nroot 608 - } 609 - 610 - buf := new(bytes.Buffer) 611 - if err := cs.ReadUserCar(ctx, 2, "", true, buf); err != nil { 612 - t.Fatal(err) 613 - } 614 - checkRepo(t, cs, buf, recs) 615 - }) 616 - } 617 - } 618 386 619 387 type testWriter struct { 620 388 t testing.TB