diff options
Diffstat (limited to 'core/test/tcp-transport.go')
-rw-r--r-- | core/test/tcp-transport.go | 69 |
1 files changed, 28 insertions, 41 deletions
diff --git a/core/test/tcp-transport.go b/core/test/tcp-transport.go index b16bbcb..e04ba53 100644 --- a/core/test/tcp-transport.go +++ b/core/test/tcp-transport.go @@ -109,20 +109,14 @@ type TCPTransport struct { recvChannel chan *TransportEnvelope ctx context.Context cancel context.CancelFunc - latency LatencyModel marshaller Marshaller throughputRecords []ThroughputRecord throughputLock sync.Mutex } // NewTCPTransport constructs an TCPTransport instance. -func NewTCPTransport( - peerType TransportPeerType, - pubKey crypto.PublicKey, - latency LatencyModel, - marshaller Marshaller, - localPort int) *TCPTransport { - +func NewTCPTransport(peerType TransportPeerType, pubKey crypto.PublicKey, + marshaller Marshaller, localPort int) *TCPTransport { ctx, cancel := context.WithCancel(context.Background()) return &TCPTransport{ peerType: peerType, @@ -133,7 +127,6 @@ func NewTCPTransport( ctx: ctx, cancel: cancel, localPort: localPort, - latency: latency, marshaller: marshaller, throughputRecords: []ThroughputRecord{}, } @@ -202,6 +195,14 @@ func (t *TCPTransport) clientHandshake(conn net.Conn) ( return } +func (t *TCPTransport) send( + endpoint types.NodeID, msg interface{}, payload []byte) { + t.peersLock.RLock() + defer t.peersLock.RUnlock() + t.handleThroughputData(msg, payload) + t.peers[endpoint].sendChannel <- payload +} + // Send implements Transport.Send method. func (t *TCPTransport) Send( endpoint types.NodeID, msg interface{}) (err error) { @@ -210,38 +211,25 @@ func (t *TCPTransport) Send( if err != nil { return } - go func() { - if t.latency != nil { - time.Sleep(t.latency.Delay()) - } - t.peersLock.RLock() - defer t.peersLock.RUnlock() - t.handleThroughputData(msg, payload) - t.peers[endpoint].sendChannel <- payload - }() + go t.send(endpoint, msg, payload) return } // Broadcast implements Transport.Broadcast method. -func (t *TCPTransport) Broadcast(msg interface{}) (err error) { +func (t *TCPTransport) Broadcast(endpoints map[types.NodeID]struct{}, + latency LatencyModel, msg interface{}) (err error) { payload, err := t.marshalMessage(msg) if err != nil { return } - t.peersLock.RLock() - defer t.peersLock.RUnlock() - - for nID, rec := range t.peers { + for nID := range endpoints { if nID == t.nID { continue } - go func(ch chan<- []byte) { - if t.latency != nil { - time.Sleep(t.latency.Delay()) - } - t.handleThroughputData(msg, payload) - ch <- payload - }(rec.sendChannel) + go func(ID types.NodeID) { + time.Sleep(latency.Delay()) + t.send(ID, msg, payload) + }(nID) } return } @@ -585,14 +573,12 @@ type TCPTransportClient struct { // NewTCPTransportClient constructs a TCPTransportClient instance. func NewTCPTransportClient( pubKey crypto.PublicKey, - latency LatencyModel, marshaller Marshaller, local bool) *TCPTransportClient { return &TCPTransportClient{ - TCPTransport: *NewTCPTransport( - TransportPeer, pubKey, latency, marshaller, 8080), - local: local, + TCPTransport: *NewTCPTransport(TransportPeer, pubKey, marshaller, 8080), + local: local, } } @@ -776,11 +762,7 @@ func NewTCPTransportServer( // NOTE: the assumption here is the node ID of peers // won't be zero. TCPTransport: *NewTCPTransport( - TransportPeerServer, - prvKey.PublicKey(), - nil, - marshaller, - serverPort), + TransportPeerServer, prvKey.PublicKey(), marshaller, serverPort), } } @@ -828,7 +810,11 @@ func (t *TCPTransportServer) WaitForPeers(numPeers uint32) (err error) { if err = t.buildConnectionsToPeers(); err != nil { return } - if err = t.Broadcast(peersInfo); err != nil { + peers := make(map[types.NodeID]struct{}) + for ID := range t.peers { + peers[ID] = struct{}{} + } + if err = t.Broadcast(peers, &FixedLatencyModel{}, peersInfo); err != nil { return } // Wait for peers to send 'ready' report. @@ -851,7 +837,8 @@ func (t *TCPTransportServer) WaitForPeers(numPeers uint32) (err error) { } } // Ack all peers ready to go. - if err = t.Broadcast(&tcpMessage{Type: "all-ready"}); err != nil { + if err = t.Broadcast(peers, &FixedLatencyModel{}, + &tcpMessage{Type: "all-ready"}); err != nil { return } return |