19
11

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Hortonworks Advent Calendar 2016

Day 21

KafkaからNiFiでconsumeしてWebSocketでZeppelinに渡してD3でリアルタイムなグラフを描画する

Posted at

Hortonworksアドベントカレンダー 12/21 の記事です。

12/3 NiFi 1.1.0でWebSocketに対応に続き、今回もWebSocketネタで。

Zeppelin、NiFi、Kafka、D3と色々学べるお得なチュートリアル形式となっております。

先日、ある知人から、「NiFiのWebSocketを使って、Zeppelinにリアルタイムなグラフが表示できたらCoolじゃない?」という意見をいただいたのがきっかけでした。

まず、完成図のイメージから。

リアルタイムグラフ描画

Zeppelin(左上)は複数のグラフをノートブックという形でまとめて、可視化したデータをメンバ間で共有したり、分析するのに便利なツールです。

今回はZeppelinに図のような円グラフを表示します。円グラフに表示するデータはNiFiからWebSocket経由でPushします。

せっかくサーバサイドからPushできるので、さらにNiFiからKafkaのメッセージを受信するようにしてみましょう。SparkやStormなどのストリーム分析結果をKafkaにパブリッシュしてZeppelinにリアルタイムで表示するような利用シーンを想定しています。

お試しいただくには、Zeppelin、NiFi、KafkaとWebブラウザが必要です。それぞれのインストールと起動はコマンドを数発叩くだけなので非常に簡単です。

環境構築

Zeppelinのインストールと起動

ZeppelinのダウンロードページからBinary Packageをダウンロードして適当なところに展開しましょう。以下のコマンドを実行すると、8080ポートでZeppelinのUIにアクセスできます:

$ cd zeppelin-0.6.2-bin-all
$ ./bin/zeppelin-daemon.sh start

Kafkaのインストールと起動

KafkaのQuickStartに記載の通り、ダウンロードして適当な場所に展開し、次のコマンドで、まずZookeeperを起動します:

$ bin/zookeeper-server-start.sh config/zookeeper.properties

つづいて別のターミナルを開き、Kafkaサーバを起動しましょう:

$ bin/kafka-server-start.sh config/server.properties

また別のターミナルを開いてトピックを作りましょう:


$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

# ちゃんと作成できたか確認
$ bin/kafka-topics.sh --list --zookeeper localhost:2181
test

QuickStartに記載のコマンドでターミナルからメッセージの送受信ができるか確認しときましょう。

NiFiのインストールと起動

NiFiのダウンロードページからバイナリをダウンロードして、適当な場所に展開します。

展開したディレクトリの中に設定ファイルがあるので、少し変更しましょう。ZeppelinもNiFiもデフォルトのHTTPリスンポートが8080なので、NiFiのポートを変更します:

$ vi conf/nifi.properties

# web properties #
nifi.web.war.directory=./lib
nifi.web.http.host=
nifi.web.http.port=8081 // 8080以外に変える

その後、NiFiを起動します:

$ ./bin/nifi.sh start
# 起動には少々時間がかかるので、ログを見ときましょう
$ tail -f logs/nifi-app.log
(次のログが表示されれば、起動が完了した合図です)
2016-12-20 14:58:48,166 INFO [main] org.apache.nifi.web.server.JettyServer NiFi has started. The UI is available at the following URLs:
2016-12-20 14:58:48,166 INFO [main] org.apache.nifi.web.server.JettyServer http://127.0.0.1:8081/nifi

ブラウザからlocalhost:8081でNiFiのUIにアクセスします。さて、NiFiのフローを構築していきましょう!

NiFiのフローを作成する

まずはデータが必要、Kafka -> NiFiの流れ

これから、わりと複雑なデータフローを組んでいきます。最初から全部設計できてなくてもねんどをこねるように、動かしながらだんだんとフローを構築できるのがNiFiの良いところです。Kafkaのトピックが用意できているので、そこからデータを取得する部分から始めてみましょう。

image

画面上部に並んでいるアイコンの一番左にある、Processorのアイコンを白いキャンバスにドラッグ&ドロップします。

検索窓にkafkaと入力すると、いろいろ出てきます。Kafkaのバージョンごとに対応するProcessorがあります:

image

