Help us understand the problem. What is going on with this article?

go-ethereumを読む(3) geth起動編 1. makeFullNode

More than 1 year has passed since last update.

go-ethereumを読む(2) geth init編の続きです。
go-thereumにはいろいろなコマンドがありますが、gethコマンドでフルノードを起動する処理を追うのが、一番基本だと思うので、そのソースを追って行きます。
今回はノードのセットアップを追っていきます。
途中アカウントマネジャーが登場し、Ethereumの鍵とアカウント管理が出てくるためちょっと追いにくいです。
v1.8.15ベースに解説します。

cmd/geth/main.goのmain関数がgethコマンドの起動ルートになります。
urfave/cliを使用しているのでぱっと見分かりにくいですが、geth関数を実行します。

func main() {
    if err := app.Run(os.Args); err != nil {
        fmt.Fprintln(os.Stderr, err)
        os.Exit(1)
    }
}

func geth(ctx *cli.Context) error {
    if args := ctx.Args(); len(args) > 0 {
        return fmt.Errorf("invalid command: %q", args[0])
    }
    node := makeFullNode(ctx)
    startNode(ctx, node)
    node.Wait()
    return nil
}

makeFullNode

geth関数の中で主な処理はmakeFullNodeとstartNodeになります。
順に追っていきます。

makeFullNode

// cmd/geth/config.go
func makeFullNode(ctx *cli.Context) *node.Node {
    stack, cfg := makeConfigNode(ctx)

    utils.RegisterEthService(stack, &cfg.Eth)

    if ctx.GlobalBool(utils.DashboardEnabledFlag.Name) {
        utils.RegisterDashboardService(stack, &cfg.Dashboard, gitCommit)
    }
    // Whisper must be explicitly enabled by specifying at least 1 whisper flag or in dev mode
    shhEnabled := enableWhisper(ctx)
    shhAutoEnabled := !ctx.GlobalIsSet(utils.WhisperEnabledFlag.Name) && ctx.GlobalIsSet(utils.DeveloperFlag.Name)
    if shhEnabled || shhAutoEnabled {
        if ctx.GlobalIsSet(utils.WhisperMaxMessageSizeFlag.Name) {
            cfg.Shh.MaxMessageSize = uint32(ctx.Int(utils.WhisperMaxMessageSizeFlag.Name))
        }
        if ctx.GlobalIsSet(utils.WhisperMinPOWFlag.Name) {
            cfg.Shh.MinimumAcceptedPOW = ctx.Float64(utils.WhisperMinPOWFlag.Name)
        }
        utils.RegisterShhService(stack, &cfg.Shh)
    }

    // Add the Ethereum Stats daemon if requested.
    if cfg.Ethstats.URL != "" {
        utils.RegisterEthStatsService(stack, cfg.Ethstats.URL)
    }
    return stack
}

makeFullNode > makeConfigNode

configを作成

// cmd/geth/config.go
// コマンドライン引数などからgethConfigを設定していく
// nodeを作成する、移行stackという変数名がnodeを指している箇所が頻出
func makeConfigNode(ctx *cli.Context) (*node.Node, gethConfig) {
    // 省略
    cfg := gethConfig{
        Eth:       eth.DefaultConfig,
        Shh:       whisper.DefaultConfig,
        Node:      defaultNodeConfig(),
        Dashboard: dashboard.DefaultConfig,
    }
    // 省略
    // Apply flags.
    utils.SetNodeConfig(ctx, &cfg.Node)
    stack, err := node.New(&cfg.Node)
    // 省略
    return stack, cfg
}
// gethConfigの設定
type gethConfig struct {
    Eth       eth.Config
    Shh       whisper.Config
    Node      node.Config
    Ethstats  ethstatsConfig
    Dashboard dashboard.Config
}

