はじめに
watsonx.data 1.1.3 から CLI ("ibm-lh data-copy") による Ingestion (データの取り込み) をバッチモードで実行できるようになりました。
この機能を使用すると、クライアント・マシンから直接スクリプトを実行する事により、 CLI による Ingestion を実行できるようになります。従来は クライアント・マシンから "ibm-lh data-copy" を実行して Ingestion のコンテナを起動した後、表示されたプロンプトで再度 "ibm-lh data-copy" を実行する対話型で実行する必要があったため、バッチモードの使用により、実用性が大きく向上します。
Spark エンジンを使用する場合、設定するパラメーターが多くて複雑であるため、この記事で実行方法についてご紹介します。
参考文献
Spark ingestion through ibm-lh tool command line
前提
IBM Cloud Pak for Data (CP4D)上に「データ・ソース・サービス」の一つである watsonx.data と「分析サービス」の一つである Analytics Engine powered by Apache Spark on Cloud Pak for Data (本記事では以後 Spark と表記) がインストールされている事。
本記事の検証は、CP4D 4.8.5 に watsonx.data 1.1.4 をインストールした環境を使用しています。watsonx.data 1.1.4 をインストールすると Spark は自動的にインストールされますが、Sparkのインスタンスを手動で作成する必要があります。
事前準備
1. ibm-lh-client のインストール
watsonx.data のクライアントツールである ibm-lh-client をクライアント・マシンにインストールします。本検証では Red Hat Enterprise Linux 8.6 に ibm-lh-client をインストールしています。
インストールは下記のマニュアルの手順に従って容易に実行する事ができます。
Installing ibm-lh-client
手順の中で、環境変数 LH_RELEASE_TAG にインストールする ibm-lh-client のバージョンを設定しますが、LH_RELEASE_TAG=latest に設定すると その時点での最新バージョンのパッケージをダウンロードしてインストールします。2024年5月時点での最新バージョンは 1.1.4 になります。
2. Ingestionで取り込むデーター、取り込み先の表の準備
Ingestionで取り込むデーター、取り込み先の表を予め準備しておく必要があります。
本検証では取り込むデーターも取り込み先の表も IBM Cloud Object Storage に作成したバケット内に用意しました。
取り込み先の表は、watsonx.data のインフラストラクチャー・マネージャー上で登録したターゲット用のカタログにスキーマを作成し、スキーマの下に表を作成しておきます。
3. 環境変数の設定
Spark を使用した CLI のIngestion を実行する場合、下記の環境変数を設定する必要があります。
環境変数名 | 環境変数の意味 |
---|---|
IBM_LH_BEARER_TOKEN | Sparkのジョブをサブミットするのに必要なベアラートークン (必須) |
IBM_LH_SPARK_JOB_ENDPOINT | Spark アプリケーション V4 エンドポイント (必須) |
HMS_CLIENT_USER | Hive Metastore クライアントのユーザー名 (必須) |
HMS_CLIENT_PASSWORD | Hive Metastore クライアントのパスワード (必須) |
SOURCE_S3_CREDS=AWS_ACCESS_KEY_ID | ソースファイルのバケットのS3資格情報 (必須) |
TARGET_S3_CREDS=AWS_ACCESS_KEY_ID | ターゲット表のバケットのS3資格情報 (必須) |
IBM_LH_SPARK_EXECUTOR_CORES | IngestionのExecuterのCPUコア数 (オプション) |
IBM_LH_SPARK_EXECUTOR_MEMORY | IngestionのExecuterのメモリー (オプション) |
IBM_LH_SPARK_EXECUTOR_COUNT | IngestionのExecuterの数 (オプション) |
IBM_LH_SPARK_DRIVER_CORES | Ingestion CoordinatorのCPUコア数 (オプション) |
IBM_LH_SPARK_DRIVER_MEMORY | Ingestion Coordinatorのメモリー (オプション) |
必須の環境変数の設定方法について説明していきます。
予め watsonx.data がインストールされているプロジェクトに変更しておきます。
例)
$ oc project wxd
Now using project "wxd" on server "https://api.664c04007d4450001dc0e943.cloud.techzone.ibm.com:6443"
IBM_LH_BEARER_TOKEN
IBM_LH_BEARER_TOKEN は CP4D の下記のAPIを実行して得られたトークンを設定します。
PRESTO_USER と PRESTO_PASSWORD には CP4DのSpark を実行する権限を持つ CP4D のユーザーとパスワードを設定します。
curl -k -X POST -H "cache-control: no-cache" -H "Content-Type: application/json" -d "{\"username\":\"${PRESTO_USER}\",\"password\":\"${PRESTO_PASSWORD}\"}" "https://${cpd_cluster_host}/icp4d-api/v1/authorize"
cpd_cluster_host は "oc get route | grep cpd" を実行した結果の HOSTの値 (結果の2列目) を指定します。
例)
$ oc get route | grep cpd
cpd cpd-wxd.apps.65f786bcf383d0001ebfbbf8.cloud.techzone.ibm.com ibm-nginx-svc ibm-nginx-https-port reencrypt/Redirect None
API の json 形式の出力から jq を使用して "" を除いた token の値を 環境変数 IBM_LH_BEARER_TOKEN に設定します。
例)
export PRESTO01_USER=user1
export PRESTO_PASSWORD=password
export CPD_CLUSTER_HOST=cpd-wxd.apps.65f786bcf383d0001ebfbbf8.cloud.techzone.ibm.com
export IBM_LH_BEARER_TOKEN=`curl -k -X POST -H "cache-control: no-cache" -H "Content-Type: application/json" -d "{\"username\":\"${PRESTO01_USER}\",\"password\":\"${PRESTO_PASSWORD}\"}" "https://${cpd_cluster_host}/icp4d-api/v1/authorize" | jq -r '.token'`
IBM_LH_SPARK_JOB_ENDPOINT
watsonx.data の Sparkエンジンの「アプリケーションV4のエンドポイント」を指定します。
CP4DのWebコンソールにログインし、インスタンスの一覧から Ingestion に使用する Spark を選択するとインスタンスの詳細情報が表示され、アクセス・エンドポイント中の "Spark ジョブ V4 エンドポイント" の値を設定します。
export IBM_LH_SPARK_JOB_ENDPOINT=https://cpd-wxd.apps.65f786bcf383d0001ebfbbf8.cloud.techzone.ibm.com/v4/analytics_engines/b402bc24-e8e6-448f-9d67-04a5613e5ed0/spark_applications
HMS_CLIENT_USER
指定するユーザー名は決め打ちで lakehouse です。
HMS_CLIENT_PASSWORD
下記の手順で、lhconsole-api ポッドに含まれているパスワードを入手します。
lhconsole-api ポッド名を確認します。2つのポッドが表示されます。
例)
$ oc get pod | grep lhconsole-api
lhconsole-api-67ffddfcf5-bxg9k 1/1 Running 0 2d4h
lhconsole-api-67ffddfcf5-qjs2b 1/1 Running 0 2d4h
パスワードの確認
コマンドで lhconsole-api ポッド名を指定しますが、どちらのポッド名を指定しても同じパスワードが出力されます。
$ oc exec -it lhconsole-api-67ffddfcf5-bxg9k -- cat /mnt/infra/ibm-lh-secrets/LH_INSTANCE_SECRET
ldz8ccfnp67webmc8rnlEtwq[admin@bastion-gym-lan bin]$
パスワードの最後に改行コードが無いため、パスワードの値に続けてプロンプトが表示されますが、今回の場合パスワードの値は ldz8ccfnp67webmc8rnlEtwq となりますので、この文字列を環境変数に設定します。
例)
export HMS_CLIENT_PASSWORD=ldz8ccfnp67webmc8rnlEtwq
SOURCE_S3_CREDS
Ingestionのソースファイルが含まれているバケットの資格情報を、下記のフォーマットで設定します。設定する値は使用するオブジェクト・ストレージのコンソール等から入手します。
AWS_ACCESS_KEY_ID=<access_key>,AWS_SECRET_ACCESS_KEY=<secret_key>,ENDPOINT_URL=<endpoint_url>,AWS_REGION=<region>,BUCKET_NAME=<bucket_name>
例)
export SOURCE_S3_CREDS="AWS_ACCESS_KEY_ID=XXXXXXXXXX,AWS_SECRET_ACCESS_KEY=YYYYYYYYYY,ENDPOINT_URL=https://s3.jp-tok.cloud-object-storage.appdomain.cloud,AWS_REGION=jp-tok,BUCKET_NAME=source-bucket2"
TARGET_S3_CREDS
Ingestionのターゲット側のバケットの資格情報を、SOURCE_S3_CREDSと同じフォーマットで設定します。設定する値は使用するオブジェクト・ストレージのコンソール等から入手します。
例)
export TARGET_S3_CREDS="AWS_ACCESS_KEY_ID=XXXXXXXXXX,AWS_SECRET_ACCESS_KEY=YYYYYYYYYY,ENDPOINT_URL=https://s3.jp-tok.cloud-object-storage.appdomain.cloud,AWS_REGION=jp-tok,BUCKET_NAME=target-bucket3"
4. ibm-lh data-copy コマンドに指定するパラメーターの決定
"ibm-lh data-copy" コマンドに指定するパラメーターを決定します。本記事では、ソースファイルとして S3 バケット内の Parquet ファイルを指定する場合に必要なパラメーターについてのみ説明します。
パラメーター | パラメーターの意味 |
---|---|
source-data-files | S3 バケット内の Paquet ファイルか CSV ファイル、又はフォルダーへのパス。フォルダーのパスの場合は最後が"/"である必要があります。 |
target-tables | <カタログ名>.<スキーマ名>.<テーブル名> の形式のターゲット表。 |
ingestion-engine-endpoint | hostname=’’,port=’’,type=spark” の形式のIngestion エンジンのエンドポイント。type は spark である必要があります。 |
trust-store-password | Spark ジョブのポッド内の truststore 資格情報のパスワード。決め打ちで changeit を指定。 |
trust-store-path | Spark ジョブのポッド内の truststore 資格情報のパス。決め打ちで file:///opt/ibm/jdk/lib/security/cacerts を指定。 |
target-catalog-uri | thrift:// の形式の Hive Metastore thrift のエンドポイント。 |
各パラメーターの設定ついて説明します。
source-data-files
"s3://<バケット名>/ファイル" 又は "s3://<バケット名>/フォルダー名/ファイル名" の形式でソースファイルを指定します。
1個のファイルを指定する例)
--source-data-files s3://source-bucket2/folder1/test1.parquet
フォルダー名を指定する例)
--source-data-files s3://source-bucket2/folder1/
target-tables
"s3://<ターゲットのカタログ名>/<スキーマ名>/<ターゲット表名>" の形式でターゲット表を指定します。
例)
--target-tables targetbucket3.target3.gvt_data_v
ingestion-engine-endpoint
hostname と port には何も指定しません。type=spark を指定します。
例)
--ingestion-engine-endpoint "hostname='',port='',type=spark"
trust-store-path
例) 決め打ちなので下記のように設定します。
--trust-store-path file:///opt/ibm/jdk/lib/security/cacerts
trust-store-password
例) 決め打ちなので下記のように設定します。
--trust-store-password changeit
target-catalog-uri
watsonx.data の Webコンソールで「インフラストラクチャー・マネージャー」を開き、Ingestionのターゲットに使用するカタログを選択します。
「詳細」タブに表示される「メタストア・ホスト」の値をコピーして thrift:// の後に指定します。
例)
--target-catalog-uri 'thrift://ibm-lh-lakehouse-hive-metastore-svc.wxd.svc.cluster.local:9083'
バッチモード Ingestion の実行
スクリプト・ファイルの作成
これまで説明した環境変数の設定と "ibm-lh data-copy" へのパラメーターを元にバッチモード Ingestion 用のスクリプト・ファイルを作成し、実行権限を付けます。
例)
$ cat spark-batch-ingestion-s3.sh
PRESTO_USER=user01
PRESTO_PASSWORD=password
CPD_CLUSTER_HOST=cpd-wxd.apps.664c04007d4450001dc0e943.cloud.techzone.ibm.com
export IBM_LH_BEARER_TOKEN=`curl -k -X POST -H "cache-control: no-cache" -H "Content-Type: application/json" -d "{\"username\":\"${PRESTO01_USER}\",\"password\":\"${PRESTO_PASSWORD}\"}" "https://${CPD_CLUSTER_HOST}/icp4d-api/v1/authorize" | jq -r '.token'`
export IBM_LH_SPARK_JOB_ENDPOINT=https://cpd-wxd.apps.664c04007d4450001dc0e943.cloud.techzone.ibm.com/v4/analytics_engines/de064624-d0ea-4cf1-8f83-d7369d75dca4/spark_applications
export HMS_CLIENT_USER=lakehouse
export HMS_CLIENT_PASSWORD=ldz8ccfnp67webmc8rnlEtwq
export SOURCE_S3_CREDS="AWS_ACCESS_KEY_ID=XXXXXXXXXX,AWS_SECRET_ACCESS_KEY=YYYYYYYYYY,ENDPOINT_URL=https://s3.jp-tok.cloud-object-storage.appdomain.cloud,AWS_REGION=jp-tok,BUCKET_NAME=source-bucket2"
export TARGET_S3_CREDS="AWS_ACCESS_KEY_ID=XXXXXXXXXX,AWS_SECRET_ACCESS_KEY=YYYYYYYYYY,ENDPOINT_URL=https://s3.jp-tok.cloud-object-storage.appdomain.cloud,AWS_REGION=jp-tok,BUCKET_NAME=target-bucket3"
/home/admin/ibm-lh-client/bin/ibm-lh data-copy \
--source-data-files s3://source-bucket2/parquet1 \
--target-tables targetbucket3.target3.gvt_data_v \
--ingestion-engine-endpoint "hostname='${CPD_CLUSTER_HOST}',port='443',type=spark" \
--trust-store-password changeit \
--trust-store-path file:///opt/ibm/jdk/lib/security/cacerts \
--target-catalog-uri 'thrift://ibm-lh-lakehouse-hive-metastore-svc.wxd.svc.cluster.local:9083'
スクリプト・ファイルを実行します。
例)
$ ./spark-batch-ingestion-s3.sh
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 1169 100 1125 100 44 6901 269 --:--:-- --:--:-- --:--:-- 7171
Start data migration
Ingesting SECTION: cmdline
Spark ingestion job 36defbb4-5c27-455c-9ab6-d43e67d35580 submitted.
Spark ingestion job 36defbb4-5c27-455c-9ab6-d43e67d35580 in progress...
Complete migration
最後に "Complete migration" が表示されれば Ingestion は成功です。
Ingestion ジョブが失敗した場合は、下記の手順で Ingestionのポッドの標準出力のログ を入手して、原因を調査します。
-
watsonx.data ではなく CP4D のWebコンソールに戻り、左上のナビゲーション・メニューの「管理」の中の「ストレージ・ボリューム」を選択します。
-
Sparkのストレージ・ボリュームを選択します。Sparkのストレージ・ボリューム名は、"<watsonx.dataがインストールされた名前スペース>::<Sparkのインスタンスを作成した時に指定したボリューム名>" となります。
3. 「ファイル・ブラウザ」のタブを選択し、Sparkのボリュームの下に表示されている項目を展開します。この項目番号は Sparkのジョブの履歴を保管するSpark履歴サーバーのポッド ( spark-history-deployment-<ID> ) のIDのようですが詳細は不明です。
4. 項目を展開すると、過去に実行したSparkのIngestionジョブのIDの一覧が表示されますので、Ingestionを実行した時に標準出力に表示された "Spark ingestion job <ジョブID> submitted." メッセージに表示されたジョブIDを探してクリックして展開します。ジョブIDは時系列順ではなく、ジョブIDの数字の順に表示されます。
6. さらにジョブIDの下に表示される logs ファルダーをクリックすると右側に logs フォルダーに含まれるログの一覧が表示されますので、通常一番下に表示される spark-driver-<ジョブID>-stdou ファイルをチェックし、「ダウンロード」をクリックします。
- ブラウザによりダウンロードされたファイルが表示されますので「開く」か「名前を付けて保存」を選択し、テキスト・エディターで開きます。
このログは Ingestion の Driver Podの標準出力のログで、Ingestionの実行状況が最も詳細に記録されるログですので、これを調査してIngestionの失敗の原因を特定します。