A very experimental PLC implementation which uses BFT consensus for decentralization
19
fork

Configure Feed

Select the types of activity you want to include in your feed.

Make more aspects of the PLC http server configurable

gbl08ma f114e3de 64e43a7e

+119 -106
-31
config.go
··· 1 - package main 2 - 3 - import bftconfig "github.com/cometbft/cometbft/config" 4 - 5 - type UnifiedConfig struct { 6 - *bftconfig.Config `mapstructure:",squash"` 7 - 8 - PLC *PLCConfig `mapstructure:"plc"` 9 - } 10 - 11 - func DefaultConfig() *UnifiedConfig { 12 - return &UnifiedConfig{ 13 - Config: bftconfig.DefaultConfig(), 14 - PLC: DefaultPLCConfig(), 15 - } 16 - } 17 - 18 - type PLCConfig struct { 19 - // Address to listen for incoming connections 20 - ListenAddress string `mapstructure:"laddr"` 21 - 22 - // Whether to expose pprof endpoints for debugging 23 - Pprof bool `mapstructure:"pprof"` 24 - } 25 - 26 - func DefaultPLCConfig() *PLCConfig { 27 - return &PLCConfig{ 28 - ListenAddress: "tcp://127.0.0.1:28080", 29 - Pprof: true, // TODO set to false once we move past alpha phase 30 - } 31 - }
+44
config/config.go
··· 1 + package config 2 + 3 + import ( 4 + "time" 5 + 6 + bftconfig "github.com/cometbft/cometbft/config" 7 + ) 8 + 9 + type UnifiedConfig struct { 10 + *bftconfig.Config `mapstructure:",squash"` 11 + 12 + PLC *PLCConfig `mapstructure:"plc"` 13 + } 14 + 15 + func DefaultConfig() *UnifiedConfig { 16 + return &UnifiedConfig{ 17 + Config: bftconfig.DefaultConfig(), 18 + PLC: DefaultPLCConfig(), 19 + } 20 + } 21 + 22 + type PLCConfig struct { 23 + // Address to listen for incoming connections 24 + ListenAddress string `mapstructure:"laddr"` 25 + 26 + // Whether to expose pprof endpoints for debugging 27 + Pprof bool `mapstructure:"pprof"` 28 + 29 + // Maximum age of a cursor for the streaming export endpoint. 30 + // If set to 0, the OutdatedCursor error is disabled entirely. 31 + MaxStreamingExportCursorAge time.Duration `mapstructure:"max_streaming_export_cursor_age"` 32 + 33 + // Server response timeout for API endpoints 34 + ResponseTimeout time.Duration `mapstructure:"response_timeout"` 35 + } 36 + 37 + func DefaultPLCConfig() *PLCConfig { 38 + return &PLCConfig{ 39 + ListenAddress: "tcp://127.0.0.1:28080", 40 + Pprof: true, // TODO set to false once we move past alpha phase 41 + MaxStreamingExportCursorAge: 7 * 24 * time.Hour, 42 + ResponseTimeout: 10 * time.Second, 43 + } 44 + }
+27 -29
httpapi/server.go
··· 31 31 "github.com/cometbft/cometbft/rpc/core" 32 32 cmttypes "github.com/cometbft/cometbft/types" 33 33 "tangled.org/gbl08ma.com/didplcbft/abciapp" 34 + "tangled.org/gbl08ma.com/didplcbft/config" 34 35 "tangled.org/gbl08ma.com/didplcbft/plc" 35 36 "tangled.org/gbl08ma.com/didplcbft/transaction" 36 37 "tangled.org/gbl08ma.com/didplcbft/types" ··· 38 39 39 40 // Server represents the HTTP server for the PLC directory. 40 41 type Server struct { 41 - logger cmtlog.Logger 42 - txFactory *transaction.Factory 43 - plc plc.ReadPLC 44 - router *http.ServeMux 45 - mempoolSubmitter types.MempoolSubmitter 46 - nodeEventBus *cmttypes.EventBus 47 - srv http.Server 48 - handlerTimeout time.Duration 49 - proto string 50 - addr string 42 + logger cmtlog.Logger 43 + txFactory *transaction.Factory 44 + plc plc.ReadPLC 45 + router *http.ServeMux 46 + mempoolSubmitter types.MempoolSubmitter 47 + nodeEventBus *cmttypes.EventBus 48 + srv http.Server 49 + handlerTimeout time.Duration 50 + proto string 51 + addr string 52 + maxStreamingExportCursorAge time.Duration 51 53 52 54 started atomic.Bool 53 55 exitDone sync.WaitGroup ··· 60 62 plc plc.ReadPLC, 61 63 mempoolSubmitter types.MempoolSubmitter, 62 64 nodeEventBus *cmttypes.EventBus, 63 - listenAddr string, 64 - handlerTimeout time.Duration, 65 - pprofEnabled bool) (*Server, error) { 65 + cfg *config.PLCConfig) (*Server, error) { 66 66 s := &Server{ 67 - logger: logger, 68 - txFactory: txFactory, 69 - plc: plc, 70 - router: http.NewServeMux(), 71 - mempoolSubmitter: mempoolSubmitter, 72 - nodeEventBus: nodeEventBus, 73 - srv: http.Server{Addr: listenAddr}, 74 - handlerTimeout: handlerTimeout, 67 + logger: logger, 68 + txFactory: txFactory, 69 + plc: plc, 70 + router: http.NewServeMux(), 71 + mempoolSubmitter: mempoolSubmitter, 72 + nodeEventBus: nodeEventBus, 73 + srv: http.Server{Addr: cfg.ListenAddress}, 74 + handlerTimeout: cfg.ResponseTimeout, 75 + maxStreamingExportCursorAge: cfg.MaxStreamingExportCursorAge, 75 76 } 76 - s.setupRoutes(pprofEnabled) 77 + s.setupRoutes(cfg.Pprof) 77 78 78 79 handler := cors.Default().Handler(s.router) 79 80 80 81 s.srv.Handler = handler 81 82 82 - parts := strings.SplitN(listenAddr, "://", 2) 83 + parts := strings.SplitN(cfg.ListenAddress, "://", 2) 83 84 if len(parts) != 2 { 84 85 return nil, stacktrace.NewError( 85 86 "invalid listening address %s (use fully formed addresses, including the tcp:// or unix:// prefix)", 86 - listenAddr, 87 + cfg.ListenAddress, 87 88 ) 88 89 } 89 90 s.proto, s.addr = parts[0], parts[1] ··· 475 476 } 476 477 477 478 for _, entry := range entries { 478 - // 1 week is the same as what the official implementation appears to allow 479 - // TODO make configurable (the official implementation has it configurable) 480 - // TODO consider whether we really need to implement this limitation, as much like what happens with slow consumers, 481 - // we probably don't have any problems dealing with old entries, unlike the official implementation's "outbox"? 482 - if time.Since(entry.CreatedAt) > 7*24*time.Hour { 479 + // Check if cursor age exceeds maximum allowed 480 + if s.maxStreamingExportCursorAge > 0 && time.Since(entry.CreatedAt) > s.maxStreamingExportCursorAge { 483 481 c.Close(websocket.StatusNormalClosure, "OutdatedCursor") // as in the spec 484 482 return 485 483 }
+29 -28
httpapi/server_test.go
··· 13 13 cmtlog "github.com/cometbft/cometbft/libs/log" 14 14 "github.com/did-method-plc/go-didplc" 15 15 "github.com/stretchr/testify/require" 16 + "tangled.org/gbl08ma.com/didplcbft/config" 16 17 "tangled.org/gbl08ma.com/didplcbft/plc" 17 18 "tangled.org/gbl08ma.com/didplcbft/testutil" 18 19 "tangled.org/gbl08ma.com/didplcbft/transaction" ··· 110 111 111 112 txFactory, _, _ := testutil.NewTestTxFactory(t) 112 113 113 - t.Run("Test Resolve DID", func(t *testing.T) { 114 - server, err := NewServer(testLogger, txFactory, mockPLC, nil, nil, "tcp://127.0.0.1:8080", 15*time.Second, false) 114 + // newTestServer creates a new server instance for testing with the given MockReadPLC 115 + newTestServer := func(t *testing.T, plc *MockReadPLC) *Server { 116 + t.Helper() 117 + cfg := &config.PLCConfig{ 118 + ListenAddress: "tcp://127.0.0.1:8080", 119 + Pprof: false, 120 + MaxStreamingExportCursorAge: 7 * 24 * time.Hour, 121 + ResponseTimeout: 15 * time.Second, 122 + } 123 + server, err := NewServer(testLogger, txFactory, plc, nil, nil, cfg) 115 124 require.NoError(t, err) 125 + return server 126 + } 127 + 128 + t.Run("Test Resolve DID", func(t *testing.T) { 129 + server := newTestServer(t, mockPLC) 116 130 117 131 req, err := http.NewRequest("GET", "/did:plc:test", nil) 118 132 require.NoError(t, err) ··· 126 140 127 141 t.Run("Test Resolve DID Not Found", func(t *testing.T) { 128 142 mockPLC := &MockReadPLC{shouldReturnError: true, errorType: "notfound"} 129 - server, err := NewServer(testLogger, txFactory, mockPLC, nil, nil, "tcp://127.0.0.1:8080", 15*time.Second, false) 130 - require.NoError(t, err) 143 + server := newTestServer(t, mockPLC) 131 144 132 145 req, err := http.NewRequest("GET", "/did:plc:test", nil) 133 146 require.NoError(t, err) ··· 141 154 142 155 t.Run("Test Resolve DID Gone", func(t *testing.T) { 143 156 mockPLC := &MockReadPLC{shouldReturnError: true, errorType: "gone"} 144 - server, err := NewServer(testLogger, txFactory, mockPLC, nil, nil, "tcp://127.0.0.1:8080", 15*time.Second, false) 145 - require.NoError(t, err) 157 + server := newTestServer(t, mockPLC) 146 158 147 159 req, err := http.NewRequest("GET", "/did:plc:test", nil) 148 160 require.NoError(t, err) ··· 156 168 157 169 t.Run("Test Resolve DID Internal Error", func(t *testing.T) { 158 170 mockPLC := &MockReadPLC{shouldReturnError: true, errorType: "internal"} 159 - server, err := NewServer(testLogger, txFactory, mockPLC, nil, nil, "tcp://127.0.0.1:8080", 15*time.Second, false) 160 - require.NoError(t, err) 171 + server := newTestServer(t, mockPLC) 161 172 162 173 req, err := http.NewRequest("GET", "/did:plc:test", nil) 163 174 require.NoError(t, err) ··· 170 181 }) 171 182 172 183 t.Run("Test Create PLC Operation", func(t *testing.T) { 173 - server, err := NewServer(testLogger, txFactory, mockPLC, nil, nil, "tcp://127.0.0.1:8080", 15*time.Second, false) 174 - require.NoError(t, err) 184 + server := newTestServer(t, mockPLC) 175 185 176 186 op := map[string]interface{}{ 177 187 "type": "plc_operation", ··· 194 204 }) 195 205 196 206 t.Run("Test Get PLC Log", func(t *testing.T) { 197 - server, err := NewServer(testLogger, txFactory, mockPLC, nil, nil, "tcp://127.0.0.1:8080", 15*time.Second, false) 198 - require.NoError(t, err) 207 + server := newTestServer(t, mockPLC) 199 208 200 209 req, err := http.NewRequest("GET", "/did:plc:test/log", nil) 201 210 require.NoError(t, err) ··· 208 217 209 218 t.Run("Test Get PLC Log Not Found", func(t *testing.T) { 210 219 mockPLC := &MockReadPLC{shouldReturnError: true, errorType: "notfound"} 211 - server, err := NewServer(testLogger, txFactory, mockPLC, nil, nil, "tcp://127.0.0.1:8080", 15*time.Second, false) 212 - require.NoError(t, err) 220 + server := newTestServer(t, mockPLC) 213 221 214 222 req, err := http.NewRequest("GET", "/did:plc:test/log", nil) 215 223 require.NoError(t, err) ··· 222 230 }) 223 231 224 232 t.Run("Test Get PLC Audit Log", func(t *testing.T) { 225 - server, err := NewServer(testLogger, txFactory, mockPLC, nil, nil, "tcp://127.0.0.1:8080", 15*time.Second, false) 226 - require.NoError(t, err) 233 + server := newTestServer(t, mockPLC) 227 234 228 235 req, err := http.NewRequest("GET", "/did:plc:test/log/audit", nil) 229 236 require.NoError(t, err) ··· 235 242 }) 236 243 237 244 t.Run("Test Get Last Operation", func(t *testing.T) { 238 - server, err := NewServer(testLogger, txFactory, mockPLC, nil, nil, "tcp://127.0.0.1:8080", 15*time.Second, false) 239 - require.NoError(t, err) 245 + server := newTestServer(t, mockPLC) 240 246 241 247 req, err := http.NewRequest("GET", "/did:plc:test/log/last", nil) 242 248 require.NoError(t, err) ··· 249 255 250 256 t.Run("Test Get Last Operation Internal Error", func(t *testing.T) { 251 257 mockPLC := &MockReadPLC{shouldReturnError: true, errorType: "internal"} 252 - server, err := NewServer(testLogger, txFactory, mockPLC, nil, nil, "tcp://127.0.0.1:8080", 15*time.Second, false) 253 - require.NoError(t, err) 258 + server := newTestServer(t, mockPLC) 254 259 255 260 req, err := http.NewRequest("GET", "/did:plc:test/log/last", nil) 256 261 require.NoError(t, err) ··· 263 268 }) 264 269 265 270 t.Run("Test Get PLC Data", func(t *testing.T) { 266 - server, err := NewServer(testLogger, txFactory, mockPLC, nil, nil, "tcp://127.0.0.1:8080", 15*time.Second, false) 267 - require.NoError(t, err) 271 + server := newTestServer(t, mockPLC) 268 272 269 273 req, err := http.NewRequest("GET", "/did:plc:test/data", nil) 270 274 require.NoError(t, err) ··· 277 281 278 282 t.Run("Test Get PLC Data Not Found", func(t *testing.T) { 279 283 mockPLC := &MockReadPLC{shouldReturnError: true, errorType: "notfound"} 280 - server, err := NewServer(testLogger, txFactory, mockPLC, nil, nil, "tcp://127.0.0.1:8080", 15*time.Second, false) 281 - require.NoError(t, err) 284 + server := newTestServer(t, mockPLC) 282 285 283 286 req, err := http.NewRequest("GET", "/did:plc:test/data", nil) 284 287 require.NoError(t, err) ··· 291 294 }) 292 295 293 296 t.Run("Test Export", func(t *testing.T) { 294 - server, err := NewServer(testLogger, txFactory, mockPLC, nil, nil, "tcp://127.0.0.1:8080", 15*time.Second, false) 295 - require.NoError(t, err) 297 + server := newTestServer(t, mockPLC) 296 298 297 299 req, err := http.NewRequest("GET", "/export?count=10", nil) 298 300 require.NoError(t, err) ··· 305 307 306 308 t.Run("Test Export Internal Error", func(t *testing.T) { 307 309 mockPLC := &MockReadPLC{shouldReturnError: true, errorType: "internal"} 308 - server, err := NewServer(testLogger, txFactory, mockPLC, nil, nil, "tcp://127.0.0.1:8080", 15*time.Second, false) 309 - require.NoError(t, err) 310 + server := newTestServer(t, mockPLC) 310 311 311 312 req, err := http.NewRequest("GET", "/export?count=10", nil) 312 313 require.NoError(t, err)
+19 -18
main.go
··· 24 24 "github.com/samber/lo" 25 25 "tangled.org/gbl08ma.com/didplcbft/abciapp" 26 26 "tangled.org/gbl08ma.com/didplcbft/badgertodbm" 27 + "tangled.org/gbl08ma.com/didplcbft/config" 27 28 "tangled.org/gbl08ma.com/didplcbft/httpapi" 28 29 "tangled.org/gbl08ma.com/didplcbft/store" 29 30 "tangled.org/gbl08ma.com/didplcbft/transaction" ··· 47 48 homeDir = filepath.Join(lo.Must(os.Getwd()), "didplcbft-data") 48 49 } 49 50 50 - config := DefaultConfig() 51 - config.SetRoot(homeDir) 51 + cfg := config.DefaultConfig() 52 + cfg.SetRoot(homeDir) 52 53 viper.SetConfigFile(fmt.Sprintf("%s/%s", homeDir, "config/config.toml")) 53 54 54 55 if err := viper.ReadInConfig(); err != nil { 55 56 log.Fatalf("Reading config: %v", err) 56 57 } 57 - if err := viper.Unmarshal(config); err != nil { 58 + if err := viper.Unmarshal(cfg); err != nil { 58 59 log.Fatalf("Decoding config: %v", err) 59 60 } 60 - if err := config.ValidateBasic(); err != nil { 61 + if err := cfg.ValidateBasic(); err != nil { 61 62 log.Fatalf("Invalid configuration data: %v", err) 62 63 } 63 64 64 65 logger := cmtlog.NewTMLogger(cmtlog.NewSyncWriter(os.Stdout)) 65 - logger, err := cmtflags.ParseLogLevel(config.LogLevel, logger, cmtconfig.DefaultLogLevel) 66 + logger, err := cmtflags.ParseLogLevel(cfg.LogLevel, logger, cmtconfig.DefaultLogLevel) 66 67 if err != nil { 67 68 log.Fatalf("failed to parse log level: %v", err) 68 69 } 69 70 70 - underlyingTreeDB, treeDB, err := badgertodbm.NewBadgerDB(logger.With("module", "treebadger"), "apptree", config.Config.DBDir()) 71 + underlyingTreeDB, treeDB, err := badgertodbm.NewBadgerDB(logger.With("module", "treebadger"), "apptree", cfg.Config.DBDir()) 71 72 if err != nil { 72 73 log.Fatalf("failed to create application tree database: %v", err) 73 74 } 74 75 75 - underlyingIndexDB, indexDB, err := badgertodbm.NewBadgerDB(logger.With("module", "indexbadger"), "appindex", config.Config.DBDir()) 76 + underlyingIndexDB, indexDB, err := badgertodbm.NewBadgerDB(logger.With("module", "indexbadger"), "appindex", cfg.Config.DBDir()) 76 77 if err != nil { 77 78 log.Fatalf("failed to create application index database: %v", err) 78 79 } ··· 103 104 mempoolSubmitter := &txSubmitter{} 104 105 105 106 pv := privval.LoadFilePV( 106 - config.PrivValidatorKeyFile(), 107 - config.PrivValidatorStateFile(), 107 + cfg.PrivValidatorKeyFile(), 108 + cfg.PrivValidatorStateFile(), 108 109 ) 109 110 110 111 appContext, cancelAppContext := context.WithCancel(context.Background()) ··· 113 114 // this must be done before we call NewNode, otherwise it will get a hold of the leveldb block store 114 115 nodeDBProvider := cmtconfig.DefaultDBProvider 115 116 116 - recentBlockHeaders, err := readRecentBlockHeaders(nodeDBProvider, config.Config, max(abciapp.CommitToChallengeMaxAgeInBlocks, abciapp.CompleteChallengeMaxAgeInBlocks)+5) // 5 blocks safety margin 117 + recentBlockHeaders, err := readRecentBlockHeaders(nodeDBProvider, cfg.Config, max(abciapp.CommitToChallengeMaxAgeInBlocks, abciapp.CompleteChallengeMaxAgeInBlocks)+5) // 5 blocks safety margin 117 118 if err != nil { 118 119 log.Fatalf("failed to read recent block headers: %v", err) 119 120 } ··· 157 158 indexDB, 158 159 recreateDatabases, 159 160 filepath.Join(homeDir, "snapshots"), 160 - config.StateSync.TempDir, 161 + cfg.StateSync.TempDir, 161 162 didBloomFilterPath, 162 163 mempoolSubmitter, 163 164 blockHeaderGetter) ··· 168 169 169 170 txFactory = txf 170 171 171 - nodeKey, err := p2p.LoadNodeKey(config.NodeKeyFile()) 172 + nodeKey, err := p2p.LoadNodeKey(cfg.NodeKeyFile()) 172 173 if err != nil { 173 174 log.Fatalf("failed to load node's key: %v", err) 174 175 } 175 176 176 177 node, err := nm.NewNode( 177 - config.Config, 178 + cfg.Config, 178 179 pv, 179 180 nodeKey, 180 181 proxy.NewLocalClientCreator(app), 181 - nm.DefaultGenesisDocProviderFunc(config.Config), 182 + nm.DefaultGenesisDocProviderFunc(cfg.Config), 182 183 nodeDBProvider, 183 - nm.DefaultMetricsProvider(config.Config.Instrumentation), 184 + nm.DefaultMetricsProvider(cfg.Config.Instrumentation), 184 185 logger, 185 186 ) 186 187 ··· 191 192 blockStore = node.BlockStore() 192 193 193 194 // workaround for CometBFT bug where the temp_dir config entry is not taken into account 194 - err = fixStateSyncReactorTempDir(node, config.StateSync.TempDir) 195 + err = fixStateSyncReactorTempDir(node, cfg.StateSync.TempDir) 195 196 if err != nil { 196 197 log.Fatalf("Creating node: %v", err) 197 198 } ··· 214 215 node.Wait() 215 216 }() 216 217 217 - if config.PLC.ListenAddress != "" { 218 - plcAPIServer, err := httpapi.NewServer(logger.With("module", "plcapi"), txFactory, plc, mempoolSubmitter, node.EventBus(), config.PLC.ListenAddress, 10*time.Second, config.PLC.Pprof) 218 + if cfg.PLC.ListenAddress != "" { 219 + plcAPIServer, err := httpapi.NewServer(logger.With("module", "plcapi"), txFactory, plc, mempoolSubmitter, node.EventBus(), cfg.PLC) 219 220 if err != nil { 220 221 log.Fatalf("Creating PLC API server: %v", err) 221 222 }