Go
NATS

NATSについて調べて動作を試してみた

More than 1 year has passed since last update.

NATSとは?

NATSは軽量でハイパフォーマンスなメッセージングシステムを提供するミドルウェア。
比較されるミドルウェアとして、ActiveMQ、Kafka、Kestreal、NSQ、RabbitMQ等がある。

2016年7月現在のリリースバージョンは0.8.1。

NATSとNATS Streaming

NATSには大きく2つの仕組みがある。
いわゆるNATSと呼ばれるものとNATS Streamingと呼ばれるものの2つ。

NATS

NATSの特徴

NATSで実現できるメッセージングモデルは以下。リンク先の図を見ると一目でわかる。

NATSを実現するのがNATSサーバでgnatsdと呼ばれる。gnatsdはGoで実装されている。
(Go実装以前はRubyで実装されていた。)

NATS Streaming

NATSをベースにAt-least-once-deliveryなメッセージングを実現する。
単純なPub/Subの場合Subクライアントは接続した時点からメッセージを取得できるが、
NATS Streamingの場合、インメモリ(+外部ストレージ)でメッセージを保持し、
クライアントに必ず最低1回はメッセージを配信できる。

  • At-least-onceはメッセージ配信機構
  • Publisher/Subscriber Rate Limit NATSストリーミングを実現するサーバがNATSストリーミングサーバで、 NATSサーバをベースに実装されている。 内部ではProtocol Buffersも使っている。

こちらも公式サイトの図がわかりやすい

NATSプロトコルの基本

サブジェクト名

メッセージの配信先を決めるID的なもの。大文字小文字の区別があり、.でトークンを区切る。

subj.hello.one SUBJ.HELLO.one 等。

サブジェクトにはワイルドカードが使用できる。

  • * : なんでもマッチする。 subj.hello.* SUBJ.*
  • > : subj.>の場合はsubj.hello subj.hoge.fugaにマッチするがsubjのみにはマッチしない。

* > ともにトークン内の一部に使用することはできない。 su*j.helloは使えない。(厳密に言うと指定することはできるが)

プロトコルメッセージ

NATSのプロトコルで規定されているメッセージは以下。

  • INFO : クライアントとサーバでTCP/IP接続が確立された後にサーバから送られる情報
  • CONNECT : クライアントから接続情報を送る
  • PUB : メッセージ配信する
  • SUB : メッセージ購読する
  • UNSUB : メッセージ購読を止める
  • MSG : クライアントへの実際の配信メッセージ
  • PING/PONG : いわゆるping-pong
  • +OK : プロトコルメッセージへの正常応答。CONNECTverboseモードをfalseにすれば省略される。 (ほとんどのクライアントでデフォルトOFFにしているみたい)
  • ERR : エラーが発生した際にサーバから通知されるプロトコル

プロトコルの詳細については説明を省略する。
かなりシンプルなので、ドキュメントを読めば詳しいことがわかる。

NATSを動かしてみる

まずはNATSの動作を確認してみる。

gnatsdのインストールと起動

NATSサーバ(gnatsd)を動かす。
今回はGoがインストールされていることが前提で説明。

gnatsdのバージョンは検証時のmasterブランチ最新を使っている。バージョンは0.9.0.beta。

インストール

$ go get github.com/nats-io/gnatsd

gnatsd起動

$ gnatsd -D -V
[69123] 2016/07/14 11:28:58.755985 [INF] Starting nats-server version 0.9.0.beta
[69123] 2016/07/14 11:28:58.756125 [DBG] Go build version go1.6.2
[69123] 2016/07/14 11:28:58.756133 [INF] Listening for client connections on 0.0.0.0:4222
[69123] 2016/07/14 11:28:58.756274 [DBG] Server id is J4shpW8NfLsxMF0c4uHqvv
[69123] 2016/07/14 11:28:58.756320 [INF] Server is ready

起動が完了。

メッセージング

メッセージングモデル毎に動作を見てみる。

Pub/Sub

まずはPub/Sub。

汎用的なPub/Subクライアントとして、Goクライアントのexamplesから
以下のファイルを準備してビルドしておく。

サブスクライバクライアントをサブジェクトをそれぞれ example.one example.one example.two example.* で起動しておく。
(ここでは仮にそれぞれclient-A, B, C, Dとします)

