ベンチマーク
mqtt
mosquitto
IoT

mosquittoの性能に関するあまり知られていない7つの注意点

More than 1 year has passed since last update.

2017/6/30追記 ペイロードサイズを変化させた場合を追記し、まとめを修正しました。

この記事ではAWS ec2上のCentOS7にインストールしたmosquitto1.5(develop)とGo1.8を使っています。

mosquittoのベンチマーク結果はチラホラ見かけますが、数万同時接続を前提とした結果はあまり見かけないため、やってみました。いろいろと面白い結果が得られたので共有します。

なお、6万接続以上を評価しようとしてハマってしまったのですが、そのハマりを一般化した情報は「AWS ec2 t2タイプには51300コネクションしか繋がらない」で公開しましたので、そちらを参照してください。このため、この記事では最大5万接続としています。


1. ベンチマーク環境

以下の環境を共通にしてベンチマークします。


  • 構成は、送受信クライアント(Go+paho)on t2.medium 〜mosquitto on t2.medium

  • 送信部はマルチスレッド、多数同時接続あり

  • PUBLISH内容は、トピック:6バイト、ペイロード:6バイト

  • 受信部はマルチスレッド(Paho組み込み)、1接続、ワイルドカードトピックをSUBSCRIBE

また特に明記しない限り、以下をデフォルトとします。


  • 送信スレッドは100多重

  • 送信メッセージ数は100万

  • 受信QoS=0

  • mosquitto.confはデフォルトのまま

1回のベンチマーク実行後は、適宜リブートをしました。「AWS ec2 t2タイプには51300コネクションしか繋がらない」の影響からか、多数接続の試験直後は(TIME_WAIT等のコネクションの残り火が消えたのを確認したとしても)、安定しないように思われましたので。

送受信クライアント(一体型)のソースは以下です。接続数、メッセージ数、送信QoSは、起動時のコマンドラインでカスタマイズ可能です。その他のカスタマイズは、直接ソースを編集しておこないました。

なお、QoS=0での同時大量メッセージ送信をすると、mosquitto側のバッファが大量になり、うまく受信側にメッセージが流れないようでしたので、送信クライアント側にシェーピング機構を設けています。


mqtt_bench.go

package main

import (
"fmt"
//import the Paho Go MQTT library
MQTT "github.com/eclipse/paho.mqtt.golang"
"os"
"time"
"strconv"
"sync"
)

var count int
var m *sync.Mutex
const maxThreads = 100

//define a function for the default message handler
var f MQTT.MessageHandler = func(client MQTT.Client, msg MQTT.Message) {
// n, _ := strconv.Atoi(msg.Topic())
m.Lock()
count++
if count % 10000 == 0 {
fmt.Println(count, "Received.")
}
m.Unlock()
// fmt.Println("%05d", total_count, "TOPIC:", msg.Topic(), "MSG:", string(msg.Payload()))
}

