aboutsummaryrefslogtreecommitdiffstats
path: root/core/test/tcp-transport.go
diff options
context:
space:
mode:
Diffstat (limited to 'core/test/tcp-transport.go')
-rw-r--r--core/test/tcp-transport.go86
1 files changed, 43 insertions, 43 deletions
diff --git a/core/test/tcp-transport.go b/core/test/tcp-transport.go
index 2afea14..8bbaf9c 100644
--- a/core/test/tcp-transport.go
+++ b/core/test/tcp-transport.go
@@ -37,18 +37,18 @@ import (
// tcpMessage is the general message between peers and server.
type tcpMessage struct {
- ValidatorID types.ValidatorID `json:"vid"`
- Type string `json:"type"`
- Info string `json:"conn"`
+ NodeID types.NodeID `json:"nid"`
+ Type string `json:"type"`
+ Info string `json:"conn"`
}
// TCPTransport implements Transport interface via TCP connection.
type TCPTransport struct {
peerType TransportPeerType
- vID types.ValidatorID
+ nID types.NodeID
localPort int
- peersInfo map[types.ValidatorID]string
- peers map[types.ValidatorID]chan<- []byte
+ peersInfo map[types.NodeID]string
+ peers map[types.NodeID]chan<- []byte
peersLock sync.RWMutex
recvChannel chan *TransportEnvelope
ctx context.Context
@@ -60,7 +60,7 @@ type TCPTransport struct {
// NewTCPTransport constructs an TCPTransport instance.
func NewTCPTransport(
peerType TransportPeerType,
- vID types.ValidatorID,
+ nID types.NodeID,
latency LatencyModel,
marshaller Marshaller,
localPort int) *TCPTransport {
@@ -68,9 +68,9 @@ func NewTCPTransport(
ctx, cancel := context.WithCancel(context.Background())
return &TCPTransport{
peerType: peerType,
- vID: vID,
- peersInfo: make(map[types.ValidatorID]string),
- peers: make(map[types.ValidatorID]chan<- []byte),
+ nID: nID,
+ peersInfo: make(map[types.NodeID]string),
+ peers: make(map[types.NodeID]chan<- []byte),
recvChannel: make(chan *TransportEnvelope, 1000),
ctx: ctx,
cancel: cancel,
@@ -82,7 +82,7 @@ func NewTCPTransport(
// Send implements Transport.Send method.
func (t *TCPTransport) Send(
- endpoint types.ValidatorID, msg interface{}) (err error) {
+ endpoint types.NodeID, msg interface{}) (err error) {
payload, err := t.marshalMessage(msg)
if err != nil {
@@ -110,8 +110,8 @@ func (t *TCPTransport) Broadcast(msg interface{}) (err error) {
t.peersLock.RLock()
defer t.peersLock.RUnlock()
- for vID, ch := range t.peers {
- if vID == t.vID {
+ for nID, ch := range t.peers {
+ if nID == t.nID {
continue
}
go func(ch chan<- []byte) {
@@ -131,7 +131,7 @@ func (t *TCPTransport) Close() (err error) {
// Reset peers.
t.peersLock.Lock()
defer t.peersLock.Unlock()
- t.peers = make(map[types.ValidatorID]chan<- []byte)
+ t.peers = make(map[types.NodeID]chan<- []byte)
// Tell our user that this channel is closed.
close(t.recvChannel)
t.recvChannel = nil
@@ -139,10 +139,10 @@ func (t *TCPTransport) Close() (err error) {
}
// Peers implements Transport.Peers method.
-func (t *TCPTransport) Peers() (peers map[types.ValidatorID]struct{}) {
- peers = make(map[types.ValidatorID]struct{})
- for vID := range t.peersInfo {
- peers[vID] = struct{}{}
+func (t *TCPTransport) Peers() (peers map[types.NodeID]struct{}) {
+ peers = make(map[types.NodeID]struct{})
+ for nID := range t.peersInfo {
+ peers[nID] = struct{}{}
}
return
}
@@ -152,16 +152,16 @@ func (t *TCPTransport) marshalMessage(
msgCarrier := struct {
PeerType TransportPeerType `json:"peer_type"`
- From types.ValidatorID `json:"from"`
+ From types.NodeID `json:"from"`
Type string `json:"type"`
Payload interface{} `json:"payload"`
}{
PeerType: t.peerType,
- From: t.vID,
+ From: t.nID,
Payload: msg,
}
switch msg.(type) {
- case map[types.ValidatorID]string:
+ case map[types.NodeID]string:
msgCarrier.Type = "peerlist"
case *tcpMessage:
msgCarrier.Type = "trans-msg"
@@ -188,13 +188,13 @@ func (t *TCPTransport) marshalMessage(
func (t *TCPTransport) unmarshalMessage(
payload []byte) (
peerType TransportPeerType,
- from types.ValidatorID,
+ from types.NodeID,
msg interface{},
err error) {
msgCarrier := struct {
PeerType TransportPeerType `json:"peer_type"`
- From types.ValidatorID `json:"from"`
+ From types.NodeID `json:"from"`
Type string `json:"type"`
Payload json.RawMessage `json:"payload"`
}{}
@@ -205,7 +205,7 @@ func (t *TCPTransport) unmarshalMessage(
from = msgCarrier.From
switch msgCarrier.Type {
case "peerlist":
- var peers map[types.ValidatorID]string
+ var peers map[types.NodeID]string
if err = json.Unmarshal(msgCarrier.Payload, &peers); err != nil {
return
}
@@ -376,12 +376,12 @@ func (t *TCPTransport) listenerRoutine(listener *net.TCPListener) {
// we only utilize the write part for simplicity.
func (t *TCPTransport) buildConnectionsToPeers() (err error) {
var wg sync.WaitGroup
- for vID, addr := range t.peersInfo {
- if vID == t.vID {
+ for nID, addr := range t.peersInfo {
+ if nID == t.nID {
continue
}
wg.Add(1)
- go func(vID types.ValidatorID, addr string) {
+ go func(nID types.NodeID, addr string) {
defer wg.Done()
conn, localErr := net.Dial("tcp", addr)
@@ -394,8 +394,8 @@ func (t *TCPTransport) buildConnectionsToPeers() (err error) {
t.peersLock.Lock()
defer t.peersLock.Unlock()
- t.peers[vID] = t.connWriter(conn)
- }(vID, addr)
+ t.peers[nID] = t.connWriter(conn)
+ }(nID, addr)
}
wg.Wait()
return
@@ -410,13 +410,13 @@ type TCPTransportClient struct {
// NewTCPTransportClient constructs a TCPTransportClient instance.
func NewTCPTransportClient(
- vID types.ValidatorID,
+ nID types.NodeID,
latency LatencyModel,
marshaller Marshaller,
local bool) *TCPTransportClient {
return &TCPTransportClient{
- TCPTransport: *NewTCPTransport(TransportPeer, vID, latency, marshaller, 8080),
+ TCPTransport: *NewTCPTransport(TransportPeer, nID, latency, marshaller, 8080),
local: local,
}
}
@@ -492,15 +492,15 @@ func (t *TCPTransportClient) Join(
conn = net.JoinHostPort(ip, strconv.Itoa(t.localPort))
}
if err = t.Report(&tcpMessage{
- Type: "conn",
- ValidatorID: t.vID,
- Info: conn,
+ Type: "conn",
+ NodeID: t.nID,
+ Info: conn,
}); err != nil {
return
}
// Wait for peers list sent by server.
e := <-t.recvChannel
- if t.peersInfo, ok = e.Msg.(map[types.ValidatorID]string); !ok {
+ if t.peersInfo, ok = e.Msg.(map[types.NodeID]string); !ok {
panic(fmt.Errorf("expect peer list, not %v", e))
}
// Setup connections to other peers.
@@ -509,8 +509,8 @@ func (t *TCPTransportClient) Join(
}
// Report to server that the connections to other peers are ready.
if err = t.Report(&tcpMessage{
- Type: "conn-ready",
- ValidatorID: t.vID,
+ Type: "conn-ready",
+ NodeID: t.nID,
}); err != nil {
return
}
@@ -547,11 +547,11 @@ func NewTCPTransportServer(
serverPort int) *TCPTransportServer {
return &TCPTransportServer{
- // NOTE: the assumption here is the validator ID of peers
+ // NOTE: the assumption here is the node ID of peers
// won't be zero.
TCPTransport: *NewTCPTransport(
TransportPeerServer,
- types.ValidatorID{},
+ types.NodeID{},
nil,
marshaller,
serverPort),
@@ -586,7 +586,7 @@ func (t *TCPTransportServer) WaitForPeers(numPeers int) (err error) {
if msg.Type != "conn" {
panic(fmt.Errorf("expect connection report, not %v", e))
}
- t.peersInfo[msg.ValidatorID] = msg.Info
+ t.peersInfo[msg.NodeID] = msg.Info
// Check if we already collect enought peers.
if len(t.peersInfo) == numPeers {
break
@@ -600,7 +600,7 @@ func (t *TCPTransportServer) WaitForPeers(numPeers int) (err error) {
return
}
// Wait for peers to send 'ready' report.
- readies := make(map[types.ValidatorID]struct{})
+ readies := make(map[types.NodeID]struct{})
for {
e := <-t.recvChannel
msg, ok := e.Msg.(*tcpMessage)
@@ -610,10 +610,10 @@ func (t *TCPTransportServer) WaitForPeers(numPeers int) (err error) {
if msg.Type != "conn-ready" {
panic(fmt.Errorf("expect connection ready, not %v", e))
}
- if _, reported := readies[msg.ValidatorID]; reported {
+ if _, reported := readies[msg.NodeID]; reported {
panic(fmt.Errorf("already report conn-ready message: %v", e))
}
- readies[msg.ValidatorID] = struct{}{}
+ readies[msg.NodeID] = struct{}{}
if len(readies) == numPeers {
break
}