36
39

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

mosquittoの性能に関するあまり知られていない7つの注意点

Last updated at Posted at 2017-06-28

2017/6/30追記 ペイロードサイズを変化させた場合を追記し、まとめを修正しました。

この記事ではAWS ec2上のCentOS7にインストールしたmosquitto1.5(develop)とGo1.8を使っています。

mosquittoのベンチマーク結果はチラホラ見かけますが、数万同時接続を前提とした結果はあまり見かけないため、やってみました。いろいろと面白い結果が得られたので共有します。

なお、6万接続以上を評価しようとしてハマってしまったのですが、そのハマりを一般化した情報は「AWS ec2 t2タイプには51300コネクションしか繋がらない」で公開しましたので、そちらを参照してください。このため、この記事では最大5万接続としています。

#1. ベンチマーク環境

以下の環境を共通にしてベンチマークします。

  • 構成は、送受信クライアント(Go+paho)on t2.medium 〜mosquitto on t2.medium
  • 送信部はマルチスレッド、多数同時接続あり
  • PUBLISH内容は、トピック:6バイト、ペイロード:6バイト
  • 受信部はマルチスレッド(Paho組み込み)、1接続、ワイルドカードトピックをSUBSCRIBE

また特に明記しない限り、以下をデフォルトとします。

  • 送信スレッドは100多重
  • 送信メッセージ数は100万
  • 受信QoS=0
  • mosquitto.confはデフォルトのまま

1回のベンチマーク実行後は、適宜リブートをしました。「AWS ec2 t2タイプには51300コネクションしか繋がらない」の影響からか、多数接続の試験直後は(TIME_WAIT等のコネクションの残り火が消えたのを確認したとしても)、安定しないように思われましたので。

送受信クライアント(一体型)のソースは以下です。接続数、メッセージ数、送信QoSは、起動時のコマンドラインでカスタマイズ可能です。その他のカスタマイズは、直接ソースを編集しておこないました。

なお、QoS=0での同時大量メッセージ送信をすると、mosquitto側のバッファが大量になり、うまく受信側にメッセージが流れないようでしたので、送信クライアント側にシェーピング機構を設けています。

mqtt_bench.go
package main

import (
  "fmt"
  //import the Paho Go MQTT library
  MQTT "github.com/eclipse/paho.mqtt.golang"
  "os"
  "time"
  "strconv"
  "sync"
)

var count int
var m *sync.Mutex
const maxThreads = 100

//define a function for the default message handler
var f MQTT.MessageHandler = func(client MQTT.Client, msg MQTT.Message) {
//  n, _ := strconv.Atoi(msg.Topic())
  m.Lock()
  count++
  if count % 10000 == 0 {
    fmt.Println(count, "Received.")
  }
  m.Unlock()
//  fmt.Println("%05d", total_count, "TOPIC:", msg.Topic(), "MSG:", string(msg.Payload()))
}

