A very experimental PLC implementation which uses BFT consensus for decentralization
19
fork

Configure Feed

Select the types of activity you want to include in your feed.

Begin work on misbehavior penalties

gbl08ma c56e5735 e7cbd734

+116 -47
+81 -41
abciapp/execution.go
··· 10 10 11 11 abcitypes "github.com/cometbft/cometbft/abci/types" 12 12 "github.com/cometbft/cometbft/crypto/ed25519" 13 + protocrypto "github.com/cometbft/cometbft/proto/tendermint/crypto" 13 14 "github.com/gbl08ma/stacktrace" 14 15 cbornode "github.com/ipfs/go-ipld-cbor" 15 16 "github.com/samber/lo" ··· 269 270 func (d *DIDPLCApplication) FinalizeBlock(ctx context.Context, req *abcitypes.RequestFinalizeBlock) (*abcitypes.ResponseFinalizeBlock, error) { 270 271 defer (d.logMethod("FinalizeBlock", "height", req.Height, "hash", req.Hash))() 271 272 272 - markVotingParticipation := func() error { 273 - for _, vote := range req.DecidedLastCommit.Votes { 274 - err := store.Consensus.MarkValidatorVote(d.ongoingWrite, vote.GetValidator().Address, uint64(req.Height)) 275 - // we expect to attempt to store votes for validators that aren't active because validator_updates take a few blocks to fully take effect 276 - if err != nil && !errors.Is(err, store.ErrValidatorNotActive) { 277 - return stacktrace.Propagate(err) 278 - } 279 - } 280 - return nil 281 - } 273 + d.createOngoingTxIfNeeded(req.Time) 282 274 283 - if bytes.Equal(req.Hash, d.lastProcessedProposalHash) && d.lastProcessedProposalExecTxResults != nil { 284 - d.createOngoingTxIfNeeded(req.Time) 275 + if !bytes.Equal(req.Hash, d.lastProcessedProposalHash) || d.lastProcessedProposalExecTxResults == nil { 276 + // a block other than the one we processed in ProcessProposal was decided 277 + // discard the current modified state, and process the decided block 278 + d.DiscardChanges() 285 279 286 - err := markVotingParticipation() 287 - if err != nil { 288 - return nil, stacktrace.Propagate(err) 280 + txResults := make([]*processResult, len(req.Txs)) 281 + for i, tx := range req.Txs { 282 + var err error 283 + txResults[i], err = processTx(ctx, d.transactionProcessorDependenciesForOngoingProcessing(true, req.Time), tx) 284 + if err != nil { 285 + return nil, stacktrace.Propagate(err) 286 + } 289 287 } 290 288 289 + d.lastProcessedProposalHash = slices.Clone(req.Hash) 290 + d.lastProcessedProposalExecTxResults = txResults 291 + } else { 291 292 // the block that was decided was the one we processed in ProcessProposal, and ProcessProposal processed successfully 292 293 // reuse the uncommitted results 293 - return &abcitypes.ResponseFinalizeBlock{ 294 - TxResults: lo.Map(d.lastProcessedProposalExecTxResults, func(result *processResult, _ int) *abcitypes.ExecTxResult { 295 - return result.ToABCI() 296 - }), 297 - ValidatorUpdates: lo.FlatMap(d.lastProcessedProposalExecTxResults, func(result *processResult, _ int) []abcitypes.ValidatorUpdate { 298 - return result.validatorUpdates 299 - }), 300 - AppHash: d.tree.WorkingHash(), 301 - }, nil 302 294 } 303 - // a block other than the one we processed in ProcessProposal was decided 304 - // discard the current modified state, and process the decided block 305 - d.DiscardChanges() 306 295 307 - txResults := make([]*processResult, len(req.Txs)) 308 - validatorUpdates := []abcitypes.ValidatorUpdate{} 309 - for i, tx := range req.Txs { 310 - var err error 311 - txResults[i], err = processTx(ctx, d.transactionProcessorDependenciesForOngoingProcessing(true, req.Time), tx) 312 - if err != nil { 313 - return nil, stacktrace.Propagate(err) 296 + validatorUpdates, jailedValidatorPubkeys, err := d.processMisbehavior(req.Misbehavior) 297 + if err != nil { 298 + return nil, stacktrace.Propagate(err) 299 + } 300 + for _, result := range d.lastProcessedProposalExecTxResults { 301 + for _, update := range result.validatorUpdates { 302 + _, jailed := jailedValidatorPubkeys[[32]byte(update.PubKey.GetEd25519())] 303 + // if this is an epoch block, ensure we don't un-jail any validators that were just jailed for misbehavior 304 + if !jailed { 305 + validatorUpdates = append(validatorUpdates, update) 306 + } 314 307 } 315 - validatorUpdates = append(validatorUpdates, txResults[i].validatorUpdates...) 316 308 } 317 309 318 - d.lastProcessedProposalHash = slices.Clone(req.Hash) 319 - d.lastProcessedProposalExecTxResults = txResults 320 - 321 - err := markVotingParticipation() 322 - if err != nil { 323 - return nil, stacktrace.Propagate(err) 310 + for _, vote := range req.DecidedLastCommit.Votes { 311 + err := store.Consensus.MarkValidatorVote(d.ongoingWrite, vote.GetValidator().Address, uint64(req.Height)) 312 + // we expect to attempt to store votes for validators that aren't active because validator_updates take a few blocks to fully take effect 313 + if err != nil && !errors.Is(err, store.ErrValidatorNotActive) { 314 + return nil, stacktrace.Propagate(err) 315 + } 324 316 } 325 317 326 318 return &abcitypes.ResponseFinalizeBlock{ ··· 330 322 ValidatorUpdates: validatorUpdates, 331 323 AppHash: d.tree.WorkingHash(), 332 324 }, nil 325 + } 326 + 327 + func (d *DIDPLCApplication) processMisbehavior(misbehavior []abcitypes.Misbehavior) ([]abcitypes.ValidatorUpdate, map[[store.PublicKeyLength]byte]struct{}, error) { 328 + validatorUpdates := []abcitypes.ValidatorUpdate{} 329 + jailed := make(map[[store.PublicKeyLength]byte]struct{}) 330 + for _, m := range misbehavior { 331 + pubkey, err := store.Consensus.ActiveValidatorPubKey(d.ongoingRead, uint64(m.Height), m.Validator.Address) 332 + if errors.Is(err, store.ErrValidatorNotActive) { 333 + // hmm, this makes it trickier to find the pubkey for the validator 334 + // if the validator is no longer active maybe we can just ignore this? 335 + continue 336 + } else if err != nil { 337 + return nil, nil, stacktrace.Propagate(err) 338 + } 339 + 340 + // note: the validator may receive additional penalties on the next epoch block, 341 + // because the validator voting power is (potentially only temporarily) fully removed 342 + // but the validator is not removed from the store.Consensus.ActiveValidatorsIterator visitees. 343 + // its votes might not be included for the rest of this epoch, this will further count negatively towards its reputation 344 + 345 + err = store.Consensus.ChangeValidatorReputation(d.ongoingWrite, pubkey, func(reputation uint64) (uint64, error) { 346 + switch m.Type { 347 + case abcitypes.MisbehaviorType_DUPLICATE_VOTE: 348 + return reputation / 10, nil 349 + case abcitypes.MisbehaviorType_LIGHT_CLIENT_ATTACK: 350 + return (reputation * 7) / 10, nil 351 + default: 352 + return 0, stacktrace.NewError("unknown misbehavior type") 353 + } 354 + }) 355 + if err != nil { 356 + return nil, nil, stacktrace.Propagate(err) 357 + } 358 + 359 + // remove validator from validator set until the _next_ epoch block 360 + // if this is an epoch block, because misbehavior is processed after the update validators transaction, this may mean the validator will spend an entire epoch inactive 361 + // the validator may (or may not) regain some voting power on the next epoch block 362 + jailed[[store.PublicKeyLength]byte(pubkey)] = struct{}{} 363 + validatorUpdates = append(validatorUpdates, abcitypes.ValidatorUpdate{ 364 + PubKey: protocrypto.PublicKey{ 365 + Sum: &protocrypto.PublicKey_Ed25519{ 366 + Ed25519: pubkey[:], 367 + }, 368 + }, 369 + Power: int64(0), 370 + }) 371 + } 372 + return validatorUpdates, jailed, nil 333 373 } 334 374 335 375 // Commit implements [types.Application].
+3 -1
abciapp/tx_challenge.go
··· 328 328 numProvenBlocks := toHeight - fromHeight + 1 329 329 repGain := numProvenBlocks * ReputationGainPerProvenBlock 330 330 331 - err = store.Consensus.ChangeValidatorReputation(writeTx, pubKey.Bytes(), int64(repGain)) 331 + err = store.Consensus.ChangeValidatorReputation(writeTx, pubKey.Bytes(), func(reputation uint64) (uint64, error) { 332 + return reputation + repGain, nil 333 + }) 332 334 if err != nil { 333 335 return nil, stacktrace.Propagate(err) 334 336 }
+26 -3
store/consensus.go
··· 94 94 DeleteBlockChallengeProofsBelowHeight(ctx context.Context, tx transaction.WriteIndex, blockHeight uint64) error 95 95 96 96 ValidatorReputation(tx transaction.Read, validatorPubKey []byte) (uint64, error) 97 - ChangeValidatorReputation(tx transaction.Write, validatorPubKey []byte, change int64) error 97 + ChangeValidatorReputation(tx transaction.Write, validatorPubKey []byte, changer func(reputation uint64) (uint64, error)) error 98 98 ChangeAllNonZeroValidatorReputations(tx transaction.Write, changer func(validatorPubKey []byte, reputation uint64) (uint64, error)) error 99 99 100 + ActiveValidatorPubKey(tx transaction.Read, height uint64, validatorAddress []byte) ([]byte, error) 100 101 ActiveValidatorsIterator(tx transaction.Read, epochHeight uint64, retErr *error) iter.Seq[ActiveValidator] 101 102 InitializeValidatorVotingActivity(tx transaction.WriteIndex, validatorAddress, validatorPubKey []byte, epochHeight uint64) error 102 103 MarkValidatorVote(tx transaction.WriteIndex, validatorAddress []byte, height uint64) error ··· 919 920 } 920 921 921 922 // ChangeValidatorReputation implements [ConsensusStore]. 922 - func (t *consensusStore) ChangeValidatorReputation(tx transaction.Write, validatorPubKey []byte, change int64) error { 923 + func (t *consensusStore) ChangeValidatorReputation(tx transaction.Write, validatorPubKey []byte, changer func(reputation uint64) (uint64, error)) error { 923 924 key := marshalValidatorReputationKey(validatorPubKey) 924 925 925 926 value, err := tx.Tree().Get(key) ··· 928 929 } 929 930 930 931 reputation := new(big.Int).SetBytes(value) 931 - reputation.Add(reputation, big.NewInt(change)) 932 + newValue, err := changer(reputation.Uint64()) 933 + if err != nil { 934 + return stacktrace.Propagate(err) 935 + } 936 + reputation.SetUint64(newValue) 932 937 if reputation.Sign() <= 0 { 933 938 _, _, err := tx.Tree().Remove(key) 934 939 return stacktrace.Propagate(err) ··· 1089 1094 1090 1095 err = tx.IndexDB().Set(key, value) 1091 1096 return stacktrace.Propagate(err) 1097 + } 1098 + 1099 + // ActiveValidatorPubKey implements [ConsensusStore]. 1100 + func (t *consensusStore) ActiveValidatorPubKey(tx transaction.Read, height uint64, validatorAddress []byte) ([]byte, error) { 1101 + epochHeight := height - height%t.epochSize 1102 + 1103 + key := MarshalValidatorVotingActivityKey(epochHeight, validatorAddress) 1104 + 1105 + value, err := tx.IndexDB().Get(key) 1106 + if err != nil { 1107 + return nil, stacktrace.Propagate(err) 1108 + } 1109 + if value == nil { 1110 + return nil, stacktrace.Propagate(ErrValidatorNotActive) 1111 + } 1112 + 1113 + pubkey, _, err := unmarshalValidatorVotingActivityValue(value) 1114 + return pubkey, stacktrace.Propagate(err) 1092 1115 } 1093 1116 1094 1117 // ActiveValidatorsIterator implements [ConsensusStore].
+3 -1
store/snapshot_test.go
··· 124 124 err = store.Consensus.SetAuthoritativePLC(writeTx, authoritativePLC) 125 125 require.NoError(t, err) 126 126 127 - err = store.Consensus.ChangeValidatorReputation(writeTx, valPubKey, 100) 127 + err = store.Consensus.ChangeValidatorReputation(writeTx, valPubKey, func(reputation uint64) (uint64, error) { 128 + return reputation + 100, nil 129 + }) 128 130 require.NoError(t, err) 129 131 130 132 err = writeTx.Commit()
+3 -1
store/store_test.go
··· 30 30 31 31 for range 3 { 32 32 for i := range validators { 33 - err = store.Consensus.ChangeValidatorReputation(tx, validators[i], int64(i)*10) 33 + err = store.Consensus.ChangeValidatorReputation(tx, validators[i], func(reputation uint64) (uint64, error) { 34 + return reputation + uint64(i)*10, nil 35 + }) 34 36 require.NoError(t, err) 35 37 } 36 38 }