Edited at

Amazon KinesisにFluentdで流し込んだデータをHiveでクエリ

More than 3 years have passed since last update.

このポストはわたし個人のメモであり、所属する組織を代表するものではありません。


サマリ

Fluentd -> Kinesis -> Hive(EMR)

という感じのストリームデータのクエリ環境を作った。というか動かしてみた。


準備


KinesisのStream作成

AWSの万地面とコンソールで。


Kinesisへのデータ流し込み

aws-fluent-plugin-kinesisを使って流しこみ。


EMRを起動

CLIからEMRクラスタを起動。

aws emr create-cluster \

--name Hive \ #クラスタ名
--ami-version 3.4.0 \ #AMIバージョン
--applications Name=Hive Name=Hue \ # HiveとHueをインストール
--instance-groups InstanceGroupType=MASTER,InstanceCount=1,InstanceType=c3.xlarge InstanceGroupType=CORE,InstanceCount=2,InstanceType=c3.xlarge \ #インスタンスタイプと台数
--ec2-attributes SubnetId=YOUR_SUBNET,KeyName=YOUR_KEYPAIR \ #デプロイするEC2のサブネットやキーペアを指定
--log-uri s3://PATH/TO/LOG/ \ #ログを回収するS3バケット
--enable-debugging \ #デバッグフラグ。これをオンにしておくとマネジメントコンソールでログが確認できる


EMRにSSHでログイン

aws emr ssh --key-pair-file YOUR_KEY_FILE --cluster-id YOUR_CLUSTER_ID


hiveでテーブル定義

Fluentdはデータを1行毎にJSONの文字列としてKinesisに流し込むので、まずはひとかたまりの文字列として取り扱う。また、HiveからKinesisへの接続は com.amazon.emr.kinesis.hive.KinesisStorageHandlerというEMRに予めインストールされているコネクタクラスを利用する。このクラスの詳細についてはAWSのドキュメントを参照。

CREATE TABLE json (line string)

STORED BY
'com.amazon.emr.kinesis.hive.KinesisStorageHandler'
TBLPROPERTIES(
"kinesis.stream.name"="YOUR_STREAM_NAME",
"kinesis.endpoint.region"="ap-northeast-1",
"kinesis.records.batchsize"="10000"
);

だいたいこれで準備完了。


クエリ

ここまでで準備はできたのであとはクエリする。

例えばKinesis Stream内にあるレコード群を1分ごとにGroupingしつつ、Levelというラベルで更にGroupingしてレコード数をカウントしてみるならこんなかんじ。

元のJSONの中身は特に触れてないが、あんまり重要なポイントではないのでそのままスルー。

SELECT

source.time_str,
source.level,
count(1)
FROM(
SELECT
parsed.*,
regexp_replace(parsed.time,"[0-9]{2}Z","00Z") as time_str
FROM
json
LATERAL VIEW
json_tuple(json.line, 'id', 'level', 'method', 'uri', 'reqtime', 'foobar', 'time', 'tag') parsed
AS
id, level, method, uri, reqtime, foobar, time, tag
) source
GROUP BY
source.time_str, source.level;

無事にクエリできた!


何が起こったのか?

今回のKinesis Streamには8つのShardがあった。それに対してMapReduceのログを見ると、下記のように8つのMapperが動作していたことがわかる。つまり、ShardごとにMapperが立ち上がり、データ取り出しをしていたということだろう。

mr.jpg

絵にするとこんな感じか。

emr.jpg

たしかにKinesis Stream側でもガシガシGetRecordsをRequestされてる感がある。

kinesis.jpg


それなりに時間はかかる。

Hiveだから結構時間かかる、というのもあるけれども、もうひとつ時間かかるところが。

今回使ったEMRのKinesisコネクタクラスにはチェックポイント機能がないので、毎回Kinesis Stream内のデータをフルスキャンすることになるというのがポイント。Kinesis Streamには常に過去24時間分のデータが保持されているので、言い変えてみると、毎回これをスキャンすることになる。(毎回過去24時間のWindow関数的な感じ?w)


高速化するには?

しかしこれって、Spark Streaming + SparkSQLでやるにしても同じ問題がつきまとう気がする。Sparkに対して逐次的にKinesisからデータを取り込んでおいて、クエリは取り込み済みデータに対して行う、みたいな感じにSQLのクエリエンジンとデータ取り込み部分を分離してやる必要がありそう。