0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

watsonx.data 2.0.1 で Spark を使用した CLI によるバッチモード ingestion を実行してみた

Last updated at Posted at 2024-08-26

はじめに

watsonx.data 2.0.0 から CLI ("ibm-lh data-copy" コマンド) による Ingestion (データの取り込み) は Presto と Spark と Spark REST API の3つのモードをサポートするようになりました。

・Prestoを使用するモードは、watsonx.data 1.1.x と同じ Prestoを使用するモードです。
・Sparkを使用するモードは、watsonx.data 1.1.x と同じSparkを使用するモードです。
・Spark REST APIを使用するモードは、watsonx.data 2.0.0 で追加された新しいモードです。

それぞれのモードでIngestionを実行するために ENABLED_INGEST_MODE 環境変数を設定します。

ENABLED_INGEST_MODEの値 モード
PRESTO Prestoを使用するモード
SPARK_LEGACY Sparkを使用するモード
SPARK Spark REST APIを使用するモード

今回は Spark モードのCLI Ingestionの実行方法について ご紹介します。

watsonx.data 2.0.2 では、新しい環境変数を設定する必要がありますので ご注意ください。

前提

IBM Cloud Pak for Data (CP4D) 5.0.0/5.0.1上に「データ・ソース・サービス」の一つである watsonx.data 2.0.0/2.0.1 がインストールされている事。
CP4Dが5.0.0の場合はwatsonx.dataは2.0.0、CP4Dが5.0.1の場合はwatsonx.dataは2.0.1をインストールします。
本記事の検証は、CP4D 5.0.1 に watsonx.data 2.0.1 をインストールした環境を使用しています。

事前準備

1. ibm-lh-client のインストール

watsonx.data のクライアントツールである ibm-lh-client をクライアント・マシンにインストールします。本検証では Red Hat Enterprise Linux 8.6 に ibm-lh-client をインストールしています。
インストールは下記のマニュアルの手順に従って容易に実行する事ができます。
Installing ibm-lh-client

手順の中で、環境変数 LH_RELEASE_TAG にインストールする ibm-lh-client のバージョンを設定しますが、LH_RELEASE_TAG=latest に設定すると その時点での最新バージョンのパッケージがダウンロードされてインストールされます。2024年8月23日時点での最新バージョンは 2.0.1 となります。

2. Ingestionで取り込むデーター(ソース)、取り込み先(ターゲット)の表の準備

Ingestionで取り込むデーター、取り込み先の表を予め準備しておく必要があります。
本検証では取り込むデーターも取り込み先の表も、IBM Cloud Object Storage に作成したバケット内に用意しました。

取り込み先の表は、watsonx.data のWebコンソールのインフラストラクチャー・マネージャーでApache Iceberg タイプとして登録したターゲット用のカタログにスキーマを作成し、スキーマの下に表を作成しておきます。

事前準備が完了した状態のインフラストラクチャー・マネージャーの表示は以下のようになります。watsonx.data ではバケットはストレージとして表示されます。
image.png

3. 環境変数の設定

Spark を使用した CLI のIngestion を実行する場合、下記の環境変数を設定する必要があります。
watsonx.data 2.0.2 では、USE_NATIVE_SPARK、USE_EXTERNAL_SPARK、INSTANCE_ID が新しい環境変数として追加されています。USE_NATIVE_SPARK か USE_EXTERNAL_SPARK と INSTANCE_ID を設定する必要がありますのでご注意ください。

