diff options
author | Jimmy Hu <jimmy.hu@dexon.org> | 2019-03-20 14:57:12 +0800 |
---|---|---|
committer | Jimmy Hu <jimmy.hu@dexon.org> | 2019-03-27 15:25:10 +0800 |
commit | 6efe199cb38eb4cb9a9a64d98ff5f8c4fb997da7 (patch) | |
tree | 2c18fe616f84df7274f19f88cf325fe558869918 /core/consensus.go | |
parent | fa3b5a29499739e90b3cf17f9a0cf60a72a64fc0 (diff) | |
download | dexon-consensus-6efe199cb38eb4cb9a9a64d98ff5f8c4fb997da7.tar.gz dexon-consensus-6efe199cb38eb4cb9a9a64d98ff5f8c4fb997da7.tar.zst dexon-consensus-6efe199cb38eb4cb9a9a64d98ff5f8c4fb997da7.zip |
core: merge notarySet and DKGSet (#488)
* core: ăăăȘă DKGSet
* test logger
* temporary fix before finalized
* core: Sign psig on commit vote
* Add syncer log
* fixup
Diffstat (limited to 'core/consensus.go')
-rw-r--r-- | core/consensus.go | 383 |
1 files changed, 181 insertions, 202 deletions
diff --git a/core/consensus.go b/core/consensus.go index d74a4a2..2b0d5a4 100644 --- a/core/consensus.go +++ b/core/consensus.go @@ -27,6 +27,7 @@ import ( "github.com/dexon-foundation/dexon-consensus/common" "github.com/dexon-foundation/dexon-consensus/core/crypto" + cryptoDKG "github.com/dexon-foundation/dexon-consensus/core/crypto/dkg" "github.com/dexon-foundation/dexon-consensus/core/db" "github.com/dexon-foundation/dexon-consensus/core/types" typesDKG "github.com/dexon-foundation/dexon-consensus/core/types/dkg" @@ -51,6 +52,8 @@ var ( "CRS not ready") ErrConfigurationNotReady = fmt.Errorf( "Configuration not ready") + ErrIncorrectBlockRandomness = fmt.Errorf( + "randomness of block is incorrect") ) // consensusBAReceiver implements agreementReceiver. @@ -62,6 +65,8 @@ type consensusBAReceiver struct { roundValue *atomic.Value isNotary bool restartNotary chan types.Position + npks *typesDKG.NodePublicKeys + psigSigner *dkgShareSecret } func (recv *consensusBAReceiver) round() uint64 { @@ -72,10 +77,51 @@ func (recv *consensusBAReceiver) changeNotaryHeight() uint64 { return recv.changeNotaryHeightValue.Load().(uint64) } +func (recv *consensusBAReceiver) VerifyPartialSignature(vote *types.Vote) bool { + if recv.round() >= DKGDelayRound && vote.BlockHash != types.SkipBlockHash { + if vote.Type == types.VoteCom || vote.Type == types.VoteFastCom { + if recv.npks == nil || recv.npks.Round != vote.Position.Round { + var err error + recv.npks, _, err = + recv.consensus.cfgModule.getDKGInfo(vote.Position.Round, true) + if err != nil || recv.npks == nil { + recv.consensus.logger.Warn("cannot get npks", + "round", vote.Position.Round, "error", err) + return false + } + } + pubKey, exist := recv.npks.PublicKeys[vote.ProposerID] + if !exist { + return false + } + blockHash := vote.BlockHash + if blockHash == types.NullBlockHash { + blockHash = utils.HashPosition(vote.Position) + } + return pubKey.VerifySignature( + vote.BlockHash, crypto.Signature(vote.PartialSignature)) + } + } + return len(vote.PartialSignature.Signature) == 0 +} + func (recv *consensusBAReceiver) ProposeVote(vote *types.Vote) { if !recv.isNotary { return } + if recv.round() >= DKGDelayRound && vote.BlockHash != types.SkipBlockHash { + if vote.Type == types.VoteCom || vote.Type == types.VoteFastCom { + if recv.psigSigner == nil { + return + } + if vote.BlockHash == types.NullBlockHash { + vote.PartialSignature = recv.psigSigner.sign( + utils.HashPosition(vote.Position)) + } else { + vote.PartialSignature = recv.psigSigner.sign(vote.BlockHash) + } + } + } if err := recv.agreementModule.prepareVote(vote); err != nil { recv.consensus.logger.Error("Failed to prepare vote", "error", err) return @@ -120,6 +166,7 @@ func (recv *consensusBAReceiver) ConfirmBlock( block *types.Block aID = recv.agreementModule.agreementID() ) + isEmptyBlockConfirmed := hash == common.Hash{} if isEmptyBlockConfirmed { recv.consensus.logger.Info("Empty block is confirmed", "position", aID) @@ -177,6 +224,47 @@ func (recv *consensusBAReceiver) ConfirmBlock( return } } + + voteList := make([]types.Vote, 0, len(votes)) + IDs := make(cryptoDKG.IDs, 0, len(votes)) + psigs := make([]cryptoDKG.PartialSignature, 0, len(votes)) + for _, vote := range votes { + if vote.BlockHash != hash { + continue + } + if recv.round() >= DKGDelayRound { + ID, exist := recv.npks.IDMap[vote.ProposerID] + if !exist { + continue + } + IDs = append(IDs, ID) + psigs = append(psigs, vote.PartialSignature) + } + voteList = append(voteList, *vote) + } + if recv.round() >= DKGDelayRound { + rand, err := cryptoDKG.RecoverSignature(psigs, IDs) + if err != nil { + recv.consensus.logger.Warn("Unable to recover randomness", + "block", block, + "error", err) + } else { + block.Finalization.Randomness = rand.Signature[:] + } + } + if recv.isNotary { + result := &types.AgreementResult{ + BlockHash: block.Hash, + Position: block.Position, + Votes: voteList, + Randomness: block.Finalization.Randomness, + IsEmptyBlock: isEmptyBlockConfirmed, + } + recv.consensus.logger.Debug("Propose AgreementResult", + "result", result) + recv.consensus.msgChan <- result + } + if block.Position.Height != 0 && !recv.consensus.bcModule.confirmed(block.Position.Height-1) { go func(hash common.Hash) { @@ -222,25 +310,9 @@ func (recv *consensusBAReceiver) ConfirmBlock( } }(block.ParentHash) } - if recv.isNotary { - voteList := make([]types.Vote, 0, len(votes)) - for _, vote := range votes { - if vote.BlockHash != hash { - continue - } - voteList = append(voteList, *vote) - } - result := &types.AgreementResult{ - BlockHash: block.Hash, - Position: block.Position, - Votes: voteList, - IsEmptyBlock: isEmptyBlockConfirmed, - } - recv.consensus.logger.Debug("Propose AgreementResult", - "result", result) - recv.consensus.network.BroadcastAgreementResult(result) + if !block.IsEmpty() { + recv.consensus.processBlockChan <- block } - recv.consensus.processBlockChan <- block // Clean the restartNotary channel so BA will not stuck by deadlock. CleanChannelLoop: for { @@ -253,7 +325,7 @@ CleanChannelLoop: newPos := block.Position if block.Position.Height+1 == recv.changeNotaryHeight() { newPos.Round++ - recv.roundValue.Store(newPos.Round) + recv.updateRound(newPos.Round) } currentRound := recv.round() changeNotaryHeight := recv.changeNotaryHeight() @@ -282,6 +354,18 @@ func (recv *consensusBAReceiver) ReportForkBlock(b1, b2 *types.Block) { recv.consensus.gov.ReportForkBlock(b1, b2) } +func (recv *consensusBAReceiver) updateRound(round uint64) { + recv.roundValue.Store(round) + var err error + _, recv.psigSigner, err = + recv.consensus.cfgModule.getDKGInfo(round, false) + if err != nil { + recv.consensus.logger.Warn("cannot get dkg info", + "round", round, "error", err) + recv.psigSigner = nil + } +} + // consensusDKGReceiver implements dkgReceiver. type consensusDKGReceiver struct { ID types.NodeID @@ -401,13 +485,13 @@ type Consensus struct { bcModule *blockChain dMoment time.Time nodeSetCache *utils.NodeSetCache + tsigVerifierCache *TSigVerifierCache lock sync.RWMutex ctx context.Context ctxCancel context.CancelFunc event *common.Event roundEvent *utils.RoundEvent logger common.Logger - resetRandomnessTicker chan struct{} resetDeliveryGuardTicker chan struct{} msgChan chan interface{} waitGroup sync.WaitGroup @@ -465,7 +549,6 @@ func NewConsensusFromSyncer( networkModule Network, prv crypto.PrivateKey, confirmedBlocks []*types.Block, - randomnessResults []*types.BlockRandomnessResult, cachedMessages []interface{}, logger common.Logger) (*Consensus, error) { // Setup Consensus instance. @@ -492,30 +575,13 @@ func NewConsensusFromSyncer( } refBlock = b } - // Dump all randomness result to the consensus instance. - for _, r := range randomnessResults { - if err := con.ProcessBlockRandomnessResult(r, false); err != nil { - con.logger.Error("failed to process randomness result when syncing", - "result", r) - continue - } - } if startWithEmpty { pos := initBlock.Position pos.Height++ - block, err := con.bcModule.addEmptyBlock(pos) + _, err := con.bcModule.addEmptyBlock(pos) if err != nil { panic(err) } - con.processBlockChan <- block - if pos.Round >= DKGDelayRound { - rand := &types.AgreementResult{ - BlockHash: block.Hash, - Position: block.Position, - IsEmptyBlock: true, - } - go con.prepareRandomnessResult(rand) - } } return con, nil } @@ -566,8 +632,9 @@ func newConsensusForRound( if usingNonBlocking { appModule = newNonBlocking(app, debugApp) } + tsigVerifierCache := NewTSigVerifierCache(gov, 7) bcModule := newBlockChain(ID, dMoment, initBlock, appModule, - NewTSigVerifierCache(gov, 7), signer, logger) + tsigVerifierCache, signer, logger) // Construct Consensus instance. con := &Consensus{ ID: ID, @@ -582,10 +649,10 @@ func newConsensusForRound( bcModule: bcModule, dMoment: dMoment, nodeSetCache: nodeSetCache, + tsigVerifierCache: tsigVerifierCache, signer: signer, event: common.NewEvent(), logger: logger, - resetRandomnessTicker: make(chan struct{}), resetDeliveryGuardTicker: make(chan struct{}), msgChan: make(chan interface{}, 1024), processBlockChan: make(chan *types.Block, 1024), @@ -690,14 +757,14 @@ func (con *Consensus) prepare(initBlock *types.Block) (err error) { if nextRound < DKGDelayRound { return } - curDKGSet, err := con.nodeSetCache.GetDKGSet(e.Round) + curNotarySet, err := con.nodeSetCache.GetNotarySet(e.Round) if err != nil { - con.logger.Error("Error getting DKG set when proposing CRS", + con.logger.Error("Error getting notary set when proposing CRS", "round", e.Round, "error", err) return } - if _, exist := curDKGSet[con.ID]; !exist { + if _, exist := curNotarySet[con.ID]; !exist { return } isDKGValid := func() bool { @@ -733,18 +800,18 @@ func (con *Consensus) prepare(initBlock *types.Block) (err error) { // Register round event handler to propose new CRS. con.roundEvent.Register(func(evts []utils.RoundEventParam) { // We don't have to propose new CRS during DKG reset, the reset of DKG - // would be done by the DKG set in previous round. + // would be done by the notary set in previous round. e := evts[len(evts)-1] defer elapse("propose-CRS", e)() if e.Reset != 0 || e.Round < DKGDelayRound { return } - if curDkgSet, err := con.nodeSetCache.GetDKGSet(e.Round); err != nil { - con.logger.Error("Error getting DKG set when proposing CRS", + if curNotarySet, err := con.nodeSetCache.GetNotarySet(e.Round); err != nil { + con.logger.Error("Error getting notary set when proposing CRS", "round", e.Round, "error", err) } else { - if _, exist := curDkgSet[con.ID]; !exist { + if _, exist := curNotarySet[con.ID]; !exist { return } con.event.RegisterHeight(e.NextCRSProposingHeight(), func(uint64) { @@ -809,26 +876,26 @@ func (con *Consensus) prepare(initBlock *types.Block) (err error) { // of unexpected network fluctuation and ensure the robustness. if !checkWithCancel( con.ctx, 500*time.Millisecond, checkCRS(nextRound)) { - con.logger.Debug("unable to prepare CRS for DKG set", + con.logger.Debug("unable to prepare CRS for notary set", "round", nextRound, "reset", e.Reset) return } - nextDkgSet, err := con.nodeSetCache.GetDKGSet(nextRound) + nextNotarySet, err := con.nodeSetCache.GetNotarySet(nextRound) if err != nil { - con.logger.Error("Error getting DKG set for next round", + con.logger.Error("Error getting notary set for next round", "round", nextRound, "reset", e.Reset, "error", err) return } - if _, exist := nextDkgSet[con.ID]; !exist { - con.logger.Info("Not selected as DKG set", + if _, exist := nextNotarySet[con.ID]; !exist { + con.logger.Info("Not selected as notary set", "round", nextRound, "reset", e.Reset) return } - con.logger.Info("Selected as DKG set", + con.logger.Info("Selected as notary set", "round", nextRound, "reset", e.Reset) nextConfig := utils.GetConfigWithPanic(con.gov, nextRound, @@ -865,12 +932,6 @@ func (con *Consensus) Run() { con.waitGroup.Add(1) go con.processMsg() go con.processBlockLoop() - // Sleep until dMoment come. - time.Sleep(con.dMoment.Sub(time.Now().UTC())) - // Take some time to bootstrap. - time.Sleep(3 * time.Second) - con.waitGroup.Add(1) - go con.pullRandomness() // Stop dummy receiver if launched. if con.dummyCancel != nil { con.logger.Trace("Stop dummy receiver") @@ -893,6 +954,10 @@ func (con *Consensus) Run() { } con.logger.Trace("Finish dumping cached messages") } + // Sleep until dMoment come. + time.Sleep(con.dMoment.Sub(time.Now().UTC())) + // Take some time to bootstrap. + time.Sleep(3 * time.Second) con.waitGroup.Add(1) go con.deliveryGuard() // Block until done. @@ -964,7 +1029,6 @@ func (con *Consensus) Stop() { con.event.Reset() con.waitGroup.Wait() if nbApp, ok := con.app.(*nonBlocking); ok { - fmt.Println("Stopping nonBlocking App") nbApp.wait() } } @@ -1019,11 +1083,47 @@ MessageLoop: ch, e := con.baConfirmedBlock[val.Hash] return ch, e }(); exist { - if err := utils.VerifyBlockSignature(val); err != nil { - con.logger.Error("VerifyBlockSignature failed", - "block", val, - "error", err) - continue MessageLoop + if val.IsEmpty() { + hash, err := utils.HashBlock(val) + if err != nil { + con.logger.Error("error verifying empty block hash", + "block", val, + "error, err") + continue MessageLoop + } + if hash != val.Hash { + con.logger.Error("incorrect confirmed empty block hash", + "block", val, + "hash", hash) + continue MessageLoop + } + if _, err := con.bcModule.proposeBlock( + val.Position, time.Time{}, true); err != nil { + con.logger.Error("error adding empty block", + "block", val, + "error", err) + continue MessageLoop + } + } else { + ok, err := con.bcModule.verifyRandomness( + val.Hash, val.Position.Round, val.Finalization.Randomness) + if err != nil { + con.logger.Error("error verifying confirmed block randomness", + "block", val, + "error", err) + continue MessageLoop + } + if !ok { + con.logger.Error("incorrect confirmed block randomness", + "block", val) + continue MessageLoop + } + if err := utils.VerifyBlockSignature(val); err != nil { + con.logger.Error("VerifyBlockSignature failed", + "block", val, + "error", err) + continue MessageLoop + } } func() { con.lock.Lock() @@ -1035,13 +1135,6 @@ MessageLoop: delete(con.baConfirmedBlock, val.Hash) ch <- val }() - } else if val.IsFinalized() { - // For sync mode. - if err := con.processFinalizedBlock(val); err != nil { - con.logger.Error("Failed to process finalized block", - "block", val, - "error", err) - } } else { if err := con.preProcessBlock(val); err != nil { con.logger.Error("Failed to pre process block", @@ -1061,13 +1154,6 @@ MessageLoop: "result", val, "error", err) } - case *types.BlockRandomnessResult: - if err := con.ProcessBlockRandomnessResult(val, true); err != nil { - con.logger.Error("Failed to process block randomness result", - "hash", val.BlockHash.String()[:6], - "position", val.Position, - "error", err) - } case *typesDKG.PrivateShare: if err := con.cfgModule.processPrivateShare(val); err != nil { con.logger.Error("Failed to process private share", @@ -1096,6 +1182,15 @@ func (con *Consensus) ProcessAgreementResult( if !con.baMgr.touchAgreementResult(rand) { return nil } + // TODO(jimmy): merge tsig check to VerifyAgreementResult + ok, err := con.bcModule.verifyRandomness( + rand.BlockHash, rand.Position.Round, rand.Randomness) + if err != nil { + return err + } + if !ok { + return ErrIncorrectBlockRandomness + } // Sanity Check. if err := VerifyAgreementResult(rand, con.nodeSetCache); err != nil { con.baMgr.untouchAgreementResult(rand) @@ -1105,96 +1200,11 @@ func (con *Consensus) ProcessAgreementResult( if err := con.baMgr.processAgreementResult(rand); err != nil { return err } - // Calculating randomness. - if rand.Position.Round == 0 { - return nil - } - // TODO(mission): find a way to avoid spamming by older agreement results. - // Sanity check done. - if !con.cfgModule.touchTSigHash(rand.BlockHash) { - return nil - } - con.logger.Debug("Rebroadcast AgreementResult", + con.logger.Debug("Broadcast AgreementResult", "result", rand) con.network.BroadcastAgreementResult(rand) - go con.prepareRandomnessResult(rand) - return nil -} -func (con *Consensus) prepareRandomnessResult(rand *types.AgreementResult) { - dkgSet, err := con.nodeSetCache.GetDKGSet(rand.Position.Round) - if err != nil { - con.logger.Error("Failed to get dkg set", - "round", rand.Position.Round, "error", err) - return - } - if _, exist := dkgSet[con.ID]; !exist { - return - } - con.logger.Debug("PrepareRandomness", "round", rand.Position.Round, "hash", rand.BlockHash) - psig, err := con.cfgModule.preparePartialSignature(rand.Position.Round, rand.BlockHash) - if err != nil { - con.logger.Error("Failed to prepare psig", - "round", rand.Position.Round, - "hash", rand.BlockHash.String()[:6], - "error", err) - return - } - if err = con.signer.SignDKGPartialSignature(psig); err != nil { - con.logger.Error("Failed to sign psig", - "hash", rand.BlockHash.String()[:6], - "error", err) - return - } - if err = con.cfgModule.processPartialSignature(psig); err != nil { - con.logger.Error("Failed process psig", - "hash", rand.BlockHash.String()[:6], - "error", err) - return - } - con.logger.Debug("Calling Network.BroadcastDKGPartialSignature", - "proposer", psig.ProposerID, - "round", psig.Round, - "hash", psig.Hash.String()[:6]) - con.network.BroadcastDKGPartialSignature(psig) - tsig, err := con.cfgModule.runTSig(rand.Position.Round, rand.BlockHash) - if err != nil { - if err != ErrTSigAlreadyRunning { - con.logger.Error("Failed to run TSIG", - "position", rand.Position, - "hash", rand.BlockHash.String()[:6], - "error", err) - } - return - } - result := &types.BlockRandomnessResult{ - BlockHash: rand.BlockHash, - Position: rand.Position, - Randomness: tsig.Signature, - } - // ProcessBlockRandomnessResult is not thread-safe so we put the result in - // the message channnel to be processed in the main thread. - con.msgChan <- result -} - -// ProcessBlockRandomnessResult processes the randomness result. -func (con *Consensus) ProcessBlockRandomnessResult( - rand *types.BlockRandomnessResult, needBroadcast bool) error { - if rand.Position.Round == 0 { - return nil - } - if !con.bcModule.shouldAddRandomness(rand) { - return nil - } - if err := con.bcModule.addRandomness(rand); err != nil { - return err - } - if needBroadcast { - con.logger.Debug("Calling Network.BroadcastRandomnessResult", - "randomness", rand) - con.network.BroadcastRandomnessResult(rand) - } return con.deliverFinalizedBlocks() } @@ -1207,33 +1217,12 @@ func (con *Consensus) preProcessBlock(b *types.Block) (err error) { return } -func (con *Consensus) pullRandomness() { - defer con.waitGroup.Done() - for { - select { - case <-con.ctx.Done(): - return - default: - } - select { - case <-con.ctx.Done(): - return - case <-con.resetRandomnessTicker: - case <-time.After(1500 * time.Millisecond): - // TODO(jimmy): pulling period should be related to lambdaBA. - hashes := con.bcModule.pendingBlocksWithoutRandomness() - if len(hashes) > 0 { - con.logger.Debug( - "Calling Network.PullRandomness", "blocks", hashes) - con.network.PullRandomness(hashes) - } - } - } -} - func (con *Consensus) deliveryGuard() { defer con.waitGroup.Done() - time.Sleep(con.dMoment.Sub(time.Now())) + select { + case <-con.ctx.Done(): + case <-time.After(con.dMoment.Sub(time.Now())): + } // Node takes time to start. select { case <-con.ctx.Done(): @@ -1259,10 +1248,6 @@ func (con *Consensus) deliveryGuard() { // deliverBlock deliver a block to application layer. func (con *Consensus) deliverBlock(b *types.Block) { select { - case con.resetRandomnessTicker <- struct{}{}: - default: - } - select { case con.resetDeliveryGuardTicker <- struct{}{}: default: } @@ -1274,7 +1259,6 @@ func (con *Consensus) deliverBlock(b *types.Block) { b.Hash, b.Finalization.Height); err != nil { panic(err) } - con.cfgModule.untouchTSigHash(b.Hash) con.logger.Debug("Calling Application.BlockDelivered", "block", b) con.app.BlockDelivered(b.Hash, b.Position, b.Finalization.Clone()) if con.debugApp != nil { @@ -1338,15 +1322,10 @@ func (con *Consensus) processBlock(block *types.Block) (err error) { return } -// processFinalizedBlock is the entry point for handling finalized blocks. -func (con *Consensus) processFinalizedBlock(block *types.Block) error { - return con.bcModule.processFinalizedBlock(block) -} - // PrepareBlock would setup header fields of block based on its ProposerID. func (con *Consensus) proposeBlock(position types.Position) ( *types.Block, error) { - b, err := con.bcModule.proposeBlock(position, time.Now().UTC()) + b, err := con.bcModule.proposeBlock(position, time.Now().UTC(), false) if err != nil { return nil, err } |