aboutsummaryrefslogtreecommitdiffstats
path: root/simulation/tcp-network.go
diff options
context:
space:
mode:
Diffstat (limited to 'simulation/tcp-network.go')
-rw-r--r--simulation/tcp-network.go70
1 files changed, 36 insertions, 34 deletions
diff --git a/simulation/tcp-network.go b/simulation/tcp-network.go
index bb63bd1..f30284b 100644
--- a/simulation/tcp-network.go
+++ b/simulation/tcp-network.go
@@ -239,12 +239,40 @@ func (n *TCPNetwork) Join(endpoint Endpoint) chan interface{} {
}
// Send sends a msg to another client.
-func (n *TCPNetwork) Send(destID types.ValidatorID, msg interface{}) {
+func (n *TCPNetwork) Send(destID types.ValidatorID, messageJSON []byte) {
clientAddr, exists := n.endpoints[destID]
if !exists {
return
}
+ msgURL := fmt.Sprintf("http://%s/msg", clientAddr)
+ go func() {
+ time.Sleep(n.model.Delay())
+ for i := 0; i < retries; i++ {
+ req, err := http.NewRequest(
+ http.MethodPost, msgURL, strings.NewReader(string(messageJSON)))
+ if err != nil {
+ continue
+ }
+ req.Header.Add("ID", n.endpoint.GetID().String())
+
+ resp, err := n.client.Do(req)
+ if err == nil {
+ defer resp.Body.Close()
+ io.Copy(ioutil.Discard, resp.Body)
+ }
+ if err == nil && resp.StatusCode == http.StatusOK {
+ runtime.Goexit()
+ }
+
+ fmt.Printf("failed to submit message: %s\n", err)
+ time.Sleep(1 * time.Second)
+ }
+ fmt.Printf("failed to send message: %v\n", string(messageJSON))
+ }()
+}
+
+func (n *TCPNetwork) marshalMessage(msg interface{}) (messageJSON []byte) {
message := struct {
Type string `json:"type"`
Payload interface{} `json:"payload"`
@@ -270,65 +298,39 @@ func (n *TCPNetwork) Send(destID types.ValidatorID, msg interface{}) {
fmt.Printf("error: failed to marshal json: %v\n%+v\n", err, message)
return
}
-
- msgURL := fmt.Sprintf("http://%s/msg", clientAddr)
-
- go func() {
- time.Sleep(n.model.Delay())
- for i := 0; i < retries; i++ {
- req, err := http.NewRequest(
- http.MethodPost, msgURL, strings.NewReader(string(messageJSON)))
- if err != nil {
- continue
- }
- req.Header.Add("ID", n.endpoint.GetID().String())
-
- resp, err := n.client.Do(req)
- if err == nil {
- defer resp.Body.Close()
- io.Copy(ioutil.Discard, resp.Body)
- }
- if err == nil && resp.StatusCode == http.StatusOK {
- runtime.Goexit()
- }
-
- fmt.Printf("failed to submit message: %s\n", err)
- time.Sleep(1 * time.Second)
- }
- fmt.Printf("failed to send message: %v\n", msg)
- }()
+ return
}
// BroadcastBlock broadcast blocks into the network.
func (n *TCPNetwork) BroadcastBlock(block *types.Block) {
- block = block.Clone()
+ payload := n.marshalMessage(block)
for endpoint := range n.endpoints {
if endpoint == block.ProposerID {
continue
}
- n.Send(endpoint, block)
+ n.Send(endpoint, payload)
}
}
// BroadcastNotaryAck broadcast notaryAck into the network.
func (n *TCPNetwork) BroadcastNotaryAck(notaryAck *types.NotaryAck) {
- notaryAck = notaryAck.Clone()
+ payload := n.marshalMessage(notaryAck)
for endpoint := range n.endpoints {
if endpoint == notaryAck.ProposerID {
continue
}
- n.Send(endpoint, notaryAck)
+ n.Send(endpoint, payload)
}
}
// BroadcastVote broadcast vote into the network.
func (n *TCPNetwork) BroadcastVote(vote *types.Vote) {
- vote = vote.Clone()
+ payload := n.marshalMessage(vote)
for endpoint := range n.endpoints {
if endpoint == vote.ProposerID {
continue
}
- n.Send(endpoint, vote)
+ n.Send(endpoint, payload)
}
}