環境変数名 環境変数の意味
ENABLED_INGEST_MODE Ingestionのモード
IBM_LH_BEARER_TOKEN Sparkジョブをサブミットするために必要なベアラートークン
IBM_LH_SPARK_JOB_ENDPOINT SparkアプリケーションV4のエンドポイント
HMS_CLIENT_USER Hive MetastoreクライアントのユーザーID
HMS_CLIENT_PASSWORD Hive Metastoreクライアントのユーザーのパスワード
SOURCE_S3_CREDS ソースのバケットのS3資格情報
TARGET_S3_CREDS ターゲットのバケットのS3資格情報
IBM_LH_SPARK_EXECUTOR_CORES IngestionのExecutor ポッドのCPUコア数 (オプション)。デフォルトは1。
IBM_LH_SPARK_EXECUTOR_MEMORY IngestionのExecutor ポッドのメモリー (オプション)。デフォルトは1G。
IBM_LH_SPARK_EXECUTOR_COUNT IngestionのExecutor ポッドの数 (オプション) デフォルトは1。
IBM_LH_SPARK_DRIVER_CORES IngestionのDriver ポッドのCPUコア数 (オプション) デフォルトは1。
IBM_LH_SPARK_DRIVER_MEMORY IngestionのDriver ポッドのメモリー (オプション) デフォルトは1G。
USE_NATIVE_SPARK Native Sparkエンジンを使用するかどうか。watsonx.data 2.0.2 で追加された環境変数
USE_EXTERNAL_SPARK External Sparkエンジンを使用するかどうか。watsonx.data 2.0.2 で追加された環境変数
INSTANCE_ID watsonx.data のインスタンスID。watsonx.data 2.0.2 で追加された環境変数

必須の環境変数の設定方法について説明していきます。

予め OCPクラスターにログインし、watsonx.data がインストールされているプロジェクトに変更しておきます。
例)

$ oc project cpd-operands
Now using project "cpd-operands" on server "https://api.66820424808b98001eb03c88.cloud.techzone.ibm.com:6443".

ENABLED_INGEST_MODE
「はじめに」に記述したとおり、ENABLED_INGEST_MODE には SPARK_LEGACY を設定します。
例)

export ENABLED_INGEST_MODE=SPARK_LEGACY

IBM_LH_BEARER_TOKEN
IBM_LH_BEARER_TOKEN は CP4D の下記のAPIを実行して得られたトークンを設定します。
CPD_USER と CPD_PASSWORD には Spark を実行する権限を持つ CP4D のユーザーIDとパスワードを設定します。
例)

CPD_USER=cpadmin
CPD_PASSWORD=password
curl -k -X POST -H "cache-control: no-cache" -H "Content-Type: application/json" -d "{\"username\":\"${CPD_USER}\",\"password\":\"${CPD_PASSWORD}\"}" "https://${cpd_cluster_host}/icp4d-api/v1/authorize"

cpd_cluster_host は oc get route | grep "cpd " を実行した結果の HOSTの値 (結果の2列目) を指定します。"cpd " (cpdの後にブランクを指定) で grep してください。
例)

$ oc get route | grep "cpd "
cpd  cpd-cpd-operands.apps.66820424808b98001eb03c88.cloud.techzone.ibm.com   ibm-nginx-svc  ibm-nginx-https-port  reencrypt/Redirect None

API の json 形式の出力から jq を使用して "" を除いた token の値を 環境変数 IBM_LH_BEARER_TOKEN に設定します。
例)

export CPD_USER=user1
export CPD_PASSWORD=zzzzzzzz
export CPD_CLUSTER_HOST=cpd-cpd-operands.apps.66820424808b98001eb03c88.cloud.techzone.ibm.com

export IBM_LH_BEARER_TOKEN=`curl -k -X POST -H "cache-control: no-cache" -H "Content-Type: application/json" -d "{\"username\":\"${CPD_USER}\",\"password\":\"${CPD_PASSWORD}\"}" "https://${cpd_cluster_host}/icp4d-api/v1/authorize" | jq -r '.token'`

IBM_LH_SPARK_JOB_ENDPOINT

Native Sparkエンジン を使用する場合は「インフラストラクチャー・マネージャー」上で青色で表示されているSparkエンジンを選択すると表示される「watsonx.dataアプリケーション・エンドポイント」を指定します。
Native Sparkエンジンについては USE_NATIVE_SPARK を参照してください。