func main() {
  // confirm arg
  maxConns := 100  // arg #1
  numPublish := 100  // arg #2
  mqtthost := "localhost:1883"  // arg #3
  qos := 0  // arg #4 (apply qos to Publish connections only)
  if len(os.Args) >= 2 {
    maxConns, _ = strconv.Atoi(os.Args[1])
  }
  if len(os.Args) >= 3 {
    numPublish, _ = strconv.Atoi(os.Args[2])
  }
  if len(os.Args) >= 4 {
    mqtthost = os.Args[3]
  }
  if len(os.Args) >= 5 {
    qos, _ = strconv.Atoi(os.Args[4])
  }
  mqtthost = "tcp://" + mqtthost
  fmt.Println("maxConns :=", maxConns, "numPublish :=", numPublish, "mqtthost :=", mqtthost, "qos :=", qos)

  //connect Sub
  fmt.Println("Connecting & Subscribing Sub...")

  opts := MQTT.NewClientOptions().AddBroker(mqtthost)
  opts.SetDefaultPublishHandler(f)
//  opts.SetKeepAlive(3600 * time.Second)
//  opts.SetPingTimeout(60 * time.Second)
  opts.SetClientID("Sub")
  cs := MQTT.NewClient(opts)
  if token := cs.Connect(); token.Wait() && token.Error() != nil {
    panic(token.Error())
  }

  //subscribe Sub
  if token := cs.Subscribe("#", 0, nil); token.Wait() && token.Error() != nil {
    fmt.Println(token.Error())
    os.Exit(1)
  }

  //connect Pub (multithreads version)
  fmt.Println("Connecting Pubs...")
  time_start1 := time.Now()

  cp := make([]MQTT.Client, maxConns)
  count = 0
  m = new(sync.Mutex)

  var wg sync.WaitGroup
  for val := 0; val < maxThreads; val++ {
    wg.Add(1)
    go func(i int) {
      for j := 0; j <= maxConns / maxThreads; j++ {
        conns := j * maxThreads + i
        if conns >= maxConns {
          break
        }
        opts := MQTT.NewClientOptions().AddBroker(mqtthost)
        opts.SetClientID(fmt.Sprintf("Pub/%07d", conns))
//        opts.SetKeepAlive(3600 * time.Second)
//        opts.SetPingTimeout(60 * time.Second)
        cp[conns] = MQTT.NewClient(opts)
        if token := cp[conns].Connect(); token.Wait() && token.Error() != nil {          
          fmt.Println("NO.", conns, "failed. Because, ", token.Error())
          time.Sleep(3600 * time.Second)
          panic(token.Error())
        }
        m.Lock()
        count++    
        if count % 1000 == 0 {
          fmt.Println(count, "Pubs Connected.")
        }
        m.Unlock()
      }
      wg.Done()
    } (val)
  }
  wg.Wait()
  if count % 1000 > 0 {
    fmt.Println(count, "Pubs Connected.")
  }
  time_stop1 := time.Now()

  //publish numPublish times (multithreads version)
  time_start2 := time.Now()
  fmt.Println("Publish started.")
  count = 0
  count2 := 0
  m2 := new(sync.Mutex)

  for val := 0; val < maxThreads; val++ {
    wg.Add(1)
    go func(i int) {
      for j := 0; j <= numPublish / maxThreads; j++ {
        num := j * maxThreads + i
        if num >= numPublish {
          break
        }
        token := cp[num % maxConns].Publish(fmt.Sprintf("%06d", num % maxConns), byte(qos), false, fmt.Sprintf("%06d",num / maxConns))
        token.Wait()
        m2.Lock()
        count2++    
        if count2 % 10000 == 0 {
          fmt.Println(count2, "Published.")
          if count2 > count + 100000 {
            fmt.Println("Shaping...")
            time.Sleep(500 * time.Millisecond)
          }
        }
        m2.Unlock()
      }
      wg.Done()
    } (val)
  }
  wg.Wait()
  if count2 % 10000 > 0 {
    fmt.Println(count2, "Published.")
  }
  time_stop2 := time.Now()

  // wait max 30s idle timeout and stop timer
  count3 := count
  time_start3 := time_start2
  time_stop3 := time.Now()
  for idle := 0; idle < 3000; idle++ {
    if count > count3 {
      time_stop3 = time.Now()
      count3 = count
      idle = 0
    }
    if count >= numPublish {
      break
    }
    time.Sleep(10 * time.Millisecond)
  }
  fmt.Println("Publish finished.")

  fmt.Println("Disconnecting Pubs...")
  time_start4 := time.Now()

  //disconnect Pub
  for i := 0; i < maxConns; i++ {
    cp[i].Disconnect(250)
    if (i + 1) % 1000 == 0 {
      fmt.Println(i + 1, "Pubs Disconnected.")
    }
  }
  time_stop4 := time.Now()

//  fmt.Println("Unsubscribing & Disconnecting Sub...")

  //unsubscribe & disconnect Sub
//  if token := cs.Unsubscribe("#"); token.Wait() && token.Error() != nil {
//    fmt.Println(token.Error())
//    os.Exit(1)
//  }
  cs.Disconnect(250)

  // print result
  duration1 := time_stop1.Sub(time_start1).Seconds()*1000
  duration2 := time_stop2.Sub(time_start2).Seconds()*1000
  duration3 := time_stop3.Sub(time_start3).Seconds()*1000
  duration4 := time_stop4.Sub(time_start4).Seconds()*1000
  fmt.Println("-------")
  fmt.Println("maxConns :=", maxConns, "numPublish :=", numPublish, "mqtthost :=", mqtthost, "qos :=", qos)
  fmt.Println(count, "received on", maxConns, "connections,", count * 100 / numPublish, "% success.")
  fmt.Println("TOTAL connecting time:", int(duration1), "msec", int(float64(maxConns) / duration1 * 1000), "conns/s")
  fmt.Println("TOTAL msgs send time:", int(duration2), "msec", int(float64(numPublish) / duration2 * 1000), "msgs/s")
  fmt.Println("TOTAL msgs receive time:", int(duration3), "msec", int(float64(count) / duration3 * 1000), "msgs/s")
  fmt.Println("TOTAL disconnecting time:", int(duration4), "msec", int(float64(maxConns) / duration4 * 1000), "conns/s")
}