Kafka プロセッサ
0.8.x GetKafka, PutKafka
0.9.x ConsumeKafka, PublishKafka
0.10.x ConsumeKafka_0_10, PublishKafka_0_10

勢いよくバージョンアップしていくKafkaに追いつくためにNiFiをうまく使うのも良いですね。NiFiでは複数のKafkaバージョンに対応するデータフローを作ることができます。

さて、今回利用するのはKafka 0.10.xなのでConsumeKafka_0_10をダブルクリックします。

おめでとうございます!これでNiFiにプロセッサが追加できましたね:

image

ワーニングのアイコンにマウスをのせると、Topic Name、Group ID、Relationship Successが未設定ですよと教えてくれます。プロセッサを右クリックしてConfigureをクリックします:

image

PROPERTIESタブにある、Topic Name(s)のValue列をクリックして、先程作成したトピックの名前testを、Group IDにはConsumer Groupの名前を入力します。何でも良いです。ここではnifiとしました:

image

設定できたらAPPLYをクリックしてダイアログを閉じます。

image

残る警告はRelationship successが無いよ、というものです。Kafkaから受信したメッセージの転送先が未設定ということですね。何か作成しましょう。

こんな時に便利なのが、UpdateAttributeプロセッサです。UpdateAttributeプロセッサを追加しましょう。

そして、ConsumeKafka_0_10の真ん中あたりにマウスをのせると、丸い矢印が表示されます、それをドラッグして、UpdateAttributeプロセッサにドロップします。

image

すると、Create Connectionダイアログが表示されます。Addをクリックしてコネクションを作成しましょう。

image

これで、success Relationshipのコネクションが作成できました。ConsumeKafka_0_10の設定が完了したので、警告アイコンが消え、停止状態となっています:

image

UpdateAttributeには警告が出ていますが、そのままで構いません。ConsumeKafka_0_10を起動してみましょう!

image

プロセッサが再生中のアイコンに変わり、時折右上の方に数字の1が見えます。これはアクティブなスレッド数を表しています。NiFiでは同一のフローの中にある複数のプロセッサが並列で動きます:

image

Kafkaトピックにメッセージを送る

NiFi側の準備は整いました、Kafkaへとメッセージを送信しましょう。コンソールから次のコマンドを入力します:

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
(何かテキストを入力してEnterすると送信される)
Hello Kafka.

このコマンドは次の入力を待つ状態で起動しっぱなしにしておきましょう。

NiFiのUIに戻ってみると、successQueued 1と表示されています!何か受信したようですね:

image

コネクションを右クリックして、List queueを選択します。

image

すると、successに滞留しているメッセージが確認できます。NiFiではこのデータをFlowFileと呼びます:

image

左端のインフォメーションアイコンをクリックすると、FlowFileの情報が色々確認できます。

ミニクイズ

コンソールから送信した文字列はNiFiのUIから確認できます。どこで閲覧できるかわかりますか? UIを色々探索してみましょう :)

NiFiからWebSocketでデータをPushする

Kafka -> NiFiのルートが開拓され、冒頭の絵に描いた餅の右下部分ができました!次は取得したメッセージをWebSocketでクライアントにPushする部分を作りましょう。

話を単純にする、まずは単一クライアントで

いきなり難しいフローを作成するのは大変なものです。まずは簡略化したフローでプロトタイプを行いましょう。

WebSocketは複数のクライアントを扱うことができます。NiFiのWebSocketコンポーネントは各クライアントのセッションをIDで管理しています。複数クライアントを相手にするには多少工夫が必要なので、後回しにして、単一のクライアントと対話できるフローにしましょう。

NiFiをWebSocketサーバにする

以前の投稿でNiFiのWebSocket実装について詳しく書きましたので、こちらも参照してください。

WebSocket周りのフローをまとめるために、Process Groupを作ってみましょう。

image

Process Groupアイコンをキャンバスに配置します。適当に名前をつけましょう:

image

image

作成したProcess Groupをダブルクリックすると、中に入れます。ここにWebSocketサーバの実装を組んでいきます。

ListenWebSocketとJettyWebSocketServer

ListenWebSocketプロセッサを配置して、ConfigureからPROPERTIESタブを表示します。WebSocket Server ControllerServiceからCreate new serviceを選択します:

image

JettyWebSocketServerを作成し、Server URL Pathには/realtime-dataと指定します:

