目次
はじめに
MQTT.js は MQTT プロトコル 用のクライアントライブラリで、Node.js とブラウザの両方で動作する JavaScript で書かれています。 現在、JavaScript エコシステムで最も広く使われている MQTT クライアントライブラリ です。
MQTT は、パブリッシュ/サブスクライブ型の軽量な IoT メッセージングプロトコルです。ネットワークに接続されたデバイスに対して、ごくわずかなコードと帯域幅でリアルタイムかつ信頼性の高いメッセージングサービスを提供できます。IoT、モバイルインターネット、スマートハードウェア、自動運転などの分野で広く利用されています。
JavaScript のシングルスレッドの特徴により、MQTT.js は完全に非同期の MQTT クライアントです。 MQTT/TCP、MQTT/TLS、MQTT/WebSocket をサポートしています。 異なる実行環境でのサポート度合いは以下のとおりです。
- ブラウザ: WebSocket 経由の MQTT
- Node.js: MQTT と WebSocket 経由の MQTT
注意: 接続パラメータを除いて、他の API は環境によらず同じです。 読者はニーズに最も適したパラメータを使用できます。 MQTT.js v3.0.0 以降のバージョンでは、MQTT 5.0 が完全にサポートされています。
インストール
NPM または Yarn を使用した MQTT.js のインストール
NPM または Yarn を使用して MQTT.js をインストールするには、次のコマンドを実行します。
npm install mqtt --save
# または、Yarn を使用
yarn add mqtt
注意: v4.0.0 (2020年4月にリリース) 以降、MQTT.js はライフサイクル終了した Node.js バージョンのサポートを停止し、現在は Node v12 と v14 をサポートしています。
CDN を使用した MQTT.js のインストール
ブラウザ では、MQTT.js をインポートするために CDN も使用できます。 MQTT.js のバンドルパッケージは http://unpkg.com によって管理されており、直接 unpkg.com/mqtt/dist/mqtt.min.js を追加して使用できます。
<script src="<https://unpkg.com/mqtt/dist/mqtt.min.js>"></script>
<script>
// mqtt 変数がグローバルに初期化されます
console.log(mqtt)
</script>
グローバルインストール
上記のインストール方法に加えて、MQTT.js にはコマンドラインツールを使用して MQTT 接続、パブリッシュ、サブスクライブを完了するグローバルインストール方法も用意されています。以下のチュートリアルで、MQTT.js のコマンドラインツールの使用方法について詳しく説明します。
NPM を使用して MQTT.js をグローバルにインストールするには、次のコマンドを実行します。
npm install mqtt -g
MQTT ブローカーの準備
続行する前に、通信およびテストを行うための MQTT ブローカー が必要です。 MQTT ブローカーを取得するオプションはいくつかあります。
-
プライベートデプロイ
EMQX は、IoT、IIoT、コネクテッドビークル向けの最もスケーラブルなオープンソース MQTT ブローカーです。EMQX をインストールするには、次の Docker コマンドを実行します。
docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8084:8084 -p 8883:8883 -p 18083:18083 emqx/emqx
-
フルマネージドクラウドサービス
フルマネージドクラウドサービスは、MQTT サービスを開始する最も簡単な方法です。 EMQX Cloud を使用すると、わずか数分で開始し、AWS、Google Cloud、Microsoft Azure の 20 カ所以上のリージョンで MQTT サービスを実行できるため、グローバルな可用性と高速な接続が保証されます。
最新エディションの EMQX Cloud Serverless は、開発者が秒単位で MQTT のデプロイを開始できるように、月間 1M セッションミニッツの無料プランを提供しています。
-
フリーのパブリック MQTT ブローカー
フリーのパブリック MQTT ブローカーは、MQTT プロトコルを学習およびテストしたいユーザー専用です。 本番環境での使用はセキュリティリスクやダウンタイムの懸念があるため、避ける必要があります。
このブログ記事では、broker.emqx.io
のフリーのパブリック MQTT ブローカーを使用します。
MQTT ブローカー情報
サーバー:
broker.emqx.io
TCP ポート:1883
WebSocket ポート:8083
SSL/TLS ポート:8883
セキュア WebSocket ポート:8084
詳細は、こちらをご覧ください: Free Public MQTT Broker
MQTT.js の簡単な例
ここでは、MQTT.js を使用して EMQX Cloud に接続し、トピックをサブスクライブしてメッセージの送受信を行う例を紹介します。
注意: WebSocket 接続はブラウザでのみサポートされています。そのため、ブラウザと Node.js の環境には異なる接続パラメータを使用します。ただし、接続 URL を除くすべてのパラメータは同じです。読者はニーズに最適なパラメータを使用できます。
const mqtt = require('mqtt')
/***
* ブラウザ
* このドキュメントでは、ws と wss プロトコルを使用した WebSocket 経由の MQTT の使用方法を説明しています。
* EMQX の ws 接続用のデフォルトポートは 8083、wss 接続用は 8084 です。
* 接続アドレスの後にパスを追加する必要があることに注意してください。例: /mqtt
*/
const url = 'ws://broker.emqx.io:8083/mqtt'
/***
* Node.js
* このドキュメントでは、mqtt と mqtts プロトコルを使用した TCP 経由の MQTT の使用方法を説明しています。
* EMQX の mqtt 接続のデフォルトポートは 1883、mqtts は 8883 です。
*/
// const url = 'mqtt://broker.emqx.io:1883'
// MQTT クライアントインスタンスを作成
const options = {
// クリアセッション
clean: true,
connectTimeout: 4000,
// 認証
clientId: 'emqx_test',
username: 'emqx_test',
password: 'emqx_test',
}
const client = mqtt.connect(url, options)
client.on('connect', function () {
console.log('Connected')
// トピックをサブスクライブ
client.subscribe('test', function (err) {
if (!err) {
// トピックにメッセージをパブリッシュ
client.publish('test', 'Hello mqtt')
}
})
})
// メッセージを受信
client.on('message', function (topic, message) {
// message は Buffer
console.log(message.toString())
client.end()
})
MQTT.js コマンドライン
MQTT.js をグローバルにインストールした後、コマンドラインツールを使用してトピックをサブスクライブし、メッセージの送受信を行うことができます。
例: broker.emqx.io
に接続し、testtopic/#
トピックをサブスクライブ:
mqtt sub -t 'testtopic/#' -h 'broker.emqx.io' -v
例: broker.emqx.io
に接続し、testtopic/hello
トピックにメッセージを送信:
mqtt pub -t 'testtopic/hello' -h 'broker.emqx.io' -m 'from MQTT.js'
より包括的な MQTT コマンドラインツールが必要な場合は、MQTTX CLI を参照してください。
MQTT.js API 紹介
mqtt.connect([url], options)
この API は、指定された MQTT ブローカー関数に接続し、常に Client
オブジェクトを返します。 最初のパラメータに URL 値を渡します。これには、mqtt
、mqtts
、tcp
、tls
、ws
、wss
などのプロトコルを使用できます。 または、URL を URL.parse()
が返すオブジェクトにすることもできます。
次に、この API は Options
オブジェクトを渡して、MQTT 接続のオプションを構成します。 WebSocket 接続を使用する場合は、アドレスの後にパスを追加する必要があることに注意してください。例: /mqtt
Options オブジェクトの一般的に使用される属性値は以下のとおりです。
-
Options
-
keepalive
: 単位はseconds
で、タイプは integar です。デフォルトは 60 秒で、0 に設定すると無効になります。 -
clientId
: デフォルトは'mqttjs_' + Math.random().toString(16).substr(2, 8)
で、カスタム修正文字列をサポートできます。 -
protocolVersion
: MQTT プロトコルのバージョン番号。デフォルトは 4 (v3.1.1) で、3 (v3.1) と 5 (v5.0) に変更できます。 -
clean
: セッションのクリアの有無。デフォルトはtrue
です。true
に設定すると、切断後にセッションがクリアされ、サブスクライブしたトピックも無効になります。false
に設定すると、QoS 1 および 2 のメッセージはオフラインでも受信できます。 -
reconnectPeriod
: 再接続間隔時間。単位はミリ秒で、デフォルトは 1000 ミリ秒です。注意: 0 に設定すると、自動再接続は無効になります。 -
connectTimeout
: CONNACK を受信するまでの待ち時間。単位はミリ秒で、デフォルトは 30000 ミリ秒です。 -
username
: 認証ユーザー名。ブローカーがユーザー名認証を必要とする場合は、この値を設定してください。 -
password
: 認証パスワード。ブローカーがパスワード認証を必要とする場合は、この値を設定してください。 -
will
: 遺言メッセージ。構成可能なオブジェクト値。クライアントが異常切断した場合、ブローカーは遺言のトピックにメッセージをパブリッシュします。-
topic
: 遺言によって送信されるトピック -
payload
: 遺言によってパブリッシュされるメッセージ -
QoS
: 遺言によって送信される QoS 値 -
retain
: 遺言によってパブリッシュされるメッセージの保持フラグ -
properties
: MQTT 5.0 で新たに追加された構成可能なオブジェクトのプロパティ値。詳細は、https://github.com/mqttjs/MQTT.js#mqttclientstreambuilder-options を参照してください。 -
SSL/TLS 接続を構成する必要がある場合、Option オブジェクトは
tls.connect()
に渡されます。option で以下のプロパティを構成できます。-
rejectUnauthorized
: サーバー証明書チェーンとアドレス名の検証の有無。false
に設定すると検証がスキップされ、中間者攻撃にさらされることになるため、本番環境では推奨されません。true
に設定すると、強力な認証モードが有効になります。自己署名証明書の場合は、証明書の設定時に Alt name を設定する必要があります。 -
ca
: 自己署名証明書で生成された CA ファイル。サーバーが自己署名証明書を使用している場合にのみ必要です。 -
cert
: クライアント証明書。サーバーがクライアント証明書認証(相互認証)を要求している場合にのみ必要です。 -
key
: クライアントキー。サーバーがクライアント証明書認証(相互認証)を要求している場合にのみ必要です。
-
Client Event
接続に成功すると、返された Client オブジェクトは on 関数を使用して複数のイベントをリッスンできます。 コールバック関数内でビジネスロジックを完了できます。 一般的なイベントは以下のとおりです。
-
connect
接続が成功したときにトリガされ、パラメータは connack です。
client.on('connect', function (connack) { console.log('Connected') })
-
reconnect
切断後に再接続間隔でブローカーが自動的に再接続したときにトリガされます。
client.on('reconnect', function () { console.log('Reconnecting...') })
-
close
切断後にトリガされます。
client.on('close', function () { console.log('Disconnected') })
-
disconnect
ブローカーから送信された切断パケットを受信したときにトリガされ、パラメータのパケットは切断時に受信したパケットです。これは MQTT 5.0 の新機能です。
client.on('disconnect', function (packet) { console.log(packet) })
-
offline
クライアントがオフラインになったときにトリガされます。
client.on('offline', function () { console.log('offline') })
-
error
クライアントが正常に接続できないか、パースエラーが発生したときにトリガされます。パラメータの error はエラーメッセージです。
client.on('error', function (error) { console.log(error) })
-
message
クライアントがパブリッシュされたペイロードを受信したときにこのイベントがトリガされ、3つのパラメーターが含まれます: topic、payload、packet。topic は受信したメッセージのトピックを参照します。payload は受信したメッセージのコンテンツです。packet は QoS、retain などの情報を含む MQTT パケット です。
注意: 受信したペイロードは Buffer 型の値です。必要に応じて、最終的なフォーマットを表示するために JSON.parse、JSON.stringify、または toString() メソッドを使用できます。
client.on('message', function (topic, payload, packet) { // Payload は Buffer console.log(`Topic: ${topic}, Message: ${payload.toString()}, QoS: ${packet.qos}`) })
Client 関数
イベントをリッスンするだけでなく、Client にはパブリッシュとサブスクライブのための組み込み関数もあります。 一般的に使用される関数は以下のとおりです。
-
Client.publish(topic, message, [options], [callback])
トピックにメッセージをパブリッシュする関数で、4つのパラメータがあります。
- topic: 送信するトピックで、文字列です。
- message: 送信するトピックのメッセージで、文字列または Buffer にできます。
- options: オプションの値。メッセージのパブリッシュ時にオプションを設定するために使用される構成情報です。メッセージのパブリッシュ時に QoS と Retain 値を設定するのに主に使用されます。
- callback: メッセージがパブリッシュされた後のコールバック関数。パラメータは error です。このパラメータはパブリッシュに失敗した場合にのみ存在します。
// QoS 0 で testtopic という名前のトピックにテストメッセージを送信します client.publish('testtopic', 'Hello, MQTT!', { qos: 0, retain: false }, function (error) { if (error) { console.log(error) } else { console.log('Published') } })
-
Client.subscribe(topic/topic array/topic object, [options], [callback])
1つまたは複数のトピックをサブスクライブする関数です。接続が成功すると、メッセージを取得するためにトピックをサブスクライブする必要があります。この関数には3つのパラメータがあります。
- topic: 文字列または文字列の配列を渡すことができます。または、トピックオブジェクト
{'test1': {qos: 0}, 'test2': {qos: 1}}
を渡すこともできます。 - options: オプションの値。トピックのサブスクライブ時の構成情報です。主にサブスクライブしたトピックの QoS レベルを設定するために使用されます。
- callback: トピックのサブスクライブ後のコールバック関数。パラメータは error と granted です。error パラメータはサブスクリプションに失敗した場合にのみ存在します。granted は {topic, QoS} の配列で、topic はサブスクライブしたトピック、QoS はトピックに付与された QoS レベルです。
// QoS 0 で testtopic というトピックをサブスクライブします client.subscribe('testtopic', { qos: 0 }, function (error, granted) { if (error) { console.log(error) } else { console.log(`${granted[0].topic} was subscribed`) } })
- topic: 文字列または文字列の配列を渡すことができます。または、トピックオブジェクト
-
Client.unsubscribe(topic/topic array, [options], [callback])
1つまたは複数のトピックのサブスクリプションを解除します。この関数には3つのパラメータがあります。
- Topic: 文字列または文字列の配列を渡すことができます
- Options: オプションの値。サブスクリプション解除時の構成情報です。
- Callback: サブスクリプション解除時のコールバック関数。パラメータは error です。エラーパラメータはサブスクリプション解除に失敗した場合にのみ存在します。
// testtopic というトピックのサブスクリプションを解除します client.unsubscribe('testtopic', function (error) { if (error) { console.log(error) } else { console.log('Unsubscribed') } })
-
Client.end([force], [options], [callback])
クライアントを閉じます。この関数には3つのパラメータがあります。
- force:
true
に設定すると、クライアントは切断メッセージが受け入れられるのを待たずにすぐに閉じられます。このパラメータはオプションで、デフォルトはfalse
です。注意:true
に設定すると、ブローカーは切断パケットを受信できなくなります。 - options: オプションの値。クライアントを閉じるときの構成情報です。主に切断時の reasonCode を構成するために使用されます。
- callback: クライアントが閉じられたときのコールバック関数
client.end()
- force:
MQTT.js を JavaScript で使用する完全な例はこちらをご覧ください: https://github.com/emqx/MQTT-Client-Examples/tree/master/mqtt-client-JavaScript
MQTT.js Q&A
ブラウザで相互認証接続を実装できますか?
いいえ、ブラウザでの接続時に JavaScript コードを使用してクライアント証明書を指定することはできません。OS の証明書ストアまたはスマートカードにクライアント証明書が設定されていても同様です。つまり、MQTT.js はこれを行うことができません。また、証明書機関(CA)も指定できません。これはブラウザによって制御されるためです。
リファレンス: https://github.com/mqttjs/MQTT.js/issues/1515
MQTT.js を TypeScript で使用できますか?
はい、MQTT.js は TypeScript で使用できます。ライブラリには TypeScript 型定義が含まれています。
型ファイルはこちらにあります: https://github.com/mqttjs/MQTT.js/tree/main/types
TypeScript を使用する場合の例:
import * as mqtt from "mqtt" const client: mqtt.MqttClient = mqtt.connect('mqtt://broker.emqx.io:1883')
1つの MQTT.js クライアントで複数のブローカーに接続できますか?
いいえ、各 MQTT.js クライアントは一度に1つのブローカーにのみ接続できます。複数のブローカーに接続する場合は、複数の MQTT.js クライアントインスタンスを作成する必要があります。
MQTT.js を Vue、React、Angular アプリケーションで使用できますか?
はい、MQTT.js は Vue、React、Angular フレームワークを含む、JavaScript ベースのアプリケーションに統合できるライブラリです。
MQTT.js 上級
MQTT.js アプリケーションのデバッグ方法
MQTT.js アプリケーションのデバッグは、開発プロセスに不可欠な部分です。このガイドでは、Node.js とブラウザ環境で MQTT.js のデバッグログを有効にする方法と、より深いトラブルシューティングのために Wireshark などのネットワークプロトコルアナライザーを使用するタイミングについて説明します。
Node.js での MQTT.js のデバッグ
Node.js 環境では、
DEBUG
環境変数を使用して MQTT.js のデバッグログを有効にできます。DEBUG=mqttjs* node your-app.js
デバッグ情報が出力されるので、各ステップを比較して、MQTT メッセージが送信中に何が起こったかを確認できます。
DEBUG=mqttjs* node index.js mqttjs connecting to an MQTT broker... +0ms mqttjs:client MqttClient :: options.protocol mqtt +0ms mqttjs:client MqttClient :: options.protocolVersion 4 +0ms mqttjs:client MqttClient :: options.username emqx_test +1ms mqttjs:client MqttClient :: options.keepalive 60 +0ms mqttjs:client MqttClient :: options.reconnectPeriod 1000 +0ms mqttjs:client MqttClient :: options.rejectUnauthorized undefined +0ms mqttjs:client MqttClient :: options.topicAliasMaximum undefined +0ms mqttjs:client MqttClient :: clientId emqx_nodejs_986165 +0ms mqttjs:client MqttClient :: setting up stream +0ms mqttjs:client _setupStream :: calling method to clear reconnect +1ms mqttjs:client _clearReconnect : clearing reconnect timer +0ms mqttjs:client _setupStream :: using streamBuilder provided to client to create stream +0ms mqttjs calling streambuilder for mqtt +3ms mqttjs:tcp port 1883 and host broker.emqx.io +0ms mqttjs:client _setupStream :: pipe stream to writable stream +3ms mqttjs:client _setupStream: sending packet `connect` +2ms mqttjs:client sendPacket :: packet: { cmd: 'connect' } +0ms mqttjs:client sendPacket :: emitting `packetsend` +1ms mqttjs:client sendPacket :: writing to stream +0ms mqttjs:client sendPacket :: writeToStream result true +11ms ...
このコマンドを実行すると、コンソールにデバッグログが生成され、接続、メッセージのパブリッシュとサブスクリプション、潜在的なエラーなど、MQTT クライアントの操作に関する詳細情報が提供されます。
ブラウザでの MQTT.js のデバッグブラウザ環境でデバッグする場合は、JavaScript コード内の localStorage オブジェクトに特定の値を設定する必要があります。
localStorage.debug = 'mqttjs*'
この設定を行った状態でブラウザを更新すると、MQTT.js は ブラウザのコンソールに詳細なデバッグ情報を記録し始めます。これは特に MQTT over WebSocket 接続のデバッグに役立ちます。
MQTT.js のデバッグログで問題を修正できない場合は、Wireshark などのネットワークプロトコルアナライザーを使用してみてください。これにより、MQTT.js アプリケーションと MQTT ブローカー間のネットワークトラフィックをキャプチャおよび解釈できるため、MQTT 通信の詳細、IP アドレス、ポート番号、TCP ハンドシェイクが表示されます。MQTT.js デバッグログから始め、必要に応じて Wireshark に切り替えることで、MQTT.js アプリケーションを包括的にトラブルシューティングできます。
RxJS を使用した MQTT.js でのメッセージ処理の最適化
RxJS は、オブザーバーパターンと関数型プログラミングの原則に従う JavaScript のリアクティブプログラミングライブラリです。 非同期のデータストリームとイベントストリームを開発者が簡単に扱えるようにし、マップ、フィルター、リデュースなど、これらのストリームを変換および組み合わせるためのさまざまな演算子を提供します。
実際の開発では、MQTT サーバーがクライアントにさまざまなタイプのメッセージを送信し、処理する必要があります。 たとえば、メッセージをデータベースに保存したり、処理後に UI にレンダリングしたりする場合があります。 しかし、MQTT.js では、これらのメッセージを処理するためにコールバックに依存する必要があり、受信した各メッセージでコールバック関数がトリガされます。 これは、高頻度のメッセージを扱う場合にコールバックの頻繁な呼び出しによるパフォーマンスの問題を引き起こす可能性があります。
RxJS の強力な機能を活用することで、MQTT.js のメッセージをより便利で効率的に処理できます。 RxJS を使用すると、MQTT.js メッセージのサブスクリプションを Observable に変換できるため、非同期データストリームとイベントストリームを扱うのがより簡単になります。 また、RxJS はメッセージを変換およびフィルタリングするためのさまざまな演算子を提供するため、メッセージをより効率的に処理できます。 RxJS は、複数のストリームのマージやパーティションなど、高度な機能の実装にも役立ちます。 さらに、RxJS はメッセージのキャッシュと処理遅延の機能を提供し、複雑なデータストリームをより便利で柔軟に処理できるようにします。
ここでは、RxJS を使用して MQTT.js でのメッセージ処理を最適化する簡単な例を示します。
import { fromEvent } from 'rxjs' import { bufferTime, map, takeUntil } from 'rxjs/operators' // 接続終了イベントを Observable に変換 const unsubscribe$ = fromEvent(client, 'close') // メッセージのサブスクリプションを Observable に変換し、接続が終了するまでメッセージの受信と処理を続行 const message$ = fromEvent(client, 'message').pipe(takeUntil(unsubscribe$)).pipe( map(([topic, payload, packet]: [string, Buffer, IPublishPacket]) => { return processMessage(topic, payload, packet) }), ) // filter を使用してシステムメッセージをフィルタリング const nonSYSMessage$ = message$.pipe(filter((message: MessageModel) => !message.topic.includes('$SYS'))) // bufferTime を使用してメッセージをキャッシュし、1秒ごとにバッチでデータベースに保存 nonSYSMessage$.pipe(bufferTime(1000)).subscribe((messages: MessageModel[]) => { messages.length && saveMessage(id, messages) }) // bufferTime を使用してメッセージをキャッシュし、2秒ごとに UI にレンダリング nonSYSMessage$.pipe(bufferTime(500)).subscribe((messages: MessageModel[]) => { messages.length && renderMessage(messages) })
まとめ
この記事では、MQTT.js の一般的な API の使用機能について簡単に説明しました。 MQTT のトピック、ワイルドカード、保持メッセージ、ラストウィルなどの機能については、EMQ が提供する MQTT Guide 2023: Beginner to Advanced の記事シリーズをご覧ください。 より高度な MQTT のアプリケーションを探索し、MQTT アプリケーションとサービス開発を開始してください。
実際のプロジェクトでの具体的な使用法については、以下のリンクを参照してください。
- Vue での MQTT の使用方法
- React での MQTT の使用方法
- Angular での MQTT の使用方法
- Electron での MQTT の使用方法
- Node.js での MQTT の使用方法
- [WebSocket を使用した MQTT クイックスタートガイド](
-
-