Help us understand the problem. What is going on with this article?

go-ethereumを読む(4) geth起動編 2. startNode

More than 1 year has passed since last update.

go-ethereumを読む(3) geth起動編 1. makeFullNodeの続きでstartNodeから追っていきます。
v1.8.15ベースに解説します。

startNode

前の記事でも書きましたが Nodeはstackという変数名で使われることが多いです

// cmd/geth/main.go
// startNode is geth's node boot routine. This excerpt shows only the step
// that hands the node to utils.StartNode; everything else is elided.
func startNode(ctx *cli.Context, stack *node.Node) {
    // (omitted)
    // Start up the node itself
    utils.StartNode(stack)
    // (omitted)
}

startNode > utils.StartNode(stack)

// cmd/utils/cmd.go
// StartNode calls stack.Start and reports a failure through Fatalf.
// The remainder of the function is elided in this excerpt.
func StartNode(stack *node.Node) {
    if err := stack.Start(); err != nil {
        Fatalf("Error starting protocol stack: %v", err)
    }
    // (omitted)
}

startNode > utils.StartNode(stack) > stack.Start()

// Start brings the node up: it opens the data directory, builds the p2p
// server, instantiates every registered service, starts the p2p server and
// the services, starts the RPC interfaces, and finally records the running
// state on the node. Parts of the original function are elided.
//
// It returns the first error encountered, including a
// *DuplicateServiceError when two constructors yield the same service type.
func (n *Node) Start() error {
    // (omitted)
    // Set up the directory in which data is stored.
    if err := n.openDataDir(); err != nil {
        return err
    }

    // Set up the p2p configuration.
    // (omitted)
    // Create the p2p server from the node's server config.
    running := &p2p.Server{Config: n.serverConfig}
    n.log.Info("Starting peer-to-peer node", "instance", n.serverConfig.Name)

    // Instantiate the services from the registered constructors
    // (per the preceding article, these were registered by makeFullNode).
    services := make(map[reflect.Type]Service)
    for _, constructor := range n.serviceFuncs {
        // Create a new context for the particular service
        ctx := &ServiceContext{
            config:         n.config,
            services:       make(map[reflect.Type]Service),
            EventMux:       n.eventmux,
            AccountManager: n.accman,
        }
        for kind, s := range services { // copy needed for threaded access
            ctx.services[kind] = s
        }
        // Construct and save the service
        service, err := constructor(ctx)
        // (omitted)
        // Services are keyed by their dynamic type; a second service of the
        // same type is rejected.
        kind := reflect.TypeOf(service)
        if _, exists := services[kind]; exists {
            return &DuplicateServiceError{Kind: kind}
        }
        services[kind] = service
    }
    // Gather the protocols and start the freshly assembled P2P server
    for _, service := range services {
        running.Protocols = append(running.Protocols, service.Protocols()...)
    }
    // Start the p2p server.
    if err := running.Start(); err != nil {
        return convertFileLockError(err)
    }
    // Start each of the services
    started := []reflect.Type{}
    // Start every constructed service (loop body elided).
    for kind, service := range services {
        // (omitted)
    }
    // Start the RPC interfaces for the services (the article lists
    // in-process, IPC, HTTP and WS); error handling is elided.
    if err := n.startRPC(services); err != nil {
        // (omitted)
    }
    // Finish initializing the startup
    n.services = services
    n.server = running
    n.stop = make(chan struct{})

    return nil
}