#2. 結果と考察

##2.1. 同時接続数を変化させた場合

デフォルト条件(送信100多重、100万メッセージ)で、同時接続数を変化させてみました。CPU%とMEM%は、mosquitto側でtopをとって目分量でピーク値を出しています。

###2.1.1. 送信QoS=0の場合

同時接続数 100 1000 1万 5万
接続conn/sec 10,469 7,424 6,680 3,014
送信msg/sec 55,956 45,070 40,524 25,967
受信msg/sec 53,205 43,182 30,083 21,117
切断conn/sec 23,941 147 18,649 24,996
受信成功率 100% 100% 100% 99.9%
CPU% 48% 50% 100% 100%
MEM% 0.4% 0.2% 0.4% 1.2%

同時接続数が増えると、接続/送信/受信が類似の傾向で遅くなっていくようです。特に1万から5万にかけて、落ち込みが激しいです。1万の時点でCPUが100%になっていることから、mosquitto側のCPU性能限界となっているようです。同時接続数が増えるにしたがって、接続/送信/受信の1接続あたりのCPU処理量が増えているように思われます。moqtuittoのソースを見ると、ソケットからのメッセージ取得にpoll()システムコールを使っており、epoll()には対応していないようです。このため、同時接続数の増加により、メッセージ検索のオーバーヘッドが拡大して、だんだんと遅くなるのではないかと推測します。

一方、切断に関しては、同時接続数の影響はあまり見られません。同時接続1000で値が落ち込んでいるのは、時々見られる「固まった状態」によるもので、たまたまのようです。そもそも切断については、このくらいの量の試験では、絶対時間が短すぎて、有効な結果は得られません。参考程度に見てください。

CPU%が100%で頭打ちなのは、mosquittoが1CPUしか使えていないからであると思われます。t2.mediumは2CPUなので、最大値は200%なのです。mosquittoのソースを見ると、pthread_create()がクライアントにしか使われていません。ブローカーは非同期ではあるもののシングルスレッドで動作しているようです。nginxなど最近の潮流は、非同期+マルチスレッドワーカープロセスなので、mosquittoはやや作り方が古いですね。

MEM%は同時接続数に応じて微増します。コネクション管理テーブルのための消費と思われます。しかし、絶対値としてはわずかなもので、mosquittoは、かなり効率的なメモリ管理をしていることがわかります。AWS最小のt2.nanoでも問題無く、かつ性能も変わらずに動作できそうです。

###2.1.2. 送信QoS=1の場合

同時接続数 100 1000 1万 5万
接続conn/sec 10,805 7,537 6,844 2,811
送信msg/sec 35,444 25,972 13,483 3,961
受信msg/sec 34,736 25,972 13,483 3,961
切断conn/sec 25,301 27,427 22,671 18,413
受信成功率 100% 100% 100% 99.9%
CPU% 80% 92% 97% 100%
MEM% 0.2% 0.1% 0.2% 0.8%

QoS=0と同じ傾向ですが、送信msg/secと受信msg/secが、全体的に遅くなっています。QoS=1の通信オーバーヘッドにより、送信msg/secが遅くなり、それに引っ張られる形で受信msg/secが遅くなったものと思われます。

接続と切断については、QoSの影響は無いため、QoS=0とほぼ同様の結果となっています。

###2.1.3. 送信QoS=2の場合

