diff options
Diffstat (limited to 'core/test/network.go')
-rw-r--r-- | core/test/network.go | 201 |
1 files changed, 110 insertions, 91 deletions
diff --git a/core/test/network.go b/core/test/network.go index a79898e..066d36c 100644 --- a/core/test/network.go +++ b/core/test/network.go @@ -22,7 +22,6 @@ import ( "encoding/json" "errors" "fmt" - "log" "net" "strconv" "sync" @@ -52,9 +51,12 @@ const ( // NetworkConfig is the configuration for Network module. type NetworkConfig struct { - Type NetworkType - PeerServer string - PeerPort int + Type NetworkType + PeerServer string + PeerPort int + DirectLatency LatencyModel + GossipLatency LatencyModel + Marshaller Marshaller } // PullRequest is a generic request to pull everything (ex. vote, block...). @@ -151,7 +153,6 @@ type Network struct { unreceivedBlocks map[common.Hash]chan<- common.Hash unreceivedRandomnessLock sync.RWMutex unreceivedRandomness map[common.Hash]chan<- common.Hash - latencyModel LatencyModel cache *utils.NodeSetCache notarySetCachesLock sync.Mutex notarySetCaches map[uint64]map[uint32]map[types.NodeID]struct{} @@ -161,8 +162,8 @@ type Network struct { // NewNetwork setup network stuffs for nodes, which provides an // implementation of core.Network based on TransportClient. -func NewNetwork(pubKey crypto.PublicKey, latency LatencyModel, - marshaller Marshaller, config NetworkConfig) (n *Network) { +func NewNetwork(pubKey crypto.PublicKey, config NetworkConfig) ( + n *Network) { // Construct basic network instance. n = &Network{ ID: types.NewNodeID(pubKey), @@ -175,9 +176,10 @@ func NewNetwork(pubKey crypto.PublicKey, latency LatencyModel, randomnessCache: make(map[common.Hash]*types.BlockRandomnessResult), unreceivedBlocks: make(map[common.Hash]chan<- common.Hash), unreceivedRandomness: make(map[common.Hash]chan<- common.Hash), - latencyModel: latency, - notarySetCaches: make(map[uint64]map[uint32]map[types.NodeID]struct{}), - dkgSetCaches: make(map[uint64]map[types.NodeID]struct{}), + peers: make(map[types.NodeID]struct{}), + notarySetCaches: make( + map[uint64]map[uint32]map[types.NodeID]struct{}), + dkgSetCaches: make(map[uint64]map[types.NodeID]struct{}), voteCache: make( map[types.Position]map[types.VoteHeader]*types.Vote), } @@ -185,11 +187,11 @@ func NewNetwork(pubKey crypto.PublicKey, latency LatencyModel, // Construct transport layer. switch config.Type { case NetworkTypeTCPLocal: - n.trans = NewTCPTransportClient(pubKey, latency, marshaller, true) + n.trans = NewTCPTransportClient(pubKey, config.Marshaller, true) case NetworkTypeTCP: - n.trans = NewTCPTransportClient(pubKey, latency, marshaller, false) + n.trans = NewTCPTransportClient(pubKey, config.Marshaller, false) case NetworkTypeFake: - n.trans = NewFakeTransportClient(pubKey, latency) + n.trans = NewFakeTransportClient(pubKey) default: panic(fmt.Errorf("unknown network type: %v", config.Type)) } @@ -213,8 +215,11 @@ func (n *Network) PullRandomness(hashes common.Hashes) { // BroadcastVote implements core.Network interface. func (n *Network) BroadcastVote(vote *types.Vote) { - n.broadcastToSet( - n.getNotarySet(vote.Position.Round, vote.Position.ChainID), vote) + if err := n.trans.Broadcast( + n.getNotarySet(vote.Position.Round, vote.Position.ChainID), + n.config.DirectLatency, vote); err != nil { + panic(err) + } n.addVoteToCache(vote) } @@ -222,28 +227,33 @@ func (n *Network) BroadcastVote(vote *types.Vote) { func (n *Network) BroadcastBlock(block *types.Block) { // Avoid data race in fake transport. block = n.cloneForFake(block).(*types.Block) - n.broadcastToSet( - n.getNotarySet(block.Position.Round, block.Position.ChainID), block) + notarySet := n.getNotarySet(block.Position.Round, block.Position.ChainID) + if err := n.trans.Broadcast( + notarySet, n.config.DirectLatency, block); err != nil { + panic(err) + } + if err := n.trans.Broadcast(getComplementSet(n.peers, notarySet), + n.config.GossipLatency, block); err != nil { + panic(err) + } n.addBlockToCache(block) } // BroadcastAgreementResult implements core.Network interface. func (n *Network) BroadcastAgreementResult( - randRequest *types.AgreementResult) { - n.sentAgreementLock.Lock() - defer n.sentAgreementLock.Unlock() - if _, exist := n.sentAgreement[randRequest.BlockHash]; exist { + result *types.AgreementResult) { + if !n.markAgreementResultAsSent(result.BlockHash) { return } - if len(n.sentAgreement) > 1000 { - // Randomly drop one entry. - for k := range n.sentAgreement { - delete(n.sentAgreement, k) - break - } + // Send to DKG set first. + dkgSet := n.getDKGSet(result.Position.Round) + if err := n.trans.Broadcast( + dkgSet, n.config.DirectLatency, result); err != nil { + panic(err) } - n.sentAgreement[randRequest.BlockHash] = struct{}{} - if err := n.trans.Broadcast(randRequest); err != nil { + // Gossip to other nodes. + if err := n.trans.Broadcast(getComplementSet(n.peers, dkgSet), + n.config.GossipLatency, result); err != nil { panic(err) } } @@ -251,20 +261,19 @@ func (n *Network) BroadcastAgreementResult( // BroadcastRandomnessResult implements core.Network interface. func (n *Network) BroadcastRandomnessResult( randResult *types.BlockRandomnessResult) { - n.sentRandomnessLock.Lock() - defer n.sentRandomnessLock.Unlock() - if _, exist := n.sentRandomness[randResult.BlockHash]; exist { + if !n.markRandomnessResultAsSent(randResult.BlockHash) { return } - if len(n.sentRandomness) > 1000 { - // Randomly drop one entry. - for k := range n.sentRandomness { - delete(n.sentRandomness, k) - break - } + // Send to notary set first. + notarySet := n.getNotarySet( + randResult.Position.Round, randResult.Position.ChainID) + if err := n.trans.Broadcast( + notarySet, n.config.DirectLatency, randResult); err != nil { + panic(err) } - n.sentRandomness[randResult.BlockHash] = struct{}{} - if err := n.trans.Broadcast(randResult); err != nil { + // Gossip to other nodes. + if err := n.trans.Broadcast(getComplementSet(n.peers, notarySet), + n.config.GossipLatency, randResult); err != nil { panic(err) } n.addRandomnessToCache(randResult) @@ -273,21 +282,25 @@ func (n *Network) BroadcastRandomnessResult( // SendDKGPrivateShare implements core.Network interface. func (n *Network) SendDKGPrivateShare( recv crypto.PublicKey, prvShare *typesDKG.PrivateShare) { - if err := n.trans.Send(types.NewNodeID(recv), prvShare); err != nil { - panic(err) - } + n.send(types.NewNodeID(recv), prvShare) } // BroadcastDKGPrivateShare implements core.Network interface. func (n *Network) BroadcastDKGPrivateShare( prvShare *typesDKG.PrivateShare) { - n.broadcastToSet(n.getDKGSet(prvShare.Round), prvShare) + if err := n.trans.Broadcast(n.getDKGSet(prvShare.Round), + n.config.DirectLatency, prvShare); err != nil { + panic(err) + } } // BroadcastDKGPartialSignature implements core.Network interface. func (n *Network) BroadcastDKGPartialSignature( psig *typesDKG.PartialSignature) { - n.broadcastToSet(n.getDKGSet(psig.Round), psig) + if err := n.trans.Broadcast( + n.getDKGSet(psig.Round), n.config.DirectLatency, psig); err != nil { + panic(err) + } } // ReceiveChan implements core.Network interface. @@ -312,7 +325,6 @@ func (n *Network) Setup(serverEndpoint interface{}) (err error) { return } peerKeys := n.trans.Peers() - n.peers = make(map[types.NodeID]struct{}) for _, k := range peerKeys { n.peers[types.NewNodeID(k)] = struct{}{} } @@ -374,9 +386,7 @@ func (n *Network) handlePullRequest(req *PullRequest) { break All default: } - if err := n.trans.Send(req.Requester, b); err != nil { - log.Println("unable to send block", req.Requester, err) - } + n.send(req.Requester, b) } }() case "vote": @@ -386,9 +396,7 @@ func (n *Network) handlePullRequest(req *PullRequest) { defer n.voteCacheLock.Unlock() if votes, exists := n.voteCache[pos]; exists { for _, v := range votes { - if err := n.trans.Send(req.Requester, v); err != nil { - log.Println("unable to send vote", req.Requester, err) - } + n.send(req.Requester, v) } } }() @@ -408,9 +416,7 @@ func (n *Network) handlePullRequest(req *PullRequest) { break All default: } - if err := n.trans.Send(req.Requester, r); err != nil { - log.Println("unable to send randomness", req.Requester, err) - } + n.send(req.Requester, r) } }() default: @@ -457,24 +463,16 @@ func (n *Network) Report(msg interface{}) error { return n.trans.Report(msg) } +// Broadcast a message to all peers. +func (n *Network) Broadcast(msg interface{}) error { + return n.trans.Broadcast(n.peers, &FixedLatencyModel{}, msg) +} + // Peers exports 'Peers' method of Transport. func (n *Network) Peers() []crypto.PublicKey { return n.trans.Peers() } -// Broadcast exports 'Broadcast' method of Transport, and would panic when -// error. -func (n *Network) Broadcast(msg interface{}) { - if err := n.trans.Broadcast(msg); err != nil { - panic(err) - } -} - -// Send exports 'Send' method of Transport. -func (n *Network) Send(nodeID types.NodeID, msg interface{}) error { - return n.trans.Send(nodeID, msg) -} - // ReceiveChanForNode returns a channel for messages not handled by // core.Consensus. func (n *Network) ReceiveChanForNode() <-chan interface{} { @@ -521,14 +519,11 @@ Loop: if nID == n.ID { continue } - if err := n.trans.Send(nID, req); err != nil { - // Try next peer. - continue - } + n.send(nID, req) select { case <-n.ctx.Done(): break Loop - case <-time.After(2 * n.latencyModel.Delay()): + case <-time.After(2 * n.config.DirectLatency.Delay()): // Consume everything in the notification channel. for { select { @@ -561,10 +556,7 @@ func (n *Network) pullVotesAsync(pos types.Position) { // Randomly select one peer from notary set and send a pull request. sentCount := 0 for nID := range notarySet { - if err := n.trans.Send(nID, req); err != nil { - // Try next peer. - continue - } + n.send(nID, req) sentCount++ if sentCount >= maxPullingPeerCount { break @@ -598,14 +590,11 @@ Loop: if nID == n.ID { continue } - if err := n.trans.Send(nID, req); err != nil { - // Try next peer. - continue - } + n.send(nID, req) select { case <-n.ctx.Done(): break Loop - case <-time.After(2 * n.latencyModel.Delay()): + case <-time.After(2 * n.config.DirectLatency.Delay()): // Consume everything in the notification channel. for { select { @@ -673,6 +662,40 @@ func (n *Network) addRandomnessToCache(rand *types.BlockRandomnessResult) { n.randomnessCache[rand.BlockHash] = rand } +func (n *Network) markAgreementResultAsSent(blockHash common.Hash) bool { + n.sentAgreementLock.Lock() + defer n.sentAgreementLock.Unlock() + if _, exist := n.sentAgreement[blockHash]; exist { + return false + } + if len(n.sentAgreement) > 1000 { + // Randomly drop one entry. + for k := range n.sentAgreement { + delete(n.sentAgreement, k) + break + } + } + n.sentAgreement[blockHash] = struct{}{} + return true +} + +func (n *Network) markRandomnessResultAsSent(blockHash common.Hash) bool { + n.sentRandomnessLock.Lock() + defer n.sentRandomnessLock.Unlock() + if _, exist := n.sentRandomness[blockHash]; exist { + return false + } + if len(n.sentRandomness) > 1000 { + // Randomly drop one entry. + for k := range n.sentRandomness { + delete(n.sentRandomness, k) + break + } + } + n.sentRandomness[blockHash] = struct{}{} + return true +} + func (n *Network) cloneForFake(v interface{}) interface{} { if n.config.Type != NetworkTypeFake { return v @@ -735,15 +758,11 @@ func (n *Network) getDKGSet(round uint64) map[types.NodeID]struct{} { return set } -// broadcastToSet broadcast a message to a set of nodes. -func (n *Network) broadcastToSet( - set map[types.NodeID]struct{}, msg interface{}) { - for nID := range set { - if nID == n.ID { - continue - } - if err := n.trans.Send(nID, msg); err != nil { +func (n *Network) send(endpoint types.NodeID, msg interface{}) { + go func() { + time.Sleep(n.config.DirectLatency.Delay()) + if err := n.trans.Send(endpoint, msg); err != nil { panic(err) } - } + }() } |