A very experimental PLC implementation which uses BFT consensus for decentralization
19
fork

Configure Feed

Select the types of activity you want to include in your feed.

Implement automatic snapshot creation

gbl08ma 70da204d 3d5476e8

+171 -59
+26 -57
abciapp/app.go
··· 17 17 "github.com/gbl08ma/stacktrace" 18 18 "github.com/klauspost/compress/zstd" 19 19 "github.com/samber/lo" 20 + "tangled.org/gbl08ma.com/didplcbft/config" 20 21 "tangled.org/gbl08ma.com/didplcbft/dbmtoiavldb" 21 22 "tangled.org/gbl08ma.com/didplcbft/dbmtoiavldb/zstddict" 22 23 "tangled.org/gbl08ma.com/didplcbft/plc" ··· 56 57 57 58 blockChallengeCoordinator *blockChallengeCoordinator 58 59 rangeChallengeCoordinator *rangeChallengeCoordinator 60 + 61 + // for snapshot creation: 62 + plcConfig *config.PLCConfig 63 + snapshotManagerLatestHeightChan chan int64 64 + lastSnapshotHeight int64 59 65 } 60 66 61 67 // store and plc must be able to share transaction objects 62 - func NewDIDPLCApplication(appContext context.Context, logger cmtlog.Logger, pv *privval.FilePV, treeDB dbm.DB, indexDB transaction.ExtendedDB, clearData func(), snapshotDirectory, stateSyncTempDir, didBloomFilterPath string, mempoolSubmitter types.MempoolSubmitter, blockHeaderGetter store.BlockHeaderGetter) (*DIDPLCApplication, *transaction.Factory, plc.PLC, func(), error) { 68 + func NewDIDPLCApplication(appContext context.Context, logger cmtlog.Logger, pv *privval.FilePV, treeDB dbm.DB, indexDB transaction.ExtendedDB, clearData func(), snapshotDirectory, stateSyncTempDir, didBloomFilterPath string, mempoolSubmitter types.MempoolSubmitter, blockHeaderGetter store.BlockHeaderGetter, plcConfig *config.PLCConfig) (*DIDPLCApplication, *transaction.Factory, plc.PLC, func(), error) { 63 69 mkTree := func() *iavl.MutableTree { 64 70 // Using SpeedDefault appears to cause the processing time for ExecuteOperation to double on average 65 71 // Using SpeedBetterCompression appears to cause the processing time to double again ··· 91 97 runnerContext, cancelRunnerContext := context.WithCancel(appContext) 92 98 93 99 d := &DIDPLCApplication{ 94 - runnerContext: runnerContext, 95 - logger: logger.With("module", "plcapp"), 96 - tree: tree, 97 - indexDB: indexDB, 98 - mempoolSubmitter: mempoolSubmitter, 99 - 
snapshotDirectory: snapshotDirectory, 100 - stateSyncTempDir: stateSyncTempDir, 101 - blockHeaderGetter: blockHeaderGetter, 102 - triggerBlockCreation: func() {}, 100 + runnerContext: runnerContext, 101 + logger: logger.With("module", "plcapp"), 102 + tree: tree, 103 + indexDB: indexDB, 104 + mempoolSubmitter: mempoolSubmitter, 105 + snapshotDirectory: snapshotDirectory, 106 + stateSyncTempDir: stateSyncTempDir, 107 + blockHeaderGetter: blockHeaderGetter, 108 + triggerBlockCreation: func() {}, 109 + plcConfig: plcConfig, 110 + snapshotManagerLatestHeightChan: make(chan int64, 1), 103 111 } 104 112 105 113 if pv != nil { ··· 159 167 } 160 168 }) 161 169 162 - /*lastSnapshotVersion := tree.Version() 163 - wg.Go(func() { 164 - for { 165 - select { 166 - case <-closeCh: 167 - return 168 - case <-time.After(5 * time.Minute): 169 - } 170 - 171 - treeVersion := tree.Version() 172 - if treeVersion > int64(lastSnapshotVersion+10000) { 173 - err = d.createSnapshot(treeVersion, filepath.Join(snapshotDirectory, "snapshot.tmp")) 174 - if err != nil { 175 - fmt.Println("FAILED TO TAKE SNAPSHOT", stacktrace.Propagate(err)) 176 - } 177 - fmt.Println("TOOK SNAPSHOT OF VERSION", treeVersion) 178 - lastSnapshotVersion = treeVersion 179 - } 180 - } 181 - 182 - })*/ 183 - 184 - /*err = d.createSnapshot(tree.Version(), filepath.Join(snapshotDirectory, "snapshot.tmp")) 185 - if err != nil { 186 - return nil, nil, func() {}, stacktrace.Propagate(err) 187 - }*/ 188 - 189 - /* 190 - tree2 := iavl.NewMutableTree(dbm.NewMemDB(), 2048, false, iavl.NewNopLogger()) 191 - importer, err := tree2.Import(tree.Version()) 170 + if plcConfig != nil && plcConfig.SnapshotInterval > 0 && snapshotDirectory != "" { 171 + h, err := store.Snapshot.MostRecentSnapshotHeight(snapshotDirectory) 192 172 if err != nil { 193 - return nil, nil, func() {}, stacktrace.Propagate(err) 173 + return nil, nil, nil, cancelRunnerContext, stacktrace.Propagate(err) 194 174 } 195 - cimporter := iavl.NewCompressImporter(importer) 
175 + d.lastSnapshotHeight = int64(h) 196 176 197 - st = time.Now() 198 - for _, node := range nodes { 199 - err := cimporter.Add(&node) 200 - if err != nil { 201 - return nil, nil, func() {}, stacktrace.Propagate(err) 202 - } 203 - } 204 - err = importer.Commit() 205 - if err != nil { 206 - return nil, nil, func() {}, stacktrace.Propagate(err) 207 - } 208 - 209 - fmt.Println("Took", time.Since(st), "to import", len(nodes), "nodes") 210 - fmt.Println("Imported tree hash", hex.EncodeToString(tree2.Hash()), "and version", tree2.Version()) 211 - */ 177 + wg.Go(func() { 178 + d.runSnapshotManager(runnerContext, snapshotDirectory) 179 + }) 180 + } 212 181 213 182 return d, d.txFactory, d.plc, func() { 214 183 cancelRunnerContext()
+2 -1
abciapp/app_test.go
··· 11 11 cbornode "github.com/ipfs/go-ipld-cbor" 12 12 "github.com/stretchr/testify/require" 13 13 "tangled.org/gbl08ma.com/didplcbft/abciapp" 14 + "tangled.org/gbl08ma.com/didplcbft/config" 14 15 ) 15 16 16 17 func txJSONToCBOR(t *testing.T, jsonBytes []byte) []byte { ··· 24 25 25 26 func TestCheckTx(t *testing.T) { 26 27 logger := cmtlog.NewNopLogger() 27 - app, _, _, cleanup, err := abciapp.NewDIDPLCApplication(t.Context(), logger, nil, dbm.NewMemDB(), memDBWrapper{dbm.NewMemDB()}, nil, "", "", "", nil, nil) 28 + app, _, _, cleanup, err := abciapp.NewDIDPLCApplication(t.Context(), logger, nil, dbm.NewMemDB(), memDBWrapper{dbm.NewMemDB()}, nil, "", "", "", nil, nil, config.DefaultPLCConfig()) 28 29 require.NoError(t, err) 29 30 t.Cleanup(cleanup) 30 31
+18
abciapp/execution.go
··· 340 340 return nil, stacktrace.Propagate(err) 341 341 } 342 342 343 + committedHeight := d.ongoingRead.Height() 344 + 343 345 for _, r := range d.lastProcessedProposalExecTxResults { 344 346 for _, cb := range r.commitSideEffects { 345 347 cb() ··· 348 350 349 351 d.ongoingWrite = nil 350 352 d.ongoingRead = nil 353 + 354 + // Notify snapshot manager of new committed height (non-blocking) 355 + select { 356 + case d.snapshotManagerLatestHeightChan <- committedHeight: 357 + default: 358 + // try updating the height buffered in the chan 359 + 360 + // tentative read because we could race at reading with the snapshot manager goroutine 361 + select { 362 + case <-d.snapshotManagerLatestHeightChan: 363 + default: 364 + } 365 + 366 + // this must succeed because no other goroutine writes to the chan, and we just read: 367 + d.snapshotManagerLatestHeightChan <- committedHeight 368 + } 351 369 352 370 return &abcitypes.ResponseCommit{ 353 371 // TODO only discard actual blockchain history based on settings
+45
abciapp/snapshots.go
··· 15 15 "tangled.org/gbl08ma.com/didplcbft/store" 16 16 ) 17 17 18 + // runSnapshotManager monitors block heights and creates/deletes snapshots as needed 19 + func (d *DIDPLCApplication) runSnapshotManager(ctx context.Context, snapshotDirectory string) { 20 + var lastHeight int64 21 + 22 + for { 23 + select { 24 + case <-ctx.Done(): 25 + return 26 + case height := <-d.snapshotManagerLatestHeightChan: 27 + lastHeight = height 28 + } 29 + 30 + currentHeight := lastHeight 31 + snapshotInterval := int64(d.plcConfig.SnapshotInterval) 32 + 33 + // Skip if we haven't reached the interval yet 34 + if currentHeight-d.lastSnapshotHeight < snapshotInterval { 35 + continue 36 + } 37 + 38 + tempFilename := filepath.Join(snapshotDirectory, fmt.Sprintf("%020d.snapshot.tmp", currentHeight)) 39 + 40 + d.logger.Info("Creating snapshot", "height", currentHeight) 41 + 42 + err := d.createSnapshot(currentHeight, tempFilename) 43 + if err != nil { 44 + d.logger.Error("failed to create snapshot", "height", currentHeight, "error", stacktrace.Propagate(err)) 45 + continue 46 + } 47 + 48 + d.logger.Info("Created snapshot", "height", currentHeight) 49 + d.lastSnapshotHeight = currentHeight 50 + 51 + // Clean up old snapshots if retention is set 52 + if d.plcConfig.SnapshotRetentionCount > 0 { 53 + numDeleted, err := store.Snapshot.PruneOldSnapshots(snapshotDirectory, int(d.plcConfig.SnapshotRetentionCount)) 54 + if err != nil { 55 + d.logger.Error("failed to prune old snapshots", "error", stacktrace.Propagate(err)) 56 + continue 57 + } 58 + d.logger.Info("Pruned old snapshots", "numDeleted", numDeleted) 59 + } 60 + } 61 + } 62 + 18 63 // snapshotNumRecentBlockHeaders is the number of recent block headers to include in snapshots 19 64 // It should be enough to handle challenges that depend on recent block headers 20 65 const snapshotNumRecentBlockHeaders = max(CommitToChallengeMaxAgeInBlocks, CompleteChallengeMaxAgeInBlocks) + 5
+11
config/config.go
··· 32 32 33 33 // Server response timeout for API endpoints 34 34 ResponseTimeout time.Duration `mapstructure:"response_timeout"` 35 + 36 + // SnapshotInterval defines the number of blocks between automatic snapshots. 37 + // If set to 0, automatic snapshot creation is disabled. 38 + SnapshotInterval uint64 `mapstructure:"snapshot_interval"` 39 + 40 + // SnapshotRetentionCount defines the maximum number of snapshots to retain. 41 + // When creating a new snapshot, older snapshots are automatically deleted. 42 + // If set to 0, all snapshots are retained. 43 + SnapshotRetentionCount uint64 `mapstructure:"snapshot_retention_count"` 35 44 } 36 45 37 46 func DefaultPLCConfig() *PLCConfig { ··· 40 49 Pprof: true, // TODO set to false once we move past alpha phase 41 50 MaxStreamingExportCursorAge: 7 * 24 * time.Hour, 42 51 ResponseTimeout: 10 * time.Second, 52 + SnapshotInterval: 0, // Disable snapshots by default 53 + SnapshotRetentionCount: 1, 43 54 } 44 55 }
+2 -1
main.go
··· 155 155 cfg.StateSync.TempDir, 156 156 didBloomFilterPath, 157 157 mempoolSubmitter, 158 - blockHeaderGetter) 158 + blockHeaderGetter, 159 + cfg.PLC) 159 160 if err != nil { 160 161 log.Fatalf("failed to create DIDPLC application: %v", err) 161 162 }
+67
store/snapshot.go
··· 828 828 829 829 return chunkData, nil 830 830 831 + 832 + func (s *SnapshotStore) PruneOldSnapshots(snapshotDirectory string, retentionCount int) (int, error) { 833 + // Get list of all snapshots sorted by height (newest first) 834 + files, err := filepath.Glob(filepath.Join(snapshotDirectory, "*.snapshot")) 835 + if err != nil { 836 + return 0, stacktrace.Propagate(err, "failed to list snapshots") 837 + } 838 + 839 + if len(files) <= retentionCount { 840 + return 0, nil 841 + } 842 + 843 + // Extract heights from filenames and sort 844 + heights := make([]uint64, 0, len(files)) 845 + for _, f := range files { 846 + base := filepath.Base(f) 847 + heightStr := strings.TrimSuffix(base, ".snapshot") 848 + h, err := strconv.ParseUint(heightStr, 10, 64) 849 + if err != nil { 850 + continue 851 + } 852 + heights = append(heights, h) 853 + } 854 + 855 + slices.SortFunc(heights, func(a, b uint64) int { 856 + return int(int64(b) - int64(a)) // Sort descending (newest first) 857 + }) 858 + 859 + // Delete snapshots beyond retention count 860 + toDelete := heights[retentionCount:] 861 + for _, h := range toDelete { 862 + snapshotFile := filepath.Join(snapshotDirectory, fmt.Sprintf("%020d.snapshot", h)) 863 + chunksumsFile := filepath.Join(snapshotDirectory, fmt.Sprintf("%020d.chunksums", h)) 864 + 865 + if err := os.Remove(snapshotFile); err != nil && !errors.Is(err, os.ErrNotExist) { 866 + return 0, stacktrace.Propagate(err, "failed to delete old snapshot file: %s", snapshotFile) 867 + } 868 + 869 + if err := os.Remove(chunksumsFile); err != nil && !errors.Is(err, os.ErrNotExist) { 870 + return 0, stacktrace.Propagate(err, "failed to delete old chunksums file: %s", chunksumsFile) 871 + } 872 + } 873 + 874 + return len(toDelete), nil 875 + } 876 + 877 + func (s *SnapshotStore) MostRecentSnapshotHeight(snapshotDirectory string) (uint64, error) { 878 + files, err := filepath.Glob(filepath.Join(snapshotDirectory, "*.snapshot")) 879 + if err != nil { 880 + return 0, stacktrace.Propagate(err, "failed to list snapshots") 881 + } 882 + 883 + var maxHeight uint64 884 + for _, f := range files { 885 + base := filepath.Base(f) 886 + heightStr := strings.TrimSuffix(base, ".snapshot") 887 + h, err := strconv.ParseUint(heightStr, 10, 64) 888 + if err != nil { 889 + continue 890 + } 891 + if h > maxHeight { 892 + maxHeight = h 893 + } 894 + } 895 + 896 + return maxHeight, nil 897 + }