// Copyright 2018 The dexon-consensus Authors
// This file is part of the dexon-consensus library.
//
// The dexon-consensus library is free software: you can redistribute it
// and/or modify it under the terms of the GNU Lesser General Public License as
// published by the Free Software Foundation, either version 3 of the License,
// or (at your option) any later version.
//
// The dexon-consensus library is distributed in the hope that it will be
// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
// General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the dexon-consensus library. If not, see
// <http://www.gnu.org/licenses/>.

package test

import (
	"context"
	"encoding/base64"
	"encoding/binary"
	"encoding/json"
	"fmt"
	"io"
	"math"
	"math/rand"
	"net"
	"os"
	"strconv"
	"strings"
	"sync"
	"syscall"
	"time"

	"github.com/dexon-foundation/dexon-consensus/common"
	"github.com/dexon-foundation/dexon-consensus/core/crypto"
	"github.com/dexon-foundation/dexon-consensus/core/crypto/ecdsa"
	"github.com/dexon-foundation/dexon-consensus/core/types"
	"github.com/dexon-foundation/dexon-consensus/core/types/dkg"
)

const (
	// tcpThroughputReportNum is the threshold on buffered throughput records:
	// TCPTransportClient.Send reports the records to the peer server and
	// resets the buffer once more than this many have accumulated.
	tcpThroughputReportNum = 10
)

// tcpHandshake is the message broadcast by the peer server in WaitForPeers
// after it has collected connection info from the expected number of peers.
// TCPTransportClient.Join waits for it before connecting to other peers.
type tcpHandshake struct {
	// DMoment is the server's dMoment; Join copies it into the client.
	DMoment time.Time
	// Peers maps each node ID to the info string reported by that peer,
	// in the format produced by buildPeerInfo.
	Peers   map[types.NodeID]string
}

// tcpPeerRecord holds what a transport knows about one peer.
type tcpPeerRecord struct {
	// conn is the address string dialed by buildConnectionsToPeers.
	conn        string
	// sendChannel is the channel returned by connWriter for the connection to
	// this peer; it stays nil until buildConnectionsToPeers has connected.
	sendChannel chan<- []byte
	// pubKey is the peer's public key, as parsed by parsePeerInfo.
	pubKey      crypto.PublicKey
}

// tcpMessage is the general message between peers and server.
//
// The Type values used in this file are "handshake", "handshake-ack",
// "conn", "conn-ready" and "all-ready".
type tcpMessage struct {
	// NodeID identifies the sender of the message.
	NodeID types.NodeID `json:"nid"`
	// Type tells the receiver how to interpret the message.
	Type   string       `json:"type"`
	// Info is a free-form string whose meaning depends on Type: the handshake
	// text for handshake messages, or a buildPeerInfo string for "conn".
	// Note that it is serialized under the JSON key "conn".
	Info   string       `json:"conn"`
}

// BlockEventMessage is for monitoring block events' time. It is carried over
// the transport with the message type "block-event".
type BlockEventMessage struct {
	// BlockHash is the hash of the block the events belong to.
	BlockHash  common.Hash `json:"hash"`
	// Timestamps are the recorded times of the events.
	// NOTE(review): the meaning and order of the entries are defined by the
	// sender, which is not visible in this file -- confirm before relying on
	// them.
	Timestamps []time.Time `json:"timestamps"`
}

// buildPeerInfo packs a connection string and a public key into one string:
// the connection string first, then a ';' separator, then the standard
// base64 encoding of the key bytes. parsePeerInfo reverses the operation.
func buildPeerInfo(pubKey crypto.PublicKey, conn string) string {
	encodedKey := base64.StdEncoding.EncodeToString(pubKey.Bytes())
	return conn + ";" + encodedKey
}

// parsePeerInfo parses a string built via buildPeerInfo back into the
// connection string and the public key.
//
// The input is expected to look like "<conn>;<base64 public key>". This
// helper panics when the separator is missing, when the key part is not
// valid standard base64, or when the decoded bytes are not a valid public
// key.
func parsePeerInfo(info string) (key crypto.PublicKey, conn string) {
	tokens := strings.Split(info, ";")
	if len(tokens) < 2 {
		// Fail with a message naming the offending input rather than an
		// index-out-of-range panic on tokens[1].
		panic(fmt.Errorf("malformed peer info: %q", info))
	}
	conn = tokens[0]
	data, err := base64.StdEncoding.DecodeString(tokens[1])
	if err != nil {
		panic(err)
	}
	key, err = ecdsa.NewPublicKeyFromByteSlice(data)
	if err != nil {
		panic(err)
	}
	return
}

