3
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

go-ethereumを読む(3) geth起動編 1. makeFullNode

Last updated at Posted at 2018-09-11

go-ethereumを読む(2) geth init編の続きです。
go-thereumにはいろいろなコマンドがありますが、gethコマンドでフルノードを起動する処理を追うのが、一番基本だと思うので、そのソースを追って行きます。
今回はノードのセットアップを追っていきます。
途中アカウントマネジャーが登場し、Ethereumの鍵とアカウント管理が出てくるためちょっと追いにくいです。
v1.8.15ベースに解説します。

cmd/geth/main.goのmain関数がgethコマンドの起動ルートになります。
urfave/cliを使用しているのでぱっと見分かりにくいですが、geth関数を実行します。

func main() {
	if err := app.Run(os.Args); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
}

func geth(ctx *cli.Context) error {
	if args := ctx.Args(); len(args) > 0 {
		return fmt.Errorf("invalid command: %q", args[0])
	}
	node := makeFullNode(ctx)
	startNode(ctx, node)
	node.Wait()
	return nil
}

makeFullNode

geth関数の中で主な処理はmakeFullNodeとstartNodeになります。
順に追っていきます。

makeFullNode

// cmd/geth/config.go
func makeFullNode(ctx *cli.Context) *node.Node {
	stack, cfg := makeConfigNode(ctx)

	utils.RegisterEthService(stack, &cfg.Eth)

	if ctx.GlobalBool(utils.DashboardEnabledFlag.Name) {
		utils.RegisterDashboardService(stack, &cfg.Dashboard, gitCommit)
	}
	// Whisper must be explicitly enabled by specifying at least 1 whisper flag or in dev mode
	shhEnabled := enableWhisper(ctx)
	shhAutoEnabled := !ctx.GlobalIsSet(utils.WhisperEnabledFlag.Name) && ctx.GlobalIsSet(utils.DeveloperFlag.Name)
	if shhEnabled || shhAutoEnabled {
		if ctx.GlobalIsSet(utils.WhisperMaxMessageSizeFlag.Name) {
			cfg.Shh.MaxMessageSize = uint32(ctx.Int(utils.WhisperMaxMessageSizeFlag.Name))
		}
		if ctx.GlobalIsSet(utils.WhisperMinPOWFlag.Name) {
			cfg.Shh.MinimumAcceptedPOW = ctx.Float64(utils.WhisperMinPOWFlag.Name)
		}
		utils.RegisterShhService(stack, &cfg.Shh)
	}

	// Add the Ethereum Stats daemon if requested.
	if cfg.Ethstats.URL != "" {
		utils.RegisterEthStatsService(stack, cfg.Ethstats.URL)
	}
	return stack
}

makeFullNode > makeConfigNode

configを作成

// cmd/geth/config.go
// コマンドライン引数などからgethConfigを設定していく
// nodeを作成する、移行stackという変数名がnodeを指している箇所が頻出
func makeConfigNode(ctx *cli.Context) (*node.Node, gethConfig) {
	// 省略
	cfg := gethConfig{
		Eth:       eth.DefaultConfig,
		Shh:       whisper.DefaultConfig,
		Node:      defaultNodeConfig(),
		Dashboard: dashboard.DefaultConfig,
	}
	// 省略
	// Apply flags.
	utils.SetNodeConfig(ctx, &cfg.Node)
	stack, err := node.New(&cfg.Node)
	// 省略
	return stack, cfg
}
// gethConfigの設定
type gethConfig struct {
	Eth       eth.Config
	Shh       whisper.Config
	Node      node.Config
	Ethstats  ethstatsConfig
	Dashboard dashboard.Config
}

func defaultNodeConfig() node.Config {
	cfg := node.DefaultConfig
	cfg.Name = clientIdentifier
	cfg.Version = params.VersionWithCommit(gitCommit)
	cfg.HTTPModules = append(cfg.HTTPModules, "eth", "shh")
	cfg.WSModules = append(cfg.WSModules, "eth", "shh")
	cfg.IPCPath = "geth.ipc"
	return cfg
}

