go-ethereumを読む(4) geth起動編 2. startNode

go-ethereumを読む(3) geth起動編 1. makeFullNodeの続きでstartNodeから追っていきます。

v1.8.15ベースに解説します。


startNode

前の記事でも書きましたが Nodeはstackという変数名で使われることが多いです

// cmd/geth/main.go

func startNode(ctx *cli.Context, stack *node.Node) {
// 省略
// Start up the node itself
utils.StartNode(stack)
// 省略
}


startNode > utils.StartNode(stack)

// cmd/utils/cmd.go

func StartNode(stack *node.Node) {
if err := stack.Start(); err != nil {
Fatalf("Error starting protocol stack: %v", err)
}
// 省略
}


startNode > utils.StartNode(stack) > stack.Start()



func (n *Node) Start() error {
// 省略
// データを保存するディレクトリをセットアップする
if err := n.openDataDir(); err != nil {
return err
}

// p2pの設定をっセットアップする
// 省略
// p2pを作成
running := &p2p.Server{Config: n.serverConfig}
n.log.Info("Starting peer-to-peer node", "instance", n.serverConfig.Name)

// makeFullNodeで登録したServiceをここで取り出す
services := make(map[reflect.Type]Service)
for _, constructor := range n.serviceFuncs {
// Create a new context for the particular service
ctx := &ServiceContext{
config: n.config,
services: make(map[reflect.Type]Service),
EventMux: n.eventmux,
AccountManager: n.accman,
}
for kind, s := range services { // copy needed for threaded access
ctx.services[kind] = s
}
// Construct and save the service
service, err := constructor(ctx)
// 省略
kind := reflect.TypeOf(service)
if _, exists := services[kind]; exists {
return &DuplicateServiceError{Kind: kind}
}
services[kind] = service
}
// Gather the protocols and start the freshly assembled P2P server
for _, service := range services {
running.Protocols = append(running.Protocols, service.Protocols()...)
}
// running(p2p)をスタートさせる
if err := running.Start(); err != nil {
return convertFileLockError(err)
}
// Start each of the services
started := []reflect.Type{}
// 各サービスをスタートさせる
for kind, service := range services {
// 省略
}
// rpc用のサーバ(socket)を起動する(IPC, HTTP, WSなど)
if err := n.startRPC(services); err != nil {
// 省略
}
// Finish initializing the startup
n.services = services
n.server = running
n.stop = make(chan struct{})

return nil
}

// p2p/server.go
type Server struct {
Config // 起動中に変更される可能性がある

newTransport func(net.Conn) transport
newPeerHook func(*Peer)

lock sync.Mutex // protects running
running bool

ntab discoverTable
listener net.Listener
ourHandshake *protoHandshake
lastLookup time.Time
DiscV5 *discv5.Network

peerOp chan peerOpFunc
peerOpDone chan struct{}

quit chan struct{}
addstatic chan *discover.Node
removestatic chan *discover.Node
posthandshake chan *conn
addpeer chan *conn
delpeer chan peerDrop
loopWG sync.WaitGroup // loop, listenLoop
peerFeed event.Feed
log log.Logger
}

// p2p/server.go
type Config struct {
PrivateKey *ecdsa.PrivateKey `toml:"-"`

MaxPeers int // 0以上
DialRatio int `toml:",omitempty"` // 2の場合、 1/2でp2p接続に応答する

NoDiscovery bool
DiscoveryV5 bool `toml:",omitempty"`

Name string `toml:"-"`

BootstrapNodes []*discover.Node
BootstrapNodesV5 []*discv5.Node `toml:",omitempty"`

StaticNodes []*discover.Node
TrustedNodes []*discover.Node
NetRestrict *netutil.Netlist `toml:",omitempty"`
NodeDatabase string `toml:",omitempty"`
Protocols []Protocol `toml:"-"`
ListenAddr string
NAT nat.Interface `toml:",omitempty"` // Nat(Network Address Translation)の方式を設定、 UPnPかNat-PMPか先に見つかった方
Dialer NodeDialer `toml:"-"`
NoDial bool `toml:",omitempty"`
EnableMsgEvents bool
Logger log.Logger `toml:",omitempty"`
}


startNode > utils.StartNode(stack) > stack.Start() > running.Start()

// p2p/server.go

