6
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

IBM Analytics Engine Sparkからwatsonx.data を操作: 「3. Sparkによる処理」

Last updated at Posted at 2023-12-12

watsonx.data では、以下のユースケースを実現するために IBM Analytics Engine Spark を使用することが推奨されています。

  1. 大量のデータのwatsonx.dataテーブルへの取り込み(取り込み前にデータをクレンジングして変換可能)
  2. watsonx.data テーブルのパフォーマンス向上のためのテーブルのメンテナンス操作
  3. クエリとして表現するのが難しい複雑な分析ワークロード

「IBM Analytics Engine Sparkからwatsonx.data を操作」ではpythonを使ってwatsonx.dataのテーブルをIBM Analytics Engine instance(Spark)でアクセスする方法について説明します。環境はIBM Cloudです。

IBM Analytics Engine Sparkからwatsonx.data を操作
以下の順序で説明します。この記事は「3. Sparkによる処理」です。

1. IBM Analytics Engine Spark インスタンスの作成
2. IBM Analytics Engine Spark の構成
3. Sparkによる処理

下の図が全体像です。この記事は⑦、⑧をまず準備し、⑥を作成しアップロードし、④のアクセス情報と共に①にRESTAPIでPython実行ファイルの登録を行い、その結果①が⑥のコードによって②③を操作し処理します。
出力は⑨で確認します。
[図1: 全体像]
qiita.jpg

尚、この内容は公式ドキュメントGetting started with Spark use caseに書かれている内容を環境に合わせる部分を加え追加して説明しているものです。

当記事はGetting started with Spark use caseにあるSpark sample python fileを動作させる方法について説明します。

0. 前提

  • IBM watsonx.dataのインスタンスがあること
  • IBM Analytics Engineのインスタンス があり「2. IBM Analytics Engine Spark の構成」で構成済みであること
  • オブジェクトストレージ(バケット)が1つ以上カタログ・タイプ「Apache Iceberg」でIBM watsonx.dataに登録済みであること(図1の②、③)
  • Spark実行ソースを置くオブジェクトストレージのバケット(図1の④)
  • Ingest用ソースを置くオブジェクトストレージのバケット --- これはSpark実行ソースを置くバケットと同じでも問題ないです(図1の⑤)
  • IBM Analytics Engineにアクセス可能なAPIKEY

あったほうがよいもの:
エラーやログやprintで出力した内容など標準出力に出力されるメッセージは以下がないと見れません:

  • IBM Log Analysis with LogDNA(図1の⑨)
    • ライトプランでも使えないことはないですが、検索ができないかつライブでのログのみなので、有料の「7日間のログ検索」以上のプランがおすすめです
    • 当記事では「IBM Analytics Engine」のログを「IBM Log Analysis with LogDNA」で見るためのログの設定方法、およびその見方などは説明していません。以下を参照して設定、参照をお願いします
         - Analytics Engine 「ログの構成および表示

1. 前準備: 必要データのCOSへのアップロード

Spark sample python fileにはIngestサンプルがあるので、そこで使用するデータファイルをダウンロードし、COSにアップロードしておきます。

1-1. TLC Trip Record Data(NYのタクシーのオープンデータ)のダウンロード

TLC Trip Record Dataより、2022年のJanuary, February, March, April, May, Juneの6ヶ月分のYellow Taxi Trip Recordsをダウンロードします。
image.png

1-2. zipcodes.csvのダウンロード

https://raw.githubusercontent.com/spark-examples/spark-scala-examples/3ea16e4c6c1614609c2bd7ebdffcee01c0fe6017/src/main/resources/zipcodes.csv
よりダウンロードします。

1-3. COSへのへのアップロード

  1. Ingest用ソースを置くオブジェクトストレージのバケット(図1の⑤)に、1-1でダウンロードしたファイルを、/nyc-taxi/というフォルダーにアップロードします。

2.  Ingest用ソースを置くオブジェクトストレージのバケット(図1の⑤)に、1-2でダウンロードしたファイルを、アップロードします。(フォルダーはなし、ルートにそのまま)

