1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

watsonx.data 2.0.1 で Spark REST API を使用した CLI によるバッチモード ingestion を実行してみた

Last updated at Posted at 2024-08-27

はじめに

watsonx.data 2.0.0 から CLI ("ibm-lh data-copy" コマンド) による Ingestion (データの取り込み) は Presto と Spark と Spark REST API の3つのモードをサポートするようになりました。

・Presto を使用するモードは、watsonx.data 1.1.x と同じ Prestoを使用するモードです。
・Spark を使用するモードは、watsonx.data 1.1.x と同じSparkを使用するモードです。
・Spark REST API を使用するモードは、watsonx.data 2.0.0 で追加された新しいモードです。

それぞれのモードでIngestionを実行するために ENABLED_INGEST_MODE 環境変数を設定します。

ENABLED_INGEST_MODEの値 モード
PRESTO Prestoを使用するモード
SPARK_LEGACY Sparkを使用するモード
SPARK Spark REST APIを使用するモード

今回は Spark REST API モードの CLI Ingestion の実行方法について ご紹介します。

前提

IBM Cloud Pak for Data (CP4D) 5.0.0/5.0.1上に「データ・ソース・サービス」の一つである watsonx.data 2.0.0/2.0.1 がインストールされている事。
CP4Dが5.0.0の場合はwatsonx.dataは2.0.0、CP4Dが5.0.1の場合はwatsonx.dataは2.0.1をインストールします。
本記事の検証は、CP4D 5.0.1 に watsonx.data 2.0.1 をインストールした環境を使用しています。

事前準備

1. ibm-lh-client のインストール

watsonx.data のクライアントツールである ibm-lh-client をクライアント・マシンにインストールします。本検証では Red Hat Enterprise Linux 8.6 に ibm-lh-client をインストールしています。
インストールは下記のマニュアルの手順に従って容易に実行する事ができます。
Installing ibm-lh-client

手順の中で、環境変数 LH_RELEASE_TAG にインストールする ibm-lh-client のバージョンを設定しますが、LH_RELEASE_TAG=latest に設定すると その時点での最新バージョンのパッケージがダウンロードされてインストールされます。2024年8月23日時点での最新バージョンは 2.0.1 となります。

2. Ingestionで取り込むデーター(ソース)、取り込み先(ターゲット)の表の準備

Ingestionで取り込むデーター、取り込み先の表を予め準備しておく必要があります。
本検証では取り込むデーターも取り込み先の表も、IBM Cloud Object Storage に作成したバケット内に用意しました。

取り込み先の表は、watsonx.data のWebコンソールのインフラストラクチャー・マネージャーでApache Iceberg タイプとして登録したターゲット用のカタログにスキーマを作成し、スキーマの下に表を作成しておきます。

事前準備が完了した状態のインフラストラクチャー・マネージャーの表示は以下のようになります。watsonx.data ではバケットはストレージとして表示されます。
image.png

3. 環境変数の設定

Spark REST API を使用した CLI のIngestion を実行する場合、下記の環境変数を設定する必要があります。

環境変数名 環境変数の意味
ENABLED_INGEST_MODE Ingestionのモード
IBM_LH_SPARK_JOB_ENDPOINT SparkアプリケーションV4のエンドポイント
SOURCE_S3_CREDS ソースのバケットのS3資格情報
TARGET_S3_CREDS ターゲットのバケットのS3資格情報
IBM_LH_SPARK_EXECUTOR_CORES IngestionのExecutor ポッドのCPUコア数 (オプション)。デフォルトは1。
IBM_LH_SPARK_EXECUTOR_MEMORY IngestionのExecutor ポッドのメモリー (オプション)。デフォルトは1G。
IBM_LH_SPARK_EXECUTOR_COUNT IngestionのExecutor ポッドの数 (オプション)。デフォルトは1。
IBM_LH_SPARK_DRIVER_CORES IngestionのDriver ポッドのCPUコア数 (オプション)。デフォルトは1。
IBM_LH_SPARK_DRIVER_MEMORY IngestionのDriver ポッドのメモリー (オプション)。デフォルトは1G。

必須の環境変数の設定方法について説明していきます。