// p2p/server.go
// Server is the p2p server that Node.Start creates and starts. Its channel
// fields are allocated in (*Server).Start.
type Server struct {
    // Config fields must not be modified while the server is running
    // (this is what the upstream go-ethereum documentation states).
    Config

    // newTransport is defaulted to newRLPX by Start when it is nil.
    newTransport func(net.Conn) transport
    newPeerHook  func(*Peer)

    lock    sync.Mutex // protects running
    running bool

    ntab         discoverTable
    listener     net.Listener
    ourHandshake *protoHandshake
    lastLookup   time.Time
    DiscV5       *discv5.Network

    peerOp     chan peerOpFunc
    peerOpDone chan struct{}

    quit          chan struct{}
    addstatic     chan *discover.Node
    removestatic  chan *discover.Node
    addtrusted    chan *discover.Node // allocated in Start alongside addstatic
    removetrusted chan *discover.Node // allocated in Start alongside removestatic
    posthandshake chan *conn
    addpeer       chan *conn
    delpeer       chan peerDrop
    loopWG        sync.WaitGroup // loop, listenLoop
    peerFeed      event.Feed
    log           log.Logger
}

// p2p/server.go
// Config holds the options of the p2p Server; it is embedded in Server.
type Config struct {
    PrivateKey *ecdsa.PrivateKey `toml:"-"`

    MaxPeers int // maximum number of connected peers; upstream docs say it must be greater than zero
    DialRatio int `toml:",omitempty"` // ratio of inbound to dialed connections; per upstream docs a value of 2 allows 1/2 of connections to be dialed

    NoDiscovery bool
    DiscoveryV5 bool `toml:",omitempty"`

    Name string `toml:"-"`

    BootstrapNodes []*discover.Node
    BootstrapNodesV5 []*discv5.Node `toml:",omitempty"`

    StaticNodes []*discover.Node
    TrustedNodes []*discover.Node
    NetRestrict *netutil.Netlist `toml:",omitempty"`
    NodeDatabase string `toml:",omitempty"`
    Protocols []Protocol `toml:"-"`
    ListenAddr string
    NAT nat.Interface `toml:",omitempty"` // NAT (Network Address Translation) port-mapping method; the article notes "UPnP or NAT-PMP, whichever is found first" — presumably the auto-detect setting, verify
    Dialer NodeDialer `toml:"-"`
    NoDial bool `toml:",omitempty"`
    EnableMsgEvents bool
    Logger log.Logger `toml:",omitempty"`
}

startNode > utils.StartNode(stack) > stack.Start() > running.Start()

// p2p/server.go
// Start brings the p2p server up: it fills in default transport and dialer,
// allocates the server's channels, builds the dial state, starts listening
// when ListenAddr is set, and launches the run loop in a goroutine.
// Several parts (including discovery set-up) are elided in this excerpt.
func (srv *Server) Start() (err error) {
    // (omitted)
    // Fall back to the default transport and dialer when none were supplied.
    if srv.newTransport == nil {
        srv.newTransport = newRLPX
    }
    if srv.Dialer == nil {
        srv.Dialer = TCPDialer{&net.Dialer{Timeout: defaultDialTimeout}}
    }
    srv.quit = make(chan struct{})
    srv.addpeer = make(chan *conn)
    srv.delpeer = make(chan peerDrop)
    srv.posthandshake = make(chan *conn)
    srv.addstatic = make(chan *discover.Node)
    srv.removestatic = make(chan *discover.Node)
    srv.addtrusted = make(chan *discover.Node)
    srv.removetrusted = make(chan *discover.Node)
    srv.peerOp = make(chan peerOpFunc)
    srv.peerOpDone = make(chan struct{})

    var (
        conn      *net.UDPConn
        sconn     *sharedUDPConn
        realaddr  *net.UDPAddr
        unhandled chan discover.ReadPacket
    )
    // Node discovery set-up (elided). Per the article: it communicates over
    // UDP and, by default, persists the node table in LevelDB.
    // (omitted)
    dynPeers := srv.maxDialedConns()
    dialer := newDialState(srv.StaticNodes, srv.BootstrapNodes, srv.ntab, dynPeers, srv.NetRestrict)
    // (omitted)
    // Set up TCP and listen, but only when a listen address is configured.
    if srv.ListenAddr != "" {
        if err := srv.startListening(); err != nil {
            return err
        }
    }
    // (omitted)
    srv.loopWG.Add(1)
    // Launch the server's run loop (described by the article as the p2p
    // connection monitoring loop).
    go srv.run(dialer)
    srv.running = true
    return nil
}

