0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Nifi + Kafka + Strom を用いたデータ処理ハンズオン(2) ~Kafkaへのメッセージ送信~

Posted at

Theme

今回は前回に引き続き、以下を参考に進めていきます。
https://github.com/ijokarumawak/hdf-tutorials-ja/wiki/Apache-NiFi-Dataflow-Automation-Concepts

前回のを見てない場合は、こちらからご確認いただければと思います。
Part1

今回はKafkaについて見ていきます。

Kafkaとは

image.png

Kafkaは大量のメッセージを高速に扱うことができる分散メッセージシステムで、メッセージングキューとして必要な様々な機能があります。

大まかに分けて、Producer(書き込み)、Topic(Queue)、Consumer(読み込み)で構成されます。
ProducerがTopicに書き込み、ConsumerはTopicにデータをとりに行きます。

Reporting ProcessGroupの作成

今回は、前回までのHTTP APIとは別にReportingというProcess Groupを作成します。

参考記事からそのまま言葉をお借りしますが、

単一のグループで作業を進め、大きなデータフローになってくると、管理が煩雑になってしまいます。 チュートリアルでは、以下の2つに分けています:

  • データを外部から収集し、共通のフォーマットに変換する部分
  • 共通のフォーマットのデータを入力として、Kafkaにメッセージ登録を行う部分

こうすることで、HTTP以外の、TCPやMQTTなどでメッセージを受信するルートを増やす際に、変換部分のみを実装すれば良くなります。

image.png

Processの配置

Input Portの追加

他のProcessGroupからFlowFileを渡すために、ReportingグループへInput Portを追加していきます。
image.png

(追加後)
image.png

PublishKafka_2_0の追加

PublishKafkaに関しては、自分の環境(kafkaのversion)にあったものを設定してください。
私の環境は、ambari2.7.3で入れたものになるので、2.0となっています。
image.png

前回同様、ConfigureからPROPATIESに行き、以下を設定します。

  • Kafka Brokers
    localhost:6667

  • Topic Name
    input

  • Delivery Guarantee
    Guarantee Replicated Delivery

  • Kafka Key
    ${message.key}

Kafkaメッセージはkeyとvalueを持っています。 Keyには、渡ってきたFlowFileのmessage.key Attributeを利用します。
Valueには、FlowFileのcontentが渡されます。

kafka BrokersのPortについては、ambariからkafkaをインストールした場合は、
Kafka > Configsから確認できます。

image.png

SETTINGSタブでは、Automatically terminate relationshipsにおいてsuccessにチェックを入れます。

APPLYを選択した後、LogAttributeも設置し、すべてをConnectionを作成しましょう。
image.png

HTTP APIからReportingへデータを流し込む

ルートProcessGroupへと戻り、HTTP APIからReportingへとRelationをつなぎます。
こうすることで、ProcessGroup間はInput PortとOutput Portでデータの連携が可能になります。
image.png

Console Consumerで確認しながらテスト

サーバにSSHでログインし、以下のコマンドでConsole Consumerを起動します。

cd /usr/hdp/current/kafka-broker
./bin/kafka-console-consumer.sh --topic input --bootstrap-server localhost:6667 --new-consumer

別terminalで以下を実施します。

# curl -i -X POST -H "Content-type: application/json" -d '{"name": "C", "age": 20}' localhost:9095

#### Postした側 ####
HTTP/1.1 202 Accepted
Date: Tue, 15 Oct 2019 13:59:47 GMT
Transfer-Encoding: chunked
Server: Jetty(9.4.11.v20180605)

{"Result": "succeeded"}

#### consumer ####
20

Ctrl+Cを押すと

Processed a total of 1 messages

と表示されると思います。

これで、Postしたデータに変更を加えて、Kafkaに送信し、そのメッセージを受け取るまでのフローをNifiから操作できました。

次回

次は、このメッセージをStormに送ってリアルタイム分析をしていきます。

0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?