オブジェクトストレージに置いたセンサーデータなど同じフォーマットの大量のCSVファイルを何しから分析したくなったとき、watsonx.data(IBM Cloud SaaS版)でSparkエンジンを使って簡単にSQL Queryの使えるICEBERGテーブルに取り込むことができます。
尚、この投稿の作成時(2023年12月20日)は別サービスのIBM Analytics Engine(Spark)エンジンが必須でしたが、watsonx.data(IBM Cloud SaaS版)内部にSparkエンジンができましたので、別サービスは必須ではになくなりました。
当記事はその方法を説明します。
ここでは「環境省大気汚染物質広域監視システム そらまめくん」からダウンロードできる大気汚染測定結果(時間値)のCSVファイル1ヶ月分を取り込んでみます。
2024年06月11日に内部のSparkエンジンを使用する方法を書き加えました
0. 前提
- IBM watsonx.dataのインスタンスがあること
- 取り込むCSVデータをおくIBM Cloud Objectストレージ (S3でもよい)のバケットがIBM watsonx.dataに登録済みであること
- watsonx.dataのカタログがカタログ・タイプ「Apache Iceberg」で1つ以上登録済みであること
- アクセスするuseridにwatsonx.dataに対してIAMで MetastoreAccess roleがあること(2024/6/11 New)
Sparkにwatsonx.dataのSpark(native spark)を使用する場合
- Sparkエンジンが追加されていること
- ない場合は追加する
SparkにIBM Analytics Engineを使用する場合
- IBM Analytics Engineのインスタンス があること
- ない場合は「1. IBM Analytics Engine Spark インスタンスの作成」を参考に作成してください。
- エンジンに「IBM Analytics Engine(Spark)」 のエンジンが追加されてること
1. 大気汚染測定結果(時間値)のCSVファイル1ヶ月分ダウンロード
「環境省大気汚染物質広域監視システム そらまめくん」のダウンロードページ
https://soramame.env.go.jp/download にアクセスします。
年月指定「2023年11月」、都道府県「全国」で指定し、zipファイルをダウンロードします。
2. ダウンロードしたzipファイルを解凍し、フォルダーごとIBM watsonx.dataに登録済みバケットにアップロード
2023年11月の全国のデータをダウンローするとdata.zipファイルに、1708のCSVファイルが入っていました。
文字コードはSJISで改行コードはCR/LFでした。
尚、watsonx.dataで対応しているCSVファイルの文字コードは、
- UTF-8
- UTF-16
- UTF-32
です。幸いヘッダーのみに日本語が入っており、データは半角英数字記号のみで、データの文字コードはSJISでもUTF-8でも変わらないため、このまま使用することにします。
オブジェクトストレージに、フォルダーごとアップロード
フォルダの中身:
3. 取り込み先テーブルの作成
Icebergのカタログ上でスキーマを作成後、以下のDDLでテーブルを作成してください。
lakehouse
は自分のカタログ名、sensor
は自分のスキーマ名に変更してください。
尚DATA_DATE
は本来ならばDATE属性にすべきですが、元ファイルのフォーマットがYYYY/MM/DD
でこのままだとDATE属性では入らないため、一旦varchar属性にしてあります。必要に応じて、DATE属性で別テーブルをCTAS(CREATE TABLE AS SELECT)で作成する等を検討してください。
CREATE TABLE lakehouse.sensor.jpnair (
"LOC_CODE" varchar,
"DATA_DATE" varchar,
"HOUR" integer,
"SO2" double,
"NO" double,
"NO2" double,
"NOX" double,
"CO" double,
"OX" double,
"NMHC" double,
"CH4" double,
"THC" double,
"SPM" double,
"PM25" double,
"SP" double,
"WD" double,
"WS" double,
"TEMP" double,
"HUM" double
)
WITH (
format = 'PARQUET'
)
4. 取り込みジョブの作成
4-1. 「データの取り込み」画面の表示
watsonx.dataの左側のメニューから「データ・マネージャー」のアイコンをクリックし、「データ・マネージャー」を表示します。次に「取り込みジョブの作成」をクリックします。
4-2. 「エンジン」情報の入力
エンジン:
使用するエンジンを選択します。「2. 前準備2」に登録したエンジン名を選びます。
ジョブ・リソースの構成:
サイズを選びます
4-3. 「Select file(s)」情報の入力
Bucket
2でアップロードしたCOSを選択
Select file type
ここでは`CSV'を選択
ソース・ディレクトリー:
2でアップロードした フォルダにチェックを入れる。
CSVファイル構成
- 「ヘッダーあり」 にチェック
- Encoding
- CSVファイルの文字コードはSJISでしたがデータ部分はUTF-8とコードの変わらない半角英数字記号なので、
UTF-8
にセットします
- CSVファイルの文字コードはSJISでしたがデータ部分はUTF-8とコードの変わらない半角英数字記号なので、
- Line delimiter
-
\r\n
にセットします。
-
最後に「次へ」をクリックします。
4-4. 「ターゲット」情報の入力
取り込み先のテーブルの以下を入力します。「3. 取り込み先テーブルの作成」で作成したテーブルの情報になります。
- カタログ名
- スキーマ名
- テーブル名
4-5. 「サマリー」を確認して、「取り込み」をクリック
5. 取り込みジョブの終了確認
状況が「終了」になれば完了です。完了後、データが無事取り込まれたか確認してみてください。
もし状況が「失敗」の場合はジョブIDをクリックすると、エラーメッセージが表示できます。
データ取り込みジョブの作成については以上で完了です。
6. データの確認
SQLでデータを確認してみましょう!
lakehouse
は自分のカタログ名、sensor
は自分のスキーマ名に変更してください。
件数
SELECT COUNT (*) FROM "lakehouse". "sensor". "jpnair"
SELECTでLOC_CODE= '01101010' かつ DATA_DATE = '2023/11/01' のデータを取得
SELECT * FROM "lakehouse"."sensor"."jpnair" WHERE LOC_CODE= '01101010' AND DATA_DATE = '2023/11/01' ;
無事データ取り込みできたようですね。
以上です。