2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

watsonx.data: Sparkエンジンを使用したデータ取り込みジョブの作成

Last updated at Posted at 2023-12-13

watsonx.data(IBM Cloud SaaS版)のWeb UIでIBM Analytics Engine(Spark)エンジンを使用したデータ取り込みジョブの作成ができるようになりました(作成後はもちろん実行されます)。

尚、この投稿の作成時(2023年12月20日)は別サービスのIBM Analytics Engine(Spark)エンジンが必須でしたが、watsonx.data(IBM Cloud SaaS版)内部にSparkエンジンができましたので、別サービスは必須ではになくなりました(2024/6/11記載)。

当記事はその方法を説明します。またCSVファイルでもできるのですが、この記事はParquetファイルの取り込みをやっています。
自分のファイルでも可能ですが、ここではサンプルとして TLC Trip Record Data(NYのタクシーのオープンデータ)をテーブルに取り込んでみます。環境はIBM Cloudです。

0. 前提

  • アクセスするuseridにwatsonx.dataに対してIAMで MetastoreAccess roleがあること(2024/6/11 New)

Sparkにwatsonx.dataのSpark(native spark)を使用する場合

SparkにIBM Analytics Engineを使用する場合



  • watsonx.dataのカタログがカタログ・タイプ「Apache Iceberg」で1つ以上登録済みであること
  • 取り込み先のテーブルがIceberg 形式で作成済であること(*)

(*)作成されてない場合は以下の前準備で作ってみてください。

1. 前準備1(オプション): サンプルデータのダウンロードとサンプルテーブルの作成

もうすでにデータを取り込みたいテーブルがある場合は、ここはスキップしてください。
サンプルのテーブルを作りたい方は以下を実施してください。

1-1. TLC Trip Record Data(NYのタクシーのオープンデータ)のダウンロード

TLC Trip Record Dataより、2022年のJanuaryのYellow Taxi Trip Recordsをダウンロードします。
yellow_tripdata_2022-01.parquetというファイルが」ダウンロードされます。
taxi_1.jpg

1-2. サンプルテーブルの作成

Icebergのカタログ上でスキーマを作成後、以下のDDLでテーブルを作成してください。
lakehouseは自分のカタログ名、taxiは自分のスキーマ名に変更してください。

CREATE TABLE lakehouse.taxi.yellow_taxi_2022 (
   "VendorID" bigint,
   "tpep_pickup_datetime" timestamp,
   "tpep_dropoff_datetime" timestamp,
   "passenger_count" double,
   "trip_distance" double,
   "RatecodeID" double,
   "store_and_fwd_flag" varchar,
   "PULocationID" bigint,
   "DOLocationID" bigint,
   "payment_type" bigint,
   "fare_amount" double,
   "extra" double,
   "mta_tax" double,
   "tip_amount" double,
   "tolls_amount" double,
   "improvement_surcharge" double,
   "total_amount" double,
   "congestion_surcharge" double,
   "airport_fee" double
)
WITH (
   format = 'PARQUET'
)

2. 前準備2: エンジンに「IBM Analytics Engine(Spark)」 のエンジンを追加

取り込みジョブの作成で使用するためにエンジンに「IBM Analytics Engine(Spark)」 のエンジンを追加します。既に追加済みの場合、watsonx.dataのSpark(native spark)を使用する場合はスキップしてください。

2-1. インフラストラクチャー・マネージャーの右上「コンポーネントの追加」から「エンジンの追加」を選択します。

image.png

2-2. 「エンジンの追加」入力画面に必要事項を記入し、「レジスター」をクリック

タイプ:
IBM Analytics Engine(Spark)を選択

表示名:
表示させたい名前を記入