同時接続数 100 1000 1万 5万
接続conn/sec 9,525 8,087 6,837 2,475
送信msg/sec 19,862 11,842 7,788 1,965
受信msg/sec 19,862 11,840 7,788 1,965
切断conn/sec 24,925 27,658 17,862 25,918
受信成功率 100% 100% 100% 99.9%
CPU% 70% 92% 97% 100%
MEM% 0.1% 0.1% 0.2% 0.8%

QoS=1とほぼ同様の傾向で、送信msg/secと受信msg/secが、QoS=1のさらに半分の遅さになっています。

##2.2. 送信メッセージ数を変化させた場合

デフォルト条件(送信100多重、100万メッセージ)から、メッセージ数を変化させてみました。QoS=0、同時接続数=1万固定です。

メッセージ数 50万 100万 200万 500万
接続conn/sec 6,836 6,680 6,912 6,799
送信msg/sec 40,965 40,524 39,069 38,166
受信msg/sec 28,342 30,083 32,692 34,810
切断conn/sec 832 18,649 22,341 18,585
受信成功率 100% 99.9% 99.9% 99.9%

当たり前ですが、メッセージ数によっての差異は認められません。受信msg/secがメッセージ数が増えると速度改善しているように見えるのは、受信失敗したメッセージを待ち受けている時間が等分に加わるためです。なお、切断については参考レベルとしてください。

##2.3. 送信多重数を変化させた場合

デフォルト条件(送信100多重、100万メッセージ)から、送信多重数を変化させてみました。同時接続数=1万固定です。

###2.3.1. 送信QoS=0の場合

送信多重数 1 10 100
接続conn/sec 269 2,192 6,680
送信msg/sec 33,090 39,639 40,524
受信msg/sec 33,079 30,330 30,083
切断conn/sec 28,079 28,966 18,649
受信成功率 100% 100% 100%
CPU% 100% 100% 100%
MEM% 0.4% 0.4% 0.8%

接続conn/secについて、多重数を減らすと、はっきりと値が減ります。その他の値には大きな変化は見られませんでした。また処理性能の違いにもかかわらず、CPU%は100%で変わりません。

MQTTの接続時にはアプリケーションレベルでCONNACKを返信しますが、送信/受信/切断では、TCPレベルのACKしか返信しない、という違いがあります。Pahoライブラリにおいて、CONNECT〜CONNACKのひと続きのメッセージ処理が同期処理で行われており、その間、mosquitto側のCPUが空回りしているようです。mosquitto側は非同期で処理されるため、Pahoを利用するクライアントGoプログラム側が多重処理をするだけで、接続処理が高速化するようです。

なお、実際は、最初に多重無しのプログラムを書いて、QoS>0で遅いな〜と思いながら、試行錯誤していました。また、Pahoのライブラリによっては、ライブラリ内の同期用APIと非同期用APIを両方持たせている場合があります。

###2.3.2. 送信QoS=1の場合

送信多重数 1 10 100
接続conn/sec 256 2,066 6,844
送信msg/sec 331 3,371 13,483
受信msg/sec 331 3,371 13,483
切断conn/sec 30,006 30,938 22,671
受信成功率 100% 100% 100%
CPU% 100% 100% 97%
MEM% 0.2% 0.2% 0.2%

接続conn/secに加え、送信msg/secと受信msg/secにも変化傾向があらわれます。これは、送信においてQoS=1によるアプリケーションレベルの送達確認が入り、接続と同様の理由で、多重数を減らすと、遅くなるためと考えられます。なお、受信msg/sec側はQoS=0なので、送信側にひっぱられて遅くなっているだけと考えられます。

###2.3.3. 送信QoS=2の場合

送信多重数 1 10 100
接続conn/sec 261 2,133 6,837
送信msg/sec 195 1,802 7,788
受信msg/sec 195 1,802 7,788
切断conn/sec 23,791 29,678 17,862
受信成功率 100% 100% 100%
CPU% 100% 100% 97%
MEM% 0.2% 0.2% 0.2%

QoS=1とほぼ同様の傾向で、送信msg/secと受信msg/secが、QoS=1のさらに半分の遅さになっています。

##2.4. 受信QoS>0の場合

デフォルト条件(送信100多重、100万メッセージ)から、受信QoS=送信QoSにしてみました。同時接続数=1万固定です。またmosquitto.confには、max_inflight_messagesという設定があり、QoS=1/2でのレスポンスを待たないで連続送信できるメッセージ数を変更できます(デフォルトは20)。この設定を0(無限大)とすることも試みました(QoSの後に+を付記しているものが該当します)。

