aboutsummaryrefslogtreecommitdiffstats
path: root/core/test/network.go
diff options
context:
space:
mode:
Diffstat (limited to 'core/test/network.go')
-rw-r--r--core/test/network.go100
1 files changed, 93 insertions, 7 deletions
diff --git a/core/test/network.go b/core/test/network.go
index 6034fa6..c903c57 100644
--- a/core/test/network.go
+++ b/core/test/network.go
@@ -130,13 +130,52 @@ func (req *PullRequest) UnmarshalJSON(data []byte) (err error) {
return
}
+// NetworkCensor is a interface to determine if a message should be censored.
+type NetworkCensor interface {
+ Censor(interface{}) bool
+}
+
+type censorClient struct {
+ TransportClient
+
+ censor NetworkCensor
+ lock sync.RWMutex
+}
+
+func (cc *censorClient) Send(ID types.NodeID, msg interface{}) error {
+ if func() bool {
+ cc.lock.RLock()
+ defer cc.lock.RUnlock()
+ return cc.censor.Censor(msg)
+ }() {
+ return nil
+ }
+ return cc.TransportClient.Send(ID, msg)
+}
+
+func (cc *censorClient) Broadcast(
+ IDs map[types.NodeID]struct{}, latency LatencyModel, msg interface{}) error {
+ if func() bool {
+ cc.lock.RLock()
+ defer cc.lock.RUnlock()
+ return cc.censor.Censor(msg)
+ }() {
+ return nil
+ }
+ return cc.TransportClient.Broadcast(IDs, latency, msg)
+}
+
+type dummyCensor struct{}
+
+func (dc *dummyCensor) Censor(interface{}) bool { return false }
+
// Network implements core.Network interface based on TransportClient.
type Network struct {
ID types.NodeID
config NetworkConfig
ctx context.Context
ctxCancel context.CancelFunc
- trans TransportClient
+ trans *censorClient
dMoment time.Time
fromTransport <-chan *TransportEnvelope
toConsensus chan interface{}
@@ -156,6 +195,8 @@ type Network struct {
cache *utils.NodeSetCache
notarySetCachesLock sync.Mutex
notarySetCaches map[uint64]map[types.NodeID]struct{}
+ censor NetworkCensor
+ censorLock sync.RWMutex
}
// NewNetwork setup network stuffs for nodes, which provides an
@@ -175,22 +216,48 @@ func NewNetwork(pubKey crypto.PublicKey, config NetworkConfig) (
notarySetCaches: make(map[uint64]map[types.NodeID]struct{}),
voteCache: make(
map[types.Position]map[types.VoteHeader]*types.Vote),
+ censor: &dummyCensor{},
}
n.ctx, n.ctxCancel = context.WithCancel(context.Background())
// Construct transport layer.
+ var trans TransportClient
switch config.Type {
case NetworkTypeTCPLocal:
- n.trans = NewTCPTransportClient(pubKey, config.Marshaller, true)
+ trans = NewTCPTransportClient(pubKey, config.Marshaller, true)
case NetworkTypeTCP:
- n.trans = NewTCPTransportClient(pubKey, config.Marshaller, false)
+ trans = NewTCPTransportClient(pubKey, config.Marshaller, false)
case NetworkTypeFake:
- n.trans = NewFakeTransportClient(pubKey)
+ trans = NewFakeTransportClient(pubKey)
default:
panic(fmt.Errorf("unknown network type: %v", config.Type))
}
+ n.trans = &censorClient{
+ TransportClient: trans,
+ censor: &dummyCensor{},
+ }
return
}
+// SetCensor to this network module.
+func (n *Network) SetCensor(censorIn, censorOut NetworkCensor) {
+ if censorIn == nil {
+ censorIn = &dummyCensor{}
+ }
+ if censorOut == nil {
+ censorOut = &dummyCensor{}
+ }
+ func() {
+ n.censorLock.Lock()
+ defer n.censorLock.Unlock()
+ n.censor = censorIn
+ }()
+ func() {
+ n.trans.lock.Lock()
+ defer n.trans.lock.Unlock()
+ n.trans.censor = censorOut
+ }()
+}
+
// PullBlocks implements core.Network interface.
func (n *Network) PullBlocks(hashes common.Hashes) {
go n.pullBlocksAsync(hashes)
@@ -224,6 +291,12 @@ func (n *Network) BroadcastBlock(block *types.Block) {
panic(err)
}
n.addBlockToCache(block)
+ if block.IsFinalized() {
+ n.addBlockFinalizationToCache(
+ block.Hash,
+ block.Finalization.Height,
+ block.Finalization.Randomness)
+ }
}
// BroadcastAgreementResult implements core.Network interface.
@@ -232,9 +305,13 @@ func (n *Network) BroadcastAgreementResult(
if !n.markAgreementResultAsSent(result.BlockHash) {
return
}
- n.addBlockRandomnessToCache(result.BlockHash, result.Randomness)
+ n.addBlockFinalizationToCache(
+ result.BlockHash,
+ result.FinalizationHeight,
+ nil,
+ )
notarySet := n.getNotarySet(result.Position.Round)
- count := len(notarySet) * gossipAgreementResultPercent / 100
+ count := len(notarySet)*gossipAgreementResultPercent/100 + 1
for nID := range notarySet {
if count--; count < 0 {
break
@@ -303,6 +380,13 @@ func (n *Network) Setup(serverEndpoint interface{}) (err error) {
}
func (n *Network) dispatchMsg(e *TransportEnvelope) {
+ if func() bool {
+ n.censorLock.RLock()
+ defer n.censorLock.RUnlock()
+ return n.censor.Censor(e.Msg)
+ }() {
+ return
+ }
msg := n.cloneForFake(e.Msg)
switch v := msg.(type) {
case *types.Block:
@@ -539,13 +623,15 @@ func (n *Network) addBlockToCache(b *types.Block) {
n.blockCache[b.Hash] = b.Clone()
}
-func (n *Network) addBlockRandomnessToCache(hash common.Hash, rand []byte) {
+func (n *Network) addBlockFinalizationToCache(
+ hash common.Hash, height uint64, rand []byte) {
n.blockCacheLock.Lock()
defer n.blockCacheLock.Unlock()
block, exist := n.blockCache[hash]
if !exist {
return
}
+ block.Finalization.Height = height
block.Finalization.Randomness = rand
}