NATSとは?
NATSは軽量でハイパフォーマンスなメッセージングシステムを提供するミドルウェア。
比較されるミドルウェアとして、ActiveMQ、Kafka、Kestreal、NSQ、RabbitMQ等がある。
2016年7月現在のリリースバージョンは0.8.1。
NATSとNATS Streaming
NATSには大きく2つの仕組みがある。
いわゆるNATSと呼ばれるものとNATS Streamingと呼ばれるものの2つ。
NATS
NATSの特徴
- シンプルなテキストベースのプロトコル
- At-most-once-deliveryなメッセージング(TCP/IP上でメッセージのやりとりを行うため、TCPレベルでの保証)
- 様々なメッセージングモデルを実現
- クラスタリング
- 遅いクライアントを自動で切断(Auto Pruning of Client)
- Ping-Pongタイムアウトまたはペンディングメッセージの閾値越え(10MB)
NATSで実現できるメッセージングモデルは以下。リンク先の図を見ると一目でわかる。
NATSを実現するのがNATSサーバでgnatsdと呼ばれる。gnatsdはGoで実装されている。
(Go実装以前はRubyで実装されていた。)
NATS Streaming
NATSをベースにAt-least-once-deliveryなメッセージングを実現する。
単純なPub/Subの場合Subクライアントは接続した時点からメッセージを取得できるが、
NATS Streamingの場合、インメモリ(+外部ストレージ)でメッセージを保持し、
クライアントに必ず最低1回はメッセージを配信できる。
- At-least-onceはメッセージ配信機構
- Publisher/Subscriber Rate Limit
NATSストリーミングを実現するサーバがNATSストリーミングサーバで、
NATSサーバをベースに実装されている。
内部ではProtocol Buffersも使っている。
こちらも公式サイトの図がわかりやすい。
NATSプロトコルの基本
サブジェクト名
メッセージの配信先を決めるID的なもの。大文字小文字の区別があり、.
でトークンを区切る。
subj.hello.one
SUBJ.HELLO.one
等。
サブジェクトにはワイルドカードが使用できる。
-
*
: なんでもマッチする。subj.hello.*
SUBJ.*
-
>
:subj.>
の場合はsubj.hello
subj.hoge.fuga
にマッチするがsubj
のみにはマッチしない。
*
>
ともにトークン内の一部に使用することはできない。 su*j.hello
は使えない。(厳密に言うと指定することはできるが)
プロトコルメッセージ
NATSのプロトコルで規定されているメッセージは以下。
-
INFO
: クライアントとサーバでTCP/IP接続が確立された後にサーバから送られる情報 -
CONNECT
: クライアントから接続情報を送る -
PUB
: メッセージ配信する -
SUB
: メッセージ購読する -
UNSUB
: メッセージ購読を止める -
MSG
: クライアントへの実際の配信メッセージ -
PING
/PONG
: いわゆるping-pong -
+OK
: プロトコルメッセージへの正常応答。CONNECT
でverbose
モードをfalse
にすれば省略される。
(ほとんどのクライアントでデフォルトOFFにしているみたい) -
ERR
: エラーが発生した際にサーバから通知されるプロトコル
プロトコルの詳細については説明を省略する。
かなりシンプルなので、ドキュメントを読めば詳しいことがわかる。
NATSを動かしてみる
まずはNATSの動作を確認してみる。
gnatsdのインストールと起動
NATSサーバ(gnatsd)を動かす。
今回はGoがインストールされていることが前提で説明。
gnatsdのバージョンは検証時のmasterブランチ最新を使っている。バージョンは0.9.0.beta。
インストール
$ go get github.com/nats-io/gnatsd
gnatsd起動
$ gnatsd -D -V
[69123] 2016/07/14 11:28:58.755985 [INF] Starting nats-server version 0.9.0.beta
[69123] 2016/07/14 11:28:58.756125 [DBG] Go build version go1.6.2
[69123] 2016/07/14 11:28:58.756133 [INF] Listening for client connections on 0.0.0.0:4222
[69123] 2016/07/14 11:28:58.756274 [DBG] Server id is J4shpW8NfLsxMF0c4uHqvv
[69123] 2016/07/14 11:28:58.756320 [INF] Server is ready
起動が完了。
メッセージング
メッセージングモデル毎に動作を見てみる。
Pub/Sub
まずはPub/Sub。
汎用的なPub/Subクライアントとして、Goクライアントのexamplesから
以下のファイルを準備してビルドしておく。
サブスクライバクライアントをサブジェクトをそれぞれ example.one
example.one
example.two
example.*
で起動しておく。
(ここでは仮にそれぞれclient-A, B, C, Dとします)
$ ./nats-sub example.one # client-A
$ ./nats-sub example.one # client-B
$ ./nats-sub example.two # client-C
$ ./nats-sub "example.*" # client-D
Listening on [example.*]
サブジェクトを"example.one"として1つメッセージをパブリッシュしてみると、
client-A,B,Dでメッセージを受信することがわかる。
第1引数にサブジェクト、第2引数にメッセージを指定する。
$ ./nats-pub "example.one" Hello
Published [example.one] : 'Hello'
[#1] Received on [example.one]: 'Hello'
もしサブジェクトをexample.three
でパブリッシュしてみるとclient-Dだけがメッセージ受信することになる。
Request Reply
続いてRequest Replyタイプのメッセージングを確認してみる。
まずはSubクライアント(subject=example.one
)とReplyクライアント(subject=example.one
)をそれぞれ1つづつ起動する。
Replyクライアントは第2引数にRequestに対するReplyメッセージを指定する。
$ ./nats-sub example.one
$ ./nats-rply example.one "my NATS reply"
準備ができたらRequestを送ってみると、リクエストに対して結果メッセージを受け取っていることがわかる。
$ ./nats-req example.one "Hello NATS req-rply"
Published [example.one] : 'Hello NATS req-rply'
Received [_INBOX.57l6vK4e30lGw4lcBH9GmF] : 'my NATS reply'
SubクライアントとReplyクライアントはPub/Sub時と同様にメッセージを受け取っていることがわかる。
[#1] Received on [example.one]: 'Hello NATS req-rply'
Queueing
最後がQueueing(キューイング)。
キューグループを指定してサブスクライブすると、
同じキューグループ内でメッセージを振り分けて配信していく。
- Queue-Subクライアント (nats-qsub)
Queue-Subクライアント2つ(subject=example.one
, queue-group=Q1)と
別のキューグループのQueue-Subクライアント2つ(subject=example.one
, queue-group=Q2)を起動する。
$ ./nats-qsub example.one Q1 # client-A1
$ ./nats-qsub example.one Q1 # client-A2
$ ./nats-qsub example.one Q2 # client-B1
$ ./nats-qsub example.one Q2 # client-B2
Pubクライアントからメッセージを送ってみる。途中、client-B1を切断してその際の動作も見てみた。
$ ./nats-pub example.one "Message 1"
$ # ここでclient-B1を停止
$ ./nats-pub example.one "Message 2"
$ ./nats-pub example.one "Message 3"
$ ./nats-pub example.one "Message 4"
$ ./nats-pub example.one "Message 5"
$ ./nats-pub example.one "Message 6"
メッセージ送った際のそれぞれのQueue-Subクライアントの出力は以下。
Listening on [example.one]
[#1] Received on [example.one] Queue[Q1] Pid[73099]: 'Message 1'
[#2] Received on [example.one] Queue[Q1] Pid[73099]: 'Message 6'
Listening on [example.one]
[#1] Received on [example.one] Queue[Q1] Pid[73137]: 'Message 2'
[#2] Received on [example.one] Queue[Q1] Pid[73137]: 'Message 3'
[#3] Received on [example.one] Queue[Q1] Pid[73137]: 'Message 4'
[#4] Received on [example.one] Queue[Q1] Pid[73137]: 'Message 5'
Listening on [example.one]
[#1] Received on [example.one] Queue[Q2] Pid[73173]: 'Message 1'
^C
Listening on [example.one]
[#1] Received on [example.one] Queue[Q2] Pid[73212]: 'Message 2'
[#2] Received on [example.one] Queue[Q2] Pid[73212]: 'Message 3'
[#3] Received on [example.one] Queue[Q2] Pid[73212]: 'Message 4'
[#4] Received on [example.one] Queue[Q2] Pid[73212]: 'Message 5'
[#5] Received on [example.one] Queue[Q2] Pid[73212]: 'Message 6'
同じキューグループ内ではどれか1つのサブスクライバーにメッセージを投げていることがわかる。
client-A1,A2の動きをみると、メッセージの振り分けは単純なラウンドロビンでは無いらしい。
メッセージンングの基本はここまで。
続いてはNATSの周辺機能について紹介する。
周辺機能
ロギング
ログレベルの調整や時刻表示有無、ファイル書き出しなどをオプションで指定することができる。
syslog連携なんかもできる。
詳細については以下に記載があります。
クラスタリング
- 1ホップまでメッセージを伝搬。クライアントから受信したメッセージは自ノードが管理している隣接ノードに、
ルートから受信したメッセージはクライアントにのみ配信する。 - 上記の理由からフルメッシュか完全グラフノード?な構成を推奨している
クラスタ構成の動作
今回はサーバを3台使って動作を確認する。
クラスタ管理用にポート番号4248を使ってgnatsd
を起動する。
-cluster
でクラスタ管理用ポートを指定する。
このサーバをここではc1
と呼ぶ。
$ gnatsd -D -m 8222 -cluster nats://127.0.0.1:4248
[3325] 2016/07/18 01:17:25.560418 [INF] Starting nats-server version 0.9.0.beta
[3325] 2016/07/18 01:17:25.560498 [DBG] Go build version go1.6.2
[3325] 2016/07/18 01:17:25.560550 [INF] Listening for route connections on 127.0.0.1:4248
[3325] 2016/07/18 01:17:25.560746 [INF] Listening for client connections on 0.0.0.0:4222
[3325] 2016/07/18 01:17:25.560785 [DBG] Server id is D15kVknP7pEnrzFdXUUNxm
[3325] 2016/07/18 01:17:25.560788 [INF] Server is ready
続いて2台目のサーバを起動する。
同じサーバ上で起動するので、クラスタ管理用のポートを5248として、
クラスタのルート情報を-routes
オプションで指定する。(-p
はクライアント接続用のポート番号指定)
このサーバをc2
と呼ぶ。
$ gnatsd -D -p 5222 -cluster nats://127.0.0.1:5248 -routes nats://127.0.0.1:4248
[3399] 2016/07/18 01:21:30.001742 [INF] Starting nats-server version 0.9.0.beta
[3399] 2016/07/18 01:21:30.001837 [DBG] Go build version go1.6.2
[3399] 2016/07/18 01:21:30.001893 [INF] Listening for route connections on 127.0.0.1:5248
[3399] 2016/07/18 01:21:30.002069 [INF] Listening for client connections on 0.0.0.0:5222
[3399] 2016/07/18 01:21:30.002120 [DBG] Server id is dq8nTQonZFdFEMyuxtV28C
[3399] 2016/07/18 01:21:30.002124 [INF] Server is ready
[3399] 2016/07/18 01:21:30.002162 [DBG] Trying to connect to route on 127.0.0.1:4248
[3399] 2016/07/18 01:21:30.002497 [DBG] 127.0.0.1:4248 - rid:1 - Route connection created
[3399] 2016/07/18 01:21:30.002534 [DBG] 127.0.0.1:4248 - rid:1 - Route connect msg sent
[3399] 2016/07/18 01:21:30.002801 [DBG] 127.0.0.1:4248 - rid:1 - Registering remote route "D15kVknP7pEnrzFdXUUNxm"
[3399] 2016/07/18 01:21:30.002815 [DBG] 127.0.0.1:4248 - rid:1 - Route sent local subscriptions
起動すると"127.0.0.1:4248 - rid:1 - Route connection created"でルートができていることがわかる。
c1
側も同様にルートを認識している。
[3325] 2016/07/18 01:21:30.002512 [DBG] 127.0.0.1:50577 - rid:1 - Route connection created
[3325] 2016/07/18 01:21:30.002918 [DBG] 127.0.0.1:50577 - rid:1 - Registering remote route "dq8nTQonZFdFEMyuxtV28C"
[3325] 2016/07/18 01:21:30.002928 [DBG] 127.0.0.1:50577 - rid:1 - Route sent local subscriptions
3台目も起動してみる。これをc3
と呼ぶ。
$ gnatsd -D -p 6222 -cluster nats://127.0.0.1:6248 -routes nats://127.0.0.1:4248
[3566] 2016/07/18 01:25:21.917616 [INF] Starting nats-server version 0.9.0.beta
[3566] 2016/07/18 01:25:21.917731 [DBG] Go build version go1.6.2
[3566] 2016/07/18 01:25:21.917793 [INF] Listening for route connections on 127.0.0.1:6248
[3566] 2016/07/18 01:25:21.918059 [INF] Listening for client connections on 0.0.0.0:6222
[3566] 2016/07/18 01:25:21.918113 [DBG] Server id is 8oa4rnaED3PeFZEVm8UzuT
[3566] 2016/07/18 01:25:21.918117 [INF] Server is ready
[3566] 2016/07/18 01:25:21.918157 [DBG] Trying to connect to route on 127.0.0.1:4248
[3566] 2016/07/18 01:25:21.918549 [DBG] 127.0.0.1:4248 - rid:1 - Route connection created
[3566] 2016/07/18 01:25:21.918582 [DBG] 127.0.0.1:4248 - rid:1 - Route connect msg sent
[3566] 2016/07/18 01:25:21.918852 [DBG] 127.0.0.1:4248 - rid:1 - Registering remote route "D15kVknP7pEnrzFdXUUNxm"
[3566] 2016/07/18 01:25:21.918866 [DBG] 127.0.0.1:4248 - rid:1 - Route sent local subscriptions
[3566] 2016/07/18 01:25:21.919811 [DBG] 127.0.0.1:65325 - rid:2 - Route connection created
[3566] 2016/07/18 01:25:21.920144 [DBG] 127.0.0.1:65325 - rid:2 - Registering remote route "dq8nTQonZFdFEMyuxtV28C"
[3566] 2016/07/18 01:25:21.920159 [DBG] 127.0.0.1:65325 - rid:2 - Route sent local subscriptions
c1
とc2
をルートとして認識している。
c1
側とc2
側でも新たにc3
を認識する。
この状態の場合、どのノードからメッセージを配信してもすべてのノードでメッセージを受け取ることができる。
(もちろん、サブジェクトの一致等のルールには従って。)
セキュリティ
パスワード認証
gnatsd
に--user
オプション--pass
オプション
または--auth
オプションをつけて起動するとパスワード認証orトークン認証が有効になる。
$ gnatsd --auth hogetoken
クライアントを接続してみると蹴られる。
$ ./nats-sub subj
Can't connect: nats: authorization violation
[93190] 2016/07/15 21:16:48.334616 [ERR] ::1:56626 - cid:1 - Authorization Error
接続先にトークン情報をつけると認証成功して接続できる。
$ ./nats-sub -s "nats://hoge@127.0.0.1:4222" subj
Listening on [subj]
パスワード認証の場合はhoge
のところをuser:pass
のような形式にすれば問題無い。
TLS
TLSを使ってサーバとクライアントの認証と通信内容自体を暗号化することも可能。
gnatsd
の設定ファイルとして以下のようなtls.conf
を準備する。
今回はCERTファイル等はnats-io/gnatsdのtest/configs/certsディレクトリにあるものを使う。
listen: 127.0.0.1:4443
tls {
cert_file: "./certs/server-cert.pem"
key_file: "./certs/server-key.pem"
ca_file: "./certs/ca.pem"
verify: true
}
$ gnatsd -D -V -config tls.conf
Starting gnatsd on port 5000
[95447] 2016/07/15 21:48:55.717289 [INF] Starting nats-server version 0.9.0.beta
[95447] 2016/07/15 21:48:55.717450 [DBG] Go build version go1.6.2
[95447] 2016/07/15 21:48:55.717455 [INF] Listening for client connections on 127.0.0.1:4443
[95447] 2016/07/15 21:48:55.717660 [INF] TLS required for client connections
[95447] 2016/07/15 21:48:55.717674 [DBG] Server id is m4w0EmrXB1xPvE5eMxpyX1
[95447] 2016/07/15 21:48:55.717677 [INF] Server is ready
TLSではないクライアントをつないでみともちろん蹴られる。
$ ./nats-sub -s "nats://127.0.0.1:4443" subj
Can't connect: nats: secure connection required
前述のnats-sub.go
に以下の変更を加えて、TLS接続してみる。
$ diff -u nats-sub.go nats-tls-sub.go
--- nats-sub.go 2016-07-15 16:09:03.000000000 +0900
+++ nats-tls-sub.go 2016-07-18 00:28:36.000000000 +0900
@@ -4,7 +4,10 @@
package main
import (
+ "crypto/tls"
+ "crypto/x509"
"flag"
+ "io/ioutil"
"log"
"runtime"
@@ -33,7 +36,25 @@
usage()
}
- nc, err := nats.Connect(*urls)
+ pool := x509.NewCertPool()
+ pemData, err := ioutil.ReadFile("./certs/ca.pem")
+ if err != nil {
+ log.Fatalf("read error for ca.pem: %v\n", err)
+ }
+ pool.AppendCertsFromPEM(pemData)
+
+ cert, err := tls.LoadX509KeyPair("./certs/client-cert.pem", "./certs/client-key.pem")
+ if err != nil {
+ log.Fatalf("tls.LoadX509KeyPair() error=%v", err)
+ }
+
+ config := &tls.Config{
+ ServerName: "localhost",
+ Certificates: []tls.Certificate{cert},
+ RootCAs: pool,
+ MinVersion: tls.VersionTLS12,
+ }
+ nc, err := nats.Connect(*urls, nats.Secure(config))
if err != nil {
log.Fatalf("Can't connect: %v\n", err)
}
これを実行すると接続できていることがわかる。
$ go build nats-tls-sub.go
$ ./nats-tls-sub -s "tls://127.0.0.1:4443" subj
Listening on [subj]
[99264] 2016/07/18 00:28:53.609262 [DBG] 127.0.0.1:62669 - cid:1 - Client connection created
[99264] 2016/07/18 00:28:53.609327 [DBG] 127.0.0.1:62669 - cid:1 - Starting TLS client connection handshake
[99264] 2016/07/18 00:28:53.670571 [DBG] 127.0.0.1:62669 - cid:1 - TLS handshake complete
[99264] 2016/07/18 00:28:53.670590 [DBG] 127.0.0.1:62669 - cid:1 - TLS version 1.2, cipher suite TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384
[99264] 2016/07/18 00:28:53.670830 [TRC] 127.0.0.1:62669 - cid:1 - ->> [CONNECT {"verbose":false,"pedantic":false,"tls_required":true,"name":"","lang":"go","version":"1.2.2"}]
[99264] 2016/07/18 00:28:53.670919 [TRC] 127.0.0.1:62669 - cid:1 - ->> [PING]
[99264] 2016/07/18 00:28:53.670926 [TRC] 127.0.0.1:62669 - cid:1 - <<- [PONG]
[99264] 2016/07/18 00:28:53.671085 [TRC] 127.0.0.1:62669 - cid:1 - ->> [SUB subj 1]
[99264] 2016/07/18 00:28:53.671108 [TRC] 127.0.0.1:62669 - cid:1 - ->> [PING]
[99264] 2016/07/18 00:28:53.671114 [TRC] 127.0.0.1:62669 - cid:1 - <<- [PONG]
[99264] 2016/07/18 00:28:56.754385 [DBG] 127.0.0.1:62669 - cid:1 - Client connection closed
その他ユーザ毎の権限設定や
Bcrypt使ったパスワードハッシュ化も可能。
クライアント自動切断
クライアント自動切断は2つの仕組みがある。
ping-pongタイムアウト
ひとつはping-pongタイムアウトでクライアントを自動切断する。
タイムアウトの閾値はサーバからのping送信間隔が2分で、サーバからのpingに対して連続2回応答がなければ
その時点でクライアントとの接続を破棄する。
gnatsdに対してtelnet接続し動作を見てみる。
$ telnet 127.0.0.1 4222
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
INFO {"server_id":"Nl3d7EZpgKBUA510V1RmP4","version":"0.9.0.beta","go":"go1.6.2","host":"0.0.0.0","port":4222,"auth_required":false,"ssl_required":false,"tls_required":false,"tls_verify":false,"max_payload":1048576}
:
:
PING
PING
-ERR 'Stale Connection'
Connection closed by foreign host.
切れる。以下gnatsdのログ。
[4992] 2016/07/14 21:18:28.669370 [DBG] 127.0.0.1:52013 - cid:1 - Client connection created
[4992] 2016/07/14 21:20:28.678233 [DBG] 127.0.0.1:52013 - cid:1 - Client Ping Timer
[4992] 2016/07/14 21:20:28.678354 [TRC] 127.0.0.1:52013 - cid:1 - <<- [PING]
[4992] 2016/07/14 21:22:28.683716 [DBG] 127.0.0.1:52013 - cid:1 - Client Ping Timer
[4992] 2016/07/14 21:22:28.683793 [TRC] 127.0.0.1:52013 - cid:1 - <<- [PING]
[4992] 2016/07/14 21:24:28.689301 [DBG] 127.0.0.1:52013 - cid:1 - Client Ping Timer
[4992] 2016/07/14 21:24:28.689315 [DBG] 127.0.0.1:52013 - cid:1 - Stale Client Connection - Closing
[4992] 2016/07/14 21:24:28.689476 [DBG] 127.0.0.1:52013 - cid:1 - Client connection closed
現時点ではこの閾値を変更することはできない模様。
設定ファイルに ping_interval
ping_max
を設定しても、特に有効にならない。おそらく無視されてる。
送信待ちメッセージ閾値越え
もう一つはメッセージ受信側の処理が遅い場合等でクライアントへのメッセージが
gnatsdの送信待ちバッファに溜まっていく。そのバッファの閾値を超えた場合に切断される。
閾値のデフォルト値は10MB。
メッセージをパブリッシュし続けるクライアントをGoで実装しておく。
package main
import (
"time"
"github.com/nats-io/nats"
)
const SUBJECT string = "subject"
type Publisher struct {
NatsConn *nats.Conn
}
func (p *Publisher) Run() {
for i := 0; i < 1024*1024*5; i++ {
p.NatsConn.Publish(SUBJECT, []byte("HelloHello"))
}
p.NatsConn.Publish(SUBJECT, []byte("end"))
}
func main() {
nc, _ := nats.Connect(nats.DefaultURL)
p := &Publisher{NatsConn: nc}
p.Run()
time.Sleep(1 * time.Second)
}
またtelnetクライアントで試してみる。
$ telnet 127.0.0.1 4222
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
INFO {"server_id":"6TKF5eJSBqKZTX7RMauOLJ","version":"0.9.0.beta","go":"go1.6.2","host":"0.0.0.0","port":4222,"auth_required":false,"ssl_required":false,"tls_required":false,"tls_verify":false,"max_payload":1048576}
SUB subject 1
+OK
ここで先ほどのクライアントを起動。
$ go run pub-hugemsg.go
MSG subject 1 10
HelloHello
:
:
MSG subject 1 10
HelloHello
MSG subject 1 10
Connection closed by foreign host.
gnatsdのログ。
[13771] 2016/07/14 22:32:15.889409 [INF] 127.0.0.1:49911 - cid:2 - Slow Consumer Detected
確かに切断された。
どうもこちらも max_pending_size
を設定しても反映されていないので、
まだ閾値変更には対応していないっぽい。
モニタリング
gnatsdに-m ポート番号
オプションを指定することでモニタリングを有効にすることができる。
topコマンドライクなnats-topコマンドをインストールすれば、ひとまずモニタリングすることが可能。
$ nats-top
NATS server version 0.9.0.beta (uptime: 8m25s)
Server:
Load: CPU: 0.0% Memory: 9.6M Slow Consumers: 0
In: Msgs: 15 Bytes: 162 Msgs/Sec: 2.0 Bytes/Sec: 14
Out: Msgs: 6 Bytes: 54 Msgs/Sec: 1.0 Bytes/Sec: 2
Connections Polled: 2
HOST CID NAME SUBS PENDING MSGS_TO MSGS_FROM BYTES_TO BYTES_FROM LANG VERSION UPTIME LAST ACTIVITY
::1:61187 3 0 0 0 6 0 63 go 1.2.2 5s 2016-07-14 12:21:07.254618678 +0900 JST
::1:61447 4 0 0 0 3 0 36 go 1.2.2 2s 2016-07-14 12:21:08.129647296 +0900 JST
nats-top以外にもWebUIを備えたツールなどもあります。
モニタリングの仕組み自体は単純で、 http://hostname:port/XXX にHTTPでアクセスするとJSONをXXXに応じたメトリクスを返してくれる。
対象のエンドポイントは以下。
- /vars : NATSクライアントの接続数やCPU使用率等
- /connz : NATSクライアントの数やそのクライアント毎の内容
- /subsz : サブスクライバー数やマッチしたメッセージ数、破棄されたメッセージ数等
- /routez : クラスタのルート関連
NATSクライアント
NATSクライアントとしてパブリッシャーやサブスクライバを実装するために様々な言語が使える。公式にサポートされている言語は以下。
- C
- C#
- Elixir
- Go
- Java
- NGINX
- Node.js
- Python Asyncio/Tornado
- Ruby
GoとJavaはNATS Streamingクライアントも存在している。
公式ではないが他のミドルウェア同様コミュニティベースでその他主要な言語がサポートされている。
- Erlang
- Haskell
- Lua
- PHP
- Perl
- Rust
- Scala
ダウンロードページのほうがよくメンテナンスされてる可能性が。
コネクタ
他のミドルウェアとの連携機能を提供するコネクタという仕組みがある。
(単純なミドルウェア間のブリッジなので、仕組みというほどのものでも無いかもしれませんが。。)
公式にはNATS Connector FrameworkというJavaで実装された仕組みがあり、
そのフレームワークの上で動くRedisとNATSを繋ぐプラグインも同時に提供されている。
その他にもコミュニティベースで様々なコネクタが存在する。
こちらはNATS Connector Frameworkの上にのせてはおらず独自に実装されたものが多い。
- fluent-plugin-nats : FluentdとNATSを繋ぐFluentdプラグイン
- Logstash : LogstashにNATSメッセージを投げれるコネクタ
- nats-proxy : HTTP/WebsocketでNATSと話せるコネクタ
NATS Streamingも動かしてみる
さて一通りNATSの機能を見てきて、最後にNATS Streamingの方も試してみます。
まだバージョンが浅くすぐに動きが変わる可能性はありそう。
2016年7月時点での安定バージョンは0.2.0。動作はmasterの最新で確認してみる。コミットハッシュは以下の通り。
- nats-streaming-server#e900f10 (サーバ)
- go-nats-streaming#bc6ef46 (クライアント)
ビルド
事前に github.com/gogo/protobuf/gogoproto
をgo get
しておく必要がある。
$ go get -u -v github.com/gogo/protobuf/gogoproto
サーバをビルド
$ git clone https://github.com/nats-io/nats-streaming-server.git
$ cd nats-streaming-server
$ go build
nats-streaming-server
ができる。
続いてクライアント。
examples
ディレクトリ配下にあるPub/Subクライアントをそれぞれビルドしておく。
$ git clone https://github.com/nats-io/go-nats-streaming.git
$ cd go-nats-streaming/examples
$ go build stan-pub.go
$ go build stan-sub.go
起動
nats-streaming-server
を起動。
$ ./nats-streaming-server -m 8222
[87442] 2016/07/15 20:14:50.605166 [INF] Starting nats-streaming-server[test-cluster] version 0.2.0
[87442] 2016/07/15 20:14:50.605457 [INF] Starting nats-server version 0.9.0.beta
[87442] 2016/07/15 20:14:50.605467 [INF] Listening for client connections on localhost:4222
[87442] 2016/07/15 20:14:50.607194 [INF] Server is ready
[87442] 2016/07/15 20:14:50.944737 [INF] STAN: Message store is MEMORY
[87442] 2016/07/15 20:14:50.944755 [INF] STAN: Maximum of 1000000 will be stored
nats-topを確認するとストリーミングサーバを立ち上げた時点で内部でクラスタ用のクライアントがいる模様。
gnatsdの上に乗ってるというのがここからわかる。
NATS server version 0.9.0.beta (uptime: 4m49s)
Server:
Load: CPU: 0.0% Memory: 11.0M Slow Consumers: 0
In: Msgs: 1 Bytes: 45 Msgs/Sec: 0.0 Bytes/Sec: 0
Out: Msgs: 0 Bytes: 0 Msgs/Sec: 0.0 Bytes/Sec: 0
Connections Polled: 1
HOST CID NAME SUBS PENDING MSGS_TO MSGS_FROM BYTES_TO BYTES_FROM LANG VERSION UPTIME LAST ACTIVITY
127.0.0.1:65386 2 NATS-Streaming-Server-test-cluster 5 0 0 1 0 45 go 1.2.2 4m49s 2016-07-15 20:15:48.18
Streaming Pub/Sub
まずはSubクライアントがいない状態でPubクライアントからメッセージを送ってみる。
$ ./stan-pub subject.hoge "Hello NATS Streaming"
Published [subject.hoge] : 'Hello NATS Streaming'
$ ./stan-pub subject.hoge "Hello NATS Streaming 2"
Published [subject.hoge] : 'Hello NATS Streaming 2'
ここでSubクライアントを起動。
--all
オプションをつけてすべての有効なメッセージを受け取るようにしてみる。
$ ./stan-sub -id STANSUB1 --all subject.hoge
Connected to nats://localhost:4222 clusterID: [test-cluster] clientID: [STANSUB1]
subscribing with DeliverAllAvailable
Listening on [subject.hoge], clientID=[STANSUB1], qgroup=[] durable=[]
[#1] Received on [subject.hoge]: 'sequence:1 subject:"subject.hoge" data:"Hello NATS Streaming" timestamp:1468581922036801904 '
[#2] Received on [subject.hoge]: 'sequence:2 subject:"subject.hoge" data:"Hello NATS Streaming 2" timestamp:1468581932488709393 '
NATSでのPub/Subではパブリッシュした時点でサブスクライブしていないとメッセージを受信することはできなかったが、
NATS Streamingでは受信できている。
--last
オプションをつけて最後のメッセージだけ受け取ることもできる。
実装を見切れてはいませんが、NATS Streaming側で最後のメッセージだけを配信するという仕組みが提供できているよう。
stan-sub.go
のオプションみると、
メッセージの期限設定や期限チェックして有効であれば受信するといったこともできる感じ。
メッセージストア
nats-streaming-serverはデフォルトではインメモリでメッセージストアしているが、
-store FILE -dir ${DIRNAME}
を指定するとDIRNAMEなディレクトリ作成してその配下にファイルで
メッセージをストアできる。
$ ./nats-streaming-server -m 8222 -store FILE -dir streaminglogs
:
[91285] 2016/07/15 20:45:08.489025 [INF] STAN: Message store is FILE
:
$ ls -ltr streaminglogs/*
-rw-r--r-- 1 hattori-h staff 195 7 15 20:43 streaminglogs/server.dat
-rw-r--r-- 1 hattori-h staff 71 7 15 20:45 streaminglogs/clients.dat
streaminglogs/subject.hoge:
total 48
-rw-r--r-- 1 hattori-h staff 4 7 15 20:45 subs.dat
-rw-r--r-- 1 hattori-h staff 4 7 15 20:45 msgs.5.dat
-rw-r--r-- 1 hattori-h staff 4 7 15 20:45 msgs.4.dat
-rw-r--r-- 1 hattori-h staff 4 7 15 20:45 msgs.3.dat
-rw-r--r-- 1 hattori-h staff 4 7 15 20:45 msgs.2.dat
-rw-r--r-- 1 hattori-h staff 62 7 15 20:45 msgs.1.dat
現時点ではメッセージストアはメモリ(MEMORY)かファイル(FILE)しか指定できない。
その他
関連記事
NATS作者のDerek Collisonさんが書かれた記事がいくつかある。
- http://www.slideshare.net/derekcollison/gophercon-2014
- https://blog.gopheracademy.com/plumbing-and-semantics/
- https://blog.gopheracademy.com/advent-2015/nats-high-performance-cloud-native-messaging-written-in-go/
YouTubeで"NATS"とかで検索すると割とカンファレンスとかの動画が見つかります。
- https://www.youtube.com/watch?v=ylRKac5kSOk
- https://www.youtube.com/watch?v=qC9WhjmewIk
- https://www.youtube.com/watch?v=5GcAgMPECxE
日本語記事
-
http://www.slideshare.net/hamakn/reading-nats
いわゆるnats-rubyと呼ばれるGoで実装される前のNATS実装。の内部を解説 -
分散型メッセージングミドルウェアの詳細比較 | インフラ・ミドルウェア | POSTD
[本家記事]
最後に
NATSについてひととおりの機能を試しながら紹介しました。
もう少しNATS Streamingの詳細やクラスタリングの動作を掘り下げていったり、
Goクライアントの紹介や実際にNATSを使用したサンプルアプリケーションの実装なんかも今後試してみたいところです。
少し調べ切れていないところや単語の表記ゆれ等があるかと思いますので、ご指摘等コメントいただければ。