Help us understand the problem. What is going on with this article?

go-ethereumのChainEventについて

More than 1 year has passed since last update.

ChainEvent が発生する箇所は ChainHeadEvent とほぼ同じです。

// insertChain (excerpt; omitted code is marked with "...").
//
// The visible part shows what happens for a block whose write status is
// CanonStatTy: its logs are appended to coalescedLogs and a ChainEvent
// carrying the block, its hash and its logs is appended to events. The
// collected events and logs are returned to the caller.
func (bc *BlockChain) insertChain(chain types.Blocks) (int, []interface{}, []*types.Log, error) {
        ...
        switch status {
        case CanonStatTy:
            log.Info("Inserted new block", "number", block.Number(), "hash", block.Hash(), "uncles", len(block.Uncles()),
                "txs", len(block.Transactions()), "gas", block.GasUsed(), "elapsed", common.PrettyDuration(time.Since(bstart)))
            //log.Debug("Inserted new block", "number", block.Number(), "hash", block.Hash(), "uncles", len(block.Uncles()),
            //  "txs", len(block.Transactions()), "gas", block.GasUsed(), "elapsed", common.PrettyDuration(time.Since(bstart)))

            // Accumulate this block's logs and record the insertion time
            coalescedLogs = append(coalescedLogs, logs...)
            blockInsertTimer.UpdateSince(bstart)
            // Queue the ChainEvent for this block; it is returned via events
            events = append(events, ChainEvent{block, block.Hash(), logs})
            lastCanon = block

            // Only count canonical blocks for GC processing time
            bc.gcproc += proctime
        ...
    return 0, events, coalescedLogs, nil
}

CpuAgentがmine()を成功させた時

// wait (excerpt; omitted code is marked with "...") consumes mining results
// from self.recv, writes each resulting block together with its state to the
// chain and then posts the corresponding events: a NewMinedBlockEvent on the
// mux, a ChainEvent for every written block, and additionally a
// ChainHeadEvent when the write status is core.CanonStatTy.
func (self *worker) wait() {
    for {
        // A result arrives here when a CpuAgent has successfully mined a block
        for result := range self.recv {
            ...
            fmt.Println("worker wait result Block.Number() ", result.Block.Number())
            block := result.Block
            work := result.Work

            // Stamp the hash of the mined block onto every receipt log and
            // every state log
            for _, r := range work.receipts {
                for _, l := range r.Logs {
                    l.BlockHash = block.Hash()
                }
            }
            for _, log := range work.state.Logs() {
                log.BlockHash = block.Hash()
            }
            stat, err := self.chain.WriteBlockWithState(block, work.receipts, work.state)
            if err != nil {
                // Skip this result; no events are posted for a block that failed to write
                log.Error("Failed writing block to chain", "err", err)
                continue
            }
            // Broadcast the block and announce chain insertion event
            self.mux.Post(core.NewMinedBlockEvent{Block: block})
            var (
                events []interface{}
                logs   = work.state.Logs()
            )
            // The ChainEvent is emitted regardless of the write status
            events = append(events, core.ChainEvent{Block: block, Hash: block.Hash(), Logs: logs})
            if stat == core.CanonStatTy {
                events = append(events, core.ChainHeadEvent{Block: block})
            }
            self.chain.PostChainEvents(events, logs)

            // Insert the block into the set of pending ones to wait for confirmations
            self.unconfirmed.Insert(block.NumberU64(), block.Hash())
        }
    }
}

受信

NewPublicFilterAPIに使用

// APIs (excerpt; omitted entries are marked with "...") returns the RPC APIs
// offered by the Ethereum service: the ones from ethapi.GetAPIs, the ones
// exposed by the consensus engine, and the locally defined ones. Among the
// local ones is the "eth" filter API built by filters.NewPublicFilterAPI with
// lightMode set to false.
func (s *Ethereum) APIs() []rpc.API {
    apis := ethapi.GetAPIs(s.APIBackend)

    // Append any APIs exposed explicitly by the consensus engine
    apis = append(apis, s.engine.APIs(s.BlockChain())...)

    // Append all the local APIs and return
    return append(apis, []rpc.API{
        ...
        {
            Namespace: "eth",
            Version:   "1.0",
            Service:   NewPublicEthereumAPI(s),
            Public:    true,
        }, {
            Namespace: "eth",
            Version:   "1.0",
            Service:   filters.NewPublicFilterAPI(s.APIBackend, false),
            Public:    true,
        },
        ...
    }...)
}

// NewPublicFilterAPI creates a filter API bound to the given backend. It
// wires up the backend's event mux and chain database, creates the event
// system that delivers subscription data, and launches the timeout loop in
// its own goroutine before returning the API.
func NewPublicFilterAPI(backend Backend, lightMode bool) *PublicFilterAPI {
    api := new(PublicFilterAPI)

    // Populate the fields one by one, in the same order the dependencies
    // are requested from the backend.
    api.backend = backend
    api.mux = backend.EventMux()
    api.chainDb = backend.ChainDb()
    api.events = NewEventSystem(backend.EventMux(), backend, lightMode)
    api.filters = make(map[rpc.ID]*filter)

    go api.timeoutLoop()
    return api
}

// NewEventSystem creates an EventSystem that listens for new transactions,
// logs, removed logs, chain events and pending logs, and starts its event
// loop in a separate goroutine. If any of the subscriptions could not be
// established, log.Crit is invoked.
func NewEventSystem(mux *event.TypeMux, backend Backend, lightMode bool) *EventSystem {
    es := &EventSystem{
        mux:       mux,
        backend:   backend,
        lightMode: lightMode,
        install:   make(chan *subscription),
        uninstall: make(chan *subscription),
        txsCh:     make(chan core.NewTxsEvent, txChanSize),
        logsCh:    make(chan []*types.Log, logsChanSize),
        rmLogsCh:  make(chan core.RemovedLogsEvent, rmLogsChanSize),
        chainCh:   make(chan core.ChainEvent, chainEvChanSize),
    }

    // Hook every channel up to its event source
    es.txsSub = backend.SubscribeNewTxsEvent(es.txsCh)
    es.logsSub = backend.SubscribeLogsEvent(es.logsCh)
    es.rmLogsSub = backend.SubscribeRemovedLogsEvent(es.rmLogsCh)
    es.chainSub = backend.SubscribeChainEvent(es.chainCh)
    // TODO(rjl493456442): use feed to subscribe pending log event
    es.pendingLogSub = mux.Subscribe(core.PendingLogsEvent{})

    // All feed subscriptions must exist and the mux subscription must be open
    subscribed := es.txsSub != nil &&
        es.logsSub != nil &&
        es.rmLogsSub != nil &&
        es.chainSub != nil &&
        !es.pendingLogSub.Closed()
    if !subscribed {
        log.Crit("Subscribe for event system failed")
    }

    go es.eventLoop()
    return es
}

// eventLoop is the main loop of the EventSystem. It keeps an index of the
// installed subscriptions (by subscription type, then by id), updates that
// index on install/uninstall requests, and hands every incoming event to
// broadcast together with the index. The loop ends when the pending-log mux
// subscription channel is closed or when any feed subscription signals on
// its Err channel; all subscriptions are unsubscribed on exit.
func (es *EventSystem) eventLoop() {
    // Ensure all subscriptions get cleaned up
    defer func() {
        es.pendingLogSub.Unsubscribe()
        es.txsSub.Unsubscribe()
        es.logsSub.Unsubscribe()
        es.rmLogsSub.Unsubscribe()
        es.chainSub.Unsubscribe()
    }()

    // Pre-create one (initially empty) subscription map per subscription type
    index := make(filterIndex)
    for i := UnknownSubscription; i < LastIndexSubscription; i++ {
        index[i] = make(map[rpc.ID]*subscription)
    }

    for {
        select {
        // Handle subscribed events
        case ev := <-es.txsCh:
            es.broadcast(index, ev)
        case ev := <-es.logsCh:
            es.broadcast(index, ev)
        case ev := <-es.rmLogsCh:
            es.broadcast(index, ev)
        case ev := <-es.chainCh:
            // ChainEvent: broadcast forwards the block header to block subscriptions
            es.broadcast(index, ev)
        case ev, active := <-es.pendingLogSub.Chan():
            if !active { // system stopped
                return
            }
            es.broadcast(index, ev)

        case f := <-es.install:
            if f.typ == MinedAndPendingLogsSubscription {
                // the type are logs and pending logs subscriptions
                index[LogsSubscription][f.id] = f
                index[PendingLogsSubscription][f.id] = f
            } else {
                index[f.typ][f.id] = f
            }
            // Closing f.installed signals that the subscription is now indexed
            close(f.installed)

        case f := <-es.uninstall:
            if f.typ == MinedAndPendingLogsSubscription {
                // the type are logs and pending logs subscriptions
                delete(index[LogsSubscription], f.id)
                delete(index[PendingLogsSubscription], f.id)
            } else {
                delete(index[f.typ], f.id)
            }
            // Closing f.err signals that the subscription has been removed
            close(f.err)

        // System stopped
        case <-es.txsSub.Err():
            return
        case <-es.logsSub.Err():
            return
        case <-es.rmLogsSub.Err():
            return
        case <-es.chainSub.Err():
            return
        }
    }
}

// broadcast delivers ev to every subscription in filters that is interested
// in it. The concrete type of ev selects the target subscriptions:
//
//   - []*types.Log and core.RemovedLogsEvent go to log subscriptions, after
//     being filtered by each subscription's criteria;
//   - a mux event wrapping core.PendingLogsEvent goes to pending-log
//     subscriptions created before the event time;
//   - core.NewTxsEvent sends the transaction hashes to pending-transaction
//     subscriptions;
//   - core.ChainEvent sends the block header to block subscriptions and, in
//     light mode, runs the light log filtering for log subscriptions.
//
// A nil event and events of any other type are ignored.
func (es *EventSystem) broadcast(filters filterIndex, ev interface{}) {
    if ev == nil {
        return
    }

    switch e := ev.(type) {
    case []*types.Log:
        if len(e) == 0 {
            break
        }
        for _, sub := range filters[LogsSubscription] {
            matched := filterLogs(e, sub.logsCrit.FromBlock, sub.logsCrit.ToBlock, sub.logsCrit.Addresses, sub.logsCrit.Topics)
            if len(matched) == 0 {
                continue
            }
            sub.logs <- matched
        }

    case core.RemovedLogsEvent:
        for _, sub := range filters[LogsSubscription] {
            matched := filterLogs(e.Logs, sub.logsCrit.FromBlock, sub.logsCrit.ToBlock, sub.logsCrit.Addresses, sub.logsCrit.Topics)
            if len(matched) == 0 {
                continue
            }
            sub.logs <- matched
        }

    case *event.TypeMuxEvent:
        // Only pending-log payloads are of interest on the mux
        pending, isPending := e.Data.(core.PendingLogsEvent)
        if !isPending {
            break
        }
        for _, sub := range filters[PendingLogsSubscription] {
            // Skip subscriptions that were not created before the event time
            if !e.Time.After(sub.created) {
                continue
            }
            matched := filterLogs(pending.Logs, nil, sub.logsCrit.ToBlock, sub.logsCrit.Addresses, sub.logsCrit.Topics)
            if len(matched) == 0 {
                continue
            }
            sub.logs <- matched
        }

    case core.NewTxsEvent:
        txHashes := make([]common.Hash, 0, len(e.Txs))
        for i := range e.Txs {
            txHashes = append(txHashes, e.Txs[i].Hash())
        }
        for _, sub := range filters[PendingTransactionsSubscription] {
            sub.hashes <- txHashes
        }

    case core.ChainEvent:
        // Every block subscription receives the header of the new block
        for _, sub := range filters[BlocksSubscription] {
            sub.headers <- e.Block.Header()
        }
        if !es.lightMode || len(filters[LogsSubscription]) == 0 {
            break
        }
        es.lightFilterNewHead(e.Block.Header(), func(header *types.Header, remove bool) {
            for _, sub := range filters[LogsSubscription] {
                matched := es.lightFilterLogs(header, sub.logsCrit.Addresses, sub.logsCrit.Topics, remove)
                if len(matched) == 0 {
                    continue
                }
                sub.logs <- matched
            }
        })
    }
}

bloom filter を作るのにも使われている

// New creates an Ethereum object from the given service context and config.
//
// It rejects light sync mode and invalid sync modes, opens the chain
// database, sets up the genesis block, and then builds the blockchain, the
// bloom indexer, the transaction pool, the protocol manager, the miner and
// the API backend (including the gas price oracle). Any error from these
// steps is returned with a nil *Ethereum.
func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
    if config.SyncMode == downloader.LightSync {
        return nil, errors.New("can't run eth.Ethereum in light sync mode, use les.LightEthereum")
    }
    if !config.SyncMode.IsValid() {
        return nil, fmt.Errorf("invalid sync mode %d", config.SyncMode)
    }
    chainDb, err := CreateDB(ctx, config, "chaindata")
    if err != nil {
        return nil, err
    }
    // A *params.ConfigCompatError is tolerated here; it is handled further
    // below by rewinding the chain. Any other genesis error aborts.
    chainConfig, genesisHash, genesisErr := core.SetupGenesisBlock(chainDb, config.Genesis)
    if _, ok := genesisErr.(*params.ConfigCompatError); genesisErr != nil && !ok {
        return nil, genesisErr
    }
    log.Info("Initialised chain configuration", "config", chainConfig)

    eth := &Ethereum{
        config:         config,
        chainDb:        chainDb,
        chainConfig:    chainConfig,
        eventMux:       ctx.EventMux,
        accountManager: ctx.AccountManager,
        engine:         CreateConsensusEngine(ctx, &config.Ethash, chainConfig, chainDb),
        shutdownChan:   make(chan bool),
        networkId:      config.NetworkId,
        gasPrice:       config.GasPrice,
        etherbase:      config.Etherbase,
        bloomRequests:  make(chan chan *bloombits.Retrieval),
        bloomIndexer:   NewBloomIndexer(chainDb, params.BloomBitsBlocks),
    }

    log.Info("Initialising Ethereum protocol", "versions", ProtocolVersions, "network", config.NetworkId)

    // Unless skipped, refuse to run on a database written with a different
    // (non-zero) blockchain version, then record the current version.
    if !config.SkipBcVersionCheck {
        bcVersion := rawdb.ReadDatabaseVersion(chainDb)
        if bcVersion != core.BlockChainVersion && bcVersion != 0 {
            return nil, fmt.Errorf("Blockchain DB version mismatch (%d / %d). Run geth upgradedb.\n", bcVersion, core.BlockChainVersion)
        }
        rawdb.WriteDatabaseVersion(chainDb, core.BlockChainVersion)
    }
    var (
        vmConfig    = vm.Config{EnablePreimageRecording: config.EnablePreimageRecording}
        cacheConfig = &core.CacheConfig{Disabled: config.NoPruning, TrieNodeLimit: config.TrieCache, TrieTimeLimit: config.TrieTimeout}
    )
    eth.blockchain, err = core.NewBlockChain(chainDb, cacheConfig, eth.chainConfig, eth.engine, vmConfig)
    if err != nil {
        return nil, err
    }
    // Rewind the chain in case of an incompatible config upgrade.
    if compat, ok := genesisErr.(*params.ConfigCompatError); ok {
        log.Warn("Rewinding chain to upgrade configuration", "err", compat)
        eth.blockchain.SetHead(compat.RewindTo)
        rawdb.WriteChainConfig(chainDb, genesisHash, chainConfig)
    }
    // Start the bloom indexer on top of the freshly created blockchain
    eth.bloomIndexer.Start(eth.blockchain)

    // Resolve a configured transaction journal path through the service context
    if config.TxPool.Journal != "" {
        config.TxPool.Journal = ctx.ResolvePath(config.TxPool.Journal)
    }
    eth.txPool = core.NewTxPool(config.TxPool, eth.chainConfig, eth.blockchain)

    if eth.protocolManager, err = NewProtocolManager(eth.chainConfig, config.SyncMode, config.NetworkId, eth.eventMux, eth.txPool, eth.engine, eth.blockchain, chainDb); err != nil {
        return nil, err
    }
    eth.miner = miner.New(eth, eth.chainConfig, eth.EventMux(), eth.engine)
    eth.miner.SetExtra(makeExtraData(config.ExtraData))

    // The gas price oracle falls back to the configured gas price when the
    // GPO settings carry no default of their own.
    eth.APIBackend = &EthAPIBackend{eth, nil}
    gpoParams := config.GPO
    if gpoParams.Default == nil {
        gpoParams.Default = config.GasPrice
    }
    eth.APIBackend.gpo = gasprice.NewOracle(eth.APIBackend, gpoParams)

    return eth, nil
}

