this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

in postgres use optimized where in query (#681)

We need to find a good way to run these tests on postgres

authored by

Jaz and committed by
GitHub
092ea1cf f07f35dd

+31 -32
+31 -32
carstore/bs.go
··· 10 10 "os" 11 11 "path/filepath" 12 12 "sort" 13 + "strconv" 13 14 "strings" 14 15 "sync" 15 16 "sync/atomic" ··· 32 33 cbg "github.com/whyrusleeping/cbor-gen" 33 34 "go.opentelemetry.io/otel" 34 35 "go.opentelemetry.io/otel/attribute" 36 + "gorm.io/driver/postgres" 35 37 "gorm.io/gorm" 36 38 ) 37 39 ··· 1097 1099 return nil 1098 1100 } 1099 1101 1100 - chunkSize := 10000 1102 + chunkSize := 2000 1101 1103 for i := 0; i < len(shs); i += chunkSize { 1102 1104 sl := shs[i:] 1103 1105 if len(sl) > chunkSize { ··· 1196 1198 return len(cb.shards) == 0 1197 1199 } 1198 1200 1199 - func (cs *CarStore) copyShardBlocksFiltered(ctx context.Context, sh *CarShard, w io.Writer, keep map[cid.Cid]bool) error { 1200 - fi, err := os.Open(sh.Path) 1201 - if err != nil { 1202 - return err 1203 - } 1204 - defer fi.Close() 1205 - 1206 - rr, err := car.NewCarReader(fi) 1207 - if err != nil { 1208 - return err 1209 - } 1210 - 1211 - for { 1212 - blk, err := rr.Next() 1213 - if err != nil { 1214 - return err 1215 - } 1216 - 1217 - if keep[blk.Cid()] { 1218 - _, err := LdWrite(w, blk.Cid().Bytes(), blk.RawData()) 1219 - return err 1220 - } 1221 - } 1222 - } 1223 - 1224 1201 func (cs *CarStore) openNewCompactedShardFile(ctx context.Context, user models.Uid, seq int) (*os.File, string, error) { 1225 1202 // TODO: some overwrite protections 1226 1203 fi, err := os.CreateTemp(cs.rootDir, fnameForShard(user, seq)) ··· 1254 1231 1255 1232 span.SetAttributes(attribute.Int("shards", len(shardIds))) 1256 1233 1257 - chunkSize := 10000 1234 + chunkSize := 2000 1258 1235 out := make([]blockRef, 0, len(shardIds)) 1259 1236 for i := 0; i < len(shardIds); i += chunkSize { 1260 1237 sl := shardIds[i:] ··· 1262 1239 sl = sl[:chunkSize] 1263 1240 } 1264 1241 1265 - var brefs []blockRef 1266 - if err := cs.meta.Raw(`select * from block_refs where shard in (?)`, sl).Scan(&brefs).Error; err != nil { 1267 - return nil, err 1242 + if err := blockRefsForShards(ctx, cs.meta, sl, &out); err != nil { 1243 + return nil, fmt.Errorf("getting block refs: %w", err) 1268 1244 } 1269 - 1270 - out = append(out, brefs...) 1271 1245 } 1272 1246 1273 1247 span.SetAttributes(attribute.Int("refs", len(out))) 1274 1248 1275 1249 return out, nil 1250 + } 1251 + 1252 + func valuesStatementForShards(shards []uint) string { 1253 + sb := new(strings.Builder) 1254 + for i, v := range shards { 1255 + sb.WriteByte('(') 1256 + sb.WriteString(strconv.Itoa(int(v))) 1257 + sb.WriteByte(')') 1258 + if i != len(shards)-1 { 1259 + sb.WriteByte(',') 1260 + } 1261 + } 1262 + return sb.String() 1263 + } 1264 + 1265 + func blockRefsForShards(ctx context.Context, db *gorm.DB, shards []uint, obuf *[]blockRef) error { 1266 + // Check the database driver 1267 + switch db.Dialector.(type) { 1268 + case *postgres.Dialector: 1269 + sval := valuesStatementForShards(shards) 1270 + q := fmt.Sprintf(`SELECT block_refs.* FROM block_refs INNER JOIN (VALUES %s) AS vals(v) ON block_refs.shard = v`, sval) 1271 + return db.Raw(q).Scan(obuf).Error 1272 + default: 1273 + return db.Raw(`SELECT * FROM block_refs WHERE shard IN (?)`, shards).Scan(obuf).Error 1274 + } 1276 1275 } 1277 1276 1278 1277 func shardSize(sh *CarShard) (int64, error) {