予め OCPクラスターにログインし、watsonx.data がインストールされているプロジェクトに変更しておきます。
例)

$ oc project cpd-operands
Now using project "cpd-operands" on server "https://api.66820424808b98001eb03c88.cloud.techzone.ibm.com:6443".

ENABLED_INGEST_MODE
「はじめに」に記述したとおり、ENABLED_INGEST_MODE には SPARK を設定します。
SPARKがデフォルトのモードで、ENABLED_INGEST_MODE 環境変数が設定されていない場合 SPARK モードとして実行されます。
例)

export ENABLED_INGEST_MODE=SPARK

SOURCE_S3_CREDS
Ingestionのソースファイルが含まれているバケットの資格情報を、下記のフォーマットで設定します。設定する値は使用するオブジェクト・ストレージのコンソール等から入手します。

AWS_ACCESS_KEY_ID=<access_key>,AWS_SECRET_ACCESS_KEY=<secret_key>,ENDPOINT_URL=<endpoint_url>,AWS_REGION=<region>,BUCKET_NAME=<bucket_name>

例)

export SOURCE_S3_CREDS="AWS_ACCESS_KEY_ID=XXXXXXXXXX,AWS_SECRET_ACCESS_KEY=YYYYYYYYYY,ENDPOINT_URL=https://s3.jp-tok.cloud-object-storage.appdomain.cloud,AWS_REGION=jp-tok,BUCKET_NAME=src-bucket1"

TARGET_S3_CREDS
Ingestionのターゲット側のバケットの資格情報を、SOURCE_S3_CREDSと同じフォーマットで設定します。設定する値は使用するオブジェクト・ストレージのコンソール等から入手します。
例)

export TARGET_S3_CREDS="AWS_ACCESS_KEY_ID=XXXXXXXXXX,AWS_SECRET_ACCESS_KEY=YYYYYYYYYY,ENDPOINT_URL=https://s3.jp-tok.cloud-object-storage.appdomain.cloud,AWS_REGION=jp-tok,BUCKET_NAME=tgt-bucket1"

IBM_LH_SPARK_ で始まる環境変数は、オプションのIngestionのエンジンの構成に関する環境変数です。Ingestionのパフォーマンスに影響を与えるため設定した方が良いでしょう。デフォルトでは最小のリソースしか確保されません。今回は下記の値を設定します。お客様の本番環境に設定する場合は事前に検証して最適な値を設定する必要があります。

export IBM_LH_SPARK_EXECUTOR_CORES=2
export IBM_LH_SPARK_EXECUTOR_MEMORY=4G
export IBM_LH_SPARK_EXECUTOR_COUNT=2
export IBM_LH_SPARK_DRIVER_CORES=2
export IBM_LH_SPARK_DRIVER_MEMORY=4G

4. "ibm-lh data-copy" コマンドに指定するパラメーターの決定

"ibm-lh data-copy" コマンドに指定するパラメーターを決定します。本記事では、ソースファイルとして S3 バケット内の Parquet ファイルを指定する場合に必要なパラメーターについてのみ説明します。

パラメーター パラメーターの意味
source-data-files S3 バケット内の Paquet ファイルか CSV ファイル、又はフォルダーへのパス。フォルダーのパスの場合は最後が"/"である必要があります。
target-tables <カタログ名>.<スキーマ名>.<テーブル名> の形式のターゲット表。
user Ingestionを実行するCP4Dのユーザー名
password Ingestionを実行するCP4Dのユーザーのパスワード
url watsonx.dataのベースのURL
engine-id SparkエンジンのID
instance-id CP4DのインスタンスID
sync-status Ingestionを同期モードで実行する (オプション)

各パラメーターの設定ついて説明します。

source-data-files
"s3://<バケット名>/<フォルダー名>/<ファイル名>" 又は "s3://<バケット名>/<フォルダー名>/" の形式でソースファイルを指定します。

1個のファイルを指定する例)
--source-data-files s3://src-bucket1/folder1/test1.parquet

フォルダー名を指定する例)
--source-data-files s3://src-bucket1/folder1/

target-tables
"s3://<ターゲットのカタログ名>/<スキーマ名>/<ターゲット表名>" の形式でターゲット表を指定します。
例)

