diff options
Diffstat (limited to 'core/test/tcp-transport.go')
-rw-r--r-- | core/test/tcp-transport.go | 86 |
1 files changed, 43 insertions, 43 deletions
diff --git a/core/test/tcp-transport.go b/core/test/tcp-transport.go index 2afea14..8bbaf9c 100644 --- a/core/test/tcp-transport.go +++ b/core/test/tcp-transport.go @@ -37,18 +37,18 @@ import ( // tcpMessage is the general message between peers and server. type tcpMessage struct { - ValidatorID types.ValidatorID `json:"vid"` - Type string `json:"type"` - Info string `json:"conn"` + NodeID types.NodeID `json:"nid"` + Type string `json:"type"` + Info string `json:"conn"` } // TCPTransport implements Transport interface via TCP connection. type TCPTransport struct { peerType TransportPeerType - vID types.ValidatorID + nID types.NodeID localPort int - peersInfo map[types.ValidatorID]string - peers map[types.ValidatorID]chan<- []byte + peersInfo map[types.NodeID]string + peers map[types.NodeID]chan<- []byte peersLock sync.RWMutex recvChannel chan *TransportEnvelope ctx context.Context @@ -60,7 +60,7 @@ type TCPTransport struct { // NewTCPTransport constructs an TCPTransport instance. func NewTCPTransport( peerType TransportPeerType, - vID types.ValidatorID, + nID types.NodeID, latency LatencyModel, marshaller Marshaller, localPort int) *TCPTransport { @@ -68,9 +68,9 @@ func NewTCPTransport( ctx, cancel := context.WithCancel(context.Background()) return &TCPTransport{ peerType: peerType, - vID: vID, - peersInfo: make(map[types.ValidatorID]string), - peers: make(map[types.ValidatorID]chan<- []byte), + nID: nID, + peersInfo: make(map[types.NodeID]string), + peers: make(map[types.NodeID]chan<- []byte), recvChannel: make(chan *TransportEnvelope, 1000), ctx: ctx, cancel: cancel, @@ -82,7 +82,7 @@ func NewTCPTransport( // Send implements Transport.Send method. func (t *TCPTransport) Send( - endpoint types.ValidatorID, msg interface{}) (err error) { + endpoint types.NodeID, msg interface{}) (err error) { payload, err := t.marshalMessage(msg) if err != nil { @@ -110,8 +110,8 @@ func (t *TCPTransport) Broadcast(msg interface{}) (err error) { t.peersLock.RLock() defer t.peersLock.RUnlock() - for vID, ch := range t.peers { - if vID == t.vID { + for nID, ch := range t.peers { + if nID == t.nID { continue } go func(ch chan<- []byte) { @@ -131,7 +131,7 @@ func (t *TCPTransport) Close() (err error) { // Reset peers. t.peersLock.Lock() defer t.peersLock.Unlock() - t.peers = make(map[types.ValidatorID]chan<- []byte) + t.peers = make(map[types.NodeID]chan<- []byte) // Tell our user that this channel is closed. close(t.recvChannel) t.recvChannel = nil @@ -139,10 +139,10 @@ func (t *TCPTransport) Close() (err error) { } // Peers implements Transport.Peers method. -func (t *TCPTransport) Peers() (peers map[types.ValidatorID]struct{}) { - peers = make(map[types.ValidatorID]struct{}) - for vID := range t.peersInfo { - peers[vID] = struct{}{} +func (t *TCPTransport) Peers() (peers map[types.NodeID]struct{}) { + peers = make(map[types.NodeID]struct{}) + for nID := range t.peersInfo { + peers[nID] = struct{}{} } return } @@ -152,16 +152,16 @@ func (t *TCPTransport) marshalMessage( msgCarrier := struct { PeerType TransportPeerType `json:"peer_type"` - From types.ValidatorID `json:"from"` + From types.NodeID `json:"from"` Type string `json:"type"` Payload interface{} `json:"payload"` }{ PeerType: t.peerType, - From: t.vID, + From: t.nID, Payload: msg, } switch msg.(type) { - case map[types.ValidatorID]string: + case map[types.NodeID]string: msgCarrier.Type = "peerlist" case *tcpMessage: msgCarrier.Type = "trans-msg" @@ -188,13 +188,13 @@ func (t *TCPTransport) marshalMessage( func (t *TCPTransport) unmarshalMessage( payload []byte) ( peerType TransportPeerType, - from types.ValidatorID, + from types.NodeID, msg interface{}, err error) { msgCarrier := struct { PeerType TransportPeerType `json:"peer_type"` - From types.ValidatorID `json:"from"` + From types.NodeID `json:"from"` Type string `json:"type"` Payload json.RawMessage `json:"payload"` }{} @@ -205,7 +205,7 @@ func (t *TCPTransport) unmarshalMessage( from = msgCarrier.From switch msgCarrier.Type { case "peerlist": - var peers map[types.ValidatorID]string + var peers map[types.NodeID]string if err = json.Unmarshal(msgCarrier.Payload, &peers); err != nil { return } @@ -376,12 +376,12 @@ func (t *TCPTransport) listenerRoutine(listener *net.TCPListener) { // we only utilize the write part for simplicity. func (t *TCPTransport) buildConnectionsToPeers() (err error) { var wg sync.WaitGroup - for vID, addr := range t.peersInfo { - if vID == t.vID { + for nID, addr := range t.peersInfo { + if nID == t.nID { continue } wg.Add(1) - go func(vID types.ValidatorID, addr string) { + go func(nID types.NodeID, addr string) { defer wg.Done() conn, localErr := net.Dial("tcp", addr) @@ -394,8 +394,8 @@ func (t *TCPTransport) buildConnectionsToPeers() (err error) { t.peersLock.Lock() defer t.peersLock.Unlock() - t.peers[vID] = t.connWriter(conn) - }(vID, addr) + t.peers[nID] = t.connWriter(conn) + }(nID, addr) } wg.Wait() return @@ -410,13 +410,13 @@ type TCPTransportClient struct { // NewTCPTransportClient constructs a TCPTransportClient instance. func NewTCPTransportClient( - vID types.ValidatorID, + nID types.NodeID, latency LatencyModel, marshaller Marshaller, local bool) *TCPTransportClient { return &TCPTransportClient{ - TCPTransport: *NewTCPTransport(TransportPeer, vID, latency, marshaller, 8080), + TCPTransport: *NewTCPTransport(TransportPeer, nID, latency, marshaller, 8080), local: local, } } @@ -492,15 +492,15 @@ func (t *TCPTransportClient) Join( conn = net.JoinHostPort(ip, strconv.Itoa(t.localPort)) } if err = t.Report(&tcpMessage{ - Type: "conn", - ValidatorID: t.vID, - Info: conn, + Type: "conn", + NodeID: t.nID, + Info: conn, }); err != nil { return } // Wait for peers list sent by server. e := <-t.recvChannel - if t.peersInfo, ok = e.Msg.(map[types.ValidatorID]string); !ok { + if t.peersInfo, ok = e.Msg.(map[types.NodeID]string); !ok { panic(fmt.Errorf("expect peer list, not %v", e)) } // Setup connections to other peers. @@ -509,8 +509,8 @@ func (t *TCPTransportClient) Join( } // Report to server that the connections to other peers are ready. if err = t.Report(&tcpMessage{ - Type: "conn-ready", - ValidatorID: t.vID, + Type: "conn-ready", + NodeID: t.nID, }); err != nil { return } @@ -547,11 +547,11 @@ func NewTCPTransportServer( serverPort int) *TCPTransportServer { return &TCPTransportServer{ - // NOTE: the assumption here is the validator ID of peers + // NOTE: the assumption here is the node ID of peers // won't be zero. TCPTransport: *NewTCPTransport( TransportPeerServer, - types.ValidatorID{}, + types.NodeID{}, nil, marshaller, serverPort), @@ -586,7 +586,7 @@ func (t *TCPTransportServer) WaitForPeers(numPeers int) (err error) { if msg.Type != "conn" { panic(fmt.Errorf("expect connection report, not %v", e)) } - t.peersInfo[msg.ValidatorID] = msg.Info + t.peersInfo[msg.NodeID] = msg.Info // Check if we already collect enought peers. if len(t.peersInfo) == numPeers { break @@ -600,7 +600,7 @@ func (t *TCPTransportServer) WaitForPeers(numPeers int) (err error) { return } // Wait for peers to send 'ready' report. - readies := make(map[types.ValidatorID]struct{}) + readies := make(map[types.NodeID]struct{}) for { e := <-t.recvChannel msg, ok := e.Msg.(*tcpMessage) @@ -610,10 +610,10 @@ func (t *TCPTransportServer) WaitForPeers(numPeers int) (err error) { if msg.Type != "conn-ready" { panic(fmt.Errorf("expect connection ready, not %v", e)) } - if _, reported := readies[msg.ValidatorID]; reported { + if _, reported := readies[msg.NodeID]; reported { panic(fmt.Errorf("already report conn-ready message: %v", e)) } - readies[msg.ValidatorID] = struct{}{} + readies[msg.NodeID] = struct{}{} if len(readies) == numPeers { break } |