5
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

個人開発エンジニア応援 - 個人開発の成果や知見を共有しよう!-

MongoDB+Embulk+BigQueryでブロックチェーンのデータ分析にトライしてみた

Last updated at Posted at 2023-09-24

はじめに

データ蓄積→加工→BI表示・・・というデータの可視化・分析の一連の流れを実際にトレーニングするため、今回Symbolブロックチェーンのトランザクションデータの分析に取り組みました。

ゴール

  • Symbolブロックチェーンのトランザクション数の過去統計レポートを公開する
    • 2021年のローンチ以降から現在までのトランザクションの増減傾向を明らかにする。
    • レポートは有償ツールやライセンスを持っていない人でも広く参照できるようにする。

全体構成

mongodb_embulk.png

Symbolブロックチェーンのノードから、過去のトランザクションデータ(約240万ブロック分)をEmbulkを用いてBigQueryに抽出。データの可視化や分析レポートの表示はLokkerStudioで行います。また、データ抽出以後もブロックチェーンのトランザクションデータは約30秒ごとに生成されますので、最新データについてはPythonスクリプトによる日次同期処理で取り込みます。

環境

  • 外部データソース
    • Symbolブロックチェーンノード上のMongoDB
  • Embulk実行環境
    • Ubuntu 20.04.4 LTS (GNU/Linux 5.4.0-62-generic x86_64)
    • Embulk 0.9.25

MongoDB+Embulkによるデータ抽出とBigQueryへの転送

SymbolブロックチェーンノードのMongoDBへの接続

Symbolブロックチェーンのノード内部のデータベースは、通常は外からアクセスすることはできませんが、立ち上げ時の設定を変更することで外部からの接続が可能になります。自身でノードを運用している場合は、下記リンクの手順に従ってノード起動時のデータベース接続許可設定を変更します。

(参考)MongoDBへの接続方法

Embulkの環境構築

Javaのインストール

sudo apt install -y openjdk-8-jdk

Embulkのインストール(最新版ではなく、安定版0.9.25を指定)

wget https://github.com/embulk/embulk/releases/download/v0.9.25/embulk-0.9.25.jar -O embulk.jar
sudo mv embulk.jar /usr/local/bin/embulk
sudo chmod +x /usr/local/bin/embulk

パス設定

.bashrcファイルを開き、下記を追加 nano ~/.bashrc

export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
export PATH="$JAVA_HOME/bin:$PATH"
export PATH="$HOME/.embulk/bin:$PATH"

変更を反映 source ~/.bashrc

プラグインのインストール

今回インストールするプラグインは下記の3つ

  1. embulk-input-mongodb
  2. embulk-filter-expand_json
  3. embulk-output-bigquery

MongoDBプラグイン

embulk gem install embulk-input-mongodb

Jsonプラグイン

embulk gem install embulk-filter-expand_json

BigQueryプラグイン

embulk 0.9.25環境でBigQueryプラグインをインストールする際に、Rubyのバージョンに起因したインストールエラーが発生する可能性がある。当方の環境ではjwt:2.3.0public_suffix -v 4.0.7を指定することで解消

embulk gem install jwt:2.3.0
embulk gem install public_suffix -v 4.0.7
embulk gem install embulk-output-bigquery

参考リンク

Embulk最新バージョンでプラグインインストールに失敗する場合の対策

最新バージョンのEmbulkをインストールした場合に。プラグインをインストールしようとすると以下のエラーが出ることがある。(最初から0.9.25を指定してインストールした場合は不要かも)

JRuby is not configured well to run "gem". Configure the Embulk system property "jruby".

この場合はEmbulkのバージョンを下げる

sudo /usr/local/bin/embulk selfupdate 0.9.23

参考リンク

Gemファイルのバージョン違いのエラーが出たとき

Running Embulk version (0.9.25) does not match the installed embulk.gem version (0.11.0).

上記のようなエラーが出たときは、自前のBundleファイルを使ったプラグイン管理を行うか、Bundleを使わない場合はGemファイルを削除すればOK。自分の環境だと、最初にEmbulkの最新Verを入れてから、その後Embulkのバージョンを下げた場合に発生。最新のEmbulkインストール時に最新のGemファイルも一緒にインストールされてしまうことが原因なので、EmbulkのGemファイルを削除することで解消。

sudo rm -r ~/.embulk/lib/gems/gems/embulk-0.11.0-java/

MongoDBとEmbulkの接続(データ読み出し)

読み出しテスト

下記のコンフィグファイルを作成。

