aboutsummaryrefslogtreecommitdiffstats
path: root/core/test/network.go
diff options
context:
space:
mode:
Diffstat (limited to 'core/test/network.go')
-rw-r--r--core/test/network.go67
1 files changed, 48 insertions, 19 deletions
diff --git a/core/test/network.go b/core/test/network.go
index 00c60d9..0d92af0 100644
--- a/core/test/network.go
+++ b/core/test/network.go
@@ -19,9 +19,11 @@ package test
import (
"context"
+ "errors"
"fmt"
"net"
"strconv"
+ "sync"
"github.com/dexon-foundation/dexon-consensus/common"
"github.com/dexon-foundation/dexon-consensus/core/crypto"
@@ -48,16 +50,20 @@ type NetworkConfig struct {
// Network implements core.Network interface based on TransportClient.
type Network struct {
- config NetworkConfig
- ctx context.Context
- ctxCancel context.CancelFunc
- trans TransportClient
- fromTransport <-chan *TransportEnvelope
- toConsensus chan interface{}
- toNode chan interface{}
- sentRandomness map[common.Hash]struct{}
- sentAgreement map[common.Hash]struct{}
- blockCache map[common.Hash]*types.Block
+ config NetworkConfig
+ ctx context.Context
+ ctxCancel context.CancelFunc
+ trans TransportClient
+ fromTransport <-chan *TransportEnvelope
+ toConsensus chan interface{}
+ toNode chan interface{}
+ sentRandomnessLock sync.Mutex
+ sentRandomness map[common.Hash]struct{}
+ sentAgreementLock sync.Mutex
+ sentAgreement map[common.Hash]struct{}
+ blockCacheLock sync.RWMutex
+ blockCache map[common.Hash]*types.Block
+ stateModule *State
}
// NewNetwork setup network stuffs for nodes, which provides an
@@ -91,6 +97,8 @@ func NewNetwork(pubKey crypto.PublicKey, latency LatencyModel,
// PullBlocks implements core.Network interface.
func (n *Network) PullBlocks(hashes common.Hashes) {
go func() {
+ n.blockCacheLock.RLock()
+ defer n.blockCacheLock.RUnlock()
for _, hash := range hashes {
// TODO(jimmy-dexon): request block from network instead of cache.
if block, exist := n.blockCache[hash]; exist {
@@ -124,6 +132,8 @@ func (n *Network) BroadcastBlock(block *types.Block) {
// BroadcastAgreementResult implements core.Network interface.
func (n *Network) BroadcastAgreementResult(
randRequest *types.AgreementResult) {
+ n.sentAgreementLock.Lock()
+ defer n.sentAgreementLock.Unlock()
if _, exist := n.sentAgreement[randRequest.BlockHash]; exist {
return
}
@@ -143,6 +153,8 @@ func (n *Network) BroadcastAgreementResult(
// BroadcastRandomnessResult implements core.Network interface.
func (n *Network) BroadcastRandomnessResult(
randResult *types.BlockRandomnessResult) {
+ n.sentRandomnessLock.Lock()
+ defer n.sentRandomnessLock.Unlock()
if _, exist := n.sentRandomness[randResult.BlockHash]; exist {
return
}
@@ -214,21 +226,33 @@ func (n *Network) Setup(serverEndpoint interface{}) (err error) {
return
}
-func (n *Network) msgHandler(e *TransportEnvelope) {
+func (n *Network) dispatchMsg(e *TransportEnvelope) {
switch v := e.Msg.(type) {
case *types.Block:
- if len(n.blockCache) > 500 {
- // Randomly purge one block from cache.
- for k := range n.blockCache {
- delete(n.blockCache, k)
- break
+ func() {
+ n.blockCacheLock.Lock()
+ defer n.blockCacheLock.Unlock()
+ if len(n.blockCache) > 500 {
+ // Randomly purge one block from cache.
+ for k := range n.blockCache {
+ delete(n.blockCache, k)
+ break
+ }
}
- }
- n.blockCache[v.Hash] = v
+ n.blockCache[v.Hash] = v
+ }()
n.toConsensus <- e.Msg
case *types.Vote, *types.AgreementResult, *types.BlockRandomnessResult,
*typesDKG.PrivateShare, *typesDKG.PartialSignature:
n.toConsensus <- e.Msg
+ case packedStateChanges:
+ if n.stateModule == nil {
+ panic(errors.New(
+ "receive packed state change request without state attached"))
+ }
+ if err := n.stateModule.AddRequestsFromOthers([]byte(v)); err != nil {
+ panic(err)
+ }
default:
n.toNode <- e.Msg
}
@@ -250,7 +274,7 @@ Loop:
if !ok {
break Loop
}
- n.msgHandler(e)
+ n.dispatchMsg(e)
}
}
}
@@ -291,3 +315,8 @@ func (n *Network) Broadcast(msg interface{}) {
func (n *Network) ReceiveChanForNode() <-chan interface{} {
return n.toNode
}
+
+// addStateModule attaches a State instance to this network.
+func (n *Network) addStateModule(s *State) {
+ n.stateModule = s
+}