func defaultNodeConfig() node.Config {
    cfg := node.DefaultConfig
    cfg.Name = clientIdentifier
    cfg.Version = params.VersionWithCommit(gitCommit)
    cfg.HTTPModules = append(cfg.HTTPModules, "eth", "shh")
    cfg.WSModules = append(cfg.WSModules, "eth", "shh")
    cfg.IPCPath = "geth.ipc"
    return cfg
}

// node/defaults.go
var DefaultConfig = Config{
    DataDir:          DefaultDataDir(),
    HTTPPort:         DefaultHTTPPort,
    HTTPModules:      []string{"net", "web3"},
    HTTPVirtualHosts: []string{"localhost"},
    WSPort:           DefaultWSPort,
    WSModules:        []string{"net", "web3"},
    P2P: p2p.Config{
        ListenAddr: ":30303",
        MaxPeers:   25,
        NAT:        nat.Any(),
    },
}

// p2p/nat/nat.go
// デフォルトではp2p接続にUPnPがPMPで早くみつけた方を使用する
func Any() Interface {
    return startautodisc("UPnP or NAT-PMP", func() Interface {
        found := make(chan Interface, 2)
        go func() { found <- discoverUPnP() }()
        go func() { found <- discoverPMP() }()
        for i := 0; i < cap(found); i++ {
            if c := <-found; c != nil {
                return c
            }
        }
        return nil
    })
}

UPnPとは

機器を通信ネットワークに接続すると、複雑な設定作業などを行わなくても即座に他の機器と通信したり、その機能を利用できるようにする通信規約(プロトコル)。
ネットワークに参加する機器同士はHTTPを使って情報を交換する。さらに話す内容(やりとりする情報)はXMLによって定義される。ある

Nat-PMPとは

NAT-PMP は、NAT デバイスと LAN 側ホストとの間でアドレス/ポートマッピングリクエストのやりとりを行うためのプロトコル。RFC 6886 では次のように記載されている。
「NAT ゲートウェイは、WAN 側 IP アドレス宛てに送られてきたマッピングリクエストや、ゲートウェイの WAN 側ネットワークインターフェースから受信したマッピングリクエストを受け入れてはならない。」
また、作成されるマッピングにおける LAN 側アドレスには、受信したマッピングリクエストパケットのソースアドレスを使わなければ「ならない」とされています。

makeFullNode > makeConfigNode > SetNodeConfig

// 各種サーバの設定をセットしていく
func SetNodeConfig(ctx *cli.Context, cfg *node.Config) {
    SetP2PConfig(ctx, &cfg.P2P)
    setIPC(ctx, cfg)
    setHTTP(ctx, cfg)
    setWS(ctx, cfg)
    setNodeUserIdent(ctx, cfg)
    // 省略
    // cfgの設定が続く
}

makeFullNode > makeConfigNode > SetNodeConfig > SetP2PConfig

// cmd/utils/flags.go
func SetP2PConfig(ctx *cli.Context, cfg *p2p.Config) {
    // PrivateKeyの設定
    setNodeKey(ctx, cfg)
    setNAT(ctx, cfg)
    setListenAddress(ctx, cfg)
    // Ethereum Discovery Protocol用のbootstrapノードの設定
    setBootstrapNodes(ctx, cfg)
    setBootstrapNodesV5(ctx, cfg)
    // 省略
    // light mode (client or server) の判定
    // DiscoveryV5を使うか判定
    // developer mode (p2pしない) 判定
}

makeFullNode > makeConfigNode > SetNodeConfig > SetP2PConfig > setIPC

  • geth console等で使用するIPCPathを設定する

makeFullNode > makeConfigNode > node.New

  • AccountManagerを作る
