24
18

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

aptpodAdvent Calendar 2018

Day 19

俺の MQTTブローカー がこんなに遅いわけがない

Last updated at Posted at 2018-12-19

はじめに

Aptpod Advent Calendar 19日目、前回から2日しか経っていませんが、またまた 何でも屋 岩田 がお届けします。

前回からお決まりのものを引用。(前回は IEEE1588 PTP について書きました

intdash は、 IoT でよくある 軽量・超多数のデバイス(だけどデータの発生頻度は低め) というよりは、単体でも高頻度にデータを発生させる重厚なデバイス(だけどデバイス数は少なめ) をターゲットにした、ちょっと変わったプラットフォームです。(自動車やロボットの研究開発部門などを中心にご利用いただいています)

今回は、 送信データが高頻度である低レイテンシに届けたい が故にぶつかった、MQTTブローカーのちょっと変わった挙動について書きます。

TL; DR

やること

  • docker で MQTTブローカー を立てて、Publish ~ Subscribe 間の遅延(エンドツーエンド遅延)について計測します。
  • influxDB + grafana を使って、サクッと簡単に計測結果を可視化します。

わかること

  • docker を使った MQTTブローカー の立て方
  • golang を使った MQTTクライアント の作り方
  • influxDB + grafana を使った簡単な可視化

MQTT 興味ないよ、って方にも読んでいただけるように、 grafana についてもちょこっと書きます。

以降の流れ

変わった挙動 についてだけ知りたい方は、 まとめ にすっ飛んで大丈夫です。

計測環境の構築

必要なファイルは、 すべて ryskiwt/mqtt-compare に置いてありますので、まずはこちらを clone してきます。

$ git clone https://github.com/ryskiwt/mqtt-compare.git

環境一式を docker で構築します。 docker-compose.yml は こちら をご参照ください。この docker-compose.yml によって、以下のコンテナが起動します。

  • influxDB, grafana (可視化用)
  • mosquitto (MQTTブローカー)
$ cd path/to/repository
$ docker-compose up -d

ここでは、詳細については触れませんが、多少解説しておきます。まず、各コンテナのポートを以下のようにバインドしてあります。

コンテナ ホストポート ゲストポート
grafana 3000 3000
influx 8086 8086
mosquitto 1883 1883

また、 grafana については、初期設定がちょっと面倒なので、 grafana ディレクトリを volume としてマウントしています。

MQTTブローカーの構築

MQTTブローカー( mosquitto ) については、 docker ディレクトリ に Dockerfile を配置してあるので、 docker-compose でコンテナを起動した時点で、すでにブローカーとして機能するようになっています。

可視化環境の構築

計測した結果を簡単に見るのに、テキストファイルに書き出して python でグラフ化...というのもよくある方法かと思いますが、今回はサクッと手軽に確認するために、 influxDB と grafana を使います。

さきほどの docker-compose によって、 http://localhost:3000grafana が起動しているので、アクセスして初期設定を行います。デフォルトの username/passwordadmin/admin になっているようです。ログイン後にパスワード変更が出てきますが、よしなに設定してあげてください。

Image from Gyazo

まずは 一緒に起動した influx を Data Source として登録します。(詳しい手順は、どこか他の記事を参照してくださいmm) 設定する内容は、 docker-compose.yml に合わせて、以下のようにします。

項目 設定値
Name 適当(ここでは test
Type InfluxDB
URL http://influx:8086
Database dbname
User username
Password password

入力が完了したら、 Save & Test ボタンを押して、正常に登録されたことを確認します。

Image from Gyazo

次に、ダッシュボードを設定します。ダッシュボードの設定は、あとから出てくる計測用クライアントに合わせて、以下のようにします。

Image from Gyazo

↑は、単純に measurement という計測から、 受信時刻 rx と、送信時刻 tx を取得して、その差分(=レイテンシ) を表示するだけの設定です。

Image from Gyazo

↑は、単位を nanoseconds にし、軸の上限を 100e6 (=100ms) に設定しています。

Image from Gyazo

↑は、現在時刻から 30s の範囲を、 5s 間隔で自動更新する設定です。

ここまでで、可視化のための準備が整いました。

計測用クライアントの作成

docker-compose によって、 MQTTブローカー、可視化環境が構築できたわけですが、MQTTブローカーにデータを送受信するクライアントがないと計測が始められません。

今回使用するのは、 こちらのサンプルコード です。以下で多少解説します。

MQTTクライアントの生成

今回は、 github.com/eclipse/paho.mqtt.golang を使います。

//
// client
//

opts := mqtt.NewClientOptions()
opts.AddBroker(url)
opts.SetTLSConfig(&tls.Config{
	InsecureSkipVerify: true,
	ClientAuth:         tls.NoClientCert,
})

client := mqtt.NewClient(opts)
defer client.Disconnect(250)

if err := wrap(client.Connect()); err != nil {
	return err
}

Publish

interval 間隔で、ペイロードに インデックス送信時刻 を格納して送信します。

//
// publish
//

buf := bytes.NewBuffer(make([]byte, 8+8))
eg.Go(func() error {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for i := 0; i < total; i++ {
		select {
		case <-ctx.Done():
			return nil

		case <-ticker.C:
			tx := time.Now()
			binary.Write(buf, binary.BigEndian, uint64(i))
			binary.Write(buf, binary.BigEndian, uint64(tx.UnixNano()))
			if err := wrap(client.Publish(topic, qos, false, buf.Bytes())); err != nil {
				return err
			}
			buf.Reset()
		}
	}

	return nil
})

Subscribe

ペイロードから インデックス送信時刻 を取り出し、受信時刻と合わせて、 influx への書き込み用 goroutine へ流します。

//
// subscribe
//

writeC := make(chan dataset, 256)
callback := func(client mqtt.Client, msg mqtt.Message) {
	rd := bytes.NewReader(msg.Payload())
	var i, t uint64
	binary.Read(rd, binary.BigEndian, &i)
	binary.Read(rd, binary.BigEndian, &t)
	tx := time.Unix(0, int64(t))
	writeC <- dataset{tx: tx, rx: time.Now()}

	if int(i) == total-1 {
		close(writeC)
	}
}
if err := wrap(client.Subscribe(topic, qos, callback)); err != nil {
	return err
}
defer client.Unsubscribe(topic)

InfluxDB への書き込み

Subscribe用 の goroutine から受け取ったデータを、1秒ごとに influxDBへ書き込みます。

//
// influx
//

eg.Go(func() error {
	client, err := influx.NewHTTPClient(influx.HTTPConfig{
		Addr:     "http://localhost:8086",
		Username: "username",
		Password: "password",
	})
	if err != nil {
		return err
	}
	defer client.Close()

	bpConfig := influx.BatchPointsConfig{
		Database:  "dbname",
		Precision: "ns",
	}
	bps, _ := influx.NewBatchPoints(bpConfig)
	defer client.Write(bps)

	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return nil

		case ds, ok := <-writeC:
			if !ok {
				return nil
			}

			fields := map[string]interface{}{
				"tx": ds.tx.UnixNano(),
				"rx": ds.rx.UnixNano(),
			}
			point, _ := influx.NewPoint("measurement", nil, fields, ds.tx)
			bps.AddPoint(point)

		case <-ticker.C:
			if err := client.Write(bps); err != nil {
				return err
			}
			bps, _ = influx.NewBatchPoints(bpConfig)
		}
	}
})