image.png

例)

export IBM_LH_SPARK_JOB_ENDPOINT=https://cpd-zen.apps.66ceee3f70150268e8a816df.ocp.techzone.ibm.com/lakehouse/api/v2/spark_engines/spark251/applications

External Sparkエンジン を使用する場合はwatsonx.data の Sparkエンジンの「アプリケーションV4のエンドポイント」を指定します。
External Sparkエンジンについては USE_EXTERNAL_SPARK を参照してください。
CP4Dの「インスタンス」のメニューから作成した Sparkインスタンスの場合、CP4DのWebコンソールにログインし、インスタンスの一覧から Ingestion に使用する Spark を選択するとインスタンスの詳細情報が表示されますので、アクセス・エンドポイント中の "Spark ジョブ V4 エンドポイント" の値を設定します。
image.png
例)

export IBM_LH_SPARK_JOB_ENDPOINT=https://cpd-cpd-operands.apps.66820424808b98001eb03c88.cloud.techzone.ibm.com/v4/analytics_engines/3f209068-3ba1-4cca-9df1-18ad655f3c8e/spark_applications

HMS_CLIENT_USER
指定するユーザー名は決め打ちで lakehouse です。

HMS_CLIENT_PASSWORD
下記の手順で、lhconsole-api ポッドに含まれているパスワードを入手します。

lhconsole-api ポッド名を確認します。2つのポッドが表示されます。
例)

$ oc get pod | grep lhconsole-api
lhconsole-api-86d87f7b6b-gtzvc   1/1   Running   0   23d
lhconsole-api-86d87f7b6b-vsgmj   1/1   Running   0   23d

コマンドで lhconsole-api ポッド名を指定しますが、どちらのポッド名を指定しても同じパスワードが出力されます。

$ oc exec -it lhconsole-api-86d87f7b6b-gtzvc -- cat /mnt/infra/ibm-lh-secrets/LH_INSTANCE_SECRET
k3idfu9i2s6co6wu2cBA5kxn[admin@bastion-gym-lan ~]$

パスワードの最後に改行コードが無いため、パスワードの値に続けてプロンプトが表示されますが、今回の場合パスワードの値は k3idfu9i2s6co6wu2cBA5kxn となりますので、この文字列を環境変数に設定します。
例)

export HMS_CLIENT_PASSWORD=k3idfu9i2s6co6wu2cBA5kxn

SOURCE_S3_CREDS
Ingestionのソースファイルが含まれているバケットの資格情報を、下記のフォーマットで設定します。設定する値は使用するオブジェクト・ストレージのコンソール等から入手します。

AWS_ACCESS_KEY_ID=<access_key>,AWS_SECRET_ACCESS_KEY=<secret_key>,ENDPOINT_URL=<endpoint_url>,AWS_REGION=<region>,BUCKET_NAME=<bucket_name>

例)

export SOURCE_S3_CREDS="AWS_ACCESS_KEY_ID=XXXXXXXXXX,AWS_SECRET_ACCESS_KEY=YYYYYYYYYY,ENDPOINT_URL=https://s3.jp-tok.cloud-object-storage.appdomain.cloud,AWS_REGION=jp-tok,BUCKET_NAME=src-bucket1"

TARGET_S3_CREDS
Ingestionのターゲット側のバケットの資格情報を、SOURCE_S3_CREDSと同じフォーマットで設定します。設定する値は使用するオブジェクト・ストレージのコンソール等から入手します。
例)

export TARGET_S3_CREDS="AWS_ACCESS_KEY_ID=XXXXXXXXXX,AWS_SECRET_ACCESS_KEY=YYYYYYYYYY,ENDPOINT_URL=https://s3.jp-tok.cloud-object-storage.appdomain.cloud,AWS_REGION=jp-tok,BUCKET_NAME=tgt-bucket1"