image.png

2. Spark sample python fileの修正

Spark sample python fileはIBM Analytics Engine Sparkからwatsonx.data を操作する実行ファイルのサンプルで、pythonで書かれています。

Spark sample python fileですが、これだけ読むと< >に囲まれたオブジェクトストレージ情報のみ入れればいいのかと思うのですが、実際には自分の環境に合わせて他も変更する必要があります。

以下に変更すべき部分を変数化したファイルを作成しましたので、これをダウンロードし、自分の環境の値をセットしてください。
https://github.com/kyokonishito/watsonx_spark/blob/main/lakehouse-spark-sample.py

コード上部に変数部分をまとめましたので、以下の値をセットしてください。

変数名
CATALOG_NAME カタログ名(図1の②)
SCHEMA_NAM スキーマ名(図1の②の中に作成するスキーマ名)
DB_BUCKET カタログに登録したオブジェクトストレージのバケット名(図1の③)
DB_BUCKET_ENDPOINT カタログに登録したオブジェクトストレージ(図1の③)のエンドポイント
DB_BUCKET_ACCESS_KEY カタログに登録したオブジェクトストレージ(図1の③)のACCESS KEY
DB_BUCKET_SECRET_KEY カタログに登録したオブジェクトストレージ(図1の③)のSECRET KEY
SOURCE_BUCKET データファイルをアップロードしたオブジェクトストレージのバケット名(図1の⑤)
SOURCE_BUCKET_ENDPOINT データファイル位アップロードしたオブジェクトストレージ(図1の⑤)のエンドポイント
SOURCE_BUCKET_ACCESS_KEY データファイルをアップロードしたオブジェクトストレージ(図1の⑤)のACCESS KEY
SOURCE_BUCKET_SECRET_KEY データファイルをアップロードしたオブジェクトストレージ(図1の⑤)のSECRET KEY

設定例:

# 以下自分の環境の値にセットする
CATALOG_NAME="nishito_iceberg"
SCHEMA_NAME="demodb"
DB_BUCKET="nishito-iceberg"
DB_BUCKET_ENDPOINT="s3.direct.jp-tok.cloud-object-storage.appdomain.cloud"
DB_BUCKET_ACCESS_KEY = "xxxxxxxxxxxxxxxx"
DB_BUCKET_SECRET_KEY = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
SOURCE_BUCKET="spark-source"
SOURCE_BUCKET_ENDPOINT="s3.direct.us-south.cloud-object-storage.appdomain.cloud"
SOURCE_BUCKET_ACCESS_KEY = "xxxxxxxxxxxxxxxxx"
SOURCE_BUCKET_SECRET_KEY = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"

あとはサンプルはmainの部分で実行単位の関数でまとまっていますので、実行したい部分のみ生かして、実行したくない部分はコメントアウトすると確認しやすいです。

またオリジナルは最後に作成したテーブルを削除しているのですが、削除してしまうと何をやったのか結果がよくわからないので、コメントアウトして実行しないようにしています。ここは必要に応じて復活させください。

以下簡単な関数の説明です。

  • init_spark(): sparkセッションの初期化
  • create_database(spark): スキーマの作成
  • list_databases(spark): スキーマの一覧
  • basic_iceberg_table_operations(spark): Icebergテーブル基本操作
  • create_table_from_parquet_data(spark): parquetデータをIcebergテーブルに取り込む
  • ingest_from_csv_temp_table(spark): csvデータをIcebergテーブルに取り込む
  • ingest_monthly_data(spark): 上で作成したyellow_taxi_2022テーブルに、2月から6月までのデータをロードする
  • perform_table_maintenance_operations(spark): テーブル保守
  • evolve_schema(spark): テーブルの変更(列の追加)
  • clean_database(spark): 作成したテーブル、スキーマの削除

修正が完了したらファイルを保存します。 これが図1の⑥のファイルです。

3. python実行ファイル(図1の⑥)のCOSへのアップロード

IBM Analytics Engine SparkはCOSからファイルを読んで実行します。

