1.目的
IOT等の大量データをObject Storageに保管したDatalakeからエンドユーザーが分析を行うDataWareHouseを作成する際にETL処理が必要になります。RAWデータを整形加工、フォーマット変換してComformedデータ/DWHに変換するまでの処理を指します。IBM Cloud SQL Queryは、Object Storageに保管されたファイルのObject Fileの検索処理だけでなくこのETL処理にも利用できます。本記事ではSQL QueryのETL処理性能を検証してその結果及び性能に関する改善ポイントをまとめました。
オブジェクト・ストレージを使用したデータレイクの構築
https://cloud.ibm.com/docs/solution-tutorials?topic=solution-tutorials-smart-data-lake&locale=ja
IBM Cloud SQL QueryはRelational DB処理に使用するANSI SQLを利用してICOS(IBM Cloud Object Storage)上のObject Fileの処理を行います。SparkSQL/IBM Analytics Engine上で稼働しており、Spark導入/運用スキルも不要で通常のSQL文でSparkSQLが利用できます。
[IBM Cloud SQL Query/ICOSの利用ケース]
走行中の車両から高頻度でUploadされるデータ(JSON)をICOSにて保管して、SQL Queryを利用してデータの整形加工を行い、エンドユーザーが直接分析を行えるDataWareHouse(Parquet)を準備します。
2.性能に関するHints & Tips
Parquetフォーマットの推奨
ユーザーが分析する対象データファイルはJSON/CSVではなくParquetフォーマットを推奨します。ParquetフォーマットはColumnary Based(列志向)FormatでBigData分析に最適でSparkとの相性もいいです。以下のようなメリットがあります。
-保管効率良くケースによっては5%程度にまで圧縮可能
-検索時に必要ColumnのみアクセスになりI/O回数減少することで性能向上見込める
上記によりストレージの保管/アクセスコストを同時に減らせます。SQLQueryを利用すれば(内部的に専用のStocator connectorツールを利用して)JSONからParquetへ短時間で簡単に変換できます。
また一つのObjectファイルサイズは128MBがベストと言われています。JSON/CSVの巨大ZIPファイルはその解凍処理に時間がかかり特に性能劣化するといわれており避けてください。
Partition の活用
RDBと同様にPartitionを活用することで検索効率が良くなりパフォーマンスが向上します。Partitionされた各Object名にPrefixとしてSQL文で(partitioned by)指定したColumn名が入ることになり管理が容易になり、複数Columnを指定してObjectファイル自体を階層化することも可能です(Hive Style partitioning)Partitionで指定する列は、そのcardinalityが1000以上になるとObject Fileが細分化され検索時の性能劣化するので避けてください。また検索時だけでなく多数のファイルを整形加工(列の型やファイルのタイプを変更)処理して1ファイルに統合する時なども有効です。その際の処理速度は結果Object FileのPartiton数に依存します。140Inputfileのデータ加工処理に対しては8Partitonが最適であるという検証結果が得られています。(テスト検証の項参照)
JOBのparallel実行について
最終的に定刻までにユーザー開放するDWH構築するので単位時間あたりのスループット上げる必要あり、SQLqueryのJOBを並列に実行させることが有効です。並列処理するJOB数を増やすことでスループットをあげることが可能ですが、マルチテナントのサービスということもあり検証により最適な並列JOB数を見極めることが必要です。
SQLServerインスタンスのロケーションについて
SQLインスタンスは(2022/1)現在Tokyo Regionでは提供されておらず、距離的近さからDallas/Frankfurt/Chennaiの選択肢になります。データセンターにより処理速度も違ってきますので、事前にテストして確認されるのがいいでしょう。
IBM Cloud Analytics Engineの利用
大容量データで上記チューニングポイント調整でも性能要件を満たさない場合は、IBMCloud Analytics Engineの活用をお勧めします。SQLQueryがマルチテナントなのに対してIBMCloud Analytics Engineは占有サーバーを利用して、ユーザーからSparkQueryを直接利用することが可能になります。SQL Queryではできないシステムリソース(メモリ)の割り当て拡大、SparkのExecutor数の調整等Kernelレベルでのチューニングが可能です。SparkQueryには専用のCatalyst Optimizerがついており、コストベース以外にユーザーのカストマイズによりプランを書き換え最適なPlanを選択するチューニングも可能になっています。
Analytics Engine
Analytics Engine kernel
その他
配列データを含むJSONを処理する(階層データをFlatにするexplode等)Queryはコストが非常に高くなるのでSQLquery以外のところで処理させた方がいいようです。(テスト検証を参照)
2.テスト検証について
DataLakeでのRawデータからConformedデータへの整形加工処理(条件指定による検索処理なし)について性能検証テストを実施しました。
検証用SQLQuery Job
下記SQLの通り120JSON/zip Fileを1Parquet Fileに統合&変換、配列型データをFlat化にするExplode処理、PARTITIONを作成しています。IBM Cloud仮想サーバー(Tokyo)上のPythonアプリからSQLQuery(ibmcloudsql)を利用して(Regional)ICOS(Tokyo)にアクセスしてます。
検証結果及び考察
1.Partition数の調整
"PARTITIONED by (Column名) INTO xxx OBJECTS"のPartition数(結果ObjectFile数)を変更して処理速度を測定しました。120のInput Fileに対しては1から増やしていくと処理速度早くなり、8以上はほぼ変わらないという結果がわかります。後続の検索処理及び管理容易性を考慮すると8が最適と言えます。
次に本番運用を想定して上記JOBを同時に40並列で実行させると平均処理時間は7.1分です(下記ケース1)試しにSQLのby(Column名)だけを削除、続けてPartition句自体を削除したSQL JOBを40並列で実施したところ、それぞれ7.95分、9.4分になり遅くなリました。複数ファイルの変換処理においてPatition化が有効になることがわかります。また事前に120Inputを1Fileに統合しておき、この1統合ファイルに対して8Partitionで処理させると処理速度は16.6分で大幅に悪化しました(下記ケース2)
1ファイルに対するPatition化はOverheadが大きいといえます。
2.並列処理数
1度に同時に処理するJob数を増やして処理速度を測定、検証結果から15分間に処理できる数は60以上にしても伸びない、40~50程度が最適といえます、
3.SQLQueryインスタンスのあるDatacenterの違い
Dallas/Frankfurt/Chennai各々で上記JOBを20並列で実行し処理時間を比較しました。Chennai<<Frankfurt<<Dallasの順に早いという結果がわかります。
同一処理(JSON=>Parquet変換)を15分間隔で1週間実行して、時間帯別の処理性能を、DallasとFrankfurtで比較しました。
Dallasに比べてFrankfurtは日中のリソースに余裕あるという結果が出ています(日本時間:21~24除く)
4.Explode処理/統合処理/JSONからParquet変換処理について
上記検証用SQLQuery Jobは
1.JSONからParquet変換
2.140=>1ファイルの統合処理
3.Explode処理
からなります。上記の1&2と3で2つに分割して40並列で実行(下記ケース2)すると、平均処理時間はそれぞれ1.7分、7.4分です。これを単独JOBの実行結果7.1分(下記ケース1)と比べると、Explode処理がファイル統合処理,JSONParquet変換処理に比べて長時間を要していることがわかります。Explode処理のうち、配列型データの列への展開コストが特に大きくなって処理を遅くさせていると考えられます。Explode処理自体をSQL Query以外で処理させることもOptionとして考えられます。
3.参考リンク