Posted at

NiFiのAPIを叩いてプログラマブルにフローを制御する

More than 1 year has passed since last update.

Apache NiFi(以下NiFi、読み:ないふぁい)は、Apache KafkaApache Stormと共に、データの収集、ストリーミング分析用途のオープンソースをまとめたHortonworks DataFlow(HDF)というパッケージに含まれています。

NiFiにはRDBMSやAWS, Azureなどのクラウド、Hadoopエコシステムなど、多様なデータストアに対応したProcessorという部品が用意されていて、プログラミングなしでシステム間のデータ連携が可能です。


サンプルデータフロー「郵便番号のナウ」

Advent Calendar向けにいくつか記事を書く予定があるので、サンプルのデータフローを作りました。このフローの詳細については、別の日に紹介しますね。

郵便番号検索APIのZipCloudと、天気情報APIのOpenWeatherMapを使って、郵便番号に関連する情報を収集してJSONファイルに保存するデータフローです。いわゆる情報の「エンリッチ」ってやつです。

※OpenWeatherMapについては無料天気予報APIのOpenWeatherMapを使ってみるを参考にさせていただきました。

こちらがNiFiのデータフローです。ProcessGroupにまとめて、「調べる」というInputPortを用意し、親のフローから呼べるようにしてあります。


本日のお題、NiFiのAPIを活用する

さて、NiFiのWeb UIは非常にリッチで、いろんなことができるんですが、UIの操作が必要ということは自動化できないのでは?という質問を良く伺います。

質問に対する答えは、「できます!NiFiのREST APIを使います!」です。

ブラウザからNiFiのWeb UIで実行する操作はすべて、NiFiのREST APIを呼び出しています。これをスクリプトで実行することで、自動化が可能なのです。


一発屋フローを実装する

NiFiの自動化が求められる理由の一つに、既存のETLが行うようなJob単位での実行をしたいというのがあります。NiFiはフローをずっと起動しておき、ストリーミング形式でイベントドリブンな処理を行うため、ETLの様なJobスケジューラに慣れ親しんだ方は多少戸惑うようです。

「どこかからデータを取得して、ちょっと加工して、このデータベースに保存して終わる」という一発屋フローは、NiFiではフローの操作なしではできないのです。

「どこかから新しいデータを取得して、ちょっと加工して、このデータベースに保存する」というフローがずーっと稼働しているのがNiFiです。

具体的な例を示しましょう。

前述の「郵便番号のナウ」をテストしたいとしましょう。入力のFlowFileを一回だけ渡して、その結果を確認したいです。その場合、GenerateFlowFileを使ってテスト用のFlowFileを生成することができます:

このGenerateFlowFileをスタートするのですが、問題はNiFiのスケジューラです:

NiFiでは、Scheduling StrategyをTimer drivenにしておくと、Run Scheduleの設定間隔でスケジューリングされます。デフォルトでは0 secとなっており、スレッドに空きがあれば常に動くことになります。

今回の目的では、一度だけ実行したいので、1dとし、次に実行されるのは一日後としました。この状態でスタートすれば、テスト用のFlowFileを一個だけ生成し、次の日まで何もしないことができます。

つまり、そのままにしておくと、明日またテストが動いてしまうのです。また、二回目以降のテストを同日に実行したいなら、一度GenerateFlowFileプロセッサをStopして、またStartして、という操作が必要になります。

「1回だけ動くJob的な処理? それってNiFiっぽくないっすね」

というイメージが伝わったら幸いです。


NiFiのREST APIで解決

それでも、いろいろな都合で一発屋を実装したり、自動化したい場合があるでしょう。その場合はREST APIを使いましょう。

NiFi Web UIをブラウザで操作しながら、ブラウザの開発者ツールを使うと、どの操作をするとどんなHTTPリクエストが飛んでいるのかを観察することができます。

「テスト開始」のGenerateFlowFileを右クリックしてStartすると、http://localhost:8080/nifi-api/processors/e0f759d1-0158-1000-3ec4-997043625a4cにPUTリクエストを送っています:

ブラウザから送信されたリクエストのペイロードを確認すると、state: "RUNNING"がJSONの中に含まれています。これでプロセッサの起動/停止状態を操作しているわけですね。

ちなみに、REST APIのエンドポイント一覧はNiFiのREST APIドキュメントに記載されています。

これをcURLコマンドとかで叩いても何とかなりますが、もう少し楽に実行できると良いですね。


JavaScript向けのAPIクライアントを作ってみました

私もNiFiのテスト自動化など、REST APIを使うことが多いので、Node.jsで自動化スクリプトを書いて実行するためのAPIクライアントをつくってみました。

使えるFunctionの一覧はこのあたりのコードを見てください。

使い方:

# Githubからプロジェクトをクローン

git clone https://github.com/ijokarumawak/nifi-api-client-js.git

# インストール
$ cd nifi-api-client-js
$ npm install

/tmpにプロジェクトをクローンしたとして、これを使うJSファイルを作成します:

/tmp/test-zipcode-now.js

/tmp/nifi-api-client-js (クローンしたAPIクライアント)

test-zipcode-now.jsの内容はこんな感じにします:

// クライアントライブラリを読み込む

var nifiApiClient = require('./nifi-api-client-js');

// 操作するNiFiの接続先設定
var conf = {
host: 'localhost',
port: 8080
};

// クライアントを初期化
var nifiApi = new nifiApiClient.NiFiApi(conf);

// 引数でstart/stopを指定
var running = (process.argv.length > 2 && 'start' === process.argv[2]);

// ProcessorのUUIDを指定して操作
nifiApi.updateProcessorState('e0f759d1-0158-1000-3ec4-997043625a4c', running, (err) => {
if (err) return console.log('Failed.', err);
console.log('Processor is now', running ? 'running' : 'stopped');
});

そして、以下のコマンドで、Processorの起動停止ができました!

# スタート!

$ node test-zipcode-now.js start
Processor is now running

# ストップ!
$ node test-zipcode-now.js stop
Processor is now stopped

NiFiのUIを確認すると起動できたのが確認できます。ちなみに、プロセッサのUUIDは対象をクリックすると、Operateパレットに表示されます:


まとめ

いかがでしたでしょうか。NiFiのREST APIを使うとどんなことができるのか、、UIでできることはすべてできますね!この仕組を使って、NiFiのシステムテストを実施するプロジェクトも個人的に作っています。参考になれば幸いです。