// node/defaults.go
var DefaultConfig = Config{
	DataDir:          DefaultDataDir(),
	HTTPPort:         DefaultHTTPPort,
	HTTPModules:      []string{"net", "web3"},
	HTTPVirtualHosts: []string{"localhost"},
	WSPort:           DefaultWSPort,
	WSModules:        []string{"net", "web3"},
	P2P: p2p.Config{
		ListenAddr: ":30303",
		MaxPeers:   25,
		NAT:        nat.Any(),
	},
}

// p2p/nat/nat.go
// デフォルトではp2p接続にUPnPがPMPで早くみつけた方を使用する
func Any() Interface {
	return startautodisc("UPnP or NAT-PMP", func() Interface {
		found := make(chan Interface, 2)
		go func() { found <- discoverUPnP() }()
		go func() { found <- discoverPMP() }()
		for i := 0; i < cap(found); i++ {
			if c := <-found; c != nil {
				return c
			}
		}
		return nil
	})
}

UPnPとは

機器を通信ネットワークに接続すると、複雑な設定作業などを行わなくても即座に他の機器と通信したり、その機能を利用できるようにする通信規約(プロトコル)。
ネットワークに参加する機器同士はHTTPを使って情報を交換する。さらに話す内容(やりとりする情報)はXMLによって定義される。ある

Nat-PMPとは

NAT-PMP は、NAT デバイスと LAN 側ホストとの間でアドレス/ポートマッピングリクエストのやりとりを行うためのプロトコル。RFC 6886 では次のように記載されている。
「NAT ゲートウェイは、WAN 側 IP アドレス宛てに送られてきたマッピングリクエストや、ゲートウェイの WAN 側ネットワークインターフェースから受信したマッピングリクエストを受け入れてはならない。」
また、作成されるマッピングにおける LAN 側アドレスには、受信したマッピングリクエストパケットのソースアドレスを使わなければ「ならない」とされています。

makeFullNode > makeConfigNode > SetNodeConfig

// 各種サーバの設定をセットしていく
func SetNodeConfig(ctx *cli.Context, cfg *node.Config) {
	SetP2PConfig(ctx, &cfg.P2P)
	setIPC(ctx, cfg)
	setHTTP(ctx, cfg)
	setWS(ctx, cfg)
	setNodeUserIdent(ctx, cfg)
	// 省略
	// cfgの設定が続く
}

makeFullNode > makeConfigNode > SetNodeConfig > SetP2PConfig

// cmd/utils/flags.go
func SetP2PConfig(ctx *cli.Context, cfg *p2p.Config) {
	// PrivateKeyの設定
	setNodeKey(ctx, cfg)
	setNAT(ctx, cfg)
	setListenAddress(ctx, cfg)
	// Ethereum Discovery Protocol用のbootstrapノードの設定
	setBootstrapNodes(ctx, cfg)
	setBootstrapNodesV5(ctx, cfg)
	// 省略
	// light mode (client or server) の判定
	// DiscoveryV5を使うか判定
	// developer mode (p2pしない) 判定
}

makeFullNode > makeConfigNode > SetNodeConfig > SetP2PConfig > setIPC

  • geth console等で使用するIPCPathを設定する

makeFullNode > makeConfigNode > node.New

  • AccountManagerを作る