func (srv *Server) Start() (err error) {
// 省略
if srv.newTransport == nil {
srv.newTransport = newRLPX
}
if srv.Dialer == nil {
srv.Dialer = TCPDialer{&net.Dialer{Timeout: defaultDialTimeout}}
}
srv.quit = make(chan struct{})
srv.addpeer = make(chan *conn)
srv.delpeer = make(chan peerDrop)
srv.posthandshake = make(chan *conn)
srv.addstatic = make(chan *discover.Node)
srv.removestatic = make(chan *discover.Node)
srv.addtrusted = make(chan *discover.Node)
srv.removetrusted = make(chan *discover.Node)
srv.peerOp = make(chan peerOpFunc)
srv.peerOpDone = make(chan struct{})

var (
conn *net.UDPConn
sconn *sharedUDPConn
realaddr *net.UDPAddr
unhandled chan discover.ReadPacket
)
// Node discoveryを行う
// UDPをつかって接続する
// nodetable情報はLevelDBに保存(デフォルト)
// 省略
dynPeers := srv.maxDialedConns()
dialer := newDialState(srv.StaticNodes, srv.BootstrapNodes, srv.ntab, dynPeers, srv.NetRestrict)
// 省略
// tcpのセットアップしてListenする
if srv.ListenAddr != "" {
if err := srv.startListening(); err != nil {
return err
}
}
// 省略
srv.loopWG.Add(1)
// p2pの接続監視ループ
go srv.run(dialer)
srv.running = true
return nil
}


discoverを設定する

discover.Configをセットアップしてntab(node table)に登録する

udpのセットアップをしてListenする

dialStateを作成する

// p2p/dial.go

func newDialState(static []*discover.Node, bootnodes []*discover.Node, ntab discoverTable, maxdyn int, netrestrict *netutil.Netlist) *dialstate {
s := &dialstate{
maxDynDials: maxdyn,
ntab: ntab,
netrestrict: netrestrict,
static: make(map[discover.NodeID]*dialTask),
dialing: make(map[discover.NodeID]connFlag),
bootnodes: make([]*discover.Node, len(bootnodes)),
randomNodes: make([]*discover.Node, maxdyn/2),
hist: new(dialHistory),
}
copy(s.bootnodes, bootnodes)
for _, n := range static {
s.addStatic(n)
}
return s
}


登録してあるサービスをStartさせる


Ethereum fullNodeの場合


(s *Ethereum) Start(srvr *p2p.Server)

// eth/backend.go

func (s *Ethereum) Start(srvr *p2p.Server) error {
// BloomFillterの受信スレッドを起動
s.startBloomHandlers(params.BloomBitsBlocks)

// Start the RPC service
s.netRPCService = ethapi.NewPublicNetAPI(srvr, s.NetVersion())

// Figure out a max peers count based on the server limits
maxPeers := srvr.MaxPeers
if s.config.LightServ > 0 {
if s.config.LightPeers >= srvr.MaxPeers {
return fmt.Errorf("invalid peer config: light peer count (%d) >= total peer count (%d)", s.config.LightPeers, srvr.MaxPeers)
}
maxPeers -= s.config.LightPeers
}
// Start the networking layer and the light server if requested
s.protocolManager.Start(maxPeers)
if s.lesServer != nil {
s.lesServer.Start(srvr)
}
return nil
}


(s *Ethereum) Start(srvr *p2p.Server) > s.protocolManager.Start(maxPeers)

// eth/backend.go

func (pm *ProtocolManager) Start(maxPeers int) {
pm.maxPeers = maxPeers

// broadcast transactions
pm.txCh = make(chan core.TxPreEvent, txChanSize)
pm.txSub = pm.txpool.SubscribeTxPreEvent(pm.txCh)
// transaction broadcastを監視
go pm.txBroadcastLoop()

// broadcast mined blocks
pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
// マイニングされたbloadcastを監視
go pm.minedBroadcastLoop()

// start sync handlers
// blockの同期をとる
go pm.syncer()
// transactionの同期をとる
go pm.txsyncLoop()
}


(n *Node) Start() startRPC


  • プロセス用のサーバ?


    • n.startInProc(apis)



  • unixドメインソケットを開いてサーバを起動


    • startIPC(apis)



  • tcpソケットを開いてhttpサーバを起動


    • startHTTP(n.httpEndpoint, apis, n.config.HTTPModules, n.config.HTTPCors, n.config.HTTPVirtualHosts)



  • tcpソケットを開いてwebsocketサーバを起動


    • n.startWS(n.wsEndpoint, apis, n.config.WSModules, n.config.WSOrigins, n.config.WSExposeAll)