var (
	// ErrTCPHandShakeFail is reported if the tcp handshake fails, i.e. the
	// message received during the handshake does not carry the expected type
	// and handshake text.
	ErrTCPHandShakeFail = fmt.Errorf("tcp handshake fail")

	// ErrConnectToUnexpectedPeer is reported if connect to unexpected peer,
	// i.e. the node ID announced in the handshake differs from the node ID
	// recorded for the dialed address.
	ErrConnectToUnexpectedPeer = fmt.Errorf("connect to unexpected peer")

	// ErrMessageOverflow is reported if the message is too long to have its
	// length encoded in the 4-byte length prefix.
	ErrMessageOverflow = fmt.Errorf("message size overflow")
)

// TCPTransport implements Transport interface via TCP connection.
//
// Messages are framed on the wire as a 4-byte little-endian length followed
// by a JSON payload (see write, read and marshalMessage).
type TCPTransport struct {
	// peerType is stamped on every outgoing message by marshalMessage.
	peerType          TransportPeerType
	// nID is the node ID derived from pubKey.
	nID               types.NodeID
	// pubKey is the public key of this transport's owner.
	pubKey            crypto.PublicKey
	// localPort is the TCP port this transport listens on.
	localPort         int
	// peers holds the known peers, keyed by node ID.
	peers             map[types.NodeID]*tcpPeerRecord
	// peersLock is taken by send, Close and buildConnectionsToPeers when
	// they touch peers.
	peersLock         sync.RWMutex
	// recvChannel receives every envelope decoded by connReader; it is
	// closed and set to nil by Close.
	recvChannel       chan *TransportEnvelope
	// ctx is canceled by Close (via cancel) to stop the listener, reader and
	// writer routines.
	ctx               context.Context
	cancel            context.CancelFunc
	// marshaller handles message types unknown to this file; it may be nil,
	// in which case such messages are rejected.
	marshaller        Marshaller
	// throughputRecords buffers the records appended by
	// handleThroughputData.
	throughputRecords []ThroughputRecord
	// throughputLock is held by handleThroughputData while appending to
	// throughputRecords.
	throughputLock    sync.Mutex
	// dMoment is set by TCPTransportServer.SetDMoment on the server and
	// copied from the handshake in TCPTransportClient.Join on clients.
	dMoment           time.Time
}

// NewTCPTransport constructs a TCPTransport instance.
//
// The node ID is derived from pubKey. The returned transport owns a fresh
// cancelable context, an empty peer table and a receive channel buffered for
// 1000 envelopes. It does not listen or connect until used by a client or
// server wrapper.
func NewTCPTransport(peerType TransportPeerType, pubKey crypto.PublicKey,
	marshaller Marshaller, localPort int) *TCPTransport {
	transport := &TCPTransport{
		peerType:          peerType,
		nID:               types.NewNodeID(pubKey),
		pubKey:            pubKey,
		localPort:         localPort,
		marshaller:        marshaller,
		peers:             make(map[types.NodeID]*tcpPeerRecord),
		recvChannel:       make(chan *TransportEnvelope, 1000),
		throughputRecords: []ThroughputRecord{},
	}
	transport.ctx, transport.cancel = context.WithCancel(context.Background())
	return transport
}

// handshakeMsg is the fixed text carried in the Info field of both the
// "handshake" and the "handshake-ack" messages; each side fails the handshake
// when the text it receives differs.
const handshakeMsg = "Welcome to DEXON network for test."

// serverHandshake runs the accepting side of the handshake on conn: it sends
// a "handshake" message carrying this node's ID, then waits for a
// "handshake-ack" reply carrying the handshake text.
//
// It returns the node ID announced by the remote side. ErrTCPHandShakeFail is
// returned when the reply has an unexpected type or text; I/O and JSON errors
// are returned as is. A 3-second deadline is put on conn for the exchange and
// is left in place afterwards. Failure to set the deadline panics.
func (t *TCPTransport) serverHandshake(conn net.Conn) (
	nID types.NodeID, err error) {
	if err := conn.SetDeadline(time.Now().Add(3 * time.Second)); err != nil {
		panic(err)
	}
	greeting := &tcpMessage{
		NodeID: t.nID,
		Type:   "handshake",
		Info:   handshakeMsg,
	}
	var payload []byte
	if payload, err = json.Marshal(greeting); err != nil {
		return
	}
	if err = t.write(conn, payload); err != nil {
		return
	}
	if payload, err = t.read(conn); err != nil {
		return
	}
	// Decode into a fresh message. Reusing the outgoing one would let fields
	// missing from the reply keep our own values (our node ID and the
	// handshake text), so an incomplete reply could pass the check below.
	// Decoding into the struct itself also keeps a JSON null reply from
	// producing a nil message.
	ack := &tcpMessage{}
	if err = json.Unmarshal(payload, ack); err != nil {
		return
	}
	if ack.Type != "handshake-ack" || ack.Info != handshakeMsg {
		err = ErrTCPHandShakeFail
		return
	}
	nID = ack.NodeID
	return
}

