LoginSignup
2
3

More than 3 years have passed since last update.

Nifi + Kafka + Strom を用いたデータ処理ハンズオン(1) ~Nifiの基礎部分~

Posted at

Theme

今回、以下を参考に新しいバージョンでのNifi, Kafka, Stormをつかった検証を行っていきたいと思います。
https://github.com/ijokarumawak/hdf-tutorials-ja/wiki/Apache-NiFi-Dataflow-Automation-Concepts

主に、今回はNifiのパートになります。

Apache NiFiとは

image.png

システム間のデータフローを自動化し管理するためのOSS

Nifi 基本操作

login

NifiはNifiを動かしているサーバの9090portを使用します。

image.png

Process Groupの作成

赤く囲った部分をドラッグして、方眼上に持っていき、リリースしてください。

image.png

今回、名前をHTTP APIとします。

ADDを押し、少し時間を置くと以下のようにProcess Groupが生成されます。
そうしたら、ダブルクリックをして中に入ってください。

image.png

処理を定義

HTTP APIに入ると、左下のOperatorHTTP APIに変わります。

image.png

HandleHttpRequestプロセッサの追加

検索場所にてHandleと打つと、HandleHttpRequestというのがあるので、それをダブルクリックします。

image.png

(追加後)

image.png

上の様に表示されましたら、追加したHandleHttpRequestプロセッサを右クリックし、Configureを選択します。

image.png

Propertyタブに移動し、Listening Port9095にします。
その後、APPLYを選択します。

image.png

ReplaceTextの追加

こちらでは、HTTPレスポンスで返すボディの文字列を設定します。
前と同様に、Processorを追加します。

image.png

追加されたProcessorに対して、右クリックし、Configureを選択します。
Propertyタブに移動し、Replacement Value{"Result": "Succeeded"},
Replacement StrategyAlways Replace
に変更し、APPLYを選択します。

image.png

HandleHttpResponseプロセッサの追加

次は、HandleHttpResponceプロセッサを追加します。

image.png

(追加後)

image.png

そうしたら、HandleHttpResponseのstatusコードを設定していきます。
HandleHttpResponce上で右クリックをし、Configureを選択します。
その後、PROPERTIESにてHTTP Status Codeを決め打ちで202(Accepted)
にします。

image.png

3つのプロセッサの接続

NiFiのデータフローは、プロセッサ同士をRelationshipでつなぐことで流れを生成します。
プロセッサの中心からマウスをドラッグし、接続先のプロセッサでドロップすることで、つなぐことができます。

From ProcessorTo Processorを確認し、For relationshipsSuccessを選択し、ADDを選択します。

image.png

(接続後)

image.png

LogAttributeにFailureを流す

プロセッサの処理が失敗した場合、渡ってきたFlowFileはRelationshipfailureに流されます。
失敗内容が確認できるように、LogAttributeプロセッサを追加します。

image.png

(追加後)

image.png

その後、'Relationship'で接続します。

image.png

不要なRelationshipのAuto-Terminate

NiFiの内部を流れるFlowFileは、行き先がなくなる (終点までたどり着く) と、削除されます。
今回の設定では、HandleHttpResponseLogAttributeにおいて、処理がSuccessになった後の処理がありません。
なので、そのあとはNifiFlowを自動で消去する設定を入れてあげます。

以下のRelationshipに関して、プロセッサで右クリックをし、Configureを選択します。'SETTINGSタブからAutomatically Terminate Relationshipsにおいて、success`にチェックを入れます。

  • HandleHttpResponse: success
  • LogAttribute: success

image.png

Http Context Mapの作成

HandleHttpRequest上で右クリックをし、Configureを選択します。
その後PROPERTIESタブのHttp Context Mapのプルダウンより、Create new service...を選択します。

image.png

すると、次の画面のようになります。
CREATEを選択します。

image.png

その後、APPLYを選択すると、以下の画面になります。

image.png

右側、真ん中の雷のようなアイコンを選択すると以下の画面が表示されるので、ENABLEを選択します。

image.png

同様にHandleHttpRequestのHTTP Context Mapでも同じControllerServiceを指定します。
これでプロセッサに必要な設定が完了します。

フローの開始

左側にあるOperateの再生ボタンを選択し、フローを開始します。
一度、方眼上でクリックをし、特定のプロセッサから選択を外してから行ってください。

image.png

全て起動が成功すると、以下の様になります。

image.png

cURLで簡単なテスト

curl -i -X POST -H "Content-type: application/json" -d '{"name": "Nifi", "process": "test"}' localhost:9095
HTTP/1.1 202 Accepted
Date: Mon, 14 Oct 2019 10:47:51 GMT
Transfer-Encoding: chunked
Server: Jetty(9.4.11.v20180605)

{"Result": "succeeded"}

NifiでもIn/Outから、処理の流れが確認できます。

image.png

データの変換

プロセッサの作成

POSTで受信したデータをKafkaに登録する前に整形していきます。

次のプロセッサを追加します:

  • EvaluateJsonPath: FlowFileのcontentをJSONとしてパースし、JSONPathで任意の要素の値を抽出します。
    抽出結果をFlowFileのAttributeに保存します。

  • ReplaceText: Kafkaに登録する際、FlowFileのContentがメッセージのvalueとなります。
    抽出したJSON内の値を再びFlowFileのContentに戻します。

EvaluateJsonPath

Processの追加をします。

image.png

(追加後)

image.png

その後、右クリックでConfigureを選択します。
PROPERTIESにて以下の設定をしていきます。

  • Destination flowfile-attribute image.png

以下2つは、右上の+ボタンを押して追加していきます。
- message.key
$.name
- message.value
$.age

image.png

これによって、NifiFlowの中にmessage.key: $.name(つまりJsonでのnameに対するvalue)というkey-value型のデータが格納されます。
message.valueも同様です。

ReplaceText

Processの追加をします。

image.png

(追加後)

image.png

その後、右クリックでConfigureを選択します。
PROPERTIESにて以下の設定をしていきます。

  • Replacement Value
    ${message.value}
    これは、$で始まっていますが、Apache NiFi Expression Language Guideというもので、JsonPathではありません。

  • Replacement Strategy
    Always Replace
    image.png

Output Portの追加

以下の箇所から、Output Portを作成し、名前を入力してADDを選択します。
image.png

(追加後)
image.png

processの接続

今回作成したprocessを接続して、前回したテストを実行してみます。

EvaluateJsonPathからReplaceTextへのRelationshipでは、matchedを選択します。

image.png

最終的に以下のようになります。
image.png

では、起動してテストをしてみます。
テスト結果

curl -i -X POST -H "Content-type: application/json" -d '{"name": "Nifi", "process": "test"}' localhost:9095
HTTP/1.1 202 Accepted
Date: Mon, 14 Oct 2019 13:21:56 GMT
Transfer-Encoding: chunked
Server: Jetty(9.4.11.v20180605)

{"Result": "succeeded"}

先ほどと同様の結果です。
では、今回作ったフローの流れはどうなっているかというと
フローのIn/Outはそれぞれ1になっています。

Output Portの前のConnectionの上で右クリックしてList Queueを選択してみます。

image.png

すると、下の様にQueueの中に先ほどのNifiFlowが格納されています。
image.png

次回

次回では、このNifiからKafkaに対してメッセージを送りたいと思います。

2
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
3