USE_NATIVE_SPARK
USE_NATIVE_SPARK はwatsonx.data 2.0.2 で追加された環境変数です。Native Sparkエンジンは、インフラストラクチャー・マネージャーでエンジンを追加する時に「新規インスタンスの作成」を選択して登録されたエンジンです。登録されたSparkエンジンはインフラストラクチャー・マネージャー上で青い色で表示されます。
image.png
IngestionにNative Sparkエンジンを使用する場合は true に設定します。
例)

export USE_NATIVE_SPARK=true

USE_EXTERNAL_SPARK
USE_EXTERNAL_SPARK はwatsonx.data 2.0.2 で追加された環境変数です。External Sparkエンジンは、インフラストラクチャー・マネージャーでSparkエンジンを追加する時に「インスタンスの詳細を入力してください」か「同じ場所にあるインスタンスの選択」を選択して登録されたSparkエンジンです。「同じ場所にあるインスタンス」は同じCP4D内で、CP4D Webコンソールのインスタンスのメニューから作成されたSparkエンジンですが、watsonx.data 2.0.2 では非推奨となり、2.0.3 では削除される予定です。登録されたSparkエンジンはインフラストラクチャー・マネージャー上で灰色で表示されます。
image.png
IngestionにExternal Sparkエンジンを使用する場合は true に設定します。
例)

export USE_EXTERNAL_SPARK=true

watsonx.data 2.0.2では、USE_NATIVE_SPARK か USE_EXTERNAL_SPARK どちらかを環境変数として設定する必要があります。

INSTANCE_ID
INSTANCE_IDはwatsonx.data 2.0.2 で追加された環境変数で、watsonx.data 2.0.2 では設定する必要があります。
INSTANCE_IDにはwatsonx.dataのインスタンスIDを指定します。watsonx.dataのインスタンスIDは、watsonx.dataのWebコンソールの左下に表示されている「インスタンスの詳細」アイコン image.png を押すと表示されるインスタンスの詳細な情報の中に含まれています。
image.png
設定例)

export INSTANCE_ID=1724906185944494

IBM_LH_SPARK_ で始まる環境変数は、オプションのIngestionのエンジンの構成に関する環境変数です。Ingestionのパフォーマンスに影響を与えるため設定した方が良いでしょう。デフォルトでは最小のリソースしか確保されません。今回は下記の値を設定します。お客様の本番環境に設定する場合は事前に検証して最適な値を設定する必要があります。

export IBM_LH_SPARK_EXECUTOR_CORES=2
export IBM_LH_SPARK_EXECUTOR_MEMORY=4G
export IBM_LH_SPARK_EXECUTOR_COUNT=2
export IBM_LH_SPARK_DRIVER_CORES=2
export IBM_LH_SPARK_DRIVER_MEMORY=4G

4. "ibm-lh data-copy" コマンドに指定するパラメーターの決定

"ibm-lh data-copy" コマンドに指定するパラメーターを決定します。本記事では、ソースファイルとして S3 バケット内の Parquet ファイルを指定する場合に必要なパラメーターについてのみ説明します。

パラメーター パラメーターの意味
source-data-files S3 バケット内の Paquet ファイルか CSV ファイル、又はフォルダーへのパス。フォルダーのパスの場合は最後が"/"である必要があります。
target-tables <カタログ名>.<スキーマ名>.<テーブル名> の形式のターゲット表。
ingestion-engine-endpoint hostname=’’,port=’’,type=spark” の形式のIngestion エンジンのエンドポイント。type は spark である必要があります。
trust-store-path Spark ジョブのポッド内の truststore 資格情報のパス。決め打ちで file:///opt/ibm/jdk/lib/security/cacerts を指定。
trust-store-password Spark ジョブのポッド内の truststore 資格情報のパスワード。決め打ちで changeit を指定。
target-catalog-uri thrift:// の形式の Hive Metastore thrift のエンドポイント。

