diff options
Diffstat (limited to 'core/consensus.go')
-rw-r--r-- | core/consensus.go | 221 |
1 files changed, 50 insertions, 171 deletions
diff --git a/core/consensus.go b/core/consensus.go index a1642df..e7b5ec7 100644 --- a/core/consensus.go +++ b/core/consensus.go @@ -30,16 +30,6 @@ import ( "github.com/dexon-foundation/dexon-consensus-core/core/types" ) -// ErrMissingBlockInfo would be reported if some information is missing when -// calling PrepareBlock. It implements error interface. -type ErrMissingBlockInfo struct { - MissingField string -} - -func (e *ErrMissingBlockInfo) Error() string { - return "missing " + e.MissingField + " in block" -} - // Errors for consensus core. var ( ErrProposerNotInNodeSet = fmt.Errorf( @@ -54,10 +44,6 @@ var ( "unknown block is proposed") ErrUnknownBlockConfirmed = fmt.Errorf( "unknown block is confirmed") - ErrIncorrectBlockPosition = fmt.Errorf( - "position of block is incorrect") - ErrIncorrectBlockTime = fmt.Errorf( - "block timestampe is incorrect") ) // consensusBAReceiver implements agreementReceiver. @@ -182,11 +168,9 @@ type Consensus struct { dkgReady *sync.Cond cfgModule *configurationChain - // Dexon consensus modules. - rbModule *reliableBroadcast - toModule *totalOrdering - ctModule *consensusTimestamp - ccModule *compactionChain + // Dexon consensus v1's modules. + shardModule *Shard + ccModule *compactionChain // Interfaces. db blockdb.BlockDatabase @@ -212,33 +196,29 @@ func NewConsensus( // TODO(w): load latest blockHeight from DB, and use config at that height. var round uint64 - config := gov.GetConfiguration(round) + config := gov.Configuration(round) // TODO(w): notarySet is different for each chain, need to write a // GetNotarySetForChain(nodeSet, shardID, chainID, crs) function to get the // correct notary set for a given chain. nodeSetCache := NewNodeSetCache(gov) - crs := gov.GetCRS(round) + crs := gov.CRS(round) // Setup acking by information returned from Governace. nodes, err := nodeSetCache.GetNodeSet(0) if err != nil { panic(err) } - rb := newReliableBroadcast() - rb.setChainNum(config.NumChains) - for nID := range nodes.IDs { - rb.addNode(nID) - } // Setup context. ctx, ctxCancel := context.WithCancel(context.Background()) - - // Setup sequencer by information returned from Governace. - to := newTotalOrdering( - uint64(config.K), - uint64(float32(len(nodes.IDs)-1)*config.PhiRatio+1), - config.NumChains) - - ID := types.NewNodeID(prv.PublicKey()) + // Setup auth module. authModule := NewAuthenticator(prv) + // Check if the application implement Debug interface. + debugApp, _ := app.(Debug) + // Setup nonblocking module. + nbModule := newNonBlocking(app, debugApp) + // Init shard. + shardModule := NewShard(config, authModule, nbModule, nbModule, db) + // Init configuration chain. + ID := types.NewNodeID(prv.PublicKey()) cfgModule := newConfigurationChain( ID, &consensusDKGReceiver{ @@ -252,17 +232,13 @@ func NewConsensus( // Register DKG for the initial round. This is a temporary function call for // simulation. cfgModule.registerDKG(0, config.NumDKGSet/3) - - // Check if the application implement Debug interface. - debug, _ := app.(Debug) + // Construct Consensus instance. con := &Consensus{ ID: ID, currentConfig: config, - rbModule: rb, - toModule: to, - ctModule: newConsensusTimestamp(), ccModule: newCompactionChain(db), - nbModule: newNonBlocking(app, debug), + shardModule: shardModule, + nbModule: nbModule, gov: gov, db: db, network: network, @@ -356,14 +332,10 @@ BALoop: if err != nil { panic(err) } - nIDs = nodes.GetSubSet(con.gov.GetConfiguration(con.round).NumNotarySet, - types.NewNotarySetTarget(con.gov.GetCRS(con.round), chainID)) - } - aID := types.Position{ - ChainID: chainID, - Height: con.rbModule.nextHeight(chainID), + nIDs = nodes.GetSubSet(con.gov.Configuration(con.round).NumNotarySet, + types.NewNotarySetTarget(con.gov.CRS(con.round), chainID)) } - agreement.restart(nIDs, aID) + agreement.restart(nIDs, con.shardModule.NextPosition(chainID)) default: } err := agreement.nextState() @@ -406,7 +378,7 @@ func (con *Consensus) runDKGTSIG() { } hash := HashConfigurationBlock( nodes.IDs, - con.gov.GetConfiguration(round), + con.gov.Configuration(round), common.Hash{}, con.cfgModule.prevHash) psig, err := con.cfgModule.preparePartialSignature( @@ -438,7 +410,7 @@ func (con *Consensus) runCRS() { <-ticker.Tick() // Start running next round CRS. psig, err := con.cfgModule.preparePartialSignature( - con.round, con.gov.GetCRS(con.round), types.TSigCRS) + con.round, con.gov.CRS(con.round), types.TSigCRS) if err != nil { log.Println(err) } else if err = con.authModule.SignDKGPartialSignature(psig); err != nil { @@ -447,7 +419,7 @@ func (con *Consensus) runCRS() { log.Println(err) } else { con.network.BroadcastDKGPartialSignature(psig) - crs, err := con.cfgModule.runCRSTSig(con.round, con.gov.GetCRS(con.round)) + crs, err := con.cfgModule.runCRSTSig(con.round, con.gov.CRS(con.round)) if err != nil { log.Println(err) } else { @@ -458,7 +430,7 @@ func (con *Consensus) runCRS() { <-ticker.Tick() // Change round. con.round++ - con.currentConfig = con.gov.GetConfiguration(con.round) + con.currentConfig = con.gov.Configuration(con.round) func() { con.dkgReady.L.Lock() defer con.dkgReady.L.Unlock() @@ -506,23 +478,14 @@ func (con *Consensus) processMsg(msgChan <-chan interface{}) { func (con *Consensus) proposeBlock(chainID uint32) *types.Block { block := &types.Block{ - ProposerID: con.ID, Position: types.Position{ ChainID: chainID, - Height: con.rbModule.nextHeight(chainID), }, } if err := con.prepareBlock(block, time.Now().UTC()); err != nil { log.Println(err) return nil } - // TODO(mission): decide CRS by block's round, which could be determined by - // block's info (ex. position, timestamp). - if err := con.authModule.SignCRS( - block, con.gov.GetCRS(0)); err != nil { - log.Println(err) - return nil - } return block } @@ -533,43 +496,12 @@ func (con *Consensus) ProcessVote(vote *types.Vote) (err error) { return err } -// sanityCheck checks if the block is a valid block -func (con *Consensus) sanityCheck(b *types.Block) (err error) { - // Check block.Position. - if b.Position.ChainID >= con.rbModule.chainNum() { - return ErrIncorrectBlockPosition - } - // Check the timestamp of block. - if !b.IsGenesis() { - chainTime := con.rbModule.chainTime(b.Position.ChainID) - if b.Timestamp.Before(chainTime.Add(con.currentConfig.MinBlockInterval)) || - b.Timestamp.After(chainTime.Add(con.currentConfig.MaxBlockInterval)) { - return ErrIncorrectBlockTime - } - } - // Check the hash of block. - hash, err := hashBlock(b) - if err != nil || hash != b.Hash { - return ErrIncorrectHash - } - - // Check the signer. - pubKey, err := crypto.SigToPub(b.Hash, b.Signature) - if err != nil { - return err - } - if !b.ProposerID.Equal(crypto.Keccak256Hash(pubKey.Bytes())) { - return ErrIncorrectSignature - } - return nil -} - // preProcessBlock performs Byzantine Agreement on the block. func (con *Consensus) preProcessBlock(b *types.Block) (err error) { - if err := con.sanityCheck(b); err != nil { - return err + if err = con.shardModule.SanityCheck(b); err != nil { + return } - if err := con.baModules[b.Position.ChainID].processBlock(b); err != nil { + if err = con.baModules[b.Position.ChainID].processBlock(b); err != nil { return err } return @@ -577,73 +509,30 @@ func (con *Consensus) preProcessBlock(b *types.Block) (err error) { // processBlock is the entry point to submit one block to a Consensus instance. func (con *Consensus) processBlock(block *types.Block) (err error) { - if err := con.sanityCheck(block); err != nil { - return err + verifiedBlocks, deliveredBlocks, err := con.shardModule.ProcessBlock(block) + if err != nil { + return } - var ( - deliveredBlocks []*types.Block - earlyDelivered bool - ) - // To avoid application layer modify the content of block during - // processing, we should always operate based on the cloned one. - b := block.Clone() - - con.lock.Lock() - defer con.lock.Unlock() - // Perform reliable broadcast checking. - if err = con.rbModule.processBlock(b); err != nil { - return err + // Pass verified blocks (pass sanity check) back to BA module. + for _, b := range verifiedBlocks { + if err := + con.baModules[b.Position.ChainID].processBlock(b); err != nil { + return err + } } - con.nbModule.BlockConfirmed(block.Hash) - for _, b := range con.rbModule.extractBlocks() { - // Notify application layer that some block is strongly acked. - con.nbModule.StronglyAcked(b.Hash) - // Perform total ordering. - deliveredBlocks, earlyDelivered, err = con.toModule.processBlock(b) - if err != nil { + // Pass delivered blocks to compaction chain. + for _, b := range deliveredBlocks { + if err = con.ccModule.processBlock(b); err != nil { return } - if len(deliveredBlocks) == 0 { - continue - } - for _, b := range deliveredBlocks { - if err = con.db.Put(*b); err != nil { - return - } - } - // TODO(mission): handle membership events here. - hashes := make(common.Hashes, len(deliveredBlocks)) - for idx := range deliveredBlocks { - hashes[idx] = deliveredBlocks[idx].Hash - } - con.nbModule.TotalOrderingDelivered(hashes, earlyDelivered) - // Perform timestamp generation. - err = con.ctModule.processBlocks(deliveredBlocks) - if err != nil { + if err = con.db.Update(*b); err != nil { return } - for _, b := range deliveredBlocks { - if err = con.ccModule.processBlock(b); err != nil { - return - } - if err = con.db.Update(*b); err != nil { - return - } - con.nbModule.BlockDelivered(*b) - // TODO(mission): Find a way to safely recycle the block. - // We should deliver block directly to - // nonBlocking and let them recycle the - // block. - } - } - return -} - -func (con *Consensus) checkPrepareBlock( - b *types.Block, proposeTime time.Time) (err error) { - if (b.ProposerID == types.NodeID{}) { - err = &ErrMissingBlockInfo{MissingField: "ProposerID"} - return + con.nbModule.BlockDelivered(*b) + // TODO(mission): Find a way to safely recycle the block. + // We should deliver block directly to + // nonBlocking and let them recycle the + // block. } return } @@ -651,16 +540,12 @@ func (con *Consensus) checkPrepareBlock( // PrepareBlock would setup header fields of block based on its ProposerID. func (con *Consensus) prepareBlock(b *types.Block, proposeTime time.Time) (err error) { - if err = con.checkPrepareBlock(b, proposeTime); err != nil { + if err = con.shardModule.PrepareBlock(b, proposeTime); err != nil { return } - con.lock.RLock() - defer con.lock.RUnlock() - - con.rbModule.prepareBlock(b) - b.Timestamp = proposeTime - b.Payload, b.Witness.Data = con.nbModule.PrepareBlock(b.Position) - if err = con.authModule.SignBlock(b); err != nil { + // TODO(mission): decide CRS by block's round, which could be determined by + // block's info (ex. position, timestamp). + if err = con.authModule.SignCRS(b, con.gov.CRS(0)); err != nil { return } return @@ -669,18 +554,12 @@ func (con *Consensus) prepareBlock(b *types.Block, // PrepareGenesisBlock would setup header fields for genesis block. func (con *Consensus) PrepareGenesisBlock(b *types.Block, proposeTime time.Time) (err error) { - if err = con.checkPrepareBlock(b, proposeTime); err != nil { + if err = con.prepareBlock(b, proposeTime); err != nil { return } if len(b.Payload) != 0 { err = ErrGenesisBlockNotEmpty return } - b.Position.Height = 0 - b.ParentHash = common.Hash{} - b.Timestamp = proposeTime - if err = con.authModule.SignBlock(b); err != nil { - return - } return } |