// node/node.go
// Nodeの作成と思いきやアカウントの管理の為のクラスもここで作成し、Nodeに持たせている。
// マイナー報酬を払うのにアカウントが必要なのでしょうが無いがこの処理がなかなか複雑...
// New creates a new P2P node, ready for protocol registration.
func New(conf *Config) (*Node, error) {
    // 省略
    // AccountManagerを作成する
    // amはaccountManager  
    // ephemeralKeystoreはkeystoreのディレクトリパス
    am, ephemeralKeystore, err := makeAccountManager(conf)
    // 省略
    return &Node{
        accman:            am,
        ephemeralKeystore: ephemeralKeystore,
        config:            conf,
        serviceFuncs:      []ServiceConstructor{},
        ipcEndpoint:       conf.IPCEndpoint(),
        httpEndpoint:      conf.HTTPEndpoint(),
        wsEndpoint:        conf.WSEndpoint(),
        eventmux:          new(event.TypeMux),
        log:               conf.Logger,
    }, nil
}

makeFullNode > makeConfigNode > node.New > makeAccountManager

// node/config.go
func makeAccountManager(conf *Config) (*accounts.Manager, string, error) {
    // 設定の呼び出し
    scryptN, scryptP, keydir, err := conf.AccountConfig()
    var ephemeral string
    // 省略
    // BackendとKeystoreを作成
    backends := []accounts.Backend{
        keystore.NewKeyStore(keydir, scryptN, scryptP),
    }
    // 省略
    return accounts.NewManager(backends...), ephemeral, nil
}

// accounts/accounts.go
// BackendとKeyStoreの型
type Backend interface {
    Wallets() []Wallet
    Subscribe(sink chan<- WalletEvent) event.Subscription
}
// accounts/keystore/keystore.go
type KeyStore struct {
    storage  keyStore                     // keyStorePassphrase(現在はこれだけっぽい) keyの保存場所と保存方法の定義をもっている
    cache    *accountCache                // Accountのメモリ上に持っているもの
    changes  chan struct{}                // accountCacheの変更を検知するチャンネル
    unlocked map[common.Address]*unlocked // アンロックされているAccountのmap

    wallets     []accounts.Wallet       // ImpleはKeystoreWalletなど
    updateFeed  event.Feed              // walletの追加・削除を検知する Feedは1対多のサブスクリプションを提供する
    updateScope event.SubscriptionScope // サブスクリプションの購読を一度に購読解除する機能を提供
    updating    bool                    // notification loopが実行中かどうか

    mu sync.RWMutex
}

// accounts/keystore/account_cache.go
type accountCache struct {
    keydir   string
    watcher  *watcher
    mu       sync.Mutex
    all      accountsByURL
    byAddr   map[common.Address][]accounts.Account
    throttle *time.Timer
    notify   chan struct{}
    fileC    fileCache
}

// accounts/accounts.go
// Walletを操作するためのインターフェス
type Wallet interface {
    // 省略
}

// accounts/keystore/keystore_wallet.go
// WalletのImplがkeystoreWallet(マイナーのwalletなどデフォルトがこ
// 他にusbwalletが出てくるが今回は省略
type keystoreWallet struct {
    account  accounts.Account
    keystore *KeyStore
}

// accounts/accounts.go
type Account struct {
    Address common.Address `json:"address"` // keyに紐付いたEthereumのaccount address
    URL     URL            `json:"url"`     // オプション backendのリソースの場所を示すもの
}

makeFullNode > makeConfigNode > node.New > makeAccountManager > NewKeyStore

// accounts/keystore/keystore.go
func NewKeyStore(keydir string, scryptN, scryptP int) *KeyStore {
    keydir, _ = filepath.Abs(keydir)
    ks := &KeyStore{storage: &keyStorePassphrase{keydir, scryptN, scryptP}}
    // accountCache,keystoreの生きているaccountの一覧,変更channelの登録
    // keystoreWalletをksに登録
    ks.init(keydir)
    return ks
}

makeFullNode > makeConfigNode > node.New > makeAccountManager > NewManager

  • BackendとKeyStoreを使ってAccountManagerを作成する
  • AccountManagerはWalletのイベント監視をしている
  • Subscription・Feed・channelの関係が複雑...
    • event自体はchannelで受け取る
    • そのeventをFeedでpublishする
    • FeedはSubscriptionを複数登録している
