this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

avoid joining a potentially large degenerate table (#568)

authored by

Whyrusleeping and committed by
GitHub
e7a656b3 46cba8dc

+197 -13
+30 -7
carstore/bs.go
··· 16 16 "time" 17 17 18 18 "github.com/bluesky-social/indigo/models" 19 + "github.com/prometheus/client_golang/prometheus" 20 + "github.com/prometheus/client_golang/prometheus/promauto" 19 21 20 22 blockformat "github.com/ipfs/go-block-format" 21 23 "github.com/ipfs/go-cid" ··· 32 34 "go.opentelemetry.io/otel/attribute" 33 35 "gorm.io/gorm" 34 36 ) 37 + 38 + var blockGetTotalCounter = promauto.NewCounterVec(prometheus.CounterOpts{ 39 + Name: "carstore_block_get_total", 40 + Help: "carstore get queries", 41 + }, []string{"usrskip", "cache"}) 42 + 43 + var blockGetTotalCounterUsrskip = blockGetTotalCounter.WithLabelValues("true", "miss") 44 + var blockGetTotalCounterCached = blockGetTotalCounter.WithLabelValues("false", "hit") 45 + var blockGetTotalCounterNormal = blockGetTotalCounter.WithLabelValues("false", "miss") 35 46 36 47 var log = logging.Logger("carstore") 37 48 ··· 176 187 if uv.cache != nil { 177 188 blk, ok := uv.cache[k] 178 189 if ok { 190 + blockGetTotalCounterCached.Add(1) 179 191 atomic.AddInt64(&CacheHits, 1) 180 192 181 193 return blk, nil ··· 189 201 var info struct { 190 202 Path string 191 203 Offset int64 204 + Usr models.Uid 192 205 } 193 - if err := uv.cs.meta. 194 - Model(blockRef{}). 195 - Select("path, block_refs.offset"). 196 - Joins("left join car_shards on block_refs.shard = car_shards.id"). 197 - Where("usr = ? AND cid = ?", uv.user, models.DbCID{CID: k}). 198 - Find(&info).Error; err != nil { 206 + if err := uv.cs.meta.Raw(`SELECT 207 + (select path from car_shards where id = block_refs.shard) as path, 208 + block_refs.offset, 209 + (select usr from car_shards where id = block_refs.shard) as usr 210 + FROM block_refs 211 + WHERE 212 + block_refs.cid = ? 213 + LIMIT 1;`, models.DbCID{CID: k}).Scan(&info).Error; err != nil { 199 214 return nil, err 200 215 } 201 216 if info.Path == "" { 202 217 return nil, ipld.ErrNotFound{Cid: k} 203 218 } 204 219 205 - if uv.prefetch { 220 + prefetch := uv.prefetch 221 + if info.Usr != uv.user { 222 + blockGetTotalCounterUsrskip.Add(1) 223 + prefetch = false 224 + } else { 225 + blockGetTotalCounterNormal.Add(1) 226 + } 227 + 228 + if prefetch { 206 229 return uv.prefetchRead(ctx, k, info.Path, info.Offset) 207 230 } else { 208 231 return uv.singleRead(ctx, k, info.Path, info.Offset)
+143 -6
carstore/repo_test.go
··· 11 11 "testing" 12 12 "time" 13 13 14 + "github.com/bluesky-social/indigo/api/bsky" 14 15 appbsky "github.com/bluesky-social/indigo/api/bsky" 15 16 "github.com/bluesky-social/indigo/repo" 16 17 "github.com/bluesky-social/indigo/util" ··· 86 87 t.Fatal(err) 87 88 } 88 89 89 - ncid, rev, err := setupRepo(ctx, ds) 90 + ncid, rev, err := setupRepo(ctx, ds, false) 90 91 if err != nil { 91 92 t.Fatal(err) 92 93 } ··· 167 168 t.Fatal(err) 168 169 } 169 170 170 - ncid, rev, err := setupRepo(ctx, ds) 171 + ncid, rev, err := setupRepo(ctx, ds, false) 171 172 if err != nil { 172 173 t.Fatal(err) 173 174 } ··· 295 296 296 297 } 297 298 298 - func setupRepo(ctx context.Context, bs blockstore.Blockstore) (cid.Cid, string, error) { 299 + func setupRepo(ctx context.Context, bs blockstore.Blockstore, mkprofile bool) (cid.Cid, string, error) { 299 300 nr := repo.NewRepo(ctx, "did:foo", bs) 300 301 302 + if mkprofile { 303 + _, err := nr.PutRecord(ctx, "app.bsky.actor.profile/self", &bsky.ActorProfile{}) 304 + if err != nil { 305 + return cid.Undef, "", fmt.Errorf("write record failed: %w", err) 306 + } 307 + } 308 + 301 309 kmgr := &util.FakeKeyManager{} 302 310 ncid, rev, err := nr.Commit(ctx, kmgr.SignForUser) 303 311 if err != nil { ··· 321 329 b.Fatal(err) 322 330 } 323 331 324 - ncid, rev, err := setupRepo(ctx, ds) 332 + ncid, rev, err := setupRepo(ctx, ds, false) 325 333 if err != nil { 326 334 b.Fatal(err) 327 335 } ··· 377 385 } 378 386 defer cleanup() 379 387 380 - ncid, _, err := setupRepo(ctx, bs) 388 + ncid, _, err := setupRepo(ctx, bs, false) 381 389 if err != nil { 382 390 b.Fatal(err) 383 391 } ··· 415 423 b.Fatal(err) 416 424 } 417 425 418 - ncid, _, err := setupRepo(ctx, bs) 426 + ncid, _, err := setupRepo(ctx, bs, false) 419 427 if err != nil { 420 428 b.Fatal(err) 421 429 } ··· 444 452 head = nroot 445 453 } 446 454 } 455 + 456 + func TestDuplicateBlockAcrossShards(t *testing.T) { 457 + ctx := context.TODO() 458 + 459 + cs, cleanup, err := testCarStore() 460 + if err != nil { 461 + t.Fatal(err) 462 + } 463 + defer cleanup() 464 + 465 + ds1, err := cs.NewDeltaSession(ctx, 1, nil) 466 + if err != nil { 467 + t.Fatal(err) 468 + } 469 + 470 + ds2, err := cs.NewDeltaSession(ctx, 2, nil) 471 + if err != nil { 472 + t.Fatal(err) 473 + } 474 + 475 + ds3, err := cs.NewDeltaSession(ctx, 3, nil) 476 + if err != nil { 477 + t.Fatal(err) 478 + } 479 + 480 + var cids []cid.Cid 481 + var revs []string 482 + for _, ds := range []*DeltaSession{ds1, ds2, ds3} { 483 + ncid, rev, err := setupRepo(ctx, ds, true) 484 + if err != nil { 485 + t.Fatal(err) 486 + } 487 + 488 + if _, err := ds.CloseWithRoot(ctx, ncid, rev); err != nil { 489 + t.Fatal(err) 490 + } 491 + cids = append(cids, ncid) 492 + revs = append(revs, rev) 493 + } 494 + 495 + var recs []cid.Cid 496 + head := cids[1] 497 + rev := revs[1] 498 + for i := 0; i < 10; i++ { 499 + ds, err := cs.NewDeltaSession(ctx, 2, &rev) 500 + if err != nil { 501 + t.Fatal(err) 502 + } 503 + 504 + rr, err := repo.OpenRepo(ctx, ds, head) 505 + if err != nil { 506 + t.Fatal(err) 507 + } 508 + 509 + rc, _, err := rr.CreateRecord(ctx, "app.bsky.feed.post", &appbsky.FeedPost{ 510 + Text: fmt.Sprintf("hey look its a tweet %d", time.Now().UnixNano()), 511 + }) 512 + if err != nil { 513 + t.Fatal(err) 514 + } 515 + 516 + recs = append(recs, rc) 517 + 518 + kmgr := &util.FakeKeyManager{} 519 + nroot, nrev, err := rr.Commit(ctx, kmgr.SignForUser) 520 + if err != nil { 521 + t.Fatal(err) 522 + } 523 + 524 + rev = nrev 525 + 526 + if err := ds.CalcDiff(ctx, nil); err != nil { 527 + t.Fatal(err) 528 + } 529 + 530 + if _, err := ds.CloseWithRoot(ctx, nroot, rev); err != nil { 531 + t.Fatal(err) 532 + } 533 + 534 + head = nroot 535 + } 536 + 537 + // explicitly update the profile object 538 + { 539 + ds, err := cs.NewDeltaSession(ctx, 2, &rev) 540 + if err != nil { 541 + t.Fatal(err) 542 + } 543 + 544 + rr, err := repo.OpenRepo(ctx, ds, head) 545 + if err != nil { 546 + t.Fatal(err) 547 + } 548 + 549 + desc := "this is so unique" 550 + rc, err := rr.UpdateRecord(ctx, "app.bsky.actor.profile/self", &appbsky.ActorProfile{ 551 + Description: &desc, 552 + }) 553 + if err != nil { 554 + t.Fatal(err) 555 + } 556 + 557 + recs = append(recs, rc) 558 + 559 + kmgr := &util.FakeKeyManager{} 560 + nroot, nrev, err := rr.Commit(ctx, kmgr.SignForUser) 561 + if err != nil { 562 + t.Fatal(err) 563 + } 564 + 565 + rev = nrev 566 + 567 + if err := ds.CalcDiff(ctx, nil); err != nil { 568 + t.Fatal(err) 569 + } 570 + 571 + if _, err := ds.CloseWithRoot(ctx, nroot, rev); err != nil { 572 + t.Fatal(err) 573 + } 574 + 575 + head = nroot 576 + } 577 + 578 + buf := new(bytes.Buffer) 579 + if err := cs.ReadUserCar(ctx, 2, "", true, buf); err != nil { 580 + t.Fatal(err) 581 + } 582 + checkRepo(t, cs, buf, recs) 583 + }
+24
repo/repo.go
··· 227 227 return k, nil 228 228 } 229 229 230 + func (r *Repo) UpdateRecord(ctx context.Context, rpath string, rec CborMarshaler) (cid.Cid, error) { 231 + ctx, span := otel.Tracer("repo").Start(ctx, "UpdateRecord") 232 + defer span.End() 233 + 234 + r.dirty = true 235 + t, err := r.getMst(ctx) 236 + if err != nil { 237 + return cid.Undef, fmt.Errorf("failed to get mst: %w", err) 238 + } 239 + 240 + k, err := r.cst.Put(ctx, rec) 241 + if err != nil { 242 + return cid.Undef, err 243 + } 244 + 245 + nmst, err := t.Update(ctx, rpath, k) 246 + if err != nil { 247 + return cid.Undef, fmt.Errorf("mst.Add failed: %w", err) 248 + } 249 + 250 + r.mst = nmst 251 + return k, nil 252 + } 253 + 230 254 func (r *Repo) DeleteRecord(ctx context.Context, rpath string) error { 231 255 ctx, span := otel.Tracer("repo").Start(ctx, "DeleteRecord") 232 256 defer span.End()