インスタンスAPIポイント:
<以下の手順で取得します>

  1. IBM Cloudにログインし,リソースリストを表示します。
    https://cloud.ibm.com/resources
    (ログインしていない場合はログイン画面の後、リソースリスト画面になります)

  2. 「分析」セクションにあるAnalytics Engineインスタンスの名前をクリックして詳細を表示します。
    image.png

  3. 「サービス資格情報」をクリック。
    image.png

  4. 新規資格情報 をクリックして、新しいサービス資格情報を作成します。
    名前を入力し、役割を選択して、 追加 をクリックします。
    image.png

3.作成された資格情報をクリックし中身を表示し、instance_apiをクリップボードにコピーし、入力フィールドに貼り付けます。
image.png

APIキー:
<以下の手順で取得します>
上記インスタンスAPIポイントで使用した 「サービス資格情報」をクリックし中身を表示し、apikeyをクリップボードにコピーし、入力フィールドに貼り付けます。
image.png

入力が完了したら、「レジスター」をクリックします。
image.png

無事登録されると、以下のように「インフラストラクチャー・マネージャー」に表示されるようになります
image.png

3. 取り込み用データをCOSにアップロード

wastonx.dataに登録済のオブジェクトストレージのバケットに、取り込み用データをアップロードします。

ここではサンプルとして1-1でダウンロードしたファイルを、アップロードします。
この例ではwastonx.dataに登録済のオブジェクトストレージnishito-iceberg というバケットのdata_fileというフォルダーにアップロードしました。
image.png

4.  取り込みジョブの作成

4-1. 「データの取り込み」画面の表示

watsonx.dataの左側のメニューから「データ・マネージャー」のアイコンをクリックし、「データ・マネージャー」を表示します。次に「取り込みジョブの作成」をクリックします。

image.png

4-2. 「エンジン」情報の入力

エンジン:
使用するSparkエンジンを選択します。

ジョブ・リソースの構成:
サイズを選びます

最後に「次へ」をクリックします。
image.png

4-3. 「Select file(s)」情報の入力

「Select remote files」 をクリック
image.png

Bucket
「3. 取り込み用データをCOSにアップロード」でアップロードしたCOSを選択

Select file type
ここでは`Parquet'を選択

Browse remote files
「3. 取り込み用データをCOSにアップロード」でアップロードした ファイルまたはフォルダーにチェックを入れる。

上記をセットしたら「Next」をクリックします。
image.png

最後に「次へ」をクリックします。
image.png

image.png

4-4. 「ターゲット」情報の入力

取り込み先のテーブルの以下を入力します。サンプルを使用する場合は、「1-2. サンプルテーブルの作成」で作成したテーブルの情報になります。

  • カタログ名
  • スキーマ名
  • テーブル名

内容を確認して「次へ」をクリックします。
image.png

4-5. 「サマリー」を確認して、「取り込み」をクリック

「サマリー」を確認後、「取り込み」をクリックしてください。
image.png

ジョブが登録されます:
image.png

5.  取り込みジョブの終了確認

状況が「終了」になれば完了です。完了後、データが無事取り込まれたか確認してみてください。
image.png

もし状況が「失敗」の場合はジョブIDをクリックすると、エラーメッセージが表示できます。
image.png

image.png

データ取り込みジョブの作成については以上で完了です。

6. python実行ファイル

この「データ取り込みジョブの作成」をすると、実際は「ターゲット」テーブルのあるオブジェクトストレージのバケットに、 IBM Analytics Engine(Spark)で動作させたpython実行ファイルが書き込まれます。
で、そのまま残っています(!!!)
ジョブIDがついたフォルダーが作成されていて、その中にpythonのファイルが入っています。
image.png
image.png

こちらはそのまま「IBM Analytics Engine Sparkからwatsonx.data を操作: 「3. Sparkによる処理」」でそのまま図1の⑥のファイルとして使用可能です。

GUIではなく、バッチプログラムベースでやりたい時は再利用可能です。でもファイル残ったままだとゴミがたまるような気がするので、今後修正されるかもしれません。

以上です。

2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?