// accounts/manager.go
func NewManager(backends ...Backend) *Manager {
    // バックエンドからwalletを取得してURLでsortする
    // Backendは*keystore.KeyStoreと*usbwallet.Hub
    var wallets []Wallet
    // 省略

    // バックエンドからのwallet notificationsをサブスクライブする
    updates := make(chan WalletEvent, 4*len(backends))
    subs := make([]event.Subscription, len(backends))
    for i, backend := range backends {
        // keystore walletの追加削除イベントをサブスクライブする
        subs[i] = backend.Subscribe(updates)
    }
    am := &Manager{
        backends: make(map[reflect.Type][]Backend),
        updaters: subs,
        updates:  updates,
        wallets:  wallets,
        quit:     make(chan chan error),
    }
    for _, backend := range backends {
        kind := reflect.TypeOf(backend)
        // keystore walletやusb wallet
        am.backends[kind] = append(am.backends[kind], backend)
    }
    // WalletEventを監視
    go am.update()
    return am
}

// accounts/manager.go
// updaters・updates・feedの関係がややこしい
// updaters(Subscription)はSubscriptionの集合を管理したいため
// updatesはWalletEventをうけとるため
// feedはsubscribeしてるものにpublishするため
type Manager struct {
    backends map[reflect.Type][]Backend // *keystore.KeyStoreと*usbwallet.Hub
    updaters []event.Subscription       // backend.Subscribe (まとめてUnsubscribeするための持ってる?)
    updates  chan WalletEvent           // WalletEvent (WalletArrived | WalletOpened | WalletDropped)
    wallets  []Wallet                   // KeyStoreの場合、keydirにある分

    feed event.Feed // walletのEventをpublish(一斉配信)する

    quit chan chan error
    lock sync.RWMutex
}

// accounts/manager.go
// walletの更新を監視している
func (am *Manager) update() {
    defer func() {
        am.lock.Lock()
        for _, sub := range am.updaters {
            sub.Unsubscribe()
        }
        am.updaters = nil
        am.lock.Unlock()
    }()
    for {
        select {
        case event := <-am.updates:
            am.lock.Lock()
            switch event.Kind {
            case WalletArrived:
                am.wallets = merge(am.wallets, event.Wallet)
            case WalletDropped:
                am.wallets = drop(am.wallets, event.Wallet)
            }
            am.lock.Unlock()
            am.feed.Send(event)
        case errc := <-am.quit:
            errc <- nil
            return
        }
    }
}
  • BackendのSubscribeはKeyStoreの
  • WalletEventをFeedにしてSubscribeしてSubscriptionにして返す  
  • 定期的にWalletを更新する