startInProc

// node/node.go

func (n *Node) startInProc(apis []rpc.API) error {
// Register all the APIs exposed by the services
handler := rpc.NewServer()
for _, api := range apis {
if err := handler.RegisterName(api.Namespace, api.Service); err != nil {
return err
}
n.log.Debug("InProc registered", "service", api.Service, "namespace", api.Namespace)
}
n.inprocHandler = handler
return nil
}

func NewServer() *Server {
server := &Server{
services: make(serviceRegistry),
codecs: set.New(),
run: 1,
}

// register a default service which will provide meta information about the RPC service such as the services and
// methods it offers.
rpcService := &RPCService{server}
server.RegisterName(MetadataApi, rpcService)

return server
}


startIPC

// node/node.go

func (n *Node) startIPC(apis []rpc.API) error {
if n.ipcEndpoint == "" {
return nil // IPC disabled.
}
listener, handler, err := rpc.StartIPCEndpoint(n.ipcEndpoint, apis)
if err != nil {
return err
}
n.ipcListener = listener
n.ipcHandler = handler
n.log.Info("IPC endpoint opened", "url", n.ipcEndpoint)
return nil
}

func StartIPCEndpoint(ipcEndpoint string, apis []API) (net.Listener, *Server, error) {
// Register all the APIs exposed by the services.
// 上のNewServerと一緒
handler := NewServer()
for _, api := range apis {
if err := handler.RegisterName(api.Namespace, api.Service); err != nil {
return nil, nil, err
}
log.Debug("IPC registered", "namespace", api.Namespace)
}
// All APIs registered, start the IPC listener.
listener, err := ipcListen(ipcEndpoint)
if err != nil {
return nil, nil, err
}
go handler.ServeListener(listener)
return listener, handler, nil
}


startHTTP

// node/node.go

func (n *Node) startHTTP(endpoint string, apis []rpc.API, modules []string, cors []string, vhosts []string) error {
// Short circuit if the HTTP endpoint isn't being exposed
if endpoint == "" {
return nil
}
listener, handler, err := rpc.StartHTTPEndpoint(endpoint, apis, modules, cors, vhosts)
if err != nil {
return err
}
n.log.Info("HTTP endpoint opened", "url", fmt.Sprintf("http://%s", endpoint), "cors", strings.Join(cors, ","), "vhosts", strings.Join(vhosts, ","))
// All listeners booted successfully
n.httpEndpoint = endpoint
n.httpListener = listener
n.httpHandler = handler

return nil
}


startWS

// node/node.go

func (n *Node) startWS(endpoint string, apis []rpc.API, modules []string, wsOrigins []string, exposeAll bool) error {
// Short circuit if the WS endpoint isn't being exposed
if endpoint == "" {
return nil
}
listener, handler, err := rpc.StartWSEndpoint(endpoint, apis, modules, wsOrigins, exposeAll)
if err != nil {
return err
}
n.log.Info("WebSocket endpoint opened", "url", fmt.Sprintf("ws://%s", listener.Addr()))
// All listeners booted successfully
n.wsEndpoint = endpoint
n.wsListener = listener
n.wsHandler = handler

return nil
}


// cmd/utils/cmd.go


stack.Start()が終わったらシグナルを待ち受ける



