はじめに
この記事はMicroAd Advent Calendar 2017の13日目の記事です。
Hive(hadoop)は大量データを効率的に捌くため、データの非正規化上等な世界です。
必然的に扱うデータのカラム数は肥大化する傾向があり、如何にして必要の無いカラムの名前を意識しないでデータを操作するか、というクエリが必要になってきます。
この記事ではETLの初期の段階で必要となってくる
- パーティショニング
- 重複削除
において、具体的なカラムを意識しないで操作する方法について紹介します。
結論だけ先に書いておくと、正規表現マッチングによるカラムの指定とrow_nuber関数使いましょうという話です。
Hiveによるデータ変換の必要性と問題
生のログをHadoopに効率的に使うためにはETLの初期の段階で以下のような変換が必要になります。
- HDFS向けのデータへの変換
- 列指向フォーマット(ORC|Parquent)への変換
- 圧縮
- 細分化されたファイルの結合
- パーティショニング
- 重複削除
これらの変換はデータ全体に対して行うため、基本的にはSELECT *
でガツッと書けそうですがパーティションニング
と重複削除
においては以下のような問題があり、具体的なカラム名について記述の必要が出てきます。
パーティショニング
HDFSに突っ込むデータは大概はdate
とかhour
とか時系列にパーティショニングする事が多いかと思います。
fluentdやflumeならtimestampに応じてHDFSの格納先を変更できるので、データを突っ込む際に時系列でのパーティショニングが可能です。
が、場合によってはあるキーをベースに追加でパーティションの階層を増やしたくるケースがあります。
ここでdynamic partition insert
を使ってパーティションを増やしたテーブルにデータを突っ込み直そうとすると、ある問題に直面します。
それはdynamic partition insert
でパーティションとして指定したいカラムはSELECTの末尾に順番を合わせて書くという制約があることです。
このままではSELECT句で末尾の順番を気にしながら全てのカラムを羅列しなければならなくなります。
重複削除を行う
もう一方の問題は重複削除のクエリをどう書くか?という点です。
データ収集のパイプラインをExactly once
に構築するのはかなり難しく、大概はデータパイプラインはAt least once
を保証して、データの終点(Hadoopなど)でid
などのユニークなカラムを使って重複削除を行います。
重複削除のクエリで最もポピュラーな方法はid
でGROUP BY
してその他カラムはMAX
で集約してSELET
するというものがありますが、やはりカラム数が多いとクエリを書くのが辛くなります。
解決法
Hiveカラムの正規表現マッチングを使用してパーティションキーのみ順番を指定する
HiveではSELECTするカラムを正規表現によりマッチングさせる便利機能があります。
この機能を使って、パーティションに使うカラムのみを除く正規表現を書くことで、パーティションキーの順番のみを意識したクエリがかけます。
具体的には以下のようなクエリです。以下の例ではdate
,hour
,country
をパーティションキーとして使おうとしています。
SELECT `(date|hour|country)?+.+`, country, date, hour FROM json_log;
このクエリでは`(date|hour|country)?+.+`
の正規表現マッチングの箇所でcountry
、date
、hour
以外のカラムがマッチさせ、除外されたパーティションキーとして使うカラムは別途、明示的に順番を指定しています。
ちなみにこのカラム名の正規表現マッチングの機能を使おうとすると、hive.support.quoted.identifiers
をnone
にする必要があります。
idの重複削除にはrow_numbe関数を使う
HiveではWindow関数が使えるので、row_number関数が使えます。
以下のようなクエリでidカラムの重複に対して連番が振れるので、その1番目だけを使い重複を削除できます。
WITH with_order_num AS (
SELECT
*, row_number()
over(
partition by id
) AS order_num
FROM
json_log
)
SELECT
*
FROM
with_order_num
WHERE
order_num = 1
;
まとめ
上記のテクニックをまとめると以下のようなクエリになります。
WITH with_order_num AS (
SELECT
*, row_number()
over(
partition by id
) AS order_num
FROM
json_log
WHERE
WHERE date = '20171213'
AND hour = '15'
)
FROM with_order_num
INSERT OVERWRITE TABLE orc_log partition(country, dt, hour)
SELECT
`(date|hour|country)?+.+`,
country,
date,
hour
WHERE
order_num = 1
;
このように必要なカラム名以外を意識することなく、変換処理が書けました。
また、カラム名に非依存な変換クエリにしておくと、データ構造の変更にも強いですし、扱うログのカラム名の規則がちゃんと揃っていればほぼコピペのクエリで済みます。
(実際にはクエリをジョブで投げる際はtemplate化して都度レンダリングするケースが殆どだと思いますのでtemplateが使いまわせます。)