func main() {
// confirm arg
maxConns := 100 // arg #1
numPublish := 100 // arg #2
mqtthost := "localhost:1883" // arg #3
qos := 0 // arg #4 (apply qos to Publish connections only)
if len(os.Args) >= 2 {
maxConns, _ = strconv.Atoi(os.Args[1])
}
if len(os.Args) >= 3 {
numPublish, _ = strconv.Atoi(os.Args[2])
}
if len(os.Args) >= 4 {
mqtthost = os.Args[3]
}
if len(os.Args) >= 5 {
qos, _ = strconv.Atoi(os.Args[4])
}
mqtthost = "tcp://" + mqtthost
fmt.Println("maxConns :=", maxConns, "numPublish :=", numPublish, "mqtthost :=", mqtthost, "qos :=", qos)

//connect Sub
fmt.Println("Connecting & Subscribing Sub...")

opts := MQTT.NewClientOptions().AddBroker(mqtthost)
opts.SetDefaultPublishHandler(f)
// opts.SetKeepAlive(3600 * time.Second)
// opts.SetPingTimeout(60 * time.Second)
opts.SetClientID("Sub")
cs := MQTT.NewClient(opts)
if token := cs.Connect(); token.Wait() && token.Error() != nil {
panic(token.Error())
}

//subscribe Sub
if token := cs.Subscribe("#", 0, nil); token.Wait() && token.Error() != nil {
fmt.Println(token.Error())
os.Exit(1)
}

//connect Pub (multithreads version)
fmt.Println("Connecting Pubs...")
time_start1 := time.Now()

cp := make([]MQTT.Client, maxConns)
count = 0
m = new(sync.Mutex)

var wg sync.WaitGroup
for val := 0; val < maxThreads; val++ {
wg.Add(1)
go func(i int) {
for j := 0; j <= maxConns / maxThreads; j++ {
conns := j * maxThreads + i
if conns >= maxConns {
break
}
opts := MQTT.NewClientOptions().AddBroker(mqtthost)
opts.SetClientID(fmt.Sprintf("Pub/%07d", conns))
// opts.SetKeepAlive(3600 * time.Second)
// opts.SetPingTimeout(60 * time.Second)
cp[conns] = MQTT.NewClient(opts)
if token := cp[conns].Connect(); token.Wait() && token.Error() != nil {
fmt.Println("NO.", conns, "failed. Because, ", token.Error())
time.Sleep(3600 * time.Second)
panic(token.Error())
}
m.Lock()
count++
if count % 1000 == 0 {
fmt.Println(count, "Pubs Connected.")
}
m.Unlock()
}
wg.Done()
} (val)
}
wg.Wait()
if count % 1000 > 0 {
fmt.Println(count, "Pubs Connected.")
}
time_stop1 := time.Now()

//publish numPublish times (multithreads version)
time_start2 := time.Now()
fmt.Println("Publish started.")
count = 0
count2 := 0
m2 := new(sync.Mutex)

for val := 0; val < maxThreads; val++ {
wg.Add(1)
go func(i int) {
for j := 0; j <= numPublish / maxThreads; j++ {
num := j * maxThreads + i
if num >= numPublish {
break
}
token := cp[num % maxConns].Publish(fmt.Sprintf("%06d", num % maxConns), byte(qos), false, fmt.Sprintf("%06d",num / maxConns))
token.Wait()
m2.Lock()
count2++
if count2 % 10000 == 0 {
fmt.Println(count2, "Published.")
if count2 > count + 100000 {
fmt.Println("Shaping...")
time.Sleep(500 * time.Millisecond)
}
}
m2.Unlock()
}
wg.Done()
} (val)
}
wg.Wait()
if count2 % 10000 > 0 {
fmt.Println(count2, "Published.")
}
time_stop2 := time.Now()

// wait max 30s idle timeout and stop timer
count3 := count
time_start3 := time_start2
time_stop3 := time.Now()
for idle := 0; idle < 3000; idle++ {
if count > count3 {
time_stop3 = time.Now()
count3 = count
idle = 0
}
if count >= numPublish {
break
}
time.Sleep(10 * time.Millisecond)
}
fmt.Println("Publish finished.")

fmt.Println("Disconnecting Pubs...")
time_start4 := time.Now()

//disconnect Pub
for i := 0; i < maxConns; i++ {
cp[i].Disconnect(250)
if (i + 1) % 1000 == 0 {
fmt.Println(i + 1, "Pubs Disconnected.")
}
}
time_stop4 := time.Now()

// fmt.Println("Unsubscribing & Disconnecting Sub...")

//unsubscribe & disconnect Sub
// if token := cs.Unsubscribe("#"); token.Wait() && token.Error() != nil {
// fmt.Println(token.Error())
// os.Exit(1)
// }
cs.Disconnect(250)

// print result
duration1 := time_stop1.Sub(time_start1).Seconds()*1000
duration2 := time_stop2.Sub(time_start2).Seconds()*1000
duration3 := time_stop3.Sub(time_start3).Seconds()*1000
duration4 := time_stop4.Sub(time_start4).Seconds()*1000
fmt.Println("-------")
fmt.Println("maxConns :=", maxConns, "numPublish :=", numPublish, "mqtthost :=", mqtthost, "qos :=", qos)
fmt.Println(count, "received on", maxConns, "connections,", count * 100 / numPublish, "% success.")
fmt.Println("TOTAL connecting time:", int(duration1), "msec", int(float64(maxConns) / duration1 * 1000), "conns/s")
fmt.Println("TOTAL msgs send time:", int(duration2), "msec", int(float64(numPublish) / duration2 * 1000), "msgs/s")
fmt.Println("TOTAL msgs receive time:", int(duration3), "msec", int(float64(count) / duration3 * 1000), "msgs/s")
fmt.Println("TOTAL disconnecting time:", int(duration4), "msec", int(float64(maxConns) / duration4 * 1000), "conns/s")
}



