ChainEvent が発生する箇所は ChainHeadEvent とほぼ同じです(下の insertChain と worker.wait を参照)。

// insertChain (abridged listing) shows the point at which a ChainEvent is
// queued for a block whose write status is CanonStatTy.
//
// NOTE(review): closing braces and the surrounding statements are not part of
// this listing; status, block, logs, bstart, proctime, events, coalescedLogs
// and lastCanon are declared in code that is not visible here.
func (bc *BlockChain) insertChain(chain types.Blocks) (int, []interface{}, []*types.Log, error) {
        switch status {
        case CanonStatTy:
            // Reported at Info level here; the Debug-level variant is kept
            // commented out directly below.
            log.Info("Inserted new block", "number", block.Number(), "hash", block.Hash(), "uncles", len(block.Uncles()),
                "txs", len(block.Transactions()), "gas", block.GasUsed(), "elapsed", common.PrettyDuration(time.Since(bstart)))
            //log.Debug("Inserted new block", "number", block.Number(), "hash", block.Hash(), "uncles", len(block.Uncles()),
            //  "txs", len(block.Transactions()), "gas", block.GasUsed(), "elapsed", common.PrettyDuration(time.Since(bstart)))

            // Collect the block's logs and queue a ChainEvent carrying the
            // block, its hash and those logs.
            coalescedLogs = append(coalescedLogs, logs...)
            events = append(events, ChainEvent{block, block.Hash(), logs})
            lastCanon = block

            // Only count canonical blocks for GC processing time
            bc.gcproc += proctime
    // Hand the accumulated events and logs back to the caller.
    return 0, events, coalescedLogs, nil


// wait (abridged listing) consumes results from self.recv, writes each block
// together with its state to the chain, and then publishes the resulting
// events: NewMinedBlockEvent on the mux, a ChainEvent for every block, and a
// ChainHeadEvent only when the write status is core.CanonStatTy.
//
// NOTE(review): closing braces and some statements are not part of this
// listing.
func (self *worker) wait() {
    for {
        // This event fires when the CpuAgent has successfully mined a block.
        for result := range self.recv {
            // Debug print of the received block number to standard output.
            fmt.Println("worker wait result Block.Number() ", result.Block.Number())
            block := result.Block
            work := result.Work

            // Stamp the block hash onto every receipt log and every state log.
            for _, r := range work.receipts {
                for _, l := range r.Logs {
                    l.BlockHash = block.Hash()
            for _, log := range work.state.Logs() {
                log.BlockHash = block.Hash()
            stat, err := self.chain.WriteBlockWithState(block, work.receipts, work.state)
            if err != nil {
                log.Error("Failed writing block to chain", "err", err)
            // Broadcast the block and announce chain insertion event
            self.mux.Post(core.NewMinedBlockEvent{Block: block})
            var (
                events []interface{}
                logs   = work.state.Logs()
            // A ChainEvent is always queued; a ChainHeadEvent is added only
            // for a canonical write status.
            events = append(events, core.ChainEvent{Block: block, Hash: block.Hash(), Logs: logs})
            if stat == core.CanonStatTy {
                events = append(events, core.ChainHeadEvent{Block: block})
            self.chain.PostChainEvents(events, logs)

            // Insert the block into the set of pending ones to wait for confirmations
            self.unconfirmed.Insert(block.NumberU64(), block.Hash())



// APIs (abridged listing) assembles the RPC API list: the entries returned by
// ethapi.GetAPIs for s.APIBackend, the consensus engine's own APIs, and the
// local "eth" services shown below, including a PublicFilterAPI.
//
// NOTE(review): the composite literal is truncated in this listing; further
// entries and the closing braces are not visible.
func (s *Ethereum) APIs() []rpc.API {
    apis := ethapi.GetAPIs(s.APIBackend)

    // Append any APIs exposed explicitly by the consensus engine
    apis = append(apis, s.engine.APIs(s.BlockChain())...)

    // Append all the local APIs and return
    return append(apis, []rpc.API{
            Namespace: "eth",
            Version:   "1.0",
            Service:   NewPublicEthereumAPI(s),
            Public:    true,
        }, {
            Namespace: "eth",
            Version:   "1.0",
            // The filter API is built on s.APIBackend with false as its
            // second argument (the lightMode parameter of NewPublicFilterAPI).
            Service:   filters.NewPublicFilterAPI(s.APIBackend, false),
            Public:    true,

// NewPublicFilterAPI builds a PublicFilterAPI on top of backend. It wires in
// the backend's event mux and chain database, creates a new EventSystem with
// the given lightMode, allocates an empty filter map, and starts
// api.timeoutLoop in its own goroutine before returning the API.
//
// NOTE(review): closing braces are not part of this listing.
func NewPublicFilterAPI(backend Backend, lightMode bool) *PublicFilterAPI {
    api := &PublicFilterAPI{
        backend: backend,
        mux:     backend.EventMux(),
        chainDb: backend.ChainDb(),
        // Creating the EventSystem also subscribes it to backend events.
        events:  NewEventSystem(backend.EventMux(), backend, lightMode),
        filters: make(map[rpc.ID]*filter),
    go api.timeoutLoop()

    return api

// NewEventSystem creates an EventSystem, subscribes it to the backend's
// new-transaction, log, removed-log and chain events plus PendingLogsEvent on
// the mux, and starts its eventLoop goroutine.
//
// If any of the subscriptions is nil (or the pending-log subscription is
// already closed) it reports the failure through log.Crit.
//
// NOTE(review): closing braces are not part of this listing.
func NewEventSystem(mux *event.TypeMux, backend Backend, lightMode bool) *EventSystem {
    m := &EventSystem{
        mux:       mux,
        backend:   backend,
        lightMode: lightMode,
        // install/uninstall are unbuffered; the event channels are buffered
        // with the sizes named by the *ChanSize constants.
        install:   make(chan *subscription),
        uninstall: make(chan *subscription),
        txsCh:     make(chan core.NewTxsEvent, txChanSize),
        logsCh:    make(chan []*types.Log, logsChanSize),
        rmLogsCh:  make(chan core.RemovedLogsEvent, rmLogsChanSize),
        chainCh:   make(chan core.ChainEvent, chainEvChanSize),

    // Subscribe events
    m.txsSub = m.backend.SubscribeNewTxsEvent(m.txsCh)
    m.logsSub = m.backend.SubscribeLogsEvent(m.logsCh)
    m.rmLogsSub = m.backend.SubscribeRemovedLogsEvent(m.rmLogsCh)
    // ChainEvent values arrive on chainCh and are handled in eventLoop.
    m.chainSub = m.backend.SubscribeChainEvent(m.chainCh)
    // TODO(rjl493456442): use feed to subscribe pending log event
    m.pendingLogSub = m.mux.Subscribe(core.PendingLogsEvent{})

    // Make sure none of the subscriptions are empty
    if m.txsSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil ||
        m.pendingLogSub.Closed() {
        log.Crit("Subscribe for event system failed")

    go m.eventLoop()
    return m

// eventLoop (abridged listing) is the EventSystem's dispatch loop. It keeps a
// per-type index of installed subscriptions, forwards every received event to
// es.broadcast together with that index, and applies install/uninstall
// requests to the index.
//
// NOTE(review): the deferred cleanup body, the bodies of the "system stopped"
// cases and all closing braces are not part of this listing.
func (es *EventSystem) eventLoop() {
    // Ensure all subscriptions get cleaned up
    defer func() {

    // One subscription map per type, from UnknownSubscription up to (but not
    // including) LastIndexSubscription.
    index := make(filterIndex)
    for i := UnknownSubscription; i < LastIndexSubscription; i++ {
        index[i] = make(map[rpc.ID]*subscription)

    for {
        select {
        // Handle subscribed events
        case ev := <-es.txsCh:
            es.broadcast(index, ev)
        case ev := <-es.logsCh:
            es.broadcast(index, ev)
        case ev := <-es.rmLogsCh:
            es.broadcast(index, ev)
        case ev := <-es.chainCh:
            es.broadcast(index, ev)
        case ev, active := <-es.pendingLogSub.Chan():
            if !active { // system stopped
            es.broadcast(index, ev)

        case f := <-es.install:
            // A combined subscription is registered under both log types.
            if f.typ == MinedAndPendingLogsSubscription {
                // the type are logs and pending logs subscriptions
                index[LogsSubscription][f.id] = f
                index[PendingLogsSubscription][f.id] = f
            } else {
                index[f.typ][f.id] = f

        case f := <-es.uninstall:
            // Mirror of the install case: remove from both log types.
            if f.typ == MinedAndPendingLogsSubscription {
                // the type are logs and pending logs subscriptions
                delete(index[LogsSubscription], f.id)
                delete(index[PendingLogsSubscription], f.id)
            } else {
                delete(index[f.typ], f.id)

        // System stopped
        case <-es.txsSub.Err():
        case <-es.logsSub.Err():
        case <-es.rmLogsSub.Err():
        case <-es.chainSub.Err():

// broadcast (abridged listing) delivers ev to the matching subscriptions in
// filters, selected by the dynamic type of ev:
//
//   - []*types.Log and core.RemovedLogsEvent: logs that pass filterLogs are
//     sent to each LogsSubscription.
//   - *event.TypeMuxEvent carrying core.PendingLogsEvent: matching logs are
//     sent to each PendingLogsSubscription created before the event time.
//   - core.NewTxsEvent: the transaction hashes are sent to each
//     PendingTransactionsSubscription.
//   - core.ChainEvent: the block header is sent to each BlocksSubscription;
//     in light mode with log subscriptions present, logs are additionally
//     resolved through lightFilterNewHead / lightFilterLogs.
//
// NOTE(review): the body of the nil check and all closing braces are not part
// of this listing.
func (es *EventSystem) broadcast(filters filterIndex, ev interface{}) {
    if ev == nil {

    switch e := ev.(type) {
    case []*types.Log:
        // Empty log batches are not matched against any subscription.
        if len(e) > 0 {
            for _, f := range filters[LogsSubscription] {
                if matchedLogs := filterLogs(e, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
                    f.logs <- matchedLogs
    case core.RemovedLogsEvent:
        for _, f := range filters[LogsSubscription] {
            if matchedLogs := filterLogs(e.Logs, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
                f.logs <- matchedLogs
    case *event.TypeMuxEvent:
        switch muxe := e.Data.(type) {
        case core.PendingLogsEvent:
            for _, f := range filters[PendingLogsSubscription] {
                // Only events newer than the subscription are considered;
                // nil is passed in place of a FromBlock bound.
                if e.Time.After(f.created) {
                    if matchedLogs := filterLogs(muxe.Logs, nil, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
                        f.logs <- matchedLogs
    case core.NewTxsEvent:
        // Build the hash list once and send the same slice to every subscriber.
        hashes := make([]common.Hash, 0, len(e.Txs))
        for _, tx := range e.Txs {
            hashes = append(hashes, tx.Hash())
        for _, f := range filters[PendingTransactionsSubscription] {
            f.hashes <- hashes
    case core.ChainEvent:
        for _, f := range filters[BlocksSubscription] {
            f.headers <- e.Block.Header()
        if es.lightMode && len(filters[LogsSubscription]) > 0 {
            es.lightFilterNewHead(e.Block.Header(), func(header *types.Header, remove bool) {
                for _, f := range filters[LogsSubscription] {
                    if matchedLogs := es.lightFilterLogs(header, f.logsCrit.Addresses, f.logsCrit.Topics, remove); len(matchedLogs) > 0 {
                        f.logs <- matchedLogs

ChainEvent は bloom filter のインデックス(ChainIndexer / BloomIndexer)を作るのにも使われている。

// New (abridged listing) constructs an Ethereum object: it validates the sync
// mode, opens the "chaindata" database, sets up the genesis block, fills in
// the Ethereum struct (including a bloom indexer from NewBloomIndexer), checks
// the database version, and then creates the blockchain, transaction pool,
// protocol manager, miner, API backend and gas price oracle.
//
// It returns an error for light sync mode, for an invalid sync mode, and for
// any failure reported by the steps that return one below.
//
// NOTE(review): closing braces and some statements are not part of this
// listing.
func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
    if config.SyncMode == downloader.LightSync {
        return nil, errors.New("can't run eth.Ethereum in light sync mode, use les.LightEthereum")
    if !config.SyncMode.IsValid() {
        return nil, fmt.Errorf("invalid sync mode %d", config.SyncMode)
    chainDb, err := CreateDB(ctx, config, "chaindata")
    if err != nil {
        return nil, err
    chainConfig, genesisHash, genesisErr := core.SetupGenesisBlock(chainDb, config.Genesis)
    // A *params.ConfigCompatError is tolerated here and handled further down;
    // any other genesis error aborts construction.
    if _, ok := genesisErr.(*params.ConfigCompatError); genesisErr != nil && !ok {
        return nil, genesisErr
    log.Info("Initialised chain configuration", "config", chainConfig)

    eth := &Ethereum{
        config:         config,
        chainDb:        chainDb,
        chainConfig:    chainConfig,
        eventMux:       ctx.EventMux,
        accountManager: ctx.AccountManager,
        engine:         CreateConsensusEngine(ctx, &config.Ethash, chainConfig, chainDb),
        shutdownChan:   make(chan bool),
        networkId:      config.NetworkId,
        gasPrice:       config.GasPrice,
        etherbase:      config.Etherbase,
        bloomRequests:  make(chan chan *bloombits.Retrieval),
        // The bloom indexer is created over chainDb with a section size of
        // params.BloomBitsBlocks.
        bloomIndexer:   NewBloomIndexer(chainDb, params.BloomBitsBlocks),

    log.Info("Initialising Ethereum protocol", "versions", ProtocolVersions, "network", config.NetworkId)

    if !config.SkipBcVersionCheck {
        bcVersion := rawdb.ReadDatabaseVersion(chainDb)
        // A stored version of 0 is accepted alongside the current version.
        if bcVersion != core.BlockChainVersion && bcVersion != 0 {
            return nil, fmt.Errorf("Blockchain DB version mismatch (%d / %d). Run geth upgradedb.\n", bcVersion, core.BlockChainVersion)
        rawdb.WriteDatabaseVersion(chainDb, core.BlockChainVersion)
    var (
        vmConfig    = vm.Config{EnablePreimageRecording: config.EnablePreimageRecording}
        cacheConfig = &core.CacheConfig{Disabled: config.NoPruning, TrieNodeLimit: config.TrieCache, TrieTimeLimit: config.TrieTimeout}
    eth.blockchain, err = core.NewBlockChain(chainDb, cacheConfig, eth.chainConfig, eth.engine, vmConfig)
    if err != nil {
        return nil, err
    // Rewind the chain in case of an incompatible config upgrade.
    if compat, ok := genesisErr.(*params.ConfigCompatError); ok {
        log.Warn("Rewinding chain to upgrade configuration", "err", compat)
        rawdb.WriteChainConfig(chainDb, genesisHash, chainConfig)

    // A configured transaction journal path is resolved through the service
    // context before the pool is created.
    if config.TxPool.Journal != "" {
        config.TxPool.Journal = ctx.ResolvePath(config.TxPool.Journal)
    eth.txPool = core.NewTxPool(config.TxPool, eth.chainConfig, eth.blockchain)

    if eth.protocolManager, err = NewProtocolManager(eth.chainConfig, config.SyncMode, config.NetworkId, eth.eventMux, eth.txPool, eth.engine, eth.blockchain, chainDb); err != nil {
        return nil, err
    eth.miner = miner.New(eth, eth.chainConfig, eth.EventMux(), eth.engine)

    // The API backend is created without an oracle first; the oracle is
    // attached once the backend exists because it takes the backend as input.
    eth.APIBackend = &EthAPIBackend{eth, nil}
    gpoParams := config.GPO
    if gpoParams.Default == nil {
        gpoParams.Default = config.GasPrice
    eth.APIBackend.gpo = gasprice.NewOracle(eth.APIBackend, gpoParams)

    return eth, nil

// NewBloomIndexer returns a core.ChainIndexer whose backend is a BloomIndexer
// over db with the given section size. The index data goes into a table of db
// prefixed with rawdb.BloomBitsIndexPrefix, and the indexer is created with
// bloomConfirms, bloomThrottling and the kind label "bloombits".
//
// NOTE(review): closing braces are not part of this listing.
func NewBloomIndexer(db ethdb.Database, size uint64) *core.ChainIndexer {
    backend := &BloomIndexer{
        db:   db,
        size: size,
    table := ethdb.NewTable(db, string(rawdb.BloomBitsIndexPrefix))

    return core.NewChainIndexer(db, table, backend, size, bloomConfirms, bloomThrottling, "bloombits")

// NewChainIndexer creates a ChainIndexer over chainDb and indexDb that uses
// backend, with the given section size, required confirmation count and
// throttling duration. kind is attached to the indexer's logger under the
// "type" key. The indexer's updateLoop is started in its own goroutine before
// the indexer is returned.
//
// NOTE(review): closing braces and the database-dependent initialisation
// mentioned in the comment below are not part of this listing.
func NewChainIndexer(chainDb, indexDb ethdb.Database, backend ChainIndexerBackend, section, confirm uint64, throttling time.Duration, kind string) *ChainIndexer {
    c := &ChainIndexer{
        chainDb:     chainDb,
        indexDb:     indexDb,
        backend:     backend,
        // update is buffered with capacity 1; quit carries reply channels.
        update:      make(chan struct{}, 1),
        quit:        make(chan chan error),
        sectionSize: section,
        confirmsReq: confirm,
        throttling:  throttling,
        log:         log.New("type", kind),
    // Initialize database dependent fields and start the updater
    go c.updateLoop()

    return c

// Start subscribes the indexer to chain's ChainEvent feed through a channel
// buffered for 10 events and launches c.eventLoop in its own goroutine,
// seeded with the chain's current header.
//
// NOTE(review): the closing brace is not part of this listing.
func (c *ChainIndexer) Start(chain ChainIndexerChain) {
    events := make(chan ChainEvent, 10)
    sub := chain.SubscribeChainEvent(events)

    go c.eventLoop(chain.CurrentHeader(), events, sub)

// ChainIndexerChain is the chain-side interface that ChainIndexer.Start
// depends on: access to the current header and a ChainEvent subscription.
//
// NOTE(review): the closing brace is not part of this listing.
type ChainIndexerChain interface {
    // CurrentHeader retrieves the latest locally known header.
    CurrentHeader() *types.Header

    // SubscribeChainEvent subscribes to new head header notifications.
    SubscribeChainEvent(ch chan<- ChainEvent) event.Subscription

// eventLoop (abridged listing) feeds the indexer with new chain heads. It
// marks the indexer active, calls c.newHead for currentHeader, and then, for
// every ChainEvent received on events, calls c.newHead with that block's
// number. When the new header's parent is not the previously seen header, it
// first looks up the common ancestor and calls c.newHead for it with true as
// the second argument (presumably the reorg flag; confirm against newHead).
//
// The loop answers a request on c.quit by sending nil on the supplied channel;
// sub is unsubscribed when the function returns.
//
// NOTE(review): the return statements and closing braces are not part of this
// listing.
func (c *ChainIndexer) eventLoop(currentHeader *types.Header, events chan ChainEvent, sub event.Subscription) {
    // Mark the chain indexer as active, requiring an additional teardown
    atomic.StoreUint32(&c.active, 1)

    defer sub.Unsubscribe()

    // Fire the initial new head event to start any outstanding processing
    c.newHead(currentHeader.Number.Uint64(), false)

    // Track the last processed header so a parent-hash mismatch can be
    // detected on the next event.
    var (
        prevHeader = currentHeader
        prevHash   = currentHeader.Hash()
    for {
        select {
        case errc := <-c.quit:
            // Chain indexer terminating, report no failure and abort
            errc <- nil

        case ev, ok := <-events:
            // Received a new event, ensure it's not nil (closing) and update
            if !ok {
                // The event channel was closed: wait for the quit request and
                // acknowledge it with nil.
                errc := <-c.quit
                errc <- nil
            header := ev.Block.Header()
            if header.ParentHash != prevHash {
                // Reorg to the common ancestor (might not exist in light sync mode, skip reorg then)
                // TODO(karalabe, zsfelfoldi): This seems a bit brittle, can we detect this case explicitly?

                // TODO(karalabe): This operation is expensive and might block, causing the event system to
                // potentially also lock up. We need to do with on a different thread somehow.
                if h := rawdb.FindCommonAncestor(c.chainDb, prevHeader, header); h != nil {
                    c.newHead(h.Number.Uint64(), true)
            c.newHead(header.Number.Uint64(), false)

            prevHeader, prevHash = header, header.Hash()


// Start stores the p2p server on the service, launches s.loop in its own
// goroutine, logs that the stats daemon has started, and returns nil.
//
// NOTE(review): the closing brace is not part of this listing.
func (s *Service) Start(server *p2p.Server) error {
    s.server = server
    go s.loop()

    log.Info("Stats daemon started")
    return nil

// loop (abridged listing) is the stats reporting loop. It subscribes to
// ChainHeadEvent (not ChainEvent) and NewTxsEvent, relays them through a
// helper goroutine onto headCh and txCh, and then repeatedly connects to the
// stats server over a websocket, logs in, and sends reports until an error
// occurs.
//
// Reports are sent on a 15-second ticker, on history requests from s.histCh,
// on new heads (block report followed by a pending-transaction report), and
// on transaction notifications.
//
// NOTE(review): the HandleLoop label, the default/continue/return branches and
// all closing braces are not part of this listing.
func (s *Service) loop() {
    // Subscribe to chain events to execute updates on
    var blockchain blockChain
    var txpool txPool
    // Use the full-node backend when s.eth is set, otherwise s.les.
    if s.eth != nil {
        blockchain = s.eth.BlockChain()
        txpool = s.eth.TxPool()
    } else {
        blockchain = s.les.BlockChain()
        txpool = s.les.TxPool()

    chainHeadCh := make(chan core.ChainHeadEvent, chainHeadChanSize)
    headSub := blockchain.SubscribeChainHeadEvent(chainHeadCh)
    defer headSub.Unsubscribe()

    txEventCh := make(chan core.NewTxsEvent, txChanSize)
    txSub := txpool.SubscribeNewTxsEvent(txEventCh)
    defer txSub.Unsubscribe()

    // Start a goroutine that exhausts the subsciptions to avoid events piling up
    var (
        quitCh = make(chan struct{})
        headCh = make(chan *types.Block, 1)
        txCh   = make(chan struct{}, 1)
    go func() {
        // Time of the last forwarded transaction notification.
        var lastTx mclock.AbsTime

        for {
            select {
            // Notify of chain head events, but drop if too frequent
            case head := <-chainHeadCh:
                select {
                case headCh <- head.Block:

            // Notify of new transaction events, but drop if too frequent
            case <-txEventCh:
                // Notifications arriving within one second of the previous
                // one take the branch below (its body is not shown).
                if time.Duration(mclock.Now()-lastTx) < time.Second {
                lastTx = mclock.Now()

                select {
                case txCh <- struct{}{}:

            // node stopped
            case <-txSub.Err():
                break HandleLoop
            case <-headSub.Err():
                break HandleLoop
    // Loop reporting until termination
    for {
        // Resolve the URL, defaulting to TLS, but falling back to none too
        path := fmt.Sprintf("%s/api", s.host)
        urls := []string{path}

        if !strings.Contains(path, "://") { // url.Parse and url.IsAbs is unsuitable (https://github.com/golang/go/issues/19779)
            urls = []string{"wss://" + path, "ws://" + path}
        // Establish a websocket connection to the server on any supported URL
        var (
            conf *websocket.Config
            conn *websocket.Conn
            err  error
        for _, url := range urls {
            if conf, err = websocket.NewConfig(url, "http://localhost/"); err != nil {
            // Dial with a 5-second timeout.
            conf.Dialer = &net.Dialer{Timeout: 5 * time.Second}
            if conn, err = websocket.DialConfig(conf); err == nil {
        if err != nil {
            log.Warn("Stats server unreachable", "err", err)
            time.Sleep(10 * time.Second)
        // Authenticate the client with the server
        if err = s.login(conn); err != nil {
            log.Warn("Stats login failed", "err", err)
            time.Sleep(10 * time.Second)
        go s.readLoop(conn)

        // Send the initial stats so our node looks decent from the get go
        if err = s.report(conn); err != nil {
            log.Warn("Initial stats report failed", "err", err)
        // Keep sending status updates until the connection breaks
        fullReport := time.NewTicker(15 * time.Second)

        // Any failed report sets err and ends this inner loop.
        for err == nil {
            select {
            case <-quitCh:

            case <-fullReport.C:
                if err = s.report(conn); err != nil {
                    log.Warn("Full stats report failed", "err", err)
            case list := <-s.histCh:
                if err = s.reportHistory(conn, list); err != nil {
                    log.Warn("Requested history report failed", "err", err)
            case head := <-headCh:
                if err = s.reportBlock(conn, head); err != nil {
                    log.Warn("Block stats report failed", "err", err)
                if err = s.reportPending(conn); err != nil {
                    log.Warn("Post-block transaction stats report failed", "err", err)
            case <-txCh:
                if err = s.reportPending(conn); err != nil {
                    log.Warn("Transaction stats report failed", "err", err)
        // Make sure the connection is closed