config.yml
in:
  type: mongodb
  hosts:
  - {host: localhost, port: 27017}
  database: catapult
  collection: blocks
  json_column_name: record
  limit: 3
filters:
  - type: expand_json
    json_column_name: record
    expanded_columns:
            - {name: _id, type: string}
            - {name: block, type: json}
out: {type: stdout}

プレビュー確認

embulk preview config.yml

本番実行

embulk run config.yml

EmbulkでのJSONのパース(抽出したいカラムを指定)

MongoDBのコレクションに格納されているJsonデータから、必要なデータを抽出するConfigファイルのサンプルについては下記の通り。embulk-filter-expand_jsonプラグインを使ってJSONをパースし、rename でカラム名を整える。expand_jsonプラグインの設定中では、rootを指定することでそこから目的のカラムを指定することができる。

filters:
  - type: expand_json
    json_column_name: record
    root: "$."
    expanded_columns:
            - {name: _id, type: string}
            - {name: block.timestamp, type: long}
            - {name: meta.totalFee, type: long}
            - {name: meta.transactionsCount, type: long}
            - {name: meta.totalTransactionsCount, type: long}
            - {name: block.size, type: long}
            - {name: block.height, type: long}
  - type: rename
    colmuns:
      _id: id
      block.timestamp: timestamp
      meta.totalFee: totalFee
      meta.transactionsCount: transactionsCount
      meta.totalTransactionsCount: totalTransactionsCount
      block.size: size
      block.height: height

BigQueryテーブルへの転送

転送先のテーブルを作成

Embulkでロードしたデータを書き込む(転送)する先のテーブルをBigQuery上で作成する。このとき、スキーマの型とEmbulkで読みだしたカラムの仕様がマッチしていないと転送時にエラーになるので注意する。今回のデータ抽出のテーブルスキーマの例は以下の通り。※BigQuery上でのDB、テーブルの作成方法は割愛します。

image.png

このとき、データの型とモードの指定に注意。特に、MongoDB上でByte列のまま格納されているデータなどを直接BigQueryにロードしようとすると不一致になったり文字化けしたりする。また、BigQuery側で「REQUIRED」に指定していた項目も、なぜかEmbulkからロード時に「NULLABLE」に勝手に変更されるようで、BigQuery側のスキーマ指定を全部「NULLABLE」にするとエラーが消えた事象も確認。※詳しく検証してないので参考までに

テーブルスキーマの指定

データ転送先のテーブルをBigQuery側で作成したら、スキーマ構造をJSON形式でエクスポートして、Embulkの実行環境上に配置。データ抽出のConfigファイルで参照させる。