2. 結果と考察


2.1. 同時接続数を変化させた場合

デフォルト条件(送信100多重、100万メッセージ)で、同時接続数を変化させてみました。CPU%とMEM%は、mosquitto側でtopをとって目分量でピーク値を出しています。


2.1.1. 送信QoS=0の場合

同時接続数
100
1000
1万
5万

接続conn/sec
10,469
7,424
6,680
3,014

送信msg/sec
55,956
45,070
40,524
25,967

受信msg/sec
53,205
43,182
30,083
21,117

切断conn/sec
23,941
147
18,649
24,996

受信成功率
100%
100%
100%
99.9%

CPU%
48%
50%
100%
100%

MEM%
0.4%
0.2%
0.4%
1.2%

同時接続数が増えると、接続/送信/受信が類似の傾向で遅くなっていくようです。特に1万から5万にかけて、落ち込みが激しいです。1万の時点でCPUが100%になっていることから、mosquitto側のCPU性能限界となっているようです。同時接続数が増えるにしたがって、接続/送信/受信の1接続あたりのCPU処理量が増えているように思われます。moqtuittoのソースを見ると、ソケットからのメッセージ取得にpoll()システムコールを使っており、epoll()には対応していないようです。このため、同時接続数の増加により、メッセージ検索のオーバーヘッドが拡大して、だんだんと遅くなるのではないかと推測します。

一方、切断に関しては、同時接続数の影響はあまり見られません。同時接続1000で値が落ち込んでいるのは、時々見られる「固まった状態」によるもので、たまたまのようです。そもそも切断については、このくらいの量の試験では、絶対時間が短すぎて、有効な結果は得られません。参考程度に見てください。

CPU%が100%で頭打ちなのは、mosquittoが1CPUしか使えていないからであると思われます。t2.mediumは2CPUなので、最大値は200%なのです。mosquittoのソースを見ると、pthread_create()がクライアントにしか使われていません。ブローカーは非同期ではあるもののシングルスレッドで動作しているようです。nginxなど最近の潮流は、非同期+マルチスレッドワーカープロセスなので、mosquittoはやや作り方が古いですね。

MEM%は同時接続数に応じて微増します。コネクション管理テーブルのための消費と思われます。しかし、絶対値としてはわずかなもので、mosquittoは、かなり効率的なメモリ管理をしていることがわかります。AWS最小のt2.nanoでも問題無く、かつ性能も変わらずに動作できそうです。


2.1.2. 送信QoS=1の場合

同時接続数
100
1000
1万
5万

接続conn/sec
10,805
7,537
6,844
2,811

送信msg/sec
35,444
25,972
13,483
3,961

受信msg/sec
34,736
25,972
13,483
3,961

切断conn/sec
25,301
27,427
22,671
18,413

受信成功率
100%
100%
100%
99.9%

CPU%
80%
92%
97%
100%

MEM%
0.2%
0.1%
0.2%
0.8%

QoS=0と同じ傾向ですが、送信msg/secと受信msg/secが、全体的に遅くなっています。QoS=1の通信オーバーヘッドにより、送信msg/secが遅くなり、それに引っ張られる形で受信msg/secが遅くなったものと思われます。

接続と切断については、QoSの影響は無いため、QoS=0とほぼ同様の結果となっています。


2.1.3. 送信QoS=2の場合

同時接続数
100
1000
1万
5万

接続conn/sec
9,525
8,087
6,837
2,475

送信msg/sec
19,862
11,842
7,788
1,965

受信msg/sec
19,862
11,840
7,788
1,965

切断conn/sec
24,925
27,658
17,862
25,918

