package transaction

import (
	"errors"
	"io"
	"math"
	"sync"
	"time"

	"github.com/bits-and-blooms/bloom/v3"
	dbm "github.com/cometbft/cometbft-db"
	"github.com/cosmos/iavl"
	"github.com/cosmos/iavl/db"
	"github.com/dgraph-io/badger/v4"
	"github.com/gbl08ma/stacktrace"
)

// ExtendedDB is a dbm.DB that can additionally create iterators configured
// with badger iterator options.
type ExtendedDB interface {
	dbm.DB
	IteratorWithOptions(start, end []byte, opts badger.IteratorOptions) (dbm.Iterator, error)
}

// BloomFilterStorage builds and persists the DID bloom filter used by Factory.
type BloomFilterStorage interface {
	// BuildDIDBloomFilter returns a bloom filter built using the given read
	// transaction. forceRecreate is passed as false by NewFactory and as true
	// by Factory.RecreateDIDBloomFilter.
	BuildDIDBloomFilter(tx Read, forceRecreate bool) (*bloom.BloomFilter, error)

	// StoreDIDBloomFilter persists the filter. writeBloomTo serializes the
	// filter into w and returns three counters plus an error; as called by
	// Factory these are the bloom sequence, the estimated number of items in
	// the filter, and the filter's item capacity.
	StoreDIDBloomFilter(writeBloomTo func(w io.Writer) (uint64, uint64, uint64, error)) error
}

// Factory creates read transactions over the IAVL tree and index database, and
// owns the in-memory DID bloom filter that those transactions share.
type Factory struct {
	bloomFilterStorage BloomFilterStorage
	db                 ExtendedDB

	// mutableTreeMu is held while accessing mutableTree to create transactions.
	mutableTreeMu sync.Mutex
	mutableTree   *iavl.MutableTree

	operationCounter func(tx Read) (uint64, error)

	// bloomMu is held when saving or replacing bloomFilter and bloomSeq.
	bloomMu     sync.RWMutex
	bloomSeq    uint64
	bloomFilter *bloom.BloomFilter

	// bloomSaverMu serializes save and recreate operations; bloomLastSaveSeq is
	// only written while it is held.
	bloomSaverMu     sync.Mutex
	bloomLastSaveSeq uint64
}

// NewFactory creates a Factory over tree and indexDB, builds the DID bloom
// filter through bloomFilterStorage, initializes the bloom sequence from
// operationCounter and attempts an initial save of the filter.
//
// On any failure it returns a nil *Factory together with the error.
func NewFactory(tree *iavl.MutableTree, indexDB ExtendedDB, operationCounter func(tx Read) (uint64, error), bloomFilterStorage BloomFilterStorage) (*Factory, error) {
	f := &Factory{
		bloomFilterStorage: bloomFilterStorage,
		db:                 indexDB,
		mutableTree:        tree,
		operationCounter:   operationCounter,
	}

	// if we use ReadCommitted, it's going to fail when the tree doesn't have versions yet
	// in practice we just want to blindly list all DIDs "past present and future", so it doesn't matter what the version is
	// (when doing historical queries, it's fine if the bloom filter claims that a DID already exists when it might not exist yet)
	tx := f.ReadWorking(time.Now())

	var err error
	f.bloomFilter, err = bloomFilterStorage.BuildDIDBloomFilter(tx, false)
	if err != nil {
		return nil, stacktrace.Propagate(err)
	}

	f.bloomSeq, err = f.operationCounter(tx)
	if err != nil {
		return nil, stacktrace.Propagate(err)
	}
	// since the sequenceGetter always returns the next sequence
	if f.bloomSeq > 0 {
		f.bloomSeq--
	}

	// Do not hand out a factory alongside an error: every other failure path
	// in this constructor returns nil as well.
	if err := f.SaveDIDBloomFilter(); err != nil {
		return nil, stacktrace.Propagate(err)
	}
	return f, nil
}

// ReadWorking returns a read transaction over the mutable tree at its current
// working version, stamped with ts.
//
// NOTE(review): f.bloomFilter is read here while holding only mutableTreeMu,
// whereas RecreateDIDBloomFilter replaces it under bloomMu. That method also
// calls ReadWorking with bloomMu held, so acquiring bloomMu here would
// self-deadlock. Transactions created before a recreate keep pointing at the
// previous filter - confirm that this is acceptable.
func (f *Factory) ReadWorking(ts time.Time) Read {
	f.mutableTreeMu.Lock()
	defer f.mutableTreeMu.Unlock()

	return &readTx{
		ts:               ts,
		height:           f.mutableTree.WorkingVersion(),
		mutableTree:      f.mutableTree,
		db:               f.db,
		operationCounter: f.operationCounter,
		bloomMu:          &f.bloomMu,
		bloomFilter:      f.bloomFilter,
		bloomSeq:         &f.bloomSeq,
	}
}

// ReadCommitted returns a read transaction over the tree's current version
// (as reported by mutableTree.Version), stamped with the current time.
//
// It panics if the transaction cannot be created.
func (f *Factory) ReadCommitted() Read {
	f.mutableTreeMu.Lock()
	defer f.mutableTreeMu.Unlock()

	tx, err := f.readHeightWithinMu(time.Now(), f.mutableTree.Version())
	if err != nil {
		// this should never happen, it's not worth making the signature of this function more
		// complex for an error we'll never return unless the ABCI application is yet to be initialized
		panic(stacktrace.Propagate(err))
	}
	return tx
}