// clientHandshake runs the dialing side of the handshake on conn: it waits
// for the "handshake" message from the remote side, then answers with a
// "handshake-ack" message carrying this node's ID.
//
// It returns the node ID announced by the remote side. ErrTCPHandShakeFail is
// returned when the received message has an unexpected type or text; I/O and
// JSON errors are returned as is. A 3-second deadline is put on conn for the
// exchange and is left in place afterwards. Failure to set the deadline
// panics. The caller keeps ownership of conn in every case.
func (t *TCPTransport) clientHandshake(conn net.Conn) (
	nID types.NodeID, err error) {
	if err := conn.SetDeadline(time.Now().Add(3 * time.Second)); err != nil {
		panic(err)
	}
	var payload []byte
	if payload, err = t.read(conn); err != nil {
		return
	}
	// Decode straight into the struct: going through a pointer to the
	// pointer would turn a JSON null into a nil message and crash on the
	// field access below instead of failing the handshake.
	greeting := &tcpMessage{}
	if err = json.Unmarshal(payload, greeting); err != nil {
		return
	}
	if greeting.Type != "handshake" || greeting.Info != handshakeMsg {
		err = ErrTCPHandShakeFail
		return
	}
	nID = greeting.NodeID
	ack := &tcpMessage{
		NodeID: t.nID,
		Type:   "handshake-ack",
		Info:   handshakeMsg,
	}
	if payload, err = json.Marshal(ack); err != nil {
		return
	}
	err = t.write(conn, payload)
	return
}

// send records throughput data for msg and then queues payload on the send
// channel of the peer identified by endpoint.
//
// The payload is dropped when the endpoint is unknown or has no connection
// yet (for example after Close has reset the peer table), and the call gives
// up once the transport's context is canceled. It is meant to run in its own
// goroutine since queuing may block while the peer's channel is full.
func (t *TCPTransport) send(
	endpoint types.NodeID, msg interface{}, payload []byte) {
	t.handleThroughputData(msg, payload)
	// Only hold the lock for the lookup. Holding it across the channel send
	// would keep writers of the peer table (such as Close) waiting for as
	// long as the peer's channel stays full.
	var ch chan<- []byte
	t.peersLock.RLock()
	if rec := t.peers[endpoint]; rec != nil {
		ch = rec.sendChannel
	}
	t.peersLock.RUnlock()
	if ch == nil {
		// Sending on a nil channel would block forever.
		return
	}
	select {
	case ch <- payload:
	case <-t.ctx.Done():
	}
}

// Send implements Transport.Send method.
//
// The message is marshalled synchronously, so a marshalling error is
// returned to the caller; the actual delivery happens in a separate
// goroutine and its outcome is not reported.
func (t *TCPTransport) Send(
	endpoint types.NodeID, msg interface{}) (err error) {

	var payload []byte
	if payload, err = t.marshalMessage(msg); err == nil {
		go t.send(endpoint, msg, payload)
	}
	return
}

// Broadcast implements Transport.Broadcast method.
//
// The message is marshalled once. For every endpoint other than this node, a
// goroutine sleeps for a delay drawn from the latency model and then sends
// the payload. Only a marshalling error is returned.
func (t *TCPTransport) Broadcast(endpoints map[types.NodeID]struct{},
	latency LatencyModel, msg interface{}) (err error) {
	var payload []byte
	if payload, err = t.marshalMessage(msg); err != nil {
		return
	}
	deliver := func(target types.NodeID) {
		time.Sleep(latency.Delay())
		t.send(target, msg, payload)
	}
	for nID := range endpoints {
		if nID != t.nID {
			go deliver(nID)
		}
	}
	return
}

// Close implements Transport.Close method.
//
// It cancels the transport's context so that the routines started by this
// transport stop, forgets all peers, and closes the receive channel to tell
// the user that no more envelopes will arrive. Calling Close again is a
// no-op for the already closed channel. The returned error is always nil.
func (t *TCPTransport) Close() (err error) {
	// Tell all routines raised by us to die.
	t.cancel()
	// Reset peers.
	t.peersLock.Lock()
	defer t.peersLock.Unlock()
	t.peers = make(map[types.NodeID]*tcpPeerRecord)
	// Tell our user that this channel is closed. The nil check keeps a
	// repeated Close from panicking on closing a nil channel.
	if t.recvChannel != nil {
		close(t.recvChannel)
		t.recvChannel = nil
	}
	return
}

// Peers implements Transport.Peers method.
//
// It returns the public keys of all known peers in no particular order, or
// nil when there is none.
func (t *TCPTransport) Peers() (peers []crypto.PublicKey) {
	// The peer table is modified under peersLock by other methods, so read
	// it under the same lock.
	t.peersLock.RLock()
	defer t.peersLock.RUnlock()
	for _, rec := range t.peers {
		peers = append(peers, rec.pubKey)
	}
	return
}

