diff options
Diffstat (limited to 'core/test/network.go')
-rw-r--r-- | core/test/network.go | 100 |
1 files changed, 93 insertions, 7 deletions
diff --git a/core/test/network.go b/core/test/network.go index 6034fa6..c903c57 100644 --- a/core/test/network.go +++ b/core/test/network.go @@ -130,13 +130,52 @@ func (req *PullRequest) UnmarshalJSON(data []byte) (err error) { return } +// NetworkCensor is a interface to determine if a message should be censored. +type NetworkCensor interface { + Censor(interface{}) bool +} + +type censorClient struct { + TransportClient + + censor NetworkCensor + lock sync.RWMutex +} + +func (cc *censorClient) Send(ID types.NodeID, msg interface{}) error { + if func() bool { + cc.lock.RLock() + defer cc.lock.RUnlock() + return cc.censor.Censor(msg) + }() { + return nil + } + return cc.TransportClient.Send(ID, msg) +} + +func (cc *censorClient) Broadcast( + IDs map[types.NodeID]struct{}, latency LatencyModel, msg interface{}) error { + if func() bool { + cc.lock.RLock() + defer cc.lock.RUnlock() + return cc.censor.Censor(msg) + }() { + return nil + } + return cc.TransportClient.Broadcast(IDs, latency, msg) +} + +type dummyCensor struct{} + +func (dc *dummyCensor) Censor(interface{}) bool { return false } + // Network implements core.Network interface based on TransportClient. type Network struct { ID types.NodeID config NetworkConfig ctx context.Context ctxCancel context.CancelFunc - trans TransportClient + trans *censorClient dMoment time.Time fromTransport <-chan *TransportEnvelope toConsensus chan interface{} @@ -156,6 +195,8 @@ type Network struct { cache *utils.NodeSetCache notarySetCachesLock sync.Mutex notarySetCaches map[uint64]map[types.NodeID]struct{} + censor NetworkCensor + censorLock sync.RWMutex } // NewNetwork setup network stuffs for nodes, which provides an @@ -175,22 +216,48 @@ func NewNetwork(pubKey crypto.PublicKey, config NetworkConfig) ( notarySetCaches: make(map[uint64]map[types.NodeID]struct{}), voteCache: make( map[types.Position]map[types.VoteHeader]*types.Vote), + censor: &dummyCensor{}, } n.ctx, n.ctxCancel = context.WithCancel(context.Background()) // Construct transport layer. + var trans TransportClient switch config.Type { case NetworkTypeTCPLocal: - n.trans = NewTCPTransportClient(pubKey, config.Marshaller, true) + trans = NewTCPTransportClient(pubKey, config.Marshaller, true) case NetworkTypeTCP: - n.trans = NewTCPTransportClient(pubKey, config.Marshaller, false) + trans = NewTCPTransportClient(pubKey, config.Marshaller, false) case NetworkTypeFake: - n.trans = NewFakeTransportClient(pubKey) + trans = NewFakeTransportClient(pubKey) default: panic(fmt.Errorf("unknown network type: %v", config.Type)) } + n.trans = &censorClient{ + TransportClient: trans, + censor: &dummyCensor{}, + } return } +// SetCensor to this network module. +func (n *Network) SetCensor(censorIn, censorOut NetworkCensor) { + if censorIn == nil { + censorIn = &dummyCensor{} + } + if censorOut == nil { + censorOut = &dummyCensor{} + } + func() { + n.censorLock.Lock() + defer n.censorLock.Unlock() + n.censor = censorIn + }() + func() { + n.trans.lock.Lock() + defer n.trans.lock.Unlock() + n.trans.censor = censorOut + }() +} + // PullBlocks implements core.Network interface. func (n *Network) PullBlocks(hashes common.Hashes) { go n.pullBlocksAsync(hashes) @@ -224,6 +291,12 @@ func (n *Network) BroadcastBlock(block *types.Block) { panic(err) } n.addBlockToCache(block) + if block.IsFinalized() { + n.addBlockFinalizationToCache( + block.Hash, + block.Finalization.Height, + block.Finalization.Randomness) + } } // BroadcastAgreementResult implements core.Network interface. @@ -232,9 +305,13 @@ func (n *Network) BroadcastAgreementResult( if !n.markAgreementResultAsSent(result.BlockHash) { return } - n.addBlockRandomnessToCache(result.BlockHash, result.Randomness) + n.addBlockFinalizationToCache( + result.BlockHash, + result.FinalizationHeight, + nil, + ) notarySet := n.getNotarySet(result.Position.Round) - count := len(notarySet) * gossipAgreementResultPercent / 100 + count := len(notarySet)*gossipAgreementResultPercent/100 + 1 for nID := range notarySet { if count--; count < 0 { break @@ -303,6 +380,13 @@ func (n *Network) Setup(serverEndpoint interface{}) (err error) { } func (n *Network) dispatchMsg(e *TransportEnvelope) { + if func() bool { + n.censorLock.RLock() + defer n.censorLock.RUnlock() + return n.censor.Censor(e.Msg) + }() { + return + } msg := n.cloneForFake(e.Msg) switch v := msg.(type) { case *types.Block: @@ -539,13 +623,15 @@ func (n *Network) addBlockToCache(b *types.Block) { n.blockCache[b.Hash] = b.Clone() } -func (n *Network) addBlockRandomnessToCache(hash common.Hash, rand []byte) { +func (n *Network) addBlockFinalizationToCache( + hash common.Hash, height uint64, rand []byte) { n.blockCacheLock.Lock() defer n.blockCacheLock.Unlock() block, exist := n.blockCache[hash] if !exist { return } + block.Finalization.Height = height block.Finalization.Randomness = rand } |