diff options
Diffstat (limited to 'core/test/network.go')
-rw-r--r-- | core/test/network.go | 67 |
1 files changed, 48 insertions, 19 deletions
diff --git a/core/test/network.go b/core/test/network.go index 00c60d9..0d92af0 100644 --- a/core/test/network.go +++ b/core/test/network.go @@ -19,9 +19,11 @@ package test import ( "context" + "errors" "fmt" "net" "strconv" + "sync" "github.com/dexon-foundation/dexon-consensus/common" "github.com/dexon-foundation/dexon-consensus/core/crypto" @@ -48,16 +50,20 @@ type NetworkConfig struct { // Network implements core.Network interface based on TransportClient. type Network struct { - config NetworkConfig - ctx context.Context - ctxCancel context.CancelFunc - trans TransportClient - fromTransport <-chan *TransportEnvelope - toConsensus chan interface{} - toNode chan interface{} - sentRandomness map[common.Hash]struct{} - sentAgreement map[common.Hash]struct{} - blockCache map[common.Hash]*types.Block + config NetworkConfig + ctx context.Context + ctxCancel context.CancelFunc + trans TransportClient + fromTransport <-chan *TransportEnvelope + toConsensus chan interface{} + toNode chan interface{} + sentRandomnessLock sync.Mutex + sentRandomness map[common.Hash]struct{} + sentAgreementLock sync.Mutex + sentAgreement map[common.Hash]struct{} + blockCacheLock sync.RWMutex + blockCache map[common.Hash]*types.Block + stateModule *State } // NewNetwork setup network stuffs for nodes, which provides an @@ -91,6 +97,8 @@ func NewNetwork(pubKey crypto.PublicKey, latency LatencyModel, // PullBlocks implements core.Network interface. func (n *Network) PullBlocks(hashes common.Hashes) { go func() { + n.blockCacheLock.RLock() + defer n.blockCacheLock.RUnlock() for _, hash := range hashes { // TODO(jimmy-dexon): request block from network instead of cache. if block, exist := n.blockCache[hash]; exist { @@ -124,6 +132,8 @@ func (n *Network) BroadcastBlock(block *types.Block) { // BroadcastAgreementResult implements core.Network interface. func (n *Network) BroadcastAgreementResult( randRequest *types.AgreementResult) { + n.sentAgreementLock.Lock() + defer n.sentAgreementLock.Unlock() if _, exist := n.sentAgreement[randRequest.BlockHash]; exist { return } @@ -143,6 +153,8 @@ func (n *Network) BroadcastAgreementResult( // BroadcastRandomnessResult implements core.Network interface. func (n *Network) BroadcastRandomnessResult( randResult *types.BlockRandomnessResult) { + n.sentRandomnessLock.Lock() + defer n.sentRandomnessLock.Unlock() if _, exist := n.sentRandomness[randResult.BlockHash]; exist { return } @@ -214,21 +226,33 @@ func (n *Network) Setup(serverEndpoint interface{}) (err error) { return } -func (n *Network) msgHandler(e *TransportEnvelope) { +func (n *Network) dispatchMsg(e *TransportEnvelope) { switch v := e.Msg.(type) { case *types.Block: - if len(n.blockCache) > 500 { - // Randomly purge one block from cache. - for k := range n.blockCache { - delete(n.blockCache, k) - break + func() { + n.blockCacheLock.Lock() + defer n.blockCacheLock.Unlock() + if len(n.blockCache) > 500 { + // Randomly purge one block from cache. + for k := range n.blockCache { + delete(n.blockCache, k) + break + } } - } - n.blockCache[v.Hash] = v + n.blockCache[v.Hash] = v + }() n.toConsensus <- e.Msg case *types.Vote, *types.AgreementResult, *types.BlockRandomnessResult, *typesDKG.PrivateShare, *typesDKG.PartialSignature: n.toConsensus <- e.Msg + case packedStateChanges: + if n.stateModule == nil { + panic(errors.New( + "receive packed state change request without state attached")) + } + if err := n.stateModule.AddRequestsFromOthers([]byte(v)); err != nil { + panic(err) + } default: n.toNode <- e.Msg } @@ -250,7 +274,7 @@ Loop: if !ok { break Loop } - n.msgHandler(e) + n.dispatchMsg(e) } } } @@ -291,3 +315,8 @@ func (n *Network) Broadcast(msg interface{}) { func (n *Network) ReceiveChanForNode() <-chan interface{} { return n.toNode } + +// addStateModule attaches a State instance to this network. +func (n *Network) addStateModule(s *State) { + n.stateModule = s +} |