A very experimental PLC implementation which uses BFT consensus for decentralization
19
fork

Configure Feed

Select the types of activity you want to include in your feed.

Fix multiple DID bloom filter bugs

- Force recreate filter after snapshot application, to avoid inconsistent state as cometbft transitions to block sync / consensus
- Don't save 0xffffffffffffffff bloom filter seq when there are no operations

gbl08ma 905fad65 93104729

+60 -10
+3 -1
abciapp/app.go
··· 124 124 if err != nil { 125 125 return stacktrace.Propagate(err) 126 126 } 127 - return nil 127 + d.logger.Info("Recreating DID bloom filter") 128 + err = d.txFactory.RecreateDIDBloomFilter() 129 + return stacktrace.Propagate(err) 128 130 } 129 131 130 132 d.plc = plc.NewPLC()
+4
abciapp/snapshots.go
··· 189 189 190 190 if d.snapshotApplier.Done() { 191 191 d.snapshotApplier = nil 192 + err = d.txFactory.RecreateDIDBloomFilter() 193 + if err != nil { 194 + return nil, stacktrace.Propagate(err) 195 + } 192 196 } 193 197 194 198 // this is a giant hack but I found no other way to not make the machine run out of space as it applies a snapshot
-2
main.go
··· 93 93 if err != nil { 94 94 log.Fatalf("failed to drop application index database: %v", err) 95 95 } 96 - 97 - os.Remove(didBloomFilterPath) 98 96 } 99 97 100 98 mempoolSubmitter := &txSubmitter{}
+3 -3
store/did_bloom.go
··· 34 34 35 35 var _ transaction.BloomFilterStorage = (*DIDBloomFilterStore)(nil) 36 36 37 - func (s *DIDBloomFilterStore) BuildDIDBloomFilter(tx transaction.Read) (*bloom.BloomFilter, error) { 37 + func (s *DIDBloomFilterStore) BuildDIDBloomFilter(tx transaction.Read, forceRecreate bool) (*bloom.BloomFilter, error) { 38 38 filter, estimatedDIDCount, err := func() (*bloom.BloomFilter, uint64, error) { 39 - if s.filePath == "" { 39 + if s.filePath == "" || forceRecreate { 40 40 return nil, 0, nil 41 41 } 42 42 var f bloom.BloomFilter ··· 97 97 return filter, nil 98 98 } 99 99 100 - filterEstimatedItems := uint(100000000) // we know there are like 80M DIDs at the time of writing 100 + filterEstimatedItems := uint(150000000) // we know there are like 80M DIDs at the time of writing 101 101 if estimatedDIDCount != 0 { 102 102 filterEstimatedItems = max(filterEstimatedItems, uint(estimatedDIDCount*3)) 103 103 }
+50 -4
transaction/factory.go
··· 21 21 } 22 22 23 23 type BloomFilterStorage interface { 24 - BuildDIDBloomFilter(tx Read) (*bloom.BloomFilter, error) 24 + BuildDIDBloomFilter(tx Read, forceRecreate bool) (*bloom.BloomFilter, error) 25 25 StoreDIDBloomFilter(writeBloomTo func(w io.Writer) (uint64, uint64, uint64, error)) error 26 26 } 27 27 ··· 54 54 // in practice we just want to blindly list all DIDs "past present and future", so it doesn't matter what the version is 55 55 // (when doing historical queries, it's fine if the bloom filter claims that a DID already exists when it might not exist yet) 56 56 tx := f.ReadWorking(time.Now()) 57 - f.bloomFilter, err = bloomFilterStorage.BuildDIDBloomFilter(tx) 57 + f.bloomFilter, err = bloomFilterStorage.BuildDIDBloomFilter(tx, false) 58 58 if err != nil { 59 59 return nil, stacktrace.Propagate(err) 60 60 } 61 61 f.bloomSeq, err = f.operationCounter(tx) 62 + if err != nil { 63 + return nil, stacktrace.Propagate(err) 64 + } 62 65 // since the sequenceGetter always returns the next sequence 63 - f.bloomSeq-- 66 + if f.bloomSeq > 0 { 67 + f.bloomSeq-- 68 + } 69 + 70 + err = f.SaveDIDBloomFilter() 64 71 return f, stacktrace.Propagate(err) 65 72 } 66 73 ··· 100 107 return nil, stacktrace.Propagate(err) 101 108 } 102 109 // give the reader an empty tree just to satisfy expectations 103 - tmpTree := iavl.NewMutableTree(db.NewMemDB(), 1, false, iavl.NewNopLogger()) 110 + tmpTree := iavl.NewMutableTree(db.NewMemDB(), 1, true, iavl.NewNopLogger()) 104 111 _, v, err := tmpTree.SaveVersion() 105 112 if err != nil { 106 113 return nil, stacktrace.Propagate(err) ··· 132 139 func (f *Factory) SaveDIDBloomFilter() error { 133 140 f.bloomSaverMu.Lock() 134 141 defer f.bloomSaverMu.Unlock() 142 + return stacktrace.Propagate(f.saveDIDBloomFilterWithinMutex()) 143 + } 135 144 145 + func (f *Factory) saveDIDBloomFilterWithinMutex() error { 136 146 f.bloomMu.RLock() 137 147 shouldSave := f.bloomSeq > f.bloomLastSaveSeq 138 148 f.bloomMu.RUnlock() ··· 172 182 173 183 return nil 174 184 } 185 + 186 + func (f *Factory) RecreateDIDBloomFilter() error { 187 + f.bloomSaverMu.Lock() 188 + defer f.bloomSaverMu.Unlock() 189 + 190 + var err error 191 + err = func() error { 192 + f.bloomMu.Lock() 193 + defer f.bloomMu.Unlock() 194 + // if we use ReadCommitted, it's going to fail when the tree doesn't have versions yet 195 + // in practice we just want to blindly list all DIDs "past present and future", so it doesn't matter what the version is 196 + // (when doing historical queries, it's fine if the bloom filter claims that a DID already exists when it might not exist yet) 197 + tx := f.ReadWorking(time.Now()) 198 + f.bloomFilter, err = f.bloomFilterStorage.BuildDIDBloomFilter(tx, true) 199 + if err != nil { 200 + return stacktrace.Propagate(err) 201 + } 202 + f.bloomSeq, err = f.operationCounter(tx) 203 + if err != nil { 204 + return stacktrace.Propagate(err) 205 + } 206 + // since the sequenceGetter always returns the next sequence 207 + if f.bloomSeq > 0 { 208 + f.bloomSeq-- 209 + } 210 + 211 + f.bloomLastSaveSeq = 0 212 + return nil 213 + }() 214 + if err != nil { 215 + return stacktrace.Propagate(err) 216 + } 217 + 218 + err = f.saveDIDBloomFilterWithinMutex() 219 + return stacktrace.Propagate(err) 220 + }