diff options
author | Jimmy Hu <jimmy.hu@dexon.org> | 2019-01-26 15:33:24 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-01-26 15:33:24 +0800 |
commit | 049c9820e803f0453442d858d2ef36219981bf4c (patch) | |
tree | d6a2e5ba1ed6f2bd6d9b00ce80ae1a0e775e223f /core/agreement-mgr.go | |
parent | 514eed02d017f8d8badd3e1fedb0c8b9dcffac38 (diff) | |
download | tangerine-consensus-049c9820e803f0453442d858d2ef36219981bf4c.tar.gz tangerine-consensus-049c9820e803f0453442d858d2ef36219981bf4c.tar.zst tangerine-consensus-049c9820e803f0453442d858d2ef36219981bf4c.zip |
core: Optimize message processing (#434)
* core: more strict with 'first' agreement result
* core: Fast filter randomness result and agreement result
* Optimize touchAgreementResult
* core: remove lock in checking first block randomness
* psig to go routine
* polish
* core: polish
Diffstat (limited to 'core/agreement-mgr.go')
-rw-r--r-- | core/agreement-mgr.go | 107 |
1 files changed, 55 insertions, 52 deletions
diff --git a/core/agreement-mgr.go b/core/agreement-mgr.go index e07b23f..bcf1013 100644 --- a/core/agreement-mgr.go +++ b/core/agreement-mgr.go @@ -35,6 +35,8 @@ var ( ErrPreviousRoundIsNotFinished = errors.New("previous round is not finished") ) +const maxResultCache = 100 + // genValidLeader generate a validLeader function for agreement modules. func genValidLeader( mgr *agreementMgr) func(*types.Block) (bool, error) { @@ -80,26 +82,26 @@ type baRoundSetting struct { type agreementMgr struct { // TODO(mission): unbound Consensus instance from this module. - con *Consensus - ID types.NodeID - app Application - gov Governance - network Network - logger common.Logger - cache *utils.NodeSetCache - signer *utils.Signer - lattice *Lattice - ctx context.Context - lastEndTime time.Time - initRound uint64 - configs []*agreementMgrConfig - baModules []*agreement - lastBAResult []types.Position - voteFilters []*utils.VoteFilter - waitGroup sync.WaitGroup - pendingVotes map[uint64][]*types.Vote - pendingBlocks map[uint64][]*types.Block - isRunning bool + con *Consensus + ID types.NodeID + app Application + gov Governance + network Network + logger common.Logger + cache *utils.NodeSetCache + signer *utils.Signer + lattice *Lattice + ctx context.Context + lastEndTime time.Time + initRound uint64 + configs []*agreementMgrConfig + baModules []*agreement + processedBAResult map[types.Position]struct{} + voteFilters []*utils.VoteFilter + waitGroup sync.WaitGroup + pendingVotes map[uint64][]*types.Vote + pendingBlocks map[uint64][]*types.Block + isRunning bool // This lock should be used when attempting to: // - add a new baModule. @@ -115,18 +117,19 @@ type agreementMgr struct { func newAgreementMgr(con *Consensus, initRound uint64, initRoundBeginTime time.Time) *agreementMgr { return &agreementMgr{ - con: con, - ID: con.ID, - app: con.app, - gov: con.gov, - network: con.network, - logger: con.logger, - cache: con.nodeSetCache, - signer: con.signer, - lattice: con.lattice, - ctx: con.ctx, - initRound: initRound, - lastEndTime: initRoundBeginTime, + con: con, + ID: con.ID, + app: con.app, + gov: con.gov, + network: con.network, + logger: con.logger, + cache: con.nodeSetCache, + signer: con.signer, + lattice: con.lattice, + ctx: con.ctx, + initRound: initRound, + lastEndTime: initRoundBeginTime, + processedBAResult: make(map[types.Position]struct{}, maxResultCache), } } @@ -204,10 +207,6 @@ func (mgr *agreementMgr) appendConfig( recv.agreementModule = agrModule mgr.baModules = append(mgr.baModules, agrModule) mgr.voteFilters = append(mgr.voteFilters, utils.NewVoteFilter()) - mgr.lastBAResult = append(mgr.lastBAResult, types.Position{ - Round: round, - ChainID: i, - }) if mgr.isRunning { mgr.waitGroup.Add(1) go func(idx uint32) { @@ -256,19 +255,27 @@ func (mgr *agreementMgr) processBlock(b *types.Block) error { return mgr.baModules[b.Position.ChainID].processBlock(b) } -func (mgr *agreementMgr) firstAgreementResult( - result *types.AgreementResult) (bool, error) { - mgr.lock.RLock() - defer mgr.lock.RUnlock() - if result.Position.ChainID >= uint32(len(mgr.lastBAResult)) { - mgr.logger.Error("Process unknown result for unknown chain to BA", - "position", &result.Position, - "baChain", len(mgr.lastBAResult), - "baRound", len(mgr.configs), - "initRound", mgr.initRound) - return false, utils.ErrInvalidChainID +func (mgr *agreementMgr) touchAgreementResult( + result *types.AgreementResult) (first bool) { + // DO NOT LOCK THIS FUNCTION!!!!!!!! YOU WILL REGRET IT!!!!! + if _, exist := mgr.processedBAResult[result.Position]; !exist { + first = true + if len(mgr.processedBAResult) > maxResultCache { + for k := range mgr.processedBAResult { + // Randomly drop one element. + delete(mgr.processedBAResult, k) + break + } + } + mgr.processedBAResult[result.Position] = struct{}{} } - return result.Position.Newer(&mgr.lastBAResult[result.Position.ChainID]), nil + return +} + +func (mgr *agreementMgr) untouchAgreementResult( + result *types.AgreementResult) { + // DO NOT LOCK THIS FUNCTION!!!!!!!! YOU WILL REGRET IT!!!!! + delete(mgr.processedBAResult, result.Position) } func (mgr *agreementMgr) processAgreementResult( @@ -283,10 +290,6 @@ func (mgr *agreementMgr) processAgreementResult( "initRound", mgr.initRound) return utils.ErrInvalidChainID } - // TODO(jimmy): lock in this function is not safe. - if result.Position.Newer(&mgr.lastBAResult[result.Position.ChainID]) { - mgr.lastBAResult[result.Position.ChainID] = result.Position - } agreement := mgr.baModules[result.Position.ChainID] aID := agreement.agreementID() if isStop(aID) { |