A very experimental PLC implementation which uses BFT consensus for decentralization
1package transaction
2
3import (
4 "errors"
5 "io"
6 "math"
7 "sync"
8 "time"
9
10 "github.com/bits-and-blooms/bloom/v3"
11 dbm "github.com/cometbft/cometbft-db"
12 "github.com/cosmos/iavl"
13 "github.com/cosmos/iavl/db"
14 "github.com/dgraph-io/badger/v4"
15 "github.com/gbl08ma/stacktrace"
16)
17
18type ExtendedDB interface {
19 dbm.DB
20 IteratorWithOptions(start, end []byte, opts badger.IteratorOptions) (dbm.Iterator, error)
21}
22
23type BloomFilterStorage interface {
24 BuildDIDBloomFilter(tx Read, forceRecreate bool) (*bloom.BloomFilter, error)
25 StoreDIDBloomFilter(writeBloomTo func(w io.Writer) (uint64, uint64, uint64, error)) error
26}
27
28type Factory struct {
29 bloomFilterStorage BloomFilterStorage
30
31 db ExtendedDB
32 mutableTreeMu sync.Mutex
33 mutableTree *iavl.MutableTree
34 operationCounter func(tx Read) (uint64, error)
35
36 bloomMu sync.RWMutex
37 bloomSeq uint64
38 bloomFilter *bloom.BloomFilter
39
40 bloomSaverMu sync.Mutex
41 bloomLastSaveSeq uint64
42}
43
44func NewFactory(tree *iavl.MutableTree, indexDB ExtendedDB, operationCounter func(tx Read) (uint64, error), bloomFilterStorage BloomFilterStorage) (*Factory, error) {
45 f := &Factory{
46 bloomFilterStorage: bloomFilterStorage,
47 db: indexDB,
48 mutableTree: tree,
49 operationCounter: operationCounter,
50 }
51
52 var err error
53 // if we use ReadCommitted, it's going to fail when the tree doesn't have versions yet
54 // in practice we just want to blindly list all DIDs "past present and future", so it doesn't matter what the version is
55 // (when doing historical queries, it's fine if the bloom filter claims that a DID already exists when it might not exist yet)
56 tx := f.ReadWorking(time.Now())
57 f.bloomFilter, err = bloomFilterStorage.BuildDIDBloomFilter(tx, false)
58 if err != nil {
59 return nil, stacktrace.Propagate(err)
60 }
61 f.bloomSeq, err = f.operationCounter(tx)
62 if err != nil {
63 return nil, stacktrace.Propagate(err)
64 }
65 // since the sequenceGetter always returns the next sequence
66 if f.bloomSeq > 0 {
67 f.bloomSeq--
68 }
69
70 err = f.SaveDIDBloomFilter()
71 return f, stacktrace.Propagate(err)
72}
73
74func (f *Factory) ReadWorking(ts time.Time) Read {
75 f.mutableTreeMu.Lock()
76 defer f.mutableTreeMu.Unlock()
77
78 return &readTx{
79 ts: ts,
80 height: f.mutableTree.WorkingVersion(),
81 mutableTree: f.mutableTree,
82 db: f.db,
83 operationCounter: f.operationCounter,
84 bloomMu: &f.bloomMu,
85 bloomFilter: f.bloomFilter,
86 bloomSeq: &f.bloomSeq,
87 }
88}
89
90func (f *Factory) ReadCommitted() Read {
91 f.mutableTreeMu.Lock()
92 defer f.mutableTreeMu.Unlock()
93
94 tx, err := f.readHeightWithinMu(time.Now(), f.mutableTree.Version())
95 if err != nil {
96 // this should never happen, it's not worth making the signature of this function more
97 // complex for an error we'll never return unless the ABCI application is yet to be initialized
98 panic(stacktrace.Propagate(err))
99 }
100 return tx
101}
102
103func (f *Factory) readHeightWithinMu(ts time.Time, height int64) (Read, error) {
104 immutable, err := f.mutableTree.GetImmutable(height)
105 if err != nil {
106 if !errors.Is(err, iavl.ErrVersionDoesNotExist) || height != 0 {
107 return nil, stacktrace.Propagate(err)
108 }
109 // give the reader an empty tree just to satisfy expectations
110 tmpTree := iavl.NewMutableTree(db.NewMemDB(), 1, true, iavl.NewNopLogger())
111 _, v, err := tmpTree.SaveVersion()
112 if err != nil {
113 return nil, stacktrace.Propagate(err)
114 }
115 immutable, err = tmpTree.GetImmutable(v)
116 if err != nil {
117 return nil, stacktrace.Propagate(err)
118 }
119 }
120 return &readTx{
121 ts: ts,
122 height: height,
123 tree: AdaptImmutableTree(immutable),
124 db: f.db,
125 operationCounter: f.operationCounter,
126 bloomMu: &f.bloomMu,
127 bloomFilter: f.bloomFilter,
128 }, nil
129}
130
131func (f *Factory) ReadHeight(ts time.Time, height int64) (Read, error) {
132 f.mutableTreeMu.Lock()
133 defer f.mutableTreeMu.Unlock()
134
135 tx, err := f.readHeightWithinMu(ts, height)
136 return tx, stacktrace.Propagate(err)
137}
138
139func (f *Factory) SaveDIDBloomFilter() error {
140 f.bloomSaverMu.Lock()
141 defer f.bloomSaverMu.Unlock()
142 return stacktrace.Propagate(f.saveDIDBloomFilterWithinMutex())
143}
144
145func (f *Factory) saveDIDBloomFilterWithinMutex() error {
146 f.bloomMu.RLock()
147 shouldSave := f.bloomSeq > f.bloomLastSaveSeq
148 f.bloomMu.RUnlock()
149 if !shouldSave {
150 return nil
151 }
152
153 var savedSeq uint64
154 err := f.bloomFilterStorage.StoreDIDBloomFilter(func(w io.Writer) (uint64, uint64, uint64, error) {
155 f.bloomMu.RLock()
156 defer f.bloomMu.RUnlock()
157
158 _, err := f.bloomFilter.WriteTo(w)
159 savedSeq = f.bloomSeq
160
161 // we reimplement bloomFilter.ApproximatedSize() here, because their version deals poorly with the case where the filter is completely full
162 // (it will return 0)
163 // and it also returns a uint32, which theoretically means we can't have more DIDs than humans on the planet...
164
165 x := float64(f.bloomFilter.BitSet().Count())
166 m := float64(f.bloomFilter.Cap())
167 k := float64(f.bloomFilter.K())
168 size := -1 * m / k * math.Log(1-x/m) / math.Log(math.E)
169 curItemCap := uint64(m * math.Log(2) / k)
170 sizeInt := curItemCap
171 if !math.IsInf(size, 0) {
172 sizeInt = uint64(math.Floor(size + 0.5))
173 }
174 return f.bloomSeq, sizeInt, curItemCap, stacktrace.Propagate(err)
175 })
176
177 if err != nil {
178 return stacktrace.Propagate(err)
179 }
180
181 f.bloomLastSaveSeq = savedSeq
182
183 return nil
184}
185
186func (f *Factory) RecreateDIDBloomFilter() error {
187 f.bloomSaverMu.Lock()
188 defer f.bloomSaverMu.Unlock()
189
190 var err error
191 err = func() error {
192 f.bloomMu.Lock()
193 defer f.bloomMu.Unlock()
194 // if we use ReadCommitted, it's going to fail when the tree doesn't have versions yet
195 // in practice we just want to blindly list all DIDs "past present and future", so it doesn't matter what the version is
196 // (when doing historical queries, it's fine if the bloom filter claims that a DID already exists when it might not exist yet)
197 tx := f.ReadWorking(time.Now())
198 f.bloomFilter, err = f.bloomFilterStorage.BuildDIDBloomFilter(tx, true)
199 if err != nil {
200 return stacktrace.Propagate(err)
201 }
202 f.bloomSeq, err = f.operationCounter(tx)
203 if err != nil {
204 return stacktrace.Propagate(err)
205 }
206 // since the sequenceGetter always returns the next sequence
207 if f.bloomSeq > 0 {
208 f.bloomSeq--
209 }
210
211 f.bloomLastSaveSeq = 0
212 return nil
213 }()
214 if err != nil {
215 return stacktrace.Propagate(err)
216 }
217
218 err = f.saveDIDBloomFilterWithinMutex()
219 return stacktrace.Propagate(err)
220}