go-ethereumを読む(3) geth起動編 1. makeFullNode

go-ethereumを読む(2) geth init編の続きです。

go-thereumにはいろいろなコマンドがありますが、gethコマンドでフルノードを起動する処理を追うのが、一番基本だと思うので、そのソースを追って行きます。

今回はノードのセットアップを追っていきます。

途中アカウントマネジャーが登場し、Ethereumの鍵とアカウント管理が出てくるためちょっと追いにくいです。

v1.8.15ベースに解説します。

cmd/geth/main.goのmain関数がgethコマンドの起動ルートになります。

urfave/cliを使用しているのでぱっと見分かりにくいですが、geth関数を実行します。

func main() {

if err := app.Run(os.Args); err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
}

func geth(ctx *cli.Context) error {
if args := ctx.Args(); len(args) > 0 {
return fmt.Errorf("invalid command: %q", args[0])
}
node := makeFullNode(ctx)
startNode(ctx, node)
node.Wait()
return nil
}


makeFullNode

geth関数の中で主な処理はmakeFullNodeとstartNodeになります。

順に追っていきます。


makeFullNode

// cmd/geth/config.go

func makeFullNode(ctx *cli.Context) *node.Node {
stack, cfg := makeConfigNode(ctx)

utils.RegisterEthService(stack, &cfg.Eth)

if ctx.GlobalBool(utils.DashboardEnabledFlag.Name) {
utils.RegisterDashboardService(stack, &cfg.Dashboard, gitCommit)
}
// Whisper must be explicitly enabled by specifying at least 1 whisper flag or in dev mode
shhEnabled := enableWhisper(ctx)
shhAutoEnabled := !ctx.GlobalIsSet(utils.WhisperEnabledFlag.Name) && ctx.GlobalIsSet(utils.DeveloperFlag.Name)
if shhEnabled || shhAutoEnabled {
if ctx.GlobalIsSet(utils.WhisperMaxMessageSizeFlag.Name) {
cfg.Shh.MaxMessageSize = uint32(ctx.Int(utils.WhisperMaxMessageSizeFlag.Name))
}
if ctx.GlobalIsSet(utils.WhisperMinPOWFlag.Name) {
cfg.Shh.MinimumAcceptedPOW = ctx.Float64(utils.WhisperMinPOWFlag.Name)
}
utils.RegisterShhService(stack, &cfg.Shh)
}

// Add the Ethereum Stats daemon if requested.
if cfg.Ethstats.URL != "" {
utils.RegisterEthStatsService(stack, cfg.Ethstats.URL)
}
return stack
}


makeFullNode > makeConfigNode

configを作成

// cmd/geth/config.go

// コマンドライン引数などからgethConfigを設定していく
// nodeを作成する、移行stackという変数名がnodeを指している箇所が頻出
func makeConfigNode(ctx *cli.Context) (*node.Node, gethConfig) {
// 省略
cfg := gethConfig{
Eth: eth.DefaultConfig,
Shh: whisper.DefaultConfig,
Node: defaultNodeConfig(),
Dashboard: dashboard.DefaultConfig,
}
// 省略
// Apply flags.
utils.SetNodeConfig(ctx, &cfg.Node)
stack, err := node.New(&cfg.Node)
// 省略
return stack, cfg
}
// gethConfigの設定
type gethConfig struct {
Eth eth.Config
Shh whisper.Config
Node node.Config
Ethstats ethstatsConfig
Dashboard dashboard.Config
}

func defaultNodeConfig() node.Config {
cfg := node.DefaultConfig
cfg.Name = clientIdentifier
cfg.Version = params.VersionWithCommit(gitCommit)
cfg.HTTPModules = append(cfg.HTTPModules, "eth", "shh")
cfg.WSModules = append(cfg.WSModules, "eth", "shh")
cfg.IPCPath = "geth.ipc"
return cfg
}

// node/defaults.go
var DefaultConfig = Config{
DataDir: DefaultDataDir(),
HTTPPort: DefaultHTTPPort,
HTTPModules: []string{"net", "web3"},
HTTPVirtualHosts: []string{"localhost"},
WSPort: DefaultWSPort,
WSModules: []string{"net", "web3"},
P2P: p2p.Config{
ListenAddr: ":30303",
MaxPeers: 25,
NAT: nat.Any(),
},
}

