このエントリーはドワンゴアドベントカレンダー17日目のエントリーです。
ストリーム処理エンジンのNorikraについて、最近聞くことが増えてきました。
使ってみたい方は結構いるのではないでしょうか。
とは言え、「ストリーム処理を試してみたい、環境構築してやってみよう」と思っても、JRuby入れてNorikra入れて、fluentd入れてNorikraとのin/outの連携して、集計結果を格納する為にElasticsearch構築して、Kibanaから見れるようにして、認証機構や改廃の機構も入れて...あ、ストリームソースも用意しなきゃ...となって、そこそこ手間が掛かります。
一揃いの環境構築するのは面倒、という方向け(自分向け)に、手軽にコマンド一発で立ち上げられる環境を作りました。
Norikraそのものについては、こちらの記事が素晴らしかったです。
http://hase.hateblo.jp/entry/2014/12/16/142250
figについては、こちらの記事を読むと思います。
http://qiita.com/inokappa/items/eac7e62b4429304fc047
全部入りのfigを立ち上げる
[準備]fig buildでDockerfileをまとめてビルド
$ git clone https://github.com/ixixi/stream-processing-fig
$ cd stream-processing-fig
$ cd fig/log_collector
$ fig build
これで今回利用するDockerfileがビルドされますので、その間、しばら食事に出かけるなどして、気長に待ちましょう。
一度buildがされると、以降はキャッシュされるため、containers/
以下の内容に変更を加えない限り、再ビルドは必要ありません。
fig upでまとめて立ち上げる
$ fig up -d && sleep 30 && docker exec -it logcollector_fluentd_1 pkill -HUP -f fluentd
(figはコンテナ間の起動の待ち合わせしないため、しばらく待ってfluentdを再起動する処理をしています。)
少し待つと、コンテナが5つ立ち上がっているはずです。確認してみましょう。
すべてのStateがUp
になっていることを確認しましょう。
$ fig ps
Name Command State Ports
-----------------------------------------------------------------------------------------------------------------------------------
logcollector_elasticsearch_1 /run.sh Up 9200/tcp, 0.0.0.0:9300->9300/tcp
logcollector_fluentd_1 supervisord -n Up 0.0.0.0:24224->24224/tcp
logcollector_kibana_1 /kibana/bin/kibana Up 5601/tcp
logcollector_nginx_1 nginx -g daemon off; Up 0.0.0.0:26578->26578/tcp,
443/tcp,
0.0.0.0:5601->5601/tcp,
80/tcp, 0.0.0.0:9200->9200/tcp
logcollector_norikra_1 norikra start --stats=/nor ... Up 26571/tcp, 26578/tcp, 8778/tcp
この時点で、
http://${host}:26578
でNorikraのWebUI、
http://${host}:5601
でkibana、
http://${host}:9200/_plugn/head
でElasticsearchにアクセス出来ます。
※BASIC認証を掛けています(初期値はtest/test)
コンテナ間の関係/役割
--
コンテナの関係は以下のようになっています。
fluentdでのデータのフローは以下のようになっています。
このfigで立ち上がるfluentdは、データを受け取るとElasticsearchとNorikraの両方に流します。
Norikraで、ストリーム処理した結果について、さらにfluentdがElasticsearchにデータを格納します。
つまり、生データと、Norikraでの処理済みデータの両方がElasitcsearchに格納されます。
また、認証機構があると便利かと思うので、Nginxを挟んでNorikra(のWebUI)/Elasticsearch/Kibanaの各コンテナにBASIC認証を掛けてアクセスするようにしています。
起動後の画面
Elasticsearch (head plugin)
Kibana4
Norikra
fig.ymlの構成
設定/データ/ログを以下のように分けています。
.
├── README.md
├── containers # 各コンテナのDockerfileを置いている
│ ├── elasticsearch
│ │ ├── Dockerfile
│ │ └── run.sh
│ ├── fluentd
│ │ ├── Dockerfile
│ ├── kibana
│ │ └── Dockerfile
│ ├── nginx
│ │ └── Dockerfile
│ └── norikra
│ └── Dockerfile
└── fig
└── log_collector
├── fig.yml # 構成の定義
├── data # elasticsearchの格納先など、ホスト側での永続化用のマウントポイント
│ ├── elasticsearch
│ │ ├── data
│ │ └── work
│ ├── fluentd
│ └── norikra
├── log # ログはホスト側のここに書き出す
│ ├── elasticsearch
│ ├── elasticsearch-supervisord
│ ├── fluentd
│ ├── fluentd-supervisord
│ ├── nginx
│ └── norikra
└── setting # 各種設定ファイル。起動時にマウントして実行される
├── elasticsearch
│ ├── crontab
│ └── elasticsearch.yml
├── elasticsearch-supervisord
│ ├── crond.conf
│ └── supervisord.conf
├── fluentd
│ └── fluentd.conf
├── kibana
│ └── kibana.yml
├── nginx
│ ├── htpasswd
│ └── nginx.conf
└── norikra
基本的に、設定はマウントしているホスト側で持つようにしているため、fig/log_collector/setting
以下の各種設定ファイルを書き換えてコンテナを立ち上げ直せば、再ビルド無しに設定変更が反映されます。
同梱されているストリームソース
ストリーム処理をやってみようと思っても、手軽に試せるデータソースが無くてちょっと...となることは意外とあったりします。
このfigでは、すぐにストリーム処理が出来るように、以下の3種類のストリームソースは最初から使えるようにしています。
Norikra自身のメトリクス
NorikraはJRubyで動くので、JVMのメトリクスを取得出来ると便利ですよね。
jolokiaというJMX-HTTP bridgeを使うと、HTTPでメトリクスが取得できるようになります。
$curl http://norikra:8778/jolokia/read/java.lang:type=Memory
{
"timestamp": 1418823847,
"status": 200,
"request": {
"mbean": "java.lang:type=Memory",
"type": "read"
},
"value": {
"Verbose": true,
"ObjectPendingFinalizationCount": 0,
"NonHeapMemoryUsage": {
"max": 224395264,
"committed": 90836992,
"init": 24313856,
"used": 69053632
},
"HeapMemoryUsage": {
"max": 458752000,
"committed": 345702400,
"init": 394891136,
"used": 139351160
},
"ObjectName": {
"objectName": "java.lang:type=Memory"
}
}
}
fluent-plugin-jolokiaを使うと、jolokiaを入れたNorikraから各種メトリクスをfluentdが収集してストリームとして受け取れるので、最初からのプラグインを入れています。
Dockerコンテナのメトリクス
fluent-plugin-docker-metrics を使えば、Dockerのメトリクスも収集出来ます。
このpluginでは、コンテナの中からは、そのままだと上手く扱えません。
fluent-plugin-docker-metricsでは、各種メトリクスが格納されている/sys/fs/cgroup
以下の内容を参照して収集しているのですが、これは、コンテナ内の/sys/fs/cgroup
ではなくホスト側の/sys/fs/cgroup
を見る必要があるため、/sys/fs/cgroup
と/var/run/docker.sock
を、(read-onlyで)マウントすることで、当該ホスト上で起動している全コンテナのメトリクスを収集しています。
twitterのストリーム(要Token)
ストリーム処理の定番、fluent-plugin-twitterのプラグインもfluentdに入れてあります。
これはtwitterのストリームデータを取得するinput pluginです。
このpluginはtwitter APIを利用するため、APIキーとトークンを用意して、
fig/log_collector/setting/fluentd/fluentd.conf
の、下記部分のコメントアウトを外して、自分のAPIキー等に書き換えれば、あとはtwitterストリームが扱えるようになります。
<source>
consumer_key ${YOUR_API_key}
consumer_secret ${YOUR_API_Secret}
oauth_token ${YOUR_access_token}
oauth_token_secret ${YOUR_access_token_secret}
tag nested.twitter
timeline sampling
output_format nest
</source>
ダッシュボード作成
これで既にデータは収集され続ける環境が動いていますので、ダッシュボードも作ることが出来ます。
Kibana4のVisualizer
データは既にElasticsearchに入っていると思うので、Vizualize(グラフ定義)を好きに幾つか作りましょう。
Dashboardタブで、保存したVizualizeを並べると、Dashboardが作れます。
右下では、NorikraのHeapMemoryUsage/NonHeapMemoryUsageのStackedAreaChartを出したりしています。
Norikra
NorikraのUDFを使ったクエリ
Norikraは、UDF/UDAFを作れます。
現在、Rubygemsには、Norikra-udf-percentileとnorikra-udf-uri_parserが登録されていますので、これは既にこの環境に入っています。
URI parserについてはQiitaの紹介記事もありますね。
norikra-udf-uri_parserは、URIのリクエストパラメタ等のパースをやってくれる便利UDFなので、リアルタイムアクセスログ解析をやる場合には非常に便利だと思います。
Norikraからfluentdでのクエリ結果取り出し
Norikraのクエリ結果をfluent-plugin-norikraで取り出してデータソースとするには、クエリを一つ一つfluentdのconfで指定するtarget
と、GROUPをまとめて指定するsweep
がありますが、ここでは、お手軽に利用できるよう、sweep決め打ちとしています。
<source>
type norikra
norikra "#{ENV['NORIKRA_PORT_26571_TCP_ADDR']+':'+ENV['NORIKRA_PORT_26571_TCP_PORT']}"
<fetch>
method sweep
target out
interval 1s
tag field tag_name
tag_prefix query
</fetch>
</source>
Norikraのクエリ登録時のGROUPとしてout
を指定すると、fluentdで取り出され、elasticsearchに格納されます。
また、tag_name
というカラムが、取り出された後のtagとなります。
Elasticsearchに格納時、${tag_parts[0]}-YYYY.MM.DD
というインデックスとして、_type
が${tag_part[1]}
となるようにしています。
つまり、例えば次のようなクエリ
SELECT
"aggregate.tweet_length" as tag_name,
user_lang,
COUNT(1) as count,
percentiles( message.length(), {50,90, 98} ) AS percentiles
FROM
twitter_sampling.win:time(10 min)
GROUP BY
user_lang
OUTPUT SNAPSHOT EVERY 5 sec
をNorikraに(Groupをout
として)登録すると、直近10分のtweetに対して、5秒毎に、aggregate-YYYY.MM.DDというインデックス名で、Elasticsearchに格納されます。_type
がtweet_length
となります。
これで、NorikraとKibana4で、大分遊べるようになったのではないでしょうか。
運用にあたってのカスタマイズ
Elasticsearchのデータの改廃がしたい
Elasticsearchのコンテナ内では、cronを動かしています。fig/log_collector/setting/elasticsearch/crontab
をコンテナ起動時に読み込みます。
logstash形式で保存しているので、例えば、index名がfoo
で保持期間9日としたい、という場合は、次のような行を書き加えておくと、9日以前のログは自動で削除してくれます。
* * * * * PREFIX="foo-"; DAY=9; INDEX=`date +$PREFIX\%Y.\%m.\%d --date "${DAY} days ago"`; curl -XDELETE -w'\n' "http://localhost:9200/${INDEX}"
BASIC認証のユーザを追加/変更したい
fig/log_collector/setting/nginx/htpasswd
を変更して、コンテナを起動し直しましょう。
fluentd/Elasticsearch/Norikraにpluginを追加したい
それぞれのDockerfileでpluginをinstallしているので、
containers/fluentd/Dockerfile
containers/elasticsearch/Dockerfile
containers/norikra/Dockerfile
に、必要なpluginのインストールの行を書き加えて、fig build
し直しましょう。
最後に
ストリーム処理基盤として、Norikraは(StormやSpark Streamingに比べ、)非常にお手軽に使えて便利です。
UDF/UDAFを作ればだいぶ凝った処理も書けますし、かなりのユースケースでNorikraを利用できるシーンはあるのではないかと思います。
将来的に、Norikraを本格的に使うのであれば、一つのNorikraに色んな種類のクエリを相乗りさせるより、たとえ一つのホスト上においても、Docker化したNorikraのコンテナを複数立てた方が、Norikraの再起動の影響範囲を極小化出来るので良さそうかなぁ、などと思ってたりします。
明日は
ドワンゴアドベントカレンダー19日目 @yukkuri_sinai さんです。どうぞお楽しみに!