this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

more trace attributes and some tweaks (#367)

a small grab-bag of changes, most things are either tweaking a batch
size or adding an attribute to a trace.
The one 'real' change here is falling back to an individual block read
instead of a prefetch if the shard in question is over a size limit.

authored by

Whyrusleeping and committed by
GitHub
b1591bca 2222be4d

+44 -5
+6 -1
bgs/bgs.go
··· 744 744 ctx, span := otel.Tracer("bgs").Start(ctx, "handleFedEvent") 745 745 defer span.End() 746 746 747 + start := time.Now() 748 + defer func() { 749 + eventsHandleDuration.WithLabelValues(host.Host).Observe(time.Since(start).Seconds()) 750 + }() 751 + 747 752 eventsReceivedCounter.WithLabelValues(host.Host).Add(1) 748 753 749 754 switch { ··· 1222 1227 1223 1228 results := make(map[models.Uid]*carstore.CompactionStats) 1224 1229 for _, r := range repos { 1225 - st, err := bgs.repoman.CarStore().CompactUserShards(ctx, r.Usr) 1230 + st, err := bgs.repoman.CarStore().CompactUserShards(context.Background(), r.Usr) 1226 1231 if err != nil { 1227 1232 log.Errorf("failed to compact shards for user %d: %s", r.Usr, err) 1228 1233 continue
+6
bgs/metrics.go
··· 16 16 Help: "The total number of events received", 17 17 }, []string{"pds"}) 18 18 19 + var eventsHandleDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ 20 + Name: "events_handle_duration", 21 + Help: "A histogram of handleFedEvent latencies", 22 + Buckets: prometheus.ExponentialBuckets(0.001, 2, 15), 23 + }, []string{"pds"}) 24 + 19 25 var repoCommitsReceivedCounter = promauto.NewCounterVec(prometheus.CounterOpts{ 20 26 Name: "repo_commits_received_counter", 21 27 Help: "The total number of events received",
+28 -3
carstore/bs.go
··· 164 164 } 165 165 } 166 166 167 + const prefetchThreshold = 512 << 10 168 + 167 169 func (uv *userView) prefetchRead(ctx context.Context, k cid.Cid, path string, offset int64) (blockformat.Block, error) { 170 + ctx, span := otel.Tracer("carstore").Start(ctx, "getLastShard") 171 + defer span.End() 172 + 168 173 fi, err := os.Open(path) 169 174 if err != nil { 170 175 return nil, err 171 176 } 172 177 defer fi.Close() 173 178 179 + st, err := fi.Stat() 180 + if err != nil { 181 + return nil, fmt.Errorf("stat file for prefetch: %w", err) 182 + } 183 + 184 + span.SetAttributes(attribute.Int64("shard_size", st.Size())) 185 + 186 + if st.Size() > prefetchThreshold { 187 + span.SetAttributes(attribute.Bool("no_prefetch", true)) 188 + return doBlockRead(fi, k, offset) 189 + } 190 + 174 191 cr, err := car.NewCarReader(fi) 175 192 if err != nil { 176 193 return nil, err ··· 203 220 } 204 221 defer fi.Close() 205 222 223 + return doBlockRead(fi, k, offset) 224 + } 225 + 226 + func doBlockRead(fi *os.File, k cid.Cid, offset int64) (blockformat.Block, error) { 206 227 seeked, err := fi.Seek(offset, io.SeekStart) 207 228 if err != nil { 208 229 return nil, err ··· 724 745 ctx, span := otel.Tracer("carstore").Start(ctx, "createBlockRefs") 725 746 defer span.End() 726 747 727 - if err := createInBatches(ctx, tx, brefs, 1000); err != nil { 748 + if err := createInBatches(ctx, tx, brefs, 2000); err != nil { 728 749 return err 729 750 } 730 751 ··· 1008 1029 return outErr 1009 1030 } 1010 1031 1011 - chunkSize := 100 1032 + chunkSize := 10000 1012 1033 for i := 0; i < len(shs); i += chunkSize { 1013 1034 sl := shs[i:] 1014 1035 if len(sl) > chunkSize { ··· 1163 1184 ctx, span := otel.Tracer("carstore").Start(ctx, "getBlockRefsForShards") 1164 1185 defer span.End() 1165 1186 1187 + span.SetAttributes(attribute.Int("shards", len(shardIds))) 1188 + 1166 1189 chunkSize := 10000 1167 1190 out := make([]blockRef, 0, len(shardIds)) 1168 1191 for i := 0; i < len(shardIds); i += chunkSize { ··· 1178 1201 1179 1202 out = append(out, brefs...) 1180 1203 } 1204 + 1205 + span.SetAttributes(attribute.Int("refs", len(out))) 1181 1206 1182 1207 return out, nil 1183 1208 } ··· 1396 1421 } 1397 1422 } 1398 1423 1399 - chunkSize := 500 1424 + chunkSize := 10000 1400 1425 for i := 0; i < len(staleToDelete); i += chunkSize { 1401 1426 sl := staleToDelete[i:] 1402 1427 if len(sl) > chunkSize {
+1 -1
cmd/gosky/main.go
··· 199 199 Repo: auth.Did, 200 200 Record: &lexutil.LexiconTypeDecoder{&appbsky.FeedPost{ 201 201 Text: text, 202 - CreatedAt: time.Now().Format(util.ISO8601), 202 + CreatedAt: time.Now().UTC().Format(util.ISO8601), 203 203 }}, 204 204 }) 205 205 if err != nil {
+3
repomgr/repomgr.go
··· 25 25 "github.com/ipld/go-car" 26 26 cbg "github.com/whyrusleeping/cbor-gen" 27 27 "go.opentelemetry.io/otel" 28 + "go.opentelemetry.io/otel/attribute" 28 29 "gorm.io/gorm" 29 30 ) 30 31 ··· 489 490 func (rm *RepoManager) HandleExternalUserEvent(ctx context.Context, pdsid uint, uid models.Uid, did string, since *string, nrev string, carslice []byte, ops []*atproto.SyncSubscribeRepos_RepoOp) error { 490 491 ctx, span := otel.Tracer("repoman").Start(ctx, "HandleExternalUserEvent") 491 492 defer span.End() 493 + 494 + span.SetAttributes(attribute.Int64("uid", int64(uid))) 492 495 493 496 log.Infow("HandleExternalUserEvent", "pds", pdsid, "uid", uid, "since", since, "nrev", nrev) 494 497