動作確認

grafana の画面を見ながら、クライアントを実行してみます。

$ cd path/to/repository
$ go run main.go

こんな感じで、グラフが表示されれば成功です。
(grafanaの最短更新間隔が5秒で、Gyazo無料版の最長録画時間が6秒だったので、ちょっとしか録れませんでしたw)

Image from Gyazo

実際に計測してみる

  • Publish間隔 100ms
    Image from Gyazo

  • Publish間隔 10ms
    Image from Gyazo

!?

うわっ… 私のMQTTブローカー…遅すぎ…?

何が起きたのか

Publish間隔 100ms では特に不穏な動きはありませんでしたが、 10ms にすると(高頻度にすると)、突如10ms程度の遅延が加算されるようになりました。

今回、ブローカーやクライアントを実行してるのは、すべてローカルのPC上なので、この環境下で10msもの遅延が起きるはずはありません。何かがおかしい。

実はこの MQTTブローカーが突然遅くなる という現象、弊社でブローカーの性能比較を行う際にも何度か遭遇したことがあり、悩みのタネになっていました。MQTTブローカーの中でも、発生するものとしないものがあり、どうやら MQTTプロトコルが原因ではなく、MQTTブローカーの実装に原因がありそうです。

こちらについて調査するためにネットの海をさまよっていたところ、偶然こんな記述を見つけました。

