watsonx.data(IBM Cloud SaaS版)のWeb UIでIBM Analytics Engine(Spark)エンジンを使用したデータ取り込みジョブの作成ができるようになりました(作成後はもちろん実行されます)。
尚、この投稿の作成時(2023年12月20日)は別サービスのIBM Analytics Engine(Spark)エンジンが必須でしたが、watsonx.data(IBM Cloud SaaS版)内部にSparkエンジンができましたので、別サービスは必須ではになくなりました(2024/6/11記載)。
当記事はその方法を説明します。またCSVファイルでもできるのですが、この記事はParquetファイルの取り込みをやっています。
自分のファイルでも可能ですが、ここではサンプルとして TLC Trip Record Data(NYのタクシーのオープンデータ)をテーブルに取り込んでみます。環境はIBM Cloudです。
0. 前提
- IBM watsonx.dataのインスタンスがあること
- IBM Analytics Engineのインスタンス があること
- ない場合は「1. IBM Analytics Engine Spark インスタンスの作成」を参考に作成してください。
- アクセスするuseridにwatsonx.dataに対してIAMで
MetastoreAccess role
があること(2024/6/11 New)
Sparkにwatsonx.dataのSpark(native spark)を使用する場合
- Sparkエンジンが追加されていること
- ない場合は追加する
SparkにIBM Analytics Engineを使用する場合
- IBM Analytics Engineのインスタンス があること
- ない場合は「1. IBM Analytics Engine Spark インスタンスの作成」を参考に作成してください。
- エンジンに「IBM Analytics Engine(Spark)」 のエンジンが追加されてること
- watsonx.dataのカタログがカタログ・タイプ「Apache Iceberg」で1つ以上登録済みであること
- 取り込み先のテーブルがIceberg 形式で作成済であること(*)
(*)作成されてない場合は以下の前準備で作ってみてください。
1. 前準備1(オプション): サンプルデータのダウンロードとサンプルテーブルの作成
もうすでにデータを取り込みたいテーブルがある場合は、ここはスキップしてください。
サンプルのテーブルを作りたい方は以下を実施してください。
1-1. TLC Trip Record Data(NYのタクシーのオープンデータ)のダウンロード
TLC Trip Record Dataより、2022年のJanuaryのYellow Taxi Trip Recordsをダウンロードします。
yellow_tripdata_2022-01.parquet
というファイルが」ダウンロードされます。
1-2. サンプルテーブルの作成
Icebergのカタログ上でスキーマを作成後、以下のDDLでテーブルを作成してください。
lakehouse
は自分のカタログ名、taxi
は自分のスキーマ名に変更してください。
CREATE TABLE lakehouse.taxi.yellow_taxi_2022 (
"VendorID" bigint,
"tpep_pickup_datetime" timestamp,
"tpep_dropoff_datetime" timestamp,
"passenger_count" double,
"trip_distance" double,
"RatecodeID" double,
"store_and_fwd_flag" varchar,
"PULocationID" bigint,
"DOLocationID" bigint,
"payment_type" bigint,
"fare_amount" double,
"extra" double,
"mta_tax" double,
"tip_amount" double,
"tolls_amount" double,
"improvement_surcharge" double,
"total_amount" double,
"congestion_surcharge" double,
"airport_fee" double
)
WITH (
format = 'PARQUET'
)
2. 前準備2: エンジンに「IBM Analytics Engine(Spark)」 のエンジンを追加
取り込みジョブの作成で使用するためにエンジンに「IBM Analytics Engine(Spark)」 のエンジンを追加します。既に追加済みの場合、watsonx.dataのSpark(native spark)を使用する場合はスキップしてください。
2-1. インフラストラクチャー・マネージャーの右上「コンポーネントの追加」から「エンジンの追加」を選択します。
2-2. 「エンジンの追加」入力画面に必要事項を記入し、「レジスター」をクリック
タイプ:
IBM Analytics Engine(Spark)
を選択
表示名:
表示させたい名前を記入
インスタンスAPIポイント:
<以下の手順で取得します>
-
IBM Cloudにログインし,リソースリストを表示します。
https://cloud.ibm.com/resources
(ログインしていない場合はログイン画面の後、リソースリスト画面になります) -
新規資格情報 をクリックして、新しいサービス資格情報を作成します。
名前を入力し、役割を選択して、 追加 をクリックします。
3.作成された資格情報をクリックし中身を表示し、instance_api
をクリップボードにコピーし、入力フィールドに貼り付けます。
APIキー:
<以下の手順で取得します>
上記インスタンスAPIポイントで使用した 「サービス資格情報」をクリックし中身を表示し、apikey
をクリップボードにコピーし、入力フィールドに貼り付けます。
無事登録されると、以下のように「インフラストラクチャー・マネージャー」に表示されるようになります
3. 取り込み用データをCOSにアップロード
wastonx.dataに登録済のオブジェクトストレージのバケットに、取り込み用データをアップロードします。
ここではサンプルとして1-1でダウンロードしたファイルを、アップロードします。
この例ではwastonx.dataに登録済のオブジェクトストレージnishito-iceberg
というバケットのdata_file
というフォルダーにアップロードしました。
4. 取り込みジョブの作成
4-1. 「データの取り込み」画面の表示
watsonx.dataの左側のメニューから「データ・マネージャー」のアイコンをクリックし、「データ・マネージャー」を表示します。次に「取り込みジョブの作成」をクリックします。
4-2. 「エンジン」情報の入力
エンジン:
使用するSparkエンジンを選択します。
ジョブ・リソースの構成:
サイズを選びます
4-3. 「Select file(s)」情報の入力
Bucket
「3. 取り込み用データをCOSにアップロード」でアップロードしたCOSを選択
Select file type
ここでは`Parquet'を選択
Browse remote files
「3. 取り込み用データをCOSにアップロード」でアップロードした ファイルまたはフォルダーにチェックを入れる。
4-4. 「ターゲット」情報の入力
取り込み先のテーブルの以下を入力します。サンプルを使用する場合は、「1-2. サンプルテーブルの作成」で作成したテーブルの情報になります。
- カタログ名
- スキーマ名
- テーブル名
4-5. 「サマリー」を確認して、「取り込み」をクリック
5. 取り込みジョブの終了確認
状況が「終了」になれば完了です。完了後、データが無事取り込まれたか確認してみてください。
もし状況が「失敗」の場合はジョブIDをクリックすると、エラーメッセージが表示できます。
データ取り込みジョブの作成については以上で完了です。
6. python実行ファイル
この「データ取り込みジョブの作成」をすると、実際は「ターゲット」テーブルのあるオブジェクトストレージのバケットに、 IBM Analytics Engine(Spark)で動作させたpython実行ファイルが書き込まれます。
で、そのまま残っています(!!!)
ジョブIDがついたフォルダーが作成されていて、その中にpythonのファイルが入っています。
こちらはそのまま「IBM Analytics Engine Sparkからwatsonx.data を操作: 「3. Sparkによる処理」」でそのまま図1の⑥のファイルとして使用可能です。
GUIではなく、バッチプログラムベースでやりたい時は再利用可能です。でもファイル残ったままだとゴミがたまるような気がするので、今後修正されるかもしれません。
以上です。