// readHeightWithinMu returns a read transaction over the immutable tree at
// height. The caller must hold mutableTreeMu.
//
// When height is 0 and that version does not exist, the transaction is backed
// by a freshly saved, empty in-memory tree instead of failing. Any other
// lookup error is returned.
func (f *Factory) readHeightWithinMu(ts time.Time, height int64) (Read, error) {
	immutable, err := f.mutableTree.GetImmutable(height)
	if err != nil {
		if !errors.Is(err, iavl.ErrVersionDoesNotExist) || height != 0 {
			return nil, stacktrace.Propagate(err)
		}

		// give the reader an empty tree just to satisfy expectations
		tmpTree := iavl.NewMutableTree(db.NewMemDB(), 1, true, iavl.NewNopLogger())
		_, v, err := tmpTree.SaveVersion()
		if err != nil {
			return nil, stacktrace.Propagate(err)
		}
		immutable, err = tmpTree.GetImmutable(v)
		if err != nil {
			return nil, stacktrace.Propagate(err)
		}
	}

	// Unlike ReadWorking, this transaction is given neither the mutable tree
	// nor a pointer to the bloom sequence.
	return &readTx{
		ts:               ts,
		height:           height,
		tree:             AdaptImmutableTree(immutable),
		db:               f.db,
		operationCounter: f.operationCounter,
		bloomMu:          &f.bloomMu,
		bloomFilter:      f.bloomFilter,
	}, nil
}

// ReadHeight returns a read transaction over the tree at the given height,
// stamped with ts. See readHeightWithinMu for the height 0 special case.
func (f *Factory) ReadHeight(ts time.Time, height int64) (Read, error) {
	f.mutableTreeMu.Lock()
	defer f.mutableTreeMu.Unlock()

	tx, err := f.readHeightWithinMu(ts, height)
	return tx, stacktrace.Propagate(err)
}

// SaveDIDBloomFilter persists the bloom filter through the configured storage
// if its sequence has advanced past the last saved one; otherwise it does
// nothing.
func (f *Factory) SaveDIDBloomFilter() error {
	f.bloomSaverMu.Lock()
	defer f.bloomSaverMu.Unlock()

	return stacktrace.Propagate(f.saveDIDBloomFilterWithinMutex())
}

// saveDIDBloomFilterWithinMutex stores the bloom filter when bloomSeq is
// greater than bloomLastSaveSeq, then records the saved sequence. The caller
// must hold bloomSaverMu.
func (f *Factory) saveDIDBloomFilterWithinMutex() error {
	f.bloomMu.RLock()
	shouldSave := f.bloomSeq > f.bloomLastSaveSeq
	f.bloomMu.RUnlock()

	if !shouldSave {
		return nil
	}

	var savedSeq uint64
	err := f.bloomFilterStorage.StoreDIDBloomFilter(func(w io.Writer) (uint64, uint64, uint64, error) {
		// Hold the read lock for the whole write so the serialized filter and
		// the reported sequence are read under the same lock acquisition.
		f.bloomMu.RLock()
		defer f.bloomMu.RUnlock()

		_, err := f.bloomFilter.WriteTo(w)
		savedSeq = f.bloomSeq

		// we reimplement bloomFilter.ApproximatedSize() here, because their version deals poorly with the case where the filter is completely full
		// (it will return 0)
		// and it also returns a uint32, which theoretically means we can't have more DIDs than humans on the planet...
		x := float64(f.bloomFilter.BitSet().Count())
		m := float64(f.bloomFilter.Cap())
		k := float64(f.bloomFilter.K())
		size := -1 * m / k * math.Log(1-x/m) / math.Log(math.E)

		// When every bit is set, math.Log(0) makes size infinite; report the
		// item capacity in that case instead.
		curItemCap := uint64(m * math.Log(2) / k)
		sizeInt := curItemCap
		if !math.IsInf(size, 0) {
			sizeInt = uint64(math.Floor(size + 0.5))
		}
		return f.bloomSeq, sizeInt, curItemCap, stacktrace.Propagate(err)
	})
	if err != nil {
		return stacktrace.Propagate(err)
	}

	f.bloomLastSaveSeq = savedSeq
	return nil
}

// RecreateDIDBloomFilter rebuilds the bloom filter from scratch (forceRecreate
// set to true), resets the last-saved sequence and then attempts to save the
// new filter.
//
// If the rebuild fails, the previous filter and sequence are left untouched
// and the error is returned.
func (f *Factory) RecreateDIDBloomFilter() error {
	f.bloomSaverMu.Lock()
	defer f.bloomSaverMu.Unlock()

	err := func() error {
		f.bloomMu.Lock()
		defer f.bloomMu.Unlock()

		// if we use ReadCommitted, it's going to fail when the tree doesn't have versions yet
		// in practice we just want to blindly list all DIDs "past present and future", so it doesn't matter what the version is
		// (when doing historical queries, it's fine if the bloom filter claims that a DID already exists when it might not exist yet)
		tx := f.ReadWorking(time.Now())

		// Build into locals first: assigning straight into the Factory fields
		// would overwrite the live filter and sequence even when a step fails.
		filter, err := f.bloomFilterStorage.BuildDIDBloomFilter(tx, true)
		if err != nil {
			return stacktrace.Propagate(err)
		}

		seq, err := f.operationCounter(tx)
		if err != nil {
			return stacktrace.Propagate(err)
		}
		// since the sequenceGetter always returns the next sequence
		if seq > 0 {
			seq--
		}

		// Both steps succeeded; commit the new state in one go.
		f.bloomFilter = filter
		f.bloomSeq = seq
		f.bloomLastSaveSeq = 0
		return nil
	}()
	if err != nil {
		return stacktrace.Propagate(err)
	}

	return stacktrace.Propagate(f.saveDIDBloomFilterWithinMutex())
}