Theme
今回は前回に引き続き、以下を参考に進めていきます。
https://github.com/ijokarumawak/hdf-tutorials-ja/wiki/Apache-NiFi-Dataflow-Automation-Concepts
前回のを見てない場合は、こちらからご確認いただければと思います。
Part1
今回はKafkaについて見ていきます。
Kafkaとは
Kafkaは大量のメッセージを高速に扱うことができる分散メッセージシステムで、メッセージングキューとして必要な様々な機能があります。
大まかに分けて、Producer(書き込み)、Topic(Queue)、Consumer(読み込み)で構成されます。
ProducerがTopicに書き込み、ConsumerはTopicにデータをとりに行きます。
Reporting ProcessGroupの作成
今回は、前回までのHTTP API
とは別にReporting
というProcess Groupを作成します。
参考記事からそのまま言葉をお借りしますが、
単一のグループで作業を進め、大きなデータフローになってくると、管理が煩雑になってしまいます。 チュートリアルでは、以下の2つに分けています:
- データを外部から収集し、共通のフォーマットに変換する部分
- 共通のフォーマットのデータを入力として、Kafkaにメッセージ登録を行う部分
こうすることで、HTTP以外の、TCPやMQTTなどでメッセージを受信するルートを増やす際に、変換部分のみを実装すれば良くなります。
Processの配置
Input Portの追加
他のProcessGroupからFlowFileを渡すために、ReportingグループへInput Portを追加していきます。
PublishKafka_2_0の追加
PublishKafkaに関しては、自分の環境(kafkaのversion)にあったものを設定してください。
私の環境は、ambari2.7.3で入れたものになるので、2.0となっています。
前回同様、ConfigureからPROPATIESに行き、以下を設定します。
-
Kafka Brokers
localhost:6667 -
Topic Name
input -
Delivery Guarantee
Guarantee Replicated Delivery -
Kafka Key
${message.key}
Kafkaメッセージはkeyとvalueを持っています。 Keyには、渡ってきたFlowFileのmessage.key Attributeを利用します。
Valueには、FlowFileのcontentが渡されます。
kafka BrokersのPortについては、ambariからkafkaをインストールした場合は、
Kafka > Configsから確認できます。
SETTINGS
タブでは、Automatically terminate relationships
においてsuccess
にチェックを入れます。
APPLYを選択した後、LogAttributeも設置し、すべてをConnectionを作成しましょう。
HTTP APIからReportingへデータを流し込む
ルートProcessGroupへと戻り、HTTP APIからReportingへとRelationをつなぎます。
こうすることで、ProcessGroup間はInput PortとOutput Portでデータの連携が可能になります。
Console Consumerで確認しながらテスト
サーバにSSHでログインし、以下のコマンドでConsole Consumerを起動します。
cd /usr/hdp/current/kafka-broker
./bin/kafka-console-consumer.sh --topic input --bootstrap-server localhost:6667 --new-consumer
別terminalで以下を実施します。
# curl -i -X POST -H "Content-type: application/json" -d '{"name": "C", "age": 20}' localhost:9095
#### Postした側 ####
HTTP/1.1 202 Accepted
Date: Tue, 15 Oct 2019 13:59:47 GMT
Transfer-Encoding: chunked
Server: Jetty(9.4.11.v20180605)
{"Result": "succeeded"}
#### consumer ####
20
Ctrl+Cを押すと
Processed a total of 1 messages
と表示されると思います。
これで、Postしたデータに変更を加えて、Kafkaに送信し、そのメッセージを受け取るまでのフローをNifiから操作できました。
次回
次は、このメッセージをStormに送ってリアルタイム分析をしていきます。