はじめに
Aptpod Advent Calendar 19日目、前回から2日しか経っていませんが、またまた 何でも屋 岩田 がお届けします。
前回からお決まりのものを引用。(前回は IEEE1588 PTP について書きました)
intdash は、 IoT でよくある 軽量・超多数のデバイス(だけどデータの発生頻度は低め) というよりは、単体でも高頻度にデータを発生させる重厚なデバイス(だけどデバイス数は少なめ) をターゲットにした、ちょっと変わったプラットフォームです。(自動車やロボットの研究開発部門などを中心にご利用いただいています)
今回は、 送信データが高頻度である
、 低レイテンシに届けたい
が故にぶつかった、MQTTブローカーのちょっと変わった挙動について書きます。
TL; DR
やること
- docker で MQTTブローカー を立てて、Publish ~ Subscribe 間の遅延(エンドツーエンド遅延)について計測します。
- influxDB + grafana を使って、サクッと簡単に計測結果を可視化します。
わかること
- docker を使った MQTTブローカー の立て方
- golang を使った MQTTクライアント の作り方
- influxDB + grafana を使った簡単な可視化
MQTT 興味ないよ、って方にも読んでいただけるように、 grafana についてもちょこっと書きます。
以降の流れ
変わった挙動
についてだけ知りたい方は、 まとめ にすっ飛んで大丈夫です。
計測環境の構築
必要なファイルは、 すべて ryskiwt/mqtt-compare に置いてありますので、まずはこちらを clone してきます。
$ git clone https://github.com/ryskiwt/mqtt-compare.git
環境一式を docker で構築します。 docker-compose.yml は こちら をご参照ください。この docker-compose.yml によって、以下のコンテナが起動します。
- influxDB, grafana (可視化用)
- mosquitto (MQTTブローカー)
$ cd path/to/repository
$ docker-compose up -d
ここでは、詳細については触れませんが、多少解説しておきます。まず、各コンテナのポートを以下のようにバインドしてあります。
コンテナ | ホストポート | ゲストポート |
---|---|---|
grafana | 3000 | 3000 |
influx | 8086 | 8086 |
mosquitto | 1883 | 1883 |
また、 grafana
については、初期設定がちょっと面倒なので、 grafana ディレクトリを volume としてマウントしています。
MQTTブローカーの構築
MQTTブローカー( mosquitto
) については、 docker ディレクトリ に Dockerfile を配置してあるので、 docker-compose でコンテナを起動した時点で、すでにブローカーとして機能するようになっています。
可視化環境の構築
計測した結果を簡単に見るのに、テキストファイルに書き出して python でグラフ化...というのもよくある方法かと思いますが、今回はサクッと手軽に確認するために、 influxDB と grafana を使います。
さきほどの docker-compose によって、 http://localhost:3000 で grafana
が起動しているので、アクセスして初期設定を行います。デフォルトの username/password
は admin/admin
になっているようです。ログイン後にパスワード変更が出てきますが、よしなに設定してあげてください。
まずは 一緒に起動した influx を Data Source として登録します。(詳しい手順は、どこか他の記事を参照してくださいmm) 設定する内容は、 docker-compose.yml に合わせて、以下のようにします。
項目 | 設定値 |
---|---|
Name | 適当(ここでは test ) |
Type | InfluxDB |
URL | http://influx:8086 |
Database | dbname |
User | username |
Password | password |
入力が完了したら、 Save & Test
ボタンを押して、正常に登録されたことを確認します。
次に、ダッシュボードを設定します。ダッシュボードの設定は、あとから出てくる計測用クライアントに合わせて、以下のようにします。
↑は、単純に measurement
という計測から、 受信時刻 rx
と、送信時刻 tx
を取得して、その差分(=レイテンシ) を表示するだけの設定です。
↑は、単位を nanoseconds
にし、軸の上限を 100e6
(=100ms) に設定しています。
↑は、現在時刻から 30s
の範囲を、 5s
間隔で自動更新する設定です。
ここまでで、可視化のための準備が整いました。
計測用クライアントの作成
docker-compose によって、 MQTTブローカー、可視化環境が構築できたわけですが、MQTTブローカーにデータを送受信するクライアントがないと計測が始められません。
今回使用するのは、 こちらのサンプルコード です。以下で多少解説します。
MQTTクライアントの生成
今回は、 github.com/eclipse/paho.mqtt.golang
を使います。
//
// client
//
opts := mqtt.NewClientOptions()
opts.AddBroker(url)
opts.SetTLSConfig(&tls.Config{
InsecureSkipVerify: true,
ClientAuth: tls.NoClientCert,
})
client := mqtt.NewClient(opts)
defer client.Disconnect(250)
if err := wrap(client.Connect()); err != nil {
return err
}
Publish
interval
間隔で、ペイロードに インデックス
と 送信時刻
を格納して送信します。
//
// publish
//
buf := bytes.NewBuffer(make([]byte, 8+8))
eg.Go(func() error {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for i := 0; i < total; i++ {
select {
case <-ctx.Done():
return nil
case <-ticker.C:
tx := time.Now()
binary.Write(buf, binary.BigEndian, uint64(i))
binary.Write(buf, binary.BigEndian, uint64(tx.UnixNano()))
if err := wrap(client.Publish(topic, qos, false, buf.Bytes())); err != nil {
return err
}
buf.Reset()
}
}
return nil
})
Subscribe
ペイロードから インデックス
と 送信時刻
を取り出し、受信時刻と合わせて、 influx への書き込み用 goroutine へ流します。
//
// subscribe
//
writeC := make(chan dataset, 256)
callback := func(client mqtt.Client, msg mqtt.Message) {
rd := bytes.NewReader(msg.Payload())
var i, t uint64
binary.Read(rd, binary.BigEndian, &i)
binary.Read(rd, binary.BigEndian, &t)
tx := time.Unix(0, int64(t))
writeC <- dataset{tx: tx, rx: time.Now()}
if int(i) == total-1 {
close(writeC)
}
}
if err := wrap(client.Subscribe(topic, qos, callback)); err != nil {
return err
}
defer client.Unsubscribe(topic)
InfluxDB への書き込み
Subscribe用 の goroutine から受け取ったデータを、1秒ごとに influxDBへ書き込みます。
//
// influx
//
eg.Go(func() error {
client, err := influx.NewHTTPClient(influx.HTTPConfig{
Addr: "http://localhost:8086",
Username: "username",
Password: "password",
})
if err != nil {
return err
}
defer client.Close()
bpConfig := influx.BatchPointsConfig{
Database: "dbname",
Precision: "ns",
}
bps, _ := influx.NewBatchPoints(bpConfig)
defer client.Write(bps)
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return nil
case ds, ok := <-writeC:
if !ok {
return nil
}
fields := map[string]interface{}{
"tx": ds.tx.UnixNano(),
"rx": ds.rx.UnixNano(),
}
point, _ := influx.NewPoint("measurement", nil, fields, ds.tx)
bps.AddPoint(point)
case <-ticker.C:
if err := client.Write(bps); err != nil {
return err
}
bps, _ = influx.NewBatchPoints(bpConfig)
}
}
})
動作確認
grafana の画面を見ながら、クライアントを実行してみます。
$ cd path/to/repository
$ go run main.go
こんな感じで、グラフが表示されれば成功です。
(grafanaの最短更新間隔が5秒で、Gyazo無料版の最長録画時間が6秒だったので、ちょっとしか録れませんでしたw)
実際に計測してみる
!?
うわっ… 私のMQTTブローカー…遅すぎ…?
何が起きたのか
Publish間隔 100ms では特に不穏な動きはありませんでしたが、 10ms にすると(高頻度にすると)、突如10ms程度の遅延が加算されるようになりました。
今回、ブローカーやクライアントを実行してるのは、すべてローカルのPC上なので、この環境下で10msもの遅延が起きるはずはありません。何かがおかしい。
実はこの MQTTブローカーが突然遅くなる
という現象、弊社でブローカーの性能比較を行う際にも何度か遭遇したことがあり、悩みのタネになっていました。MQTTブローカーの中でも、発生するものとしないものがあり、どうやら MQTTプロトコルが原因ではなく、MQTTブローカーの実装に原因がありそうです。
こちらについて調査するためにネットの海をさまよっていたところ、偶然こんな記述を見つけました。
The delay can be fixed by activating the TCP option TCP_NODELAY which disables Nagle algorithm for the socket.
Google翻訳
遅延は、ソケットのNagleアルゴリズムを無効にするTCPオプションTCP_NODELAYをアクティブにすることで修正できます。
なんと、MQTTブローカーのせいではなく、 TCPのNagleアルゴリズムが悪さをしているらしい、とのこと。
さらに先を読み進めてみると、
I'm quite conflicted here. The operation as it is at the moment is exactly the correct behaviour. Nagle exists for a reason, we don't want to generate lots of small packets.
Disabling Nagle can give you a reduction in latency, but will likely also introduce a reduction in throughput. I personally think that throughput is more important for the broker than latency, but that is of course entirely dependent on your workload and model.
Google翻訳
私はここでかなり葛藤している。現時点での操作はまさに正しい動作です。 Nagleは理由のために存在するため、小さなパケットをたくさん生成したくありません。
Nagleを無効にするとレイテンシが減少しますが、スループットが低下する可能性もあります。私は個人的には、スループットがブローカよりも待ち時間よりも重要だと考えていますが、それはもちろんワークロードとモデルに完全に依存しています。
開発者の葛藤が見えます。レイテンシよりもスループットが大事だと。
しかし結局、 set_tcp_nodelay
オプションの追加、という形で決着したようです。
Could you try out the code in the develop branch please? You'll need to set the set_tcp_nodelay option.
Google翻訳
開発ブランチのコードを試してみてください。 set_tcp_nodelayオプションを設定する必要があります。
ちなみに、上の issue の中でも言及されていますが、 emqtt ではこちらは発生しないんだそうです。(emqtt
にも Nagleアルゴリズム を強制オフするオプションがあります。デフォルトでオンになってる?)
I did the same test with emqttd and there is no delay at all.
Google翻訳
私はemqttdと同じテストを行い、全く遅延はありません。
再計測
以上を踏まえて、 set_tcp_nodelay
オプションを有効化して、再度計測をしてみましょう。(set_tcp_nodelay
は v1.5
からなので、最新版をインストールしないと無効なエラー扱いになって落ちます)
具体的には、 docker/mosquitto/mosquitto.conf に、以下の行を追加します。
set_tcp_nodelay true
docker を restart します。
$ cd path/to/repository
$ docker-compose down && docker-compose build && docker-compose up -d
はい、やはり、俺のMQTTブローカーがこんなに遅いわけないですね。ちゃんと安定して流れるようになりました。
まとめ
- docker で MQTTブローカー と grafana をサクッと立てて、エンドツーエンドの遅延を測った
- 10ms間隔で Publish したところ、ローカル環境で閉じているにも関わらず、 10ms の遅延がのってくることがあった
- MQTTプロトコルが原因ではなく、
TCPのNagleアルゴリズム
が悪さをしていた - mosquitto も、最新版では
set_tcp_nodelay
オプションによって、 Nagleアルゴリズムを強制オフできることがわかった - Nagleアルゴリズムを強制オフして測ると、遅延は解消した
弊社のように、10ms間隔なんて高粒度なデータを流したり、スループットよりもレイテンシ!なんて トチ狂った 人たちはあまり多くはないと思いますが、みなさんも、MQTTブローカーを利用する際は Nagleアルゴリズム にくれぐれもご注意ください。
めでたしめでたし。