受信成功率
100%
100%
100%
99.9%

CPU%
70%
92%
97%
100%

MEM%
0.1%
0.1%
0.2%
0.8%

QoS=1とほぼ同様の傾向で、送信msg/secと受信msg/secが、QoS=1のさらに半分の遅さになっています。


2.2. 送信メッセージ数を変化させた場合

デフォルト条件(送信100多重、100万メッセージ)から、メッセージ数を変化させてみました。QoS=0、同時接続数=1万固定です。

メッセージ数
50万
100万
200万
500万

接続conn/sec
6,836
6,680
6,912
6,799

送信msg/sec
40,965
40,524
39,069
38,166

受信msg/sec
28,342
30,083
32,692
34,810

切断conn/sec
832
18,649
22,341
18,585

受信成功率
100%
99.9%
99.9%
99.9%

当たり前ですが、メッセージ数によっての差異は認められません。受信msg/secがメッセージ数が増えると速度改善しているように見えるのは、受信失敗したメッセージを待ち受けている時間が等分に加わるためです。なお、切断については参考レベルとしてください。


2.3. 送信多重数を変化させた場合

デフォルト条件(送信100多重、100万メッセージ)から、送信多重数を変化させてみました。同時接続数=1万固定です。


2.3.1. 送信QoS=0の場合

送信多重数
1
10
100

接続conn/sec
269
2,192
6,680

送信msg/sec
33,090
39,639
40,524

受信msg/sec
33,079
30,330
30,083

切断conn/sec
28,079
28,966
18,649

受信成功率
100%
100%
100%

CPU%
100%
100%
100%

MEM%
0.4%
0.4%
0.8%

接続conn/secについて、多重数を減らすと、はっきりと値が減ります。その他の値には大きな変化は見られませんでした。また処理性能の違いにもかかわらず、CPU%は100%で変わりません。

MQTTの接続時にはアプリケーションレベルでCONNACKを返信しますが、送信/受信/切断では、TCPレベルのACKしか返信しない、という違いがあります。Pahoライブラリにおいて、CONNECT〜CONNACKのひと続きのメッセージ処理が同期処理で行われており、その間、mosquitto側のCPUが空回りしているようです。mosquitto側は非同期で処理されるため、Pahoを利用するクライアントGoプログラム側が多重処理をするだけで、接続処理が高速化するようです。

なお、実際は、最初に多重無しのプログラムを書いて、QoS>0で遅いな〜と思いながら、試行錯誤していました。また、Pahoのライブラリによっては、ライブラリ内の同期用APIと非同期用APIを両方持たせている場合があります。


2.3.2. 送信QoS=1の場合

送信多重数
1
10
100

接続conn/sec
256
2,066
6,844

送信msg/sec
331
3,371
13,483

受信msg/sec
331
3,371
13,483

切断conn/sec
30,006
30,938
22,671

受信成功率
100%
100%
100%

CPU%
100%
100%
97%

MEM%
0.2%
0.2%
0.2%

接続conn/secに加え、送信msg/secと受信msg/secにも変化傾向があらわれます。これは、送信においてQoS=1によるアプリケーションレベルの送達確認が入り、接続と同様の理由で、多重数を減らすと、遅くなるためと考えられます。なお、受信msg/sec側はQoS=0なので、送信側にひっぱられて遅くなっているだけと考えられます。


2.3.3. 送信QoS=2の場合

送信多重数
1
10
100

接続conn/sec
261
2,133
6,837

送信msg/sec
195
1,802
7,788

受信msg/sec
195
1,802
7,788

切断conn/sec
23,791
29,678
17,862

受信成功率
100%
100%
100%

CPU%
100%
100%
97%

MEM%
0.2%
0.2%
0.2%

QoS=1とほぼ同様の傾向で、送信msg/secと受信msg/secが、QoS=1のさらに半分の遅さになっています。


2.4. 受信QoS>0の場合

デフォルト条件(送信100多重、100万メッセージ)から、受信QoS=送信QoSにしてみました。同時接続数=1万固定です。またmosquitto.confには、max_inflight_messagesという設定があり、QoS=1/2でのレスポンスを待たないで連続送信できるメッセージ数を変更できます(デフォルトは20)。この設定を0(無限大)とすることも試みました(QoSの後に+を付記しているものが該当します)。