image

APPLYしてプロセッサの設定は完了です。

もう一度ListenWebSocketのPROPERTIESを開き、JettyWebSocketServerの隣に表示されている矢印をクリック、Editアイコンをクリックして、ControllerServiceの設定に遷移します。

image

Listen Portに9091と設定し、APPLYで設定を反映させます。そして稲妻アイコンをクリックしてこのControllerServiceを有効化しましょう!

image

もう一度、ListenWebSocketのConfigureから、今度はSETTINGSタブを表示します。binary messagetext messageは今回受信しても利用しないので、自動的に終了するようにします。

image

ConsumeKafkaと同じように、connectedの先にはUpdateAttributeを仮置きして、ListenWebSocketを起動しましょう。

image

これで、WebSocketクライアントからの接続を受け付ける準備が整いました!

websocket.orgのエコーテスト

websocket.orgのEcho Testを使って、接続確認を行います。

LocationにはNiFiへとつながるURLを入力します。Connectして、LogにCONNECTEDと表示されればOKです:

image

NiFi側ではconnectedにクライアントが接続してきたイベントがFlowFileとして生成されているはずです。List queuesから中身を確認すると、次のAttributesが設定されています:

image

これらの情報があれば、接続しているWebSocketクライアントへとメッセージを返すことができます。
この情報をNiFiのキャッシュに保持しておきましょう。

WebSocketセッションIDをDistributedCacheに格納する

さぁ、ここからはNiFi標準で使えるDistributedCacheという仕組みを利用してみましょう。これはNiFi上で動くオンメモリのKVSみたいなものです。もっと簡単に言ってしまうと、連想配列です。任意のキーで任意の値を保存でき、フローの色々な場所から共有できます。

まずはDistributedCacheサーバを立てましょう。UIの左側にあるOperateパレットの設定アイコンをクリックすると、ControllerServiceの一覧画面が表示されます:

image

image

ここには、先程作成したJettyWebSocketServerが表示されていますね。プラスアイコンをクリックして、DistributedMapCacheServerを追加しましょう:

image

image

DistributedMapCacheServerの設定はデフォルトのままで大丈夫です。続いてこのキャッシュサーバに接続するクライアントのDistributedMapCacheClientを追加します。Server Hostnameにはlocalhostと設定しましょう:

image

作成したキャッシュサーバとクライアントのサービスを有効化しておきましょう:

image

キャッシュサーバの準備ができたのでデータを格納しましょう。WebSocketクライアントが接続した時に生成されたセッションIDを捕獲します。キャッシュサーバにはFlowFileのコンテンツ部分が格納されるので、websocket.session.idのAttribute値を、コンテンツ部分に抽出する必要があります。

これにはReplaceTextプロセッサを使います。このプロセッサは、テキスト形式のFlowFileのコンテンツ内文字列を置換するのに利用できます。Replacement StrategyAlways Replaceを、Replacement Value${websocket.session.id}と設定します。これはNiFiエクスプレッションの式で、FlowFileのAttributeの値を返します。詳細はまたの機会に。。この設定で、該当のAttribute値でFlowFileコンテンツの値を置換します。

image

PutDistributedMapCacheプロセッサを追加してReplaceTextの出力をつなぎましょう。キャッシュのキーは固定値でwebsocket.session.idとしました。先程作成したDistributedMapCacheClientServiceも指定します:

image

長くなりましたが、ここまで手順にそって進めていくと、次の図のようなフローになっていると思います。ListenWebSocketconnectedの先端にある青いポイントをドラッグして、接続先をReplaceTextに移しましょう。こんなことができるのもNiFiならではですね!

image

ReplaceTextPutDistributedMapCachefailure relationshipはAuto Terminateしています。出来上がったフローはこの通り:

image

これで生成されたWebSocketのセッションIDがキャッシュに格納できました!

WebSocketクライアントにKafkaから取得したメッセージをPushする

さあ、WebSocketクライアントにメッセージをPushしてみましょう!使うプロセッサはPutWebSocketです。設定を見てみましょう、デフォルトでは次のようになっています:

image

Session Id, ControllerService Id, Endpoint Idはそれぞれ、渡ってきたFlowFileから解決するようになっています。これは複数のクライアントを相手にする場合に効果を発揮しますが、今回は単一クライアントでのプロトタイプなので、固定値を設定してしまいましょう。

