LoginSignup
1
0

More than 5 years have passed since last update.

go-ethereumのChainHeadEventについて

Posted at

ChainHeadEvent

新しいBlockがチェーンの先頭(canonical head)になったときに発行されるイベントです。
たとえば、5秒おきに futureBlocks にデータがあるかを調べ、存在する場合は BlockChain に Block を追加し、その結果として先頭が更新されたタイミングで発行されます。

発信元


// NewBlockChain constructs a BlockChain from the supplied database, cache and
// chain configuration, consensus engine and VM configuration, starts its
// background update loop, and returns it.
//
// This is an excerpt: the `...` lines mark code elided by the article
// (presumably including creation of the caches referenced in the literal
// below and any error returns; confirm against the full source).
func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *params.ChainConfig, engine consensus.Engine, vmConfig vm.Config) (*BlockChain, error) {
    ...
    // Assemble the chain object. bodyCache, bodyRLPCache, blockCache,
    // futureBlocks and badBlocks are not defined in the visible part of the
    // function.
    bc := &BlockChain{
        chainConfig:  chainConfig,
        cacheConfig:  cacheConfig,
        db:           db,
        triegc:       prque.New(),
        stateCache:   state.NewDatabase(db),
        quit:         make(chan struct{}),
        bodyCache:    bodyCache,
        bodyRLPCache: bodyRLPCache,
        blockCache:   blockCache,
        futureBlocks: futureBlocks,
        engine:       engine,
        vmConfig:     vmConfig,
        badBlocks:    badBlocks,
    }
    ...
    // Take ownership of this particular state
    // Launch update() in its own goroutine; in this excerpt update() is the
    // loop that periodically calls procFutureBlocks.
    go bc.update()
    return bc, nil
}

// update is the chain's background loop. Every time the 5-second ticker
// fires it calls procFutureBlocks. The ticker is stopped when the function
// returns.
//
// Excerpt: the `...` inside the select stands for further cases elided by the
// article (presumably including the exit path; confirm against the full
// source).
func (bc *BlockChain) update() {
    futureTimer := time.NewTicker(5 * time.Second)
    defer futureTimer.Stop()
    for {
        select {
        case <-futureTimer.C:
            // Periodic tick: try to import whatever is waiting in futureBlocks.
            bc.procFutureBlocks()
        ...
        }
    }
}

// procFutureBlocks gathers every block still held in the futureBlocks cache,
// orders them with types.BlockBy(types.Number), and passes them to InsertChain
// one block at a time. The values returned by InsertChain are discarded.
func (bc *BlockChain) procFutureBlocks() {
    pending := make([]*types.Block, 0, bc.futureBlocks.Len())
    for _, key := range bc.futureBlocks.Keys() {
        entry, ok := bc.futureBlocks.Peek(key)
        if !ok {
            // Peek reported no entry for this key; nothing to queue.
            continue
        }
        pending = append(pending, entry.(*types.Block))
    }
    if len(pending) == 0 {
        return
    }

    types.BlockBy(types.Number).Sort(pending)
    // Submit each block as its own single-element chain.
    for idx := 0; idx < len(pending); idx++ {
        bc.InsertChain(pending[idx : idx+1])
    }
}

// InsertChain passes chain to insertChain, posts the events and logs that call
// produced through PostChainEvents (regardless of the returned error), and
// then returns insertChain's integer result and error unchanged.
func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) {
    index, pendingEvents, coalesced, insertErr := bc.insertChain(chain)
    bc.PostChainEvents(pendingEvents, coalesced)
    return index, insertErr
}

// insertChain is the internal worker behind InsertChain. It returns an int,
// the events to be posted, the coalesced logs, and an error.
//
// Excerpt: the `...` stands for the body elided by the article; events,
// lastCanon and coalescedLogs are defined in that elided part.
func (bc *BlockChain) insertChain(chain types.Blocks) (int, []interface{}, []*types.Log, error) {
    ...
    // Queue a ChainHeadEvent only when lastCanon is set and is the chain's
    // current block.
    if lastCanon != nil && bc.CurrentBlock().Hash() == lastCanon.Hash() {
        events = append(events, ChainHeadEvent{lastCanon})
    }
    return 0, events, coalescedLogs, nil
}

CpuAgentがmine()に成功した時