// node/node.go
// Nodeの作成と思いきやアカウントの管理の為のクラスもここで作成し、Nodeに持たせている。
// マイナー報酬を払うのにアカウントが必要なのでしょうが無いがこの処理がなかなか複雑...
// New creates a new P2P node, ready for protocol registration.
func New(conf *Config) (*Node, error) {
	// 省略
	// AccountManagerを作成する
	// amはaccountManager  
	// ephemeralKeystoreはkeystoreのディレクトリパス
	am, ephemeralKeystore, err := makeAccountManager(conf)
	// 省略
	return &Node{
		accman:            am,
		ephemeralKeystore: ephemeralKeystore,
		config:            conf,
		serviceFuncs:      []ServiceConstructor{},
		ipcEndpoint:       conf.IPCEndpoint(),
		httpEndpoint:      conf.HTTPEndpoint(),
		wsEndpoint:        conf.WSEndpoint(),
		eventmux:          new(event.TypeMux),
		log:               conf.Logger,
	}, nil
}

makeFullNode > makeConfigNode > node.New > makeAccountManager

// node/config.go
func makeAccountManager(conf *Config) (*accounts.Manager, string, error) {
	// 設定の呼び出し
	scryptN, scryptP, keydir, err := conf.AccountConfig()
	var ephemeral string
	// 省略
	// BackendとKeystoreを作成
	backends := []accounts.Backend{
		keystore.NewKeyStore(keydir, scryptN, scryptP),
	}
	// 省略
	return accounts.NewManager(backends...), ephemeral, nil
}

// accounts/accounts.go
// BackendとKeyStoreの型
type Backend interface {
	Wallets() []Wallet
	Subscribe(sink chan<- WalletEvent) event.Subscription
}
// accounts/keystore/keystore.go
type KeyStore struct {
	storage  keyStore                     // keyStorePassphrase(現在はこれだけっぽい) keyの保存場所と保存方法の定義をもっている
	cache    *accountCache                // Accountのメモリ上に持っているもの
	changes  chan struct{}                // accountCacheの変更を検知するチャンネル
	unlocked map[common.Address]*unlocked // アンロックされているAccountのmap

	wallets     []accounts.Wallet       // ImpleはKeystoreWalletなど
	updateFeed  event.Feed              // walletの追加・削除を検知する Feedは1対多のサブスクリプションを提供する
	updateScope event.SubscriptionScope // サブスクリプションの購読を一度に購読解除する機能を提供
	updating    bool                    // notification loopが実行中かどうか

	mu sync.RWMutex
}

// accounts/keystore/account_cache.go
type accountCache struct {
	keydir   string
	watcher  *watcher
	mu       sync.Mutex
	all      accountsByURL
	byAddr   map[common.Address][]accounts.Account
	throttle *time.Timer
	notify   chan struct{}
	fileC    fileCache
}

// accounts/accounts.go
// Walletを操作するためのインターフェス
type Wallet interface {
	// 省略
}

// accounts/keystore/keystore_wallet.go
// WalletのImplがkeystoreWallet(マイナーのwalletなどデフォルトがこ
// 他にusbwalletが出てくるが今回は省略
type keystoreWallet struct {
	account  accounts.Account
	keystore *KeyStore
}

// accounts/accounts.go
type Account struct {
	Address common.Address `json:"address"` // keyに紐付いたEthereumのaccount address
	URL     URL            `json:"url"`     // オプション backendのリソースの場所を示すもの
}

makeFullNode > makeConfigNode > node.New > makeAccountManager > NewKeyStore


// accounts/keystore/keystore.go
func NewKeyStore(keydir string, scryptN, scryptP int) *KeyStore {
	keydir, _ = filepath.Abs(keydir)
	ks := &KeyStore{storage: &keyStorePassphrase{keydir, scryptN, scryptP}}
	// accountCache,keystoreの生きているaccountの一覧,変更channelの登録
	// keystoreWalletをksに登録
	ks.init(keydir)
	return ks
}

makeFullNode > makeConfigNode > node.New > makeAccountManager > NewManager

  • BackendとKeyStoreを使ってAccountManagerを作成する
  • AccountManagerはWalletのイベント監視をしている
  • Subscription・Feed・channelの関係が複雑...
    • event自体はchannelで受け取る
    • そのeventをFeedでpublishする
      • FeedはSubscriptionを複数登録している