// cmd/utils/cmd.go
func startNode(ctx *cli.Context, stack *node.Node) {
// 省略
go func() {
sigc := make(chan os.Signal, 1)
signal.Notify(sigc, syscall.SIGINT, syscall.SIGTERM)
defer signal.Stop(sigc)
<-sigc
go stack.Stop()
for i := 10; i > 0; i-- {
<-sigc
if i > 1 {
log.Warn("Already shutting down, interrupt more to panic.", "times", i-1)
}
}
}()

// cmd/geth/main.go

func startNode(ctx *cli.Context, stack *node.Node) {
// 省略
// utils.StartNode(stack)が終わったら
// 1番目のkeystoreのアカウントをunlockする
ks := stack.AccountManager().Backends(keystore.KeyStoreType)[0].(*keystore.KeyStore)

passwords := utils.MakePasswordList(ctx)
unlocks := strings.Split(ctx.GlobalString(utils.UnlockedAccountFlag.Name), ",")
for i, account := range unlocks {
if trimmed := strings.TrimSpace(account); trimmed != "" {
unlockAccount(ctx, ks, trimmed, i, passwords)
}
}

// AccountManagerがeventを待ち受ける
events := make(chan accounts.WalletEvent, 16)
stack.AccountManager().Subscribe(events)

// walletの存在変更を監視する
go func() {
// Create an chain state reader for self-derivation
rpcClient, err := stack.Attach()
if err != nil {
utils.Fatalf("Failed to attach to self: %v", err)
}
stateReader := ethclient.NewClient(rpcClient)

// Open any wallets already attached
for _, wallet := range stack.AccountManager().Wallets() {
if err := wallet.Open(""); err != nil {
log.Warn("Failed to open wallet", "url", wallet.URL(), "err", err)
}
}
// Listen for wallet event till termination
for event := range events {
switch event.Kind {
case accounts.WalletArrived:
if err := event.Wallet.Open(""); err != nil {
log.Warn("New wallet appeared, failed to open", "url", event.Wallet.URL(), "err", err)
}
case accounts.WalletOpened:
status, _ := event.Wallet.Status()
log.Info("New wallet appeared", "url", event.Wallet.URL(), "status", status)

if event.Wallet.URL().Scheme == "ledger" {
event.Wallet.SelfDerive(accounts.DefaultLedgerBaseDerivationPath, stateReader)
} else {
event.Wallet.SelfDerive(accounts.DefaultBaseDerivationPath, stateReader)
}

case accounts.WalletDropped:
log.Info("Old wallet dropped", "url", event.Wallet.URL())
event.Wallet.Close()
}
}
}()
// 省略
}
// Attach creates an RPC client attached to an in-process API handler.
// DialInProcでnet.pipe connectionを作成する
func (n *Node) Attach() (*rpc.Client, error) {
n.lock.RLock()
defer n.lock.RUnlock()

if n.server == nil {
return nil, ErrNodeStopped
}
return rpc.DialInProc(n.inprocHandler), nil
}

// accounts/accounts.go
type WalletEventType int

const (
// WalletArrived is fired when a new wallet is detected either via USB or via
// a filesystem event in the keystore.
WalletArrived WalletEventType = iota
// WalletOpened is fired when a wallet is successfully opened with the purpose
// of starting any background processes such as automatic key derivation.
WalletOpened
// WalletDropped
WalletDropped
)

// WalletEvent is an event fired by an account backend when a wallet arrival or
// departure is detected.
type WalletEvent struct {
Wallet Wallet // Wallet instance arrived or departed
Kind WalletEventType // Event type that happened in the system
}

func startNode(ctx *cli.Context, stack *node.Node) {

// 省略
// マイニングを始める
// Start auxiliary services if enabled
if ctx.GlobalBool(utils.MiningEnabledFlag.Name) || ctx.GlobalBool(utils.DeveloperFlag.Name) {
// Mining only makes sense if a full Ethereum node is running
if ctx.GlobalString(utils.SyncModeFlag.Name) == "light" {
utils.Fatalf("Light clients do not support mining")
}
var ethereum *eth.Ethereum
if err := stack.Service(&ethereum); err != nil {
utils.Fatalf("Ethereum service not running: %v", err)
}
// Set the gas price to the limits from the CLI and start mining
gasprice := utils.GlobalBig(ctx, utils.MinerLegacyGasPriceFlag.Name)
if ctx.IsSet(utils.MinerGasPriceFlag.Name) {
gasprice = utils.GlobalBig(ctx, utils.MinerGasPriceFlag.Name)
}
ethereum.TxPool().SetGasPrice(gasprice)

threads := ctx.GlobalInt(utils.MinerLegacyThreadsFlag.Name)
if ctx.GlobalIsSet(utils.MinerThreadsFlag.Name) {
threads = ctx.GlobalInt(utils.MinerThreadsFlag.Name)
}
if err := ethereum.StartMining(threads); err != nil {
utils.Fatalf("Failed to start mining: %v", err)
}
}

}


stopを待つ

func geth(ctx *cli.Context) error {

// 省略
node.Wait()
return nil
}