A very experimental PLC implementation which uses BFT consensus for decentralization
19
fork

Configure Feed

Select the types of activity you want to include in your feed.

Bug fixes and improvements

- Fix major data corruption by removing Next skip from iterator adapter (a hack that was added for leveldb)
- Serialize and restore DID bloom filter
- Move stuff around, clean up test helpers

gbl08ma e414a398 e6c75929

+559 -285
+29 -13
abciapp/app.go
··· 4 4 "context" 5 5 "fmt" 6 6 "os" 7 - "path/filepath" 8 7 "sync" 9 8 "time" 10 9 ··· 14 13 "github.com/klauspost/compress/zstd" 15 14 "github.com/palantir/stacktrace" 16 15 "github.com/samber/lo" 17 - "tangled.org/gbl08ma.com/didplcbft/dbadapter" 18 - "tangled.org/gbl08ma.com/didplcbft/dbadapter/zstddict" 16 + "tangled.org/gbl08ma.com/didplcbft/dbmtoiavldb" 17 + "tangled.org/gbl08ma.com/didplcbft/dbmtoiavldb/zstddict" 19 18 "tangled.org/gbl08ma.com/didplcbft/plc" 20 19 "tangled.org/gbl08ma.com/didplcbft/store" 21 20 "tangled.org/gbl08ma.com/didplcbft/transaction" ··· 42 41 } 43 42 44 43 // store and plc must be able to share transaction objects 45 - func NewDIDPLCApplication(treeDB dbm.DB, indexDB transaction.ExtendedDB, clearData func(), snapshotDirectory string) (*DIDPLCApplication, *transaction.Factory, plc.PLC, func(), error) { 44 + func NewDIDPLCApplication(treeDB dbm.DB, indexDB transaction.ExtendedDB, clearData func(), snapshotDirectory, didBloomFilterPath string) (*DIDPLCApplication, *transaction.Factory, plc.PLC, func(), error) { 46 45 mkTree := func() *iavl.MutableTree { 47 46 // Using SpeedDefault appears to cause the processing time for ExecuteOperation to double on average 48 47 // Using SpeedBetterCompression appears to cause the processing time to double again 49 48 // By using SpeedFastest we seem to give up on like 5% size reduction, it's not worth using the slower speeds 50 - return iavl.NewMutableTree(dbadapter.AdaptWithCompression(treeDB, zstd.SpeedFastest, zstddict.PLCZstdDict), 500000, false, iavl.NewNopLogger(), iavl.AsyncPruningOption(false)) 49 + return iavl.NewMutableTree(dbmtoiavldb.AdaptWithCompression(treeDB, zstd.SpeedFastest, zstddict.PLCZstdDict), 500000, false, iavl.NewNopLogger(), iavl.AsyncPruningOption(false)) 51 50 } 52 51 53 52 tree := mkTree() ··· 72 71 aocsByPLC: make(map[string]*authoritativeOperationsCache), 73 72 } 74 73 75 - d.txFactory, err = transaction.NewFactory(tree, indexDB, store.Tree.NextOperationSequence, store.Tree.BuildDIDBloomFilter) 74 + d.txFactory, err = transaction.NewFactory(tree, indexDB, store.Tree.NextOperationSequence, store.NewDIDBloomFilterStore(didBloomFilterPath)) 76 75 if err != nil { 77 76 return nil, nil, nil, func() {}, stacktrace.Propagate(err, "") 78 77 } 79 78 80 - 81 79 d.fullyClearApplicationData = func() error { 82 80 // we assume this is called in a single-threaded context, which should be a safe assumption since we'll only call this during snapshot import 83 81 // and CometBFT only calls one ABCI method at a time ··· 90 88 91 89 *d.tree = *mkTree() 92 90 93 - d.txFactory, err = transaction.NewFactory(tree, indexDB, store.Tree.NextOperationSequence, store.Tree.BuildDIDBloomFilter) 91 + d.txFactory, err = transaction.NewFactory(tree, indexDB, store.Tree.NextOperationSequence, store.NewDIDBloomFilterStore(didBloomFilterPath)) 94 92 if err != nil { 95 93 return stacktrace.Propagate(err, "") 96 94 } ··· 98 96 } 99 97 100 98 d.plc = plc.NewPLC() 101 - 102 - lastSnapshotVersion := tree.Version() 103 99 104 100 var wg sync.WaitGroup 105 101 closeCh := make(chan struct{}) 106 102 wg.Go(func() { 103 + // periodically store bloom filter so we don't have to wait so long on the next startup 107 104 for { 108 105 select { 109 106 case <-closeCh: 110 107 return 111 108 case <-time.After(5 * time.Minute): 112 109 } 110 + 111 + st := time.Now() 112 + err := d.txFactory.SaveDIDBloomFilter() 113 + if err != nil { 114 + fmt.Println("FAILED TO SAVE BLOOM FILTER:", stacktrace.Propagate(err, "")) 115 + } 116 + fmt.Println("SAVED BLOOM FILTER IN", time.Since(st)) 117 + } 118 + }) 119 + 120 + /*lastSnapshotVersion := tree.Version() 121 + wg.Go(func() { 122 + for { 123 + select { 124 + case <-closeCh: 125 + return 126 + case <-time.After(5 * time.Minute): 127 + } 128 + 113 129 treeVersion := tree.Version() 114 130 if treeVersion > int64(lastSnapshotVersion+10000) { 115 131 err = d.createSnapshot(treeVersion, filepath.Join(snapshotDirectory, "snapshot.tmp")) ··· 121 137 } 122 138 } 123 139 124 - }) 140 + })*/ 125 141 126 142 /*err = d.createSnapshot(tree.Version(), filepath.Join(snapshotDirectory, "snapshot.tmp")) 127 143 if err != nil { ··· 153 169 */ 154 170 155 171 return d, d.txFactory, d.plc, func() { 156 - closeCh <- struct{}{} 172 + close(closeCh) 157 173 wg.Wait() 158 - lo.Must0(tree.Close()) 174 + lo.Must0(d.tree.Close()) 159 175 }, nil 160 176 } 161 177
+1 -1
abciapp/app_test.go
··· 22 22 } 23 23 24 24 func TestCheckTx(t *testing.T) { 25 - app, _, _, cleanup, err := abciapp.NewDIDPLCApplication(dbm.NewMemDB(), memDBWrapper{dbm.NewMemDB()}, nil, "") 25 + app, _, _, cleanup, err := abciapp.NewDIDPLCApplication(dbm.NewMemDB(), memDBWrapper{dbm.NewMemDB()}, nil, "", "") 26 26 require.NoError(t, err) 27 27 t.Cleanup(cleanup) 28 28
+32 -19
abciapp/snapshots.go
··· 403 403 header := make([]byte, 4+4) 404 404 binary.BigEndian.PutUint32(header, uint32(len(key))) 405 405 binary.BigEndian.PutUint32(header[4:], uint32(len(value))) 406 - w.Write(header) 407 - w.Write(key) 408 - w.Write(value) 406 + 407 + _, err = w.Write(header) 408 + if err != nil { 409 + return 0, stacktrace.Propagate(err, "") 410 + } 411 + 412 + _, err = w.Write(key) 413 + if err != nil { 414 + return 0, stacktrace.Propagate(err, "") 415 + } 416 + 417 + _, err = w.Write(value) 418 + if err != nil { 419 + return 0, stacktrace.Propagate(err, "") 420 + } 409 421 410 422 numEntries++ 411 423 } ··· 423 435 defer exporter.Close() 424 436 cexporter := iavl.NewCompressExporter(exporter) 425 437 438 + // 1 byte for node height 439 + // 8 bytes for node version 440 + // 4 bytes for node key length (0xffffffff if node key is nil) 441 + // 4 bytes for node value length (0xffffffff if node value is nil) 442 + // this buffer is completely rewritten on every iteration 443 + nodeHeaderBuffer := make([]byte, 1+8+4+4) 444 + 426 445 numNodes := int64(0) 427 446 for { 428 447 node, err := cexporter.Next() ··· 433 452 return 0, stacktrace.Propagate(err, "") 434 453 } 435 454 436 - b := make([]byte, 9) 437 - b[0] = byte(node.Height) 455 + nodeHeaderBuffer[0] = byte(node.Height) 438 456 439 - binary.BigEndian.PutUint64(b[1:], uint64(node.Version)) 440 - _, err = w.Write(b) 441 - if err != nil { 442 - return 0, stacktrace.Propagate(err, "") 443 - } 457 + binary.BigEndian.PutUint64(nodeHeaderBuffer[1:], uint64(node.Version)) 444 458 445 459 // nil node values are different from 0-byte values 446 - b = []byte{0xff, 0xff, 0xff, 0xff} 447 460 if node.Key != nil { 448 - binary.BigEndian.PutUint32(b, uint32(len(node.Key))) 449 - } 450 - _, err = w.Write(b) 451 - if err != nil { 452 - return 0, stacktrace.Propagate(err, "") 461 + binary.BigEndian.PutUint32(nodeHeaderBuffer[9:13], uint32(len(node.Key))) 462 + } else { 463 + copy(nodeHeaderBuffer[9:13], []byte{0xff, 0xff, 0xff, 0xff}) 453 464 } 454 465 455 - b = []byte{0xff, 0xff, 0xff, 0xff} 456 466 if node.Value != nil { 457 - binary.BigEndian.PutUint32(b, uint32(len(node.Value))) 467 + binary.BigEndian.PutUint32(nodeHeaderBuffer[13:17], uint32(len(node.Value))) 468 + } else { 469 + copy(nodeHeaderBuffer[13:17], []byte{0xff, 0xff, 0xff, 0xff}) 458 470 } 459 - _, err = w.Write(b) 471 + 472 + _, err = w.Write(nodeHeaderBuffer) 460 473 if err != nil { 461 474 return 0, stacktrace.Propagate(err, "") 462 475 }
+7 -4
badger.go badgertodbm/badger.go
··· 1 - package main 1 + package badgertodbm 2 2 3 3 import ( 4 4 "bytes" ··· 43 43 // because these large values don't seem to change (unlike the non-leaf nodes within the iavl tree, which are changing all the time), 44 44 // this makes compaction much faster and decreases SSD thrashing 45 45 opts.ValueThreshold = 192 46 + return NewBadgerDBWithOptions(opts) 47 + } 48 + 49 + func NewBadgerInMemoryDB() (*badger.DB, *BadgerDB, error) { 50 + opts := badger.DefaultOptions("") 51 + opts.InMemory = true 46 52 return NewBadgerDBWithOptions(opts) 47 53 } 48 54 ··· 297 303 } 298 304 299 305 // Key implements Iterator. 300 - // The caller should not modify the contents of the returned slice. 301 - // Instead, the caller should make a copy and work on the copy. 302 306 func (i *badgerDBIterator) Key() []byte { 303 307 if !i.Valid() { 304 308 panic("iterator is invalid") ··· 307 311 } 308 312 309 313 // Value implements Iterator. 310 - // The returned slice is a copy of the original data, therefore it is safe to modify. 311 314 func (i *badgerDBIterator) Value() []byte { 312 315 if !i.Valid() { 313 316 panic("iterator is invalid")
+13 -6
dbadapter/adapter.go dbmtoiavldb/adapter.go
··· 1 - package dbadapter 1 + package dbmtoiavldb 2 2 3 3 import ( 4 + "slices" 5 + 4 6 "cosmossdk.io/core/store" 5 7 dbm "github.com/cometbft/cometbft-db" 6 8 iavldbm "github.com/cosmos/iavl/db" ··· 71 73 } 72 74 73 75 func (i *AdaptedIterator) Next() { 74 - if !i.calledNextOnce { 76 + /*if !i.calledNextOnce { 75 77 i.calledNextOnce = true 76 78 return 77 - } 79 + }*/ 78 80 i.underlying.Next() 79 81 } 80 82 81 83 func (i *AdaptedIterator) Key() []byte { 82 - return i.underlying.Key() 84 + // dbm.Iterator says the result of underlying.Key() is not safe for modification, but 85 + // corestore.Iterator (used by iavldbm) says "the key returned should be a copy and thus safe for modification" 86 + return slices.Clone(i.underlying.Key()) 83 87 } 84 88 85 89 func (i *AdaptedIterator) Value() []byte { 90 + // dbm.Iterator says the result of underlying.Value() is not safe for modification, but 91 + // corestore.Iterator (used by iavldbm) says "the value returned should be a copy and thus safe for modification" 86 92 v, _ := decompressValue(i.zstdDecoder, i.underlying.Value()) 87 93 return v 88 94 } ··· 175 181 } 176 182 177 183 func decompressValue(decoder *zstd.Decoder, value []byte) ([]byte, error) { 184 + // always create a copy of the value, see comment on Value() about iavldb expectations 178 185 if decoder == nil || len(value) == 0 { 179 - return value, nil 186 + return slices.Clone(value), nil 180 187 } else if value[0] == 0x00 { 181 - return value[1:], nil 188 + return slices.Clone(value[1:]), nil 182 189 } 183 190 // passing a nil output buffer to DecodeAll means it'll optimistically start by allocating len(value)*2 184 191 // but we observe compression ratios better than 50% frequently, so we allocate a slice ourselves with cap len(value)*3
dbadapter/zstddict/plcvalues dbmtoiavldb/zstddict/plcvalues
dbadapter/zstddict/plcvalues.go dbmtoiavldb/zstddict/plcvalues.go
+1
httpapi/server.go
··· 87 87 s.router.HandleFunc("GET /{did}/data", s.makeDIDHandler(s.handleGetPLCData)) 88 88 s.router.HandleFunc("GET /export", s.handleExport) 89 89 90 + // TODO expose pprof only if enabled in [plc] settings 90 91 s.router.HandleFunc("/debug/pprof/", pprof.Index) 91 92 s.router.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) 92 93 s.router.HandleFunc("/debug/pprof/profile", pprof.Profile)
+2 -8
httpapi/server_test.go
··· 10 10 "testing" 11 11 "time" 12 12 13 - "github.com/cosmos/iavl" 14 - dbm "github.com/cosmos/iavl/db" 15 13 "github.com/did-method-plc/go-didplc" 16 14 "github.com/stretchr/testify/require" 17 15 "tangled.org/gbl08ma.com/didplcbft/plc" 18 - "tangled.org/gbl08ma.com/didplcbft/store" 16 + "tangled.org/gbl08ma.com/didplcbft/testutil" 19 17 "tangled.org/gbl08ma.com/didplcbft/transaction" 20 18 "tangled.org/gbl08ma.com/didplcbft/types" 21 19 ) ··· 107 105 func TestServer(t *testing.T) { 108 106 mockPLC := &MockReadPLC{} 109 107 110 - // this tree is just to avoid a nil pointer when creating a transaction with the factory 111 - // the transactions don't actually get used 112 - tree := iavl.NewMutableTree(dbm.NewMemDB(), 128, false, iavl.NewNopLogger()) 113 - txFactory, err := transaction.NewFactory(tree, nil, store.Tree.NextOperationSequence, store.Tree.BuildDIDBloomFilter) 114 - require.NoError(t, err) 108 + txFactory, _, _ := testutil.NewTestTxFactory(t) 115 109 116 110 t.Run("Test Resolve DID", func(t *testing.T) { 117 111 server, err := NewServer(txFactory, mockPLC, nil, "tcp://127.0.0.1:8080", 15*time.Second)
+8 -3
main.go
··· 16 16 "github.com/cometbft/cometbft/proxy" 17 17 "github.com/samber/lo" 18 18 "tangled.org/gbl08ma.com/didplcbft/abciapp" 19 + "tangled.org/gbl08ma.com/didplcbft/badgertodbm" 19 20 "tangled.org/gbl08ma.com/didplcbft/httpapi" 20 21 21 22 bftconfig "github.com/cometbft/cometbft/config" ··· 54 55 var wg sync.WaitGroup 55 56 closeGoroutinesCh := make(chan struct{}) 56 57 57 - underlyingTreeDB, treeDB, err := NewBadgerDB("apptree", config.Config.DBDir()) 58 + underlyingTreeDB, treeDB, err := badgertodbm.NewBadgerDB("apptree", config.Config.DBDir()) 58 59 if err != nil { 59 60 log.Fatalf("failed to create application tree database: %v", err) 60 61 } 61 62 62 - underlyingIndexDB, indexDB, err := NewBadgerDB("appindex", config.Config.DBDir()) 63 + underlyingIndexDB, indexDB, err := badgertodbm.NewBadgerDB("appindex", config.Config.DBDir()) 63 64 if err != nil { 64 65 log.Fatalf("failed to create application index database: %v", err) 65 66 } ··· 73 74 } 74 75 }() 75 76 77 + didBloomFilterPath := filepath.Join(homeDir, "data", "did.bloom") 78 + 76 79 recreateDatabases := func() { 77 80 err := underlyingTreeDB.DropAll() 78 81 if err != nil { ··· 83 86 if err != nil { 84 87 log.Fatalf("failed to drop application index database: %v", err) 85 88 } 89 + 90 + os.Remove(didBloomFilterPath) 86 91 } 87 92 88 - app, txFactory, plc, cleanup, err := abciapp.NewDIDPLCApplication(treeDB, indexDB, recreateDatabases, filepath.Join(homeDir, "snapshots")) 93 + app, txFactory, plc, cleanup, err := abciapp.NewDIDPLCApplication(treeDB, indexDB, recreateDatabases, filepath.Join(homeDir, "snapshots"), didBloomFilterPath) 89 94 if err != nil { 90 95 log.Fatalf("failed to create DIDPLC application: %v", err) 91 96 }
+29 -32
plc/impl.go
··· 128 128 } 129 129 130 130 func (plc *plcImpl) Resolve(ctx context.Context, tx transaction.Read, did string) (didplc.Doc, error) { 131 - l, _, err := store.Tree.AuditLog(ctx, tx, did, false) 132 - if err != nil { 133 - return didplc.Doc{}, stacktrace.Propagate(err, "") 131 + var iteratorErr error 132 + for entry := range store.Tree.AuditLogReverseIterator(ctx, tx, did, &iteratorErr) { 133 + if entry.Operation.Tombstone != nil { 134 + return didplc.Doc{}, stacktrace.Propagate(ErrDIDGone, "") 135 + } 136 + return entry.Operation.AsOperation().Doc(did) 134 137 } 135 - 136 - if len(l) == 0 { 137 - return didplc.Doc{}, stacktrace.Propagate(ErrDIDNotFound, "") 138 + if iteratorErr != nil { 139 + return didplc.Doc{}, stacktrace.Propagate(iteratorErr, "") 138 140 } 139 141 140 - opEnum := l[len(l)-1].Operation 141 - if opEnum.Tombstone != nil { 142 - return didplc.Doc{}, stacktrace.Propagate(ErrDIDGone, "") 143 - } 144 - return opEnum.AsOperation().Doc(did) 142 + return didplc.Doc{}, stacktrace.Propagate(ErrDIDNotFound, "") 145 143 } 146 144 147 145 func (plc *plcImpl) OperationLog(ctx context.Context, tx transaction.Read, did string) ([]didplc.OpEnum, error) { ··· 189 187 // GetLastOp - /:did/log/last - latest op from audit log which isn't nullified (the latest op is guaranteed not to be nullified) 190 188 // if missing -> returns ErrDIDNotFound 191 189 // if tombstone -> returns tombstone op 192 - l, _, err := store.Tree.AuditLog(ctx, tx, did, false) 193 - if err != nil { 194 - return didplc.OpEnum{}, stacktrace.Propagate(err, "") 190 + 191 + var iteratorErr error 192 + for entry := range store.Tree.AuditLogReverseIterator(ctx, tx, did, &iteratorErr) { 193 + return entry.Operation, nil 195 194 } 196 - 197 - if len(l) == 0 { 198 - return didplc.OpEnum{}, stacktrace.Propagate(ErrDIDNotFound, "") 195 + if iteratorErr != nil { 196 + return didplc.OpEnum{}, stacktrace.Propagate(iteratorErr, "") 199 197 } 200 198 201 - return l[len(l)-1].Operation, nil 199 + return didplc.OpEnum{}, stacktrace.Propagate(ErrDIDNotFound, "") 202 200 } 203 201 204 202 func (plc *plcImpl) Data(ctx context.Context, tx transaction.Read, did string) (didplc.RegularOp, error) { 205 203 // GetPlcData - /:did/data - similar to GetLastOp but applies a transformation on the op which normalizes it into a modern op 206 204 // if missing -> returns ErrDIDNotFound 207 205 // if tombstone -> returns ErrDIDGone 208 - l, _, err := store.Tree.AuditLog(ctx, tx, did, false) 209 - if err != nil { 210 - return didplc.RegularOp{}, stacktrace.Propagate(err, "") 211 - } 212 206 213 - if len(l) == 0 { 214 - return didplc.RegularOp{}, stacktrace.Propagate(ErrDIDNotFound, "") 215 - } 216 - 217 - opEnum := l[len(l)-1].Operation 218 - if opEnum.Tombstone != nil { 219 - return didplc.RegularOp{}, stacktrace.Propagate(ErrDIDGone, "") 207 + var iteratorErr error 208 + for entry := range store.Tree.AuditLogReverseIterator(ctx, tx, did, &iteratorErr) { 209 + opEnum := entry.Operation 210 + if opEnum.Tombstone != nil { 211 + return didplc.RegularOp{}, stacktrace.Propagate(ErrDIDGone, "") 212 + } 213 + if opEnum.Regular != nil { 214 + return *opEnum.Regular, nil 215 + } 216 + return *modernizeOp(opEnum.Legacy), nil 220 217 } 221 - if opEnum.Regular != nil { 222 - return *opEnum.Regular, nil 218 + if iteratorErr != nil { 219 + return didplc.RegularOp{}, stacktrace.Propagate(iteratorErr, "") 223 220 } 224 - return *modernizeOp(opEnum.Legacy), nil 225 221 222 + return didplc.RegularOp{}, stacktrace.Propagate(ErrDIDNotFound, "") 226 223 } 227 224 228 225 func (plc *plcImpl) Export(ctx context.Context, tx transaction.Read, after uint64, count int) ([]types.SequencedLogEntry, error) {
+7 -8
plc/plc_test.go
··· 14 14 "github.com/did-method-plc/go-didplc" 15 15 "github.com/stretchr/testify/require" 16 16 "tangled.org/gbl08ma.com/didplcbft/plc" 17 + "tangled.org/gbl08ma.com/didplcbft/testutil" 17 18 "tangled.org/gbl08ma.com/didplcbft/types" 18 19 ) 19 20 ··· 140 141 141 142 ctx := t.Context() 142 143 143 - txFactory, tree, _ := NewTestTxFactory() 144 + txFactory, tree, _ := testutil.NewTestTxFactory(t) 144 145 testPLC := plc.NewPLC() 145 146 146 147 origVersion := tree.Version() ··· 241 242 242 243 ctx := t.Context() 243 244 244 - txFactory, _, _ := NewTestTxFactory() 245 + txFactory, _, _ := testutil.NewTestTxFactory(t) 245 246 testPLC := plc.NewPLC() 246 247 247 248 for _, auditLog := range remoteLogs { ··· 311 312 require.Len(t, export, 100) 312 313 313 314 // ensure entries are sorted correctly 314 - last := uint64(0) 315 - for _, entry := range export { 316 - require.True(t, entry.Seq > last) 317 - last = entry.Seq 315 + for i, entry := range export { 316 + require.Equal(t, uint64(i+1), entry.Seq) 318 317 } 319 318 } 320 319 321 320 func TestImportOperationFromAuthoritativeSource(t *testing.T) { 322 321 ctx := t.Context() 323 322 324 - txFactory, _, _ := NewTestTxFactory() 323 + txFactory, _, _ := testutil.NewTestTxFactory(t) 325 324 testPLC := plc.NewPLC() 326 325 327 326 readTx := txFactory.ReadWorking(time.Now()) ··· 376 375 ctx := t.Context() 377 376 378 377 testFn := func(toImport []didplc.LogEntry, mutate func(didplc.LogEntry) didplc.LogEntry) ([]types.SequencedLogEntry, []didplc.LogEntry) { 379 - txFactory, _, _ := NewTestTxFactory() 378 + txFactory, _, _ := testutil.NewTestTxFactory(t) 380 379 testPLC := plc.NewPLC() 381 380 382 381 readTx := txFactory.ReadWorking(time.Now())
-38
plc/testutil_test.go
··· 1 - package plc_test 2 - 3 - import ( 4 - dbm "github.com/cometbft/cometbft-db" 5 - "github.com/cosmos/iavl" 6 - iavldb "github.com/cosmos/iavl/db" 7 - "github.com/dgraph-io/badger/v4" 8 - "github.com/palantir/stacktrace" 9 - "tangled.org/gbl08ma.com/didplcbft/store" 10 - "tangled.org/gbl08ma.com/didplcbft/transaction" 11 - ) 12 - 13 - func NewTestTxFactory() (*transaction.Factory, *iavl.MutableTree, transaction.ExtendedDB) { 14 - tree := iavl.NewMutableTree(iavldb.NewMemDB(), 128, false, iavl.NewNopLogger()) 15 - _, _, err := tree.SaveVersion() 16 - if err != nil { 17 - panic(stacktrace.Propagate(err, "")) 18 - } 19 - 20 - indexDB := memDBWrapper{dbm.NewMemDB()} 21 - factory, err := transaction.NewFactory(tree, indexDB, store.Tree.NextOperationSequence, store.Tree.BuildDIDBloomFilter) 22 - if err != nil { 23 - panic(stacktrace.Propagate(err, "")) 24 - } 25 - 26 - return factory, tree, indexDB 27 - } 28 - 29 - type memDBWrapper struct { 30 - dbm.DB 31 - } 32 - 33 - func (w memDBWrapper) IteratorWithOptions(start, end []byte, opts badger.IteratorOptions) (dbm.Iterator, error) { 34 - if opts.Reverse { 35 - return w.ReverseIterator(start, end) 36 - } 37 - return w.Iterator(start, end) 38 - }
+167
store/did_bloom.go
··· 1 + package store 2 + 3 + import ( 4 + "encoding/binary" 5 + "errors" 6 + "fmt" 7 + "io" 8 + "math" 9 + "os" 10 + "slices" 11 + 12 + "github.com/bits-and-blooms/bloom/v3" 13 + "github.com/palantir/stacktrace" 14 + "tangled.org/gbl08ma.com/didplcbft/transaction" 15 + ) 16 + 17 + type DIDBloomFilterStore struct { 18 + filePath string 19 + } 20 + 21 + func NewInMemoryDIDBloomFilterStore() *DIDBloomFilterStore { 22 + return &DIDBloomFilterStore{} 23 + } 24 + 25 + func NewDIDBloomFilterStore(filePath string) *DIDBloomFilterStore { 26 + return &DIDBloomFilterStore{ 27 + filePath: filePath, 28 + } 29 + } 30 + 31 + var _ transaction.BloomFilterStorage = (*DIDBloomFilterStore)(nil) 32 + 33 + func (s *DIDBloomFilterStore) BuildDIDBloomFilter(tx transaction.Read) (*bloom.BloomFilter, error) { 34 + filter, estimatedDIDCount, err := func() (*bloom.BloomFilter, uint64, error) { 35 + if s.filePath == "" { 36 + return nil, 0, nil 37 + } 38 + var f bloom.BloomFilter 39 + 40 + file, err := os.OpenFile(s.filePath, os.O_RDONLY, 0) 41 + if err != nil { 42 + if errors.Is(err, os.ErrNotExist) { 43 + return nil, 0, nil 44 + } 45 + return nil, 0, stacktrace.Propagate(err, "") 46 + } 47 + 48 + headerBytes := make([]byte, 24) 49 + n, err := file.Read(headerBytes) 50 + if err != nil { 51 + return nil, 0, stacktrace.Propagate(err, "") 52 + } 53 + if n != 24 { 54 + return nil, 0, stacktrace.NewError("malformed bloom filter file") 55 + } 56 + seq := binary.BigEndian.Uint64(headerBytes) 57 + size := binary.BigEndian.Uint64(headerBytes[8:]) 58 + cap := binary.BigEndian.Uint64(headerBytes[16:]) 59 + 60 + if size > (cap/10)*6 { 61 + // size exceeds 60% of capacity, the filter's effectiveness will start to degrade 62 + // it's better if we regenerate the filter with an appropriate size 63 + return nil, size, nil 64 + } 65 + 66 + _, err = f.ReadFrom(file) 67 + if err != nil { 68 + return nil, 0, stacktrace.Propagate(err, "") 69 + } 70 + 71 + filter := &f 72 + 73 + // update bloom filter with DIDs it may have missed since the time it was serialized 74 + var iterErr error 75 + for entry := range Tree.OperationsIterator(tx, max(1, seq)-1, &iterErr) { 76 + didBytes, err := DIDToBytes(entry.DID) 77 + if err != nil { 78 + return nil, 0, stacktrace.Propagate(err, "") 79 + } 80 + filter.TestOrAdd(didBytes) 81 + } 82 + if iterErr != nil { 83 + return nil, 0, stacktrace.Propagate(err, "") 84 + } 85 + 86 + return filter, uint64(filter.ApproximatedSize()), nil 87 + }() 88 + if err != nil { 89 + return nil, stacktrace.Propagate(err, "") 90 + } 91 + 92 + if filter != nil { 93 + return filter, nil 94 + } 95 + 96 + fmt.Println("(RE)BUILDING DID BLOOM FILTER") 97 + 98 + filterEstimatedItems := uint(100000000) // we know there are like 80M DIDs at the time of writing 99 + if estimatedDIDCount != 0 { 100 + filterEstimatedItems = max(filterEstimatedItems, uint(estimatedDIDCount*3)) 101 + } 102 + 103 + filter = bloom.NewWithEstimates(filterEstimatedItems, 0.01) 104 + 105 + didRangeStart := marshalDIDLogKey(make([]byte, 15), 0) 106 + didRangeEnd := marshalDIDLogKey(slices.Repeat([]byte{0xff}, 15), math.MaxUint64) 107 + 108 + iterator, err := tx.IndexDB().Iterator(didRangeStart, didRangeEnd) 109 + if err != nil { 110 + return nil, stacktrace.Propagate(err, "") 111 + } 112 + 113 + defer iterator.Close() 114 + 115 + for iterator.Valid() { 116 + filter.Add(iterator.Key()[1:16]) 117 + 118 + iterator.Next() 119 + } 120 + err = iterator.Error() 121 + if err != nil { 122 + return nil, stacktrace.Propagate(err, "") 123 + } 124 + 125 + return filter, nil 126 + } 127 + 128 + func (s *DIDBloomFilterStore) StoreDIDBloomFilter(writeBloomTo func(w io.Writer) (uint64, uint64, uint64, error)) error { 129 + if s.filePath == "" { 130 + return nil 131 + } 132 + 133 + file, err := os.OpenFile(s.filePath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) 134 + if err != nil { 135 + return stacktrace.Propagate(err, "") 136 + } 137 + defer file.Close() 138 + 139 + // write reserved space for filter version 140 + _, err = file.Write(make([]byte, 24)) 141 + if err != nil { 142 + return stacktrace.Propagate(err, "") 143 + } 144 + 145 + version, size, cap, err := writeBloomTo(file) 146 + if err != nil { 147 + return stacktrace.Propagate(err, "") 148 + } 149 + 150 + headerBytes := make([]byte, 24) 151 + binary.BigEndian.PutUint64(headerBytes, version) 152 + binary.BigEndian.PutUint64(headerBytes[8:], size) 153 + binary.BigEndian.PutUint64(headerBytes[16:], cap) 154 + 155 + // go back and write filter version 156 + _, err = file.Seek(0, 0) 157 + if err != nil { 158 + return stacktrace.Propagate(err, "") 159 + } 160 + 161 + _, err = file.Write(headerBytes) 162 + if err != nil { 163 + return stacktrace.Propagate(err, "") 164 + } 165 + 166 + return stacktrace.Propagate(err, "") 167 + }
+46 -71
store/tree.go
··· 11 11 "strings" 12 12 "time" 13 13 14 - "github.com/bits-and-blooms/bloom/v3" 15 14 "github.com/bluesky-social/indigo/atproto/syntax" 16 15 ics23 "github.com/cosmos/ics23/go" 17 16 "github.com/dgraph-io/badger/v4" ··· 26 25 ) 27 26 28 27 // TODO rename to something more appropriate, now that this touches both the tree and the index 29 - var Tree PLCTreeStore = &TreeStore{} 28 + var Tree *TreeStore = &TreeStore{} 30 29 31 30 type PLCTreeStore interface { 32 - BuildDIDBloomFilter(tx transaction.Read) (*bloom.BloomFilter, error) 33 31 AuditLog(ctx context.Context, tx transaction.Read, did string, withProof bool) ([]types.SequencedLogEntry, *ics23.CommitmentProof, error) 34 32 AuditLogReverseIterator(ctx context.Context, tx transaction.Read, did string, err *error) iter.Seq[types.SequencedLogEntry] 33 + 35 34 ExportOperations(ctx context.Context, tx transaction.Read, after uint64, count int) ([]types.SequencedLogEntry, error) // passing a count of zero means unlimited 35 + OperationsIterator(tx transaction.Read, after uint64, retErr *error) iter.Seq[types.SequencedLogEntry] 36 + 36 37 StoreOperation(ctx context.Context, tx transaction.Write, entry didplc.LogEntry, nullifyWithSequenceEqualOrGreaterThan mo.Option[uint64]) error 37 38 SetOperationCreatedAt(tx transaction.Write, seqID uint64, createdAt time.Time) error 38 39 ··· 43 44 44 45 AuthoritativeImportProgress(tx transaction.Read) (uint64, error) 45 46 SetAuthoritativeImportProgress(tx transaction.Write, nextCursor uint64) error 46 - 47 - ProduceOperationExamples(tx transaction.Read, interval, count int) iter.Seq[[]byte] 48 47 } 49 48 50 49 var _ PLCTreeStore = (*TreeStore)(nil) 51 50 52 51 // TreeStore exists just to groups methods nicely 53 52 type TreeStore struct{} 54 - 55 - func (t *TreeStore) BuildDIDBloomFilter(tx transaction.Read) (*bloom.BloomFilter, error) { 56 - // TODO find an elegant way to dynamically size the bloom filter adequately as the number of DIDs grows 57 - filter := bloom.NewWithEstimates(100000000, 0.01) 58 - 59 - didRangeStart := marshalDIDLogKey(make([]byte, 15), 0) 60 - didRangeEnd := marshalDIDLogKey(slices.Repeat([]byte{0xff}, 15), math.MaxUint64) 61 - 62 - iterator, err := tx.IndexDB().Iterator(didRangeStart, didRangeEnd) 63 - if err != nil { 64 - return nil, stacktrace.Propagate(err, "") 65 - } 66 - 67 - defer iterator.Close() 68 - 69 - for iterator.Valid() { 70 - filter.Add(iterator.Key()[1:16]) 71 - 72 - iterator.Next() 73 - } 74 - err = iterator.Error() 75 - if err != nil { 76 - return nil, stacktrace.Propagate(err, "") 77 - } 78 - 79 - return filter, nil 80 - } 81 - 82 - func (t *TreeStore) ProduceOperationExamples(tx transaction.Read, interval, count int) iter.Seq[[]byte] { 83 - return func(yield func([]byte) bool) { 84 - for i := 0; i < count*interval; i += interval { 85 - key := marshalOperationKey(uint64(i)) 86 - 87 - value, err := tx.Tree().Get(key) 88 - if err != nil { 89 - continue 90 - } 91 - 92 - if !yield(slices.Clone(value)) { 93 - return 94 - } 95 - } 96 - } 97 - 98 - } 99 53 100 54 func (t *TreeStore) AuditLog(ctx context.Context, tx transaction.Read, did string, withProof bool) ([]types.SequencedLogEntry, *ics23.CommitmentProof, error) { 101 55 didBytes, err := DIDToBytes(did) ··· 239 193 } 240 194 241 195 func (t *TreeStore) ExportOperations(ctx context.Context, tx transaction.Read, after uint64, count int) ([]types.SequencedLogEntry, error) { 242 - // as the name suggests, after is an exclusive lower bound, but our iterators use inclusive lower bounds 243 - start := after + 1 244 - startKey := marshalOperationKey(start) 245 - endKey := maxOperationKey 246 - 247 196 entries := make([]types.SequencedLogEntry, 0, count) 248 197 var iterErr error 249 - tx.Tree().IterateRange(startKey, endKey, true, func(operationKey, operationValue []byte) bool { 250 - select { 251 - case <-ctx.Done(): 252 - iterErr = stacktrace.Propagate(ctx.Err(), "") 253 - return true 254 - default: 255 - } 198 + for logEntry := range t.OperationsIterator(tx, after, &iterErr) { 199 + entries = append(entries, logEntry) 256 200 257 - logEntry, err := unmarshalLogEntry(operationKey, operationValue) 258 - if err != nil { 259 - iterErr = stacktrace.Propagate(err, "") 260 - return true 201 + // this condition being checked here also makes it so that a count of zero means unlimited 202 + if len(entries) == count { 203 + break 261 204 } 262 - 263 - entries = append(entries, logEntry) 264 - return len(entries) == count // this condition being checked here also makes it so that a count of zero means unlimited 265 - }) 205 + } 266 206 if iterErr != nil { 267 207 return nil, stacktrace.Propagate(iterErr, "ran into an error while iterating") 268 208 } 269 209 return entries, nil 210 + } 211 + 212 + func (t *TreeStore) OperationsIterator(tx transaction.Read, after uint64, retErr *error) iter.Seq[types.SequencedLogEntry] { 213 + return func(yield func(types.SequencedLogEntry) bool) { 214 + // as the name suggests, after is an exclusive lower bound, but our iterators use inclusive lower bounds 215 + start := after + 1 216 + startKey := marshalOperationKey(start) 217 + endKey := maxOperationKey 218 + 219 + opIterator, err := tx.Tree().Iterator(startKey, endKey, true) 220 + if err != nil { 221 + *retErr = stacktrace.Propagate(err, "") 222 + return 223 + } 224 + 225 + defer opIterator.Close() 226 + 227 + for opIterator.Valid() { 228 + logEntry, err := unmarshalLogEntry(opIterator.Key(), opIterator.Value()) 229 + if err != nil { 230 + *retErr = stacktrace.Propagate(err, "") 231 + return 232 + } 233 + 234 + if !yield(logEntry) { 235 + return 236 + } 237 + 238 + opIterator.Next() 239 + } 240 + err = opIterator.Error() 241 + if err != nil { 242 + *retErr = stacktrace.Propagate(err, "") 243 + } 244 + } 270 245 } 271 246 272 247 // StoreOperation stores an operation in the tree, nullifying existing operations whose index within the DID's history ··· 374 349 return stacktrace.Propagate(err, "") 375 350 } 376 351 377 - tx.AddToDIDBloomFilter(didBytes) 352 + tx.AddToDIDBloomFilter(didBytes, sequence) 378 353 379 354 return nil 380 355 }
+30
testutil/testutil.go
··· 1 + package testutil 2 + 3 + import ( 4 + "testing" 5 + 6 + "github.com/cosmos/iavl" 7 + "github.com/klauspost/compress/zstd" 8 + "github.com/stretchr/testify/require" 9 + 10 + "tangled.org/gbl08ma.com/didplcbft/badgertodbm" 11 + "tangled.org/gbl08ma.com/didplcbft/dbmtoiavldb" 12 + "tangled.org/gbl08ma.com/didplcbft/dbmtoiavldb/zstddict" 13 + "tangled.org/gbl08ma.com/didplcbft/store" 14 + "tangled.org/gbl08ma.com/didplcbft/transaction" 15 + ) 16 + 17 + func NewTestTxFactory(t *testing.T) (*transaction.Factory, *iavl.MutableTree, transaction.ExtendedDB) { 18 + _, treeDB, err := badgertodbm.NewBadgerInMemoryDB() 19 + require.NoError(t, err) 20 + 21 + tree := iavl.NewMutableTree(dbmtoiavldb.AdaptWithCompression(treeDB, zstd.SpeedFastest, zstddict.PLCZstdDict), 500000, false, iavl.NewNopLogger(), iavl.AsyncPruningOption(false)) 22 + 23 + _, indexDB, err := badgertodbm.NewBadgerInMemoryDB() 24 + require.NoError(t, err) 25 + 26 + factory, err := transaction.NewFactory(tree, indexDB, store.Tree.NextOperationSequence, store.NewInMemoryDIDBloomFilterStore()) 27 + require.NoError(t, err) 28 + 29 + return factory, tree, indexDB 30 + }
+159
transaction/factory.go
··· 1 + package transaction 2 + 3 + import ( 4 + "errors" 5 + "io" 6 + "math" 7 + "sync" 8 + "time" 9 + 10 + "github.com/bits-and-blooms/bloom/v3" 11 + dbm "github.com/cometbft/cometbft-db" 12 + "github.com/cosmos/iavl" 13 + "github.com/cosmos/iavl/db" 14 + "github.com/dgraph-io/badger/v4" 15 + "github.com/palantir/stacktrace" 16 + ) 17 + 18 + type ExtendedDB interface { 19 + dbm.DB 20 + IteratorWithOptions(start, end []byte, opts badger.IteratorOptions) (dbm.Iterator, error) 21 + } 22 + 23 + type BloomFilterStorage interface { 24 + BuildDIDBloomFilter(tx Read) (*bloom.BloomFilter, error) 25 + StoreDIDBloomFilter(writeBloomTo func(w io.Writer) (uint64, uint64, uint64, error)) error 26 + } 27 + 28 + type Factory struct { 29 + bloomFilterStorage BloomFilterStorage 30 + 31 + db ExtendedDB 32 + mutableTree *iavl.MutableTree 33 + sequenceGetter func(tx Read) (uint64, error) 34 + 35 + bloomMu sync.RWMutex 36 + bloomSeq uint64 37 + bloomFilter *bloom.BloomFilter 38 + 39 + bloomSaverMu sync.Mutex 40 + bloomLastSaveSeq uint64 41 + } 42 + 43 + func NewFactory(tree *iavl.MutableTree, indexDB ExtendedDB, sequenceGetter func(tx Read) (uint64, error), bloomFilterStorage BloomFilterStorage) (*Factory, error) { 44 + f := &Factory{ 45 + bloomFilterStorage: bloomFilterStorage, 46 + db: indexDB, 47 + mutableTree: tree, 48 + sequenceGetter: sequenceGetter, 49 + } 50 + 51 + var err error 52 + // if we use ReadCommitted, it's going to fail when the tree doesn't have versions yet 53 + // in practice we just want to blindly list all DIDs "past present and future", so it doesn't matter what the version is 54 + // (when doing historical queries, it's fine if the bloom filter claims that a DID already exists when it might not exist yet) 55 + tx := f.ReadWorking(time.Now()) 56 + f.bloomFilter, err = bloomFilterStorage.BuildDIDBloomFilter(tx) 57 + if err != nil { 58 + return nil, stacktrace.Propagate(err, "") 59 + } 60 + f.bloomSeq, err = f.sequenceGetter(tx) 61 + // since the sequenceGetter always returns the next sequence 62 + f.bloomSeq-- 63 + return f, stacktrace.Propagate(err, "") 64 + } 65 + 66 + func (f *Factory) ReadWorking(ts time.Time) Read { 67 + return &readTx{ 68 + ts: ts, 69 + height: f.mutableTree.WorkingVersion(), 70 + mutableTree: f.mutableTree, 71 + db: f.db, 72 + sequenceGetter: f.sequenceGetter, 73 + bloomMu: &f.bloomMu, 74 + bloomFilter: f.bloomFilter, 75 + bloomSeq: &f.bloomSeq, 76 + } 77 + } 78 + 79 + func (f *Factory) ReadCommitted() Read { 80 + tx, err := f.ReadHeight(time.Now(), f.mutableTree.Version()) 81 + if err != nil { 82 + // this should never happen, it's not worth making the signature of this function more 83 + // complex for an error we'll never return unless the ABCI application is yet to be initialized 84 + panic(stacktrace.Propagate(err, "")) 85 + } 86 + return tx 87 + } 88 + 89 + func (f *Factory) ReadHeight(ts time.Time, height int64) (Read, error) { 90 + immutable, err := f.mutableTree.GetImmutable(height) 91 + if err != nil { 92 + if !errors.Is(err, iavl.ErrVersionDoesNotExist) || height != 0 { 93 + return nil, stacktrace.Propagate(err, "") 94 + } 95 + // give the reader an empty tree just to satisfy expectations 96 + tmpTree := iavl.NewMutableTree(db.NewMemDB(), 1, false, iavl.NewNopLogger()) 97 + _, v, err := tmpTree.SaveVersion() 98 + if err != nil { 99 + return nil, stacktrace.Propagate(err, "") 100 + } 101 + immutable, err = tmpTree.GetImmutable(v) 102 + if err != nil { 103 + return nil, stacktrace.Propagate(err, "") 104 + } 105 + } 106 + return &readTx{ 107 + ts: ts, 108 + height: height, 109 + tree: AdaptImmutableTree(immutable), 110 + db: f.db, 111 + sequenceGetter: f.sequenceGetter, 112 + bloomMu: &f.bloomMu, 113 + bloomFilter: f.bloomFilter, 114 + }, nil 115 + } 116 + 117 + func (f *Factory) SaveDIDBloomFilter() error { 118 + f.bloomSaverMu.Lock() 119 + defer f.bloomSaverMu.Unlock() 120 + 121 + f.bloomMu.RLock() 122 + shouldSave := f.bloomSeq > f.bloomLastSaveSeq 123 + f.bloomMu.RUnlock() 124 + if !shouldSave { 125 + return nil 126 + } 127 + 128 + var savedSeq uint64 129 + err := f.bloomFilterStorage.StoreDIDBloomFilter(func(w io.Writer) (uint64, uint64, uint64, error) { 130 + f.bloomMu.RLock() 131 + defer f.bloomMu.RUnlock() 132 + 133 + _, err := f.bloomFilter.WriteTo(w) 134 + savedSeq = f.bloomSeq 135 + 136 + // we reimplement bloomFilter.ApproximatedSize() here, because their version deals poorly with the case where the filter is completely full 137 + // (it will return 0) 138 + // and it also returns a uint32, which theoretically means we can't have more DIDs than humans on the planet... 139 + 140 + x := float64(f.bloomFilter.BitSet().Count()) 141 + m := float64(f.bloomFilter.Cap()) 142 + k := float64(f.bloomFilter.K()) 143 + size := -1 * m / k * math.Log(1-x/m) / math.Log(math.E) 144 + curItemCap := uint64(m * math.Log(2) / k) 145 + sizeInt := curItemCap 146 + if !math.IsInf(size, 0) { 147 + sizeInt = uint64(math.Floor(size + 0.5)) 148 + } 149 + return f.bloomSeq, sizeInt, curItemCap, stacktrace.Propagate(err, "") 150 + }) 151 + 152 + if err != nil { 153 + return stacktrace.Propagate(err, "") 154 + } 155 + 156 + f.bloomLastSaveSeq = savedSeq 157 + 158 + return nil 159 + }
+2 -1
transaction/interface.go
··· 26 26 Tree() UnifiedTree 27 27 IndexDB() WriteIndex 28 28 TestDIDBloomFilter(did []byte) bool 29 - AddToDIDBloomFilter(did []byte) 29 + // sequence is only used to track how up-to-date the bloom filter is. it is not added to the bloom filter 30 + AddToDIDBloomFilter(did []byte, sequence uint64) 30 31 31 32 Commit() error 32 33 Rollback() error
+1 -3
transaction/read_on_write_tx.go
··· 35 35 36 36 // TestDIDBloomFilter implements [Read]. 37 37 func (d *readOnWriteTx) TestDIDBloomFilter(did []byte) bool { 38 - d.w.readTx.bloomMu.RLock() 39 - defer d.w.readTx.bloomMu.RUnlock() 40 - return d.w.readTx.bloomFilter.Test(did) 38 + return d.w.TestDIDBloomFilter(did) 41 39 } 42 40 43 41 // Upgrade implements [Read].
+3 -70
transaction/read_tx.go
··· 5 5 "time" 6 6 7 7 "github.com/bits-and-blooms/bloom/v3" 8 - dbm "github.com/cometbft/cometbft-db" 9 8 "github.com/cosmos/iavl" 10 - "github.com/dgraph-io/badger/v4" 11 9 "github.com/palantir/stacktrace" 12 10 ) 13 11 14 - type ExtendedDB interface { 15 - dbm.DB 16 - IteratorWithOptions(start, end []byte, opts badger.IteratorOptions) (dbm.Iterator, error) 17 - } 18 - 19 - type Factory struct { 20 - db ExtendedDB 21 - mutableTree *iavl.MutableTree 22 - sequenceGetter func(tx Read) (uint64, error) 23 - 24 - bloomMu sync.RWMutex 25 - bloomFilter *bloom.BloomFilter 26 - } 27 - 28 - func NewFactory(tree *iavl.MutableTree, indexDB ExtendedDB, sequenceGetter func(tx Read) (uint64, error), bloomFilterBuilder func(tx Read) (*bloom.BloomFilter, error)) (*Factory, error) { 29 - f := &Factory{ 30 - db: indexDB, 31 - mutableTree: tree, 32 - sequenceGetter: sequenceGetter, 33 - } 34 - 35 - var err error 36 - // if we use ReadCommitted, it's going to fail when the tree doesn't have versions yet 37 - // in practice we just want to blindly list all DIDs "past present and future", so it doesn't matter what the version is 38 - // (when doing historical queries, it's fine if the bloom filter claims that a DID already exists when it might not exist yet) 39 - f.bloomFilter, err = bloomFilterBuilder(f.ReadWorking(time.Now())) 40 - return f, stacktrace.Propagate(err, "") 41 - } 42 - 43 12 type readTx struct { 44 13 ts time.Time 45 14 height int64 ··· 50 19 51 20 bloomMu *sync.RWMutex 52 21 bloomFilter *bloom.BloomFilter 22 + bloomSeq *uint64 53 23 54 24 sequenceGetter func(tx Read) (uint64, error) 55 25 } 56 26 57 - func (f *Factory) ReadWorking(ts time.Time) Read { 58 - return &readTx{ 59 - ts: ts, 60 - height: f.mutableTree.WorkingVersion(), 61 - mutableTree: f.mutableTree, 62 - db: f.db, 63 - sequenceGetter: f.sequenceGetter, 64 - bloomMu: &f.bloomMu, 65 - bloomFilter: f.bloomFilter, 66 - } 67 - } 68 - 69 - func (f *Factory) ReadCommitted() Read { 70 - tx, err := f.ReadHeight(time.Now(), f.mutableTree.Version()) 71 - if err != nil { 72 - // this should never happen, it's not worth making the signature of this function more 73 - // complex for an error we'll never return unless the ABCI application is yet to be initialized 74 - panic(stacktrace.Propagate(err, "")) 75 - } 76 - return tx 77 - } 78 - 79 - func (f *Factory) ReadHeight(ts time.Time, height int64) (Read, error) { 80 - immutable, err := f.mutableTree.GetImmutable(height) 81 - if err != nil { 82 - return nil, stacktrace.Propagate(err, "") 83 - } 84 - return &readTx{ 85 - ts: ts, 86 - height: height, 87 - tree: AdaptImmutableTree(immutable), 88 - db: f.db, 89 - sequenceGetter: f.sequenceGetter, 90 - bloomMu: &f.bloomMu, 91 - bloomFilter: f.bloomFilter, 92 - }, nil 93 - } 94 - 95 27 // Height implements [Read]. 96 28 func (t *readTx) Height() int64 { 97 29 return t.height ··· 128 60 return nil, stacktrace.NewError("historical transaction is not upgradable to a write transaction") 129 61 } 130 62 return &writeTx{ 131 - readTx: t, 63 + readTx: t, 64 + unsavedBloomAdditions: map[string]struct{}{}, 132 65 }, nil 133 66 }
+22 -8
transaction/write_tx.go
··· 11 11 12 12 writeIndex *writeIndex 13 13 14 + unsavedBloomAdditions map[string]struct{} 15 + unsavedBloomHighestSeq uint64 16 + 14 17 hasSeq bool 15 18 seq uint64 16 19 } ··· 32 35 if err != nil { 33 36 return stacktrace.Propagate(err, "") 34 37 } 38 + } 39 + 40 + if len(w.unsavedBloomAdditions) > 0 { 41 + w.readTx.bloomMu.Lock() 42 + defer w.readTx.bloomMu.Unlock() 43 + for k := range w.unsavedBloomAdditions { 44 + w.readTx.bloomFilter.Add(unsafeStrToBytes(k)) 45 + } 46 + *w.readTx.bloomSeq = max(*w.readTx.bloomSeq, w.unsavedBloomHighestSeq) 35 47 } 36 48 return nil 37 49 } ··· 88 100 } 89 101 90 102 // TestDIDBloomFilter implements [Write]. 91 - func (t *writeTx) TestDIDBloomFilter(did []byte) bool { 92 - t.readTx.bloomMu.RLock() 93 - defer t.readTx.bloomMu.RUnlock() 94 - return t.readTx.bloomFilter.Test(did) 103 + func (w *writeTx) TestDIDBloomFilter(did []byte) bool { 104 + if _, ok := w.unsavedBloomAdditions[unsafeBytesToStr(did)]; ok { 105 + return true 106 + } 107 + w.readTx.bloomMu.RLock() 108 + defer w.readTx.bloomMu.RUnlock() 109 + return w.readTx.bloomFilter.Test(did) 95 110 } 96 111 97 112 // AddToDIDBloomFilter implements [Write]. 98 - func (t *writeTx) AddToDIDBloomFilter(did []byte) { 99 - t.readTx.bloomMu.Lock() 100 - defer t.readTx.bloomMu.Unlock() 101 - t.readTx.bloomFilter.Add(did) 113 + func (w *writeTx) AddToDIDBloomFilter(did []byte, sequence uint64) { 114 + w.unsavedBloomAdditions[unsafeBytesToStr(did)] = struct{}{} 115 + w.unsavedBloomHighestSeq = max(w.unsavedBloomHighestSeq, sequence) 102 116 } 103 117 104 118 func (w *writeTx) createWriteIndexIfNeeded() {