The delay can be fixed by activating the TCP option TCP_NODELAY which disables Nagle algorithm for the socket.

Google翻訳

遅延は、ソケットのNagleアルゴリズムを無効にするTCPオプションTCP_NODELAYをアクティブにすることで修正できます。

なんと、MQTTブローカーのせいではなく、 TCPのNagleアルゴリズムが悪さをしているらしい、とのこと。

さらに先を読み進めてみると、

I'm quite conflicted here. The operation as it is at the moment is exactly the correct behaviour. Nagle exists for a reason, we don't want to generate lots of small packets.

Disabling Nagle can give you a reduction in latency, but will likely also introduce a reduction in throughput. I personally think that throughput is more important for the broker than latency, but that is of course entirely dependent on your workload and model.

Google翻訳

私はここでかなり葛藤している。現時点での操作はまさに正しい動作です。 Nagleは理由のために存在するため、小さなパケットをたくさん生成したくありません。

Nagleを無効にするとレイテンシが減少しますが、スループットが低下する可能性もあります。私は個人的には、スループットがブローカよりも待ち時間よりも重要だと考えていますが、それはもちろんワークロードとモデルに完全に依存しています。

開発者の葛藤が見えます。レイテンシよりもスループットが大事だと。
しかし結局、 set_tcp_nodelay オプションの追加、という形で決着したようです。

Could you try out the code in the develop branch please? You'll need to set the set_tcp_nodelay option.

Google翻訳

開発ブランチのコードを試してみてください。 set_tcp_nodelayオプションを設定する必要があります。

ちなみに、上の issue の中でも言及されていますが、 emqtt ではこちらは発生しないんだそうです。(emqtt にも Nagleアルゴリズム を強制オフするオプションがあります。デフォルトでオンになってる?)

I did the same test with emqttd and there is no delay at all.

Google翻訳

私はemqttdと同じテストを行い、全く遅延はありません。

再計測

以上を踏まえて、 set_tcp_nodelay オプションを有効化して、再度計測をしてみましょう。(set_tcp_nodelayv1.5 からなので、最新版をインストールしないと無効なエラー扱いになって落ちます)

具体的には、 docker/mosquitto/mosquitto.conf に、以下の行を追加します。

mosquitto.conf
set_tcp_nodelay true

docker を restart します。

$ cd path/to/repository
$ docker-compose down && docker-compose build && docker-compose up -d

Image from Gyazo

はい、やはり、俺のMQTTブローカーがこんなに遅いわけないですね。ちゃんと安定して流れるようになりました。

まとめ

  • docker で MQTTブローカー と grafana をサクッと立てて、エンドツーエンドの遅延を測った
  • 10ms間隔で Publish したところ、ローカル環境で閉じているにも関わらず、 10ms の遅延がのってくることがあった
  • MQTTプロトコルが原因ではなく、 TCPのNagleアルゴリズム が悪さをしていた
  • mosquitto も、最新版では set_tcp_nodelay オプションによって、 Nagleアルゴリズムを強制オフできることがわかった
  • Nagleアルゴリズムを強制オフして測ると、遅延は解消した

弊社のように、10ms間隔なんて高粒度なデータを流したり、スループットよりもレイテンシ!なんて トチ狂った 人たちはあまり多くはないと思いますが、みなさんも、MQTTブローカーを利用する際は Nagleアルゴリズム にくれぐれもご注意ください。

めでたしめでたし。

24
18
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
24
18

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?