--target-tables tgtbucket1.target1.gvt_data_v

ingestion-engine-endpoint
hostname と port には何も指定しません。type=spark を指定します。
例)

--ingestion-engine-endpoint "hostname='',port='',type=spark"

user
CP4Dのインスタンスに接続するためのCP4Dのユーザーを指定します。
例)

--user cpadmin

password
CP4Dのインスタンスに接続するためのCP4Dのユーザーのパスワードを指定します
例)

--password zzzzzzzz

url
watsonx.dataのベースのURLを指定します。
"https://" + oc get route | grep "cpd " を実行した結果の HOSTの値 (結果の2列目) を指定します。"cpd " (cpdの後にブランクを指定) で grep してください。
例)

$ oc get route | grep "cpd "
cpd  cpd-cpd-operands.apps.66820424808b98001eb03c88.cloud.techzone.ibm.com   ibm-nginx-svc  ibm-nginx-https-port  reencrypt/Redirect None

--url https://cpd-cpd-operands.apps.66820424808b98001eb03c88.cloud.techzone.ibm.com

engine-id
Ingestionに使用するSparkのエンジンIDを指定します。
watsonx.data のWebコンソールからインフラストラクチャー・マネージャーを表示し、Ingestionに使用するSparkエンジンを選択して「詳細」に表示される エンジンID を指定します。
例)
image.png

--engine-id spark127

instance-id
watsonx.dataのインスタンスIDを指定します。インスタンスIDは下記の2つの方法で取得できます。
① watsonx.dataのWebコンソールのURLから入手
例)

https://cpd-cpd-operands.apps.66820424808b98001eb03c88.cloud.techzone.ibm.com/watsonx-data/#/home?instanceId=1719823250083405
に含まれる instanceId=1719823250083405

② watsonx.dataのWebコンソールの左下にある「インスタンスの詳細」アイコン
image.png
を押すと表示されるインスタンスID

image.png

--instance-id 1719823250083405

sync-status
Spark REST API モードでは、"ibm-lh data-copy"コマンドを実行すると、"Ingest job submitted" が表示されてすぐに"Complete migration"が表示されて コマンドは終了しますが、実際の Ingestion はサブミットされたジョブから生成された Ingestion のポッドで非同期に実行されるため Ingestion の終了を検知する事ができません。
syn-status を指定すると Ingestion が完了してから"ibm-lh data-copy"コマンドが終了するため Ingestion の終了が明確になります。
例)

--sync-status

バッチモード Ingestion の実行

スクリプト・ファイルの作成

これまで説明した環境変数の設定と "ibm-lh data-copy" コマンドへのパラメーターを元にバッチモード Ingestion 用のスクリプト・ファイルを作成し、実行権限を付けます。

例)

$ cat spark-rest-batch-ingestion-parq-single_qiita.sh
export ENABLED_INGEST_MODE=SPARK
export IBM_LH_SPARK_JOB_ENDPOINT=https://cpd-cpd-operands.apps.66820424808b98001eb03c88.cloud.techzone.ibm.com/v2/spark/v3/instances/3f209068-3ba1-4cca-9df1-18ad655f3c8e/spark/applications
export SOURCE_S3_CREDS="AWS_ACCESS_KEY_ID=XXXXXXXXXX,AWS_SECRET_ACCESS_KEY=YYYYYYYYYY,ENDPOINT_URL=https://s3.jp-tok.cloud-object-storage.appdomain.cloud,AWS_REGION=jp-tok,BUCKET_NAME=src-bucket1"
export TARGET_S3_CREDS="AWS_ACCESS_KEY_ID=XXXXXXXXXX,AWS_SECRET_ACCESS_KEY=YYYYYYYYYY,ENDPOINT_URL=https://s3.jp-tok.cloud-object-storage.appdomain.cloud,AWS_REGION=jp-tok,BUCKET_NAME=tgt-bucket1"
export IBM_LH_SPARK_EXECUTOR_CORES=2
export IBM_LH_SPARK_EXECUTOR_MEMORY=4G
export IBM_LH_SPARK_EXECUTOR_COUNT=2
export IBM_LH_SPARK_DRIVER_CORES=2
export IBM_LH_SPARK_DRIVER_MEMORY=4G

