diff options
author | Mission Liao <mission.liao@dexon.org> | 2018-09-21 17:28:25 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-09-21 17:28:25 +0800 |
commit | 9d99c27b7261f8228cc0a5a496be6ac50e03abf2 (patch) | |
tree | 71e10b4f1ca6aa155c7521c7e8083ba72be4428c /integration_test/node.go | |
parent | fb4b47fa61db81f4d6b8264d7508aa43509a60a3 (diff) | |
download | dexon-consensus-9d99c27b7261f8228cc0a5a496be6ac50e03abf2.tar.gz dexon-consensus-9d99c27b7261f8228cc0a5a496be6ac50e03abf2.tar.zst dexon-consensus-9d99c27b7261f8228cc0a5a496be6ac50e03abf2.zip |
core: add shard (#127)
A shard is basically DEXON v1 components,
except the strongly acked part, including:
- maintaining lattice structure
- total ordering
- generate consensus timestamp
Diffstat (limited to 'integration_test/node.go')
-rw-r--r-- | integration_test/node.go | 111 |
1 files changed, 81 insertions, 30 deletions
diff --git a/integration_test/node.go b/integration_test/node.go index c0e226b..3193d99 100644 --- a/integration_test/node.go +++ b/integration_test/node.go @@ -19,6 +19,7 @@ package integration import ( "fmt" + "math" "sort" "time" @@ -65,44 +66,59 @@ func NewReceiveBlockEvent( // Node is designed to work with test.Scheduler. type Node struct { ID types.NodeID + chainNum uint32 chainID uint32 - cons *core.Consensus - gov core.Governance + shard *core.Shard + app *test.App + db blockdb.BlockDatabase + broadcastTargets map[types.NodeID]struct{} networkLatency test.LatencyModel proposingLatency test.LatencyModel } // NewNode constructs an instance of Node. func NewNode( - app core.Application, + app *test.App, gov core.Governance, db blockdb.BlockDatabase, privateKey crypto.PrivateKey, - nID types.NodeID, networkLatency test.LatencyModel, proposingLatency test.LatencyModel) *Node { - hashes := make(common.Hashes, 0) - for nID := range gov.GetNotarySet() { + var ( + shardID = uint32(0) + chainID = uint32(math.MaxUint32) + governanceConfig = gov.GetConfiguration(0) + broadcastTargets = gov.GetNotarySet() + nodeID = types.NewNodeID(privateKey.PublicKey()) + ) + hashes := common.Hashes{} + for nID := range broadcastTargets { hashes = append(hashes, nID.Hash) } sort.Sort(hashes) - chainID := uint32(0) - for i, hash := range hashes { - if hash == nID.Hash { + for i, h := range hashes { + if h == nodeID.Hash { chainID = uint32(i) - break } } - + delete(broadcastTargets, nodeID) return &Node{ - ID: nID, + ID: nodeID, chainID: chainID, - gov: gov, + chainNum: governanceConfig.NumChains, + broadcastTargets: broadcastTargets, networkLatency: networkLatency, proposingLatency: proposingLatency, - cons: core.NewConsensus( - app, gov, db, &Network{}, privateKey, eth.SigToPub), + app: app, + db: db, + shard: core.NewShard( + shardID, + governanceConfig, + privateKey, + eth.SigToPub, + app, + db), } } @@ -120,27 +136,18 @@ func (n *Node) Handle(e *test.Event) (events []*test.Event) { return } -func (n *Node) handleProposeBlock(when time.Time, piggyback interface{}) ( +func (n *Node) handleProposeBlock(when time.Time, _ interface{}) ( events []*test.Event, err error) { - b := &types.Block{ - ProposerID: n.ID, - Position: types.Position{ - ChainID: n.chainID, - }, - } - defer types.RecycleBlock(b) - if err = n.cons.PrepareBlock(b, when); err != nil { + b, err := n.prepareBlock(when) + if err != nil { return } - if err = n.cons.ProcessBlock(b); err != nil { + if err = n.processBlock(b); err != nil { return } // Create 'block received' event for each other nodes. - for nID := range n.gov.GetNotarySet() { - if nID == n.ID { - continue - } + for nID := range n.broadcastTargets { events = append(events, NewReceiveBlockEvent( nID, when.Add(n.networkLatency.Delay()), b.Clone())) } @@ -153,9 +160,53 @@ func (n *Node) handleProposeBlock(when time.Time, piggyback interface{}) ( func (n *Node) handleReceiveBlock(piggyback interface{}) ( events []*test.Event, err error) { - err = n.cons.ProcessBlock(piggyback.(*types.Block)) + err = n.processBlock(piggyback.(*types.Block)) if err != nil { panic(err) } return } + +func (n *Node) prepareBlock(when time.Time) (b *types.Block, err error) { + b = &types.Block{ + Position: types.Position{ + ChainID: n.chainID, + }} + err = n.shard.PrepareBlock(b, when) + return +} + +func (n *Node) processBlock(b *types.Block) (err error) { + // TODO(mission): this segment of code is identical to testShardMgr in + // core/shard_test.go, except the compaction-chain part. + var ( + delivered []*types.Block + verified []*types.Block + pendings = []*types.Block{b} + ) + if err = n.shard.SanityCheck(b); err != nil { + if err == core.ErrAckingBlockNotExists { + err = nil + } + return + } + for { + if len(pendings) == 0 { + break + } + b, pendings = pendings[0], pendings[1:] + if verified, delivered, err = n.shard.ProcessBlock(b); err != nil { + return + } + // Deliver blocks. + for _, b = range delivered { + if err = n.db.Update(*b); err != nil { + return + } + n.app.DeliverBlock(b.Hash, b.Witness.Timestamp) + } + // Update pending blocks for verified block (pass sanity check). + pendings = append(pendings, verified...) + } + return +} |