aboutsummaryrefslogtreecommitdiffstats
path: root/core/test/network.go
diff options
context:
space:
mode:
Diffstat (limited to 'core/test/network.go')
-rw-r--r--core/test/network.go201
1 files changed, 110 insertions, 91 deletions
diff --git a/core/test/network.go b/core/test/network.go
index a79898e..066d36c 100644
--- a/core/test/network.go
+++ b/core/test/network.go
@@ -22,7 +22,6 @@ import (
"encoding/json"
"errors"
"fmt"
- "log"
"net"
"strconv"
"sync"
@@ -52,9 +51,12 @@ const (
// NetworkConfig is the configuration for Network module.
type NetworkConfig struct {
- Type NetworkType
- PeerServer string
- PeerPort int
+ Type NetworkType
+ PeerServer string
+ PeerPort int
+ DirectLatency LatencyModel
+ GossipLatency LatencyModel
+ Marshaller Marshaller
}
// PullRequest is a generic request to pull everything (ex. vote, block...).
@@ -151,7 +153,6 @@ type Network struct {
unreceivedBlocks map[common.Hash]chan<- common.Hash
unreceivedRandomnessLock sync.RWMutex
unreceivedRandomness map[common.Hash]chan<- common.Hash
- latencyModel LatencyModel
cache *utils.NodeSetCache
notarySetCachesLock sync.Mutex
notarySetCaches map[uint64]map[uint32]map[types.NodeID]struct{}
@@ -161,8 +162,8 @@ type Network struct {
// NewNetwork setup network stuffs for nodes, which provides an
// implementation of core.Network based on TransportClient.
-func NewNetwork(pubKey crypto.PublicKey, latency LatencyModel,
- marshaller Marshaller, config NetworkConfig) (n *Network) {
+func NewNetwork(pubKey crypto.PublicKey, config NetworkConfig) (
+ n *Network) {
// Construct basic network instance.
n = &Network{
ID: types.NewNodeID(pubKey),
@@ -175,9 +176,10 @@ func NewNetwork(pubKey crypto.PublicKey, latency LatencyModel,
randomnessCache: make(map[common.Hash]*types.BlockRandomnessResult),
unreceivedBlocks: make(map[common.Hash]chan<- common.Hash),
unreceivedRandomness: make(map[common.Hash]chan<- common.Hash),
- latencyModel: latency,
- notarySetCaches: make(map[uint64]map[uint32]map[types.NodeID]struct{}),
- dkgSetCaches: make(map[uint64]map[types.NodeID]struct{}),
+ peers: make(map[types.NodeID]struct{}),
+ notarySetCaches: make(
+ map[uint64]map[uint32]map[types.NodeID]struct{}),
+ dkgSetCaches: make(map[uint64]map[types.NodeID]struct{}),
voteCache: make(
map[types.Position]map[types.VoteHeader]*types.Vote),
}
@@ -185,11 +187,11 @@ func NewNetwork(pubKey crypto.PublicKey, latency LatencyModel,
// Construct transport layer.
switch config.Type {
case NetworkTypeTCPLocal:
- n.trans = NewTCPTransportClient(pubKey, latency, marshaller, true)
+ n.trans = NewTCPTransportClient(pubKey, config.Marshaller, true)
case NetworkTypeTCP:
- n.trans = NewTCPTransportClient(pubKey, latency, marshaller, false)
+ n.trans = NewTCPTransportClient(pubKey, config.Marshaller, false)
case NetworkTypeFake:
- n.trans = NewFakeTransportClient(pubKey, latency)
+ n.trans = NewFakeTransportClient(pubKey)
default:
panic(fmt.Errorf("unknown network type: %v", config.Type))
}
@@ -213,8 +215,11 @@ func (n *Network) PullRandomness(hashes common.Hashes) {
// BroadcastVote implements core.Network interface.
func (n *Network) BroadcastVote(vote *types.Vote) {
- n.broadcastToSet(
- n.getNotarySet(vote.Position.Round, vote.Position.ChainID), vote)
+ if err := n.trans.Broadcast(
+ n.getNotarySet(vote.Position.Round, vote.Position.ChainID),
+ n.config.DirectLatency, vote); err != nil {
+ panic(err)
+ }
n.addVoteToCache(vote)
}
@@ -222,28 +227,33 @@ func (n *Network) BroadcastVote(vote *types.Vote) {
func (n *Network) BroadcastBlock(block *types.Block) {
// Avoid data race in fake transport.
block = n.cloneForFake(block).(*types.Block)
- n.broadcastToSet(
- n.getNotarySet(block.Position.Round, block.Position.ChainID), block)
+ notarySet := n.getNotarySet(block.Position.Round, block.Position.ChainID)
+ if err := n.trans.Broadcast(
+ notarySet, n.config.DirectLatency, block); err != nil {
+ panic(err)
+ }
+ if err := n.trans.Broadcast(getComplementSet(n.peers, notarySet),
+ n.config.GossipLatency, block); err != nil {
+ panic(err)
+ }
n.addBlockToCache(block)
}
// BroadcastAgreementResult implements core.Network interface.
func (n *Network) BroadcastAgreementResult(
- randRequest *types.AgreementResult) {
- n.sentAgreementLock.Lock()
- defer n.sentAgreementLock.Unlock()
- if _, exist := n.sentAgreement[randRequest.BlockHash]; exist {
+ result *types.AgreementResult) {
+ if !n.markAgreementResultAsSent(result.BlockHash) {
return
}
- if len(n.sentAgreement) > 1000 {
- // Randomly drop one entry.
- for k := range n.sentAgreement {
- delete(n.sentAgreement, k)
- break
- }
+ // Send to DKG set first.
+ dkgSet := n.getDKGSet(result.Position.Round)
+ if err := n.trans.Broadcast(
+ dkgSet, n.config.DirectLatency, result); err != nil {
+ panic(err)
}
- n.sentAgreement[randRequest.BlockHash] = struct{}{}
- if err := n.trans.Broadcast(randRequest); err != nil {
+ // Gossip to other nodes.
+ if err := n.trans.Broadcast(getComplementSet(n.peers, dkgSet),
+ n.config.GossipLatency, result); err != nil {
panic(err)
}
}
@@ -251,20 +261,19 @@ func (n *Network) BroadcastAgreementResult(
// BroadcastRandomnessResult implements core.Network interface.
func (n *Network) BroadcastRandomnessResult(
randResult *types.BlockRandomnessResult) {
- n.sentRandomnessLock.Lock()
- defer n.sentRandomnessLock.Unlock()
- if _, exist := n.sentRandomness[randResult.BlockHash]; exist {
+ if !n.markRandomnessResultAsSent(randResult.BlockHash) {
return
}
- if len(n.sentRandomness) > 1000 {
- // Randomly drop one entry.
- for k := range n.sentRandomness {
- delete(n.sentRandomness, k)
- break
- }
+ // Send to notary set first.
+ notarySet := n.getNotarySet(
+ randResult.Position.Round, randResult.Position.ChainID)
+ if err := n.trans.Broadcast(
+ notarySet, n.config.DirectLatency, randResult); err != nil {
+ panic(err)
}
- n.sentRandomness[randResult.BlockHash] = struct{}{}
- if err := n.trans.Broadcast(randResult); err != nil {
+ // Gossip to other nodes.
+ if err := n.trans.Broadcast(getComplementSet(n.peers, notarySet),
+ n.config.GossipLatency, randResult); err != nil {
panic(err)
}
n.addRandomnessToCache(randResult)
@@ -273,21 +282,25 @@ func (n *Network) BroadcastRandomnessResult(
// SendDKGPrivateShare implements core.Network interface.
func (n *Network) SendDKGPrivateShare(
recv crypto.PublicKey, prvShare *typesDKG.PrivateShare) {
- if err := n.trans.Send(types.NewNodeID(recv), prvShare); err != nil {
- panic(err)
- }
+ n.send(types.NewNodeID(recv), prvShare)
}
// BroadcastDKGPrivateShare implements core.Network interface.
func (n *Network) BroadcastDKGPrivateShare(
prvShare *typesDKG.PrivateShare) {
- n.broadcastToSet(n.getDKGSet(prvShare.Round), prvShare)
+ if err := n.trans.Broadcast(n.getDKGSet(prvShare.Round),
+ n.config.DirectLatency, prvShare); err != nil {
+ panic(err)
+ }
}
// BroadcastDKGPartialSignature implements core.Network interface.
func (n *Network) BroadcastDKGPartialSignature(
psig *typesDKG.PartialSignature) {
- n.broadcastToSet(n.getDKGSet(psig.Round), psig)
+ if err := n.trans.Broadcast(
+ n.getDKGSet(psig.Round), n.config.DirectLatency, psig); err != nil {
+ panic(err)
+ }
}
// ReceiveChan implements core.Network interface.
@@ -312,7 +325,6 @@ func (n *Network) Setup(serverEndpoint interface{}) (err error) {
return
}
peerKeys := n.trans.Peers()
- n.peers = make(map[types.NodeID]struct{})
for _, k := range peerKeys {
n.peers[types.NewNodeID(k)] = struct{}{}
}
@@ -374,9 +386,7 @@ func (n *Network) handlePullRequest(req *PullRequest) {
break All
default:
}
- if err := n.trans.Send(req.Requester, b); err != nil {
- log.Println("unable to send block", req.Requester, err)
- }
+ n.send(req.Requester, b)
}
}()
case "vote":
@@ -386,9 +396,7 @@ func (n *Network) handlePullRequest(req *PullRequest) {
defer n.voteCacheLock.Unlock()
if votes, exists := n.voteCache[pos]; exists {
for _, v := range votes {
- if err := n.trans.Send(req.Requester, v); err != nil {
- log.Println("unable to send vote", req.Requester, err)
- }
+ n.send(req.Requester, v)
}
}
}()
@@ -408,9 +416,7 @@ func (n *Network) handlePullRequest(req *PullRequest) {
break All
default:
}
- if err := n.trans.Send(req.Requester, r); err != nil {
- log.Println("unable to send randomness", req.Requester, err)
- }
+ n.send(req.Requester, r)
}
}()
default:
@@ -457,24 +463,16 @@ func (n *Network) Report(msg interface{}) error {
return n.trans.Report(msg)
}
+// Broadcast a message to all peers.
+func (n *Network) Broadcast(msg interface{}) error {
+ return n.trans.Broadcast(n.peers, &FixedLatencyModel{}, msg)
+}
+
// Peers exports 'Peers' method of Transport.
func (n *Network) Peers() []crypto.PublicKey {
return n.trans.Peers()
}
-// Broadcast exports 'Broadcast' method of Transport, and would panic when
-// error.
-func (n *Network) Broadcast(msg interface{}) {
- if err := n.trans.Broadcast(msg); err != nil {
- panic(err)
- }
-}
-
-// Send exports 'Send' method of Transport.
-func (n *Network) Send(nodeID types.NodeID, msg interface{}) error {
- return n.trans.Send(nodeID, msg)
-}
-
// ReceiveChanForNode returns a channel for messages not handled by
// core.Consensus.
func (n *Network) ReceiveChanForNode() <-chan interface{} {
@@ -521,14 +519,11 @@ Loop:
if nID == n.ID {
continue
}
- if err := n.trans.Send(nID, req); err != nil {
- // Try next peer.
- continue
- }
+ n.send(nID, req)
select {
case <-n.ctx.Done():
break Loop
- case <-time.After(2 * n.latencyModel.Delay()):
+ case <-time.After(2 * n.config.DirectLatency.Delay()):
// Consume everything in the notification channel.
for {
select {
@@ -561,10 +556,7 @@ func (n *Network) pullVotesAsync(pos types.Position) {
// Randomly select one peer from notary set and send a pull request.
sentCount := 0
for nID := range notarySet {
- if err := n.trans.Send(nID, req); err != nil {
- // Try next peer.
- continue
- }
+ n.send(nID, req)
sentCount++
if sentCount >= maxPullingPeerCount {
break
@@ -598,14 +590,11 @@ Loop:
if nID == n.ID {
continue
}
- if err := n.trans.Send(nID, req); err != nil {
- // Try next peer.
- continue
- }
+ n.send(nID, req)
select {
case <-n.ctx.Done():
break Loop
- case <-time.After(2 * n.latencyModel.Delay()):
+ case <-time.After(2 * n.config.DirectLatency.Delay()):
// Consume everything in the notification channel.
for {
select {
@@ -673,6 +662,40 @@ func (n *Network) addRandomnessToCache(rand *types.BlockRandomnessResult) {
n.randomnessCache[rand.BlockHash] = rand
}
+func (n *Network) markAgreementResultAsSent(blockHash common.Hash) bool {
+ n.sentAgreementLock.Lock()
+ defer n.sentAgreementLock.Unlock()
+ if _, exist := n.sentAgreement[blockHash]; exist {
+ return false
+ }
+ if len(n.sentAgreement) > 1000 {
+ // Randomly drop one entry.
+ for k := range n.sentAgreement {
+ delete(n.sentAgreement, k)
+ break
+ }
+ }
+ n.sentAgreement[blockHash] = struct{}{}
+ return true
+}
+
+func (n *Network) markRandomnessResultAsSent(blockHash common.Hash) bool {
+ n.sentRandomnessLock.Lock()
+ defer n.sentRandomnessLock.Unlock()
+ if _, exist := n.sentRandomness[blockHash]; exist {
+ return false
+ }
+ if len(n.sentRandomness) > 1000 {
+ // Randomly drop one entry.
+ for k := range n.sentRandomness {
+ delete(n.sentRandomness, k)
+ break
+ }
+ }
+ n.sentRandomness[blockHash] = struct{}{}
+ return true
+}
+
func (n *Network) cloneForFake(v interface{}) interface{} {
if n.config.Type != NetworkTypeFake {
return v
@@ -735,15 +758,11 @@ func (n *Network) getDKGSet(round uint64) map[types.NodeID]struct{} {
return set
}
-// broadcastToSet broadcast a message to a set of nodes.
-func (n *Network) broadcastToSet(
- set map[types.NodeID]struct{}, msg interface{}) {
- for nID := range set {
- if nID == n.ID {
- continue
- }
- if err := n.trans.Send(nID, msg); err != nil {
+func (n *Network) send(endpoint types.NodeID, msg interface{}) {
+ go func() {
+ time.Sleep(n.config.DirectLatency.Delay())
+ if err := n.trans.Send(endpoint, msg); err != nil {
panic(err)
}
- }
+ }()
}