// write sends b over conn as one framed message: a 4-byte little-endian
// length followed by the bytes of b.
//
// ErrMessageOverflow is returned, and nothing is written, when the length of
// b does not fit the 4-byte prefix. Errors from conn are returned as is.
func (t *TCPTransport) write(conn net.Conn, b []byte) (err error) {
	// Compare in uint64: math.MaxUint32 is not representable as an int
	// where int is 32 bits wide.
	if uint64(len(b)) > math.MaxUint32 {
		return ErrMessageOverflow
	}
	var header [4]byte
	binary.LittleEndian.PutUint32(header[:], uint32(len(b)))
	if _, err = conn.Write(header[:]); err != nil {
		return
	}
	_, err = conn.Write(b)
	return
}

// read receives one framed message from conn: it reads the 4-byte
// little-endian length prefix and then exactly that many bytes.
//
// When reading the prefix fails, b is nil. When reading the body fails, the
// partially filled buffer is returned along with the error.
func (t *TCPTransport) read(conn net.Conn) (b []byte, err error) {
	var header [4]byte
	if _, err = io.ReadFull(conn, header[:]); err != nil {
		return
	}
	size := binary.LittleEndian.Uint32(header[:])
	b = make([]byte, int(size))
	_, err = io.ReadFull(conn, b)
	return
}

// marshalMessage wraps msg in a JSON carrier holding this transport's peer
// type, its node ID, a type tag and the message payload.
//
// The message types defined in this file get fixed type tags. Any other
// message is delegated to the user defined marshaller, which provides both
// the tag and the raw JSON payload; without a marshaller such a message is
// rejected with an error.
func (t *TCPTransport) marshalMessage(
	msg interface{}) (payload []byte, err error) {

	envelope := struct {
		PeerType TransportPeerType `json:"peer_type"`
		From     types.NodeID      `json:"from"`
		Type     string            `json:"type"`
		Payload  interface{}       `json:"payload"`
	}{
		PeerType: t.peerType,
		From:     t.nID,
		Payload:  msg,
	}
	switch msg.(type) {
	case *tcpHandshake:
		envelope.Type = "tcp-handshake"
	case *tcpMessage:
		envelope.Type = "trans-msg"
	case []ThroughputRecord:
		envelope.Type = "throughput-record"
	case *BlockEventMessage:
		envelope.Type = "block-event"
	default:
		if t.marshaller == nil {
			return nil, fmt.Errorf("unknown msg type: %v", msg)
		}
		// Delegate to user defined marshaller.
		var raw []byte
		if envelope.Type, raw, err = t.marshaller.Marshal(msg); err != nil {
			return nil, err
		}
		envelope.Payload = json.RawMessage(raw)
	}
	return json.Marshal(envelope)
}

// unmarshalMessage decodes a payload produced by marshalMessage.
//
// It returns the sender's peer type and node ID taken from the carrier, and
// the decoded message: *tcpHandshake, *tcpMessage, *[]ThroughputRecord or
// *BlockEventMessage for the type tags defined in this file, or whatever the
// user defined marshaller returns for any other tag. Without a marshaller an
// unknown tag is an error. peerType and from are already filled in when
// decoding the inner payload fails.
func (t *TCPTransport) unmarshalMessage(
	payload []byte) (
	peerType TransportPeerType,
	from types.NodeID,
	msg interface{},
	err error) {

	var carrier struct {
		PeerType TransportPeerType `json:"peer_type"`
		From     types.NodeID      `json:"from"`
		Type     string            `json:"type"`
		Payload  json.RawMessage   `json:"payload"`
	}
	if err = json.Unmarshal(payload, &carrier); err != nil {
		return
	}
	peerType, from = carrier.PeerType, carrier.From
	switch carrier.Type {
	case "tcp-handshake":
		hs := &tcpHandshake{}
		if err = json.Unmarshal(carrier.Payload, &hs); err == nil {
			msg = hs
		}
	case "trans-msg":
		tm := &tcpMessage{}
		if err = json.Unmarshal(carrier.Payload, tm); err == nil {
			msg = tm
		}
	case "throughput-record":
		records := &[]ThroughputRecord{}
		if err = json.Unmarshal(carrier.Payload, records); err == nil {
			msg = records
		}
	case "block-event":
		event := &BlockEventMessage{}
		if err = json.Unmarshal(carrier.Payload, event); err == nil {
			msg = event
		}
	default:
		if t.marshaller != nil {
			// Delegate to user defined marshaller.
			msg, err = t.marshaller.Unmarshal(carrier.Type, carrier.Payload)
		} else {
			err = fmt.Errorf("unknown msg type: %v", carrier.Type)
		}
	}
	return
}

