はじめに
watsonx.data 2.0.0 から CLI ("ibm-lh data-copy" コマンド) による Ingestion (データの取り込み) は Presto と Spark と Spark REST API の3つのモードをサポートするようになりました。
・Prestoを使用するモードは、watsonx.data 1.1.x と同じ Prestoを使用するモードです。
・Sparkを使用するモードは、watsonx.data 1.1.x と同じSparkを使用するモードです。
・Spark REST APIを使用するモードは、watsonx.data 2.0.0 で追加された新しいモードです。
それぞれのモードでIngestionを実行するために ENABLED_INGEST_MODE 環境変数を設定します。
ENABLED_INGEST_MODEの値 | モード |
---|---|
PRESTO | Prestoを使用するモード |
SPARK_LEGACY | Sparkを使用するモード |
SPARK | Spark REST APIを使用するモード |
今回は Spark モードのCLI Ingestionの実行方法について ご紹介します。
watsonx.data 2.0.2 では、新しい環境変数を設定する必要がありますので ご注意ください。
前提
IBM Cloud Pak for Data (CP4D) 5.0.0/5.0.1上に「データ・ソース・サービス」の一つである watsonx.data 2.0.0/2.0.1 がインストールされている事。
CP4Dが5.0.0の場合はwatsonx.dataは2.0.0、CP4Dが5.0.1の場合はwatsonx.dataは2.0.1をインストールします。
本記事の検証は、CP4D 5.0.1 に watsonx.data 2.0.1 をインストールした環境を使用しています。
事前準備
1. ibm-lh-client のインストール
watsonx.data のクライアントツールである ibm-lh-client をクライアント・マシンにインストールします。本検証では Red Hat Enterprise Linux 8.6 に ibm-lh-client をインストールしています。
インストールは下記のマニュアルの手順に従って容易に実行する事ができます。
Installing ibm-lh-client
手順の中で、環境変数 LH_RELEASE_TAG にインストールする ibm-lh-client のバージョンを設定しますが、LH_RELEASE_TAG=latest に設定すると その時点での最新バージョンのパッケージがダウンロードされてインストールされます。2024年8月23日時点での最新バージョンは 2.0.1 となります。
2. Ingestionで取り込むデーター(ソース)、取り込み先(ターゲット)の表の準備
Ingestionで取り込むデーター、取り込み先の表を予め準備しておく必要があります。
本検証では取り込むデーターも取り込み先の表も、IBM Cloud Object Storage に作成したバケット内に用意しました。
取り込み先の表は、watsonx.data のWebコンソールのインフラストラクチャー・マネージャーでApache Iceberg タイプとして登録したターゲット用のカタログにスキーマを作成し、スキーマの下に表を作成しておきます。
事前準備が完了した状態のインフラストラクチャー・マネージャーの表示は以下のようになります。watsonx.data ではバケットはストレージとして表示されます。
3. 環境変数の設定
Spark を使用した CLI のIngestion を実行する場合、下記の環境変数を設定する必要があります。
watsonx.data 2.0.2 では、USE_NATIVE_SPARK、USE_EXTERNAL_SPARK、INSTANCE_ID が新しい環境変数として追加されています。USE_NATIVE_SPARK か USE_EXTERNAL_SPARK と INSTANCE_ID を設定する必要がありますのでご注意ください。
環境変数名 | 環境変数の意味 |
---|---|
ENABLED_INGEST_MODE | Ingestionのモード |
IBM_LH_BEARER_TOKEN | Sparkジョブをサブミットするために必要なベアラートークン |
IBM_LH_SPARK_JOB_ENDPOINT | SparkアプリケーションV4のエンドポイント |
HMS_CLIENT_USER | Hive MetastoreクライアントのユーザーID |
HMS_CLIENT_PASSWORD | Hive Metastoreクライアントのユーザーのパスワード |
SOURCE_S3_CREDS | ソースのバケットのS3資格情報 |
TARGET_S3_CREDS | ターゲットのバケットのS3資格情報 |
IBM_LH_SPARK_EXECUTOR_CORES | IngestionのExecutor ポッドのCPUコア数 (オプション)。デフォルトは1。 |
IBM_LH_SPARK_EXECUTOR_MEMORY | IngestionのExecutor ポッドのメモリー (オプション)。デフォルトは1G。 |
IBM_LH_SPARK_EXECUTOR_COUNT | IngestionのExecutor ポッドの数 (オプション) デフォルトは1。 |
IBM_LH_SPARK_DRIVER_CORES | IngestionのDriver ポッドのCPUコア数 (オプション) デフォルトは1。 |
IBM_LH_SPARK_DRIVER_MEMORY | IngestionのDriver ポッドのメモリー (オプション) デフォルトは1G。 |
USE_NATIVE_SPARK | Native Sparkエンジンを使用するかどうか。watsonx.data 2.0.2 で追加された環境変数 |
USE_EXTERNAL_SPARK | External Sparkエンジンを使用するかどうか。watsonx.data 2.0.2 で追加された環境変数 |
INSTANCE_ID | watsonx.data のインスタンスID。watsonx.data 2.0.2 で追加された環境変数 |
必須の環境変数の設定方法について説明していきます。
予め OCPクラスターにログインし、watsonx.data がインストールされているプロジェクトに変更しておきます。
例)
$ oc project cpd-operands
Now using project "cpd-operands" on server "https://api.66820424808b98001eb03c88.cloud.techzone.ibm.com:6443".
ENABLED_INGEST_MODE
「はじめに」に記述したとおり、ENABLED_INGEST_MODE には SPARK_LEGACY を設定します。
例)
export ENABLED_INGEST_MODE=SPARK_LEGACY
IBM_LH_BEARER_TOKEN
IBM_LH_BEARER_TOKEN は CP4D の下記のAPIを実行して得られたトークンを設定します。
CPD_USER と CPD_PASSWORD には Spark を実行する権限を持つ CP4D のユーザーIDとパスワードを設定します。
例)
CPD_USER=cpadmin
CPD_PASSWORD=password
curl -k -X POST -H "cache-control: no-cache" -H "Content-Type: application/json" -d "{\"username\":\"${CPD_USER}\",\"password\":\"${CPD_PASSWORD}\"}" "https://${cpd_cluster_host}/icp4d-api/v1/authorize"
cpd_cluster_host は oc get route | grep "cpd " を実行した結果の HOSTの値 (結果の2列目) を指定します。"cpd " (cpdの後にブランクを指定) で grep してください。
例)
$ oc get route | grep "cpd "
cpd cpd-cpd-operands.apps.66820424808b98001eb03c88.cloud.techzone.ibm.com ibm-nginx-svc ibm-nginx-https-port reencrypt/Redirect None
API の json 形式の出力から jq を使用して "" を除いた token の値を 環境変数 IBM_LH_BEARER_TOKEN に設定します。
例)
export CPD_USER=user1
export CPD_PASSWORD=zzzzzzzz
export CPD_CLUSTER_HOST=cpd-cpd-operands.apps.66820424808b98001eb03c88.cloud.techzone.ibm.com
export IBM_LH_BEARER_TOKEN=`curl -k -X POST -H "cache-control: no-cache" -H "Content-Type: application/json" -d "{\"username\":\"${CPD_USER}\",\"password\":\"${CPD_PASSWORD}\"}" "https://${cpd_cluster_host}/icp4d-api/v1/authorize" | jq -r '.token'`
IBM_LH_SPARK_JOB_ENDPOINT
Native Sparkエンジン を使用する場合は「インフラストラクチャー・マネージャー」上で青色で表示されているSparkエンジンを選択すると表示される「watsonx.dataアプリケーション・エンドポイント」を指定します。
Native Sparkエンジンについては USE_NATIVE_SPARK を参照してください。
例)
export IBM_LH_SPARK_JOB_ENDPOINT=https://cpd-zen.apps.66ceee3f70150268e8a816df.ocp.techzone.ibm.com/lakehouse/api/v2/spark_engines/spark251/applications
External Sparkエンジン を使用する場合はwatsonx.data の Sparkエンジンの「アプリケーションV4のエンドポイント」を指定します。
External Sparkエンジンについては USE_EXTERNAL_SPARK を参照してください。
CP4Dの「インスタンス」のメニューから作成した Sparkインスタンスの場合、CP4DのWebコンソールにログインし、インスタンスの一覧から Ingestion に使用する Spark を選択するとインスタンスの詳細情報が表示されますので、アクセス・エンドポイント中の "Spark ジョブ V4 エンドポイント" の値を設定します。
例)
export IBM_LH_SPARK_JOB_ENDPOINT=https://cpd-cpd-operands.apps.66820424808b98001eb03c88.cloud.techzone.ibm.com/v4/analytics_engines/3f209068-3ba1-4cca-9df1-18ad655f3c8e/spark_applications
HMS_CLIENT_USER
指定するユーザー名は決め打ちで lakehouse です。
HMS_CLIENT_PASSWORD
下記の手順で、lhconsole-api ポッドに含まれているパスワードを入手します。
lhconsole-api ポッド名を確認します。2つのポッドが表示されます。
例)
$ oc get pod | grep lhconsole-api
lhconsole-api-86d87f7b6b-gtzvc 1/1 Running 0 23d
lhconsole-api-86d87f7b6b-vsgmj 1/1 Running 0 23d
コマンドで lhconsole-api ポッド名を指定しますが、どちらのポッド名を指定しても同じパスワードが出力されます。
$ oc exec -it lhconsole-api-86d87f7b6b-gtzvc -- cat /mnt/infra/ibm-lh-secrets/LH_INSTANCE_SECRET
k3idfu9i2s6co6wu2cBA5kxn[admin@bastion-gym-lan ~]$
パスワードの最後に改行コードが無いため、パスワードの値に続けてプロンプトが表示されますが、今回の場合パスワードの値は k3idfu9i2s6co6wu2cBA5kxn となりますので、この文字列を環境変数に設定します。
例)
export HMS_CLIENT_PASSWORD=k3idfu9i2s6co6wu2cBA5kxn
SOURCE_S3_CREDS
Ingestionのソースファイルが含まれているバケットの資格情報を、下記のフォーマットで設定します。設定する値は使用するオブジェクト・ストレージのコンソール等から入手します。
AWS_ACCESS_KEY_ID=<access_key>,AWS_SECRET_ACCESS_KEY=<secret_key>,ENDPOINT_URL=<endpoint_url>,AWS_REGION=<region>,BUCKET_NAME=<bucket_name>
例)
export SOURCE_S3_CREDS="AWS_ACCESS_KEY_ID=XXXXXXXXXX,AWS_SECRET_ACCESS_KEY=YYYYYYYYYY,ENDPOINT_URL=https://s3.jp-tok.cloud-object-storage.appdomain.cloud,AWS_REGION=jp-tok,BUCKET_NAME=src-bucket1"
TARGET_S3_CREDS
Ingestionのターゲット側のバケットの資格情報を、SOURCE_S3_CREDSと同じフォーマットで設定します。設定する値は使用するオブジェクト・ストレージのコンソール等から入手します。
例)
export TARGET_S3_CREDS="AWS_ACCESS_KEY_ID=XXXXXXXXXX,AWS_SECRET_ACCESS_KEY=YYYYYYYYYY,ENDPOINT_URL=https://s3.jp-tok.cloud-object-storage.appdomain.cloud,AWS_REGION=jp-tok,BUCKET_NAME=tgt-bucket1"
USE_NATIVE_SPARK
USE_NATIVE_SPARK はwatsonx.data 2.0.2 で追加された環境変数です。Native Sparkエンジンは、インフラストラクチャー・マネージャーでエンジンを追加する時に「新規インスタンスの作成」を選択して登録されたエンジンです。登録されたSparkエンジンはインフラストラクチャー・マネージャー上で青い色で表示されます。
IngestionにNative Sparkエンジンを使用する場合は true に設定します。
例)
export USE_NATIVE_SPARK=true
USE_EXTERNAL_SPARK
USE_EXTERNAL_SPARK はwatsonx.data 2.0.2 で追加された環境変数です。External Sparkエンジンは、インフラストラクチャー・マネージャーでSparkエンジンを追加する時に「インスタンスの詳細を入力してください」か「同じ場所にあるインスタンスの選択」を選択して登録されたSparkエンジンです。「同じ場所にあるインスタンス」は同じCP4D内で、CP4D Webコンソールのインスタンスのメニューから作成されたSparkエンジンですが、watsonx.data 2.0.2 では非推奨となり、2.0.3 では削除される予定です。登録されたSparkエンジンはインフラストラクチャー・マネージャー上で灰色で表示されます。
IngestionにExternal Sparkエンジンを使用する場合は true に設定します。
例)
export USE_EXTERNAL_SPARK=true
watsonx.data 2.0.2では、USE_NATIVE_SPARK か USE_EXTERNAL_SPARK どちらかを環境変数として設定する必要があります。
INSTANCE_ID
INSTANCE_IDはwatsonx.data 2.0.2 で追加された環境変数で、watsonx.data 2.0.2 では設定する必要があります。
INSTANCE_IDにはwatsonx.dataのインスタンスIDを指定します。watsonx.dataのインスタンスIDは、watsonx.dataのWebコンソールの左下に表示されている「インスタンスの詳細」アイコン を押すと表示されるインスタンスの詳細な情報の中に含まれています。
設定例)
export INSTANCE_ID=1724906185944494
IBM_LH_SPARK_ で始まる環境変数は、オプションのIngestionのエンジンの構成に関する環境変数です。Ingestionのパフォーマンスに影響を与えるため設定した方が良いでしょう。デフォルトでは最小のリソースしか確保されません。今回は下記の値を設定します。お客様の本番環境に設定する場合は事前に検証して最適な値を設定する必要があります。
export IBM_LH_SPARK_EXECUTOR_CORES=2
export IBM_LH_SPARK_EXECUTOR_MEMORY=4G
export IBM_LH_SPARK_EXECUTOR_COUNT=2
export IBM_LH_SPARK_DRIVER_CORES=2
export IBM_LH_SPARK_DRIVER_MEMORY=4G
4. "ibm-lh data-copy" コマンドに指定するパラメーターの決定
"ibm-lh data-copy" コマンドに指定するパラメーターを決定します。本記事では、ソースファイルとして S3 バケット内の Parquet ファイルを指定する場合に必要なパラメーターについてのみ説明します。
パラメーター | パラメーターの意味 |
---|---|
source-data-files | S3 バケット内の Paquet ファイルか CSV ファイル、又はフォルダーへのパス。フォルダーのパスの場合は最後が"/"である必要があります。 |
target-tables | <カタログ名>.<スキーマ名>.<テーブル名> の形式のターゲット表。 |
ingestion-engine-endpoint | hostname=’’,port=’’,type=spark” の形式のIngestion エンジンのエンドポイント。type は spark である必要があります。 |
trust-store-path | Spark ジョブのポッド内の truststore 資格情報のパス。決め打ちで file:///opt/ibm/jdk/lib/security/cacerts を指定。 |
trust-store-password | Spark ジョブのポッド内の truststore 資格情報のパスワード。決め打ちで changeit を指定。 |
target-catalog-uri | thrift:// の形式の Hive Metastore thrift のエンドポイント。 |
各パラメーターの設定ついて説明します。
source-data-files
"s3://<バケット名>/<フォルダー名>/<ファイル名>" 又は "s3://<バケット名>/<フォルダー名>/" の形式でソースファイルを指定します。
1個のファイルを指定する例)
--source-data-files s3://src-bucket1/folder1/test1.parquet
フォルダー名を指定する例)
--source-data-files s3://src-bucket1/folder1/
target-tables
"s3://<ターゲットのカタログ名>/<スキーマ名>/<ターゲット表名>" の形式でターゲット表を指定します。
例)
--target-tables tgtbucket1.target1.gvt_data_v
ingestion-engine-endpoint
hostname と port には何も指定しません。type=spark を指定します。
例)
--ingestion-engine-endpoint "hostname='',port='',type=spark"
trust-store-path
例) 決め打ちで下記のように設定します。
--trust-store-path file:///opt/ibm/jdk/lib/security/cacerts
trust-store-password
例) 決め打ちで下記のように設定します。
--trust-store-password changeit
target-catalog-uri
watsonx.data の Webコンソールで「インフラストラクチャー・マネージャー」を開き、Ingestionのターゲットに使用するカタログを選択します。
「詳細」タブに表示される「メタストア・ホスト」の値をコピーして thrift:// の後に指定します。
例)
--target-catalog-uri 'thrift://ibm-lh-lakehouse-hive-metastore-svc.cpd-operands.svc.cluster.local:9083'
バッチモード Ingestion の実行
スクリプト・ファイルの作成
これまで説明した環境変数の設定と "ibm-lh data-copy" へのパラメーターを元にバッチモード Ingestion 用のスクリプト・ファイルを作成し、実行権限を付けます。
watsonx.data 2.0.1の例)
$ cat spark-legacy-batch-ingestion-parq-single_qiita.sh
export ENABLED_INGEST_MODE=SPARK_LEGACY
CPD_USER=cpadmin
CPD_PASSWORD=zzzzzzzz
cpd_cluster_host=cpd-cpd-operands.apps.66820424808b98001eb03c88.cloud.techzone.ibm.com
export IBM_LH_BEARER_TOKEN=`curl -k -X POST -H "cache-control: no-cache" -H "Content-Type: application/json" -d "{\"username\":\"${CPD_USER}\",\"password\":\"${CPD_PASSWORD}\"}" "https://${cpd_cluster_host}/icp4d-api/v1/authorize" | jq -r '.token'`
export IBM_LH_SPARK_JOB_ENDPOINT=https://cpd-cpd-operands.apps.66820424808b98001eb03c88.cloud.techzone.ibm.com/v4/analytics_engines/3f209068-3ba1-4cca-9df1-18ad655f3c8e/spark_applications
export HMS_CLIENT_USER=lakehouse
export HMS_CLIENT_PASSWORD=k3idfu9i2s6co6wu2cBA5kxn
export SOURCE_S3_CREDS="AWS_ACCESS_KEY_ID=XXXXXXXXXX,AWS_SECRET_ACCESS_KEY=YYYYYYYYYY,ENDPOINT_URL=https://s3.jp-tok.cloud-object-storage.appdomain.cloud,AWS_REGION=jp-tok,BUCKET_NAME=src-bucket1"
export TARGET_S3_CREDS="AWS_ACCESS_KEY_ID=XXXXXXXXXX,AWS_SECRET_ACCESS_KEY=YYYYYYYYYY,ENDPOINT_URL=https://s3.jp-tok.cloud-object-storage.appdomain.cloud,AWS_REGION=jp-tok,BUCKET_NAME=tgt-bucket1"
export IBM_LH_SPARK_EXECUTOR_CORES=2
export IBM_LH_SPARK_EXECUTOR_MEMORY=4G
export IBM_LH_SPARK_EXECUTOR_COUNT=2
export IBM_LH_SPARK_DRIVER_CORES=2
export IBM_LH_SPARK_DRIVER_MEMORY=4G
bash ./ibm-lh data-copy \
--source-data-files s3://src-bucket1/parquet_folder/test1.parquet \
--target-table tgtbucket1.target1.gvt_data_v \
--ingestion-engine-endpoint "hostname='',port='',type=spark" \
--target-catalog-uri thrift://ibm-lh-lakehouse-hive-metastore-svc.cpd-operands.svc.cluster.local:9083 \
--trust-store-path file:///opt/ibm/jdk/lib/security/cacerts \
--trust-store-password changeit
スクリプト・ファイルを実行します。
例)
$ ./spark-legacy-batch-ingestion-parq-single_qiita.sh
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 1271 100 1227 100 44 5199 186 --:--:-- --:--:-- --:--:-- 5362
Start data migration
Ingesting SECTION: cmdline
Spark ingestion job 2046e434-3547-4008-be8f-2477908cb529 submitted.
Spark ingestion job 2046e434-3547-4008-be8f-2477908cb529 in progress...
Complete migration
最後に "Complete migration" が表示されれば Ingestion は成功です。
Ingestion ジョブが失敗した場合は、下記の手順で Ingestionのポッドの標準出力のログ を入手して、原因を調査します。
1.watsonx.data ではなく CP4D のWebコンソールに戻り、左上のナビゲーション・メニューの「管理」の中の「ストレージ・ボリューム」を選択します。
2.Sparkのストレージ・ボリュームを選択します。Sparkのストレージ・ボリューム名は、"<watsonx.dataがインストールされた名前スペース>::<Sparkのインスタンスを作成した時に指定したボリューム名>" となります。
3.「ファイル・ブラウザ」のタブを選択し、Sparkのボリュームの下に表示されている項目を展開します。この項目番号は Sparkのジョブの履歴を保管するSpark履歴サーバーのポッド ( spark-history-deployment-<ID> ) のIDのようですが詳細は不明です。
4.項目を展開すると、過去に実行したSparkのIngestionジョブのIDの一覧が表示されますので、Ingestionを実行した時に標準出力に表示された "Spark ingestion job <ジョブID> submitted." メッセージに表示されたジョブIDを探してクリックして展開します。ジョブIDは時系列順ではなく、ランダムに生成されると思われるジョブIDの数字の順に表示されます。
5.さらにジョブIDの下に表示される logs ファルダーをクリックすると右側に logs フォルダーに含まれるログの一覧が表示されますので、通常一番下に表示される spark-driver-<ジョブID>-stdou ファイルをチェックし、「ダウンロード」をクリックします。
6.ブラウザによりダウンロードされたファイルが表示されますので「開く」か「名前を付けて保存」を選択し、テキスト・エディターで開きます。
このログは Ingestion の Driver Podの標準出力のログで、Ingestionの実行状況が最も詳細に記録されるログですので、これを調査してIngestionの失敗の原因を特定します。
以上で、watsonx.data 2.0.1 で Spark を使用した CLI によるバッチモード ingestion の実行方法の紹介を終わります。
Presto を使用した watsonx.data 2.0.1 でのCLI によるバッチモード ingestion の実行方法については以下の記事をご参照ください。
watsonx.data 2.0.1 で Presto を使用した CLI によるバッチモード ingestion を実行してみた
Spark REST API を使用した watsonx.data 2.0.1 でのCLI によるバッチモード ingestion の実行方法については以下の記事をご参照ください。
watsonx.data 2.0.1 で Spark REST API を使用した CLI によるバッチモード ingestion を実行してみた
この記事の変更履歴
2024年8月26日 初版
2024年9月12日
・watsonx.data 2.0.2 で追加された環境変数の記述を追加
・IBM_LH_SPARK_JOB_ENDPOINT 環境変数につていの記述を追加