// p2p/nat/nat.go
// デフォルトではp2p接続にUPnPがPMPで早くみつけた方を使用する
func Any() Interface {
return startautodisc("UPnP or NAT-PMP", func() Interface {
found := make(chan Interface, 2)
go func() { found <- discoverUPnP() }()
go func() { found <- discoverPMP() }()
for i := 0; i < cap(found); i++ {
if c := <-found; c != nil {
return c
}
}
return nil
})
}


UPnPとは

機器を通信ネットワークに接続すると、複雑な設定作業などを行わなくても即座に他の機器と通信したり、その機能を利用できるようにする通信規約(プロトコル)。

ネットワークに参加する機器同士はHTTPを使って情報を交換する。さらに話す内容(やりとりする情報)はXMLによって定義される。ある


Nat-PMPとは

NAT-PMP は、NAT デバイスと LAN 側ホストとの間でアドレス/ポートマッピングリクエストのやりとりを行うためのプロトコル。RFC 6886 では次のように記載されている。

「NAT ゲートウェイは、WAN 側 IP アドレス宛てに送られてきたマッピングリクエストや、ゲートウェイの WAN 側ネットワークインターフェースから受信したマッピングリクエストを受け入れてはならない。」
また、作成されるマッピングにおける LAN 側アドレスには、受信したマッピングリクエストパケットのソースアドレスを使わなければ「ならない」とされています。


makeFullNode > makeConfigNode > SetNodeConfig

// 各種サーバの設定をセットしていく

func SetNodeConfig(ctx *cli.Context, cfg *node.Config) {
SetP2PConfig(ctx, &cfg.P2P)
setIPC(ctx, cfg)
setHTTP(ctx, cfg)
setWS(ctx, cfg)
setNodeUserIdent(ctx, cfg)
// 省略
// cfgの設定が続く
}


makeFullNode > makeConfigNode > SetNodeConfig > SetP2PConfig

// cmd/utils/flags.go

func SetP2PConfig(ctx *cli.Context, cfg *p2p.Config) {
// PrivateKeyの設定
setNodeKey(ctx, cfg)
setNAT(ctx, cfg)
setListenAddress(ctx, cfg)
// Ethereum Discovery Protocol用のbootstrapノードの設定
setBootstrapNodes(ctx, cfg)
setBootstrapNodesV5(ctx, cfg)
// 省略
// light mode (client or server) の判定
// DiscoveryV5を使うか判定
// developer mode (p2pしない) 判定
}


makeFullNode > makeConfigNode > SetNodeConfig > SetP2PConfig > setIPC


  • geth console等で使用するIPCPathを設定する


makeFullNode > makeConfigNode > node.New


  • AccountManagerを作る

// node/node.go

// Nodeの作成と思いきやアカウントの管理の為のクラスもここで作成し、Nodeに持たせている。
// マイナー報酬を払うのにアカウントが必要なのでしょうが無いがこの処理がなかなか複雑...
// New creates a new P2P node, ready for protocol registration.
func New(conf *Config) (*Node, error) {
// 省略
// AccountManagerを作成する
// amはaccountManager  
// ephemeralKeystoreはkeystoreのディレクトリパス
am, ephemeralKeystore, err := makeAccountManager(conf)
// 省略
return &Node{
accman: am,
ephemeralKeystore: ephemeralKeystore,
config: conf,
serviceFuncs: []ServiceConstructor{},
ipcEndpoint: conf.IPCEndpoint(),
httpEndpoint: conf.HTTPEndpoint(),
wsEndpoint: conf.WSEndpoint(),
eventmux: new(event.TypeMux),
log: conf.Logger,
}, nil
}


makeFullNode > makeConfigNode > node.New > makeAccountManager

// node/config.go

func makeAccountManager(conf *Config) (*accounts.Manager, string, error) {
// 設定の呼び出し
scryptN, scryptP, keydir, err := conf.AccountConfig()
var ephemeral string
// 省略
// BackendとKeystoreを作成
backends := []accounts.Backend{
keystore.NewKeyStore(keydir, scryptN, scryptP),
}
// 省略
return accounts.NewManager(backends...), ephemeral, nil
}

