Diffstat (limited to 'integration_test/consensus_test.go')
-rw-r--r--	integration_test/consensus_test.go	213
1 file changed, 205 insertions(+), 8 deletions(-)
diff --git a/integration_test/consensus_test.go b/integration_test/consensus_test.go
index 8fd3fa4..6432e9f 100644
--- a/integration_test/consensus_test.go
+++ b/integration_test/consensus_test.go
@@ -18,6 +18,7 @@
 package integration
 
 import (
+	"context"
 	"sync"
 	"testing"
 	"time"
@@ -26,6 +27,7 @@ import (
 	"github.com/dexon-foundation/dexon-consensus/core"
 	"github.com/dexon-foundation/dexon-consensus/core/blockdb"
 	"github.com/dexon-foundation/dexon-consensus/core/crypto"
+	"github.com/dexon-foundation/dexon-consensus/core/syncer"
 	"github.com/dexon-foundation/dexon-consensus/core/test"
 	"github.com/dexon-foundation/dexon-consensus/core/types"
 	"github.com/dexon-foundation/dexon-consensus/core/utils"
@@ -39,10 +41,12 @@ type ConsensusTestSuite struct {
 }
 
 type node struct {
-	con *core.Consensus
-	app *test.App
-	gov *test.Governance
-	db  blockdb.BlockDatabase
+	ID      types.NodeID
+	con     *core.Consensus
+	app     *test.App
+	gov     *test.Governance
+	db      blockdb.BlockDatabase
+	network *test.Network
 }
 
 func (s *ConsensusTestSuite) setupNodes(
@@ -81,8 +85,9 @@ func (s *ConsensusTestSuite) setupNodes(
 			db,
 			networkModule,
 			k,
-			&common.NullLogger{})
-		nodes[con.ID] = &node{con, app, gov, db}
+			&common.NullLogger{},
+		)
+		nodes[con.ID] = &node{con.ID, con, app, gov, db, networkModule}
 		go func() {
 			defer wg.Done()
 			s.Require().NoError(networkModule.Setup(serverChannel))
@@ -107,6 +112,53 @@ func (s *ConsensusTestSuite) verifyNodes(nodes map[types.NodeID]*node) {
 	}
 }
 
+func (s *ConsensusTestSuite) syncBlocksWithSomeNode(
+	sourceNode *node, syncerObj *syncer.Consensus, nextSyncHeight uint64) (
+	syncedCon *core.Consensus, syncerHeight uint64, err error) {
+
+	syncerHeight = nextSyncHeight
+	// Set up the revealer.
+	DBAll, err := sourceNode.db.GetAll()
+	if err != nil {
+		return
+	}
+	r, err := test.NewCompactionChainRevealer(DBAll, nextSyncHeight)
+	if err != nil {
+		return
+	}
+	// Load all blocks from the revealer and dump them into the syncer.
+	var compactionChainBlocks []*types.Block
+	syncBlocks := func() (done bool) {
+		syncedCon, err = syncerObj.SyncBlocks(compactionChainBlocks, true)
+		if syncedCon != nil || err != nil {
+			done = true
+		}
+		compactionChainBlocks = nil
+		return
+	}
+	for {
+		var b types.Block
+		b, err = r.Next()
+		if err != nil {
+			if err == blockdb.ErrIterationFinished {
+				err = nil
+				if syncBlocks() {
+					break
+				}
+			}
+			break
+		}
+		syncerHeight = b.Finalization.Height + 1
+		compactionChainBlocks = append(compactionChainBlocks, &b)
+		if len(compactionChainBlocks) >= 100 {
+			if syncBlocks() {
+				break
+			}
+		}
+	}
+	return
+}
+
 func (s *ConsensusTestSuite) TestSimple() {
 	// The simplest test case:
 	//   - Node set is equal to DKG set and notary set for each chain in each
@@ -143,7 +195,7 @@ Loop:
 		s.T().Log("check latest position delivered by each node")
 		for _, n := range nodes {
 			latestPos := n.app.GetLatestDeliveredPosition()
-			s.T().Log("latestPos", n.con.ID, &latestPos)
+			s.T().Log("latestPos", n.ID, &latestPos)
 			if latestPos.Round < untilRound {
 				continue Loop
 			}
@@ -152,6 +204,7 @@
 		break
 	}
 	s.verifyNodes(nodes)
+	// TODO(haoping) stop consensus.
 }
 
 func (s *ConsensusTestSuite) TestNumChainsChange() {
@@ -215,7 +268,7 @@ Loop:
 		s.T().Log("check latest position delivered by each node")
 		for _, n := range nodes {
 			latestPos := n.app.GetLatestDeliveredPosition()
-			s.T().Log("latestPos", n.con.ID, &latestPos)
+			s.T().Log("latestPos", n.ID, &latestPos)
 			if latestPos.Round < untilRound {
 				continue Loop
 			}
@@ -226,6 +279,150 @@ Loop:
 	s.verifyNodes(nodes)
 }
 
+func (s *ConsensusTestSuite) TestSync() {
+	// The sync test case:
+	// - No configuration change.
+	// - One node does not run until all other nodes exceed aliveRound.
+	var (
+		req        = s.Require()
+		peerCount  = 4
+		dMoment    = time.Now().UTC()
+		untilRound = uint64(5)
+		aliveRound = uint64(1)
+		errChan    = make(chan error, 100)
+	)
+	prvKeys, pubKeys, err := test.NewKeys(peerCount)
+	req.NoError(err)
+	// Set up the seed governance instance. Give a short latency to make this
+	// test run faster.
+	seedGov, err := test.NewGovernance(
+		pubKeys, 100*time.Millisecond, core.ConfigRoundShift)
+	req.NoError(err)
+	req.NoError(seedGov.State().RequestChange(
+		test.StateChangeRoundInterval, 30*time.Second))
+	// A short round interval.
+	nodes := s.setupNodes(dMoment, prvKeys, seedGov)
+	// Choose the first node as "syncNode", whose consensus' Run() is called
+	// later.
+	syncNode := nodes[types.NewNodeID(pubKeys[0])]
+	syncNode.con = nil
+	// Use another node's governance instance. Normally, a fullnode would
+	// construct its own governance instance when syncing. In our test, this
+	// is the simplest way to achieve that.
+	syncNode.gov = nodes[types.NewNodeID(pubKeys[1])].gov
+	for _, n := range nodes {
+		if n.ID != syncNode.ID {
+			go n.con.Run(&types.Block{})
+		}
+	}
+	// Drain syncNode's network receive channel, or it might exceed its limit
+	// and block other goroutines.
+	dummyReceiverCtx, dummyReceiverCtxCancel := context.WithCancel(
+		context.Background())
+	go func() {
+	Loop:
+		for {
+			select {
+			case <-syncNode.network.ReceiveChan():
+			case <-dummyReceiverCtx.Done():
+				break Loop
+			}
+		}
+	}()
+ReachAlive:
+	for {
+		// If all nodes except syncNode have reached aliveRound, call syncNode's
+		// Run() and send it all blocks from one normal node's compaction chain.
+		for id, n := range nodes {
+			if id == syncNode.ID {
+				continue
+			}
+			if n.app.GetLatestDeliveredPosition().Round < aliveRound {
+				continue ReachAlive
+			}
+			// Check if any error happened, or sleep for a period of time.
+			select {
+			case err := <-errChan:
+				req.NoError(err)
+			case <-time.After(5 * time.Second):
+			}
+		}
+		dummyReceiverCtxCancel()
+		break
+	}
+	// Initiate the syncer.
+	runnerCtx, runnerCtxCancel := context.WithCancel(context.Background())
+	defer runnerCtxCancel()
+	syncerObj := syncer.NewConsensus(
+		dMoment,
+		syncNode.app,
+		syncNode.gov,
+		syncNode.db,
+		syncNode.network,
+		prvKeys[0],
+		&common.NullLogger{},
+	)
+	// Report failures back through a channel; it's not recommended to make
+	// assertions in another goroutine.
+	go func() {
+		var (
+			node         *node
+			syncedHeight uint64
+			err          error
+			syncedCon    *core.Consensus
+		)
+	SyncLoop:
+		for {
+			for _, n := range nodes {
+				if n.ID == syncNode.ID {
+					continue
+				}
+				node = n
+				break
+			}
+			syncedCon, syncedHeight, err = s.syncBlocksWithSomeNode(
+				node, syncerObj, syncedHeight)
+			if syncedCon != nil {
+				// TODO(mission): run it and make sure it can follow up with
+				// other nodes.
+				runnerCtxCancel()
+				break SyncLoop
+			}
+			if err != nil {
+				errChan <- err
+				break SyncLoop
+			}
+			select {
+			case <-runnerCtx.Done():
+				break SyncLoop
+			case <-time.After(2 * time.Second):
+			}
+		}
+	}()
+	// Wait until all nodes reach 'untilRound'.
+	go func() {
+	ReachFinished:
+		for {
+			time.Sleep(5 * time.Second)
+			for _, n := range nodes {
+				if n.app.GetLatestDeliveredPosition().Round < untilRound {
+					continue ReachFinished
+				}
+			}
+			break
+		}
		runnerCtxCancel()
+	}()
+	// Block until any reasonable testing milestone is reached.
+	select {
+	case err := <-errChan:
+		req.NoError(err)
+	case <-runnerCtx.Done():
+		// This test passed.
+		// TODO(haoping) stop consensus.
+	}
+}
+
 func TestConsensus(t *testing.T) {
 	suite.Run(t, new(ConsensusTestSuite))
 }
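
Note on the handoff exercised by syncBlocksWithSomeNode: SyncBlocks signals catch-up by returning a non-nil *core.Consensus, and TestSync currently only cancels runnerCtx at that point, leaving the TODO(mission) above open. Below is a minimal sketch, not part of this diff, of what the consumer-side handoff could look like. feedNextBatch is a hypothetical block source, and starting the synced instance with Run(&types.Block{}) is an assumption borrowed from how the other test nodes are started; real resume semantics may differ.

    package integration

    import (
    	"github.com/dexon-foundation/dexon-consensus/core"
    	"github.com/dexon-foundation/dexon-consensus/core/syncer"
    	"github.com/dexon-foundation/dexon-consensus/core/types"
    )

    // runAfterSynced drives a syncer.Consensus until it reports catch-up,
    // then starts and returns the full consensus instance it hands back.
    func runAfterSynced(
    	syncerObj *syncer.Consensus,
    	// feedNextBatch is hypothetical: it must yield compaction-chain
    	// blocks in ascending Finalization.Height order.
    	feedNextBatch func() []*types.Block,
    ) (*core.Consensus, error) {
    	for {
    		// Mirrors the test's usage, which always passes true as the
    		// second argument when feeding compaction-chain blocks.
    		syncedCon, err := syncerObj.SyncBlocks(feedNextBatch(), true)
    		if err != nil {
    			return nil, err
    		}
    		if syncedCon != nil {
    			// Caught up: hand off to the full consensus instance.
    			// Assumption: an empty block is a valid argument here,
    			// as in the other nodes' Run() calls in this test.
    			go syncedCon.Run(&types.Block{})
    			return syncedCon, nil
    		}
    	}
    }

Like the 100-block threshold in syncBlocksWithSomeNode, batch sizing bounds the work done per SyncBlocks call; the sketch leaves that choice to the hypothetical feedNextBatch.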