ControllerService一覧からJettyWebSocketServerのインフォメーションアイコンをクリックして、SETTINGSタブにあるIdをコピーします:

image

image

コピーしたControllerServiceのIdをPutWebSocketプロセッサのWebSocket ControllerService Idに設定します。WebSocket Endpoint IdListenWebSocketプロセッサが待ち受けているPathを指定すれば良いので、/realtime-dataとしましょう:

image

あとは、WebSocket Session Idがあれば、WebSocketの接続相手にメッセージを返すことができます。

セッションIDはDistributedCacheに保存したのでしたね。取り出すために、FetchDistributedMapCacheを配置します。Cache Entry Identifierには固定値でwebsocket.session.idを指定しましょう。Distributed Cache ServiceにはセッションIDを格納するときにも利用したControllerServiceを再利用できます。

キャッシュから取得したセッションIDを、websocket.session.idというAttributeに保存するために、Put Cache Value in Attributeを設定します。
これで、Kafkaから受信したメッセージをFlowFileのコンテンツに残したまま、さらにWebSocketのセッションIDをAttributeに付加することができます。

image

これまで解説したプロセッサ群はWebSocket ServerというProcessor Group内で定義していました。ConsumeKafkaはルートProcess Groupにあります。Process Group間でFlow Fileをやりとりするために、Input Portを先頭に追加しましょう。渡ってきたFlow FileをクライアントにPushするので、Input Portの名前はpushとしました。

pushFetchDistributedMapCachePutWebSocketをスタートしておきましょう:

image

それでは、ルートProcess Group、NiFi Flowに戻って、ConsumeKafka_0_10から、WebSocket Serverへと、フローをつなげます:

image

では、Kafkaにコンソールからconsole-producerでメッセージを送信してみましょう。同時にEcho TestでメッセージがWebSocket経由で受信できるかも確認します:

image

おお!つながりましたね!

Zeppelinにグラフを表示する

いよいよ、Zeppelinを使う時がきました!ZeppelinのUIから、新しいノートブックを作成しましょう。名前は何でも良いです:

image

試しに、Zeppelinで円グラフを表示してみましょう。下図のようなコードを入力し、スタートすると、円グラフが表示されます:

image

これはkey, valueを持つ二次元の表形式のデータをZeppelinに渡しているコードです。すると、Zeppelinではテーブルや棒、円グラフなどで表示することができるのですね。

今回、WebSocketで受信したデータをこのような形で渡せればよかったのですが、方法がわかりませんでした。とりあえず%angularインタプリタを使えば出力を自在にカスタマイズできるので、この手法でいきます。誰かもっと簡単な方法をご存じの方は教えてください。

Angularインタプリタで任意のJavascriptを実行

Zeppelinでは、インタプリタが多数用意されていて、先頭の%angularのように指定すると、記述するコードを実行するエンジンを切替えることができます。

今回はAngularを使ってJavascriptのコードを書いていきましょう。といってもAngular自体はあんまり使いませんが。。

試しに、以下のコードを実行してみましょう。scriptタグに実行するJavascriptコードを、出力結果を表示するためのdivを用意して実行すると、下図のようになります:

image

D3で円グラフを描画する

さて、それではD3を使って円グラフを描画してみましょう。結構長いコードですが、以下をZeppelinにコピペしてみてください。graphIdは同じノートブック内に複数のパラグラフを作成する場合に重複しないように連番をふってます:


%angular

<script>
var graphId = '#realtime-graph-2';
var w = 600;
var h = 400;
var r = Math.min(w, h) / 2;
var color = d3.scale.category20c();

var svg = d3.select(graphId).append('svg')
            .attr('width', w)
            .attr('height', h)
            .append('g')
            .attr('transform', 'translate(' + r + ',' + r + ')');
            
var pie = d3.layout.pie()
            .sort(null)
            .value(function(d) {return d.value});

var arc = d3.svg.arc()
            .outerRadius(r);


