Theme
今回、以下を参考に新しいバージョンでのNifi, Kafka, Stormをつかった検証を行っていきたいと思います。
https://github.com/ijokarumawak/hdf-tutorials-ja/wiki/Apache-NiFi-Dataflow-Automation-Concepts
主に、今回はNifiのパートになります。
Apache NiFiとは
システム間のデータフローを自動化し管理するためのOSS
Nifi 基本操作
login
NifiはNifiを動かしているサーバの9090portを使用します。
Process Groupの作成
赤く囲った部分をドラッグして、方眼上に持っていき、リリースしてください。
今回、名前をHTTP APIとします。
ADDを押し、少し時間を置くと以下のようにProcess Groupが生成されます。
そうしたら、ダブルクリックをして中に入ってください。
処理を定義
HTTP APIに入ると、左下のOperatorがHTTP APIに変わります。
HandleHttpRequestプロセッサの追加
検索場所にてHandleと打つと、HandleHttpRequestというのがあるので、それをダブルクリックします。
(追加後)
上の様に表示されましたら、追加したHandleHttpRequestプロセッサを右クリックし、Configureを選択します。
Propertyタブに移動し、Listening Portを9095にします。
その後、APPLYを選択します。
ReplaceTextの追加
こちらでは、HTTPレスポンスで返すボディの文字列を設定します。
前と同様に、Processorを追加します。
追加されたProcessorに対して、右クリックし、Configureを選択します。
Propertyタブに移動し、Replacement Valueを{"Result": "Succeeded"},
Replacement StrategyをAlways Replace
に変更し、APPLYを選択します。
HandleHttpResponseプロセッサの追加
次は、HandleHttpResponceプロセッサを追加します。
(追加後)
そうしたら、HandleHttpResponseのstatusコードを設定していきます。
HandleHttpResponce上で右クリックをし、Configureを選択します。
その後、PROPERTIESにてHTTP Status Codeを決め打ちで202(Accepted)
にします。
3つのプロセッサの接続
NiFiのデータフローは、プロセッサ同士をRelationshipでつなぐことで流れを生成します。
プロセッサの中心からマウスをドラッグし、接続先のプロセッサでドロップすることで、つなぐことができます。
From ProcessorとTo Processorを確認し、For relationshipsはSuccessを選択し、ADDを選択します。
(接続後)
LogAttributeにFailureを流す
プロセッサの処理が失敗した場合、渡ってきたFlowFileはRelationshipのfailureに流されます。
失敗内容が確認できるように、LogAttributeプロセッサを追加します。
(追加後)
その後、'Relationship'で接続します。
不要なRelationshipのAuto-Terminate
NiFiの内部を流れるFlowFileは、行き先がなくなる (終点までたどり着く) と、削除されます。
今回の設定では、HandleHttpResponseとLogAttributeにおいて、処理がSuccessになった後の処理がありません。
なので、そのあとはNifiFlowを自動で消去する設定を入れてあげます。
以下のRelationshipに関して、プロセッサで右クリックをし、Configureを選択します。'SETTINGSタブからAutomatically Terminate Relationshipsにおいて、success`にチェックを入れます。
- HandleHttpResponse: success
- LogAttribute: success
Http Context Mapの作成
HandleHttpRequest上で右クリックをし、Configureを選択します。
その後PROPERTIESタブのHttp Context Mapのプルダウンより、Create new service...を選択します。
すると、次の画面のようになります。
CREATEを選択します。
その後、APPLYを選択すると、以下の画面になります。
右側、真ん中の雷のようなアイコンを選択すると以下の画面が表示されるので、ENABLEを選択します。
同様にHandleHttpRequestのHTTP Context Mapでも同じControllerServiceを指定します。
これでプロセッサに必要な設定が完了します。
フローの開始
左側にあるOperateの再生ボタンを選択し、フローを開始します。
一度、方眼上でクリックをし、特定のプロセッサから選択を外してから行ってください。
全て起動が成功すると、以下の様になります。
cURLで簡単なテスト
curl -i -X POST -H "Content-type: application/json" -d '{"name": "Nifi", "process": "test"}' localhost:9095
HTTP/1.1 202 Accepted
Date: Mon, 14 Oct 2019 10:47:51 GMT
Transfer-Encoding: chunked
Server: Jetty(9.4.11.v20180605)
{"Result": "succeeded"}
NifiでもIn/Outから、処理の流れが確認できます。
データの変換
プロセッサの作成
POSTで受信したデータをKafkaに登録する前に整形していきます。
次のプロセッサを追加します:
-
EvaluateJsonPath: FlowFileのcontentをJSONとしてパースし、JSONPathで任意の要素の値を抽出します。
抽出結果をFlowFileのAttributeに保存します。 -
ReplaceText: Kafkaに登録する際、FlowFileのContentがメッセージのvalueとなります。
抽出したJSON内の値を再びFlowFileのContentに戻します。
EvaluateJsonPath
Processの追加をします。
(追加後)
その後、右クリックでConfigureを選択します。
PROPERTIESにて以下の設定をしていきます。
以下2つは、右上の+ボタンを押して追加していきます。
- message.key
$.name - message.value
$.age
これによって、NifiFlowの中にmessage.key: $.name(つまりJsonでのnameに対するvalue)というkey-value型のデータが格納されます。
message.valueも同様です。
ReplaceText
Processの追加をします。
(追加後)
その後、右クリックでConfigureを選択します。
PROPERTIESにて以下の設定をしていきます。
-
Replacement Value
${message.value}
これは、$で始まっていますが、Apache NiFi Expression Language Guideというもので、JsonPathではありません。
Output Portの追加
以下の箇所から、Output Portを作成し、名前を入力してADDを選択します。

processの接続
今回作成したprocessを接続して、前回したテストを実行してみます。
EvaluateJsonPathからReplaceTextへのRelationshipでは、matchedを選択します。
では、起動してテストをしてみます。
テスト結果
curl -i -X POST -H "Content-type: application/json" -d '{"name": "Nifi", "process": "test"}' localhost:9095
HTTP/1.1 202 Accepted
Date: Mon, 14 Oct 2019 13:21:56 GMT
Transfer-Encoding: chunked
Server: Jetty(9.4.11.v20180605)
{"Result": "succeeded"}
先ほどと同様の結果です。
では、今回作ったフローの流れはどうなっているかというと
フローのIn/Outはそれぞれ1になっています。
Output Portの前のConnectionの上で右クリックしてList Queueを選択してみます。
すると、下の様にQueueの中に先ほどのNifiFlowが格納されています。

次回
次回では、このNifiからKafkaに対してメッセージを送りたいと思います。





