// NewBloomIndexer returns a chain indexer of kind "bloombits" whose backend
// is a BloomIndexer working on db with the given section size. The index
// data is kept in a table of db prefixed with rawdb.BloomBitsIndexPrefix.
func NewBloomIndexer(db ethdb.Database, size uint64) *core.ChainIndexer {
    indexDb := ethdb.NewTable(db, string(rawdb.BloomBitsIndexPrefix))
    bloomBackend := &BloomIndexer{db: db, size: size}

    return core.NewChainIndexer(db, indexDb, bloomBackend, size, bloomConfirms, bloomThrottling, "bloombits")
}

// NewChainIndexer creates a chain indexer that reads from chainDb, stores
// its index in indexDb and delegates the actual work to backend. section is
// the section size, confirm the number of required confirmations and
// throttling the throttling duration; kind is attached to the indexer's
// logger under the "type" key. Before returning, the already valid sections
// are loaded and the update loop is started in its own goroutine.
func NewChainIndexer(chainDb, indexDb ethdb.Database, backend ChainIndexerBackend, section, confirm uint64, throttling time.Duration, kind string) *ChainIndexer {
    indexer := new(ChainIndexer)

    // Storage and processing backend
    indexer.chainDb = chainDb
    indexer.indexDb = indexDb
    indexer.backend = backend

    // Signalling channels
    indexer.update = make(chan struct{}, 1)
    indexer.quit = make(chan chan error)

    // Tuning parameters and logger
    indexer.sectionSize = section
    indexer.confirmsReq = confirm
    indexer.throttling = throttling
    indexer.log = log.New("type", kind)

    // Initialize database dependent fields and start the updater
    indexer.loadValidSections()
    go indexer.updateLoop()

    return indexer
}