各パラメーターの設定ついて説明します。

source-data-files
"s3://<バケット名>/<フォルダー名>/<ファイル名>" 又は "s3://<バケット名>/<フォルダー名>/" の形式でソースファイルを指定します。

1個のファイルを指定する例)
--source-data-files s3://src-bucket1/folder1/test1.parquet

フォルダー名を指定する例)
--source-data-files s3://src-bucket1/folder1/

target-tables
"s3://<ターゲットのカタログ名>/<スキーマ名>/<ターゲット表名>" の形式でターゲット表を指定します。
例)

--target-tables tgtbucket1.target1.gvt_data_v

ingestion-engine-endpoint
hostname と port には何も指定しません。type=spark を指定します。
例)

--ingestion-engine-endpoint "hostname='',port='',type=spark"

trust-store-path
例) 決め打ちで下記のように設定します。

--trust-store-path file:///opt/ibm/jdk/lib/security/cacerts

trust-store-password
例) 決め打ちで下記のように設定します。

--trust-store-password changeit

target-catalog-uri
watsonx.data の Webコンソールで「インフラストラクチャー・マネージャー」を開き、Ingestionのターゲットに使用するカタログを選択します。
「詳細」タブに表示される「メタストア・ホスト」の値をコピーして thrift:// の後に指定します。
image.png

例)

--target-catalog-uri 'thrift://ibm-lh-lakehouse-hive-metastore-svc.cpd-operands.svc.cluster.local:9083'

バッチモード Ingestion の実行

スクリプト・ファイルの作成

これまで説明した環境変数の設定と "ibm-lh data-copy" へのパラメーターを元にバッチモード Ingestion 用のスクリプト・ファイルを作成し、実行権限を付けます。

watsonx.data 2.0.1の例)

$ cat spark-legacy-batch-ingestion-parq-single_qiita.sh
export ENABLED_INGEST_MODE=SPARK_LEGACY
CPD_USER=cpadmin
CPD_PASSWORD=zzzzzzzz
cpd_cluster_host=cpd-cpd-operands.apps.66820424808b98001eb03c88.cloud.techzone.ibm.com
export IBM_LH_BEARER_TOKEN=`curl -k -X POST -H "cache-control: no-cache" -H "Content-Type: application/json" -d "{\"username\":\"${CPD_USER}\",\"password\":\"${CPD_PASSWORD}\"}" "https://${cpd_cluster_host}/icp4d-api/v1/authorize" | jq -r '.token'`
export IBM_LH_SPARK_JOB_ENDPOINT=https://cpd-cpd-operands.apps.66820424808b98001eb03c88.cloud.techzone.ibm.com/v4/analytics_engines/3f209068-3ba1-4cca-9df1-18ad655f3c8e/spark_applications
export HMS_CLIENT_USER=lakehouse
export HMS_CLIENT_PASSWORD=k3idfu9i2s6co6wu2cBA5kxn
export SOURCE_S3_CREDS="AWS_ACCESS_KEY_ID=XXXXXXXXXX,AWS_SECRET_ACCESS_KEY=YYYYYYYYYY,ENDPOINT_URL=https://s3.jp-tok.cloud-object-storage.appdomain.cloud,AWS_REGION=jp-tok,BUCKET_NAME=src-bucket1"
export TARGET_S3_CREDS="AWS_ACCESS_KEY_ID=XXXXXXXXXX,AWS_SECRET_ACCESS_KEY=YYYYYYYYYY,ENDPOINT_URL=https://s3.jp-tok.cloud-object-storage.appdomain.cloud,AWS_REGION=jp-tok,BUCKET_NAME=tgt-bucket1"
export IBM_LH_SPARK_EXECUTOR_CORES=2
export IBM_LH_SPARK_EXECUTOR_MEMORY=4G
export IBM_LH_SPARK_EXECUTOR_COUNT=2
export IBM_LH_SPARK_DRIVER_CORES=2
export IBM_LH_SPARK_DRIVER_MEMORY=4G