// accounts/accounts.go
// BackendとKeyStoreの型
type Backend interface {
Wallets() []Wallet
Subscribe(sink chan<- WalletEvent) event.Subscription
}
// accounts/keystore/keystore.go
type KeyStore struct {
storage keyStore // keyStorePassphrase(現在はこれだけっぽい) keyの保存場所と保存方法の定義をもっている
cache *accountCache // Accountのメモリ上に持っているもの
changes chan struct{} // accountCacheの変更を検知するチャンネル
unlocked map[common.Address]*unlocked // アンロックされているAccountのmap

wallets []accounts.Wallet // ImpleはKeystoreWalletなど
updateFeed event.Feed // walletの追加・削除を検知する Feedは1対多のサブスクリプションを提供する
updateScope event.SubscriptionScope // サブスクリプションの購読を一度に購読解除する機能を提供
updating bool // notification loopが実行中かどうか

mu sync.RWMutex
}

// accounts/keystore/account_cache.go
type accountCache struct {
keydir string
watcher *watcher
mu sync.Mutex
all accountsByURL
byAddr map[common.Address][]accounts.Account
throttle *time.Timer
notify chan struct{}
fileC fileCache
}

// accounts/accounts.go
// Walletを操作するためのインターフェス
type Wallet interface {
// 省略
}

// accounts/keystore/keystore_wallet.go
// WalletのImplがkeystoreWallet(マイナーのwalletなどデフォルトがこ
// 他にusbwalletが出てくるが今回は省略
type keystoreWallet struct {
account accounts.Account
keystore *KeyStore
}

// accounts/accounts.go
type Account struct {
Address common.Address `json:"address"` // keyに紐付いたEthereumのaccount address
URL URL `json:"url"` // オプション backendのリソースの場所を示すもの
}


makeFullNode > makeConfigNode > node.New > makeAccountManager > NewKeyStore



// accounts/keystore/keystore.go
func NewKeyStore(keydir string, scryptN, scryptP int) *KeyStore {
keydir, _ = filepath.Abs(keydir)
ks := &KeyStore{storage: &keyStorePassphrase{keydir, scryptN, scryptP}}
// accountCache,keystoreの生きているaccountの一覧,変更channelの登録
// keystoreWalletをksに登録
ks.init(keydir)
return ks
}


makeFullNode > makeConfigNode > node.New > makeAccountManager > NewManager


  • BackendとKeyStoreを使ってAccountManagerを作成する

  • AccountManagerはWalletのイベント監視をしている

  • Subscription・Feed・channelの関係が複雑...


    • event自体はchannelで受け取る

    • そのeventをFeedでpublishする

    • FeedはSubscriptionを複数登録している



// accounts/manager.go

func NewManager(backends ...Backend) *Manager {
// バックエンドからwalletを取得してURLでsortする
// Backendは*keystore.KeyStoreと*usbwallet.Hub
var wallets []Wallet
// 省略

// バックエンドからのwallet notificationsをサブスクライブする
updates := make(chan WalletEvent, 4*len(backends))
subs := make([]event.Subscription, len(backends))
for i, backend := range backends {
// keystore walletの追加削除イベントをサブスクライブする
subs[i] = backend.Subscribe(updates)
}
am := &Manager{
backends: make(map[reflect.Type][]Backend),
updaters: subs,
updates: updates,
wallets: wallets,
quit: make(chan chan error),
}
for _, backend := range backends {
kind := reflect.TypeOf(backend)
// keystore walletやusb wallet
am.backends[kind] = append(am.backends[kind], backend)
}
// WalletEventを監視
go am.update()
return am
}

// accounts/manager.go
// updaters・updates・feedの関係がややこしい
// updaters(Subscription)はSubscriptionの集合を管理したいため
// updatesはWalletEventをうけとるため
// feedはsubscribeしてるものにpublishするため
type Manager struct {
backends map[reflect.Type][]Backend // *keystore.KeyStoreと*usbwallet.Hub
updaters []event.Subscription // backend.Subscribe (まとめてUnsubscribeするための持ってる?)
updates chan WalletEvent // WalletEvent (WalletArrived | WalletOpened | WalletDropped)
wallets []Wallet // KeyStoreの場合、keydirにある分

feed event.Feed // walletのEventをpublish(一斉配信)する

quit chan chan error
lock sync.RWMutex
}

// accounts/manager.go
// walletの更新を監視している
func (am *Manager) update() {
defer func() {
am.lock.Lock()
for _, sub := range am.updaters {
sub.Unsubscribe()
}
am.updaters = nil
am.lock.Unlock()
}()
for {
select {
case event := <-am.updates:
am.lock.Lock()
switch event.Kind {
case WalletArrived:
am.wallets = merge(am.wallets, event.Wallet)
case WalletDropped:
am.wallets = drop(am.wallets, event.Wallet)
}
am.lock.Unlock()
am.feed.Send(event)
case errc := <-am.quit:
errc <- nil
return
}
}
}


  • BackendのSubscribeはKeyStoreの


  • WalletEventをFeedにしてSubscribeしてSubscriptionにして返す  

  • 定期的にWalletを更新する

