aboutsummaryrefslogtreecommitdiffstats
path: root/core/consensus.go
diff options
context:
space:
mode:
Diffstat (limited to 'core/consensus.go')
-rw-r--r--core/consensus.go221
1 files changed, 50 insertions, 171 deletions
diff --git a/core/consensus.go b/core/consensus.go
index a1642df..e7b5ec7 100644
--- a/core/consensus.go
+++ b/core/consensus.go
@@ -30,16 +30,6 @@ import (
"github.com/dexon-foundation/dexon-consensus-core/core/types"
)
-// ErrMissingBlockInfo would be reported if some information is missing when
-// calling PrepareBlock. It implements error interface.
-type ErrMissingBlockInfo struct {
- MissingField string
-}
-
-func (e *ErrMissingBlockInfo) Error() string {
- return "missing " + e.MissingField + " in block"
-}
-
// Errors for consensus core.
var (
ErrProposerNotInNodeSet = fmt.Errorf(
@@ -54,10 +44,6 @@ var (
"unknown block is proposed")
ErrUnknownBlockConfirmed = fmt.Errorf(
"unknown block is confirmed")
- ErrIncorrectBlockPosition = fmt.Errorf(
- "position of block is incorrect")
- ErrIncorrectBlockTime = fmt.Errorf(
- "block timestampe is incorrect")
)
// consensusBAReceiver implements agreementReceiver.
@@ -182,11 +168,9 @@ type Consensus struct {
dkgReady *sync.Cond
cfgModule *configurationChain
- // Dexon consensus modules.
- rbModule *reliableBroadcast
- toModule *totalOrdering
- ctModule *consensusTimestamp
- ccModule *compactionChain
+ // Dexon consensus v1's modules.
+ shardModule *Shard
+ ccModule *compactionChain
// Interfaces.
db blockdb.BlockDatabase
@@ -212,33 +196,29 @@ func NewConsensus(
// TODO(w): load latest blockHeight from DB, and use config at that height.
var round uint64
- config := gov.GetConfiguration(round)
+ config := gov.Configuration(round)
// TODO(w): notarySet is different for each chain, need to write a
// GetNotarySetForChain(nodeSet, shardID, chainID, crs) function to get the
// correct notary set for a given chain.
nodeSetCache := NewNodeSetCache(gov)
- crs := gov.GetCRS(round)
+ crs := gov.CRS(round)
// Setup acking by information returned from Governace.
nodes, err := nodeSetCache.GetNodeSet(0)
if err != nil {
panic(err)
}
- rb := newReliableBroadcast()
- rb.setChainNum(config.NumChains)
- for nID := range nodes.IDs {
- rb.addNode(nID)
- }
// Setup context.
ctx, ctxCancel := context.WithCancel(context.Background())
-
- // Setup sequencer by information returned from Governace.
- to := newTotalOrdering(
- uint64(config.K),
- uint64(float32(len(nodes.IDs)-1)*config.PhiRatio+1),
- config.NumChains)
-
- ID := types.NewNodeID(prv.PublicKey())
+ // Setup auth module.
authModule := NewAuthenticator(prv)
+ // Check if the application implement Debug interface.
+ debugApp, _ := app.(Debug)
+ // Setup nonblocking module.
+ nbModule := newNonBlocking(app, debugApp)
+ // Init shard.
+ shardModule := NewShard(config, authModule, nbModule, nbModule, db)
+ // Init configuration chain.
+ ID := types.NewNodeID(prv.PublicKey())
cfgModule := newConfigurationChain(
ID,
&consensusDKGReceiver{
@@ -252,17 +232,13 @@ func NewConsensus(
// Register DKG for the initial round. This is a temporary function call for
// simulation.
cfgModule.registerDKG(0, config.NumDKGSet/3)
-
- // Check if the application implement Debug interface.
- debug, _ := app.(Debug)
+ // Construct Consensus instance.
con := &Consensus{
ID: ID,
currentConfig: config,
- rbModule: rb,
- toModule: to,
- ctModule: newConsensusTimestamp(),
ccModule: newCompactionChain(db),
- nbModule: newNonBlocking(app, debug),
+ shardModule: shardModule,
+ nbModule: nbModule,
gov: gov,
db: db,
network: network,
@@ -356,14 +332,10 @@ BALoop:
if err != nil {
panic(err)
}
- nIDs = nodes.GetSubSet(con.gov.GetConfiguration(con.round).NumNotarySet,
- types.NewNotarySetTarget(con.gov.GetCRS(con.round), chainID))
- }
- aID := types.Position{
- ChainID: chainID,
- Height: con.rbModule.nextHeight(chainID),
+ nIDs = nodes.GetSubSet(con.gov.Configuration(con.round).NumNotarySet,
+ types.NewNotarySetTarget(con.gov.CRS(con.round), chainID))
}
- agreement.restart(nIDs, aID)
+ agreement.restart(nIDs, con.shardModule.NextPosition(chainID))
default:
}
err := agreement.nextState()
@@ -406,7 +378,7 @@ func (con *Consensus) runDKGTSIG() {
}
hash := HashConfigurationBlock(
nodes.IDs,
- con.gov.GetConfiguration(round),
+ con.gov.Configuration(round),
common.Hash{},
con.cfgModule.prevHash)
psig, err := con.cfgModule.preparePartialSignature(
@@ -438,7 +410,7 @@ func (con *Consensus) runCRS() {
<-ticker.Tick()
// Start running next round CRS.
psig, err := con.cfgModule.preparePartialSignature(
- con.round, con.gov.GetCRS(con.round), types.TSigCRS)
+ con.round, con.gov.CRS(con.round), types.TSigCRS)
if err != nil {
log.Println(err)
} else if err = con.authModule.SignDKGPartialSignature(psig); err != nil {
@@ -447,7 +419,7 @@ func (con *Consensus) runCRS() {
log.Println(err)
} else {
con.network.BroadcastDKGPartialSignature(psig)
- crs, err := con.cfgModule.runCRSTSig(con.round, con.gov.GetCRS(con.round))
+ crs, err := con.cfgModule.runCRSTSig(con.round, con.gov.CRS(con.round))
if err != nil {
log.Println(err)
} else {
@@ -458,7 +430,7 @@ func (con *Consensus) runCRS() {
<-ticker.Tick()
// Change round.
con.round++
- con.currentConfig = con.gov.GetConfiguration(con.round)
+ con.currentConfig = con.gov.Configuration(con.round)
func() {
con.dkgReady.L.Lock()
defer con.dkgReady.L.Unlock()
@@ -506,23 +478,14 @@ func (con *Consensus) processMsg(msgChan <-chan interface{}) {
func (con *Consensus) proposeBlock(chainID uint32) *types.Block {
block := &types.Block{
- ProposerID: con.ID,
Position: types.Position{
ChainID: chainID,
- Height: con.rbModule.nextHeight(chainID),
},
}
if err := con.prepareBlock(block, time.Now().UTC()); err != nil {
log.Println(err)
return nil
}
- // TODO(mission): decide CRS by block's round, which could be determined by
- // block's info (ex. position, timestamp).
- if err := con.authModule.SignCRS(
- block, con.gov.GetCRS(0)); err != nil {
- log.Println(err)
- return nil
- }
return block
}
@@ -533,43 +496,12 @@ func (con *Consensus) ProcessVote(vote *types.Vote) (err error) {
return err
}
-// sanityCheck checks if the block is a valid block
-func (con *Consensus) sanityCheck(b *types.Block) (err error) {
- // Check block.Position.
- if b.Position.ChainID >= con.rbModule.chainNum() {
- return ErrIncorrectBlockPosition
- }
- // Check the timestamp of block.
- if !b.IsGenesis() {
- chainTime := con.rbModule.chainTime(b.Position.ChainID)
- if b.Timestamp.Before(chainTime.Add(con.currentConfig.MinBlockInterval)) ||
- b.Timestamp.After(chainTime.Add(con.currentConfig.MaxBlockInterval)) {
- return ErrIncorrectBlockTime
- }
- }
- // Check the hash of block.
- hash, err := hashBlock(b)
- if err != nil || hash != b.Hash {
- return ErrIncorrectHash
- }
-
- // Check the signer.
- pubKey, err := crypto.SigToPub(b.Hash, b.Signature)
- if err != nil {
- return err
- }
- if !b.ProposerID.Equal(crypto.Keccak256Hash(pubKey.Bytes())) {
- return ErrIncorrectSignature
- }
- return nil
-}
-
// preProcessBlock performs Byzantine Agreement on the block.
func (con *Consensus) preProcessBlock(b *types.Block) (err error) {
- if err := con.sanityCheck(b); err != nil {
- return err
+ if err = con.shardModule.SanityCheck(b); err != nil {
+ return
}
- if err := con.baModules[b.Position.ChainID].processBlock(b); err != nil {
+ if err = con.baModules[b.Position.ChainID].processBlock(b); err != nil {
return err
}
return
@@ -577,73 +509,30 @@ func (con *Consensus) preProcessBlock(b *types.Block) (err error) {
// processBlock is the entry point to submit one block to a Consensus instance.
func (con *Consensus) processBlock(block *types.Block) (err error) {
- if err := con.sanityCheck(block); err != nil {
- return err
+ verifiedBlocks, deliveredBlocks, err := con.shardModule.ProcessBlock(block)
+ if err != nil {
+ return
}
- var (
- deliveredBlocks []*types.Block
- earlyDelivered bool
- )
- // To avoid application layer modify the content of block during
- // processing, we should always operate based on the cloned one.
- b := block.Clone()
-
- con.lock.Lock()
- defer con.lock.Unlock()
- // Perform reliable broadcast checking.
- if err = con.rbModule.processBlock(b); err != nil {
- return err
+ // Pass verified blocks (pass sanity check) back to BA module.
+ for _, b := range verifiedBlocks {
+ if err :=
+ con.baModules[b.Position.ChainID].processBlock(b); err != nil {
+ return err
+ }
}
- con.nbModule.BlockConfirmed(block.Hash)
- for _, b := range con.rbModule.extractBlocks() {
- // Notify application layer that some block is strongly acked.
- con.nbModule.StronglyAcked(b.Hash)
- // Perform total ordering.
- deliveredBlocks, earlyDelivered, err = con.toModule.processBlock(b)
- if err != nil {
+ // Pass delivered blocks to compaction chain.
+ for _, b := range deliveredBlocks {
+ if err = con.ccModule.processBlock(b); err != nil {
return
}
- if len(deliveredBlocks) == 0 {
- continue
- }
- for _, b := range deliveredBlocks {
- if err = con.db.Put(*b); err != nil {
- return
- }
- }
- // TODO(mission): handle membership events here.
- hashes := make(common.Hashes, len(deliveredBlocks))
- for idx := range deliveredBlocks {
- hashes[idx] = deliveredBlocks[idx].Hash
- }
- con.nbModule.TotalOrderingDelivered(hashes, earlyDelivered)
- // Perform timestamp generation.
- err = con.ctModule.processBlocks(deliveredBlocks)
- if err != nil {
+ if err = con.db.Update(*b); err != nil {
return
}
- for _, b := range deliveredBlocks {
- if err = con.ccModule.processBlock(b); err != nil {
- return
- }
- if err = con.db.Update(*b); err != nil {
- return
- }
- con.nbModule.BlockDelivered(*b)
- // TODO(mission): Find a way to safely recycle the block.
- // We should deliver block directly to
- // nonBlocking and let them recycle the
- // block.
- }
- }
- return
-}
-
-func (con *Consensus) checkPrepareBlock(
- b *types.Block, proposeTime time.Time) (err error) {
- if (b.ProposerID == types.NodeID{}) {
- err = &ErrMissingBlockInfo{MissingField: "ProposerID"}
- return
+ con.nbModule.BlockDelivered(*b)
+ // TODO(mission): Find a way to safely recycle the block.
+ // We should deliver block directly to
+ // nonBlocking and let them recycle the
+ // block.
}
return
}
@@ -651,16 +540,12 @@ func (con *Consensus) checkPrepareBlock(
// PrepareBlock would setup header fields of block based on its ProposerID.
func (con *Consensus) prepareBlock(b *types.Block,
proposeTime time.Time) (err error) {
- if err = con.checkPrepareBlock(b, proposeTime); err != nil {
+ if err = con.shardModule.PrepareBlock(b, proposeTime); err != nil {
return
}
- con.lock.RLock()
- defer con.lock.RUnlock()
-
- con.rbModule.prepareBlock(b)
- b.Timestamp = proposeTime
- b.Payload, b.Witness.Data = con.nbModule.PrepareBlock(b.Position)
- if err = con.authModule.SignBlock(b); err != nil {
+ // TODO(mission): decide CRS by block's round, which could be determined by
+ // block's info (ex. position, timestamp).
+ if err = con.authModule.SignCRS(b, con.gov.CRS(0)); err != nil {
return
}
return
@@ -669,18 +554,12 @@ func (con *Consensus) prepareBlock(b *types.Block,
// PrepareGenesisBlock would setup header fields for genesis block.
func (con *Consensus) PrepareGenesisBlock(b *types.Block,
proposeTime time.Time) (err error) {
- if err = con.checkPrepareBlock(b, proposeTime); err != nil {
+ if err = con.prepareBlock(b, proposeTime); err != nil {
return
}
if len(b.Payload) != 0 {
err = ErrGenesisBlockNotEmpty
return
}
- b.Position.Height = 0
- b.ParentHash = common.Hash{}
- b.Timestamp = proposeTime
- if err = con.authModule.SignBlock(b); err != nil {
- return
- }
return
}