// accounts/manager.go
func NewManager(backends ...Backend) *Manager {
	// バックエンドからwalletを取得してURLでsortする
	// Backendは*keystore.KeyStoreと*usbwallet.Hub
	var wallets []Wallet
	// 省略

	// バックエンドからのwallet notificationsをサブスクライブする
	updates := make(chan WalletEvent, 4*len(backends))
	subs := make([]event.Subscription, len(backends))
	for i, backend := range backends {
		// keystore walletの追加削除イベントをサブスクライブする
		subs[i] = backend.Subscribe(updates)
	}
	am := &Manager{
		backends: make(map[reflect.Type][]Backend),
		updaters: subs,
		updates:  updates,
		wallets:  wallets,
		quit:     make(chan chan error),
	}
	for _, backend := range backends {
		kind := reflect.TypeOf(backend)
		// keystore walletやusb wallet
		am.backends[kind] = append(am.backends[kind], backend)
	}
	// WalletEventを監視
	go am.update()
	return am
}

// accounts/manager.go
// updaters・updates・feedの関係がややこしい
// updaters(Subscription)はSubscriptionの集合を管理したいため
// updatesはWalletEventをうけとるため
// feedはsubscribeしてるものにpublishするため
type Manager struct {
	backends map[reflect.Type][]Backend // *keystore.KeyStoreと*usbwallet.Hub
	updaters []event.Subscription       // backend.Subscribe (まとめてUnsubscribeするための持ってる?)
	updates  chan WalletEvent           // WalletEvent (WalletArrived | WalletOpened | WalletDropped)
	wallets  []Wallet                   // KeyStoreの場合、keydirにある分

	feed event.Feed // walletのEventをpublish(一斉配信)する

	quit chan chan error
	lock sync.RWMutex
}

// accounts/manager.go
// walletの更新を監視している
func (am *Manager) update() {
	defer func() {
		am.lock.Lock()
		for _, sub := range am.updaters {
			sub.Unsubscribe()
		}
		am.updaters = nil
		am.lock.Unlock()
	}()
	for {
		select {
		case event := <-am.updates:
			am.lock.Lock()
			switch event.Kind {
			case WalletArrived:
				am.wallets = merge(am.wallets, event.Wallet)
			case WalletDropped:
				am.wallets = drop(am.wallets, event.Wallet)
			}
			am.lock.Unlock()
			am.feed.Send(event)
		case errc := <-am.quit:
			errc <- nil
			return
		}
	}
}
  • BackendのSubscribeはKeyStoreの
  • WalletEventをFeedにしてSubscribeしてSubscriptionにして返す  
  • 定期的にWalletを更新する