QoS 0 1 2 1+ 2+
接続conn/sec 6,680 6,658 6,438 5,831 6,474
送信msg/sec 40,524 3,231 3,065 1,916 1,422
受信msg/sec 30,083 95 84 1,916 1,422
切断conn/sec 18,649 23,461 23,797 27,698 25,475
受信成功率 100% 3% 3% 100% 100%

まず、max_inflight_messagesを設定しないと、QoS>0では、全体のうちのわずかしか受信できていなことが分かります。受信1セッションをアプリケーションレベルの送達確認をしつつ送り出すために、非常に遅くなってしまい、実行時間内にほとんど終了しなかったためです。

max_inflight_messagesを無限大にすることで、mosquittoは、待たすに次々とQoS>0のメッセージを送信できるようになります。受信アプリケーションはもともとPaho組み込みの機能で非同期処理になっていますので、来たメッセージをどんどん受け取ることができます。その効果で、それなりの性能で受信できるようになっています。しかし、それでも、QoS=0での受信よりははるかに遅いことが分かります。

##2.5. ペイロードサイスを変化させた場合

デフォルト条件(送信100多重、100万メッセージ)から、ペイロードサイズを6Byteから変化させてみました。同時接続数=1万固定です。100KBの時は、時間がかかるため10万メッセージで測定しました。

ペイロードサイズ 6B 100B 1KB 10KB 100KB
接続conn/sec 6,680 6,479 6,180 6,402 6,547
送信msg/sec 40,524 35,688 25,553 5,426 433
送信bps/sec 1.9M 29M 204M 434M 346M
受信msg/sec 30,083 25,319 23,131 5,301 432
受信bps/sec 1.4M 20M 185M 424M 346M
切断conn/sec 18,649 19,688 18,294 15,150 20,453
受信成功率 100% 99.9% 100% 100% 100%
CPU% 100% 100% 100% 100% 100%
MEM% 0.4% 2.9% 3.2% 31% 14%

ペイロードサイズが大きくなると、当たり前ですが、メッセージ送受信回数での性能は小さくなります。しかし、bpsとしてのスループットは上昇します。

1KBまでは順調でしたが、10KB以降は念入りにシェーピング機構をチューニングしないと、テストが終了しませんでした。ペイロードサイズが大きいと、ブローカー内処理または受信側がボトルネックになってしまうようです。標準化策定中のmqttv5では、受信側ボトルネックは、共有サブスクリプションという機能で解決されるようです。mosquitto自体のマルチコア対応も必要な気がします。なお、100Mだとチューニングがなかなか難しく、スループットが若干下がってしまいました。

また、ブローカー側のメモリ消費は、急速に増えています。なお、10KB以降での値はシェーピング機能をチューニングした後の値ですので、単純に比較はでいないことを注意ください。特に未チューニングでの100KB試験では、開始しばらくしてメモリ消費89%を記録して、そのままフリーズしてしまいました。

なお、iperfで測定したところ、ネットワーク性能は1Gbps×全二重でした。

#3. まとめ〜mosquittoの性能に関するあまり知られていない7つの注意点〜

以上で得られた重要な知見、およびそこから導き出される考察をまとめました。

  1. mosquittoの接続処理はメッセージ送受信と比較して遅い。メッセージ送受信の中でも、QoS=1/2のメッセージ送受信は、QoS=0と比較して遅い。
  2. 同時接続数が増えると、接続およびメッセージ送受信はだんだんと遅くなる。
  3. ペイロードサイズが増えると、メッセージ送受信回数は小さくなるが、スループットは(ある程度まで)速くなる。ただし、メモリに注意。
  4. 送信クライアント側は非同期または多重処理をすること(特に、Proxy経由等で送信クライアント側が少数になる場合に注意)。
  5. 大量受信する受信クライアント側は、非同期または多重処理はもちろんのこと、極力QoS=0で実装すること。
  6. mosquittoはマルチコアを使えない。
  7. mosquittoのCPU%が100%だからといって、性能を使い切っているとは限らない。
36
39
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
36
39

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?