みなさんこんばんは、最近子供と閲覧している某TVアニメのオープニング曲に若干感化されたタイトルとなっております、Apache NiFiコミッタのNiFiおじさんです。いくZ!
Hortonworks Advent Calendar 2016、12/3のポストとして、昨日リリースされたNiFi 1.1.0から、私が担当したWebSocket対応についてご紹介しまーす。
※本投稿の内容は該当機能追加のプルリクをReviewしてもらいやすくするために書いた私物ブログの焼き直しです
WebSocketプロトコルの基本
WebSocketはduplexで双方向のプロトコルです。最も個人的に面白いと思う特性として、一度WebSocketのコネクションが確立された後は、サーバやクライアントといった立場がないという点があります。
それぞれが、自由なタイミングでメッセージを送信できるのです。コネクションはいつもWebSocketクライアントからはじまります。
例えば:
- クライアントがHTTPリクエストをサーバに送信 (URL例: ws://example.com/web-socket)
- サーバがリクエストを受信し、HTTPプロトコルからWebSocketプロトコルへとアップグレード
- (この時点で、それぞれのpeerで非同期にメッセージが送受信できるようになる)
- クライアントがメッセージをサーバに送信
- サーバがメッセージを受信
- サーバがメッセージをクライアントに送信
- クライアントがメッセージを受信
さて、NiFiはクライアントになるべきか、サーバになるべきか? どうせならどちらもできるようにしてしまえ!
どうやって動くの? モジュールの解説
プロトコル自体が他と比べて複雑な(異なる)ため、NiFiのデータフロー処理に落とし込むにはちと苦労しました。複雑すぎると感じる人もいるかもしれませんが、どうやって動くのか説明しましょう。
上図に示すように、拡張性とテストしやすさ性のため、以下の3つのモジュールで構成されています。
これらの3つは独立したNAR (NiFi Archive)となっています。
nifi-websocket-nar
とnifi-websocket-service-jetty-nar
はnifi-websocket-service-api-nar
に依存しています。
nifi-websocket-services-api
- WebSocketClientService: WebSocketクライアントとして振る舞うInterface。
- WebSocketServerService: WebSocketサーバとして振る舞うInterface。
- 機能:
- WebSocketイベント: クライアント、サーバの両サービスではどちらもイベント・ドリブン的にWebSocketプロセッサが動作します。
-
connected
: リモートクライアントがWebSocketServerServiceに接続する、または、WebSocketClientServiceがリモートのWebSocketサーバに接続したときに発動するイベント -
text message
: テキストのWebSocketメッセージ受信時に発動 -
binary message
: バイナリのWebSocketメッセージ受信時に発動 - 複数のエンドポイント: これらのサーバはプロセッサをエンドポイントに登録します。WebSocketServerServiceはURIパスをエンドポイントとして利用します。例えば、同一のWebSocketサーバインスタンスは2つのWebSocketエンドポイントとして、
ws://hostname:listen-port/endpoint-1
とws://hostname:listen-port/endpoint-2
を登録できます。同様に、WebSocketClientServiceはclientId
を利用してエンドポイントを識別します。複数のWebSocketクライアントインスタンスは同一のWebSocketClientServiceインスタンスを共有できます。
nifi-websocket-services-jetty
このモジュールはnifi-websocket-services-apiのJettyを使った実装を格納しています。
- 機能:
- プレインなWebSocket(ws://)と、セキュアなWebSocket(wss://)プロトコルをサポート
- SSLContextServiceを利用して、Javaキーストアとトラストストアをセキュアな接続に利用
nifi-websocket-processors
これらの機能をNiFiのデータフローとして利用するには、プロセッサとしてキャンバスに配置する必要があります。
-
ConnectWebSocket
とListenWebSocket
: これらはWebSocketゲートウェイとして動作します。これらのプロセッサはWebSocketServerに登録して前述のWebSocketイベントを受信します。発動したイベントは、NiFiのFlowFileに変換され、対応するrelationshipへと流れます。Relationshipはconnected
、text message
、binary message
の3種類があります。ConnectWebSocketはWebSocketClientServiceを利用して自発的にリモートのWebSocketエンドポイントに接続します、一方、ListenWebSocketはWebSocketServerServiceを利用して、リモートのWebSocketクライアントが接続してくるのを待ち受けます。 -
PutWebSocket
: このプロセッサはConnectWebSocket、ListenWebSocketどちらとも組み合わせて利用できます。コネクションが確立された後はどちらの区別もないからです。入力のFlowFileコンテンツをメッセージペイロードとして、WebSocketメッセージを送信します。
どうやって使うのさ?
説明はもう十分だZ! これらのコンポーネントをどうやってNiFiデータフローで使うのか見てみましょう!
NiFiをクライアントとして、外部のWebSocketサーバと対話する
NiFiをWebSocketクライアントとするには、WebSocketClientServiceが必要です。サービスを追加するには:
- オペレートパレットにあるギアアイコンをクリック
- 足すアイコンをクリック
-
WebSocket
とコントローラサービス検索窓に入力 - JettyWebSocketClientコントローラサービスの編集アイコンをクリック
そして、サービスを次のように設定します:
-
- WebSocket URIにws://echo.wesocket.orgを設定。このURIはWebSocketクライアントテスト用に利用できます。単純に受信したメッセージを返してくれます。
-
- 有効化アイコンをクリックすれば、サービスの利用準備OKです!
次に、プロセッサをデータフローに配置しましょう:
- ConnectWebSocket: 前述のJettyWebSocketClientServiceを使います。
connected
とtext message
をReplaceTextにつなげています。binary message
はこの例では利用しないので、auto terminateしています - ReplaceText: テキストを加工します
- PutWebSocket: このプロセッサはリモートのWebSocketサーバにメッセージを送信します。
Run Schedule
にデフォルトより長い、例えば3 sec
とかを設定しましょう。さもなくばDoS攻撃になってしまいます。。 - UpdateAttribute: これはデータフローの終端です。停止状態にしておくと渡ってきたFlowFileを蓄積して内容の確認ができます
success
relationshipを右クリックすると、滞留しているFlowFileが確認できます。ReplaceTextが各アイテムにテキストを足しこんで、サイズが増加している様子がわかるでしょう。
NiFiをサーバとして、リモートのWebSocketクライアントと対話する
クライアントとしての使い方がわかったら、NiFiをWebSocketサーバとして使うのは簡単です、ってかほとんど同じです!
単にJettyWebSocketServer
コントローラサービスを代わりに利用して、Listen Port
を設定します:
そして、ConnectWebSocketプロセッサを、ListenWebSocket
プロセッサで置き換えましょう。Server URL Path
を設定してWebSocketリクエストを待ち受けます:
websocket.org echoをブラウザで開き、locationにws:localhost:9001/server-demo
を設定して、Connectをクリックした後、何かメッセージをSendしてみましょう。NiFiがメッセージを送り返すはずです!
セキュアなWebSocket接続
セキュアなWebSocket接続を利用するには、もう一つStandardSSLContextService
コントローラサービスが必要です。そしてJettyWebSocketClientやJettyWebSocketServerから利用します。URLはwss://
プロトコルを利用します。
スケーラビリティ
NiFiをクラスタとしてデプロイする場合、WebSocketコンポーネントが各ノード上で動作することになります。NiFiをWebSocketサーバとして利用する際に負荷を分散させるには。HAProxyなどのロードバランサをNiFiクラスタの手前に配置するとよいでしょう。
NiFi Cluster and Load Balancerも参照してください。
まとめ
この投稿では、基本的なWebSocketコントローラサービスとプロセッサの利用方法を解説しました。WebSocketゲートウェイプロセッサ(ConnectWebSocket/ListenWebSocket)とPutWebSocketは分かれていて、間に他のプロセッサを追加することでより複雑なフローを構成することもできます。
ぜひNiFi 1.1.0から利用可能になったこれらのコンポーネントを使って、エキサイティングなNiFiデータフローを作ってみてください :)