var updatePiChart = function(dataPoints) {
    
    var arcs = svg.selectAll('.slice')
        .data(pie(dataPoints));
        

    // Add new slices if needed.
    var g = arcs.enter()
        .append('g')
        .attr('class', 'slice');
    g.append('path');
    g.append('text')
        .attr('text-anchor', 'middle');

    // Update existing and added slices.
    arcs.select('path')
        .attr('fill', function(d, i){
            return color(i);
        })
        .attr('d', function(d) {
            return arc(d);
        });
        
    arcs.select('text')
        .attr('transform', function(d) {
            d.innerRadius = 0;
            d.outerRadius = r;
            return 'translate(' + arc.centroid(d) + ")";
        })
        .text(function(d){
            return d.data.label;
        });
    
    // Remove exitted slices.
    arcs.exit().remove();
}

</script>

<div id="realtime-graph-2">
</div>

D3の部分、ややこしいので解説をしたいのですが、またの機会に。。updatePiChart関数に表示するデータを渡すと、円グラフを描画するようにしました。Zeppelinでこのコードを実行すると、updatePiChart関数が作成されるので、WebブラウザのDevToolから試しに実行してみます。

image

dataPointsは次のような配列です。labelvalueを持ったオブジェクトを格納しています:

[{"label": "a", "value": 1},
 {"label": "b", "value": 2},
 {"label": "c", "value": 3}]

うまくグラフが表示されましたか? dataPointsの中身を変えて、updatePiChartを実行すると、円グラフが更新されると思います。

ここにWebSocketから受信したデータを流し込んでやればOKですね!

WebSocketでNiFiと接続する

先程のコードに、WebSocketでNiFiと接続するコードを追加します。必要なコードは以下のようになります:

%angular

<script>
var graphId = '#realtime-graph-2';

// ...

var updatePiChart = function(dataPoints) {
 // ...
}


// ここから追加
var wsUri = "ws://localhost:9091/realtime-data";

websocket = new WebSocket(wsUri);
websocket.onopen = function(evt) {console.log('connected')};
websocket.onerror = function(evt) {console.log('ERR', evt)};
websocket.onmessage = function(evt) {
    console.log(evt)
    var dataPoints = JSON.parse(evt.data);
    updatePiChart(dataPoints);
};
</script>

<div id="realtime-graph-2">
</div>

Kafka -> NiFi -> (WebSocket) -> Zeppelin -> D3!!

これですべての準備が整いました!Zeppelinでパラグラフを実行してWebSocketでNiFiに接続します。

その後、console-publisherを利用してKafkaにdataPointsのJSON文字列を送信してみましょう。

NiFiがWebSocketサーバとなり、見事にリアルタイムグラフ更新ができました!

image

複数のWebSocketクライアントに対応する

今回の記事では、話を簡単にするために、単一クライアントのみに対応すれば良いことにしました。WebSocketセッションIDをDistributedCacheに格納してKafkaからメッセージを受信した際に利用しました。

これだと、別のクライアントがWebSocketで接続した際にキャッシュ内のWebSocketセッションIDが更新されてしまい、複数のクライアントでは利用できません。

一つの解として、NiFiのフローで次のようなループを実装する手があります:

WebSocketクライアントが接続すると、このループをぐるぐると回ります。Kafkaから受信したJSONデータを受信時のタイムスタンプと共に、DistributedCacheに格納しておきます。

フローをループするFlow Fileには、WebSocketのセッションID、ControllerService Id、Endpoint Id、そして、最終Pushタイムスタンプを持たせます。

次にキャッシュを確認したときに前回よりも新しい情報であれば、WebSocketクライアントにPushし、そうでなければ何もせずループを継続します。

このようにフローを構築すれば、クライアントが接続してくる度にFlow Fileが作成され、クライアント毎の情報はそのFlow File内に保持されるため、複数のクライアントに対応することができます。

複数クライアント対応版のNiFiテンプレートファイルは、Realtime Data Visualization with Zeppelin via NiFi WebSocketのGistで公開しておきました。ご興味のある方はダウンロードしてNiFiにインポートして中身を探ってみてください。

まとめ

今回のチュートリアルでは、色々なOSSを組み合わせてリアルタイムのグラフを表示する仕組みを構築してみました。

NiFiをデータ収集ツールとして使うだけでなく、NiFiから何かを発信することもできてしまいます。しかもサーバ側では一切コーディングをせず、NiFiフローの構築だけでここまでできました。

年末年始、何か新しいもの触ってみたいなー、という方は是非お試しくださいませ!

19
11
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
19
11

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?