// connReader is a reader routine to read from a TCP connection.
//
// It loops until the transport's context is canceled or the remote side
// closes the connection: each round reads one framed message, decodes it and
// delivers it to recvChannel as a TransportEnvelope. The connection is closed
// when the routine returns. Any read error other than io.EOF or a timeout,
// any decoding error, and any error from setting the deadline or closing the
// connection makes the routine panic.
func (t *TCPTransport) connReader(conn net.Conn) {
	defer func() {
		if err := conn.Close(); err != nil {
			panic(err)
		}
	}()

	var (
		err     error
		payload []byte
	)

	// checkErr classifies a read error: it reports true when the loop should
	// stop (io.EOF), returns false for a timeout so the loop can retry, and
	// panics for everything else.
	// NOTE(review): an io.ErrUnexpectedEOF from a message cut short is not a
	// *net.OpError and would therefore panic here -- confirm this is intended.
	checkErr := func(err error) (toBreak bool) {
		if err == io.EOF {
			toBreak = true
			return
		}
		// Check if timeout.
		nErr, ok := err.(*net.OpError)
		if !ok {
			panic(err)
		}
		if !nErr.Timeout() {
			panic(err)
		}
		return
	}
Loop:
	for {
		// Non-blocking check for shutdown before each read.
		select {
		case <-t.ctx.Done():
			break Loop
		default:
		}
		// Add timeout when reading to check if shutdown.
		if err := conn.SetReadDeadline(
			time.Now().Add(2 * time.Second)); err != nil {

			panic(err)
		}
		// Read one framed message (length prefix and body).
		// NOTE(review): if the deadline expires after part of a message was
		// consumed, those bytes appear to be lost and the next round would
		// start reading mid-message -- confirm whether this can happen.
		if payload, err = t.read(conn); err != nil {
			if checkErr(err) {
				// This break leaves the for loop: it is not inside a select.
				break
			}
			continue
		}
		peerType, from, msg, err := t.unmarshalMessage(payload)
		if err != nil {
			panic(err)
		}
		// NOTE(review): recvChannel is closed and set to nil by Close, and
		// this send is not guarded against that -- verify shutdown ordering.
		t.recvChannel <- &TransportEnvelope{
			PeerType: peerType,
			From:     from,
			Msg:      msg,
		}
	}
}

// connWriter is a writer routine to write to TCP connection.
//
// It clears the write deadline on conn and starts a goroutine that writes
// every payload received on the returned channel to conn as a framed
// message. The goroutine stops and closes conn once the transport's context
// is canceled. A write error, or an error from clearing the deadline or
// closing the connection, panics.
//
// The returned channel is buffered for 1000 payloads and is never closed:
// it is written to by other goroutines, and closing it from this receiving
// side would make those senders panic with "send on closed channel".
func (t *TCPTransport) connWriter(conn net.Conn) chan<- []byte {
	// Disable write deadline.
	if err := conn.SetWriteDeadline(time.Time{}); err != nil {
		panic(err)
	}

	ch := make(chan []byte, 1000)
	go func() {
		defer func() {
			if err := conn.Close(); err != nil {
				panic(err)
			}
		}()
		for {
			// Give shutdown priority over pending payloads: when both are
			// ready, the select below would pick one at random.
			select {
			case <-t.ctx.Done():
				return
			default:
			}
			select {
			case <-t.ctx.Done():
				return
			case msg := <-ch:
				// Send the message prefixed by its length in uint32.
				if err := t.write(conn, msg); err != nil {
					panic(err)
				}
			}
		}
	}()
	return ch
}

// listenerRoutine is a routine to accept incoming request for TCP connection.
//
// It loops until the transport's context is canceled or the listener is
// closed by someone else. Every accepted connection goes through the server
// side of the handshake; on success a connReader goroutine takes over the
// connection, on failure the error is printed and the connection is closed.
// The listener is closed on return unless it was found to be closed already.
// Accept errors other than a timeout or a closed listener panic.
func (t *TCPTransport) listenerRoutine(listener *net.TCPListener) {
	closed := false
	defer func() {
		if closed {
			return
		}
		if err := listener.Close(); err != nil {
			panic(err)
		}
	}()
	for {
		select {
		case <-t.ctx.Done():
			return
		default:
		}

		// Accept with a deadline so that shutdown is noticed regularly.
		if err := listener.SetDeadline(time.Now().Add(5 * time.Second)); err != nil {
			panic(err)
		}
		conn, err := listener.Accept()
		if err != nil {
			// Check if the connection is closed.
			if strings.Contains(err.Error(), "use of closed network connection") {
				closed = true
				return
			}
			// Check if timeout error.
			nErr, ok := err.(*net.OpError)
			if !ok {
				panic(err)
			}
			if !nErr.Timeout() {
				panic(err)
			}
			continue
		}
		if _, err := t.serverHandshake(conn); err != nil {
			fmt.Println(err)
			// No reader is started for this connection, so release it here
			// instead of leaking it.
			if closeErr := conn.Close(); closeErr != nil {
				fmt.Println(closeErr)
			}
			continue
		}
		go t.connReader(conn)
	}
}

