LoginSignup
10
3

More than 5 years have passed since last update.

NiFi 1.1.0でWebSocketに対応したZ!

Posted at

みなさんこんばんは、最近子供と閲覧している某TVアニメのオープニング曲に若干感化されたタイトルとなっております、Apache NiFiコミッタのNiFiおじさんです。いくZ!

Hortonworks Advent Calendar 2016、12/3のポストとして、昨日リリースされたNiFi 1.1.0から、私が担当したWebSocket対応についてご紹介しまーす。

※本投稿の内容は該当機能追加のプルリクをReviewしてもらいやすくするために書いた私物ブログの焼き直しです

WebSocketプロトコルの基本

WebSocketはduplexで双方向のプロトコルです。最も個人的に面白いと思う特性として、一度WebSocketのコネクションが確立された後は、サーバやクライアントといった立場がないという点があります。

それぞれが、自由なタイミングでメッセージを送信できるのです。コネクションはいつもWebSocketクライアントからはじまります。

例えば:

  • クライアントがHTTPリクエストをサーバに送信 (URL例: ws://example.com/web-socket)
  • サーバがリクエストを受信し、HTTPプロトコルからWebSocketプロトコルへとアップグレード
  • (この時点で、それぞれのpeerで非同期にメッセージが送受信できるようになる)
  • クライアントがメッセージをサーバに送信
    • サーバがメッセージを受信
  • サーバがメッセージをクライアントに送信
    • クライアントがメッセージを受信

さて、NiFiはクライアントになるべきか、サーバになるべきか? どうせならどちらもできるようにしてしまえ!

どうやって動くの? モジュールの解説

プロトコル自体が他と比べて複雑な(異なる)ため、NiFiのデータフロー処理に落とし込むにはちと苦労しました。複雑すぎると感じる人もいるかもしれませんが、どうやって動くのか説明しましょう。

上図に示すように、拡張性とテストしやすさ性のため、以下の3つのモジュールで構成されています。
これらの3つは独立したNAR (NiFi Archive)となっています。
nifi-websocket-narnifi-websocket-service-jetty-narnifi-websocket-service-api-narに依存しています。

nifi-websocket-services-api

  • WebSocketClientService: WebSocketクライアントとして振る舞うInterface。
  • WebSocketServerService: WebSocketサーバとして振る舞うInterface。
  • 機能:
    • WebSocketイベント: クライアント、サーバの両サービスではどちらもイベント・ドリブン的にWebSocketプロセッサが動作します。
    • connected: リモートクライアントがWebSocketServerServiceに接続する、または、WebSocketClientServiceがリモートのWebSocketサーバに接続したときに発動するイベント
    • text message: テキストのWebSocketメッセージ受信時に発動
    • binary message: バイナリのWebSocketメッセージ受信時に発動
    • 複数のエンドポイント: これらのサーバはプロセッサをエンドポイントに登録します。WebSocketServerServiceはURIパスをエンドポイントとして利用します。例えば、同一のWebSocketサーバインスタンスは2つのWebSocketエンドポイントとして、ws://hostname:listen-port/endpoint-1ws://hostname:listen-port/endpoint-2を登録できます。同様に、WebSocketClientServiceはclientIdを利用してエンドポイントを識別します。複数のWebSocketクライアントインスタンスは同一のWebSocketClientServiceインスタンスを共有できます。

nifi-websocket-services-jetty

このモジュールはnifi-websocket-services-apiのJettyを使った実装を格納しています。

  • 機能:
    • プレインなWebSocket(ws://)と、セキュアなWebSocket(wss://)プロトコルをサポート
    • SSLContextServiceを利用して、Javaキーストアとトラストストアをセキュアな接続に利用

nifi-websocket-processors

これらの機能をNiFiのデータフローとして利用するには、プロセッサとしてキャンバスに配置する必要があります。

  • ConnectWebSocketListenWebSocket: これらはWebSocketゲートウェイとして動作します。これらのプロセッサはWebSocketServerに登録して前述のWebSocketイベントを受信します。発動したイベントは、NiFiのFlowFileに変換され、対応するrelationshipへと流れます。Relationshipはconnectedtext messagebinary messageの3種類があります。ConnectWebSocketはWebSocketClientServiceを利用して自発的にリモートのWebSocketエンドポイントに接続します、一方、ListenWebSocketはWebSocketServerServiceを利用して、リモートのWebSocketクライアントが接続してくるのを待ち受けます。
  • PutWebSocket: このプロセッサはConnectWebSocket、ListenWebSocketどちらとも組み合わせて利用できます。コネクションが確立された後はどちらの区別もないからです。入力のFlowFileコンテンツをメッセージペイロードとして、WebSocketメッセージを送信します。

どうやって使うのさ?

説明はもう十分だZ! これらのコンポーネントをどうやってNiFiデータフローで使うのか見てみましょう!

NiFiをクライアントとして、外部のWebSocketサーバと対話する

NiFiをWebSocketクライアントとするには、WebSocketClientServiceが必要です。サービスを追加するには:

  1. オペレートパレットにあるギアアイコンをクリック
  2. 足すアイコンをクリック
  3. WebSocketとコントローラサービス検索窓に入力
  4. JettyWebSocketClientコントローラサービスの編集アイコンをクリック

そして、サービスを次のように設定します:

  • 5. WebSocket URIにws://echo.wesocket.orgを設定。このURIはWebSocketクライアントテスト用に利用できます。単純に受信したメッセージを返してくれます。
  • 6. 有効化アイコンをクリックすれば、サービスの利用準備OKです!

次に、プロセッサをデータフローに配置しましょう:

  • ConnectWebSocket: 前述のJettyWebSocketClientServiceを使います。connectedtext messageをReplaceTextにつなげています。binary messageはこの例では利用しないので、auto terminateしています
  • ReplaceText: テキストを加工します
  • PutWebSocket: このプロセッサはリモートのWebSocketサーバにメッセージを送信します。Run Scheduleにデフォルトより長い、例えば3 secとかを設定しましょう。さもなくばDoS攻撃になってしまいます。。
  • UpdateAttribute: これはデータフローの終端です。停止状態にしておくと渡ってきたFlowFileを蓄積して内容の確認ができます

success relationshipを右クリックすると、滞留しているFlowFileが確認できます。ReplaceTextが各アイテムにテキストを足しこんで、サイズが増加している様子がわかるでしょう。

NiFiをサーバとして、リモートのWebSocketクライアントと対話する

クライアントとしての使い方がわかったら、NiFiをWebSocketサーバとして使うのは簡単です、ってかほとんど同じです!

単にJettyWebSocketServerコントローラサービスを代わりに利用して、Listen Portを設定します:

そして、ConnectWebSocketプロセッサを、ListenWebSocketプロセッサで置き換えましょう。Server URL Pathを設定してWebSocketリクエストを待ち受けます:

websocket.org echoをブラウザで開き、locationにws:localhost:9001/server-demoを設定して、Connectをクリックした後、何かメッセージをSendしてみましょう。NiFiがメッセージを送り返すはずです!

セキュアなWebSocket接続

セキュアなWebSocket接続を利用するには、もう一つStandardSSLContextServiceコントローラサービスが必要です。そしてJettyWebSocketClientやJettyWebSocketServerから利用します。URLはwss://プロトコルを利用します。

スケーラビリティ

NiFiをクラスタとしてデプロイする場合、WebSocketコンポーネントが各ノード上で動作することになります。NiFiをWebSocketサーバとして利用する際に負荷を分散させるには。HAProxyなどのロードバランサをNiFiクラスタの手前に配置するとよいでしょう。

NiFi Cluster and Load Balancerも参照してください。

まとめ

この投稿では、基本的なWebSocketコントローラサービスとプロセッサの利用方法を解説しました。WebSocketゲートウェイプロセッサ(ConnectWebSocket/ListenWebSocket)とPutWebSocketは分かれていて、間に他のプロセッサを追加することでより複雑なフローを構成することもできます。

ぜひNiFi 1.1.0から利用可能になったこれらのコンポーネントを使って、エキサイティングなNiFiデータフローを作ってみてください :)

10
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
10
3