bash ./ibm-lh data-copy \
--source-data-files s3://src-bucket1/parquet_folder/test1.parquet \
--target-table tgtbucket1.target1.gvt_data_v \
--ingestion-engine-endpoint "hostname='',port='',type=spark" \
--target-catalog-uri thrift://ibm-lh-lakehouse-hive-metastore-svc.cpd-operands.svc.cluster.local:9083 \
--trust-store-path file:///opt/ibm/jdk/lib/security/cacerts \
--trust-store-password changeit

スクリプト・ファイルを実行します。
例)

$ ./spark-legacy-batch-ingestion-parq-single_qiita.sh
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  1271  100  1227  100    44   5199    186 --:--:-- --:--:-- --:--:--  5362
Start data migration
Ingesting SECTION: cmdline
Spark ingestion job 2046e434-3547-4008-be8f-2477908cb529 submitted.

Spark ingestion job 2046e434-3547-4008-be8f-2477908cb529 in progress...
Complete migration

最後に "Complete migration" が表示されれば Ingestion は成功です。

Ingestion ジョブが失敗した場合は、下記の手順で Ingestionのポッドの標準出力のログ を入手して、原因を調査します。

1.watsonx.data ではなく CP4D のWebコンソールに戻り、左上のナビゲーション・メニューの「管理」の中の「ストレージ・ボリューム」を選択します。
image.png

2.Sparkのストレージ・ボリュームを選択します。Sparkのストレージ・ボリューム名は、"<watsonx.dataがインストールされた名前スペース>::<Sparkのインスタンスを作成した時に指定したボリューム名>" となります。

image.png

3.「ファイル・ブラウザ」のタブを選択し、Sparkのボリュームの下に表示されている項目を展開します。この項目番号は Sparkのジョブの履歴を保管するSpark履歴サーバーのポッド ( spark-history-deployment-<ID> ) のIDのようですが詳細は不明です。

4.項目を展開すると、過去に実行したSparkのIngestionジョブのIDの一覧が表示されますので、Ingestionを実行した時に標準出力に表示された "Spark ingestion job <ジョブID> submitted." メッセージに表示されたジョブIDを探してクリックして展開します。ジョブIDは時系列順ではなく、ランダムに生成されると思われるジョブIDの数字の順に表示されます。
image.png

5.さらにジョブIDの下に表示される logs ファルダーをクリックすると右側に logs フォルダーに含まれるログの一覧が表示されますので、通常一番下に表示される spark-driver-<ジョブID>-stdou ファイルをチェックし、「ダウンロード」をクリックします。
image.png

6.ブラウザによりダウンロードされたファイルが表示されますので「開く」か「名前を付けて保存」を選択し、テキスト・エディターで開きます。
このログは Ingestion の Driver Podの標準出力のログで、Ingestionの実行状況が最も詳細に記録されるログですので、これを調査してIngestionの失敗の原因を特定します。

以上で、watsonx.data 2.0.1 で Spark を使用した CLI によるバッチモード ingestion の実行方法の紹介を終わります。

Presto を使用した watsonx.data 2.0.1 でのCLI によるバッチモード ingestion の実行方法については以下の記事をご参照ください。

watsonx.data 2.0.1 で Presto を使用した CLI によるバッチモード ingestion を実行してみた

Spark REST API を使用した watsonx.data 2.0.1 でのCLI によるバッチモード ingestion の実行方法については以下の記事をご参照ください。
watsonx.data 2.0.1 で Spark REST API を使用した CLI によるバッチモード ingestion を実行してみた

この記事の変更履歴

2024年8月26日 初版
2024年9月12日
・watsonx.data 2.0.2 で追加された環境変数の記述を追加
・IBM_LH_SPARK_JOB_ENDPOINT 環境変数につていの記述を追加

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?