aboutsummaryrefslogtreecommitdiffstats
path: root/core/syncer/agreement.go
diff options
context:
space:
mode:
Diffstat (limited to 'core/syncer/agreement.go')
-rw-r--r--core/syncer/agreement.go119
1 files changed, 92 insertions, 27 deletions
diff --git a/core/syncer/agreement.go b/core/syncer/agreement.go
index f172b3b..98e86b1 100644
--- a/core/syncer/agreement.go
+++ b/core/syncer/agreement.go
@@ -23,6 +23,7 @@ import (
"github.com/dexon-foundation/dexon-consensus/common"
"github.com/dexon-foundation/dexon-consensus/core"
+ "github.com/dexon-foundation/dexon-consensus/core/crypto"
"github.com/dexon-foundation/dexon-consensus/core/types"
"github.com/dexon-foundation/dexon-consensus/core/utils"
)
@@ -30,33 +31,42 @@ import (
// Struct agreement implements struct of BA (Byzantine Agreement) protocol
// needed in syncer, which only receives agreement results.
type agreement struct {
- cache *utils.NodeSetCache
- inputChan chan interface{}
- outputChan chan<- *types.Block
- pullChan chan<- common.Hash
- blocks map[types.Position]map[common.Hash]*types.Block
- agreementResults map[common.Hash]struct{}
- latestCRSRound uint64
- pendings map[uint64]map[common.Hash]*types.AgreementResult
- logger common.Logger
- confirmedBlocks map[common.Hash]struct{}
- ctx context.Context
- ctxCancel context.CancelFunc
+ chainTip uint64
+ cache *utils.NodeSetCache
+ tsigVerifierCache *core.TSigVerifierCache
+ inputChan chan interface{}
+ outputChan chan<- *types.Block
+ pullChan chan<- common.Hash
+ blocks map[types.Position]map[common.Hash]*types.Block
+ agreementResults map[common.Hash]struct{}
+ latestCRSRound uint64
+ pendingAgrs map[uint64]map[common.Hash]*types.AgreementResult
+ pendingBlocks map[uint64]map[common.Hash]*types.Block
+ logger common.Logger
+ confirmedBlocks map[common.Hash]struct{}
+ ctx context.Context
+ ctxCancel context.CancelFunc
}
// newAgreement creates a new agreement instance.
-func newAgreement(ch chan<- *types.Block, pullChan chan<- common.Hash,
- cache *utils.NodeSetCache, logger common.Logger) *agreement {
+func newAgreement(chainTip uint64,
+ ch chan<- *types.Block, pullChan chan<- common.Hash,
+ cache *utils.NodeSetCache, verifier *core.TSigVerifierCache,
+ logger common.Logger) *agreement {
a := &agreement{
- cache: cache,
- inputChan: make(chan interface{}, 1000),
- outputChan: ch,
- pullChan: pullChan,
- blocks: make(map[types.Position]map[common.Hash]*types.Block),
- agreementResults: make(map[common.Hash]struct{}),
- logger: logger,
- pendings: make(
+ chainTip: chainTip,
+ cache: cache,
+ tsigVerifierCache: verifier,
+ inputChan: make(chan interface{}, 1000),
+ outputChan: ch,
+ pullChan: pullChan,
+ blocks: make(map[types.Position]map[common.Hash]*types.Block),
+ agreementResults: make(map[common.Hash]struct{}),
+ logger: logger,
+ pendingAgrs: make(
map[uint64]map[common.Hash]*types.AgreementResult),
+ pendingBlocks: make(
+ map[uint64]map[common.Hash]*types.Block),
confirmedBlocks: make(map[common.Hash]struct{}),
}
a.ctx, a.ctxCancel = context.WithCancel(context.Background())
@@ -76,7 +86,11 @@ func (a *agreement) run() {
}
switch v := val.(type) {
case *types.Block:
- a.processBlock(v)
+ if v.IsFinalized() {
+ a.processFinalizedBlock(v)
+ } else {
+ a.processBlock(v)
+ }
case *types.AgreementResult:
a.processAgreementResult(v)
case uint64:
@@ -100,17 +114,68 @@ func (a *agreement) processBlock(b *types.Block) {
}
}
+func (a *agreement) processFinalizedBlock(block *types.Block) {
+ if block.Position.Round < core.DKGDelayRound {
+ return
+ }
+ // Cache those results that CRS is not ready yet.
+ if _, exists := a.confirmedBlocks[block.Hash]; exists {
+ a.logger.Trace("finalized block already confirmed", "block", block)
+ return
+ }
+ if block.Position.Round > a.latestCRSRound {
+ pendingsForRound, exists := a.pendingBlocks[block.Position.Round]
+ if !exists {
+ pendingsForRound = make(map[common.Hash]*types.Block)
+ a.pendingBlocks[block.Position.Round] = pendingsForRound
+ }
+ pendingsForRound[block.Hash] = block
+ a.logger.Trace("finalized block cached", "block", block)
+ return
+ }
+ if err := utils.VerifyBlockSignature(block); err != nil {
+ return
+ }
+ verifier, ok, err := a.tsigVerifierCache.UpdateAndGet(block.Position.Round)
+ if err != nil {
+ a.logger.Error("error verifying block randomness",
+ "block", block,
+ "error", err)
+ return
+ }
+ if !ok {
+ a.logger.Error("cannot verify block randomness", "block", block)
+ return
+ }
+ if !verifier.VerifySignature(block.Hash, crypto.Signature{
+ Type: "bls",
+ Signature: block.Finalization.Randomness,
+ }) {
+ a.logger.Error("incorrect block randomness", "block", block)
+ return
+ }
+ a.confirm(block)
+ if block.Position.Height > a.chainTip+1 {
+ if _, exist := a.confirmedBlocks[block.ParentHash]; !exist {
+ a.pullChan <- block.ParentHash
+ }
+ }
+}
+
func (a *agreement) processAgreementResult(r *types.AgreementResult) {
+ if r.Position.Round >= core.DKGDelayRound {
+ return
+ }
// Cache those results that CRS is not ready yet.
if _, exists := a.confirmedBlocks[r.BlockHash]; exists {
a.logger.Trace("Agreement result already confirmed", "result", r)
return
}
if r.Position.Round > a.latestCRSRound {
- pendingsForRound, exists := a.pendings[r.Position.Round]
+ pendingsForRound, exists := a.pendingAgrs[r.Position.Round]
if !exists {
pendingsForRound = make(map[common.Hash]*types.AgreementResult)
- a.pendings[r.Position.Round] = pendingsForRound
+ a.pendingAgrs[r.Position.Round] = pendingsForRound
}
pendingsForRound[r.BlockHash] = r
a.logger.Trace("Agreement result cached", "result", r)
@@ -164,11 +229,11 @@ func (a *agreement) processNewCRS(round uint64) {
a.latestCRSRound = round
// Verify all pending results.
for r := prevRound; r <= a.latestCRSRound; r++ {
- pendingsForRound := a.pendings[r]
+ pendingsForRound := a.pendingAgrs[r]
if pendingsForRound == nil {
continue
}
- delete(a.pendings, r)
+ delete(a.pendingAgrs, r)
for _, res := range pendingsForRound {
if err := core.VerifyAgreementResult(res, a.cache); err != nil {
a.logger.Error("Invalid agreement result",