discoverを設定する

discover.Configをセットアップしてntab(node table)に登録する
udpのセットアップをしてListenする
dialStateを作成する

// p2p/dial.go
// newDialState assembles a dialstate from the given static nodes, bootstrap
// nodes, discovery table, dynamic-dial limit and network restriction list.
// The bootstrap nodes are copied into a fresh slice and every static node is
// registered through addStatic.
func newDialState(static []*discover.Node, bootnodes []*discover.Node, ntab discoverTable, maxdyn int, netrestrict *netutil.Netlist) *dialstate {
    // Keep a private copy of the bootstrap list.
    boot := make([]*discover.Node, len(bootnodes))
    copy(boot, bootnodes)

    state := &dialstate{
        maxDynDials: maxdyn,
        ntab:        ntab,
        netrestrict: netrestrict,
        static:      make(map[discover.NodeID]*dialTask),
        dialing:     make(map[discover.NodeID]connFlag),
        bootnodes:   boot,
        randomNodes: make([]*discover.Node, maxdyn/2),
        hist:        new(dialHistory),
    }
    for i := range static {
        state.addStatic(static[i])
    }
    return state
}

登録してあるサービスをStartさせる

Ethereum fullNodeの場合

(s *Ethereum) Start(srvr *p2p.Server)

// eth/backend.go
// Start launches the Ethereum service on top of the given p2p server: it
// starts the bloom handlers, creates the net RPC service, derives the peer
// limit for the protocol manager and starts it, and starts the light server
// when one is configured.
//
// It returns an error when light serving is enabled and the configured
// light peer count is not strictly below the server's MaxPeers.
func (s *Ethereum) Start(srvr *p2p.Server) error {
    // Start the bloom filter handlers.
    s.startBloomHandlers(params.BloomBitsBlocks)

    // Create the RPC service exposing network information.
    s.netRPCService = ethapi.NewPublicNetAPI(srvr, s.NetVersion())

    // When serving light clients, reserve their share of the peer slots.
    peerLimit := srvr.MaxPeers
    if s.config.LightServ > 0 {
        lightPeers := s.config.LightPeers
        if lightPeers >= srvr.MaxPeers {
            return fmt.Errorf("invalid peer config: light peer count (%d) >= total peer count (%d)", lightPeers, srvr.MaxPeers)
        }
        peerLimit -= lightPeers
    }

    // Bring up the networking layer, then the light server if present.
    s.protocolManager.Start(peerLimit)
    if les := s.lesServer; les != nil {
        les.Start(srvr)
    }
    return nil
}

(s *Ethereum) Start(srvr *p2p.Server) > s.protocolManager.Start(maxPeers)

// eth/handler.go
// Start records the peer limit, subscribes to transaction and mined-block
// events, and launches the broadcast and sync goroutines. Each subscription
// is set up before the goroutine that consumes it is started.
func (pm *ProtocolManager) Start(maxPeers int) {
    pm.maxPeers = maxPeers

    // broadcast transactions
    pm.txCh = make(chan core.TxPreEvent, txChanSize)
    pm.txSub = pm.txpool.SubscribeTxPreEvent(pm.txCh)
    // Watch for transactions to broadcast.
    go pm.txBroadcastLoop()

    // broadcast mined blocks
    pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
    // Watch for mined blocks to broadcast.
    go pm.minedBroadcastLoop()

    // start sync handlers
    // Keep blocks in sync.
    go pm.syncer()
    // Keep transactions in sync.
    go pm.txsyncLoop()
}