[Feed]とSubscription](https://qiita.com/t10471/items/f6e034386171424fb109)を使って1対多の通知を実現している

// accounts/keystore/keystore.go
// keystore walletの追加削除イベントをサブスクライブする
func (ks *KeyStore) Subscribe(sink chan<- accounts.WalletEvent) event.Subscription {
    // 省略
    // WalletEventをFeedに登録しSubscriptionとして返ってきたのをSubscriptionScopeとしてさらに登録
    sub := ks.updateScope.Track(ks.updateFeed.Subscribe(sink))
    if !ks.updating {
        ks.updating = true
        go ks.updater()
    }
    return sub
}

// accounts/keystore/keystore.go
// KeyStoreを定期的に変更通知かタイマーでリフレッシュする「
func (ks *KeyStore) updater() {
    for {
        select {
        case <-ks.changes:
        case <-time.After(walletRefreshCycle):
        }
        // Walletの状態を調べて、walletsの状態を更新して
        // WalletDropped WalletArrivedのイベントを送信する(ks.updateFeed.Send(event))
        ks.refreshWallets()

        ks.mu.Lock()
        if ks.updateScope.Count() == 0 {
            ks.updating = false
            ks.mu.Unlock()
            return
        }
        ks.mu.Unlock()
    }
}

// accounts/keystore/keystore.go
func (ks *KeyStore) refreshWallets() {
    // 省略
    accs := ks.cache.accounts()
    wallets := make([]accounts.Wallet, 0, len(accs))
    events := []accounts.WalletEvent{}

    for _, account := range accs {
        // 古いwalletを消す(WalletDropped EVENTの追加)
        // 知らないwalletなら追加する(WalletArrived EVENTの追加)
        // 一致したら、ks.walletsをwalletsに追加
        // 省略
    }
    // 残りをWalletDropped EVENTの追加にする 
    // 省略
    // EVENTの発火
    for _, event := range events {
        ks.updateFeed.Send(event)
    }
}

makeFullNode > makeConfigNode > SetEthConfig

// cmd/utils/flags.go
func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *eth.Config) {
    // 省略
    // 1つ目はかならずKeyStore(他はUSB)
    ks := stack.AccountManager().Backends(keystore.KeyStoreType)[0].(*keystore.KeyStore)
    // マイニングの報酬を受け取るアドレスを設定する
    setEtherbase(ctx, ks, cfg)
    // GPO は Gas Price Oracle (Gas Priceを決める) gasprice.Configの設定をする
    setGPO(ctx, &cfg.GPO)
    setTxPool(ctx, &cfg.TxPool)
    setEthash(ctx, cfg)
    // flagによる設定のcfgに登録する
    // devフラグ指定時の設定はここにある
}

// eth/gasprice/gasprice.go
// Configという名前の構造体が多い...
type Config struct {
        Blocks     int
        Percentile int
        Default    *big.Int `toml:",omitempty"`
}

// core/tx_pool.go
type TxPoolConfig struct {
    // ローカルのトランザクションはGasPriceがTxPool.gasPriceを無視する (locals  *accountSetにトランザクションを入れておく)
    NoLocals  bool          // ローカルでトランザクションを発行するか?(localで発行したものは自ブロック作成時に優先させたいため)
    Journal   string        // ローカルトランザクションを保存しておくパス
    Rejournal time.Duration // pool.journal.rotateを呼ぶ間隔

    PriceLimit uint64 // poolに入れるgasPriceの最小値
    PriceBump  uint64 // 新しいトランザクションを既にあるトランザクションよりも優先順位をあげるために必要なgasPriceの割合

    AccountSlots uint64 // アカウントごとの保存できるトランザクション最少数、これを越したpendingは削除される 
    GlobalSlots  uint64 // 全てのアカウントの保存できるトランザクションの最大数
    AccountQueue uint64 // アカウントごとの実行していないトランザクションのアカウントごとの最大サイズ、超えると削除さいれる
    GlobalQueue  uint64 //  全てのアカウントの実行していないトランザクションの保存サイズaccounts

    Lifetime time.Duration // Maximum amount of time non-executable transaction are queued
}

// core/tx_pool.go
var DefaultTxPoolConfig = TxPoolConfig{
    Journal:   "transactions.rlp",
    Rejournal: time.Hour,

    PriceLimit: 1,
    PriceBump:  10,

    AccountSlots: 16,
    GlobalSlots:  4096,
    AccountQueue: 64,
    GlobalQueue:  1024,

    Lifetime: 3 * time.Hour,
}

// consensus/ethash/ethash.go
// ethashの設定(マイニングのアルゴリズム)
type Config struct {
    CacheDir       string
    CachesInMem    int
    CachesOnDisk   int
    DatasetDir     string
    DatasetsInMem  int
    DatasetsOnDisk int
    PowMode        Mode
}

