diff options
Diffstat (limited to 'simulation/tcp-network.go')
-rw-r--r-- | simulation/tcp-network.go | 70 |
1 files changed, 36 insertions, 34 deletions
diff --git a/simulation/tcp-network.go b/simulation/tcp-network.go index bb63bd1..f30284b 100644 --- a/simulation/tcp-network.go +++ b/simulation/tcp-network.go @@ -239,12 +239,40 @@ func (n *TCPNetwork) Join(endpoint Endpoint) chan interface{} { } // Send sends a msg to another client. -func (n *TCPNetwork) Send(destID types.ValidatorID, msg interface{}) { +func (n *TCPNetwork) Send(destID types.ValidatorID, messageJSON []byte) { clientAddr, exists := n.endpoints[destID] if !exists { return } + msgURL := fmt.Sprintf("http://%s/msg", clientAddr) + go func() { + time.Sleep(n.model.Delay()) + for i := 0; i < retries; i++ { + req, err := http.NewRequest( + http.MethodPost, msgURL, strings.NewReader(string(messageJSON))) + if err != nil { + continue + } + req.Header.Add("ID", n.endpoint.GetID().String()) + + resp, err := n.client.Do(req) + if err == nil { + defer resp.Body.Close() + io.Copy(ioutil.Discard, resp.Body) + } + if err == nil && resp.StatusCode == http.StatusOK { + runtime.Goexit() + } + + fmt.Printf("failed to submit message: %s\n", err) + time.Sleep(1 * time.Second) + } + fmt.Printf("failed to send message: %v\n", string(messageJSON)) + }() +} + +func (n *TCPNetwork) marshalMessage(msg interface{}) (messageJSON []byte) { message := struct { Type string `json:"type"` Payload interface{} `json:"payload"` @@ -270,65 +298,39 @@ func (n *TCPNetwork) Send(destID types.ValidatorID, msg interface{}) { fmt.Printf("error: failed to marshal json: %v\n%+v\n", err, message) return } - - msgURL := fmt.Sprintf("http://%s/msg", clientAddr) - - go func() { - time.Sleep(n.model.Delay()) - for i := 0; i < retries; i++ { - req, err := http.NewRequest( - http.MethodPost, msgURL, strings.NewReader(string(messageJSON))) - if err != nil { - continue - } - req.Header.Add("ID", n.endpoint.GetID().String()) - - resp, err := n.client.Do(req) - if err == nil { - defer resp.Body.Close() - io.Copy(ioutil.Discard, resp.Body) - } - if err == nil && resp.StatusCode == http.StatusOK { - runtime.Goexit() - } - - fmt.Printf("failed to submit message: %s\n", err) - time.Sleep(1 * time.Second) - } - fmt.Printf("failed to send message: %v\n", msg) - }() + return } // BroadcastBlock broadcast blocks into the network. func (n *TCPNetwork) BroadcastBlock(block *types.Block) { - block = block.Clone() + payload := n.marshalMessage(block) for endpoint := range n.endpoints { if endpoint == block.ProposerID { continue } - n.Send(endpoint, block) + n.Send(endpoint, payload) } } // BroadcastNotaryAck broadcast notaryAck into the network. func (n *TCPNetwork) BroadcastNotaryAck(notaryAck *types.NotaryAck) { - notaryAck = notaryAck.Clone() + payload := n.marshalMessage(notaryAck) for endpoint := range n.endpoints { if endpoint == notaryAck.ProposerID { continue } - n.Send(endpoint, notaryAck) + n.Send(endpoint, payload) } } // BroadcastVote broadcast vote into the network. func (n *TCPNetwork) BroadcastVote(vote *types.Vote) { - vote = vote.Clone() + payload := n.marshalMessage(vote) for endpoint := range n.endpoints { if endpoint == vote.ProposerID { continue } - n.Send(endpoint, vote) + n.Send(endpoint, payload) } } |