this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Starting to build out compactor daemon (#390)

Daemonizing the Compactor

Design goals:
- Always be running compactions
- Enqueue repos eagerly based on activity; more active repos should be
compacted more frequently
- Easily re-compact all repos
- Get state of currently running compaction
- Verbose output from compactions, including stats, timing, etc.

authored by

Jaz and committed by
GitHub
5a338fa2 3f1248f9

+405 -75
+5 -8
bgs/admin.go
··· 431 431 ctx, span := otel.Tracer("bgs").Start(context.Background(), "adminCompactAllRepos") 432 432 defer span.End() 433 433 434 - var dry bool 435 - if strings.ToLower(e.QueryParam("dry")) == "true" { 436 - dry = true 437 - } 438 - 439 434 var fast bool 440 435 if strings.ToLower(e.QueryParam("fast")) == "true" { 441 436 fast = true ··· 451 446 lim = v 452 447 } 453 448 454 - stats, err := bgs.runRepoCompaction(ctx, lim, dry, fast) 449 + err := bgs.compactor.EnqueueAllRepos(ctx, bgs, lim, 0, fast) 455 450 if err != nil { 456 - return fmt.Errorf("compaction run failed: %w", err) 451 + return echo.NewHTTPError(http.StatusInternalServerError, fmt.Errorf("failed to enqueue all repos: %w", err)) 457 452 } 458 453 459 - return e.JSON(200, stats) 454 + return e.JSON(200, map[string]any{ 455 + "success": "true", 456 + }) 460 457 } 461 458 462 459 func (bgs *BGS) handleAdminPostResyncPDS(e echo.Context) error {
+26 -62
bgs/bgs.go
··· 84 84 // Management of Resyncs 85 85 pdsResyncsLk sync.RWMutex 86 86 pdsResyncs map[uint]*PDSResync 87 + 88 + // Management of Compaction 89 + compactor *Compactor 87 90 } 88 91 89 92 type PDSResync struct { ··· 139 142 if err := bgs.slurper.RestartAll(); err != nil { 140 143 return nil, err 141 144 } 145 + 146 + compactor := NewCompactor(nil) 147 + compactor.Start(bgs) 148 + bgs.compactor = compactor 142 149 143 150 return bgs, nil 144 151 } ··· 362 369 if err := bgs.events.Shutdown(context.TODO()); err != nil { 363 370 errs = append(errs, err) 364 371 } 372 + 373 + bgs.compactor.Shutdown() 365 374 366 375 return errs 367 376 } ··· 733 742 return &u, nil 734 743 } 735 744 745 + func (bgs *BGS) lookupUserByUID(ctx context.Context, uid models.Uid) (*User, error) { 746 + ctx, span := otel.Tracer("bgs").Start(ctx, "lookupUserByUID") 747 + defer span.End() 748 + 749 + var u User 750 + if err := bgs.db.Find(&u, "id = ?", uid).Error; err != nil { 751 + return nil, err 752 + } 753 + 754 + if u.ID == 0 { 755 + return nil, gorm.ErrRecordNotFound 756 + } 757 + 758 + return &u, nil 759 + } 760 + 736 761 func stringLink(lnk *lexutil.LexLink) string { 737 762 if lnk == nil { 738 763 return "<nil>" ··· 783 808 return fmt.Errorf("rebase was true in event seq:%d,host:%s", evt.Seq, host.Host) 784 809 } 785 810 786 - if host.ID != u.PDS { 811 + if host.ID != u.PDS && u.PDS != 0 { 787 812 log.Warnw("received event for repo from different pds than expected", "repo", evt.Repo, "expPds", u.PDS, "gotPds", host.Host) 788 813 subj, err := bgs.createExternalUser(ctx, evt.Repo) 789 814 if err != nil { ··· 1200 1225 } 1201 1226 1202 1227 return nil 1203 - } 1204 - 1205 - type compactionStats struct { 1206 - Completed map[models.Uid]*carstore.CompactionStats 1207 - Targets []carstore.CompactionTarget 1208 - } 1209 - 1210 - func (bgs *BGS) runRepoCompaction(ctx context.Context, lim int, dry bool, fast bool) (*compactionStats, error) { 1211 - ctx, span := otel.Tracer("bgs").Start(ctx, 
"runRepoCompaction") 1212 - defer span.End() 1213 - 1214 - log.Warn("starting repo compaction") 1215 - 1216 - runStart := time.Now() 1217 - 1218 - repos, err := bgs.repoman.CarStore().GetCompactionTargets(ctx, 50) 1219 - if err != nil { 1220 - return nil, fmt.Errorf("failed to get repos to compact: %w", err) 1221 - } 1222 - 1223 - if lim > 0 && len(repos) > lim { 1224 - repos = repos[:lim] 1225 - } 1226 - 1227 - if dry { 1228 - return &compactionStats{ 1229 - Targets: repos, 1230 - }, nil 1231 - } 1232 - 1233 - results := make(map[models.Uid]*carstore.CompactionStats) 1234 - for i, r := range repos { 1235 - select { 1236 - case <-ctx.Done(): 1237 - return &compactionStats{ 1238 - Targets: repos, 1239 - Completed: results, 1240 - }, nil 1241 - default: 1242 - } 1243 - 1244 - repostart := time.Now() 1245 - st, err := bgs.repoman.CarStore().CompactUserShards(context.Background(), r.Usr, fast) 1246 - if err != nil { 1247 - log.Errorf("failed to compact shards for user %d: %s", r.Usr, err) 1248 - continue 1249 - } 1250 - compactionDuration.Observe(time.Since(repostart).Seconds()) 1251 - results[r.Usr] = st 1252 - 1253 - if i%100 == 0 { 1254 - log.Warnf("compacted %d repos in %s", i+1, time.Since(runStart)) 1255 - } 1256 - } 1257 - 1258 - log.Warnf("compacted %d repos in %s", len(repos), time.Since(runStart)) 1259 - 1260 - return &compactionStats{ 1261 - Targets: repos, 1262 - Completed: results, 1263 - }, nil 1264 1228 } 1265 1229 1266 1230 type revCheckResult struct {
+365
bgs/compactor.go
··· 1 + package bgs 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "sync" 7 + "time" 8 + 9 + "github.com/bluesky-social/indigo/carstore" 10 + "github.com/bluesky-social/indigo/models" 11 + "go.opentelemetry.io/otel" 12 + "go.opentelemetry.io/otel/attribute" 13 + ) 14 + 15 + type queueItem struct { 16 + uid models.Uid 17 + fast bool 18 + } 19 + 20 + // uniQueue is a queue that only allows one instance of a given uid 21 + type uniQueue struct { 22 + q []queueItem 23 + members map[models.Uid]struct{} 24 + lk sync.Mutex 25 + } 26 + 27 + // Append appends a uid to the end of the queue if it doesn't already exist 28 + func (q *uniQueue) Append(uid models.Uid, fast bool) { 29 + q.lk.Lock() 30 + defer q.lk.Unlock() 31 + 32 + if _, ok := q.members[uid]; ok { 33 + return 34 + } 35 + 36 + q.q = append(q.q, queueItem{uid: uid, fast: fast}) 37 + q.members[uid] = struct{}{} 38 + compactionQueueDepth.Inc() 39 + } 40 + 41 + // Prepend prepends a uid to the beginning of the queue if it doesn't already exist 42 + func (q *uniQueue) Prepend(uid models.Uid, fast bool) { 43 + q.lk.Lock() 44 + defer q.lk.Unlock() 45 + 46 + if _, ok := q.members[uid]; ok { 47 + return 48 + } 49 + 50 + q.q = append([]queueItem{{uid: uid, fast: fast}}, q.q...) 51 + q.members[uid] = struct{}{} 52 + compactionQueueDepth.Inc() 53 + } 54 + 55 + // Has returns true if the queue contains the given uid 56 + func (q *uniQueue) Has(uid models.Uid) bool { 57 + q.lk.Lock() 58 + defer q.lk.Unlock() 59 + 60 + _, ok := q.members[uid] 61 + return ok 62 + } 63 + 64 + // Remove removes the given uid from the queue 65 + func (q *uniQueue) Remove(uid models.Uid) { 66 + q.lk.Lock() 67 + defer q.lk.Unlock() 68 + 69 + if _, ok := q.members[uid]; !ok { 70 + return 71 + } 72 + 73 + for i, item := range q.q { 74 + if item.uid == uid { 75 + q.q = append(q.q[:i], q.q[i+1:]...) 
76 + break 77 + } 78 + } 79 + 80 + delete(q.members, uid) 81 + compactionQueueDepth.Dec() 82 + } 83 + 84 + // Pop pops the first item off the front of the queue 85 + func (q *uniQueue) Pop() (*queueItem, bool) { 86 + q.lk.Lock() 87 + defer q.lk.Unlock() 88 + 89 + if len(q.q) == 0 { 90 + return nil, false 91 + } 92 + 93 + item := q.q[0] 94 + q.q = q.q[1:] 95 + delete(q.members, item.uid) 96 + 97 + compactionQueueDepth.Dec() 98 + return &item, true 99 + } 100 + 101 + type CompactorState struct { 102 + latestUID models.Uid 103 + latestDID string 104 + status string 105 + stats *carstore.CompactionStats 106 + } 107 + 108 + // Compactor is a compactor daemon that compacts repos in the background 109 + type Compactor struct { 110 + q *uniQueue 111 + state *CompactorState 112 + stateLk sync.RWMutex 113 + exit chan struct{} 114 + exited chan struct{} 115 + requeueInterval time.Duration 116 + requeueLimit int 117 + requeueShardCount int 118 + requeueFast bool 119 + } 120 + 121 + type CompactorOptions struct { 122 + RequeueInterval time.Duration 123 + RequeueLimit int 124 + RequeueShardCount int 125 + RequeueFast bool 126 + } 127 + 128 + func DefaultCompactorOptions() *CompactorOptions { 129 + return &CompactorOptions{ 130 + RequeueInterval: time.Hour * 4, 131 + RequeueLimit: 0, 132 + RequeueShardCount: 50, 133 + RequeueFast: true, 134 + } 135 + } 136 + 137 + func NewCompactor(opts *CompactorOptions) *Compactor { 138 + if opts == nil { 139 + opts = DefaultCompactorOptions() 140 + } 141 + 142 + return &Compactor{ 143 + q: &uniQueue{ 144 + members: make(map[models.Uid]struct{}), 145 + }, 146 + state: &CompactorState{}, 147 + exit: make(chan struct{}), 148 + exited: make(chan struct{}), 149 + requeueInterval: opts.RequeueInterval, 150 + requeueLimit: opts.RequeueLimit, 151 + requeueFast: opts.RequeueFast, 152 + requeueShardCount: opts.RequeueShardCount, 153 + } 154 + } 155 + 156 + type compactionStats struct { 157 + Completed map[models.Uid]*carstore.CompactionStats 158 + 
Targets []carstore.CompactionTarget 159 + } 160 + 161 + func (c *Compactor) SetState(uid models.Uid, did, status string, stats *carstore.CompactionStats) { 162 + c.stateLk.Lock() 163 + defer c.stateLk.Unlock() 164 + 165 + c.state.latestUID = uid 166 + c.state.latestDID = did 167 + c.state.status = status 168 + c.state.stats = stats 169 + } 170 + 171 + func (c *Compactor) GetState() *CompactorState { 172 + c.stateLk.RLock() 173 + defer c.stateLk.RUnlock() 174 + 175 + return &CompactorState{ 176 + latestUID: c.state.latestUID, 177 + latestDID: c.state.latestDID, 178 + status: c.state.status, 179 + stats: c.state.stats, 180 + } 181 + } 182 + 183 + var errNoReposToCompact = fmt.Errorf("no repos to compact") 184 + 185 + // Start starts the compactor 186 + func (c *Compactor) Start(bgs *BGS) { 187 + log.Info("starting compactor") 188 + go c.doWork(bgs) 189 + go func() { 190 + log.Infow("starting compactor requeue routine", 191 + "interval", c.requeueInterval, 192 + "limit", c.requeueLimit, 193 + "shardCount", c.requeueShardCount, 194 + "fast", c.requeueFast, 195 + ) 196 + 197 + // Enqueue all repos on startup 198 + ctx := context.Background() 199 + ctx, span := otel.Tracer("compactor").Start(ctx, "RequeueRoutine") 200 + if err := c.EnqueueAllRepos(ctx, bgs, c.requeueLimit, c.requeueShardCount, c.requeueFast); err != nil { 201 + log.Errorw("failed to enqueue all repos", "err", err) 202 + } 203 + span.End() 204 + 205 + t := time.NewTicker(c.requeueInterval) 206 + for { 207 + select { 208 + case <-c.exit: 209 + return 210 + case <-t.C: 211 + ctx := context.Background() 212 + ctx, span := otel.Tracer("compactor").Start(ctx, "RequeueRoutine") 213 + if err := c.EnqueueAllRepos(ctx, bgs, c.requeueLimit, c.requeueShardCount, c.requeueFast); err != nil { 214 + log.Errorw("failed to enqueue all repos", "err", err) 215 + } 216 + span.End() 217 + } 218 + } 219 + }() 220 + } 221 + 222 + // Shutdown shuts down the compactor 223 + func (c *Compactor) Shutdown() { 224 + 
log.Info("stopping compactor") 225 + close(c.exit) 226 + <-c.exited 227 + log.Info("compactor stopped") 228 + } 229 + 230 + func (c *Compactor) doWork(bgs *BGS) { 231 + for { 232 + select { 233 + case <-c.exit: 234 + log.Info("compactor worker exiting, no more active compactions running") 235 + close(c.exited) 236 + return 237 + default: 238 + } 239 + 240 + ctx := context.Background() 241 + start := time.Now() 242 + state, err := c.compactNext(ctx, bgs) 243 + if err != nil { 244 + if err == errNoReposToCompact { 245 + log.Debug("no repos to compact, waiting and retrying") 246 + time.Sleep(time.Second * 5) 247 + continue 248 + } 249 + log.Errorw("failed to compact repo", 250 + "err", err, 251 + "uid", state.latestUID, 252 + "repo", state.latestDID, 253 + "status", state.status, 254 + "stats", state.stats, 255 + "duration", time.Since(start), 256 + ) 257 + // Pause for a bit to avoid spamming failed compactions 258 + time.Sleep(time.Millisecond * 100) 259 + } else { 260 + log.Infow("compacted repo", 261 + "uid", state.latestUID, 262 + "repo", state.latestDID, 263 + "status", state.status, 264 + "stats", state.stats, 265 + "duration", time.Since(start), 266 + ) 267 + } 268 + } 269 + } 270 + 271 + func (c *Compactor) compactNext(ctx context.Context, bgs *BGS) (*CompactorState, error) { 272 + ctx, span := otel.Tracer("compactor").Start(ctx, "CompactNext") 273 + defer span.End() 274 + 275 + item, ok := c.q.Pop() 276 + if !ok || item == nil { 277 + return nil, errNoReposToCompact 278 + } 279 + 280 + c.SetState(item.uid, "unknown", "getting_user", nil) 281 + 282 + user, err := bgs.lookupUserByUID(ctx, item.uid) 283 + if err != nil { 284 + span.RecordError(err) 285 + c.SetState(item.uid, "unknown", "failed_getting_user", nil) 286 + return nil, fmt.Errorf("failed to get user %d: %w", item.uid, err) 287 + } 288 + 289 + span.SetAttributes(attribute.String("repo", user.Did), attribute.Int("uid", int(item.uid))) 290 + 291 + c.SetState(item.uid, user.Did, "compacting", nil) 292 + 
293 + start := time.Now() 294 + st, err := bgs.repoman.CarStore().CompactUserShards(ctx, item.uid, item.fast) 295 + if err != nil { 296 + span.RecordError(err) 297 + c.SetState(item.uid, user.Did, "failed_compacting", nil) 298 + return nil, fmt.Errorf("failed to compact shards for user %d: %w", item.uid, err) 299 + } 300 + compactionDuration.Observe(time.Since(start).Seconds()) 301 + 302 + span.SetAttributes( 303 + attribute.Int("shards.deleted", st.ShardsDeleted), 304 + attribute.Int("shards.new", st.NewShards), 305 + attribute.Int("dupes", st.DupeCount), 306 + attribute.Int("shards.skipped", st.SkippedShards), 307 + attribute.Int("refs", st.TotalRefs), 308 + ) 309 + 310 + c.SetState(item.uid, user.Did, "done", st) 311 + 312 + return c.GetState(), nil 313 + } 314 + 315 + func (c *Compactor) EnqueueRepo(ctx context.Context, user User, fast bool) { 316 + ctx, span := otel.Tracer("compactor").Start(ctx, "EnqueueRepo") 317 + defer span.End() 318 + log.Infow("enqueueing compaction for repo", "repo", user.Did, "uid", user.ID, "fast", fast) 319 + c.q.Append(user.ID, fast) 320 + } 321 + 322 + // EnqueueAllRepos enqueues all repos for compaction 323 + // lim is the maximum number of repos to enqueue 324 + // shardCount is the number of shards to compact per user (0 = default of 50) 325 + // fast is whether to use the fast compaction method (skip large shards) 326 + func (c *Compactor) EnqueueAllRepos(ctx context.Context, bgs *BGS, lim int, shardCount int, fast bool) error { 327 + ctx, span := otel.Tracer("compactor").Start(ctx, "EnqueueAllRepos") 328 + defer span.End() 329 + 330 + span.SetAttributes( 331 + attribute.Int("lim", lim), 332 + attribute.Int("shardCount", shardCount), 333 + attribute.Bool("fast", fast), 334 + ) 335 + 336 + if shardCount == 0 { 337 + shardCount = 50 338 + } 339 + 340 + span.SetAttributes(attribute.Int("clampedShardCount", shardCount)) 341 + 342 + log := log.With("source", "compactor_enqueue_all_repos", "lim", lim, "shardCount", shardCount, 
"fast", fast) 343 + log.Info("enqueueing all repos") 344 + 345 + repos, err := bgs.repoman.CarStore().GetCompactionTargets(ctx, shardCount) 346 + if err != nil { 347 + return fmt.Errorf("failed to get repos to compact: %w", err) 348 + } 349 + 350 + span.SetAttributes(attribute.Int("repos", len(repos))) 351 + 352 + if lim > 0 && len(repos) > lim { 353 + repos = repos[:lim] 354 + } 355 + 356 + span.SetAttributes(attribute.Int("clampedRepos", len(repos))) 357 + 358 + for _, r := range repos { 359 + c.q.Append(r.Usr, fast) 360 + } 361 + 362 + log.Infow("done enqueueing all repos", "repos_enqueued", len(repos)) 363 + 364 + return nil 365 + }
+5
bgs/metrics.go
··· 48 48 Buckets: prometheus.ExponentialBuckets(0.001, 3, 14), 49 49 }) 50 50 51 + var compactionQueueDepth = promauto.NewGauge(prometheus.GaugeOpts{ 52 + Name: "compaction_queue_depth", 53 + Help: "The current depth of the compaction queue", 54 + }) 55 + 51 56 var newUsersDiscovered = promauto.NewCounter(prometheus.CounterOpts{ 52 57 Name: "bgs_new_users_discovered", 53 58 Help: "The total number of new users discovered directly from the firehose (not from refs)",
+1 -1
carstore/bs.go
··· 1370 1370 // still around but we're doing that anyways since compaction isnt a 1371 1371 // perfect process 1372 1372 1373 - log.Warnw("repo has dirty dupes", "count", len(dupes), "uid", user, "staleRefs", len(staleRefs), "blockRefs", len(brefs)) 1373 + log.Debugw("repo has dirty dupes", "count", len(dupes), "uid", user, "staleRefs", len(staleRefs), "blockRefs", len(brefs)) 1374 1374 1375 1375 //return nil, fmt.Errorf("WIP: not currently handling this case") 1376 1376 }
+2 -2
cmd/bigsky/main.go
··· 11 11 "time" 12 12 13 13 "github.com/bluesky-social/indigo/api" 14 - "github.com/bluesky-social/indigo/bgs" 14 + libbgs "github.com/bluesky-social/indigo/bgs" 15 15 "github.com/bluesky-social/indigo/blobs" 16 16 "github.com/bluesky-social/indigo/carstore" 17 17 "github.com/bluesky-social/indigo/did" ··· 335 335 } 336 336 337 337 log.Infow("constructing bgs") 338 - bgs, err := bgs.NewBGS(db, ix, repoman, evtman, cachedidr, blobstore, hr, !cctx.Bool("crawl-insecure-ws")) 338 + bgs, err := libbgs.NewBGS(db, ix, repoman, evtman, cachedidr, blobstore, hr, !cctx.Bool("crawl-insecure-ws")) 339 339 if err != nil { 340 340 return err 341 341 }
+1 -2
util/http.go
··· 27 27 l.inner.Infow(msg, keysAndValues...) 28 28 } 29 29 30 - // re-writes HTTP client DEBUG to INFO level (this is where retry is logged) 31 30 func (l LeveledZap) Debug(msg string, keysAndValues ...interface{}) { 32 - l.inner.Infow(msg, keysAndValues...) 31 + l.inner.Debugw(msg, keysAndValues...) 33 32 } 34 33 35 34 // Generates an HTTP client with decent general-purpose defaults around