[Feed]とSubscription](https://qiita.com/t10471/items/f6e034386171424fb109)を使って1対多の通知を実現している

// accounts/keystore/keystore.go

// keystore walletの追加削除イベントをサブスクライブする
func (ks *KeyStore) Subscribe(sink chan<- accounts.WalletEvent) event.Subscription {
// 省略
// WalletEventをFeedに登録しSubscriptionとして返ってきたのをSubscriptionScopeとしてさらに登録
sub := ks.updateScope.Track(ks.updateFeed.Subscribe(sink))
if !ks.updating {
ks.updating = true
go ks.updater()
}
return sub
}

// accounts/keystore/keystore.go
// KeyStoreを定期的に変更通知かタイマーでリフレッシュする「
func (ks *KeyStore) updater() {
for {
select {
case <-ks.changes:
case <-time.After(walletRefreshCycle):
}
// Walletの状態を調べて、walletsの状態を更新して
// WalletDropped WalletArrivedのイベントを送信する(ks.updateFeed.Send(event))
ks.refreshWallets()

ks.mu.Lock()
if ks.updateScope.Count() == 0 {
ks.updating = false
ks.mu.Unlock()
return
}
ks.mu.Unlock()
}
}

// accounts/keystore/keystore.go
func (ks *KeyStore) refreshWallets() {
// 省略
accs := ks.cache.accounts()
wallets := make([]accounts.Wallet, 0, len(accs))
events := []accounts.WalletEvent{}

for _, account := range accs {
// 古いwalletを消す(WalletDropped EVENTの追加)
// 知らないwalletなら追加する(WalletArrived EVENTの追加)
// 一致したら、ks.walletsをwalletsに追加
// 省略
}
// 残りをWalletDropped EVENTの追加にする 
// 省略
// EVENTの発火
for _, event := range events {
ks.updateFeed.Send(event)
}
}


makeFullNode > makeConfigNode > SetEthConfig

// cmd/utils/flags.go

func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *eth.Config) {
// 省略
// 1つ目はかならずKeyStore(他はUSB)
ks := stack.AccountManager().Backends(keystore.KeyStoreType)[0].(*keystore.KeyStore)
// マイニングの報酬を受け取るアドレスを設定する
setEtherbase(ctx, ks, cfg)
// GPO は Gas Price Oracle (Gas Priceを決める) gasprice.Configの設定をする
setGPO(ctx, &cfg.GPO)
setTxPool(ctx, &cfg.TxPool)
setEthash(ctx, cfg)
// flagによる設定のcfgに登録する
// devフラグ指定時の設定はここにある
}

// eth/gasprice/gasprice.go
// Configという名前の構造体が多い...
type Config struct {
Blocks int
Percentile int
Default *big.Int `toml:",omitempty"`
}

// core/tx_pool.go
type TxPoolConfig struct {
// ローカルのトランザクションはGasPriceがTxPool.gasPriceを無視する (locals *accountSetにトランザクションを入れておく)
NoLocals bool // ローカルでトランザクションを発行するか?(localで発行したものは自ブロック作成時に優先させたいため)
Journal string // ローカルトランザクションを保存しておくパス
Rejournal time.Duration // pool.journal.rotateを呼ぶ間隔

PriceLimit uint64 // poolに入れるgasPriceの最小値
PriceBump uint64 // 新しいトランザクションを既にあるトランザクションよりも優先順位をあげるために必要なgasPriceの割合

AccountSlots uint64 // アカウントごとの保存できるトランザクション最少数、これを越したpendingは削除される 
GlobalSlots uint64 // 全てのアカウントの保存できるトランザクションの最大数
AccountQueue uint64 // アカウントごとの実行していないトランザクションのアカウントごとの最大サイズ、超えると削除さいれる
GlobalQueue uint64 //  全てのアカウントの実行していないトランザクションの保存サイズaccounts

Lifetime time.Duration // Maximum amount of time non-executable transaction are queued
}

