目次
はじめに
MQTTは、パブリッシュ/サブスクライブモデルに基づいた軽量なIoTメッセージングプロトコルです。ごく少量のコードと帯域幅で、ネットワーク接続されたデバイスに実時間かつ信頼性の高いメッセージングサービスを提供できます。IoT、モバイルインターネット、スマートハードウェア、コネクテッドカー、電力エネルギーなどの業界で広く利用されています。
Node.jsは、イベント駆動型アーキテクチャとリアルタイムデータ処理により、IoTで広く使用されています。デバイス、サーバー、APIを簡単に接続できます。 Node.jsとMQTTを組み合わせることで、開発者はリアルタイムでデバイスと通信し、情報を交換し、複雑なデータ分析を実行するスケーラブルでセキュアなIoTアプリケーションを構築できます。
この記事では、クライアントとMQTTブローカー間のシームレスな通信のために、Node.jsプロジェクトでMQTTを使用する包括的なガイドを提供します。接続の確立、トピックへのサブスクライブとアンサブスクライブ、メッセージのパブリッシュ、リアルタイムでのメッセージの受信の方法を学びます。このガイドを通じて、スケーラブルで効率的なIoTアプリケーションを構築するためにMQTTを活用するスキルが身につきます。
Node.js MQTTプロジェクトの準備
Node.jsのバージョンを確認する
このプロジェクトでは、開発とテストにNode.js v16.20.0を使用します。正しいバージョンのNode.jsがインストールされていることを確認するには、次のコマンドを使用できます:
node --version
v16.20.0
MQTT.jsのインストール
MQTT.jsは、Node.jsとブラウザの両方用にJavaScriptで書かれたMQTTプロトコルのクライアントライブラリです。 JavaScriptのシングルスレッド機能により、MQTT.jsは完全に非同期のMQTTクライアントです。現在、JavaScriptエコシステムで最も広く使用されているMQTTクライアントライブラリです。
NPMまたはYarnを使用してMQTT.jsをインストールできます。
-
NPM
# 新しいプロジェクトの作成 npm init -y # 依存関係のインストール npm install mqtt --save
-
Yarn
yarn add mqtt
インストール後、現在のディレクトリに新しいindex.jsファイルを作成できます。これは、プロジェクトのエントリファイルとして機能します。ここで、MQTT接続テストの完全なロジックを実装できます。
MQTTブローカーの準備
次に進む前に、通信とテストを行うMQTTブローカーが必要です。 MQTTブローカーを取得するにはいくつかのオプションがあります:
-
プライベートデプロイ
EMQXはIoT、IIoT、接続車両向けの最もスケーラブルなオープンソースMQTTブローカーです。次のDockerコマンドを実行してEMQXをインストールできます。
docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8084:8084 -p 8883:8883 -p 18083:18083 emqx/emqx
-
フルマネージドクラウドサービス
フルマネージドクラウドサービスは、MQTTサービスを開始する最も簡単な方法です。 EMQX Cloudを使用すると、わずか数分で開始し、AWS、Google Cloud、Microsoft Azureの20か所以上のリージョンでMQTTサービスを実行できるため、高可用性と高速な接続が保証されます。
最新エディションのEMQX Cloud Serverless は、開発者が数秒で簡単にMQTTデプロイを開始できるよう、毎月100万セッションミニッツの永久に無料のプランを提供しています。
-
フリーのパブリックMQTTブローカー
フリーのパブリックMQTTブローカーは、MQTTプロトコルを学習およびテストしたいユーザー専用です。 本番環境で使用することは推奨できません。
この記事では、broker.emqx.io
のフリー公開MQTTブローカーを使用します。
MQTTブローカーインフォ
サーバー:
broker.emqx.io
TCPポート:1883
WebSocketポート:8083
SSL/TLSポート:8883
Secure WebSocketポート:8084
詳細はこちらをご確認ください: フリー公開 MQTT ブローカー
Node.jsでのMQTTの使用
MQTT接続の作成
TCP接続
MQTT.jsクライアントライブラリをインポートする
注:Node.js環境では、依存モジュールのインポートにcommonjs仕様を使用してください
const mqtt = require('mqtt')
MQTT接続を確立するには、接続アドレス、ポート、クライアントIDを設定する必要があります。この例では、JavaScriptの組み込み関数を使用してクライアントIDを生成しています。
const protocol = 'mqtt'
const host = 'broker.emqx.io'
const port = '1883'
const clientId = `mqtt_${Math.random().toString(16).slice(3)}`
const connectUrl = `${protocol}://${host}:${port}`
次に、ホストとポートを連結して構築されたURLを使用して接続を確立します。これを実現するために、MQTTモジュールの組み込みconnect関数を呼び出し、接続が確立されるとClientインスタンスを返します。
const client = mqtt.connect(connectUrl, {
clientId,
clean: true,
connectTimeout: 4000,
username: 'emqx',
password: 'public',
reconnectPeriod: 1000,
})
client.on('connect', () => {
console.log('Connected')
})
詳細は、ブログのMQTT接続の確立時にパラメータを設定する方法をご覧ください。
WebSocket
WebSocketを使用してブローカーに接続する場合は、次の点に注意する必要があります:
- WebSocket接続URLは
ws
プロトコルで始まる必要があります。 - ポートを正しいWebSocketポートに更新します(
broker.emqx.io
の場合は8083)。 - 接続URLの最後にパスパラメータを付加することを確認してください(
broker.emqx.io
の場合は/mqtt
)。
const protocol = 'ws'
const host = 'broker.emqx.io'
const port = '8083'
const path = '/mqtt'
const clientId = `mqtt_${Math.random().toString(16).slice(3)}`
const connectUrl = `${protocol}://${host}:${port}${path}`
TLS/SSL
MQTTでTLSを使用すると、情報の機密性と完全性を確保できるため、情報漏洩や改ざんを防ぐことができます。 TLS認証は、一方向認証と双方向認証に分類できます。
一方向認証
const fs = require('fs')
const protocol = 'mqtts'
const host = 'broker.emqx.io'
const port = '8883'
const clientId = `mqtt_${Math.random().toString(16).slice(3)}`
const connectUrl = `${protocol}://${host}:${port}`
const client = mqtt.connect(connectUrl, {
clientId,
clean: true,
connectTimeout: 4000,
username: 'emqx',
password: 'public',
reconnectPeriod: 1000,
// サーバーがCA証書を使用している場合は、CAを渡す必要があります
ca: fs.readFileSync('./broker.emqx.io-ca.crt'),
})
双方向認証
const fs = require('fs')
const protocol = 'mqtts'
const host = 'broker.emqx.io'
const port = '8883'
const clientId = `mqtt_${Math.random().toString(16).slice(3)}`
const connectUrl = `${protocol}://${host}:${port}`
const client = mqtt.connect(connectUrl, {
clientId,
clean: true,
connectTimeout: 4000,
username: 'emqx',
password: 'public',
reconnectPeriod: 1000,
// SSL/TLSを有効にするかどうか
rejectUnauthorized: true,
// 双方向認証を使用している場合は、CA、クライアント証明書、クライアント秘密鍵を渡す必要があります。
ca: fs.readFileSync('./broker.emqx.io-ca.crt'),
key: fs.readFileSync('./client.key'),
cert: fs.readFileSync('./client.crt'),
})
MQTTトピックへのサブスクライブ
戻されたClientインスタンスのon
関数を使用して、接続ステータスを監視し、接続成功後のコールバック関数で/nodejs/mqtt
トピックをサブスクライブします。
const topic = '/nodejs/mqtt'
client.on('connect', () => {
console.log('Connected')
client.subscribe([topic], () => {
console.log(`Subscribe to topic '${topic}'`)
})
})
トピックへのサブスクライブが成功すると、on
関数を使用して着信メッセージを監視できます。新しいメッセージが到着すると、この関数のコールバック関数内で関連トピックとメッセージを取得できます。これにより、受信したメッセージを処理し適切に応答することができます。
注: コールバック関数内で受信されたメッセージはBuffer型であり、toString関数によって文字列に変換する必要があります。
client.on('message', (topic, payload) => {
console.log('Received Message:', topic, payload.toString())
})
MQTTメッセージのパブリッシュ
上記のトピックサブスクリプションとメッセージモニタリングを完了した後、メッセージのパブリッシュ機能を記述します。
注: メッセージは、MQTT接続が成功した後にパブリッシュする必要があります。そのため、接続に成功した後のコールバック関数内に記述します。
client.on('connect', () => {
client.publish(topic, 'nodejs mqtt test', { qos: 0, retain: false }, (error) => {
if (error) {
console.error(error)
}
})
})
MQTT接続の切断
MQTT.jsでは、切断するにはmqtt.Client
オブジェクトのend()
メソッドを使用する必要があります。このメソッドはリソースを解放し、接続を閉じます。パラメータにtrue
を渡すと、強制的に切断します。DISCONNECT
メッセージは送信せず、代わりに接続を直接閉じます。切断が完了したときに呼び出されるコールバック関数を渡すこともできます。
// 切断
client.end()
// 強制的に切断
client.end(true)
// 切断のコールバック
client.end(false, {}, () => {
console.log('clientdisconnected')
})
end()
メソッドの詳細については、公式ドキュメントを参照してください。
エラー処理
-
接続エラーの処理
client.on('error', (error) => { console.error('connection failed', error) })
-
再接続エラーの処理
client.on('reconnect', (error) => { console.error('reconnect failed', error) })
-
サブスクリプションエラーの処理
client.on('connect', () => { client.subscribe('topic', subOpts, (error) => { if (error) { console.error('subscription failed', error) } }) })
-
パブリッシュエラーの処理
client.on('connect', () => { client.publish('topic', 'hello mqtt', (error) => { if (error) { console.error('publish failed', error) } }) })
完全なコード
サーバー接続、トピックサブスクリプション、メッセージパブリッシュ、受信のコードは次のとおりです。
const mqtt = require('mqtt')
const host = 'broker.emqx.io'
const port = '1883'
const clientId = `mqtt_${Math.random().toString(16).slice(3)}`
const connectUrl = `mqtt://${host}:${port}`
const client = mqtt.connect(connectUrl, {
clientId,
clean: true,
connectTimeout: 4000,
username: 'emqx',
password: 'public',
reconnectPeriod: 1000,
})
const topic = '/nodejs/mqtt'
client.on('connect', () => {
console.log('Connected')
client.subscribe([topic], () => {
console.log(`Subscribe to topic '${topic}'`)
client.publish(topic, 'nodejs mqtt test', { qos: 0, retain: false }, (error) => {
if (error) {
console.error(error)
}
})
})
})
client.on('message', (topic, payload) => {
console.log('Received Message:', topic, payload.toString())
})
このプロジェクトの完全なコードは、GitHubでご確認ください。
テスト
package.jsonファイルのscriptフィールドに起動スクリプトの行を追加します。
"scripts": {
"start": "node index.js"
}
単純にnpm start
を使用してプロジェクトを実行できます。
npm start
次に、コンソールの出力情報が次のように表示されるのがわかります:
クライアントがMQTTブローカーに正常に接続し、トピックをサブスクライブしてメッセージを正常に受信および公開したことがわかります。 この時点で、別のクライアントとしてMQTTクライアントツール- MQTTXを使用して、メッセージの公開と受信のテストを行います。
MQTTXで送信されたメッセージがコンソールに印刷されているのがわかります。
Q&A
MQTTメッセージはどの形式で送信されますか?
Node.jsでMQTTを使用すると、MQTTメッセージはBufferとして送信されます。これは、テキストだけでなく、任意の形式のデータを含める可能性があるため、生のバイナリデータを処理するように設計されているためです。
Node.jsでBufferとして受信されたMQTTメッセージを処理するには?
Node.jsでBufferを処理するには、toString()
メソッドを使用して文字列に変換できます。元のメッセージがJSONオブジェクトの場合、JSON.parse()
を使用して文字列をオブジェクトに再パースする必要がある場合があります。次に例を示します:
client.on('message', (topic, message) => {
// message is a Buffer
let strMessage = message.toString();
let objMessage = JSON.parse(strMessage);
console.log(objMessage);
})
送信されたMQTTメッセージがJSONフォーマットでない場合はどうなりますか?
MQTTメッセージがJSONでない場合でも、toString()
メソッドを使用して文字列に変換できますが、コンテンツが最初から文字列ではなかった場合(たとえば、バイナリデータの場合)、データの性質に応じて異なる方法で処理する必要がある場合があります。
Node.js MQTTクライアントがメッセージを受信しない場合はどうすればいいですか?
- 正しいトピックにサブスクライブしていることを確認してください。MQTTのトピックは大文字と小文字が区別されるため、一致する必要があります。
- ブローカーが実行されており、接続できることを確認してください。
- MQTT QoSレベル1または2を使用している場合は、同じQoSレベルでメッセージが公開されていることを確認してください。
- エラーがスローされているかどうかを確認するために、クライアントの
error
イベントをリッスンしてください。これがメッセージを受信できない理由を説明できる可能性があります。
Node.js MQTTアプリケーションの問題をデバッグするには?
- MQTTクライアントの
error
イベントをリッスンします。これにより、多くの場合、問題に関する役立つ情報が提供されます。 - コード内にconsole.logステートメントを使用してフローを確認し、変数の内容を確認します。
- MQTTメッセージに問題がある場合は、
#
トピックをサブスクライブしてみてください。これは、送信されたすべてのメッセージと一致するワイルドカードトピックであるため、送信されたすべてのものが表示されます。 - 接続の問題がある場合は、手がかりがないかブローカーのログを確認してください。
- MQTTXのようなツールを使用して、ブローカーに手動で接続し、トピックを公開/サブスクライブすることを検討してください。これにより、問題がNode.jsコードにあるのか、ブローカーにあるのかを判断できます。
Node.js MQTT応用
Express.jsなどのWebフレームワークでMQTTを使用するには
Express.jsは、Node.jsを使用してWebアプリケーションとAPIを構築するためのオープンソースのWebアプリケーションフレームワークです。 現在、最も人気のあるNode.js Webフレームワークの1つであり、Webアプリケーションを作成する際に高い柔軟性とスケーラビリティを備えています。
Express.jsにMQTTを統合するには、ミドルウェアでMQTT接続、パブリッシュ、サブスクライブ操作を処理できます。以下は、簡単なコードの例です:
import express from 'express'
import * as mqtt from 'mqtt'
const app = express()
const mqttClient = mqtt.connect('mqtt://localhost:1883')
// MQTTブローカーに接続
mqttClient.on('connect', function () {
console.log('Connected to MQTT broker')
})
// パブリッシュとサブスクライブのためのMQTTミドルウェア
app.use(function (req, res, next) {
// メッセージをパブリッシュ
req.mqttPublish = function (topic, message) {
mqttClient.publish(topic, message)
}
// トピックをサブスクライブ
req.mqttSubscribe = function (topic, callback) {
mqttClient.subscribe(topic)
mqttClient.on('message', function (t, m) {
if (t === topic) {
callback(m.toString())
}
})
}
next()
})
app.get('/', function (req, res) {
// パブリッシュ
req.mqttPublish('test', 'Hello MQTT!')
// サブスクライブ
req.mqttSubscribe('test', function (message) {
console.log('Received message: ' + message)
})
res.send('MQTT is working!')
})
app.listen(3000, function () {
console.log('Server is running on port 3000')
})
上記のコードはExpress.jsアプリケーションを作成し、req.mqttPublish()
関数とreq.mqttSubscribe()
関数を定義して、MQTT公開とサブスクライブを処理します。 これらの関数は、メッセージの公開とMQTTトピックのサブスクライブに使用できます。 この例では、ルートパスにアクセスすると、「test」トピックに「Hello MQTT!」メッセージが公開され、着信メッセージを受信および処理するためにサブスクライブされます。
これは単純な実装例にすぎません。実際のアプリケーションでは、複数のトピックの処理やリクエストパラメータの検証など、より複雑な処理が必要になる場合があります。
Node.jsでコマンドラインツールを構築する方法
Node.jsには、強力なオープンソースエコシステムがあり、開発者はさまざまなオープンソースライブラリを活用することで、特定のビジネス要件を満たすMQTTクライアントツールをすぐに作成できます。 さらに、Node.jsにはpkg
ツールを使用したプロジェクトの実行ファイルへのパッケージングが容易に提供されているため、クロスプラットフォームのデプロイが可能です。 たとえば、Node.jsのCommander.jsコマンドラインツールライブラリを利用して、カスタマイズされたコマンドラインツールを構築し、ビジネス環境に統合できます。
次に、Commander.jsを使用したシンプルなMQTTコマンドラインツールの構築方法を示します。このツールには、pub
とsub
の2つのコマンドがあります。
-
pub
コマンドを使用すると、ユーザーは指定されたトピックにメッセージを公開できます。 -
sub
コマンドを使用すると、ユーザーは指定されたトピックをサブスクライブし、受信したメッセージを表示できます。
// mqtt-cli.js
import { program } from 'commander'
import mqtt from 'mqtt'
// MQTTブローカーアドレス
const brokerUrl = 'mqtt://localhost:1883'
// CLIコマンドを定義する
program
.command('pub')
.description('指定されたトピックにメッセージを公開する')
.option('-t, --topic <TOPIC>', 'メッセージのトピック')
.option('-m, --message <BODY>', 'メッセージ本文')
.action((options) => {
const { topic, message } = options
const client = mqtt.connect(brokerUrl)
client.on('connect', () => {
client.publish(topic, message, () => {
console.log(`"${message}"メッセージを"${topic}"トピックに公開しました`)
client.end()
})
})
})
program
.command('sub')
.description('指定されたトピックをサブスクライブし、着信メッセージを記録する')
.option('-t, --topic <TOPIC>', 'メッセージのトピック')
.action((options) => {
const { topic } = options
const client = mqtt.connect(brokerUrl)
client.on('connect', () => {
console.log(`"${topic}"トピックをサブスクライブしました`)
client.subscribe(topic, () => {
client.on('message', (topic, message) => {
console.log(`"${message.toString()}"メッセージを"${topic}"トピックで受信しました`)
})
})
})
})
program.parse(process.argv)
# テストトピックをサブスクライブする
node mqtt-cli.js sub -t test
# MQTTメッセージをパブリッシュする
node mqtt-cli.js pub -t test -m 'Hello MQTT!'
まとめ
これまで、Node.jsをMQTTクライアントとして使用して、パブリックMQTTブローカーに接続し、テストクライアントとMQTTサーバー間の接続、メッセージのパブリッシュとサブスクリプションを実現してきました。
次に、EMQが提供するMQTTガイド: 初級者から上級者への記事シリーズをチェックアウトして、MQTTプロトコルの機能を学習したり、MQTTのさらに高度なアプリケーションを探求したり、MQTTアプリケーションとサービス開発を開始したりできます。