Theme
今回、以下を参考に新しいバージョンでのNifi, Kafka, Stormをつかった検証を行っていきたいと思います。
https://github.com/ijokarumawak/hdf-tutorials-ja/wiki/Apache-NiFi-Dataflow-Automation-Concepts
主に、今回はNifiのパートになります。
Apache NiFiとは
システム間のデータフローを自動化し管理するためのOSS
Nifi 基本操作
login
NifiはNifiを動かしているサーバの9090
portを使用します。
Process Groupの作成
赤く囲った部分をドラッグして、方眼上に持っていき、リリースしてください。
今回、名前をHTTP APIとします。
ADDを押し、少し時間を置くと以下のようにProcess Groupが生成されます。
そうしたら、ダブルクリックをして中に入ってください。
処理を定義
HTTP APIに入ると、左下のOperator
がHTTP API
に変わります。
HandleHttpRequestプロセッサの追加
検索場所にてHandle
と打つと、HandleHttpRequest
というのがあるので、それをダブルクリックします。
(追加後)
上の様に表示されましたら、追加したHandleHttpRequestプロセッサを右クリックし、Configure
を選択します。
Property
タブに移動し、Listening Port
を9095
にします。
その後、APPLYを選択します。
ReplaceTextの追加
こちらでは、HTTPレスポンスで返すボディの文字列を設定します。
前と同様に、Processorを追加します。
追加されたProcessorに対して、右クリックし、Configure
を選択します。
Property
タブに移動し、Replacement Value
を{"Result": "Succeeded"}
,
Replacement Strategy
をAlways Replace
に変更し、APPLYを選択します。
HandleHttpResponseプロセッサの追加
次は、HandleHttpResponceプロセッサを追加します。
(追加後)
そうしたら、HandleHttpResponseのstatusコードを設定していきます。
HandleHttpResponce上で右クリックをし、Configure
を選択します。
その後、PROPERTIES
にてHTTP Status Code
を決め打ちで202(Accepted)
にします。
3つのプロセッサの接続
NiFiのデータフローは、プロセッサ同士をRelationship
でつなぐことで流れを生成します。
プロセッサの中心からマウスをドラッグし、接続先のプロセッサでドロップすることで、つなぐことができます。
From Processor
とTo Processor
を確認し、For relationships
はSuccess
を選択し、ADDを選択します。
(接続後)
LogAttributeにFailureを流す
プロセッサの処理が失敗した場合、渡ってきたFlowFileはRelationship
のfailure
に流されます。
失敗内容が確認できるように、LogAttributeプロセッサを追加します。
(追加後)
その後、'Relationship'で接続します。
不要なRelationshipのAuto-Terminate
NiFiの内部を流れるFlowFileは、行き先がなくなる (終点までたどり着く) と、削除されます。
今回の設定では、HandleHttpResponse
とLogAttribute
において、処理がSuccessになった後の処理がありません。
なので、そのあとはNifiFlowを自動で消去する設定を入れてあげます。
以下のRelationship
に関して、プロセッサで右クリックをし、Configure
を選択します。'SETTINGSタブから
Automatically Terminate Relationshipsにおいて、
success`にチェックを入れます。
- HandleHttpResponse: success
- LogAttribute: success
Http Context Mapの作成
HandleHttpRequest上で右クリックをし、Configure
を選択します。
その後PROPERTIES
タブのHttp Context Map
のプルダウンより、Create new service...
を選択します。
すると、次の画面のようになります。
CREATE
を選択します。
その後、APPLYを選択すると、以下の画面になります。
右側、真ん中の雷のようなアイコンを選択すると以下の画面が表示されるので、ENABLE
を選択します。
同様にHandleHttpRequestのHTTP Context Mapでも同じControllerServiceを指定します。
これでプロセッサに必要な設定が完了します。
フローの開始
左側にあるOperate
の再生ボタンを選択し、フローを開始します。
一度、方眼上でクリックをし、特定のプロセッサから選択を外してから行ってください。
全て起動が成功すると、以下の様になります。
cURLで簡単なテスト
curl -i -X POST -H "Content-type: application/json" -d '{"name": "Nifi", "process": "test"}' localhost:9095
HTTP/1.1 202 Accepted
Date: Mon, 14 Oct 2019 10:47:51 GMT
Transfer-Encoding: chunked
Server: Jetty(9.4.11.v20180605)
{"Result": "succeeded"}
NifiでもIn/Outから、処理の流れが確認できます。
データの変換
プロセッサの作成
POSTで受信したデータをKafkaに登録する前に整形していきます。
次のプロセッサを追加します:
-
EvaluateJsonPath: FlowFileのcontentをJSONとしてパースし、JSONPathで任意の要素の値を抽出します。
抽出結果をFlowFileのAttributeに保存します。 -
ReplaceText: Kafkaに登録する際、FlowFileのContentがメッセージのvalueとなります。
抽出したJSON内の値を再びFlowFileのContentに戻します。
EvaluateJsonPath
Processの追加をします。
(追加後)
その後、右クリックでConfigure
を選択します。
PROPERTIES
にて以下の設定をしていきます。
以下2つは、右上の+ボタン
を押して追加していきます。
- message.key
$.name - message.value
$.age
これによって、NifiFlowの中にmessage.key: $.name(つまりJsonでのnameに対するvalue)
というkey-value型のデータが格納されます。
message.value
も同様です。
ReplaceText
Processの追加をします。
(追加後)
その後、右クリックでConfigure
を選択します。
PROPERTIES
にて以下の設定をしていきます。
-
Replacement Value
${message.value}
これは、$で始まっていますが、Apache NiFi Expression Language Guideというもので、JsonPathではありません。
Output Portの追加
以下の箇所から、Output Portを作成し、名前を入力してADD
を選択します。
processの接続
今回作成したprocessを接続して、前回したテストを実行してみます。
EvaluateJsonPathからReplaceTextへのRelationshipでは、matched
を選択します。
では、起動してテストをしてみます。
テスト結果
curl -i -X POST -H "Content-type: application/json" -d '{"name": "Nifi", "process": "test"}' localhost:9095
HTTP/1.1 202 Accepted
Date: Mon, 14 Oct 2019 13:21:56 GMT
Transfer-Encoding: chunked
Server: Jetty(9.4.11.v20180605)
{"Result": "succeeded"}
先ほどと同様の結果です。
では、今回作ったフローの流れはどうなっているかというと
フローのIn/Outはそれぞれ1になっています。
Output Portの前のConnectionの上で右クリックしてList Queue
を選択してみます。
すると、下の様にQueueの中に先ほどのNifiFlowが格納されています。
次回
次回では、このNifiからKafkaに対してメッセージを送りたいと思います。