Spark実行ソースを置くオブジェクトストレージのバケット(図1の④)に、2で作成した実行pythonファイル(図1の⑥)をCOSにアップロードします。
image.png

4. python実行ファイルをSparkにSubmit

Curlまたはpython等でREST APIを呼び出し、python実行ファイルをSparkにSubmitします。
RESTAPI呼び出しの際にのIAMトークン取得のためにAPIKEYが必要です。
ない場合は以下を参考に作成してください。

なお、下記4-1と4-2はpythoncのnotebookを作成してありますので、以下のnotebookを使用して必要事項をセットし、実行することでも実行できます。
https://github.com/kyokonishito/watsonx_spark/blob/main/submit_spark_app.ipynb

4-0. 事前準備 Instance IDの取得

IBM Cloudのリソースリストから、使用するIBM Analytics Engineの詳細を表示し、「構成」タブのInstance IDをコピーして取得しておいてください。
image.png

4-1. IAMトークンの生成

以下の を自分のAPIKEYに置き換えて、実行してください。

curl -X POST 'https://iam.cloud.ibm.com/identity/token' \
--header "Content-Type: application/x-www-form-urlencoded" \
--data-urlencode "grant_type=urn:ibm:params:oauth:grant-type:apikey" \
--data-urlencode "apikey=<your-api-key>"

以下のように出力されますので、"access_token"の値をコピーしておいてください。
出力例
image.png

尚、これは以下のnotebookを使用して必要事項をセットし、実行することでも実行できます。
https://github.com/kyokonishito/watsonx_spark/blob/main/submit_spark_app.ipynb

4-2. python実行ファイルをSparkにSubmit

以下の< >で囲まれた部分を、自分の環境の値に変更して、実行してください。

  • <region>: IBM Analytics Engineのリージョン(us-south, jp-tokなど)
  • <iae-instance-guid>: 4-0で取得しInstance IDの文字列
  • <iam-bearer-token>: 4-1で取得したaccess_tokenの文字列
  • <application_bucket>: python実行ファイル をアップロードしたCOS のバケット名(図1の④)
  • <filename>: python実行ファイル のファイル名(図1の⑥)
  • <object_storage_endpoint>: python実行ファイル をアップロードしたCOS のendpoint
  • <hmac_access_key_for_application-bucket>: python実行ファイル をアップロードしたCOS(図1の④) のaccess key
  • <: python実行ファイル をアップロードしたCOS(図1の④) のsecret key
curl https://api.<region>.ae.cloud.ibm.com/v3/analytics_engines/<iae-instance-guid>/spark_applications \
-H "Authorization: Bearer <iam-bearer-token>" \
-X POST -d '{ "application_details": { "application": "s3a://<application_bucket>/<filename>", "conf": {"spark.hadoop.fs.s3a.bucket.<application-bucket>.endpoint": "<object_storage_endpoint>", "spark.hadoop.fs.s3a.bucket.<application-bucket>.access.key": "<hmac_access_key_for_application-bucket>", "spark.hadoop.fs.s3a.bucket.<application-bucket>.secret.key": "<hmac_secret_key_for_application-bucket>"  } } }'

実行例
image.png

尚、これは以下のnotebookを使用して必要事項をセットし、実行することでも実行できます。
https://github.com/kyokonishito/watsonx_spark/blob/main/submit_spark_app.ipynb

"state":"accepted"が表示されればOKです。

5. 実行状況確認

IBM Cloudのリソースリストから、使用するIBM Analytics Engineの詳細を表示します。
「アプリケーション」タブの「状態」に実行状態が表示されます。

  • 受付待ち
  • 実行中
  • 完了
  • 失敗

完了は正常終了
失敗は何かしらのエラーで異常終了したことを示しています。

image.png

正常終了後、watsonx.dataのGUIでテーブルができているか等確認してみてください。
image.png

6. ログ・標準出力確認

ログや標準出力確認をしたい場合は、IBM Log Analysis with LogDNA(図1の⑨) を使います。以下を参照して設定、参照をお願いします。
   - Analytics Engine 「ログの構成および表示

image.png

以上です。

6
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
6
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?