// core/tx_pool.go
var DefaultTxPoolConfig = TxPoolConfig{
Journal: "transactions.rlp",
Rejournal: time.Hour,

PriceLimit: 1,
PriceBump: 10,

AccountSlots: 16,
GlobalSlots: 4096,
AccountQueue: 64,
GlobalQueue: 1024,

Lifetime: 3 * time.Hour,
}

// consensus/ethash/ethash.go
// ethashの設定(マイニングのアルゴリズム)
type Config struct {
CacheDir string
CachesInMem int
CachesOnDisk int
DatasetDir string
DatasetsInMem int
DatasetsOnDisk int
PowMode Mode
}

// consensus/ethash/ethash.go
// Ethash Ethereum用のPowのアルゴリズム
type Ethash struct {
config Config

caches *lru // マイニングと検証に使うデータ
datasets *lru // マイニングに使うデータ chachesを元に生成される

// Mining related fields
rand *rand.Rand // noneを生成するための乱数オブジェクト
threads int // マイニングするのに使うスレッド数
update chan struct{} // マイニングのパラメータの更新を受け取るチャンネル
hashrate metrics.Meter // hashrateを監視するオブジェクト

// Remote sealer related fields
workCh chan *sealTask // Notification channel to push new work and relative result channel to remote sealer
fetchWorkCh chan *sealWork // Channel used for remote sealer to fetch mining work
submitWorkCh chan *mineResult // Channel used for remote sealer to submit their mining result
fetchRateCh chan chan uint64 // Channel used to gather submitted hash rate for local or remote sealer.
submitRateCh chan *hashrate // Channel used for remote sealer to submit their mining hashrate

// The fields below are hooks for testing
shared *Ethash // Shared PoW verifier to avoid cache regeneration
fakeFail uint64 // Block number which fails PoW check even in fake mode
fakeDelay time.Duration // Time delay to sleep for before returning from verify

lock sync.Mutex // キャッシュとマイニングの為のフィールドをスレッドセーフにするためのロック

closeOnce sync.Once // Ensures exit channel will not be closed twice.
exitCh chan chan error // Notification channel to exiting backend threads
}

// core/tx_pool.go
type TxPool struct {
config TxPoolConfig
chainconfig *params.ChainConfig
chain blockChain
gasPrice *big.Int
txFeed event.Feed
scope event.SubscriptionScope
chainHeadCh chan ChainHeadEvent
chainHeadSub event.Subscription
signer types.Signer
mu sync.RWMutex

currentState *state.StateDB // ブロックチェーンのヘッダーの現在の状態
pendingState *state.ManagedState // 仮のnonceのペンディング状態
currentMaxGas uint64 // 現在のトランザクションごとのgas limit

locals *accountSet // プールから削除されるルールの適用外のローカルトランザクションの集合
journal *txJournal // ディスクに保存するローカルトランザクションのジャーナル

pending map[common.Address]*txList // 現在処理可能なすべてのトランザクション
queue map[common.Address]*txList // キューに積まれてるが処理不能なトランザクション
beats map[common.Address]time.Time // 各既知のアカウントからの最後の通知
all *txLookup // 検索可能な全てのトランザクション
priced *txPricedList // 金額順に並んだトランザクション

wg sync.WaitGroup // シャットダウンの為のsync

homestead bool
}


makeFullNode > RegisterEthService

eth Serviceを設定する

// cmd/utils/flags.go

func RegisterEthService(stack *node.Node, cfg *eth.Config) {
var err error
if cfg.SyncMode == downloader.LightSync {
err = stack.Register(func(ctx *node.ServiceContext) (node.Service, error) {
return les.New(ctx, cfg)
})
} else {
err = stack.Register(func(ctx *node.ServiceContext) (node.Service, error) {
fullNode, err := eth.New(ctx, cfg)
if fullNode != nil && cfg.LightServ > 0 {
ls, _ := les.NewLesServer(fullNode, cfg)
fullNode.AddLesServer(ls)
}
return fullNode, err
})
}
if err != nil {
Fatalf("Failed to register the Ethereum service: %v", err)
}
}

// node/node.go
// 後で起動するサービスの登録のメソッド
func (n *Node) Register(constructor ServiceConstructor) error {
n.lock.Lock()
defer n.lock.Unlock()

if n.server != nil {
return ErrNodeRunning
}
// serviceFuncs: []ServiceConstructor{}
n.serviceFuncs = append(n.serviceFuncs, constructor)
return nil
}

// node/service.go
type ServiceConstructor func(ctx *ServiceContext) (Service, error)

Serviceという抽象化した形で機能を登録します