// Start subscribes the indexer to the chain's ChainEvent feed through a
// buffered channel and runs the event loop in a new goroutine, seeded with
// the chain's current header.
func (c *ChainIndexer) Start(chain ChainIndexerChain) {
    chainEvents := make(chan ChainEvent, 10)
    subscription := chain.SubscribeChainEvent(chainEvents)
    head := chain.CurrentHeader()

    go c.eventLoop(head, chainEvents, subscription)
}

// ChainIndexerChain is the view of a chain that ChainIndexer.Start needs: it
// must be able to report its current header and to deliver ChainEvent
// notifications to a subscriber channel.
type ChainIndexerChain interface {
    // CurrentHeader retrieves the latest locally known header.
    CurrentHeader() *types.Header

    // SubscribeChainEvent subscribes to new head header notifications.
    // The returned subscription is used to unsubscribe again.
    SubscribeChainEvent(ch chan<- ChainEvent) event.Subscription
}

// eventLoop feeds the indexer with chain head updates. It first reports
// currentHeader as the head, then processes every ChainEvent received on
// events: when the new block does not build on the previously seen header,
// the common ancestor (if one is found) is reported as a reorg head before
// the new head itself is reported. The loop ends when a quit request arrives
// or when events is closed; in both cases nil is sent back on the quit
// request channel and sub is unsubscribed.
func (c *ChainIndexer) eventLoop(currentHeader *types.Header, events chan ChainEvent, sub event.Subscription) {
    // Mark the chain indexer as active, requiring an additional teardown
    atomic.StoreUint32(&c.active, 1)

    defer sub.Unsubscribe()

    // Fire the initial new head event to start any outstanding processing
    c.newHead(currentHeader.Number.Uint64(), false)

    lastHeader, lastHash := currentHeader, currentHeader.Hash()
    for {
        select {
        case reply := <-c.quit:
            // Chain indexer terminating, report no failure and abort
            reply <- nil
            return

        case chainEv, open := <-events:
            // A closed event channel means shutdown: wait for the quit
            // request and acknowledge it
            if !open {
                reply := <-c.quit
                reply <- nil
                return
            }
            newHeader := chainEv.Block.Header()
            if newHeader.ParentHash != lastHash {
                // Reorg to the common ancestor (might not exist in light sync mode, skip reorg then)
                // TODO(karalabe, zsfelfoldi): This seems a bit brittle, can we detect this case explicitly?

                // TODO(karalabe): This operation is expensive and might block, causing the event system to
                // potentially also lock up. We need to do this on a different thread somehow.
                ancestor := rawdb.FindCommonAncestor(c.chainDb, lastHeader, newHeader)
                if ancestor != nil {
                    c.newHead(ancestor.Number.Uint64(), true)
                }
            }
            c.newHead(newHeader.Number.Uint64(), false)

            lastHeader, lastHash = newHeader, newHeader.Hash()
        }
    }
}