QoS
0
1
2
1+
2+

接続conn/sec
6,680
6,658
6,438
5,831
6,474

送信msg/sec
40,524
3,231
3,065
1,916
1,422

受信msg/sec
30,083
95
84
1,916
1,422

切断conn/sec
18,649
23,461
23,797
27,698
25,475

受信成功率
100%
3%
3%
100%
100%

まず、max_inflight_messagesを設定しないと、QoS>0では、全体のうちのわずかしか受信できていなことが分かります。受信1セッションをアプリケーションレベルの送達確認をしつつ送り出すために、非常に遅くなってしまい、実行時間内にほとんど終了しなかったためです。

max_inflight_messagesを無限大にすることで、mosquittoは、待たすに次々とQoS>0のメッセージを送信できるようになります。受信アプリケーションはもともとPaho組み込みの機能で非同期処理になっていますので、来たメッセージをどんどん受け取ることができます。その効果で、それなりの性能で受信できるようになっています。しかし、それでも、QoS=0での受信よりははるかに遅いことが分かります。


2.5. ペイロードサイスを変化させた場合

デフォルト条件(送信100多重、100万メッセージ)から、ペイロードサイズを6Byteから変化させてみました。同時接続数=1万固定です。100KBの時は、時間がかかるため10万メッセージで測定しました。

ペイロードサイズ
6B
100B
1KB
10KB
100KB

接続conn/sec
6,680
6,479
6,180
6,402
6,547

送信msg/sec
40,524
35,688
25,553
5,426
433

送信bps/sec
1.9M
29M
204M
434M
346M

受信msg/sec
30,083
25,319
23,131
5,301
432

受信bps/sec
1.4M
20M
185M
424M
346M

切断conn/sec
18,649
19,688
18,294
15,150
20,453

受信成功率
100%
99.9%
100%
100%
100%

CPU%
100%
100%
100%
100%
100%

MEM%
0.4%
2.9%
3.2%
31%
14%

ペイロードサイズが大きくなると、当たり前ですが、メッセージ送受信回数での性能は小さくなります。しかし、bpsとしてのスループットは上昇します。

1KBまでは順調でしたが、10KB以降は念入りにシェーピング機構をチューニングしないと、テストが終了しませんでした。ペイロードサイズが大きいと、ブローカー内処理または受信側がボトルネックになってしまうようです。標準化策定中のmqttv5では、受信側ボトルネックは、共有サブスクリプションという機能で解決されるようです。mosquitto自体のマルチコア対応も必要な気がします。なお、100Mだとチューニングがなかなか難しく、スループットが若干下がってしまいました。

また、ブローカー側のメモリ消費は、急速に増えています。なお、10KB以降での値はシェーピング機能をチューニングした後の値ですので、単純に比較はでいないことを注意ください。特に未チューニングでの100KB試験では、開始しばらくしてメモリ消費89%を記録して、そのままフリーズしてしまいました。

なお、iperfで測定したところ、ネットワーク性能は1Gbps×全二重でした。


3. まとめ〜mosquittoの性能に関するあまり知られていない7つの注意点〜

以上で得られた重要な知見、およびそこから導き出される考察をまとめました。


  1. mosquittoの接続処理はメッセージ送受信と比較して遅い。メッセージ送受信の中でも、QoS=1/2のメッセージ送受信は、QoS=0と比較して遅い。

  2. 同時接続数が増えると、接続およびメッセージ送受信はだんだんと遅くなる。

  3. ペイロードサイズが増えると、メッセージ送受信回数は小さくなるが、スループットは(ある程度まで)速くなる。ただし、メモリに注意。

  4. 送信クライアント側は非同期または多重処理をすること(特に、Proxy経由等で送信クライアント側が少数になる場合に注意)。

  5. 大量受信する受信クライアント側は、非同期または多重処理はもちろんのこと、極力QoS=0で実装すること。

  6. mosquittoはマルチコアを使えない。

  7. mosquittoのCPU%が100%だからといって、性能を使い切っているとは限らない。