LoginSignup
11
9

More than 5 years have passed since last update.

Amazon KinesisにFluentdで流し込んだデータをHiveでクエリ

Last updated at Posted at 2015-03-17

このポストはわたし個人のメモであり、所属する組織を代表するものではありません。

サマリ

Fluentd -> Kinesis -> Hive(EMR)
という感じのストリームデータのクエリ環境を作った。というか動かしてみた。

準備

KinesisのStream作成

AWSの万地面とコンソールで。

Kinesisへのデータ流し込み

aws-fluent-plugin-kinesisを使って流しこみ。

EMRを起動

CLIからEMRクラスタを起動。

aws emr create-cluster \
--name Hive \ #クラスタ名
--ami-version 3.4.0 \   #AMIバージョン
--applications Name=Hive Name=Hue \ # HiveとHueをインストール
--instance-groups InstanceGroupType=MASTER,InstanceCount=1,InstanceType=c3.xlarge InstanceGroupType=CORE,InstanceCount=2,InstanceType=c3.xlarge \ #インスタンスタイプと台数
--ec2-attributes SubnetId=YOUR_SUBNET,KeyName=YOUR_KEYPAIR \ #デプロイするEC2のサブネットやキーペアを指定
--log-uri s3://PATH/TO/LOG/ \  #ログを回収するS3バケット
--enable-debugging \ #デバッグフラグ。これをオンにしておくとマネジメントコンソールでログが確認できる

EMRにSSHでログイン

aws emr ssh --key-pair-file YOUR_KEY_FILE --cluster-id YOUR_CLUSTER_ID

hiveでテーブル定義

Fluentdはデータを1行毎にJSONの文字列としてKinesisに流し込むので、まずはひとかたまりの文字列として取り扱う。また、HiveからKinesisへの接続は com.amazon.emr.kinesis.hive.KinesisStorageHandlerというEMRに予めインストールされているコネクタクラスを利用する。このクラスの詳細についてはAWSのドキュメントを参照。

CREATE TABLE json (line string)
STORED BY
'com.amazon.emr.kinesis.hive.KinesisStorageHandler'
TBLPROPERTIES(
  "kinesis.stream.name"="YOUR_STREAM_NAME",
  "kinesis.endpoint.region"="ap-northeast-1",
  "kinesis.records.batchsize"="10000"
);

だいたいこれで準備完了。

クエリ

ここまでで準備はできたのであとはクエリする。
例えばKinesis Stream内にあるレコード群を1分ごとにGroupingしつつ、Levelというラベルで更にGroupingしてレコード数をカウントしてみるならこんなかんじ。

元のJSONの中身は特に触れてないが、あんまり重要なポイントではないのでそのままスルー。

SELECT
  source.time_str,
  source.level,
  count(1)
FROM(
  SELECT 
    parsed.*,
    regexp_replace(parsed.time,"[0-9]{2}Z","00Z") as time_str
  FROM
    json
  LATERAL VIEW
    json_tuple(json.line, 'id', 'level', 'method', 'uri', 'reqtime', 'foobar', 'time', 'tag') parsed
  AS 
    id, level, method, uri, reqtime, foobar, time, tag
) source
GROUP BY
  source.time_str, source.level;

無事にクエリできた!

何が起こったのか?

今回のKinesis Streamには8つのShardがあった。それに対してMapReduceのログを見ると、下記のように8つのMapperが動作していたことがわかる。つまり、ShardごとにMapperが立ち上がり、データ取り出しをしていたということだろう。
mr.jpg

絵にするとこんな感じか。
emr.jpg

たしかにKinesis Stream側でもガシガシGetRecordsをRequestされてる感がある。
kinesis.jpg

それなりに時間はかかる。

Hiveだから結構時間かかる、というのもあるけれども、もうひとつ時間かかるところが。

今回使ったEMRのKinesisコネクタクラスにはチェックポイント機能がないので、毎回Kinesis Stream内のデータをフルスキャンすることになるというのがポイント。Kinesis Streamには常に過去24時間分のデータが保持されているので、言い変えてみると、毎回これをスキャンすることになる。(毎回過去24時間のWindow関数的な感じ?w)

高速化するには?

しかしこれって、Spark Streaming + SparkSQLでやるにしても同じ問題がつきまとう気がする。Sparkに対して逐次的にKinesisからデータを取り込んでおいて、クエリは取り込み済みデータに対して行う、みたいな感じにSQLのクエリエンジンとデータ取り込み部分を分離してやる必要がありそう。

11
9
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
11
9