[Feed]とSubscription](https://qiita.com/t10471/items/f6e034386171424fb109)を使って1対多の通知を実現している

// accounts/keystore/keystore.go
// keystore walletの追加削除イベントをサブスクライブする
func (ks *KeyStore) Subscribe(sink chan<- accounts.WalletEvent) event.Subscription {
	// 省略
	// WalletEventをFeedに登録しSubscriptionとして返ってきたのをSubscriptionScopeとしてさらに登録
	sub := ks.updateScope.Track(ks.updateFeed.Subscribe(sink))
	if !ks.updating {
		ks.updating = true
		go ks.updater()
	}
	return sub
}

// accounts/keystore/keystore.go
// KeyStoreを定期的に変更通知かタイマーでリフレッシュする「
func (ks *KeyStore) updater() {
	for {
		select {
		case <-ks.changes:
		case <-time.After(walletRefreshCycle):
		}
		// Walletの状態を調べて、walletsの状態を更新して
		// WalletDropped WalletArrivedのイベントを送信する(ks.updateFeed.Send(event))
		ks.refreshWallets()

		ks.mu.Lock()
		if ks.updateScope.Count() == 0 {
			ks.updating = false
			ks.mu.Unlock()
			return
		}
		ks.mu.Unlock()
	}
}

// accounts/keystore/keystore.go
func (ks *KeyStore) refreshWallets() {
	// 省略
	accs := ks.cache.accounts()
	wallets := make([]accounts.Wallet, 0, len(accs))
	events := []accounts.WalletEvent{}

	for _, account := range accs {
		// 古いwalletを消す(WalletDropped EVENTの追加)
		// 知らないwalletなら追加する(WalletArrived EVENTの追加)
		// 一致したら、ks.walletsをwalletsに追加
		// 省略
	}
	// 残りをWalletDropped EVENTの追加にする 
	// 省略
	// EVENTの発火
	for _, event := range events {
		ks.updateFeed.Send(event)
	}
}

makeFullNode > makeConfigNode > SetEthConfig

// cmd/utils/flags.go
func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *eth.Config) {
	// 省略
	// 1つ目はかならずKeyStore(他はUSB)
	ks := stack.AccountManager().Backends(keystore.KeyStoreType)[0].(*keystore.KeyStore)
	// マイニングの報酬を受け取るアドレスを設定する
	setEtherbase(ctx, ks, cfg)
	// GPO は Gas Price Oracle (Gas Priceを決める) gasprice.Configの設定をする
	setGPO(ctx, &cfg.GPO)
	setTxPool(ctx, &cfg.TxPool)
	setEthash(ctx, cfg)
	// flagによる設定のcfgに登録する
	// devフラグ指定時の設定はここにある
}

// eth/gasprice/gasprice.go
// Configという名前の構造体が多い...
type Config struct {
		Blocks     int
		Percentile int
		Default    *big.Int `toml:",omitempty"`
}

// core/tx_pool.go
type TxPoolConfig struct {
	// ローカルのトランザクションはGasPriceがTxPool.gasPriceを無視する (locals  *accountSetにトランザクションを入れておく)
	NoLocals  bool          // ローカルでトランザクションを発行するか?(localで発行したものは自ブロック作成時に優先させたいため)
	Journal   string        // ローカルトランザクションを保存しておくパス
	Rejournal time.Duration // pool.journal.rotateを呼ぶ間隔

	PriceLimit uint64 // poolに入れるgasPriceの最小値
	PriceBump  uint64 // 新しいトランザクションを既にあるトランザクションよりも優先順位をあげるために必要なgasPriceの割合

	AccountSlots uint64 // アカウントごとの保存できるトランザクション最少数、これを越したpendingは削除される 
	GlobalSlots  uint64 // 全てのアカウントの保存できるトランザクションの最大数
	AccountQueue uint64 // アカウントごとの実行していないトランザクションのアカウントごとの最大サイズ、超えると削除さいれる
	GlobalQueue  uint64 //  全てのアカウントの実行していないトランザクションの保存サイズaccounts

	Lifetime time.Duration // Maximum amount of time non-executable transaction are queued
}

// core/tx_pool.go
var DefaultTxPoolConfig = TxPoolConfig{
	Journal:   "transactions.rlp",
	Rejournal: time.Hour,

	PriceLimit: 1,
	PriceBump:  10,

	AccountSlots: 16,
	GlobalSlots:  4096,
	AccountQueue: 64,
	GlobalQueue:  1024,

	Lifetime: 3 * time.Hour,
}

// consensus/ethash/ethash.go
// ethashの設定(マイニングのアルゴリズム)
type Config struct {
	CacheDir       string
	CachesInMem    int
	CachesOnDisk   int
	DatasetDir     string
	DatasetsInMem  int
	DatasetsOnDisk int
	PowMode        Mode
}