// consensus/ethash/ethash.go
// Ethash Ethereum用のPowのアルゴリズム
type Ethash struct {
    config Config

    caches   *lru // マイニングと検証に使うデータ
    datasets *lru // マイニングに使うデータ chachesを元に生成される

    // Mining related fields
    rand     *rand.Rand    // noneを生成するための乱数オブジェクト
    threads  int           // マイニングするのに使うスレッド数
    update   chan struct{} // マイニングのパラメータの更新を受け取るチャンネル
    hashrate metrics.Meter // hashrateを監視するオブジェクト

    // Remote sealer related fields
    workCh       chan *sealTask   // Notification channel to push new work and relative result channel to remote sealer
    fetchWorkCh  chan *sealWork   // Channel used for remote sealer to fetch mining work
    submitWorkCh chan *mineResult // Channel used for remote sealer to submit their mining result
    fetchRateCh  chan chan uint64 // Channel used to gather submitted hash rate for local or remote sealer.
    submitRateCh chan *hashrate   // Channel used for remote sealer to submit their mining hashrate

    // The fields below are hooks for testing
    shared    *Ethash       // Shared PoW verifier to avoid cache regeneration
    fakeFail  uint64        // Block number which fails PoW check even in fake mode
    fakeDelay time.Duration // Time delay to sleep for before returning from verify

    lock sync.Mutex // キャッシュとマイニングの為のフィールドをスレッドセーフにするためのロック

    closeOnce sync.Once       // Ensures exit channel will not be closed twice.
    exitCh    chan chan error // Notification channel to exiting backend threads
}

// core/tx_pool.go
type TxPool struct {
    config       TxPoolConfig
    chainconfig  *params.ChainConfig
    chain        blockChain
    gasPrice     *big.Int
    txFeed       event.Feed
    scope        event.SubscriptionScope
    chainHeadCh  chan ChainHeadEvent
    chainHeadSub event.Subscription
    signer       types.Signer
    mu           sync.RWMutex

    currentState  *state.StateDB      // ブロックチェーンのヘッダーの現在の状態
    pendingState  *state.ManagedState // 仮のnonceのペンディング状態
    currentMaxGas uint64              // 現在のトランザクションごとのgas limit

    locals  *accountSet // プールから削除されるルールの適用外のローカルトランザクションの集合
    journal *txJournal  // ディスクに保存するローカルトランザクションのジャーナル

    pending map[common.Address]*txList   // 現在処理可能なすべてのトランザクション
    queue   map[common.Address]*txList   // キューに積まれてるが処理不能なトランザクション
    beats   map[common.Address]time.Time // 各既知のアカウントからの最後の通知
    all     *txLookup                    // 検索可能な全てのトランザクション
    priced  *txPricedList                // 金額順に並んだトランザクション

    wg sync.WaitGroup // シャットダウンの為のsync

    homestead bool
}

makeFullNode > RegisterEthService

eth Serviceを設定する

// cmd/utils/flags.go
func RegisterEthService(stack *node.Node, cfg *eth.Config) {
    var err error
    if cfg.SyncMode == downloader.LightSync {
        err = stack.Register(func(ctx *node.ServiceContext) (node.Service, error) {
            return les.New(ctx, cfg)
        })
    } else {
        err = stack.Register(func(ctx *node.ServiceContext) (node.Service, error) {
            fullNode, err := eth.New(ctx, cfg)
            if fullNode != nil && cfg.LightServ > 0 {
                ls, _ := les.NewLesServer(fullNode, cfg)
                fullNode.AddLesServer(ls)
            }
            return fullNode, err
        })
    }
    if err != nil {
        Fatalf("Failed to register the Ethereum service: %v", err)
    }
}

// node/node.go
// 後で起動するサービスの登録のメソッド
func (n *Node) Register(constructor ServiceConstructor) error {
    n.lock.Lock()
    defer n.lock.Unlock()

    if n.server != nil {
        return ErrNodeRunning
    }
    // serviceFuncs: []ServiceConstructor{}
    n.serviceFuncs = append(n.serviceFuncs, constructor)
    return nil
}

// node/service.go
type ServiceConstructor func(ctx *ServiceContext) (Service, error)

Serviceという抽象化した形で機能を登録します

t10471
mercari
フリマアプリ「メルカリ」を、グローバルで開発しています。
https://tech.mercari.com/
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away