./ibm-lh data-copy \
--source-data-files s3://src-bucket1/parquet_folder/test1.parquet \
--target-table tgtbucket1.target1.gvt_data_v \
--user cpadmin \
--password zzzzzzzz \
--url https://cpd-cpd-operands.apps.66820424808b98001eb03c88.cloud.techzone.ibm.com \
--engine-id spark127 \
--instance-id 1719823250083405 \
--sync-status

スクリプト・ファイルを実行します。
例)

$ ./spark-rest-batch-ingestion-parq-single_qiita.sh
Start data migration
Ingesting SECTION: cmdline
{'create_if_not_exist': False, 'csv_property': {'encoding': 'utf-8', 'escape_character': '\\', 'field_delimiter': ',', 'header': True, 'line_delimiter': '\n'}, 'engine_id': 'spark127', 'engine_name': 'Spark-01', 'execute_config': {'driver_cores': 2, 'driver_memory': '4G', 'executor_cores': 2, 'executor_memory': '4G', 'num_executors': 2}, 'instance_id': '1719823250083405', 'job_id': 'ingestion-cli-1724726777-6-b1fffc11ff34', 'source_data_files': 's3://src-bucket1/parquet_folder/test1.parquet', 'source_file_type': 'csv', 'start_timestamp': '1724726780490088348', 'status': 'starting', 'target_table': 'tgtbucket1.target1.gvt_data_v', 'username': 'cpadmin'}
Ingestion job status:starting
Ingestion job completed
Complete migration

最後に "Complete migration" が表示されれば Ingestion は成功です。

Ingestion ジョブが失敗した場合は、下記の手順で Ingestionのポッドの標準出力のログ を入手して、原因を調査します。

1.watsonx.data ではなく CP4D のWebコンソールに戻り、左上のナビゲーション・メニューの「管理」の中の「ストレージ・ボリューム」を選択します。
image.png

2.Sparkのストレージ・ボリュームを選択します。Sparkのストレージ・ボリューム名は、"<watsonx.dataがインストールされた名前スペース>::<Sparkのインスタンスを作成した時に指定したボリューム名>" となります。

image.png

3.「ファイル・ブラウザ」のタブを選択し、Sparkのボリュームの下に表示されている項目を展開します。この項目番号は Sparkのジョブの履歴を保管するSpark履歴サーバーのポッド ( spark-history-deployment-<ID> ) のIDのようですが詳細は不明です。

4.項目を展開すると、過去に実行したSparkのIngestionジョブのIDの一覧が表示されますので、Ingestionを実行した時に標準出力に表示された "Spark ingestion job <ジョブID> submitted." メッセージに表示されたジョブIDを探してクリックして展開します。ジョブIDは時系列順ではなく、ランダムに生成されると思われるジョブIDの数字の順に表示されます。
image.png

5.さらにジョブIDの下に表示される logs ファルダーをクリックすると右側に logs フォルダーに含まれるログの一覧が表示されますので、通常一番下に表示される spark-driver-<ジョブID>-stdou ファイルをチェックし、「ダウンロード」をクリックします。
image.png

6.ブラウザによりダウンロードされたファイルが表示されますので「開く」か「名前を付けて保存」を選択し、テキスト・エディターで開きます。
このログは Ingestion の Driver ポッドの標準出力のログで、Ingestionの実行状況が最も詳細に記録されるログですので、これを調査してIngestionの失敗の原因を特定します。

以上で、watsonx.data 2.0.1 で Spark REST API を使用した CLI によるバッチモード ingestion の実行方法の紹介を終わります。

Presto を使用した watsonx.data 2.0.1 でのCLI によるバッチモード ingestion の実行方法については以下の記事をご参照ください。
watsonx.data 2.0.1 で Presto を使用した CLI によるバッチモード ingestion を実行してみた

Sparkを使用した(SPARK LEGACY モード) watsonx.data 2.0.1 でのCLI によるバッチモード ingestion の実行方法については以下の記事をご参照ください。
watsonx.data 2.0.1 で Spark を使用した CLI によるバッチモード ingestion を実行してみた

1
1
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?