// consensus/ethash/ethash.go
// Ethash Ethereum用のPowのアルゴリズム
type Ethash struct {
	config Config

	caches   *lru // マイニングと検証に使うデータ
	datasets *lru // マイニングに使うデータ chachesを元に生成される

	// Mining related fields
	rand     *rand.Rand    // noneを生成するための乱数オブジェクト
	threads  int           // マイニングするのに使うスレッド数
	update   chan struct{} // マイニングのパラメータの更新を受け取るチャンネル
	hashrate metrics.Meter // hashrateを監視するオブジェクト

	// Remote sealer related fields
	workCh       chan *sealTask   // Notification channel to push new work and relative result channel to remote sealer
	fetchWorkCh  chan *sealWork   // Channel used for remote sealer to fetch mining work
	submitWorkCh chan *mineResult // Channel used for remote sealer to submit their mining result
	fetchRateCh  chan chan uint64 // Channel used to gather submitted hash rate for local or remote sealer.
	submitRateCh chan *hashrate   // Channel used for remote sealer to submit their mining hashrate

	// The fields below are hooks for testing
	shared    *Ethash       // Shared PoW verifier to avoid cache regeneration
	fakeFail  uint64        // Block number which fails PoW check even in fake mode
	fakeDelay time.Duration // Time delay to sleep for before returning from verify

	lock sync.Mutex // キャッシュとマイニングの為のフィールドをスレッドセーフにするためのロック

	closeOnce sync.Once       // Ensures exit channel will not be closed twice.
	exitCh    chan chan error // Notification channel to exiting backend threads
}

// core/tx_pool.go
type TxPool struct {
	config       TxPoolConfig
	chainconfig  *params.ChainConfig
	chain        blockChain
	gasPrice     *big.Int
	txFeed       event.Feed
	scope        event.SubscriptionScope
	chainHeadCh  chan ChainHeadEvent
	chainHeadSub event.Subscription
	signer       types.Signer
	mu           sync.RWMutex

	currentState  *state.StateDB      // ブロックチェーンのヘッダーの現在の状態
	pendingState  *state.ManagedState // 仮のnonceのペンディング状態
	currentMaxGas uint64              // 現在のトランザクションごとのgas limit

	locals  *accountSet // プールから削除されるルールの適用外のローカルトランザクションの集合
	journal *txJournal  // ディスクに保存するローカルトランザクションのジャーナル

	pending map[common.Address]*txList   // 現在処理可能なすべてのトランザクション
	queue   map[common.Address]*txList   // キューに積まれてるが処理不能なトランザクション
	beats   map[common.Address]time.Time // 各既知のアカウントからの最後の通知
	all     *txLookup                    // 検索可能な全てのトランザクション
	priced  *txPricedList                // 金額順に並んだトランザクション

	wg sync.WaitGroup // シャットダウンの為のsync

	homestead bool
}

makeFullNode > RegisterEthService

eth Serviceを設定する

// cmd/utils/flags.go
func RegisterEthService(stack *node.Node, cfg *eth.Config) {
	var err error
	if cfg.SyncMode == downloader.LightSync {
		err = stack.Register(func(ctx *node.ServiceContext) (node.Service, error) {
			return les.New(ctx, cfg)
		})
	} else {
		err = stack.Register(func(ctx *node.ServiceContext) (node.Service, error) {
			fullNode, err := eth.New(ctx, cfg)
			if fullNode != nil && cfg.LightServ > 0 {
				ls, _ := les.NewLesServer(fullNode, cfg)
				fullNode.AddLesServer(ls)
			}
			return fullNode, err
		})
	}
	if err != nil {
		Fatalf("Failed to register the Ethereum service: %v", err)
	}
}

// node/node.go
// 後で起動するサービスの登録のメソッド
func (n *Node) Register(constructor ServiceConstructor) error {
	n.lock.Lock()
	defer n.lock.Unlock()

	if n.server != nil {
		return ErrNodeRunning
	}
	// serviceFuncs: []ServiceConstructor{}
	n.serviceFuncs = append(n.serviceFuncs, constructor)
	return nil
}

// node/service.go
type ServiceConstructor func(ctx *ServiceContext) (Service, error)

Serviceという抽象化した形で機能を登録します

3
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?