ethstatsのService

// Start stores the given p2p server on the service and launches the
// reporting loop in a separate goroutine. It always returns nil.
func (s *Service) Start(server *p2p.Server) error {
    s.server = server
    go s.loop()

    log.Info("Stats daemon started")
    return nil
}

// loop keeps the stats connection alive and pushes updates through it.
//
// It subscribes to chain head and new transaction events (from the full node
// if s.eth is set, otherwise from the light client), throttles them in a
// helper goroutine, and then repeatedly connects to the stats server, logs
// in and reports: a full report every 15 seconds, plus block, pending
// transaction and history reports as the corresponding events arrive. When a
// report fails the connection is dropped and a new one is established.
//
// The method returns once either event subscription has ended. That is also
// honoured while the server is unreachable: the reconnect back-off is
// interrupted instead of sleeping blindly. The ticker driving the full
// reports is stopped whenever its connection is abandoned, so reconnecting
// does not accumulate tickers.
func (s *Service) loop() {
    // Subscribe to chain events to execute updates on
    var blockchain blockChain
    var txpool txPool
    if s.eth != nil {
        blockchain = s.eth.BlockChain()
        txpool = s.eth.TxPool()
    } else {
        blockchain = s.les.BlockChain()
        txpool = s.les.TxPool()
    }

    chainHeadCh := make(chan core.ChainHeadEvent, chainHeadChanSize)
    headSub := blockchain.SubscribeChainHeadEvent(chainHeadCh)
    defer headSub.Unsubscribe()

    txEventCh := make(chan core.NewTxsEvent, txChanSize)
    txSub := txpool.SubscribeNewTxsEvent(txEventCh)
    defer txSub.Unsubscribe()

    // Start a goroutine that exhausts the subscriptions to avoid events piling up
    var (
        quitCh = make(chan struct{})
        headCh = make(chan *types.Block, 1)
        txCh   = make(chan struct{}, 1)
    )
    go func() {
        var lastTx mclock.AbsTime

    HandleLoop:
        for {
            select {
            // Notify of chain head events, but drop if too frequent
            case head := <-chainHeadCh:
                select {
                case headCh <- head.Block:
                default:
                }

            // Notify of new transaction events, but drop if too frequent
            case <-txEventCh:
                if time.Duration(mclock.Now()-lastTx) < time.Second {
                    continue
                }
                lastTx = mclock.Now()

                select {
                case txCh <- struct{}{}:
                default:
                }

            // node stopped
            case <-txSub.Err():
                break HandleLoop
            case <-headSub.Err():
                break HandleLoop
            }
        }
        close(quitCh)
    }()
    // Loop reporting until termination
    for {
        // Do not start another connection attempt once the node has stopped
        select {
        case <-quitCh:
            return
        default:
        }
        // Resolve the URL, defaulting to TLS, but falling back to none too
        path := fmt.Sprintf("%s/api", s.host)
        urls := []string{path}

        if !strings.Contains(path, "://") { // url.Parse and url.IsAbs is unsuitable (https://github.com/golang/go/issues/19779)
            urls = []string{"wss://" + path, "ws://" + path}
        }
        // Establish a websocket connection to the server on any supported URL
        var (
            conf *websocket.Config
            conn *websocket.Conn
            err  error
        )
        for _, url := range urls {
            if conf, err = websocket.NewConfig(url, "http://localhost/"); err != nil {
                continue
            }
            conf.Dialer = &net.Dialer{Timeout: 5 * time.Second}
            if conn, err = websocket.DialConfig(conf); err == nil {
                break
            }
        }
        if err != nil {
            log.Warn("Stats server unreachable", "err", err)
            // Back off before retrying, but wake up early if the node stops
            select {
            case <-quitCh:
                return
            case <-time.After(10 * time.Second):
            }
            continue
        }
        // Authenticate the client with the server
        if err = s.login(conn); err != nil {
            log.Warn("Stats login failed", "err", err)
            conn.Close()
            // Back off before retrying, but wake up early if the node stops
            select {
            case <-quitCh:
                return
            case <-time.After(10 * time.Second):
            }
            continue
        }
        go s.readLoop(conn)

        // Send the initial stats so our node looks decent from the get go
        if err = s.report(conn); err != nil {
            log.Warn("Initial stats report failed", "err", err)
            conn.Close()
            continue
        }
        // Keep sending status updates until the connection breaks
        fullReport := time.NewTicker(15 * time.Second)

        for err == nil {
            select {
            case <-quitCh:
                fullReport.Stop()
                conn.Close()
                return

            case <-fullReport.C:
                if err = s.report(conn); err != nil {
                    log.Warn("Full stats report failed", "err", err)
                }
            case list := <-s.histCh:
                if err = s.reportHistory(conn, list); err != nil {
                    log.Warn("Requested history report failed", "err", err)
                }
            case head := <-headCh:
                if err = s.reportBlock(conn, head); err != nil {
                    log.Warn("Block stats report failed", "err", err)
                }
                if err = s.reportPending(conn); err != nil {
                    log.Warn("Post-block transaction stats report failed", "err", err)
                }
            case <-txCh:
                if err = s.reportPending(conn); err != nil {
                    log.Warn("Transaction stats report failed", "err", err)
                }
            }
        }
        // The ticker belongs to this connection only; release it before reconnecting
        fullReport.Stop()
        // Make sure the connection is closed
        conn.Close()
    }
}
t10471
mercari
フリマアプリ「メルカリ」を、グローバルで開発しています。
https://tech.mercari.com/
Why not register and get more from Qiita?
  1. We will deliver articles that match your interests
    By following users and tags, you can keep up with the technical fields you are interested in.
  2. You can efficiently read useful information later
    By "stocking" the articles you like, you can find them again right away.