aboutsummaryrefslogtreecommitdiffstats
path: root/core/test/network.go
diff options
context:
space:
mode:
Diffstat (limited to 'core/test/network.go')
-rw-r--r--core/test/network.go33
1 files changed, 27 insertions, 6 deletions
diff --git a/core/test/network.go b/core/test/network.go
index f32c27f..c0ec255 100644
--- a/core/test/network.go
+++ b/core/test/network.go
@@ -179,8 +179,9 @@ type Network struct {
trans *censorClient
dMoment time.Time
fromTransport <-chan *TransportEnvelope
- toConsensus chan interface{}
+ toConsensus chan types.Msg
toNode chan interface{}
+ badPeerChan chan interface{}
sentAgreementLock sync.Mutex
sentAgreement map[common.Hash]struct{}
blockCacheLock sync.RWMutex
@@ -208,8 +209,9 @@ func NewNetwork(pubKey crypto.PublicKey, config NetworkConfig) (
n = &Network{
ID: types.NewNodeID(pubKey),
config: config,
- toConsensus: make(chan interface{}, 1000),
+ toConsensus: make(chan types.Msg, 1000),
toNode: make(chan interface{}, 1000),
+ badPeerChan: make(chan interface{}, 1000),
sentAgreement: make(map[common.Hash]struct{}),
blockCache: make(map[common.Hash]*types.Block, maxBlockCache),
unreceivedBlocks: make(map[common.Hash]chan<- common.Hash),
@@ -348,7 +350,7 @@ func (n *Network) BroadcastDKGPartialSignature(
}
// ReceiveChan implements core.Network interface.
-func (n *Network) ReceiveChan() <-chan interface{} {
+func (n *Network) ReceiveChan() <-chan types.Msg {
return n.toConsensus
}
@@ -396,14 +398,23 @@ func (n *Network) dispatchMsg(e *TransportEnvelope) {
}
delete(n.unreceivedBlocks, v.Hash)
}()
- n.toConsensus <- v
+ n.toConsensus <- types.Msg{
+ PeerID: e.From,
+ Payload: v,
+ }
case *types.Vote:
// Add this vote to cache.
n.addVoteToCache(v)
- n.toConsensus <- v
+ n.toConsensus <- types.Msg{
+ PeerID: e.From,
+ Payload: v,
+ }
case *types.AgreementResult,
*typesDKG.PrivateShare, *typesDKG.PartialSignature:
- n.toConsensus <- v
+ n.toConsensus <- types.Msg{
+ PeerID: e.From,
+ Payload: v,
+ }
case packedStateChanges:
if n.stateModule == nil {
panic(errors.New(
@@ -466,6 +477,11 @@ Loop:
default:
}
select {
+ case peer := <-n.badPeerChan:
+ if peer == nil {
+ continue Loop
+ }
+ n.trans.Disconnect(peer.(types.NodeID))
case <-n.ctx.Done():
break Loop
case e, ok := <-n.fromTransport:
@@ -535,6 +551,11 @@ func (n *Network) PurgeNodeSetCache(round uint64) {
n.cache.Purge(round)
}
+// ReportBadPeerChan reports that a peer is sending bad message.
+func (n *Network) ReportBadPeerChan() chan<- interface{} {
+ return n.badPeerChan
+}
+
func (n *Network) pullBlocksAsync(hashes common.Hashes) {
// Setup notification channels for each block hash.
notYetReceived := make(map[common.Hash]struct{})