このポストはわたし個人のメモであり、所属する組織を代表するものではありません。
サマリ
Fluentd -> Kinesis -> Hive(EMR)
という感じのストリームデータのクエリ環境を作った。というか動かしてみた。
準備
KinesisのStream作成
AWSの万地面とコンソールで。
Kinesisへのデータ流し込み
aws-fluent-plugin-kinesisを使って流しこみ。
EMRを起動
CLIからEMRクラスタを起動。
aws emr create-cluster \
--name Hive \ #クラスタ名
--ami-version 3.4.0 \ #AMIバージョン
--applications Name=Hive Name=Hue \ # HiveとHueをインストール
--instance-groups InstanceGroupType=MASTER,InstanceCount=1,InstanceType=c3.xlarge InstanceGroupType=CORE,InstanceCount=2,InstanceType=c3.xlarge \ #インスタンスタイプと台数
--ec2-attributes SubnetId=YOUR_SUBNET,KeyName=YOUR_KEYPAIR \ #デプロイするEC2のサブネットやキーペアを指定
--log-uri s3://PATH/TO/LOG/ \ #ログを回収するS3バケット
--enable-debugging \ #デバッグフラグ。これをオンにしておくとマネジメントコンソールでログが確認できる
EMRにSSHでログイン
aws emr ssh --key-pair-file YOUR_KEY_FILE --cluster-id YOUR_CLUSTER_ID
hiveでテーブル定義
Fluentdはデータを1行毎にJSONの文字列としてKinesisに流し込むので、まずはひとかたまりの文字列として取り扱う。また、HiveからKinesisへの接続は com.amazon.emr.kinesis.hive.KinesisStorageHandlerというEMRに予めインストールされているコネクタクラスを利用する。このクラスの詳細についてはAWSのドキュメントを参照。
CREATE TABLE json (line string)
STORED BY
'com.amazon.emr.kinesis.hive.KinesisStorageHandler'
TBLPROPERTIES(
"kinesis.stream.name"="YOUR_STREAM_NAME",
"kinesis.endpoint.region"="ap-northeast-1",
"kinesis.records.batchsize"="10000"
);
だいたいこれで準備完了。
クエリ
ここまでで準備はできたのであとはクエリする。
例えばKinesis Stream内にあるレコード群を1分ごとにGroupingしつつ、Levelというラベルで更にGroupingしてレコード数をカウントしてみるならこんなかんじ。
元のJSONの中身は特に触れてないが、あんまり重要なポイントではないのでそのままスルー。
SELECT
source.time_str,
source.level,
count(1)
FROM(
SELECT
parsed.*,
regexp_replace(parsed.time,"[0-9]{2}Z","00Z") as time_str
FROM
json
LATERAL VIEW
json_tuple(json.line, 'id', 'level', 'method', 'uri', 'reqtime', 'foobar', 'time', 'tag') parsed
AS
id, level, method, uri, reqtime, foobar, time, tag
) source
GROUP BY
source.time_str, source.level;
無事にクエリできた!
何が起こったのか?
今回のKinesis Streamには8つのShardがあった。それに対してMapReduceのログを見ると、下記のように8つのMapperが動作していたことがわかる。つまり、ShardごとにMapperが立ち上がり、データ取り出しをしていたということだろう。
たしかにKinesis Stream側でもガシガシGetRecordsをRequestされてる感がある。
それなりに時間はかかる。
Hiveだから結構時間かかる、というのもあるけれども、もうひとつ時間かかるところが。
今回使ったEMRのKinesisコネクタクラスにはチェックポイント機能がないので、毎回Kinesis Stream内のデータをフルスキャンすることになるというのがポイント。Kinesis Streamには常に過去24時間分のデータが保持されているので、言い変えてみると、毎回これをスキャンすることになる。(毎回過去24時間のWindow関数的な感じ?w)
高速化するには?
しかしこれって、Spark Streaming + SparkSQLでやるにしても同じ問題がつきまとう気がする。Sparkに対して逐次的にKinesisからデータを取り込んでおいて、クエリは取り込み済みデータに対して行う、みたいな感じにSQLのクエリエンジンとデータ取り込み部分を分離してやる必要がありそう。