A very experimental PLC implementation which uses BFT consensus for decentralization
19
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 220 lines 6.4 kB view raw
1package transaction 2 3import ( 4 "errors" 5 "io" 6 "math" 7 "sync" 8 "time" 9 10 "github.com/bits-and-blooms/bloom/v3" 11 dbm "github.com/cometbft/cometbft-db" 12 "github.com/cosmos/iavl" 13 "github.com/cosmos/iavl/db" 14 "github.com/dgraph-io/badger/v4" 15 "github.com/gbl08ma/stacktrace" 16) 17 18type ExtendedDB interface { 19 dbm.DB 20 IteratorWithOptions(start, end []byte, opts badger.IteratorOptions) (dbm.Iterator, error) 21} 22 23type BloomFilterStorage interface { 24 BuildDIDBloomFilter(tx Read, forceRecreate bool) (*bloom.BloomFilter, error) 25 StoreDIDBloomFilter(writeBloomTo func(w io.Writer) (uint64, uint64, uint64, error)) error 26} 27 28type Factory struct { 29 bloomFilterStorage BloomFilterStorage 30 31 db ExtendedDB 32 mutableTreeMu sync.Mutex 33 mutableTree *iavl.MutableTree 34 operationCounter func(tx Read) (uint64, error) 35 36 bloomMu sync.RWMutex 37 bloomSeq uint64 38 bloomFilter *bloom.BloomFilter 39 40 bloomSaverMu sync.Mutex 41 bloomLastSaveSeq uint64 42} 43 44func NewFactory(tree *iavl.MutableTree, indexDB ExtendedDB, operationCounter func(tx Read) (uint64, error), bloomFilterStorage BloomFilterStorage) (*Factory, error) { 45 f := &Factory{ 46 bloomFilterStorage: bloomFilterStorage, 47 db: indexDB, 48 mutableTree: tree, 49 operationCounter: operationCounter, 50 } 51 52 var err error 53 // if we use ReadCommitted, it's going to fail when the tree doesn't have versions yet 54 // in practice we just want to blindly list all DIDs "past present and future", so it doesn't matter what the version is 55 // (when doing historical queries, it's fine if the bloom filter claims that a DID already exists when it might not exist yet) 56 tx := f.ReadWorking(time.Now()) 57 f.bloomFilter, err = bloomFilterStorage.BuildDIDBloomFilter(tx, false) 58 if err != nil { 59 return nil, stacktrace.Propagate(err) 60 } 61 f.bloomSeq, err = f.operationCounter(tx) 62 if err != nil { 63 return nil, stacktrace.Propagate(err) 64 } 65 // since the sequenceGetter always returns the next sequence 66 if f.bloomSeq > 0 { 67 f.bloomSeq-- 68 } 69 70 err = f.SaveDIDBloomFilter() 71 return f, stacktrace.Propagate(err) 72} 73 74func (f *Factory) ReadWorking(ts time.Time) Read { 75 f.mutableTreeMu.Lock() 76 defer f.mutableTreeMu.Unlock() 77 78 return &readTx{ 79 ts: ts, 80 height: f.mutableTree.WorkingVersion(), 81 mutableTree: f.mutableTree, 82 db: f.db, 83 operationCounter: f.operationCounter, 84 bloomMu: &f.bloomMu, 85 bloomFilter: f.bloomFilter, 86 bloomSeq: &f.bloomSeq, 87 } 88} 89 90func (f *Factory) ReadCommitted() Read { 91 f.mutableTreeMu.Lock() 92 defer f.mutableTreeMu.Unlock() 93 94 tx, err := f.readHeightWithinMu(time.Now(), f.mutableTree.Version()) 95 if err != nil { 96 // this should never happen, it's not worth making the signature of this function more 97 // complex for an error we'll never return unless the ABCI application is yet to be initialized 98 panic(stacktrace.Propagate(err)) 99 } 100 return tx 101} 102 103func (f *Factory) readHeightWithinMu(ts time.Time, height int64) (Read, error) { 104 immutable, err := f.mutableTree.GetImmutable(height) 105 if err != nil { 106 if !errors.Is(err, iavl.ErrVersionDoesNotExist) || height != 0 { 107 return nil, stacktrace.Propagate(err) 108 } 109 // give the reader an empty tree just to satisfy expectations 110 tmpTree := iavl.NewMutableTree(db.NewMemDB(), 1, true, iavl.NewNopLogger()) 111 _, v, err := tmpTree.SaveVersion() 112 if err != nil { 113 return nil, stacktrace.Propagate(err) 114 } 115 immutable, err = tmpTree.GetImmutable(v) 116 if err != nil { 117 return nil, stacktrace.Propagate(err) 118 } 119 } 120 return &readTx{ 121 ts: ts, 122 height: height, 123 tree: AdaptImmutableTree(immutable), 124 db: f.db, 125 operationCounter: f.operationCounter, 126 bloomMu: &f.bloomMu, 127 bloomFilter: f.bloomFilter, 128 }, nil 129} 130 131func (f *Factory) ReadHeight(ts time.Time, height int64) (Read, error) { 132 f.mutableTreeMu.Lock() 133 defer f.mutableTreeMu.Unlock() 134 135 tx, err := f.readHeightWithinMu(ts, height) 136 return tx, stacktrace.Propagate(err) 137} 138 139func (f *Factory) SaveDIDBloomFilter() error { 140 f.bloomSaverMu.Lock() 141 defer f.bloomSaverMu.Unlock() 142 return stacktrace.Propagate(f.saveDIDBloomFilterWithinMutex()) 143} 144 145func (f *Factory) saveDIDBloomFilterWithinMutex() error { 146 f.bloomMu.RLock() 147 shouldSave := f.bloomSeq > f.bloomLastSaveSeq 148 f.bloomMu.RUnlock() 149 if !shouldSave { 150 return nil 151 } 152 153 var savedSeq uint64 154 err := f.bloomFilterStorage.StoreDIDBloomFilter(func(w io.Writer) (uint64, uint64, uint64, error) { 155 f.bloomMu.RLock() 156 defer f.bloomMu.RUnlock() 157 158 _, err := f.bloomFilter.WriteTo(w) 159 savedSeq = f.bloomSeq 160 161 // we reimplement bloomFilter.ApproximatedSize() here, because their version deals poorly with the case where the filter is completely full 162 // (it will return 0) 163 // and it also returns a uint32, which theoretically means we can't have more DIDs than humans on the planet... 164 165 x := float64(f.bloomFilter.BitSet().Count()) 166 m := float64(f.bloomFilter.Cap()) 167 k := float64(f.bloomFilter.K()) 168 size := -1 * m / k * math.Log(1-x/m) / math.Log(math.E) 169 curItemCap := uint64(m * math.Log(2) / k) 170 sizeInt := curItemCap 171 if !math.IsInf(size, 0) { 172 sizeInt = uint64(math.Floor(size + 0.5)) 173 } 174 return f.bloomSeq, sizeInt, curItemCap, stacktrace.Propagate(err) 175 }) 176 177 if err != nil { 178 return stacktrace.Propagate(err) 179 } 180 181 f.bloomLastSaveSeq = savedSeq 182 183 return nil 184} 185 186func (f *Factory) RecreateDIDBloomFilter() error { 187 f.bloomSaverMu.Lock() 188 defer f.bloomSaverMu.Unlock() 189 190 var err error 191 err = func() error { 192 f.bloomMu.Lock() 193 defer f.bloomMu.Unlock() 194 // if we use ReadCommitted, it's going to fail when the tree doesn't have versions yet 195 // in practice we just want to blindly list all DIDs "past present and future", so it doesn't matter what the version is 196 // (when doing historical queries, it's fine if the bloom filter claims that a DID already exists when it might not exist yet) 197 tx := f.ReadWorking(time.Now()) 198 f.bloomFilter, err = f.bloomFilterStorage.BuildDIDBloomFilter(tx, true) 199 if err != nil { 200 return stacktrace.Propagate(err) 201 } 202 f.bloomSeq, err = f.operationCounter(tx) 203 if err != nil { 204 return stacktrace.Propagate(err) 205 } 206 // since the sequenceGetter always returns the next sequence 207 if f.bloomSeq > 0 { 208 f.bloomSeq-- 209 } 210 211 f.bloomLastSaveSeq = 0 212 return nil 213 }() 214 if err != nil { 215 return stacktrace.Propagate(err) 216 } 217 218 err = f.saveDIDBloomFilterWithinMutex() 219 return stacktrace.Propagate(err) 220}