// buildConnectionsToPeers constructs TCP connections to each peer.
// Although TCP connection could be used for both read/write operation,
// we only utilize the write part for simplicity.
//
// All known peers except this node are dialed concurrently. For each of
// them the client side of the handshake is performed, the announced node ID
// is checked against the expected one, and a connWriter is attached to the
// peer record. The call returns after every attempt has finished; when
// several attempts fail only one of the errors is returned. Connections that
// fail the handshake or reach an unexpected peer are closed.
func (t *TCPTransport) buildConnectionsToPeers() (err error) {
	var wg sync.WaitGroup
	var errs []error
	var errsLock sync.Mutex
	addErr := func(err error) {
		errsLock.Lock()
		defer errsLock.Unlock()
		errs = append(errs, err)
	}
	for nID, rec := range t.peers {
		if nID == t.nID {
			continue
		}
		wg.Add(1)
		// The record is handed to the goroutine directly rather than looked
		// up again later, so a concurrent reset of the peer table cannot
		// cause a nil dereference.
		go func(nID types.NodeID, rec *tcpPeerRecord) {
			defer wg.Done()
			conn, localErr := net.Dial("tcp", rec.conn)
			if localErr != nil {
				addErr(localErr)
				return
			}
			serverID, localErr := t.clientHandshake(conn)
			if localErr != nil {
				addErr(localErr)
				// Best effort: the handshake error is the one reported.
				_ = conn.Close()
				return
			}
			if nID != serverID {
				addErr(ErrConnectToUnexpectedPeer)
				// Best effort: the mismatch is the error reported.
				_ = conn.Close()
				return
			}
			t.peersLock.Lock()
			defer t.peersLock.Unlock()
			rec.sendChannel = t.connWriter(conn)
		}(nID, rec)
	}
	wg.Wait()
	if len(errs) > 0 {
		// Propagate this error to outside, at least one error
		// could be returned to caller.
		err = errs[0]
	}
	return
}

// ThroughputRecord records the network throughput data of one sent message.
type ThroughputRecord struct {
	// Type names the kind of message, e.g. "vote" or "block".
	Type string    `json:"type"`
	// Size is the length in bytes of the marshalled payload.
	Size int       `json:"size"`
	// Time is taken when the message is about to be queued for sending.
	Time time.Time `json:"time"`
}

// TCPTransportClient implement TransportClient base on TCP connection.
type TCPTransportClient struct {
	TCPTransport
	// local selects the local-tcp behavior in Join: retry with a random port
	// when the address is in use, and announce the listening address itself
	// instead of looking up this machine's IP.
	local              bool
	// serverWriteChannel queues payloads for the connection to the peer
	// server; it is set by Join.
	serverWriteChannel chan<- []byte
}

// NewTCPTransportClient constructs a TCPTransportClient instance.
//
// The underlying transport is created as a regular peer with 8080 as its
// initial listening port. local selects the local-tcp behavior of Join.
func NewTCPTransportClient(
	pubKey crypto.PublicKey,
	marshaller Marshaller,
	local bool) *TCPTransportClient {

	base := NewTCPTransport(TransportPeer, pubKey, marshaller, 8080)
	return &TCPTransportClient{
		TCPTransport: *base,
		local:        local,
	}
}

// Report implements TransportClient.Report method.
//
// The message is marshalled synchronously, so a marshalling error is
// returned to the caller. The payload is then queued for the peer server
// from a separate goroutine, which gives up once the transport's context is
// canceled instead of staying blocked forever.
func (t *TCPTransportClient) Report(msg interface{}) (err error) {
	payload, err := t.marshalMessage(msg)
	if err != nil {
		return
	}
	go func() {
		select {
		case t.serverWriteChannel <- payload:
		case <-t.ctx.Done():
		}
	}()
	return
}

