Posted at

go-ethereumのChainHeadEventについて

More than 1 year has passed since last update.


ChainHeadEvent

新しいBlockが作成されたときに発生するイベントです。

5秒おきに futureBlocks にデータがあるかを調べ存在する場合、BlockChainにBlockを追加してそのタイミングで呼ばれます。

発信元


func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *params.ChainConfig, engine consensus.Engine, vmConfig vm.Config) (*BlockChain, error) {
...
bc := &BlockChain{
chainConfig: chainConfig,
cacheConfig: cacheConfig,
db: db,
triegc: prque.New(),
stateCache: state.NewDatabase(db),
quit: make(chan struct{}),
bodyCache: bodyCache,
bodyRLPCache: bodyRLPCache,
blockCache: blockCache,
futureBlocks: futureBlocks,
engine: engine,
vmConfig: vmConfig,
badBlocks: badBlocks,
}
...
// Take ownership of this particular state
go bc.update()
return bc, nil
}

func (bc *BlockChain) update() {
futureTimer := time.NewTicker(5 * time.Second)
defer futureTimer.Stop()
for {
select {
case <-futureTimer.C:
bc.procFutureBlocks()
...
}
}
}

func (bc *BlockChain) procFutureBlocks() {
blocks := make([]*types.Block, 0, bc.futureBlocks.Len())
for _, hash := range bc.futureBlocks.Keys() {
if block, exist := bc.futureBlocks.Peek(hash); exist {
blocks = append(blocks, block.(*types.Block))
}
}
if len(blocks) > 0 {
types.BlockBy(types.Number).Sort(blocks)
for i := range blocks {
bc.InsertChain(blocks[i : i+1])
}
}
}

func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) {
n, events, logs, err := bc.insertChain(chain)
bc.PostChainEvents(events, logs)
return n, err
}

func (bc *BlockChain) insertChain(chain types.Blocks) (int, []interface{}, []*types.Log, error) {
...
if lastCanon != nil && bc.CurrentBlock().Hash() == lastCanon.Hash() {
events = append(events, ChainHeadEvent{lastCanon})
}
return 0, events, coalescedLogs, nil
}

CpuAgentがmine()を成功させた時

func (self *worker) wait() {

for {
// CpuAgentがmineを成功した時にこのイベントが発生する
for result := range self.recv {
...
fmt.Println("worker wait result Block.Number() ", result.Block.Number())
block := result.Block
work := result.Work

for _, r := range work.receipts {
for _, l := range r.Logs {
l.BlockHash = block.Hash()
}
}
for _, log := range work.state.Logs() {
log.BlockHash = block.Hash()
}
stat, err := self.chain.WriteBlockWithState(block, work.receipts, work.state)
if err != nil {
log.Error("Failed writing block to chain", "err", err)
continue
}
// Broadcast the block and announce chain insertion event
self.mux.Post(core.NewMinedBlockEvent{Block: block})
var (
events []interface{}
logs = work.state.Logs()
)
events = append(events, core.ChainEvent{Block: block, Hash: block.Hash(), Logs: logs})
if stat == core.CanonStatTy {
events = append(events, core.ChainHeadEvent{Block: block})
}
self.chain.PostChainEvents(events, logs)

// Insert the block into the set of pending ones to wait for confirmations
self.unconfirmed.Insert(block.NumberU64(), block.Hash())
}
}
}

受信

func (pm *ProtocolManager) blockLoop() {...}

func (pool *TxPool) loop() {...}
func (self *worker) update() {...}

type BlockChain struct {

chainConfig *params.ChainConfig // Chain & network configuration
cacheConfig *CacheConfig // Cache configuration for pruning

db ethdb.Database // Low level persistent database to store final content in
triegc *prque.Prque // Priority queue mapping block numbers to tries to gc
gcproc time.Duration // Accumulates canonical block processing for trie dumping

hc *HeaderChain
rmLogsFeed event.Feed
chainFeed event.Feed
chainSideFeed event.Feed
chainHeadFeed event.Feed
logsFeed event.Feed
scope event.SubscriptionScope
genesisBlock *types.Block

mu sync.RWMutex // global mutex for locking chain operations
chainmu sync.RWMutex // blockchain insertion lock
procmu sync.RWMutex // block processor lock

checkpoint int // checkpoint counts towards the new checkpoint
currentBlock atomic.Value // Current head of the block chain
currentFastBlock atomic.Value // Current head of the fast-sync chain (may be above the block chain!)

stateCache state.Database // State database to reuse between imports (contains state cache)
bodyCache *lru.Cache // Cache for the most recent block bodies
bodyRLPCache *lru.Cache // Cache for the most recent block bodies in RLP encoded format
blockCache *lru.Cache // Cache for the most recent entire blocks
futureBlocks *lru.Cache // future blocks are blocks added for later processing

quit chan struct{} // blockchain quit channel
running int32 // running must be called atomically
// procInterrupt must be atomically called
procInterrupt int32 // interrupt signaler for block processing
wg sync.WaitGroup // chain processing wait group for shutting down

engine consensus.Engine
processor Processor // block processor interface
validator Validator // block and state validator interface
vmConfig vm.Config

badBlocks *lru.Cache // Bad block cache
}

type TxPool struct {
config TxPoolConfig
chainconfig *params.ChainConfig
chain blockChain
gasPrice *big.Int
txFeed event.Feed
scope event.SubscriptionScope
chainHeadCh chan ChainHeadEvent
chainHeadSub event.Subscription
signer types.Signer
mu sync.RWMutex

currentState *state.StateDB // Current state in the blockchain head
pendingState *state.ManagedState // Pending state tracking virtual nonces
currentMaxGas uint64 // Current gas limit for transaction caps

locals *accountSet // Set of local transaction to exempt from eviction rules
journal *txJournal // Journal of local transaction to back up to disk

pending map[common.Address]*txList // All currently processable transactions
queue map[common.Address]*txList // Queued but non-processable transactions
beats map[common.Address]time.Time // Last heartbeat from each known account
all *txLookup // All transactions to allow lookups
priced *txPricedList // All transactions sorted by price

wg sync.WaitGroup // for shutdown sync

homestead bool
}