this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

improved compaction, minimize shard count while minimizing IO (#328)

The main idea here is to have each shard contain twice as many blocks as
the next shard, on the reasoning that older shards will need to be
rewritten less often and will also be accessed less in general.

Still to discuss: should there be a maximum size on shards?

authored by

Whyrusleeping and committed by
GitHub
6442e521 b6a6ed55

+112 -55
+3 -1
bgs/admin.go
··· 414 414 return fmt.Errorf("no such user: %w", err) 415 415 } 416 416 417 - if err := bgs.repoman.CarStore().CompactUserShards(ctx, u.ID); err != nil { 417 + stats, err := bgs.repoman.CarStore().CompactUserShards(ctx, u.ID) 418 + if err != nil { 418 419 return fmt.Errorf("compaction failed: %w", err) 419 420 } 420 421 421 422 return e.JSON(200, map[string]any{ 422 423 "success": "true", 424 + "stats": stats, 423 425 }) 424 426 } 425 427
+1 -1
bgs/bgs.go
··· 1090 1090 } 1091 1091 1092 1092 for _, r := range repos { 1093 - if err := bgs.repoman.CarStore().CompactUserShards(ctx, r.Usr); err != nil { 1093 + if _, err := bgs.repoman.CarStore().CompactUserShards(ctx, r.Usr); err != nil { 1094 1094 log.Errorf("failed to compact shards for user %d: %s", r.Usr, err) 1095 1095 continue 1096 1096 }
+102 -51
carstore/bs.go
··· 1015 1015 1016 1016 type shardStat struct { 1017 1017 ID uint 1018 - Seq int 1019 1018 Dirty int 1019 + Seq int 1020 1020 Total int 1021 1021 1022 1022 refs []blockRef ··· 1026 1026 return float64(s.Dirty) / float64(s.Total) 1027 1027 } 1028 1028 1029 - func shouldCompact(s shardStat) bool { 1030 - // if shard is mostly removed blocks 1031 - if s.dirtyFrac() > 0.5 { 1032 - return true 1033 - } 1034 - 1035 - // if its a big shard with a sufficient number of removed blocks 1036 - if s.Dirty > 1000 { 1037 - return true 1038 - } 1039 - 1040 - // if its just rather small and we want to compact it up with other shards 1041 - if s.Total < 20 { 1042 - return true 1043 - } 1044 - 1045 - return false 1046 - } 1047 - 1048 - func aggrRefs(brefs []blockRef, staleCids map[cid.Cid]bool) []shardStat { 1029 + func aggrRefs(brefs []blockRef, shards map[uint]CarShard, staleCids map[cid.Cid]bool) []shardStat { 1049 1030 byId := make(map[uint]*shardStat) 1050 1031 1051 1032 for _, br := range brefs { 1052 1033 s, ok := byId[br.Shard] 1053 1034 if !ok { 1054 1035 s = &shardStat{ 1055 - ID: br.Shard, 1036 + ID: br.Shard, 1037 + Seq: shards[br.Shard].Seq, 1056 1038 } 1057 1039 byId[br.Shard] = s 1058 1040 } ··· 1071 1053 } 1072 1054 1073 1055 sort.Slice(out, func(i, j int) bool { 1074 - return out[i].ID < out[j].ID 1056 + return out[i].Seq < out[j].Seq 1075 1057 }) 1076 1058 1077 1059 return out ··· 1081 1063 shards []shardStat 1082 1064 1083 1065 cleanBlocks int 1066 + expSize int 1067 + } 1068 + 1069 + func (cb *compBucket) shouldCompact() bool { 1070 + if len(cb.shards) == 0 { 1071 + return false 1072 + } 1073 + 1074 + if len(cb.shards) > 5 { 1075 + return true 1076 + } 1077 + 1078 + var frac float64 1079 + for _, s := range cb.shards { 1080 + frac += s.dirtyFrac() 1081 + } 1082 + frac /= float64(len(cb.shards)) 1083 + 1084 + if len(cb.shards) > 3 && frac > 0.2 { 1085 + return true 1086 + } 1087 + 1088 + return frac > 0.4 1084 1089 } 1085 1090 1086 1091 func (cb *compBucket) 
addShardStat(ss shardStat) { ··· 1144 1149 return targets, nil 1145 1150 } 1146 1151 1147 - func (cs *CarStore) CompactUserShards(ctx context.Context, user models.Uid) error { 1152 + type CompactionStats struct { 1153 + StartShards int `json:"startShards"` 1154 + NewShards int `json:"newShards"` 1155 + SkippedShards int `json:"skippedShards"` 1156 + ShardsDeleted int `json:"shardsDeleted"` 1157 + RefsDeleted int `json:"refsDeleted"` 1158 + } 1159 + 1160 + func (cs *CarStore) CompactUserShards(ctx context.Context, user models.Uid) (*CompactionStats, error) { 1148 1161 ctx, span := otel.Tracer("carstore").Start(ctx, "CompactUserShards") 1149 1162 defer span.End() 1150 1163 ··· 1152 1165 1153 1166 var shards []CarShard 1154 1167 if err := cs.meta.WithContext(ctx).Find(&shards, "usr = ?", user).Error; err != nil { 1155 - return err 1168 + return nil, err 1156 1169 } 1157 1170 1158 1171 var shardIds []uint ··· 1166 1179 } 1167 1180 1168 1181 var brefs []blockRef 1169 - if err := cs.meta.WithContext(ctx).Raw(`select * from block_refs where shard in (?)`, shardIds).Scan(&brefs).Error; err != nil { 1170 - return err 1182 + if err := cs.meta.WithContext(ctx).Raw(`select shard, cid from block_refs where shard in (?)`, shardIds).Scan(&brefs).Error; err != nil { 1183 + return nil, err 1171 1184 } 1172 1185 1173 1186 var staleRefs []staleRef 1174 1187 if err := cs.meta.WithContext(ctx).Find(&staleRefs, "usr = ?", user).Error; err != nil { 1175 - return err 1188 + return nil, err 1176 1189 } 1177 1190 1178 1191 stale := make(map[cid.Cid]bool) ··· 1195 1208 // focus on compacting everything else. 
it leaves *some* dirty blocks 1196 1209 // still around but we're doing that anyways since compaction isnt a 1197 1210 // perfect process 1198 - return fmt.Errorf("WIP: not currently handling this case") 1211 + return nil, fmt.Errorf("WIP: not currently handling this case") 1199 1212 } 1200 1213 1201 1214 keep := make(map[cid.Cid]bool) ··· 1205 1218 } 1206 1219 } 1207 1220 1208 - results := aggrRefs(brefs, stale) 1221 + results := aggrRefs(brefs, shardsById, stale) 1222 + var sum int 1223 + for _, r := range results { 1224 + sum += r.Total 1225 + } 1226 + 1227 + lowBound := 20 1228 + N := 10 1229 + // we want to *aim* for N shards per user 1230 + // the last several should be left small to allow easy loading from disk 1231 + // for updates (since recent blocks are most likely needed for edits) 1232 + // the beginning of the list should be some sort of exponential fall-off 1233 + // with the area under the curve targeted by the total number of blocks we 1234 + // have 1235 + var threshs []int 1236 + tot := len(brefs) 1237 + for i := 0; i < N; i++ { 1238 + v := tot / 2 1239 + if v < lowBound { 1240 + v = lowBound 1241 + } 1242 + tot = tot / 2 1243 + threshs = append(threshs, v) 1244 + } 1209 1245 1210 1246 thresholdForPosition := func(i int) int { 1211 - // TODO: calculate some curve here so earlier shards end up with more 1212 - // blocks and recent shards end up with less 1213 - return 50 1247 + if i > len(threshs) { 1248 + return lowBound 1249 + } 1250 + return threshs[i] 1214 1251 } 1215 1252 1216 1253 cur := new(compBucket) 1254 + cur.expSize = thresholdForPosition(0) 1217 1255 var compactionQueue []*compBucket 1218 1256 for i, r := range results { 1219 - if shouldCompact(r) { 1220 - if cur.cleanBlocks > thresholdForPosition(i) { 1221 - compactionQueue = append(compactionQueue, cur) 1222 - cur = new(compBucket) 1223 - } 1257 + cur.addShardStat(r) 1224 1258 1225 - cur.addShardStat(r) 1226 - } else { 1227 - if !cur.isEmpty() { 1228 - compactionQueue = 
append(compactionQueue, cur) 1229 - cur = new(compBucket) 1259 + if cur.cleanBlocks > cur.expSize || i > len(results)-3 { 1260 + compactionQueue = append(compactionQueue, cur) 1261 + cur = &compBucket{ 1262 + expSize: thresholdForPosition(len(compactionQueue)), 1230 1263 } 1231 1264 } 1232 1265 } 1233 - 1234 1266 if !cur.isEmpty() { 1235 1267 compactionQueue = append(compactionQueue, cur) 1236 1268 } 1237 1269 1270 + stats := &CompactionStats{ 1271 + StartShards: len(shards), 1272 + } 1273 + 1238 1274 removedShards := make(map[uint]bool) 1239 1275 for _, b := range compactionQueue { 1276 + if !b.shouldCompact() { 1277 + stats.SkippedShards += len(b.shards) 1278 + continue 1279 + } 1280 + 1240 1281 if err := cs.compactBucket(ctx, user, b, shardsById, keep); err != nil { 1241 - return err 1282 + return nil, err 1242 1283 } 1284 + 1285 + stats.NewShards++ 1243 1286 1244 1287 var todelete []*CarShard 1245 1288 for _, s := range b.shards { 1246 1289 removedShards[s.ID] = true 1247 1290 sh, ok := shardsById[s.ID] 1248 1291 if !ok { 1249 - return fmt.Errorf("missing shard to delete") 1292 + return nil, fmt.Errorf("missing shard to delete") 1250 1293 } 1251 1294 1252 1295 todelete = append(todelete, &sh) 1253 1296 } 1254 1297 1298 + stats.ShardsDeleted += len(todelete) 1255 1299 if err := cs.deleteShards(ctx, todelete); err != nil { 1256 - return fmt.Errorf("deleting shards: %w", err) 1300 + return nil, fmt.Errorf("deleting shards: %w", err) 1257 1301 } 1258 1302 } 1259 1303 1260 1304 // now we need to delete the staleRefs we successfully cleaned up 1261 1305 // we can delete a staleRef if all the shards that have blockRefs with matching stale refs were processed 1262 1306 1263 - return cs.deleteStaleRefs(ctx, brefs, staleRefs, removedShards) 1307 + num, err := cs.deleteStaleRefs(ctx, brefs, staleRefs, removedShards) 1308 + if err != nil { 1309 + return nil, err 1310 + } 1311 + 1312 + stats.RefsDeleted = num 1313 + 1314 + return stats, nil 1264 1315 } 1265 1316 1266 - func 
(cs *CarStore) deleteStaleRefs(ctx context.Context, brefs []blockRef, staleRefs []staleRef, removedShards map[uint]bool) error { 1317 + func (cs *CarStore) deleteStaleRefs(ctx context.Context, brefs []blockRef, staleRefs []staleRef, removedShards map[uint]bool) (int, error) { 1267 1318 ctx, span := otel.Tracer("carstore").Start(ctx, "deleteStaleRefs") 1268 1319 defer span.End() 1269 1320 ··· 1296 1347 } 1297 1348 1298 1349 if err := cs.meta.Delete(&staleRef{}, "id in (?)", sl).Error; err != nil { 1299 - return err 1350 + return 0, err 1300 1351 } 1301 1352 } 1302 1353 1303 - return nil 1354 + return len(staleToDelete), nil 1304 1355 } 1305 1356 1306 1357 func (cs *CarStore) compactBucket(ctx context.Context, user models.Uid, b *compBucket, shardsById map[uint]CarShard, keep map[cid.Cid]bool) error {
+6 -2
carstore/repo_test.go
··· 140 140 } 141 141 checkRepo(t, buf, recs) 142 142 143 - if err := cs.CompactUserShards(ctx, 1); err != nil { 143 + if _, err := cs.CompactUserShards(ctx, 1); err != nil { 144 144 t.Fatal(err) 145 145 } 146 146 ··· 217 217 head = nroot 218 218 } 219 219 fmt.Println("Run compaction", loop) 220 - if err := cs.CompactUserShards(ctx, 1); err != nil { 220 + st, err := cs.CompactUserShards(ctx, 1) 221 + if err != nil { 221 222 t.Fatal(err) 222 223 } 224 + 225 + fmt.Printf("%#v\n", st) 223 226 224 227 buf := new(bytes.Buffer) 225 228 if err := cs.ReadUserCar(ctx, 1, "", true, buf); err != nil { ··· 236 239 } 237 240 238 241 func checkRepo(t *testing.T, r io.Reader, expRecs []cid.Cid) { 242 + t.Helper() 239 243 rep, err := repo.ReadRepoFromCar(context.TODO(), r) 240 244 if err != nil { 241 245 t.Fatal(err)