(n *Node) Start() > n.startRPC(services)

  • プロセス内(in-process)呼び出し用のRPCハンドラを登録(ソケットは開かない)
    • n.startInProc(apis)
  • unixドメインソケットを開いてサーバを起動
    • n.startIPC(apis)
  • tcpソケットを開いてhttpサーバを起動
    • n.startHTTP(n.httpEndpoint, apis, n.config.HTTPModules, n.config.HTTPCors, n.config.HTTPVirtualHosts)
  • tcpソケットを開いてwebsocketサーバを起動
    • n.startWS(n.wsEndpoint, apis, n.config.WSModules, n.config.WSOrigins, n.config.WSExposeAll)

startInProc

// node/node.go
// startInProc creates a fresh RPC server, registers every given API on it
// under the API's namespace, and stores it as the node's in-process handler.
// It returns the first registration error, in which case the node's handler
// is left untouched.
func (n *Node) startInProc(apis []rpc.API) error {
    srv := rpc.NewServer()
    for i := range apis {
        api := apis[i]
        err := srv.RegisterName(api.Namespace, api.Service)
        if err != nil {
            return err
        }
        n.log.Debug("InProc registered", "service", api.Service, "namespace", api.Namespace)
    }
    n.inprocHandler = srv
    return nil
}

// NewServer returns an RPC server with an empty service registry and codec
// set, its run flag set to 1, and the metadata service already registered
// under MetadataApi. The result of that registration is not checked.
func NewServer() *Server {
    srv := new(Server)
    srv.services = make(serviceRegistry)
    srv.codecs = set.New()
    srv.run = 1

    // The default service provides meta information about the RPC service,
    // such as the services and methods it offers.
    srv.RegisterName(MetadataApi, &RPCService{srv})

    return srv
}

startIPC

// node/node.go
// startIPC opens the IPC endpoint for the given APIs and records the
// resulting listener and handler on the node. It is a no-op when no IPC
// endpoint is configured, and returns the error from rpc.StartIPCEndpoint
// otherwise.
func (n *Node) startIPC(apis []rpc.API) error {
    endpoint := n.ipcEndpoint
    if endpoint == "" {
        // IPC disabled.
        return nil
    }
    lis, srv, err := rpc.StartIPCEndpoint(endpoint, apis)
    if err != nil {
        return err
    }
    n.ipcListener, n.ipcHandler = lis, srv
    n.log.Info("IPC endpoint opened", "url", n.ipcEndpoint)
    return nil
}

// StartIPCEndpoint registers the given APIs on a new RPC server (created the
// same way as for the in-process handler), opens an IPC listener on
// ipcEndpoint, and serves the listener in a new goroutine.
//
// It returns the listener and the server, or the first registration or
// listen error.
func StartIPCEndpoint(ipcEndpoint string, apis []API) (net.Listener, *Server, error) {
    srv := NewServer()
    for i := range apis {
        api := apis[i]
        err := srv.RegisterName(api.Namespace, api.Service)
        if err != nil {
            return nil, nil, err
        }
        log.Debug("IPC registered", "namespace", api.Namespace)
    }

    // Only listen once every API has been registered.
    lis, err := ipcListen(ipcEndpoint)
    if err != nil {
        return nil, nil, err
    }
    go srv.ServeListener(lis)
    return lis, srv, nil
}

startHTTP

// node/node.go
// startHTTP opens the HTTP RPC endpoint and records the endpoint, listener
// and handler on the node. An empty endpoint means HTTP is not exposed and
// nothing happens; an error from rpc.StartHTTPEndpoint is returned as is.
func (n *Node) startHTTP(endpoint string, apis []rpc.API, modules []string, cors []string, vhosts []string) error {
    if endpoint == "" {
        return nil
    }
    lis, srv, err := rpc.StartHTTPEndpoint(endpoint, apis, modules, cors, vhosts)
    if err != nil {
        return err
    }

    url := fmt.Sprintf("http://%s", endpoint)
    corsList := strings.Join(cors, ",")
    vhostList := strings.Join(vhosts, ",")
    n.log.Info("HTTP endpoint opened", "url", url, "cors", corsList, "vhosts", vhostList)

    // The endpoint is up; remember it on the node.
    n.httpEndpoint, n.httpListener, n.httpHandler = endpoint, lis, srv
    return nil
}

