diff options
Diffstat (limited to 'integration_test/node.go')
-rw-r--r-- | integration_test/node.go | 259 |
1 files changed, 189 insertions, 70 deletions
diff --git a/integration_test/node.go b/integration_test/node.go index bcb44bb..facd153 100644 --- a/integration_test/node.go +++ b/integration_test/node.go @@ -19,8 +19,6 @@ package integration import ( "fmt" - "math" - "sort" "time" "github.com/dexon-foundation/dexon-consensus-core/common" @@ -43,17 +41,22 @@ type consensusEventPayload struct { PiggyBack interface{} } -// NewProposeBlockEvent constructs an test.Event that would trigger +// newProposeBlockEvent constructs an test.Event that would trigger // block proposing. -func NewProposeBlockEvent(nID types.NodeID, when time.Time) *test.Event { +func newProposeBlockEvent(nID types.NodeID, + roundID uint64, chainID uint32, when time.Time) *test.Event { return test.NewEvent(nID, when, &consensusEventPayload{ Type: evtProposeBlock, + PiggyBack: &struct { + round uint64 + chain uint32 + }{roundID, chainID}, }) } -// NewReceiveBlockEvent constructs an test.Event that would trigger +// newReceiveBlockEvent constructs an test.Event that would trigger // block received. -func NewReceiveBlockEvent( +func newReceiveBlockEvent( nID types.NodeID, when time.Time, block *types.Block) *test.Event { return test.NewEvent(nID, when, &consensusEventPayload{ @@ -65,67 +68,75 @@ func NewReceiveBlockEvent( // Node is designed to work with test.Scheduler. type Node struct { ID types.NodeID - chainNum uint32 - chainID uint32 + ownChains []uint32 + roundEndTimes []time.Time + roundToNotify uint64 lattice *core.Lattice - app *test.App - db blockdb.BlockDatabase + appModule *test.App + stateModule *test.State + govModule *test.Governance + dbModule blockdb.BlockDatabase broadcastTargets map[types.NodeID]struct{} networkLatency test.LatencyModel proposingLatency test.LatencyModel prevFinalHeight uint64 pendings []*types.Block + prevHash common.Hash + // This variable caches the maximum NumChains seen by this node when + // it's notified for round switching. + latticeMaxNumChains uint32 } -// NewNode constructs an instance of Node. -func NewNode( - app *test.App, - gov core.Governance, - db blockdb.BlockDatabase, +// newNode constructs an instance of Node. +func newNode( + gov *test.Governance, privateKey crypto.PrivateKey, dMoment time.Time, + ownChains []uint32, networkLatency test.LatencyModel, - proposingLatency test.LatencyModel) *Node { - - var ( - chainID = uint32(math.MaxUint32) - governanceConfig = gov.Configuration(0) - nodeSetKeys = gov.NodeSet(0) - nodeID = types.NewNodeID(privateKey.PublicKey()) - ) - broadcastTargets := make(map[types.NodeID]struct{}) - for _, k := range nodeSetKeys { - broadcastTargets[types.NewNodeID(k)] = struct{}{} - } - hashes := common.Hashes{} - for nID := range broadcastTargets { - hashes = append(hashes, nID.Hash) - } - sort.Sort(hashes) - for i, h := range hashes { - if h == nodeID.Hash { - chainID = uint32(i) - } + proposingLatency test.LatencyModel) (*Node, error) { + // Load all configs prepared in core.Governance into core.Lattice. + copiedGov := gov.Clone() + configs := loadAllConfigs(copiedGov) + // Setup blockdb. + db, err := blockdb.NewMemBackedBlockDB() + if err != nil { + return nil, err + } + // Setup test.App + app := test.NewApp(copiedGov.State()) + // Setup lattice instance. + lattice := core.NewLattice( + dMoment, + configs[0], + core.NewAuthenticator(privateKey), + app, + app, + db, + &common.NullLogger{}) + n := &Node{ + ID: types.NewNodeID(privateKey.PublicKey()), + ownChains: ownChains, + roundEndTimes: genRoundEndTimes(configs, dMoment), + roundToNotify: 2, + networkLatency: networkLatency, + proposingLatency: proposingLatency, + appModule: app, + stateModule: copiedGov.State(), + dbModule: db, + govModule: copiedGov, + lattice: lattice, + latticeMaxNumChains: configs[0].NumChains, } - delete(broadcastTargets, nodeID) - return &Node{ - ID: nodeID, - chainID: chainID, - chainNum: governanceConfig.NumChains, - broadcastTargets: broadcastTargets, - networkLatency: networkLatency, - proposingLatency: proposingLatency, - app: app, - db: db, - lattice: core.NewLattice( - dMoment, - governanceConfig, - core.NewAuthenticator(privateKey), - app, - app, - db, - &common.NullLogger{}), + for idx, config := range configs[1:] { + if err := lattice.AppendConfig(uint64(idx+1), config); err != nil { + return nil, err + } + if config.NumChains > n.latticeMaxNumChains { + n.latticeMaxNumChains = config.NumChains + } } + return n, nil } // Handle implements test.EventHandler interface. @@ -142,47 +153,65 @@ func (n *Node) Handle(e *test.Event) (events []*test.Event) { return } -func (n *Node) handleProposeBlock(when time.Time, _ interface{}) ( +func (n *Node) handleProposeBlock(when time.Time, payload interface{}) ( events []*test.Event, err error) { - - b, err := n.prepareBlock(when) + pos := payload.(*struct { + round uint64 + chain uint32 + }) + b, err := n.prepareBlock(pos.round, pos.chain, when) if err != nil { + if err == core.ErrInvalidChainID { + // This chain is not included in this round, retry in next round. + events = append(events, newProposeBlockEvent( + n.ID, b.Position.Round+1, b.Position.ChainID, + n.roundEndTimes[b.Position.Round])) + } return } - if err = n.processBlock(b); err != nil { - return + if events, err = n.processBlock(b); err != nil { + // It's shouldn't be error when prepared. + panic(err) } // Create 'block received' event for each other nodes. for nID := range n.broadcastTargets { - events = append(events, NewReceiveBlockEvent( + events = append(events, newReceiveBlockEvent( nID, when.Add(n.networkLatency.Delay()), b.Clone())) } // Create next 'block proposing' event for this nodes. - events = append(events, NewProposeBlockEvent( - n.ID, when.Add(n.proposingLatency.Delay()))) + events = append(events, newProposeBlockEvent(n.ID, + b.Position.Round, + b.Position.ChainID, + when.Add(n.proposingLatency.Delay()))) return } func (n *Node) handleReceiveBlock(piggyback interface{}) ( events []*test.Event, err error) { - - err = n.processBlock(piggyback.(*types.Block)) + events, err = n.processBlock(piggyback.(*types.Block)) if err != nil { panic(err) } return } -func (n *Node) prepareBlock(when time.Time) (b *types.Block, err error) { +func (n *Node) prepareBlock( + round uint64, chainID uint32, when time.Time) (b *types.Block, err error) { b = &types.Block{ Position: types.Position{ - ChainID: n.chainID, + Round: round, + ChainID: chainID, }} - err = n.lattice.PrepareBlock(b, when) + if err = n.lattice.PrepareBlock(b, when); err != nil { + if err == core.ErrRoundNotSwitch { + b.Position.Round++ + err = n.lattice.PrepareBlock(b, when) + } + } return } -func (n *Node) processBlock(b *types.Block) (err error) { +func (n *Node) processBlock(b *types.Block) (events []*test.Event, err error) { // TODO(mission): this segment of code is identical to testLatticeMgr in // core/lattice_test.go, except the compaction-chain part. var ( @@ -223,15 +252,105 @@ func (n *Node) processBlock(b *types.Block) (err error) { } // Deliver blocks. for _, b = range delivered { - if err = n.db.Put(*b); err != nil { + if err = n.dbModule.Put(*b); err != nil { panic(err) } b.Finalization.Height = n.prevFinalHeight + 1 - n.app.BlockDelivered(b.Hash, b.Finalization) + b.Finalization.ParentHash = n.prevHash + n.appModule.BlockDelivered(b.Hash, b.Finalization) n.prevFinalHeight++ + n.prevHash = b.Hash + events = append(events, n.checkRoundSwitch(b)...) } if err = n.lattice.PurgeBlocks(delivered); err != nil { panic(err) } return } + +func (n *Node) checkRoundSwitch(b *types.Block) (evts []*test.Event) { + if !b.Timestamp.After(n.roundEndTimes[b.Position.Round]) { + return + } + if b.Position.Round+2 != n.roundToNotify { + return + } + // Handle round switching logic. + n.govModule.NotifyRoundHeight(n.roundToNotify, b.Finalization.Height) + if n.roundToNotify == uint64(len(n.roundEndTimes)) { + config := n.govModule.Configuration(n.roundToNotify) + if config == nil { + panic(fmt.Errorf( + "config is not ready for round: %v", n.roundToNotify-1)) + } + // Cache round ended time for each round. + n.roundEndTimes = append(n.roundEndTimes, + n.roundEndTimes[len(n.roundEndTimes)-1].Add( + config.RoundInterval)) + // Add new config to lattice module. + if err := n.lattice.AppendConfig(n.roundToNotify, config); err != nil { + panic(err) + } + if config.NumChains > n.latticeMaxNumChains { + // We can be sure that lattice module can support this number of + // chains. + for _, chainID := range n.ownChains { + if chainID < n.latticeMaxNumChains { + continue + } + if chainID >= config.NumChains { + continue + } + // For newly added chains, add block proposing seed event. + evts = append(evts, newProposeBlockEvent(n.ID, n.roundToNotify, + chainID, n.roundEndTimes[n.roundToNotify-1])) + } + n.latticeMaxNumChains = config.NumChains + } + } else if n.roundToNotify > uint64(len(n.roundEndTimes)) { + panic(fmt.Errorf( + "config notification not incremental: %v, cached configs: %v", + n.roundToNotify, len(n.roundEndTimes))) + } + n.roundToNotify++ + return +} + +// Bootstrap this node with block proposing event. +func (n *Node) Bootstrap(sch *test.Scheduler, now time.Time) (err error) { + sch.RegisterEventHandler(n.ID, n) + for _, chainID := range n.ownChains { + if chainID >= n.latticeMaxNumChains { + continue + } + err = sch.Seed(newProposeBlockEvent(n.ID, 0, chainID, now)) + if err != nil { + return + } + } + return +} + +func (n *Node) setBroadcastTargets(targets map[types.NodeID]struct{}) { + // Clone targets, except self. + targetsCopy := make(map[types.NodeID]struct{}) + for nID := range targets { + if nID == n.ID { + continue + } + targetsCopy[nID] = struct{}{} + } + n.broadcastTargets = targetsCopy +} + +func (n *Node) app() *test.App { + return n.appModule +} + +func (n *Node) db() blockdb.BlockDatabase { + return n.dbModule +} + +func (n *Node) gov() *test.Governance { + return n.govModule +} |