diff options
Diffstat (limited to 'simulation/tcp-network.go')
-rw-r--r-- | simulation/tcp-network.go | 457 |
1 files changed, 0 insertions, 457 deletions
diff --git a/simulation/tcp-network.go b/simulation/tcp-network.go deleted file mode 100644 index 468baff..0000000 --- a/simulation/tcp-network.go +++ /dev/null @@ -1,457 +0,0 @@ -// Copyright 2018 The dexon-consensus-core Authors -// This file is part of the dexon-consensus-core library. -// -// The dexon-consensus-core library is free software: you can redistribute it -// and/or modify it under the terms of the GNU Lesser General Public License as -// published by the Free Software Foundation, either version 3 of the License, -// or (at your option) any later version. -// -// The dexon-consensus-core library is distributed in the hope that it will be -// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser -// General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the dexon-consensus-core library. If not, see -// <http://www.gnu.org/licenses/>. - -package simulation - -import ( - "context" - "encoding/json" - "fmt" - "io" - "io/ioutil" - "math/rand" - "net" - "net/http" - "os" - "runtime" - "strconv" - "strings" - "sync" - "syscall" - "time" - - "github.com/dexon-foundation/dexon-consensus-core/core/types" -) - -const retries = 5 - -// TCPNetwork implements the Network interface. -type TCPNetwork struct { - local bool - port int - endpoint Endpoint - client *http.Client - - peerServer string - endpointMutex sync.RWMutex - endpoints map[types.ValidatorID]string - recieveChan chan interface{} - model Model -} - -// NewTCPNetwork returns pointer to a new Network instance. -func NewTCPNetwork(local bool, peerServer string, model Model) *TCPNetwork { - pServer := peerServer - if local { - pServer = "127.0.0.1" - } - // Force connection reuse. - tr := &http.Transport{ - MaxIdleConnsPerHost: 1024, - TLSHandshakeTimeout: 0 * time.Second, - } - client := &http.Client{ - Transport: tr, - Timeout: 5 * time.Second, - } - return &TCPNetwork{ - local: local, - peerServer: pServer, - client: client, - endpoints: make(map[types.ValidatorID]string), - recieveChan: make(chan interface{}, msgBufferSize), - model: model, - } -} - -// Start starts the http server for accepting message. -func (n *TCPNetwork) Start() { - listenSuccess := make(chan struct{}) - go func() { - for { - ctx, cancel := context.WithTimeout(context.Background(), - 50*time.Millisecond) - defer cancel() - go func() { - <-ctx.Done() - if ctx.Err() != context.Canceled { - listenSuccess <- struct{}{} - } - }() - port := 1024 + rand.Int()%1024 - if !n.local { - port = peerPort - } - addr := net.JoinHostPort("0.0.0.0", strconv.Itoa(port)) - server := &http.Server{ - Addr: addr, - Handler: n, - } - - n.port = port - if err := server.ListenAndServe(); err != nil { - cancel() - if err == http.ErrServerClosed { - break - } - if !n.local { - panic(err) - } - // In local-tcp, retry with other port. - operr, ok := err.(*net.OpError) - if !ok { - panic(err) - } - oserr, ok := operr.Err.(*os.SyscallError) - if !ok { - panic(operr) - } - errno, ok := oserr.Err.(syscall.Errno) - if !ok { - panic(oserr) - } - if errno != syscall.EADDRINUSE { - panic(errno) - } - } - } - }() - <-listenSuccess - fmt.Printf("Validator started at 0.0.0.0:%d\n", n.port) -} - -// NumPeers returns the number of peers in the network. -func (n *TCPNetwork) NumPeers() int { - n.endpointMutex.Lock() - defer n.endpointMutex.Unlock() - - return len(n.endpoints) -} - -// ServerHTTP implements the http.Handler interface. -func (n *TCPNetwork) ServeHTTP(w http.ResponseWriter, r *http.Request) { - defer r.Body.Close() - body, err := ioutil.ReadAll(r.Body) - if err != nil { - w.WriteHeader(http.StatusBadRequest) - return - } - - m := struct { - Type string `json:"type"` - Payload json.RawMessage `json:"payload"` - }{} - if err := json.Unmarshal(body, &m); err != nil { - w.WriteHeader(http.StatusBadRequest) - return - } - - switch m.Type { - case "block": - block := &types.Block{} - if err := json.Unmarshal(m.Payload, block); err != nil { - w.WriteHeader(http.StatusBadRequest) - return - } - n.recieveChan <- block - case "vote": - vote := &types.Vote{} - if err := json.Unmarshal(m.Payload, vote); err != nil { - w.WriteHeader(http.StatusBadRequest) - return - } - n.recieveChan <- vote - case "notaryAck": - ack := &types.NotaryAck{} - if err := json.Unmarshal(m.Payload, ack); err != nil { - w.WriteHeader(http.StatusBadRequest) - return - } - n.recieveChan <- ack - default: - w.WriteHeader(http.StatusBadRequest) - return - } -} - -// Join allow a client to join the network. It reutnrs a interface{} channel for -// the client to recieve information. -func (n *TCPNetwork) Join(endpoint Endpoint) { - n.endpointMutex.Lock() - defer n.endpointMutex.Unlock() - - n.endpoint = endpoint - - joinURL := fmt.Sprintf("http://%s:%d/join", n.peerServer, peerPort) - peersURL := fmt.Sprintf("http://%s:%d/peers", n.peerServer, peerPort) - - // Join the peer list. - for { - time.Sleep(time.Second) - - req, err := http.NewRequest(http.MethodGet, joinURL, nil) - if err != nil { - continue - } - req.Header.Add("ID", endpoint.GetID().String()) - req.Header.Add("PORT", fmt.Sprintf("%d", n.port)) - - resp, err := n.client.Do(req) - if err == nil { - defer resp.Body.Close() - io.Copy(ioutil.Discard, resp.Body) - } - if err == nil && resp.StatusCode == http.StatusOK { - break - } - } - - var peerList map[types.ValidatorID]string - - // Wait for the server to collect all validators and return a list. - for { - time.Sleep(time.Second) - - req, err := http.NewRequest(http.MethodGet, peersURL, nil) - if err != nil { - fmt.Println(err) - continue - } - resp, err := n.client.Do(req) - if err != nil || resp.StatusCode != http.StatusOK { - continue - } - - defer resp.Body.Close() - body, err := ioutil.ReadAll(resp.Body) - - if err := json.Unmarshal(body, &peerList); err != nil { - fmt.Printf("error: %v", err) - continue - } - break - } - - for key, val := range peerList { - n.endpoints[key] = val - } -} - -// ReceiveChan return the receive channel. -func (n *TCPNetwork) ReceiveChan() <-chan interface{} { - return n.recieveChan -} - -// Send sends a msg to another client. -func (n *TCPNetwork) Send(destID types.ValidatorID, messageJSON []byte) { - clientAddr, exists := n.endpoints[destID] - if !exists { - return - } - - msgURL := fmt.Sprintf("http://%s/msg", clientAddr) - go func() { - time.Sleep(n.model.Delay()) - for i := 0; i < retries; i++ { - req, err := http.NewRequest( - http.MethodPost, msgURL, strings.NewReader(string(messageJSON))) - if err != nil { - continue - } - req.Header.Add("ID", n.endpoint.GetID().String()) - - resp, err := n.client.Do(req) - if err == nil { - defer resp.Body.Close() - io.Copy(ioutil.Discard, resp.Body) - } - if err == nil && resp.StatusCode == http.StatusOK { - runtime.Goexit() - } - - fmt.Printf("failed to submit message: %s\n", err) - time.Sleep(1 * time.Second) - } - fmt.Printf("failed to send message: %v\n", string(messageJSON)) - }() -} - -func (n *TCPNetwork) marshalMessage(msg interface{}) (messageJSON []byte) { - message := struct { - Type string `json:"type"` - Payload interface{} `json:"payload"` - }{} - - switch v := msg.(type) { - case *types.Block: - message.Type = "block" - message.Payload = v - case *types.NotaryAck: - message.Type = "notaryAck" - message.Payload = v - case *types.Vote: - message.Type = "vote" - message.Payload = v - default: - fmt.Println("error: invalid message type") - return - } - - messageJSON, err := json.Marshal(message) - if err != nil { - fmt.Printf("error: failed to marshal json: %v\n%+v\n", err, message) - return - } - return -} - -// BroadcastBlock broadcast blocks into the network. -func (n *TCPNetwork) BroadcastBlock(block *types.Block) { - payload := n.marshalMessage(block) - for endpoint := range n.endpoints { - if endpoint == block.ProposerID { - continue - } - n.Send(endpoint, payload) - } -} - -// BroadcastNotaryAck broadcast notaryAck into the network. -func (n *TCPNetwork) BroadcastNotaryAck(notaryAck *types.NotaryAck) { - payload := n.marshalMessage(notaryAck) - for endpoint := range n.endpoints { - if endpoint == notaryAck.ProposerID { - continue - } - n.Send(endpoint, payload) - } -} - -// BroadcastVote broadcast vote into the network. -func (n *TCPNetwork) BroadcastVote(vote *types.Vote) { - payload := n.marshalMessage(vote) - for endpoint := range n.endpoints { - if endpoint == vote.ProposerID { - continue - } - n.Send(endpoint, payload) - } -} - -// DeliverBlocks sends blocks to peerServer. -func (n *TCPNetwork) DeliverBlocks(blocks BlockList) { - messageJSON, err := json.Marshal(blocks) - if err != nil { - fmt.Printf("error: failed to marshal json: %v\n%+v\n", err, blocks) - return - } - - msgURL := fmt.Sprintf("http://%s:%d/delivery", n.peerServer, peerPort) - - go func() { - for i := 0; i < retries; i++ { - req, err := http.NewRequest( - http.MethodPost, msgURL, strings.NewReader(string(messageJSON))) - if err != nil { - continue - } - req.Header.Add("ID", n.endpoint.GetID().String()) - - resp, err := n.client.Do(req) - if err == nil { - defer resp.Body.Close() - io.Copy(ioutil.Discard, resp.Body) - } - - if err == nil && resp.StatusCode == http.StatusOK { - runtime.Goexit() - } - time.Sleep(1 * time.Second) - } - fmt.Printf("failed to send message: %v\n", blocks) - }() -} - -// NotifyServer sends message to peerServer -func (n *TCPNetwork) NotifyServer(msg Message) { - messageJSON, err := json.Marshal(msg) - if err != nil { - fmt.Printf("error: failed to marshal json: %v\n%+v\n", err, msg) - return - } - - msgURL := fmt.Sprintf("http://%s:%d/message", n.peerServer, peerPort) - - for i := 0; i < retries; i++ { - req, err := http.NewRequest( - http.MethodPost, msgURL, strings.NewReader(string(messageJSON))) - if err != nil { - continue - } - req.Header.Add("ID", n.endpoint.GetID().String()) - - resp, err := n.client.Do(req) - if err == nil { - defer resp.Body.Close() - io.Copy(ioutil.Discard, resp.Body) - } - if err == nil && resp.StatusCode == http.StatusOK { - return - } - time.Sleep(1 * time.Second) - } - fmt.Printf("failed to send message: %v\n", msg) - - return -} - -// GetServerInfo retrieve the info message from peerServer. -func (n *TCPNetwork) GetServerInfo() InfoMessage { - infoMsg := InfoMessage{} - msgURL := fmt.Sprintf("http://%s:%d/info", n.peerServer, peerPort) - - req, err := http.NewRequest( - http.MethodGet, msgURL, nil) - if err != nil { - fmt.Printf("error: %v\n", err) - } - - resp, err := n.client.Do(req) - if err != nil { - fmt.Printf("error: %v\n", err) - return infoMsg - } - if resp.StatusCode != http.StatusOK { - fmt.Printf("error: %v\n", err) - } - defer resp.Body.Close() - body, err := ioutil.ReadAll(resp.Body) - - if err := json.Unmarshal(body, &infoMsg); err != nil { - fmt.Printf("error: %v", err) - } - return infoMsg -} - -// Endpoints returns all validatorIDs. -func (n *TCPNetwork) Endpoints() types.ValidatorIDs { - vIDs := make(types.ValidatorIDs, 0, len(n.endpoints)) - for vID := range n.endpoints { - vIDs = append(vIDs, vID) - } - return vIDs -} |