// Join implements TransportClient.Join method.
//
// serverEndpoint must be a string holding the address of the peer server;
// any other type is rejected with an error. The steps are:
//
//  1. Listen on the local port and verify, by connecting to ourselves, that
//     this transport is really the one listening. In local mode another
//     random port is tried when the address is already in use.
//  2. Connect to the peer server and report our connection info.
//  3. Wait for the peer list sent by the server and connect to every peer.
//  4. Report "conn-ready" and wait for the server's "all-ready" message.
//
// Envelopes from peers that arrive while waiting for "all-ready" are put
// back on the receive channel, which is returned on success.
func (t *TCPTransportClient) Join(
	serverEndpoint interface{}) (ch <-chan *TransportEnvelope, err error) {
	// Validate the endpoint before doing anything with side effects.
	endpoint, isString := serverEndpoint.(string)
	if !isString {
		err = fmt.Errorf(
			"server endpoint should be a string, not %T", serverEndpoint)
		return
	}
	// Initiate a TCP server.
	// TODO(mission): config initial listening port.
	var (
		ln        net.Listener
		envelopes = []*TransportEnvelope{}
		ok        bool
		addr      string
		conn      string
	)
	for {
		addr = net.JoinHostPort("0.0.0.0", strconv.Itoa(t.localPort))
		ln, err = net.Listen("tcp", addr)
		if err == nil {
			go t.listenerRoutine(ln.(*net.TCPListener))
			// It is possible to listen on the same port in some platform.
			// Check if this one is actually listening.
			testConn, e := net.Dial("tcp", addr)
			if e != nil {
				err = e
				return
			}
			nID, e := t.clientHandshake(testConn)
			// The probe connection has served its purpose once the handshake
			// is over, whatever the outcome; close it instead of leaking it.
			// #nosec G104
			_ = testConn.Close()
			if e != nil {
				err = e
				return
			}
			if nID == t.nID {
				break
			}
			// #nosec G104
			ln.Close()
		}
		if err != nil {
			if !t.local {
				return
			}
			// In local-tcp, retry with other port when the address is in use.
			operr, ok := err.(*net.OpError)
			if !ok {
				panic(err)
			}
			oserr, ok := operr.Err.(*os.SyscallError)
			if !ok {
				panic(operr)
			}
			errno, ok := oserr.Err.(syscall.Errno)
			if !ok {
				panic(oserr)
			}
			if errno != syscall.EADDRINUSE {
				panic(errno)
			}
		}
		// The port is used, generate another port randomly.
		t.localPort = 1024 + rand.Int()%1024 // #nosec G404
	}

	fmt.Println("Connecting to server", "endpoint", serverEndpoint)
	serverConn, err := net.Dial("tcp", endpoint)
	if err != nil {
		return
	}
	_, err = t.clientHandshake(serverConn)
	if err != nil {
		// Nothing will use this connection; do not leak it.
		// #nosec G104
		_ = serverConn.Close()
		return
	}
	t.serverWriteChannel = t.connWriter(serverConn)
	if t.local {
		conn = addr
	} else {
		// Find my IP.
		var ip string
		if ip, err = FindMyIP(); err != nil {
			return
		}
		conn = net.JoinHostPort(ip, strconv.Itoa(t.localPort))
	}
	if err = t.Report(&tcpMessage{
		NodeID: t.nID,
		Type:   "conn",
		Info:   buildPeerInfo(t.pubKey, conn),
	}); err != nil {
		return
	}
	// Wait for peers list sent by server.
	e := <-t.recvChannel
	handshake, ok := e.Msg.(*tcpHandshake)
	if !ok {
		panic(fmt.Errorf("expect handshake, not %v", e))
	}
	t.dMoment = handshake.DMoment
	// Setup peers information.
	for nID, info := range handshake.Peers {
		pubKey, conn := parsePeerInfo(info)
		t.peers[nID] = &tcpPeerRecord{
			conn:   conn,
			pubKey: pubKey,
		}
	}
	// Setup connections to other peers.
	if err = t.buildConnectionsToPeers(); err != nil {
		return
	}
	// Report to server that the connections to other peers are ready.
	if err = t.Report(&tcpMessage{
		Type:   "conn-ready",
		NodeID: t.nID,
	}); err != nil {
		return
	}
	// Wait for server to ack us that all peers are ready.
	for {
		e := <-t.recvChannel
		msg, ok := e.Msg.(*tcpMessage)
		if !ok {
			// Not from the handshake protocol: keep it for replay below.
			envelopes = append(envelopes, e)
			continue
		}
		if msg.Type != "all-ready" {
			err = fmt.Errorf("expected ready message, but %v", msg)
			return
		}
		break
	}
	// Replay those messages sent before peer list and ready-ack.
	for _, e := range envelopes {
		t.recvChannel <- e
	}
	ch = t.recvChannel
	return
}

// Send calls TCPTransport's Send, and send the throughput data to peer server.
//
// Once more than tcpThroughputReportNum throughput records have accumulated,
// they are reported to the peer server and the buffer is reset. An error
// from the underlying Send is returned; a failure to report panics.
func (t *TCPTransportClient) Send(
	endpoint types.NodeID, msg interface{}) (err error) {

	if err := t.TCPTransport.Send(endpoint, msg); err != nil {
		return err
	}
	// Take the lock before looking at the records: they are appended to
	// from the sending goroutines under the same lock.
	t.throughputLock.Lock()
	defer t.throughputLock.Unlock()
	if len(t.throughputRecords) > tcpThroughputReportNum {
		// Report marshals the records before it returns, so the buffer can
		// be reused right after.
		if err := t.Report(t.throughputRecords); err != nil {
			panic(err)
		}
		t.throughputRecords = t.throughputRecords[:0]
	}
	return
}

// DMoment implements TransportClient.DMoment method. It returns the dMoment
// received from the peer server during Join.
func (t *TCPTransportClient) DMoment() time.Time {
	return t.dMoment
}

// TCPTransportServer implements TransportServer via TCP connections. It
// adds the server side methods (Host, SetDMoment, WaitForPeers) on top of
// the embedded TCPTransport.
type TCPTransportServer struct {
	TCPTransport
}

