お疲れ様です。ビッグデータという言葉が流行りだしてから幾星霜、皆さんの中でもそろそろ社内にビッグデータ処理基盤を作りたいという方がいるのではないでしょうか?
というわけでSQL on Hadoopでよく知られるHiveを使う上で最低限必要だなと思う知識をまとめてみました。
PrestoやAWS Athena、Amazon Redshift Spectrumを使う場合でも裏側にはHiveのテーブルが必要な場合が多いと思いますので、知っておいて損は無いかと思います。
テーブル・データ設計
パーティションを設定する
Hiveではテーブルにパーティションを設定する事でテーブルの検索・更新範囲を限定する事ができます。逆に言えばパーティションを設定しない場合、常にテーブル全体を検索・更新してしまうという事です。なのでパーティション設定は基本的に必須です。
パーティションはデータをインポートする頻度に応じて決定するのが一般的かと思います。日次でインポートするテーブルならパーティションはdateの1階層、毎時でインポートするテーブルならパーティションはdate/hourの2階層、といった感じです。
パーティションはテーブル作成時に以下のようにして設定します。
CREATE EXTERNAL TABLE IF NOT EXISTS sample_table(
id string,
name string)
PARTITIONED BY (date string, hour string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
LOCATION 's3://somebucket/sample/';
新しいパーティションにデータをインポートする際は以下のように行います。この時に既存のパーティションを指定すると、データが上書きされます。基本的にHiveではUPDATEやDELETEは使わず、パーティションを指定してのデータ上書きで代替します。
INSERT OVERWRITE TABLE sample_table
PARTITION(date ='20170401', pt_hh='00')
select
id,
name
from data_source;
検索の際にはWHERE句にパーティションを指定する事で検索の際にスキャンする範囲を絞る事ができ、効率的になります。WHERE句でパーティションを指定しないクエリはテーブルのフルスキャンと同義なので気をつけましょう。
--date=20170401 ~ 20170405のデータのみをスキャン
SELECT
id
from sample_table
where
date >= '20170401'
and date <= '20170405'
and name = "Bob";
--テーブルをフルスキャン
SELECT
id
from sample_table
where
name = "Bob";
また、ダイナミックパーティションを使うとデータをインポートするパーティションの指定を動的に行う事ができます。
詳しくはこちらをご参照ください。
Hive dynamic partition insertsにまつわるいくつかの問題と対処について
適切なファイルフォーマットと圧縮形式を選ぶ
Hiveで扱うデータの実体はファイルであり、そのフォーマットや圧縮形式にも様々なものがあります。
概要に関しては以下に簡単にまとめてあります。
独断と偏見で選ぶHDFSのファイル形式
単純にクエリを最適化したいなら個人的にはフォーマットにORC、圧縮形式にZlibの組み合わせを使うのが無難ではないかと思います。 ただし、EMR5.0系の上でHiveを動かす場合はORCだと遅くなるとの事ですので代わりにフォーマットにParquet、圧縮形式にGZipの組み合わせを使うと良いです。
ORC file performance: Query performance may be slower than expected for ORC files.
Differences for Hive on Amazon EMR Versions and Default Apache Hive
ファイルフォーマットと圧縮形式の組み合わせは、テーブル作成時に以下のように指定します。
CREATE EXTERNAL TABLE IF NOT EXISTS sample_table(
id string,
name string)
PARTITIONED BY (date string, hour string)
STORED AS PARQUET
TBLPROPERTIES ("parquet.compress"="GZIP");
ファイルサイズに気をつける
Hadoop系のプロダクトは小さいファイルを大量に扱うのは苦手ですので、Hiveで読み込むデータファイルのサイズはある程度大きくする必要があります。64~512MB程度がオススメです。
生ログファイルは小さいファイルの事が多いと思いますので、何らかの手段でファイルをマージする必要があります。S3のファイルをマージする場合はS3DistCpを使うと便利です。
参考 : S3上の大量データをEMRするときはS3DistCpを使うと捗る
Hiveを使ってマージしたい場合はReducerの数が出力ファイルの数になりますので、set mapred.reduce.tasks=5;
のようにReducer数を指定してしまうとか、
hive.merge.mapfiles=true
hive.merge.mapredfiles=true
hive.merge.size.per.task=256000000
hive.merge.smallfiles.avgsize=32000000
のように出力ファイルをマージするといった方法があります。(どちらの方法でもパフォーマンスに多少影響がありますので注意してください)
Sparkを使ってマージする場合はパーティション数が出力ファイルの数になりますので、以下のように出力前にパーティション数を変更すればOKです。
output_df.repartition(5)
こちらは実ワークロードの際は ログ件数に応じてパーティション数を動的に計算するような関数を1つ噛ませておくとログ件数変化に強くなります。
val partition_count = calculatePartition(output_df.count())
output_df.repartition(partition_count)
WHERE句で良く使うカラムはインポート時にソートしておく
ORCのような列形式フォーマットは、Predicate PushDownのようなクエリを効率化する機能を備えていますが、データインポート時にWHERE句で良く使うカラムをソートしておくとより効率的になります。
もう少し詳細に説明しますと、ORCファイルはいくつかのデータの固まりで構成されており、各固まりはIndex dataにその固まりにおける各カラムの最小値と最大値を保持しています。
例えば下記のようなクエリを発行したとします。
SELECT id, name from users_action where age > 10
この時Hiveはデータの実体を読み込む前にIndex dataを先に確認し、ageカラムの最大値が10以下のものはスキップします。このようにしてデータを読み込む総量を減らす事ができます。
この機能はデータ投入時にageカラムがソートされている時に最大限の効果を発揮します。ソートされて投入された場合、固まり1は0 <= age <= 10
のデータ、固まり2は10 <= age <= 20
のデータ、のように含まれるageの範囲が狭まりスキップできる個数が増えるためです。
以上の理由で、WHERE句で良く使いそうなカラムはソートしてインポートしておくと良いです。
クエリチューニング
クエリには必ずWHERE句でパーティションを指定する
既に述べた通りですが、WHERE句でパーティションを指定しないクエリはテーブルのフルスキャンを意味します。基本的には避けてください。
なお、hive.mapred.mode=strict
を指定するとパーティションを指定しないクエリ、LIMITを指定しないORDER BYクエリが投げられなくなります。
ORDER BYは避ける
HiveでのORDER BY
は1つのノードにデータをすべて集めてソートするので極めて重くなります。集計結果をソートするぐらいならOKですが、データの全件ソートなどはORDER BY
でやってはいけません。
データの全件ソートを行いたい場合は代わりにSORT BY
とDISTRIBUTE BY
を組み合わせて使います。SORT BY
は各reducerごとにデータをソートするというもので、DISTRIBUTE BY
は指定されたカラムが同じ値のデータは同じreducerに割り当てられる事を保証するというものです。
例えばユーザーIDごとに時系列順にソートしたいような場合は以下のようなクエリを実行すればOKです。
--reducerごとにソートされるので分散される
SELECT
id,
create_at
from users_action
sort by create_at asc
DISTRIBUTE BY id
--1つのreducerでソートするので死ぬ
SELECT
id,
create_at
from users_action
order by id, create_at asc
JOINの際は左側に大きいテーブルを持ってくる
HiveではJOIN対象のテーブル容量がある程度小さい場合は、小さいテーブルをメモリにキャッシュすることでreduceのステップをスキップできます。これをMapサイドジョインと呼び、これを使えばかなりクエリが速くなります。
Mapサイドジョインを活用するためにJOINの際は左側のテーブルが一番大きくなるようにしましょう。
処理が分散されているかどうか確認する
クエリが想定より遅い場合、処理が1つのノードに集中してしまっているケースが考えられます。Gangliaなどのシステム監視ツールを使って処理が正しく分散されているか確認しましょう。下の画像は1つのノードに処理が集中してしまっている場合の例です。
処理が分散されていない場合、以下のようにクエリの前にEXPLAINをつけて実行することでクエリがどのようにジョブに変換されているか知ることができます。(EXPLAIN EXTENDEDを使えばもっと詳しく知れます)
EXPLAIN SELECT SUM(number) FROM onecol.
EXPLAINの結果をもとにクエリを書き換えてみましょう。中間テーブルを使ってクエリを分割してみるのが結構効果的だったりします。
各種設定
Tezを使う
実行エンジンは基本的にMapReduceよりTezの方が速いですので、可能な限りTezを使うようにしましょう。以下の設定で使えます。
set hive.execution.engine=tez;
Mapサイドジョインを利用する
既に述べた通り、結合対象のテーブルサイズが小さい場合はMapサイドジョインを利用すれば高速になります。対象のテーブルサイズ閾値をいくつにするかはhive.mapjoin.smalltable.filesize
で決められます。
Mapサイドジョインを使うとOutOfMemoryErrorが発生する場合がありますが、その場合は後述するメモリ設定で調整してください。
set hive.auto.convert.join=true;
CBOを利用する
CBO(Cost Based Optimizer)を利用するとテーブルのデータを元にクエリの実行計画を最適化してくれます。特にデメリットは無いのでONにしておきましょう。
set hive.cbo.enable=true;
set hive.compute.query.using.stats=true;
set hive.stats.fetch.column.stats=true;
set hive.stats.fetch.partition.stats=true;
Vectorizationを利用する
Vectorizationを使うと一度に複数行のデータを処理することができるので速度が上がります。これも基本的にはONにしておいて良いかと思います。
set hive.vectorized.execution.enabled = true;
set hive.vectorized.execution.reduce.enabled = true;
ただ、自分はEMR5.4.0 + Parquetの環境だとエラーが出てしまったので以下の設定で部分的にOFFにしています。
set hive.vectorized.use.vector.serde.deserialize = false;
set hive.vectorized.use.row.serde.deserialize = false;
適切なメモリ設定を行う
最適なメモリ設定は実行するクエリ・データ量によって異なりますので、時には特定のクエリを実行するときだけメモリ設定を変更するのも必要になります。
特にこだわりがなければメモリ設定は以下のルールを守って行うと良いでしょう。
- Tezへの割り当てメモリはYARNへの最大割り当てメモリと同じにする
- Tezコンテナへの割り当てメモリはTezへの割り当てメモリの80%にする
- Tezコンテナ内のJavaへの割り当てメモリはTezコンテナへの割り当てメモリの80%にする
以上のルールに従うと例えば以下のような設定になります。
--YARNへの最大割り当てメモリ
set yarn.scheduler.maximum-allocation-mb=2560;
--Tezへの割り当てメモリ(YARNと等しい)
set tez.am.resource.memory.mb=2560;
set tez.task.resource.memory.mb=2560;
--Tezコンテナへの割り当てメモリ(Tezの80%)
set hive.tez.container.size=2048;
--Javaへの割り当てメモリ(Tezコンテナの80%)
set hive.tez.java.opts=-Xmx1640m
メモリ設定について詳しく知りたい方はHortonworksさんが良い記事を書いてくれているのでこちらを読むと良いかと思います。
Demystify Apache Tez Memory Tuning - Step by Step
また、YARNの各コンテナは1つのCPUしか使えないため、メモリ設定を大きくし過ぎるとコンテナがCPUを使い切れなくなってしまいます。
例えばAWSのm3.xlargeインスタンスはvCPU4でメモリ15GBですので、コンテナが4つ以上立ち上がるようにメモリ設定を行う必要があります。
※以下の記事にはYARNで使うスケジューラーをDominantResourceCalculator
に切り替えればCPUリソースも効率的に使えると書いてあるのですが、試せていません。もしご存知の方いれば教えてください。
Managing CPU Resources in your Hadoop YARN Clusters
参考記事
本文内で言及した記事の他に以下の記事を参考にさせていただきました。いつもありがとうございます。
Top 10 Performance Tuning Tips for Amazon Athena
Tuning Parquet file performance
Hiveのベストプラクティス(かもしれないこと)をめもっておく
Hiveのクエリを何倍も速くする4つの方法
ほぼやけくそHive Hacks