// wait consumes results from self.recv. For each result it stamps the block
// hash onto the work's logs, writes the block and state to the chain, posts a
// NewMinedBlockEvent on the mux, posts a ChainEvent (plus a ChainHeadEvent
// when the write status is core.CanonStatTy) through the chain, and records
// the block in self.unconfirmed.
//
// Excerpt: the `...` marks code elided by the article.
//
// NOTE(review): the outer `for` has no exit; if self.recv is a channel and it
// is ever closed, the inner range would end and be re-entered immediately.
// Confirm how this loop is meant to stop.
func (self *worker) wait() {
    for {
        // This event fires when the CpuAgent has successfully mined a block.
        for result := range self.recv {
            ...
            // NOTE(review): debug print to stdout; appears to be added for the article.
            fmt.Println("worker wait result Block.Number() ", result.Block.Number())
            block := result.Block
            work := result.Work

            // Stamp the block hash onto every log in the receipts.
            for _, r := range work.receipts {
                for _, l := range r.Logs {
                    l.BlockHash = block.Hash()
                }
            }
            // Same for the logs returned by the state. The loop variable `log`
            // shadows the outer `log` identifier only inside this loop.
            for _, log := range work.state.Logs() {
                log.BlockHash = block.Hash()
            }
            stat, err := self.chain.WriteBlockWithState(block, work.receipts, work.state)
            if err != nil {
                // The write failed: log it and move on to the next result
                // without posting any events for this block.
                log.Error("Failed writing block to chain", "err", err)
                continue
            }
            // Broadcast the block and announce chain insertion event
            self.mux.Post(core.NewMinedBlockEvent{Block: block})
            var (
                events []interface{}
                logs   = work.state.Logs()
            )
            events = append(events, core.ChainEvent{Block: block, Hash: block.Hash(), Logs: logs})
            // ChainHeadEvent is added only when the write reported CanonStatTy.
            if stat == core.CanonStatTy {
                events = append(events, core.ChainHeadEvent{Block: block})
            }
            self.chain.PostChainEvents(events, logs)

            // Insert the block into the set of pending ones to wait for confirmations
            self.unconfirmed.Insert(block.NumberU64(), block.Hash())
        }
    }
}

受信

// blockLoop is listed by the article as one of the receivers of
// ChainHeadEvent; its body is elided here.
func (pm *ProtocolManager) blockLoop() {...}
// loop is listed by the article as one of the receivers of ChainHeadEvent
// (presumably via TxPool.chainHeadCh; confirm); its body is elided here.
func (pool *TxPool) loop() {...}
// update is listed by the article as one of the receivers of ChainHeadEvent;
// its body is elided here.
func (self *worker) update() {...}
// BlockChain holds the chain's configuration, backing database, event feeds,
// locks, head pointers, caches, and the consensus/processing components used
// when importing blocks.
type BlockChain struct {
    chainConfig *params.ChainConfig // Chain & network configuration
    cacheConfig *CacheConfig        // Cache configuration for pruning

    db     ethdb.Database // Low level persistent database to store final content in
    triegc *prque.Prque   // Priority queue mapping block numbers to tries to gc
    gcproc time.Duration  // Accumulates canonical block processing for trie dumping

    // Header chain, event feeds and their subscription scope, and the genesis
    // block. NOTE(review): chainHeadFeed is presumably the feed on which
    // ChainHeadEvent is delivered; confirm against PostChainEvents.
    hc            *HeaderChain
    rmLogsFeed    event.Feed
    chainFeed     event.Feed
    chainSideFeed event.Feed
    chainHeadFeed event.Feed
    logsFeed      event.Feed
    scope         event.SubscriptionScope
    genesisBlock  *types.Block

    mu      sync.RWMutex // global mutex for locking chain operations
    chainmu sync.RWMutex // blockchain insertion lock
    procmu  sync.RWMutex // block processor lock

    checkpoint       int          // checkpoint counts towards the new checkpoint
    currentBlock     atomic.Value // Current head of the block chain
    currentFastBlock atomic.Value // Current head of the fast-sync chain (may be above the block chain!)

    stateCache   state.Database // State database to reuse between imports (contains state cache)
    bodyCache    *lru.Cache     // Cache for the most recent block bodies
    bodyRLPCache *lru.Cache     // Cache for the most recent block bodies in RLP encoded format
    blockCache   *lru.Cache     // Cache for the most recent entire blocks
    futureBlocks *lru.Cache     // future blocks are blocks added for later processing

    quit    chan struct{} // blockchain quit channel
    running int32         // running must be called atomically
    // procInterrupt must be atomically called
    procInterrupt int32          // interrupt signaler for block processing
    wg            sync.WaitGroup // chain processing wait group for shutting down

    // Consensus engine, block processor/validator and VM configuration.
    engine    consensus.Engine
    processor Processor // block processor interface
    validator Validator // block and state validator interface
    vmConfig  vm.Config

    badBlocks *lru.Cache // Bad block cache
}

// TxPool holds the transaction pool's configuration, its view of the chain
// head state, and the per-account transaction collections.
type TxPool struct {
    // Configuration, chain handle, event plumbing and lock.
    // chainHeadCh carries ChainHeadEvent values. NOTE(review): presumably it
    // is fed through chainHeadSub and drained in loop(); confirm.
    config       TxPoolConfig
    chainconfig  *params.ChainConfig
    chain        blockChain
    gasPrice     *big.Int
    txFeed       event.Feed
    scope        event.SubscriptionScope
    chainHeadCh  chan ChainHeadEvent
    chainHeadSub event.Subscription
    signer       types.Signer
    mu           sync.RWMutex

    currentState  *state.StateDB      // Current state in the blockchain head
    pendingState  *state.ManagedState // Pending state tracking virtual nonces
    currentMaxGas uint64              // Current gas limit for transaction caps

    locals  *accountSet // Set of local transaction to exempt from eviction rules
    journal *txJournal  // Journal of local transaction to back up to disk

    pending map[common.Address]*txList   // All currently processable transactions
    queue   map[common.Address]*txList   // Queued but non-processable transactions
    beats   map[common.Address]time.Time // Last heartbeat from each known account
    all     *txLookup                    // All transactions to allow lookups
    priced  *txPricedList                // All transactions sorted by price

    wg sync.WaitGroup // for shutdown sync

    // NOTE(review): presumably records whether the Homestead rules are
    // active; confirm against its users.
    homestead bool
}
1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0