out:
  type: bigquery
  auth_method: json_key
  json_keyfile: ./bigquery/***.json 
  project: <プロジェクトID>
  dataset: <データセットID>
  table: <テーブルID>
  auto_create_table: true
  schema_file: ./schema.json <- ここでスキーマ定義ファイル参照
  location: US
  charset: UTF-8

サービスアカウントの有効化

外部からBigQueryのテーブルに書き込むためのサービスアカウントを作成します。作成したサービスアカウントのJSON KeyファイルをEmbulkの実行環境上に配置し、EmbulkのConfigファイル上で参照します(下記)

out:
  type: bigquery
  auth_method: json_key
  json_keyfile: ./bigquery/***.json <- ここにKeyファイルを指定
  project: <プロジェクトID>
  dataset: <データセットID>
  table: <テーブルID>
  auto_create_table: true
  schema_file: ./schema.json
  location: US
  charset: UTF-8

BigQueryのテストテーブルの作成、及びサービスアカウントの作成方法についてはこちらの記事が大変詳しく参考になりました。

MongoDBからBigQueryへのデータ転送

これまでの流れをまとめると、config.ymlファイルは大きく以下の3つの構造で設定が記載されることになる。最終的なconfigファイルのサンプルは以下の通り。

in:MongoDBの接続先の指定
filters:JSONのパースとカラム定義
out:BigQueryの接続情報、書込み先テーブルとスキーマ定義

config.yml
in:
  type: mongodb
  hosts:
  - {host: localhost, port: 27017}
  database: catapult
  collection: blocks
filters:
  - type: expand_json
    json_column_name: record
    root: "$."
    expanded_columns:
            - {name: _id, type: string}
            - {name: block.timestamp, type: long}
            - {name: meta.totalFee, type: long}
            - {name: meta.transactionsCount, type: long}
            - {name: meta.totalTransactionsCount, type: long}
            - {name: block.size, type: long}
            - {name: block.height, type: long}
  - type: rename
    colmuns:
      _id: id
      block.timestamp: timestamp
      meta.totalFee: totalFee
      meta.transactionsCount: transactionsCount
      meta.totalTransactionsCount: totalTransactionsCount
      block.size: size
      block.height: height
out:
  type: bigquery
  auth_method: json_key
  json_keyfile: ./bigquery/***.json <- ここにKeyファイルを指定
  project: <プロジェクトID>
  dataset: <データセットID>
  table: <テーブルID>
  auto_create_table: true
  schema_file: ./schema.json
  location: US
  charset: UTF-8

上記のconfigファイルをembulk run config.ymlで実行。特に問題なければ、MongoDBからBigQueryのテーブル上へのデータ転送が開始される。ちなみに今回の検証ではSymbolブロックチェーンのトランザクションデータ約240万レコードが、わずか数分(もっと短いかも?)で一発でBigQuery上のテーブルにロードされた。Embulk恐るべし…。

BigQueryでのデータ加工

これはSymbolブロックチェーンの仕様に特有の処理ですのでご参考まで。

SymbolのTimestampをDatetime形式に変換するクエリ

Symbolのブロックに格納されているTimestamp(ブロック生成時刻)は、ネメシスブロック(チェーンの最初のブロック)の生成時刻(UTC)からの経過ミリ秒という仕様になっています。そのままだと時系列での集計などがやりづらいので、Datetime形式に変換します。

INSERT INTO `symbol_db.block_datetime`
 SELECT 
  id AS id,
  CAST(FORMAT_TIMESTAMP('%Y-%m-%d %H:%M:%S', TIMESTAMP_SECONDS( 1615853185 + CAST((timestamp /1000) AS INT64)+ 32400000)) AS DATETIME) as datetime
 FROM `symbol_db.block_data`
  WHERE id IS NOT NULL

日次でのデータ集計

約30秒ごとに生成されたブロックデータを、日単位で集計するクエリの例

SELECT
  CAST(datetime AS DATE) day,
  SUM(transactionsCount) transactionsCount,
  SUM(totalTransactionsCount) totalTransactionsCount
FROM
  `symbol_db.joined_block_table`
GROUP BY 
  day
ORDER BY 
  day

LookerStudioでのレポート公開

BigQueryのテーブルにまでロードできれば、あとは対象のテーブルをLooker Studioに接続すれば可視化はすぐに行える。対象テーブルを開いて「エクスポート」>「Looker Studioで調べる」ですぐに連携可能。ただ、レポートを外部に公開しようとすると意外とコツが必要。

image.png

レポートの外部公開のコツ

一般公開の許可設定

まず一般公開のやり方がわからず。「ファイル」「公開設定」で一般公開を許可する必要があった。それをやらないと、いくら共有リンクを発行してもパブリックには見れない。

image.png

BigQuery(データソース側)の公開許可設定

さらに、公開設定をした後も、今度はBigQuery側のデータソースについても設定の追加が必要。「リソース」>「追加済みリソースの管理」から、「データの認証情報」を「閲覧者」ではなく「データオーナー」に変更。あと「コミュニティにおける視覚化へのアクセス」も「オン」に変更。

image.png

image.png

BigQueryのデータソースも合わせて一般公開

分析レポートだけでなくデータソース側も一般公開する場合は、下記のリンクを参考に、データセットのユーザに「allUsers」を閲覧者として追加する。

Looker Studioでレポート画面を作り込む

ここまでくれば、あとは好きにレポート画面を作り込んで公開するだけです。直感的に操作できるので、シンプルなグラフであればそんなにトレーニングしなくても使えると思います。業務でTableauを触ったことはありましたが、ほぼ同じ感覚で操作できました(むしろブラウザ上でサクサク動いて、大規模データの計算処理もこちらの方が高速でした)

外部に公開するレポートの作成にあたっては、オープンデータを活用したダッシュボードを作成されている事例が非常に参考になりました。

最新ブロックデータの同期処理

ここまでで、約240万ブロック分のデータをBigQuery上で分析・可視化する環境が整いました。これに加えて、今度は30秒ごとに生成されるブロックのデータを日次で同期させる処理を作成します。

本当はEmbulkだけで差分ロードみたいなこともやれるんだと思いますが、今回はPythonでのBigQuery操作も試したかったのであえてPythonでやってみました。

PythonでのBigQuery操作

まず、PythonでBigQueryにアクセスする際の基本操作をまとめたクラスを作成しました。

"""_summary_
BigQuery操作関数をまとめたクラス
"""

from google.cloud import bigquery

# BigQuery操作関数をまとめた自作クラス
class BigQueryController:
    # コンストラクタ
    def __init__(self, json_path, table_name):
        self.client = bigquery.Client.from_service_account_json(json_path)
        self.table = self.client.get_table(table_name)

    # データを挿入する
    def insert_data(self, rows_to_insert):
        errors = self.client.insert_rows(self.table, rows_to_insert)
        if errors == []:
            print("New rows have been added.")
        else:
            print("Encountered errors while inserting rows: {}".format(errors))

    # データを取得する
    def get_data(self, query):
        query_job = self.client.query(query)
        return query_job.result()

    # データを削除する
    def delete_data(self, query):
        query_job = self.client.query(query)
        return query_job.result()

    # データを更新する
    def update_data(self, query):
        query_job = self.client.query(query)
        return query_job.result()

    # データを挿入する
    def insert_data(self, rows_to_insert):
        errors = self.client.insert_rows(self.table, rows_to_insert)
        if errors == []:
            print("New rows have been added.")
        else:
            print("Encountered errors while inserting rows: {}".format(errors))

このクラスを継承して、今回のBlockデータを格納したテーブルを操作するクラスを実装します。

# bigquery_controllerクラスを継承して、block_table操作用クラスを作成する

from controller.bigquery_controller import BigQueryController

class BlockDataController(BigQueryController):

    # コンストラクタ
    def __init__(self, json_path, table_name):
        super().__init__(json_path, table_name)

    # データを挿入する
    def insert_data(self, rows_to_insert):
        errors = self.client.insert_rows(self.table, rows_to_insert)
        if errors == []:
            print("New rows have been added.")
        else:
            print("Encountered errors while inserting rows: {}".format(errors))

    # 最新ブロックデータを取得する
    def get_latest_block_record(self):
        table_name = self.table.full_table_id.replace(':', '.')
        query = "SELECT * FROM `" + table_name + "` ORDER BY height DESC LIMIT 1"
        result = self.get_data(query)
        for row in result:
            return row

ブロックデータ同期処理の実装

先程のBigQuery操作クラスを使って、テーブルに格納されている最新のブロック高から、ブロックチェーン上の最新のブロック高までの差分データをロードする処理を実装します。

Symbolブロックチェーンは多くのノードがAPIノードとしても機能しているため、チェーン上の最新のブロック高をAPI経由で取得します。

import json
import urllib.request

NODEURL = "https://***:3001"

# Symbolノードから最新のブロック高を取得する関数
def get_latest_height():
    try:
        req = urllib.request.Request(NODEURL + '/chain/info')
        with urllib.request.urlopen(req) as res:
            if res.status == 200:
                json_data = json.load(res)
                return json_data['height']
            else:
                print(f"HTTPエラー: {res.status}")
                return None
    except urllib.error.URLError as e:
        print(f"通信エラー: {e.reason}")
        return None
    except Exception as e:
        print(f"予期せぬエラーが発生しました: {e}")
        return None

あとは、テーブル上の最新のブロック高の次のブロックから、ノードから取得したチェーンの最新ブロックに追いつくまで、ノードのAPIで繰り返しブロックデータを取得していきます。

# heightからブロック情報を取得する関数
def get_blocks( height ):
    try:
        req = urllib.request.Request(NODEURL + '/blocks/' + str(height))
        with urllib.request.urlopen(req) as res:
            if res.status == 200:
                return json.load(res)
            else:
                print(f"HTTPエラー: {res.status}")
                return None
    except urllib.error.URLError as e:
        print(f"通信エラー: {e.reason}")
        return None
    except Exception as e:
        print(f"予期せぬエラーが発生しました: {e}")
        return None

メイン処理は以下の通り。EmbulkでMongoDBからBigQueryにロードしたときと同様、外部からアクセスするためのサービスユーザの認証Key(JSONファイル)を参照する必要がありますので、予め実行環境に配置しておきます。あとは書込みたいテーブルのIDを指定し、繰り返し書込みを行うだけです。念のため、BigQueryへの書込み回数を軽減するため、1000件データが溜まってから書き込む処理としています。

# メイン処理
if __name__ == "__main__":

    # BigQuery操作用クラスのインスタンス生成
    json_path = "hoge.json" # BigQueryのサービスユーザ用認証Keyファイルのパス
    table_name = "braided-liberty-386707.symbol_db.joined_block_table" # 接続先テーブルID
    block_data_controller = bdc.BlockDataController(json_path, table_name) #BigQuery操作用自作クラス

    # BigQueryに取り込まれている最新ブロックデータの取得
    latest_block_record = block_data_controller.get_latest_block_record()
    # 最新ブロックのheightを取得
    latest_height = int(latest_block_record[6])
    # 最新ブロックの次のブロックのheightを取得
    next_height = latest_height + 1

    # 以下、最新データに追いつくまで繰り返し実行
    stop_height = int(get_latest_height())
    if stop_height:
        pass
    else:
        print("ブロックデータの取得に失敗しました。")
        exit()

    rows_to_insert = []
    while True:
        print("next_height:" + str(next_height))
        # ブロック情報の取得
        block_info = get_blocks(next_height)
        if block_info:
            pass
        else:
            print("ブロックデータの取得に失敗しました。")
            exit()

        # 書き込み用ブロックデータの作成
        block_data = {
            "id": str(block_info["id"]),
            "timestamp": int(block_info["block"]["timestamp"]),
            "totalFee": int(block_info["meta"]["totalFee"]),
            "transactionsCount": int(block_info["meta"]["transactionsCount"]),
            "totalTransactionsCount": int(block_info["meta"]["totalTransactionsCount"]),
            "size": int(block_info["block"]["size"]),
            "height": int(block_info["block"]["height"]),
            "datetime": get_datetime(int(block_info["block"]["timestamp"]))
        }

        rows_to_insert.append(block_data)

        # 1000件溜まるか、最新ブロックに追いついたらテーブルに書き込み
        if len(rows_to_insert) >= 1000 or next_height >= stop_height:
            # BigQueryにデータを書き込む
            block_data_controller.insert_data(rows_to_insert)
            # リストを空にする
            rows_to_insert = []

        # 最新データに追いついたら終了
        if next_height >= stop_height:
            break

        # 次のブロックのheightを取得
        next_height += 1

上記のサンプルコードでは割愛していますが、実運用上ではエラー発生時または正常処理終了時にLNEで通知を飛ばすように設定しています。PythonでLINE通知を飛ばす関数の例は以下です。

# LINE通知
def send_notify(msg):

    #APIのURLとトークン
    url = "https://notify-api.line.me/api/notify" #URL
    access_token = "***" #取得したトークン

    headers = {"Authorization": "Bearer " + access_token}
    send_data = {"message": msg}

    result = requests.post(url, headers=headers, data=send_data)
    
    print("Send Massage")
    print(result)

あとは作成した同期処理のPythonプログラムを、CronやWindowsのタスクスケジューラーで定期実行すれば、日次で最新データを自動同期してくれる処理の完成です。

成果物

すごく長くなってしまいましたが、以上でSymbolブロックチェーンのトランザクション数を統計分析する環境とレポート画面を完成させることができました。完成したレポートはこちら。一般公開してますのでどなたでも見れます。

image.png
image.png

まとめ

想像してた以上に手間がかかって大変だった…というのが最初の感想です。まず240万件もあるデータを前に、1件ずつPythonとかで処理してたらとてもじゃないけど終わらないし、効率よく処理するにはどうすればいいか?を調査するところからスタートでした。

実際に調べてやってみることで、ETLツールの必要性やその効果(強力さ)、レポートやデータを公開するときのコツなど、色々と大変勉強になりました。データベース、Linux周りの環境構築、ETLツール、データレイク、Pythonのスクリプト、BIツールまで、一気通貫に様々な領域の技術を勉強することができ、非常に良いトレーニングになりました。

あと、今回この数百万件オーダーのデータ集計・可視化環境を、個人が無償で作成できたことにもすごく驚いています。BigQueryも今回くらいの規模だと余裕で無料枠でいけます。レポートの作成と公開もLookerStudioでスムーズに行なえました。Embulkやそのプラグインなど、素晴らしいソフトウェアを開発してくださっているOSS開発者の皆様や、リッチなコンピューティングリソースを無料で使わせてくれるGoogleさんに深く感謝です。

今回はトランザクションの集計をメインに行いましたが、ハーベスト報酬(いわゆるマイニング報酬)のデータやノードの分散状況など、Symbolブロックチェーンの活用度や状況を測るKPIは他にもいくつかあるので、また次のテーマを設定してトライしてみようと思います。

長文お付き合いいただき、ありがとうございました。個人でデータ分析や可視化の開発にチャレンジしたい方の参考になれば幸いです。また、これを機にSymbolブロックチェーンにもご興味持ってもらえると幸いです。開発しやすく、高性能で非常に興味深い技術です。

5
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?