// NewTCPTransportServer constructs TCPTransportServer instance.
//
// A fresh private key is generated to give the server its own identity, and
// serverPort is the port the server will listen on. Failure to generate the
// key panics.
func NewTCPTransportServer(
	marshaller Marshaller,
	serverPort int) *TCPTransportServer {

	privateKey, err := ecdsa.NewPrivateKey()
	if err != nil {
		panic(err)
	}
	// NOTE: the assumption here is the node ID of peers
	//       won't be zero.
	base := NewTCPTransport(
		TransportPeerServer, privateKey.PublicKey(), marshaller, serverPort)
	return &TCPTransportServer{TCPTransport: *base}
}

// Host implements TransportServer.Host method.
//
// It listens on the configured port on all interfaces, starts the listener
// routine and returns the channel on which received envelopes are delivered.
func (t *TCPTransportServer) Host() (chan *TransportEnvelope, error) {
	// The port of peer server should be known to other peers,
	// if we can listen on the pre-defined port, we don't have to
	// retry with other random ports.
	addr := net.JoinHostPort("0.0.0.0", strconv.Itoa(t.localPort))
	listener, err := net.Listen("tcp", addr)
	if err != nil {
		return nil, err
	}
	go t.listenerRoutine(listener.(*net.TCPListener))
	return t.recvChannel, nil
}

// SetDMoment implements TransportServer.SetDMoment method. The stored value
// is sent to the peers in the handshake built by WaitForPeers.
func (t *TCPTransportServer) SetDMoment(dMoment time.Time) {
	t.dMoment = dMoment
}

// WaitForPeers implements TransportServer.WaitForPeers method.
//
// It blocks until numPeers distinct peers have reported their connection
// info, connects to all of them, broadcasts the collected peer list together
// with the dMoment, waits for a "conn-ready" report from numPeers peers and
// finally broadcasts "all-ready". With numPeers equal to zero nothing is
// waited for.
//
// Any envelope that does not follow this protocol makes the call panic.
// Errors from connecting to peers or from broadcasting are returned.
func (t *TCPTransportServer) WaitForPeers(numPeers uint32) (err error) {
	// Collect peers info. Packets other than peer info is
	// unexpected.
	peersInfo := make(map[types.NodeID]string)
	// Check the count before receiving, so that waiting for zero peers does
	// not block on a message that will never come.
	for uint32(len(peersInfo)) < numPeers {
		// Wait for connection info reported by peers.
		e := <-t.recvChannel
		msg, ok := e.Msg.(*tcpMessage)
		if !ok {
			panic(fmt.Errorf("expect tcpMessage, not %v", e))
		}
		if msg.Type != "conn" {
			panic(fmt.Errorf("expect connection report, not %v", e))
		}
		pubKey, conn := parsePeerInfo(msg.Info)
		fmt.Println("Peer connected", "peer", conn)
		t.peers[msg.NodeID] = &tcpPeerRecord{
			conn:   conn,
			pubKey: pubKey,
		}
		peersInfo[msg.NodeID] = msg.Info
	}
	// Connect to the collected peers so the list can be sent back to them.
	if err = t.buildConnectionsToPeers(); err != nil {
		return
	}
	peers := make(map[types.NodeID]struct{})
	for ID := range t.peers {
		peers[ID] = struct{}{}
	}
	handshake := &tcpHandshake{
		DMoment: t.dMoment,
		Peers:   peersInfo,
	}
	if err = t.Broadcast(peers, &FixedLatencyModel{}, handshake); err != nil {
		return
	}
	// Wait for peers to send 'ready' report.
	readies := make(map[types.NodeID]struct{})
	for uint32(len(readies)) < numPeers {
		e := <-t.recvChannel
		msg, ok := e.Msg.(*tcpMessage)
		if !ok {
			panic(fmt.Errorf("expect tcpMessage, not %v", e))
		}
		if msg.Type != "conn-ready" {
			panic(fmt.Errorf("expect connection ready, not %v", e))
		}
		if _, reported := readies[msg.NodeID]; reported {
			panic(fmt.Errorf("already report conn-ready message: %v", e))
		}
		readies[msg.NodeID] = struct{}{}
	}
	// Ack all peers ready to go.
	err = t.Broadcast(peers, &FixedLatencyModel{},
		&tcpMessage{Type: "all-ready"})
	return
}

// handleThroughputData appends a throughput record for msg when it is one of
// the tracked message kinds (vote, block, agreement result or DKG partial
// signature). Other messages are ignored. The record holds the kind, the
// size of the marshalled payload and the time this method was entered.
func (t *TCPTransport) handleThroughputData(msg interface{}, payload []byte) {
	// Take the timestamp before possibly waiting for the lock.
	now := time.Now()
	t.throughputLock.Lock()
	defer t.throughputLock.Unlock()
	var kind string
	switch msg.(type) {
	case *types.Vote:
		kind = "vote"
	case *types.Block:
		kind = "block"
	case *types.AgreementResult:
		kind = "agreement_result"
	case *dkg.PartialSignature:
		kind = "partial_sig"
	default:
		// Not a tracked message kind.
		return
	}
	t.throughputRecords = append(t.throughputRecords, ThroughputRecord{
		Type: kind,
		Size: len(payload),
		Time: now,
	})
}