$ ./nats-sub example.one    # client-A
$ ./nats-sub example.one    # client-B
$ ./nats-sub example.two    # client-C
$ ./nats-sub "example.*"    # client-D
Listening on [example.*]

サブジェクトを"example.one"として1つメッセージをパブリッシュしてみると、
client-A,B,Dでメッセージを受信することがわかる。

第1引数にサブジェクト、第2引数にメッセージを指定する。

$ ./nats-pub "example.one" Hello
Published [example.one] : 'Hello'
client-A,client-B,client-D
[#1] Received on [example.one]: 'Hello'

もしサブジェクトをexample.threeでパブリッシュしてみるとclient-Dだけがメッセージ受信することになる。

Request Reply

続いてRequest Replyタイプのメッセージングを確認してみる。

まずはSubクライアント(subject=example.one)とReplyクライアント(subject=example.one)をそれぞれ1つづつ起動する。

Replyクライアントは第2引数にRequestに対するReplyメッセージを指定する。

$ ./nats-sub example.one
$ ./nats-rply example.one "my NATS reply"

準備ができたらRequestを送ってみると、リクエストに対して結果メッセージを受け取っていることがわかる。

$ ./nats-req example.one "Hello NATS req-rply"
Published [example.one] : 'Hello NATS req-rply'
Received [_INBOX.57l6vK4e30lGw4lcBH9GmF] : 'my NATS reply'

SubクライアントとReplyクライアントはPub/Sub時と同様にメッセージを受け取っていることがわかる。

nats-sub,nats-rply
[#1] Received on [example.one]: 'Hello NATS req-rply'

Queueing

最後がQueueing(キューイング)。
キューグループを指定してサブスクライブすると、
同じキューグループ内でメッセージを振り分けて配信していく。

Queue-Subクライアント2つ(subject=example.one, queue-group=Q1)と
別のキューグループのQueue-Subクライアント2つ(subject=example.one, queue-group=Q2)を起動する。

$ ./nats-qsub example.one Q1                   # client-A1
$ ./nats-qsub example.one Q1                   # client-A2
$ ./nats-qsub example.one Q2                   # client-B1
$ ./nats-qsub example.one Q2                   # client-B2

Pubクライアントからメッセージを送ってみる。途中、client-B1を切断してその際の動作も見てみた。

$ ./nats-pub example.one "Message 1"
$ # ここでclient-B1を停止
$ ./nats-pub example.one "Message 2"
$ ./nats-pub example.one "Message 3"
$ ./nats-pub example.one "Message 4"
$ ./nats-pub example.one "Message 5"
$ ./nats-pub example.one "Message 6"

メッセージ送った際のそれぞれのQueue-Subクライアントの出力は以下。

client-A1
Listening on [example.one]
[#1] Received on [example.one] Queue[Q1] Pid[73099]: 'Message 1'
[#2] Received on [example.one] Queue[Q1] Pid[73099]: 'Message 6'
client-A2
Listening on [example.one]
[#1] Received on [example.one] Queue[Q1] Pid[73137]: 'Message 2'
[#2] Received on [example.one] Queue[Q1] Pid[73137]: 'Message 3'
[#3] Received on [example.one] Queue[Q1] Pid[73137]: 'Message 4'
[#4] Received on [example.one] Queue[Q1] Pid[73137]: 'Message 5'
client-B1
Listening on [example.one]
[#1] Received on [example.one] Queue[Q2] Pid[73173]: 'Message 1'
^C
client-B2
Listening on [example.one]
[#1] Received on [example.one] Queue[Q2] Pid[73212]: 'Message 2'
[#2] Received on [example.one] Queue[Q2] Pid[73212]: 'Message 3'
[#3] Received on [example.one] Queue[Q2] Pid[73212]: 'Message 4'
[#4] Received on [example.one] Queue[Q2] Pid[73212]: 'Message 5'
[#5] Received on [example.one] Queue[Q2] Pid[73212]: 'Message 6'

同じキューグループ内ではどれか1つのサブスクライバーにメッセージを投げていることがわかる。
client-A1,A2の動きをみると、メッセージの振り分けは単純なラウンドロビンでは無いらしい。

メッセージンングの基本はここまで。
続いてはNATSの周辺機能について紹介する。

周辺機能

ロギング

ログレベルの調整や時刻表示有無、ファイル書き出しなどをオプションで指定することができる。
syslog連携なんかもできる。

詳細については以下に記載があります。

https://nats.io/documentation/server/gnatsd-logging/

クラスタリング

  • 1ホップまでメッセージを伝搬。クライアントから受信したメッセージは自ノードが管理している隣接ノードに、 ルートから受信したメッセージはクライアントにのみ配信する。
  • 上記の理由からフルメッシュか完全グラフノード?な構成を推奨している

クラスタ構成の動作

今回はサーバを3台使って動作を確認する。

クラスタ管理用にポート番号4248を使ってgnatsdを起動する。
-clusterでクラスタ管理用ポートを指定する。
このサーバをここではc1と呼ぶ。

gnatsd-c1
$ gnatsd -D -m 8222 -cluster nats://127.0.0.1:4248
[3325] 2016/07/18 01:17:25.560418 [INF] Starting nats-server version 0.9.0.beta
[3325] 2016/07/18 01:17:25.560498 [DBG] Go build version go1.6.2
[3325] 2016/07/18 01:17:25.560550 [INF] Listening for route connections on 127.0.0.1:4248
[3325] 2016/07/18 01:17:25.560746 [INF] Listening for client connections on 0.0.0.0:4222
[3325] 2016/07/18 01:17:25.560785 [DBG] Server id is D15kVknP7pEnrzFdXUUNxm
[3325] 2016/07/18 01:17:25.560788 [INF] Server is ready

続いて2台目のサーバを起動する。
同じサーバ上で起動するので、クラスタ管理用のポートを5248として、
クラスタのルート情報を-routesオプションで指定する。(-pはクライアント接続用のポート番号指定)
このサーバをc2と呼ぶ。

gnatsd-c2
$ gnatsd -D -p 5222 -cluster nats://127.0.0.1:5248 -routes nats://127.0.0.1:4248
[3399] 2016/07/18 01:21:30.001742 [INF] Starting nats-server version 0.9.0.beta
[3399] 2016/07/18 01:21:30.001837 [DBG] Go build version go1.6.2
[3399] 2016/07/18 01:21:30.001893 [INF] Listening for route connections on 127.0.0.1:5248
[3399] 2016/07/18 01:21:30.002069 [INF] Listening for client connections on 0.0.0.0:5222
[3399] 2016/07/18 01:21:30.002120 [DBG] Server id is dq8nTQonZFdFEMyuxtV28C
[3399] 2016/07/18 01:21:30.002124 [INF] Server is ready
[3399] 2016/07/18 01:21:30.002162 [DBG] Trying to connect to route on 127.0.0.1:4248
[3399] 2016/07/18 01:21:30.002497 [DBG] 127.0.0.1:4248 - rid:1 - Route connection created
[3399] 2016/07/18 01:21:30.002534 [DBG] 127.0.0.1:4248 - rid:1 - Route connect msg sent
[3399] 2016/07/18 01:21:30.002801 [DBG] 127.0.0.1:4248 - rid:1 - Registering remote route "D15kVknP7pEnrzFdXUUNxm"
[3399] 2016/07/18 01:21:30.002815 [DBG] 127.0.0.1:4248 - rid:1 - Route sent local subscriptions

起動すると"127.0.0.1:4248 - rid:1 - Route connection created"でルートができていることがわかる。
c1側も同様にルートを認識している。

[3325] 2016/07/18 01:21:30.002512 [DBG] 127.0.0.1:50577 - rid:1 - Route connection created
[3325] 2016/07/18 01:21:30.002918 [DBG] 127.0.0.1:50577 - rid:1 - Registering remote route "dq8nTQonZFdFEMyuxtV28C"
[3325] 2016/07/18 01:21:30.002928 [DBG] 127.0.0.1:50577 - rid:1 - Route sent local subscriptions

3台目も起動してみる。これをc3と呼ぶ。

$ gnatsd -D -p 6222 -cluster nats://127.0.0.1:6248 -routes nats://127.0.0.1:4248
[3566] 2016/07/18 01:25:21.917616 [INF] Starting nats-server version 0.9.0.beta
[3566] 2016/07/18 01:25:21.917731 [DBG] Go build version go1.6.2
[3566] 2016/07/18 01:25:21.917793 [INF] Listening for route connections on 127.0.0.1:6248
[3566] 2016/07/18 01:25:21.918059 [INF] Listening for client connections on 0.0.0.0:6222
[3566] 2016/07/18 01:25:21.918113 [DBG] Server id is 8oa4rnaED3PeFZEVm8UzuT
[3566] 2016/07/18 01:25:21.918117 [INF] Server is ready
[3566] 2016/07/18 01:25:21.918157 [DBG] Trying to connect to route on 127.0.0.1:4248
[3566] 2016/07/18 01:25:21.918549 [DBG] 127.0.0.1:4248 - rid:1 - Route connection created
[3566] 2016/07/18 01:25:21.918582 [DBG] 127.0.0.1:4248 - rid:1 - Route connect msg sent
[3566] 2016/07/18 01:25:21.918852 [DBG] 127.0.0.1:4248 - rid:1 - Registering remote route "D15kVknP7pEnrzFdXUUNxm"
[3566] 2016/07/18 01:25:21.918866 [DBG] 127.0.0.1:4248 - rid:1 - Route sent local subscriptions
[3566] 2016/07/18 01:25:21.919811 [DBG] 127.0.0.1:65325 - rid:2 - Route connection created
[3566] 2016/07/18 01:25:21.920144 [DBG] 127.0.0.1:65325 - rid:2 - Registering remote route "dq8nTQonZFdFEMyuxtV28C"
[3566] 2016/07/18 01:25:21.920159 [DBG] 127.0.0.1:65325 - rid:2 - Route sent local subscriptions

c1c2をルートとして認識している。
c1側とc2側でも新たにc3を認識する。

この状態の場合、どのノードからメッセージを配信してもすべてのノードでメッセージを受け取ることができる。
(もちろん、サブジェクトの一致等のルールには従って。)

セキュリティ

パスワード認証

gnatsd--userオプション--passオプション
または--authオプションをつけて起動するとパスワード認証orトークン認証が有効になる。

$ gnatsd --auth hogetoken

クライアントを接続してみると蹴られる。

$ ./nats-sub subj
Can't connect: nats: authorization violation
gnatsd
[93190] 2016/07/15 21:16:48.334616 [ERR] ::1:56626 - cid:1 - Authorization Error

接続先にトークン情報をつけると認証成功して接続できる。

$ ./nats-sub -s "nats://hoge@127.0.0.1:4222" subj
Listening on [subj]

パスワード認証の場合はhogeのところをuser:passのような形式にすれば問題無い。

TLS

TLSを使ってサーバとクライアントの認証と通信内容自体を暗号化することも可能。

gnatsdの設定ファイルとして以下のようなtls.confを準備する。
今回はCERTファイル等はnats-io/gnatsdのtest/configs/certsディレクトリにあるものを使う。

tls.conf
listen: 127.0.0.1:4443

tls {
  cert_file: "./certs/server-cert.pem"
  key_file:  "./certs/server-key.pem"
  ca_file:   "./certs/ca.pem"
  verify:    true
}
$ gnatsd -D -V -config tls.conf
Starting gnatsd on port 5000
[95447] 2016/07/15 21:48:55.717289 [INF] Starting nats-server version 0.9.0.beta
[95447] 2016/07/15 21:48:55.717450 [DBG] Go build version go1.6.2
[95447] 2016/07/15 21:48:55.717455 [INF] Listening for client connections on 127.0.0.1:4443
[95447] 2016/07/15 21:48:55.717660 [INF] TLS required for client connections
[95447] 2016/07/15 21:48:55.717674 [DBG] Server id is m4w0EmrXB1xPvE5eMxpyX1
[95447] 2016/07/15 21:48:55.717677 [INF] Server is ready

TLSではないクライアントをつないでみともちろん蹴られる。

$ ./nats-sub -s "nats://127.0.0.1:4443" subj
Can't connect: nats: secure connection required

前述のnats-sub.goに以下の変更を加えて、TLS接続してみる。

nats-tls-sub.go.diff
$ diff -u nats-sub.go nats-tls-sub.go
--- nats-sub.go 2016-07-15 16:09:03.000000000 +0900
+++ nats-tls-sub.go 2016-07-18 00:28:36.000000000 +0900
@@ -4,7 +4,10 @@
 package main

 import (
+   "crypto/tls"
+   "crypto/x509"
    "flag"
+   "io/ioutil"
    "log"
    "runtime"

@@ -33,7 +36,25 @@
        usage()
    }

-   nc, err := nats.Connect(*urls)
+   pool := x509.NewCertPool()
+   pemData, err := ioutil.ReadFile("./certs/ca.pem")
+   if err != nil {
+       log.Fatalf("read error for ca.pem: %v\n", err)
+   }
+   pool.AppendCertsFromPEM(pemData)
+
+   cert, err := tls.LoadX509KeyPair("./certs/client-cert.pem", "./certs/client-key.pem")
+   if err != nil {
+       log.Fatalf("tls.LoadX509KeyPair() error=%v", err)
+   }
+
+   config := &tls.Config{
+       ServerName:   "localhost",
+       Certificates: []tls.Certificate{cert},
+       RootCAs:      pool,
+       MinVersion:   tls.VersionTLS12,
+   }
+   nc, err := nats.Connect(*urls, nats.Secure(config))
    if err != nil {
        log.Fatalf("Can't connect: %v\n", err)
    }

これを実行すると接続できていることがわかる。

$ go build nats-tls-sub.go
$ ./nats-tls-sub -s "tls://127.0.0.1:4443" subj
Listening on [subj]
[99264] 2016/07/18 00:28:53.609262 [DBG] 127.0.0.1:62669 - cid:1 - Client connection created
[99264] 2016/07/18 00:28:53.609327 [DBG] 127.0.0.1:62669 - cid:1 - Starting TLS client connection handshake
[99264] 2016/07/18 00:28:53.670571 [DBG] 127.0.0.1:62669 - cid:1 - TLS handshake complete
[99264] 2016/07/18 00:28:53.670590 [DBG] 127.0.0.1:62669 - cid:1 - TLS version 1.2, cipher suite TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384
[99264] 2016/07/18 00:28:53.670830 [TRC] 127.0.0.1:62669 - cid:1 - ->> [CONNECT {"verbose":false,"pedantic":false,"tls_required":true,"name":"","lang":"go","version":"1.2.2"}]
[99264] 2016/07/18 00:28:53.670919 [TRC] 127.0.0.1:62669 - cid:1 - ->> [PING]
[99264] 2016/07/18 00:28:53.670926 [TRC] 127.0.0.1:62669 - cid:1 - <<- [PONG]
[99264] 2016/07/18 00:28:53.671085 [TRC] 127.0.0.1:62669 - cid:1 - ->> [SUB subj  1]
[99264] 2016/07/18 00:28:53.671108 [TRC] 127.0.0.1:62669 - cid:1 - ->> [PING]
[99264] 2016/07/18 00:28:53.671114 [TRC] 127.0.0.1:62669 - cid:1 - <<- [PONG]
[99264] 2016/07/18 00:28:56.754385 [DBG] 127.0.0.1:62669 - cid:1 - Client connection closed

その他ユーザ毎の権限設定
Bcrypt使ったパスワードハッシュ化も可能。

クライアント自動切断

クライアント自動切断は2つの仕組みがある。

ping-pongタイムアウト

ひとつはping-pongタイムアウトでクライアントを自動切断する。
タイムアウトの閾値はサーバからのping送信間隔が2分で、サーバからのpingに対して連続2回応答がなければ
その時点でクライアントとの接続を破棄する。

gnatsdに対してtelnet接続し動作を見てみる。

$ telnet 127.0.0.1 4222
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
INFO {"server_id":"Nl3d7EZpgKBUA510V1RmP4","version":"0.9.0.beta","go":"go1.6.2","host":"0.0.0.0","port":4222,"auth_required":false,"ssl_required":false,"tls_required":false,"tls_verify":false,"max_payload":1048576}

    :
    :

PING
PING
-ERR 'Stale Connection'
Connection closed by foreign host.

切れる。以下gnatsdのログ。

[4992] 2016/07/14 21:18:28.669370 [DBG] 127.0.0.1:52013 - cid:1 - Client connection created
[4992] 2016/07/14 21:20:28.678233 [DBG] 127.0.0.1:52013 - cid:1 - Client Ping Timer
[4992] 2016/07/14 21:20:28.678354 [TRC] 127.0.0.1:52013 - cid:1 - <<- [PING]
[4992] 2016/07/14 21:22:28.683716 [DBG] 127.0.0.1:52013 - cid:1 - Client Ping Timer
[4992] 2016/07/14 21:22:28.683793 [TRC] 127.0.0.1:52013 - cid:1 - <<- [PING]
[4992] 2016/07/14 21:24:28.689301 [DBG] 127.0.0.1:52013 - cid:1 - Client Ping Timer
[4992] 2016/07/14 21:24:28.689315 [DBG] 127.0.0.1:52013 - cid:1 - Stale Client Connection - Closing
[4992] 2016/07/14 21:24:28.689476 [DBG] 127.0.0.1:52013 - cid:1 - Client connection closed

現時点ではこの閾値を変更することはできない模様。
設定ファイルに ping_interval ping_max を設定しても、特に有効にならない。おそらく無視されてる。

送信待ちメッセージ閾値越え

もう一つはメッセージ受信側の処理が遅い場合等でクライアントへのメッセージが
gnatsdの送信待ちバッファに溜まっていく。そのバッファの閾値を超えた場合に切断される。
閾値のデフォルト値は10MB。

メッセージをパブリッシュし続けるクライアントをGoで実装しておく。

natssub-burst.go
package main

import (
    "time"

    "github.com/nats-io/nats"
)

const SUBJECT string = "subject"

type Publisher struct {
    NatsConn *nats.Conn
}

func (p *Publisher) Run() {
    for i := 0; i < 1024*1024*5; i++ {
        p.NatsConn.Publish(SUBJECT, []byte("HelloHello"))
    }
    p.NatsConn.Publish(SUBJECT, []byte("end"))
}

func main() {
    nc, _ := nats.Connect(nats.DefaultURL)

    p := &Publisher{NatsConn: nc}
    p.Run()

    time.Sleep(1 * time.Second)
}

またtelnetクライアントで試してみる。

$ telnet 127.0.0.1 4222
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
INFO {"server_id":"6TKF5eJSBqKZTX7RMauOLJ","version":"0.9.0.beta","go":"go1.6.2","host":"0.0.0.0","port":4222,"auth_required":false,"ssl_required":false,"tls_required":false,"tls_verify":false,"max_payload":1048576}
SUB subject 1
+OK

ここで先ほどのクライアントを起動。

$ go run pub-hugemsg.go
MSG subject 1 10
HelloHello
    :
    :
MSG subject 1 10
HelloHello
MSG subject 1 10
Connection closed by foreign host.

gnatsdのログ。

[13771] 2016/07/14 22:32:15.889409 [INF] 127.0.0.1:49911 - cid:2 - Slow Consumer Detected

確かに切断された。

どうもこちらも max_pending_size を設定しても反映されていないので、
まだ閾値変更には対応していないっぽい。

モニタリング

gnatsdに-m ポート番号オプションを指定することでモニタリングを有効にすることができる。

topコマンドライクなnats-topコマンドをインストールすれば、ひとまずモニタリングすることが可能。

$ nats-top

NATS server version 0.9.0.beta (uptime: 8m25s)
Server:
  Load: CPU:  0.0%  Memory: 9.6M  Slow Consumers: 0
  In:   Msgs: 15  Bytes: 162  Msgs/Sec: 2.0  Bytes/Sec: 14
  Out:  Msgs: 6  Bytes: 54  Msgs/Sec: 1.0  Bytes/Sec: 2

Connections Polled: 2
  HOST                 CID      NAME            SUBS    PENDING     MSGS_TO     MSGS_FROM   BYTES_TO    BYTES_FROM  LANG     VERSION  UPTIME   LAST ACTIVITY
  ::1:61187            3                        0       0           0           6           0           63          go       1.2.2    5s       2016-07-14 12:21:07.254618678 +0900 JST
  ::1:61447            4                        0       0           0           3           0           36          go       1.2.2    2s       2016-07-14 12:21:08.129647296 +0900 JST

nats-top以外にもWebUIを備えたツールなどもあります。

モニタリングの仕組み自体は単純で、 http://hostname:port/XXX にHTTPでアクセスするとJSONをXXXに応じたメトリクスを返してくれる。
対象のエンドポイントは以下。

  • /vars : NATSクライアントの接続数やCPU使用率等
  • /connz : NATSクライアントの数やそのクライアント毎の内容
  • /subsz : サブスクライバー数やマッチしたメッセージ数、破棄されたメッセージ数等
  • /routez : クラスタのルート関連

モニタリングの詳細

NATSクライアント

NATSクライアントとしてパブリッシャーやサブスクライバを実装するために様々な言語が使える。公式にサポートされている言語は以下。

  • C
  • C#
  • Elixir
  • Go
  • Java
  • NGINX
  • Node.js
  • Python Asyncio/Tornado
  • Ruby

GoとJavaはNATS Streamingクライアントも存在している。
公式ではないが他のミドルウェア同様コミュニティベースでその他主要な言語がサポートされている。

  • Erlang
  • Haskell
  • Lua
  • PHP
  • Perl
  • Rust
  • Scala

クライアント一覧

ダウンロードページのほうがよくメンテナンスされてる可能性が。

コネクタ

他のミドルウェアとの連携機能を提供するコネクタという仕組みがある。
(単純なミドルウェア間のブリッジなので、仕組みというほどのものでも無いかもしれませんが。。)

公式にはNATS Connector FrameworkというJavaで実装された仕組みがあり、
そのフレームワークの上で動くRedisとNATSを繋ぐプラグインも同時に提供されている。

その他にもコミュニティベースで様々なコネクタが存在する。
こちらはNATS Connector Frameworkの上にのせてはおらず独自に実装されたものが多い。

  • fluent-plugin-nats : FluentdとNATSを繋ぐFluentdプラグイン
  • Logstash : LogstashにNATSメッセージを投げれるコネクタ
  • nats-proxy : HTTP/WebsocketでNATSと話せるコネクタ

NATS Streamingも動かしてみる

さて一通りNATSの機能を見てきて、最後にNATS Streamingの方も試してみます。
まだバージョンが浅くすぐに動きが変わる可能性はありそう。
2016年7月時点での安定バージョンは0.2.0。動作はmasterの最新で確認してみる。コミットハッシュは以下の通り。

ビルド

事前に github.com/gogo/protobuf/gogoprotogo getしておく必要がある。

$ go get -u -v github.com/gogo/protobuf/gogoproto

サーバをビルド

$ git clone https://github.com/nats-io/nats-streaming-server.git
$ cd nats-streaming-server
$ go build

nats-streaming-serverができる。
続いてクライアント。
examplesディレクトリ配下にあるPub/Subクライアントをそれぞれビルドしておく。

$ git clone https://github.com/nats-io/go-nats-streaming.git
$ cd go-nats-streaming/examples
$ go build stan-pub.go
$ go build stan-sub.go

起動

nats-streaming-serverを起動。

$ ./nats-streaming-server -m 8222
[87442] 2016/07/15 20:14:50.605166 [INF] Starting nats-streaming-server[test-cluster] version 0.2.0
[87442] 2016/07/15 20:14:50.605457 [INF] Starting nats-server version 0.9.0.beta
[87442] 2016/07/15 20:14:50.605467 [INF] Listening for client connections on localhost:4222
[87442] 2016/07/15 20:14:50.607194 [INF] Server is ready
[87442] 2016/07/15 20:14:50.944737 [INF] STAN: Message store is MEMORY
[87442] 2016/07/15 20:14:50.944755 [INF] STAN: Maximum of 1000000 will be stored

nats-topを確認するとストリーミングサーバを立ち上げた時点で内部でクラスタ用のクライアントがいる模様。
gnatsdの上に乗ってるというのがここからわかる。

NATS server version 0.9.0.beta (uptime: 4m49s)
Server:
  Load: CPU:  0.0%  Memory: 11.0M  Slow Consumers: 0
  In:   Msgs: 1  Bytes: 45  Msgs/Sec: 0.0  Bytes/Sec: 0
  Out:  Msgs: 0  Bytes: 0  Msgs/Sec: 0.0  Bytes/Sec: 0

Connections Polled: 1
  HOST                 CID      NAME            SUBS    PENDING     MSGS_TO     MSGS_FROM   BYTES_TO    BYTES_FROM  LANG     VERSION  UPTIME   LAST ACTIVITY
  127.0.0.1:65386      2        NATS-Streaming-Server-test-cluster 5       0           0           1           0           45          go       1.2.2    4m49s    2016-07-15 20:15:48.18

Streaming Pub/Sub

まずはSubクライアントがいない状態でPubクライアントからメッセージを送ってみる。

$ ./stan-pub subject.hoge "Hello NATS Streaming"
Published [subject.hoge] : 'Hello NATS Streaming'
$ ./stan-pub subject.hoge "Hello NATS Streaming 2"
Published [subject.hoge] : 'Hello NATS Streaming 2'

ここでSubクライアントを起動。
--allオプションをつけてすべての有効なメッセージを受け取るようにしてみる。

$ ./stan-sub -id STANSUB1 --all subject.hoge
Connected to nats://localhost:4222 clusterID: [test-cluster] clientID: [STANSUB1]
subscribing with DeliverAllAvailable
Listening on [subject.hoge], clientID=[STANSUB1], qgroup=[] durable=[]
[#1] Received on [subject.hoge]: 'sequence:1 subject:"subject.hoge" data:"Hello NATS Streaming" timestamp:1468581922036801904 '
[#2] Received on [subject.hoge]: 'sequence:2 subject:"subject.hoge" data:"Hello NATS Streaming 2" timestamp:1468581932488709393 '

NATSでのPub/Subではパブリッシュした時点でサブスクライブしていないとメッセージを受信することはできなかったが、
NATS Streamingでは受信できている。

--last オプションをつけて最後のメッセージだけ受け取ることもできる。
実装を見切れてはいませんが、NATS Streaming側で最後のメッセージだけを配信するという仕組みが提供できているよう。
stan-sub.goのオプションみると、
メッセージの期限設定や期限チェックして有効であれば受信するといったこともできる感じ。

メッセージストア

nats-streaming-serverはデフォルトではインメモリでメッセージストアしているが、
-store FILE -dir ${DIRNAME}を指定するとDIRNAMEなディレクトリ作成してその配下にファイルで
メッセージをストアできる。

$ ./nats-streaming-server -m 8222 -store FILE -dir streaminglogs
            :
[91285] 2016/07/15 20:45:08.489025 [INF] STAN: Message store is FILE
            :
$ ls -ltr streaminglogs/*
-rw-r--r--  1 hattori-h  staff  195  7 15 20:43 streaminglogs/server.dat
-rw-r--r--  1 hattori-h  staff   71  7 15 20:45 streaminglogs/clients.dat

streaminglogs/subject.hoge:
total 48
-rw-r--r--  1 hattori-h  staff   4  7 15 20:45 subs.dat
-rw-r--r--  1 hattori-h  staff   4  7 15 20:45 msgs.5.dat
-rw-r--r--  1 hattori-h  staff   4  7 15 20:45 msgs.4.dat
-rw-r--r--  1 hattori-h  staff   4  7 15 20:45 msgs.3.dat
-rw-r--r--  1 hattori-h  staff   4  7 15 20:45 msgs.2.dat
-rw-r--r--  1 hattori-h  staff  62  7 15 20:45 msgs.1.dat

現時点ではメッセージストアはメモリ(MEMORY)かファイル(FILE)しか指定できない。

その他

関連記事

NATS作者のDerek Collisonさんが書かれた記事がいくつかある。

YouTubeで"NATS"とかで検索すると割とカンファレンスとかの動画が見つかります。

日本語記事

最後に

NATSについてひととおりの機能を試しながら紹介しました。
もう少しNATS Streamingの詳細やクラスタリングの動作を掘り下げていったり、
Goクライアントの紹介や実際にNATSを使用したサンプルアプリケーションの実装なんかも今後試してみたいところです。

少し調べ切れていないところや単語の表記ゆれ等があるかと思いますので、ご指摘等コメントいただければ。