Hortonworksアドベントカレンダー 12/21 の記事です。
12/3 NiFi 1.1.0でWebSocketに対応に続き、今回もWebSocketネタで。
Zeppelin、NiFi、Kafka、D3と色々学べるお得なチュートリアル形式となっております。
先日、ある知人から、「NiFiのWebSocketを使って、Zeppelinにリアルタイムなグラフが表示できたらCoolじゃない?」という意見をいただいたのがきっかけでした。
まず、完成図のイメージから。
Zeppelin(左上)は複数のグラフをノートブックという形でまとめて、可視化したデータをメンバ間で共有したり、分析するのに便利なツールです。
今回はZeppelinに図のような円グラフを表示します。円グラフに表示するデータはNiFiからWebSocket経由でPushします。
せっかくサーバサイドからPushできるので、さらにNiFiからKafkaのメッセージを受信するようにしてみましょう。SparkやStormなどのストリーム分析結果をKafkaにパブリッシュしてZeppelinにリアルタイムで表示するような利用シーンを想定しています。
お試しいただくには、Zeppelin、NiFi、KafkaとWebブラウザが必要です。それぞれのインストールと起動はコマンドを数発叩くだけなので非常に簡単です。
環境構築
Zeppelinのインストールと起動
ZeppelinのダウンロードページからBinary Packageをダウンロードして適当なところに展開しましょう。以下のコマンドを実行すると、8080ポートでZeppelinのUIにアクセスできます:
$ cd zeppelin-0.6.2-bin-all
$ ./bin/zeppelin-daemon.sh start
Kafkaのインストールと起動
KafkaのQuickStartに記載の通り、ダウンロードして適当な場所に展開し、次のコマンドで、まずZookeeperを起動します:
$ bin/zookeeper-server-start.sh config/zookeeper.properties
つづいて別のターミナルを開き、Kafkaサーバを起動しましょう:
$ bin/kafka-server-start.sh config/server.properties
また別のターミナルを開いてトピックを作りましょう:
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
# ちゃんと作成できたか確認
$ bin/kafka-topics.sh --list --zookeeper localhost:2181
test
QuickStartに記載のコマンドでターミナルからメッセージの送受信ができるか確認しときましょう。
NiFiのインストールと起動
NiFiのダウンロードページからバイナリをダウンロードして、適当な場所に展開します。
展開したディレクトリの中に設定ファイルがあるので、少し変更しましょう。ZeppelinもNiFiもデフォルトのHTTPリスンポートが8080なので、NiFiのポートを変更します:
$ vi conf/nifi.properties
# web properties #
nifi.web.war.directory=./lib
nifi.web.http.host=
nifi.web.http.port=8081 // 8080以外に変える
その後、NiFiを起動します:
$ ./bin/nifi.sh start
# 起動には少々時間がかかるので、ログを見ときましょう
$ tail -f logs/nifi-app.log
(次のログが表示されれば、起動が完了した合図です)
2016-12-20 14:58:48,166 INFO [main] org.apache.nifi.web.server.JettyServer NiFi has started. The UI is available at the following URLs:
2016-12-20 14:58:48,166 INFO [main] org.apache.nifi.web.server.JettyServer http://127.0.0.1:8081/nifi
ブラウザからlocalhost:8081でNiFiのUIにアクセスします。さて、NiFiのフローを構築していきましょう!
NiFiのフローを作成する
まずはデータが必要、Kafka -> NiFiの流れ
これから、わりと複雑なデータフローを組んでいきます。最初から全部設計できてなくてもねんどをこねるように、動かしながらだんだんとフローを構築できるのがNiFiの良いところです。Kafkaのトピックが用意できているので、そこからデータを取得する部分から始めてみましょう。
画面上部に並んでいるアイコンの一番左にある、Processorのアイコンを白いキャンバスにドラッグ&ドロップします。
検索窓にkafkaと入力すると、いろいろ出てきます。Kafkaのバージョンごとに対応するProcessorがあります:
Kafka | プロセッサ |
---|---|
0.8.x | GetKafka, PutKafka |
0.9.x | ConsumeKafka, PublishKafka |
0.10.x | ConsumeKafka_0_10, PublishKafka_0_10 |
勢いよくバージョンアップしていくKafkaに追いつくためにNiFiをうまく使うのも良いですね。NiFiでは複数のKafkaバージョンに対応するデータフローを作ることができます。
さて、今回利用するのはKafka 0.10.xなのでConsumeKafka_0_10をダブルクリックします。
おめでとうございます!これでNiFiにプロセッサが追加できましたね:
ワーニングのアイコンにマウスをのせると、Topic Name、Group ID、Relationship Successが未設定ですよと教えてくれます。プロセッサを右クリックしてConfigure
をクリックします:
PROPERTIES
タブにある、Topic Name(s)
のValue列をクリックして、先程作成したトピックの名前test
を、Group ID
にはConsumer Groupの名前を入力します。何でも良いです。ここではnifi
としました:
設定できたらAPPLY
をクリックしてダイアログを閉じます。
残る警告はRelationship success
が無いよ、というものです。Kafkaから受信したメッセージの転送先が未設定ということですね。何か作成しましょう。
こんな時に便利なのが、UpdateAttributeプロセッサです。UpdateAttributeプロセッサを追加しましょう。
そして、ConsumeKafka_0_10の真ん中あたりにマウスをのせると、丸い矢印が表示されます、それをドラッグして、UpdateAttributeプロセッサにドロップします。
すると、Create Connection
ダイアログが表示されます。Add
をクリックしてコネクションを作成しましょう。
これで、success
Relationshipのコネクションが作成できました。ConsumeKafka_0_10の設定が完了したので、警告アイコンが消え、停止状態となっています:
UpdateAttributeには警告が出ていますが、そのままで構いません。ConsumeKafka_0_10を起動してみましょう!
プロセッサが再生中のアイコンに変わり、時折右上の方に数字の1
が見えます。これはアクティブなスレッド数を表しています。NiFiでは同一のフローの中にある複数のプロセッサが並列で動きます:
Kafkaトピックにメッセージを送る
NiFi側の準備は整いました、Kafkaへとメッセージを送信しましょう。コンソールから次のコマンドを入力します:
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
(何かテキストを入力してEnterすると送信される)
Hello Kafka.
このコマンドは次の入力を待つ状態で起動しっぱなしにしておきましょう。
NiFiのUIに戻ってみると、success
にQueued 1
と表示されています!何か受信したようですね:
コネクションを右クリックして、List queue
を選択します。
すると、success
に滞留しているメッセージが確認できます。NiFiではこのデータをFlowFileと呼びます:
左端のインフォメーションアイコンをクリックすると、FlowFileの情報が色々確認できます。
ミニクイズ
コンソールから送信した文字列はNiFiのUIから確認できます。どこで閲覧できるかわかりますか? UIを色々探索してみましょう :)
NiFiからWebSocketでデータをPushする
Kafka -> NiFiのルートが開拓され、冒頭の絵に描いた餅の右下部分ができました!次は取得したメッセージをWebSocketでクライアントにPushする部分を作りましょう。
話を単純にする、まずは単一クライアントで
いきなり難しいフローを作成するのは大変なものです。まずは簡略化したフローでプロトタイプを行いましょう。
WebSocketは複数のクライアントを扱うことができます。NiFiのWebSocketコンポーネントは各クライアントのセッションをIDで管理しています。複数クライアントを相手にするには多少工夫が必要なので、後回しにして、単一のクライアントと対話できるフローにしましょう。
NiFiをWebSocketサーバにする
以前の投稿でNiFiのWebSocket実装について詳しく書きましたので、こちらも参照してください。
WebSocket周りのフローをまとめるために、Process Groupを作ってみましょう。
Process Groupアイコンをキャンバスに配置します。適当に名前をつけましょう:
作成したProcess Groupをダブルクリックすると、中に入れます。ここにWebSocketサーバの実装を組んでいきます。
ListenWebSocketとJettyWebSocketServer
ListenWebSocket
プロセッサを配置して、ConfigureからPROPERTIESタブを表示します。WebSocket Server ControllerServiceからCreate new service
を選択します:
JettyWebSocketServer
を作成し、Server URL Path
には/realtime-data
と指定します:
APPLY
してプロセッサの設定は完了です。
もう一度ListenWebSocketのPROPERTIESを開き、JettyWebSocketServer
の隣に表示されている矢印をクリック、Editアイコンをクリックして、ControllerServiceの設定に遷移します。
Listen Portに9091と設定し、APPLYで設定を反映させます。そして稲妻アイコンをクリックしてこのControllerServiceを有効化しましょう!
もう一度、ListenWebSocketのConfigureから、今度はSETTINGS
タブを表示します。binary message
とtext message
は今回受信しても利用しないので、自動的に終了するようにします。
ConsumeKafkaと同じように、connected
の先にはUpdateAttributeを仮置きして、ListenWebSocketを起動しましょう。
これで、WebSocketクライアントからの接続を受け付ける準備が整いました!
websocket.orgのエコーテスト
websocket.orgのEcho Testを使って、接続確認を行います。
Location
にはNiFiへとつながるURLを入力します。Connect
して、LogにCONNECTED
と表示されればOKです:
NiFi側ではconnected
にクライアントが接続してきたイベントがFlowFileとして生成されているはずです。List queuesから中身を確認すると、次のAttributesが設定されています:
これらの情報があれば、接続しているWebSocketクライアントへとメッセージを返すことができます。
この情報をNiFiのキャッシュに保持しておきましょう。
WebSocketセッションIDをDistributedCacheに格納する
さぁ、ここからはNiFi標準で使えるDistributedCache
という仕組みを利用してみましょう。これはNiFi上で動くオンメモリのKVSみたいなものです。もっと簡単に言ってしまうと、連想配列です。任意のキーで任意の値を保存でき、フローの色々な場所から共有できます。
まずはDistributedCache
サーバを立てましょう。UIの左側にあるOperate
パレットの設定アイコンをクリックすると、ControllerServiceの一覧画面が表示されます:
ここには、先程作成したJettyWebSocketServerが表示されていますね。プラスアイコンをクリックして、DistributedMapCacheServer
を追加しましょう:
DistributedMapCacheServer
の設定はデフォルトのままで大丈夫です。続いてこのキャッシュサーバに接続するクライアントのDistributedMapCacheClient
を追加します。Server Hostname
にはlocalhost
と設定しましょう:
作成したキャッシュサーバとクライアントのサービスを有効化しておきましょう:
キャッシュサーバの準備ができたのでデータを格納しましょう。WebSocketクライアントが接続した時に生成されたセッションIDを捕獲します。キャッシュサーバにはFlowFileのコンテンツ部分が格納されるので、websocket.session.id
のAttribute値を、コンテンツ部分に抽出する必要があります。
これにはReplaceText
プロセッサを使います。このプロセッサは、テキスト形式のFlowFileのコンテンツ内文字列を置換するのに利用できます。Replacement Strategy
にAlways Replace
を、Replacement Value
に${websocket.session.id}
と設定します。これはNiFiエクスプレッションの式で、FlowFileのAttributeの値を返します。詳細はまたの機会に。。この設定で、該当のAttribute値でFlowFileコンテンツの値を置換します。
PutDistributedMapCache
プロセッサを追加してReplaceText
の出力をつなぎましょう。キャッシュのキーは固定値でwebsocket.session.id
としました。先程作成したDistributedMapCacheClientService
も指定します:
長くなりましたが、ここまで手順にそって進めていくと、次の図のようなフローになっていると思います。ListenWebSocket
のconnected
の先端にある青いポイントをドラッグして、接続先をReplaceText
に移しましょう。こんなことができるのもNiFiならではですね!
ReplaceText
とPutDistributedMapCache
のfailure
relationshipはAuto Terminateしています。出来上がったフローはこの通り:
これで生成されたWebSocketのセッションIDがキャッシュに格納できました!
WebSocketクライアントにKafkaから取得したメッセージをPushする
さあ、WebSocketクライアントにメッセージをPushしてみましょう!使うプロセッサはPutWebSocket
です。設定を見てみましょう、デフォルトでは次のようになっています:
Session Id, ControllerService Id, Endpoint Idはそれぞれ、渡ってきたFlowFileから解決するようになっています。これは複数のクライアントを相手にする場合に効果を発揮しますが、今回は単一クライアントでのプロトタイプなので、固定値を設定してしまいましょう。
ControllerService一覧からJettyWebSocketServer
のインフォメーションアイコンをクリックして、SETTINGSタブにあるIdをコピーします:
コピーしたControllerServiceのIdをPutWebSocket
プロセッサのWebSocket ControllerService Id
に設定します。WebSocket Endpoint Id
はListenWebSocket
プロセッサが待ち受けているPathを指定すれば良いので、/realtime-data
としましょう:
あとは、WebSocket Session Id
があれば、WebSocketの接続相手にメッセージを返すことができます。
セッションIDはDistributedCacheに保存したのでしたね。取り出すために、FetchDistributedMapCache
を配置します。Cache Entry Identifier
には固定値でwebsocket.session.id
を指定しましょう。Distributed Cache Service
にはセッションIDを格納するときにも利用したControllerServiceを再利用できます。
キャッシュから取得したセッションIDを、websocket.session.id
というAttributeに保存するために、Put Cache Value in Attribute
を設定します。
これで、Kafkaから受信したメッセージをFlowFileのコンテンツに残したまま、さらにWebSocketのセッションIDをAttributeに付加することができます。
これまで解説したプロセッサ群はWebSocket Server
というProcessor Group内で定義していました。ConsumeKafkaはルートProcess Groupにあります。Process Group間でFlow Fileをやりとりするために、Input Port
を先頭に追加しましょう。渡ってきたFlow FileをクライアントにPushするので、Input Portの名前はpush
としました。
push
、FetchDistributedMapCache
、PutWebSocket
をスタートしておきましょう:
それでは、ルートProcess Group、NiFi Flow
に戻って、ConsumeKafka_0_10
から、WebSocket Server
へと、フローをつなげます:
では、Kafkaにコンソールからconsole-producerでメッセージを送信してみましょう。同時にEcho TestでメッセージがWebSocket経由で受信できるかも確認します:
おお!つながりましたね!
Zeppelinにグラフを表示する
いよいよ、Zeppelinを使う時がきました!ZeppelinのUIから、新しいノートブックを作成しましょう。名前は何でも良いです:
試しに、Zeppelinで円グラフを表示してみましょう。下図のようなコードを入力し、スタートすると、円グラフが表示されます:
これはkey, valueを持つ二次元の表形式のデータをZeppelinに渡しているコードです。すると、Zeppelinではテーブルや棒、円グラフなどで表示することができるのですね。
今回、WebSocketで受信したデータをこのような形で渡せればよかったのですが、方法がわかりませんでした。とりあえず%angular
インタプリタを使えば出力を自在にカスタマイズできるので、この手法でいきます。誰かもっと簡単な方法をご存じの方は教えてください。
Angularインタプリタで任意のJavascriptを実行
Zeppelinでは、インタプリタが多数用意されていて、先頭の%angular
のように指定すると、記述するコードを実行するエンジンを切替えることができます。
今回はAngularを使ってJavascriptのコードを書いていきましょう。といってもAngular自体はあんまり使いませんが。。
試しに、以下のコードを実行してみましょう。script
タグに実行するJavascriptコードを、出力結果を表示するためのdiv
を用意して実行すると、下図のようになります:
D3で円グラフを描画する
さて、それではD3を使って円グラフを描画してみましょう。結構長いコードですが、以下をZeppelinにコピペしてみてください。graphId
は同じノートブック内に複数のパラグラフを作成する場合に重複しないように連番をふってます:
%angular
<script>
var graphId = '#realtime-graph-2';
var w = 600;
var h = 400;
var r = Math.min(w, h) / 2;
var color = d3.scale.category20c();
var svg = d3.select(graphId).append('svg')
.attr('width', w)
.attr('height', h)
.append('g')
.attr('transform', 'translate(' + r + ',' + r + ')');
var pie = d3.layout.pie()
.sort(null)
.value(function(d) {return d.value});
var arc = d3.svg.arc()
.outerRadius(r);
var updatePiChart = function(dataPoints) {
var arcs = svg.selectAll('.slice')
.data(pie(dataPoints));
// Add new slices if needed.
var g = arcs.enter()
.append('g')
.attr('class', 'slice');
g.append('path');
g.append('text')
.attr('text-anchor', 'middle');
// Update existing and added slices.
arcs.select('path')
.attr('fill', function(d, i){
return color(i);
})
.attr('d', function(d) {
return arc(d);
});
arcs.select('text')
.attr('transform', function(d) {
d.innerRadius = 0;
d.outerRadius = r;
return 'translate(' + arc.centroid(d) + ")";
})
.text(function(d){
return d.data.label;
});
// Remove exitted slices.
arcs.exit().remove();
}
</script>
<div id="realtime-graph-2">
</div>
D3の部分、ややこしいので解説をしたいのですが、またの機会に。。updatePiChart
関数に表示するデータを渡すと、円グラフを描画するようにしました。Zeppelinでこのコードを実行すると、updatePiChart
関数が作成されるので、WebブラウザのDevToolから試しに実行してみます。
dataPoints
は次のような配列です。label
とvalue
を持ったオブジェクトを格納しています:
[{"label": "a", "value": 1},
{"label": "b", "value": 2},
{"label": "c", "value": 3}]
うまくグラフが表示されましたか? dataPointsの中身を変えて、updatePiChart
を実行すると、円グラフが更新されると思います。
ここにWebSocketから受信したデータを流し込んでやればOKですね!
WebSocketでNiFiと接続する
先程のコードに、WebSocketでNiFiと接続するコードを追加します。必要なコードは以下のようになります:
%angular
<script>
var graphId = '#realtime-graph-2';
// ...
var updatePiChart = function(dataPoints) {
// ...
}
// ここから追加
var wsUri = "ws://localhost:9091/realtime-data";
websocket = new WebSocket(wsUri);
websocket.onopen = function(evt) {console.log('connected')};
websocket.onerror = function(evt) {console.log('ERR', evt)};
websocket.onmessage = function(evt) {
console.log(evt)
var dataPoints = JSON.parse(evt.data);
updatePiChart(dataPoints);
};
</script>
<div id="realtime-graph-2">
</div>
Kafka -> NiFi -> (WebSocket) -> Zeppelin -> D3!!
これですべての準備が整いました!Zeppelinでパラグラフを実行してWebSocketでNiFiに接続します。
その後、console-publisherを利用してKafkaにdataPointsのJSON文字列を送信してみましょう。
NiFiがWebSocketサーバとなり、見事にリアルタイムグラフ更新ができました!
複数のWebSocketクライアントに対応する
今回の記事では、話を簡単にするために、単一クライアントのみに対応すれば良いことにしました。WebSocketセッションIDをDistributedCacheに格納してKafkaからメッセージを受信した際に利用しました。
これだと、別のクライアントがWebSocketで接続した際にキャッシュ内のWebSocketセッションIDが更新されてしまい、複数のクライアントでは利用できません。
一つの解として、NiFiのフローで次のようなループを実装する手があります:
WebSocketクライアントが接続すると、このループをぐるぐると回ります。Kafkaから受信したJSONデータを受信時のタイムスタンプと共に、DistributedCacheに格納しておきます。
フローをループするFlow Fileには、WebSocketのセッションID、ControllerService Id、Endpoint Id、そして、最終Pushタイムスタンプを持たせます。
次にキャッシュを確認したときに前回よりも新しい情報であれば、WebSocketクライアントにPushし、そうでなければ何もせずループを継続します。
このようにフローを構築すれば、クライアントが接続してくる度にFlow Fileが作成され、クライアント毎の情報はそのFlow File内に保持されるため、複数のクライアントに対応することができます。
複数クライアント対応版のNiFiテンプレートファイルは、Realtime Data Visualization with Zeppelin via NiFi WebSocketのGistで公開しておきました。ご興味のある方はダウンロードしてNiFiにインポートして中身を探ってみてください。
まとめ
今回のチュートリアルでは、色々なOSSを組み合わせてリアルタイムのグラフを表示する仕組みを構築してみました。
NiFiをデータ収集ツールとして使うだけでなく、NiFiから何かを発信することもできてしまいます。しかもサーバ側では一切コーディングをせず、NiFiフローの構築だけでここまでできました。
年末年始、何か新しいもの触ってみたいなー、という方は是非お試しくださいませ!