startWS

// node/node.go
// startWS opens the WebSocket RPC endpoint and records the endpoint,
// listener and handler on the node. An empty endpoint means WebSocket is not
// exposed and nothing happens; an error from rpc.StartWSEndpoint is returned
// as is.
func (n *Node) startWS(endpoint string, apis []rpc.API, modules []string, wsOrigins []string, exposeAll bool) error {
    if endpoint == "" {
        return nil
    }
    lis, srv, err := rpc.StartWSEndpoint(endpoint, apis, modules, wsOrigins, exposeAll)
    if err != nil {
        return err
    }

    // The logged URL uses the listener's actual address.
    url := fmt.Sprintf("ws://%s", lis.Addr())
    n.log.Info("WebSocket endpoint opened", "url", url)

    // The endpoint is up; remember it on the node.
    n.wsEndpoint, n.wsListener, n.wsHandler = endpoint, lis, srv
    return nil
}

// cmd/utils/cmd.go

stack.Start()が終わったらシグナルを待ち受ける

// cmd/utils/cmd.go
func startNode(ctx *cli.Context, stack *node.Node) {
    // 省略
    go func() {
        sigc := make(chan os.Signal, 1)
        signal.Notify(sigc, syscall.SIGINT, syscall.SIGTERM)
        defer signal.Stop(sigc)
        <-sigc
        go stack.Stop()
        for i := 10; i > 0; i-- {
            <-sigc
            if i > 1 {
                log.Warn("Already shutting down, interrupt more to panic.", "times", i-1)
            }
        }
    }()
// cmd/geth/main.go
// startNode (geth), account and wallet part: after the node has been
// started it unlocks the requested accounts, subscribes to wallet events,
// and handles those events in a background goroutine. Other parts of the
// function are elided.
func startNode(ctx *cli.Context, stack *node.Node) {
    // (omitted)
    // Once utils.StartNode(stack) has finished:
    // take the first keystore backend and unlock every non-empty account
    // named in the comma-separated unlock flag.
    ks := stack.AccountManager().Backends(keystore.KeyStoreType)[0].(*keystore.KeyStore)

    passwords := utils.MakePasswordList(ctx)
    unlocks := strings.Split(ctx.GlobalString(utils.UnlockedAccountFlag.Name), ",")
    for i, account := range unlocks {
        if trimmed := strings.TrimSpace(account); trimmed != "" {
            unlockAccount(ctx, ks, trimmed, i, passwords)
        }
    }

    // Subscribe to wallet events from the account manager.
    events := make(chan accounts.WalletEvent, 16)
    stack.AccountManager().Subscribe(events)

    // Watch for wallets arriving, being opened, and being dropped.
    go func() {
        // Create an chain state reader for self-derivation
        rpcClient, err := stack.Attach()
        if err != nil {
            utils.Fatalf("Failed to attach to self: %v", err)
        }
        stateReader := ethclient.NewClient(rpcClient)

        // Open any wallets already attached
        for _, wallet := range stack.AccountManager().Wallets() {
            if err := wallet.Open(""); err != nil {
                log.Warn("Failed to open wallet", "url", wallet.URL(), "err", err)
            }
        }
        // Listen for wallet event till termination
        for event := range events {
            switch event.Kind {
            case accounts.WalletArrived:
                if err := event.Wallet.Open(""); err != nil {
                    log.Warn("New wallet appeared, failed to open", "url", event.Wallet.URL(), "err", err)
                }
            case accounts.WalletOpened:
                // The error from Status is deliberately discarded here.
                status, _ := event.Wallet.Status()
                log.Info("New wallet appeared", "url", event.Wallet.URL(), "status", status)

                // Ledger wallets use their own base derivation path.
                if event.Wallet.URL().Scheme == "ledger" {
                    event.Wallet.SelfDerive(accounts.DefaultLedgerBaseDerivationPath, stateReader)
                } else {
                    event.Wallet.SelfDerive(accounts.DefaultBaseDerivationPath, stateReader)
                }

            case accounts.WalletDropped:
                log.Info("Old wallet dropped", "url", event.Wallet.URL())
                event.Wallet.Close()
            }
        }
    }()
    // (omitted)
}
// Attach creates an RPC client attached to an in-process API handler.
// It holds the node's read lock for the duration of the call and returns
// ErrNodeStopped when the node has no running server.
// The article notes that DialInProc connects over a net.Pipe connection;
// that detail is not visible in this excerpt.
func (n *Node) Attach() (*rpc.Client, error) {
    n.lock.RLock()
    defer n.lock.RUnlock()

    if n.server != nil {
        return rpc.DialInProc(n.inprocHandler), nil
    }
    return nil, ErrNodeStopped
}

// accounts/accounts.go
// WalletEventType is the kind of a WalletEvent; its values are the
// constants declared below.
type WalletEventType int

const (
    // WalletArrived is fired when a new wallet is detected either via USB or via
    // a filesystem event in the keystore.
    WalletArrived WalletEventType = iota
    // WalletOpened is fired when a wallet is successfully opened with the purpose
    // of starting any background processes such as automatic key derivation.
    WalletOpened
    // WalletDropped is the third event kind; geth's startNode reacts to it
    // by logging "Old wallet dropped" and closing the wallet.
    WalletDropped
)

// WalletEvent is an event fired by an account backend when a wallet arrival or
// departure is detected.
type WalletEvent struct {
    Wallet Wallet          // Wallet instance arrived or departed
    Kind   WalletEventType // Event type that happened in the system
}
// startNode (geth), mining part: when the mining flag or the developer flag
// is set, it fetches the Ethereum service from the node, applies the
// configured gas price to the transaction pool, and starts mining with the
// configured thread count. Other parts of the function are elided.
func startNode(ctx *cli.Context, stack *node.Node) {
    // (omitted)
    // Begin mining.
    // Start auxiliary services if enabled
    if ctx.GlobalBool(utils.MiningEnabledFlag.Name) || ctx.GlobalBool(utils.DeveloperFlag.Name) {
        // Mining only makes sense if a full Ethereum node is running
        if ctx.GlobalString(utils.SyncModeFlag.Name) == "light" {
            utils.Fatalf("Light clients do not support mining")
        }
        var ethereum *eth.Ethereum
        if err := stack.Service(&ethereum); err != nil {
            utils.Fatalf("Ethereum service not running: %v", err)
        }
        // Set the gas price to the limits from the CLI and start mining
        // The legacy flag supplies the value unless the newer flag is set.
        gasprice := utils.GlobalBig(ctx, utils.MinerLegacyGasPriceFlag.Name)
        if ctx.IsSet(utils.MinerGasPriceFlag.Name) {
            gasprice = utils.GlobalBig(ctx, utils.MinerGasPriceFlag.Name)
        }
        ethereum.TxPool().SetGasPrice(gasprice)

        // Same legacy-then-override scheme for the thread count.
        threads := ctx.GlobalInt(utils.MinerLegacyThreadsFlag.Name)
        if ctx.GlobalIsSet(utils.MinerThreadsFlag.Name) {
            threads = ctx.GlobalInt(utils.MinerThreadsFlag.Name)
        }
        if err := ethereum.StartMining(threads); err != nil {
            utils.Fatalf("Failed to start mining: %v", err)
        }
    }

}

stopを待つ

// geth is the command entry point shown in this excerpt: after the elided
// set-up it calls node.Wait() and then returns nil.
func geth(ctx *cli.Context) error {
    // (omitted)
    // Wait on the node (the article's heading: "wait for stop").
    node.Wait()
    return nil
}
t10471
mercari
フリマアプリ「メルカリ」を、グローバルで開発しています。
https://tech.mercari.com/
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away