LoginSignup
4
5

Node.jsでのMQTT使用法

Posted at

image.png

はじめに

MQTTは、パブリッシュ/サブスクライブモデルに基づく軽量のIoTメッセージングプロトコルです。非常に少ないコードと帯域幅で、ネットワークデバイスにリアルタイムかつ信頼性の高いメッセージングサービスを提供することができます。IoT、モバイルインターネット、スマートハードウェア、Internet of Vehicles、電力エネルギーなどの産業で広く利用されている。

Node.jsは、イベント駆動型のアーキテクチャとリアルタイムのデータハンドリングにより、IoTで広く利用されています。デバイス、サーバ、APIを簡単に接続することができます。Node.jsとMQTTを組み合わせることで、開発者は、デバイスとリアルタイムで通信し、情報を交換し、複雑なデータ分析を行う、スケーラブルで安全なIoTアプリケーションを構築できます。

この記事では、Node.jsプロジェクトでMQTTを使用して、クライアントとMQTTブローカー間のシームレスな通信を実現するための包括的なガイドを提供します。接続の確立、トピックのサブスクライブとアンサブスクライブ、メッセージの公開、およびリアルタイムでのメッセージの受信方法について学びます。このガイドでは、MQTTを活用してスケーラブルで効率的なIoTアプリケーションを構築するためのスキルを身につけることができます。

Node.js MQTTプロジェクト準備編

Node.jsのバージョン確認

このプロジェクトでは、開発およびテストに Node.js v16.20.0 を使用しています。正しいバージョンのNode.jsがインストールされていることを確認するために、読者は以下のコマンドを使用することができます:

node --version  
v16.20.0 

MQTT.jsをインストールする

MQTT.jsは、MQTTプロトコルのクライアントライブラリで、node.jsとブラウザ用にJavaScriptで書かれています。JavaScriptのシングルスレッド機能により、MQTT.jsは完全な非同期型MQTTクライアントとなります。現在、JavaScriptエコシステムで最も広く使われているMQTTクライアントライブラリです。

MQTT.jsのインストールには、NPMやYarnを使用します。

  • NPM
\# create a new project 
npm init -y  

# Install dependencies 
npm install mqtt --save 
  • YARN
yarn add mqtt 

インストールしたら、カレントディレクトリに新しいindex.jsファイルを作成し、これがプロジェクトのエントリファイルとして機能します。ここで、MQTT接続テストの完全なロジックを実装することができます。

MQTTブローカー実装の準備

先に進む前に、通信とテストを行うためのMQTTブローカーがあることを確認してください。MQTTブローカーを入手するには、いくつかのオプションがあります:

  • プライベート展開
    EMQXは、IoT、IIoT、コネクテッドカー向けの最もスケーラブルなオープンソースのMQTTブローカーです。以下のDockerコマンドを実行することでEMQXをインストールすることができます。
docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8084:8084 -p 8883:8883 -p 18083:18083 emqx/emqx 
  • フルマネージドクラウドサービス
    フルマネージドクラウドサービスは、MQTTサービスを開始するための最も簡単な方法です。EMQX Cloudを利用すれば、わずか数分でサービスを開始でき、AWS、Google Cloud、Microsoft Azureの20以上のリージョンでMQTTサービスを実行し、グローバルな可用性と高速接続を確保することが可能です。
    最新版のEMQX Cloud Serverlessは、開発者が数秒で簡単にMQTTの導入を開始できるように、永久無料の1Mセッション分/月の無償提供をしています。
  • 無料公開のMQTTブローカー
    無料公開MQTTブローカーは、MQTTプロトコルの学習とテストを希望する人だけが利用できます。セキュリティリスクやダウンタイムの懸念があるため、本番環境での使用は避けることが重要です。

このブログ記事では、 broker.emqx.io
の無料公開MQTTブローカーを使用することにします。

MQTTブローカー情報
サーバー: broker.emqx.io
TCPポート: 1883
WebSocketポート: 8083
SSL/TLSポート: 8883
セキュアWebSocketポート: 8084

詳しくは、こちらをご確認ください:無料公開のMQTTブローカー

Node.js MQTTの使用法

MQTTコネクションの作成

TCPコネクション

MQTT.jsクライアントライブラリのインポート

注)Node.js環境では、依存モジュールのインポートにcommonjs仕様を使用してください。

const mqtt = require('mqtt')

MQTT接続を確立するために、接続アドレス、ポート、クライアントIDを設定する必要がある。この例では、クライアントIDの生成にJavaScriptに内蔵されている乱数生成関数を利用しています。

const protocol = 'mqtt' 
const host = 'broker.emqx.io' 
const port = '1883' 
const clientId = \`mqtt\_${Math.random().toString(16).slice(3)}\` 
const connectUrl = \`${protocol}://${host}:${port}\` 

次に、ホストとポートをつなげて作成した URL を使って接続を確立する。これを実現するために、MQTTモジュールの組み込みconnect関数を呼び出し、接続が確立されると、Clientインスタンスを返します。

const client = mqtt.connect(connectUrl, {   
clientId,   
clean: true,   
connectTimeout: 4000,   
username: 'emqx',   
password: 'public',   
reconnectPeriod: 1000, })  

client.on('connect', () => {   
console.log('Connected') 
}) 

詳しくは、ブログ「MQTT接続の確立時にパラメータを設定する方法」をご確認ください。

ウェブソケット

MQTT over WebSocketを使ってブローカーに接続する場合、いくつか注意しなければならないことがあります:

  • WebSocketの接続URLは、 ws
    プロトコルで始まる必要があります。
  • ポートを正しいWebSocketポートに更新します(例: broker.emqx.io
    の場合は8083)。
  • 接続URLの末尾にpathパラメータを付加するようにしてください(例: broker.emqx.io
    の場合は /mqtt
    のように)。
const protocol = 'ws' 
const host = 'broker.emqx.io' 
const port = '8083' 
const path = '/mqtt' 
const clientId = \`mqtt\_${Math.random().toString(16).slice(3)}\` 
const connectUrl = \`${protocol}://${host}:${port}${path}\` 

TLS/SSL

MQTTでTLSを使用することで、情報の機密性と完全性を確保し、情報の漏洩や改ざんを防止することができます。TLS認証は、一方向性認証と双方向性認証に分類される。

一方的な認証

const fs = require('fs')  
const protocol = 'mqtts' 
const host = 'broker.emqx.io' 
const port = '8883' const clientId = \`mqtt\_${Math.random().toString(16).slice(3)}\` 
const connectUrl = \`${protocol}://${host}:${port}\` 
const client = mqtt.connect(connectUrl, {   clientId,   clean: true,   connectTimeout: 4000,   username: 'emqx',   password: 'public',   reconnectPeriod: 1000,    

// If the server is using a self-signed certificate, you need to pass the CA. 
ca: fs.readFileSync('./broker.emqx.io-ca.crt'), }) 

双方向認証

const fs = require('fs')  
const protocol = 'mqtts' 
const host = 'broker.emqx.io' const port = '8883' 
const clientId = \`mqtt\_${Math.random().toString(16).slice(3)}\` 
const connectUrl = \`${protocol}://${host}:${port}\` 
const client = mqtt.connect(connectUrl, {   
clientId,   
clean: true,   
connectTimeout: 4000,   
username: 'emqx',   
password: 'public',   
reconnectPeriod: 1000,    

// Enable the SSL/TLS, whether a client verifies the server's certificate chain and host name 
rejectUnauthorized: true,   
// If you are using Two-way authentication, you need to pass the CA, client certificate, and client private key. 
ca: fs.readFileSync('./broker.emqx.io-ca.crt'),   
key: fs.readFileSync('./client.key'),   
cert: fs.readFileSync('./client.crt'), }) 

MQTTトピックを購読する

返されたClientインスタンスの on
関数を使って接続状態を監視し、接続成功後のコールバック関数でトピック /nodejs/mqtt
にサブスクライブしています。

const topic = '/nodejs/mqtt'

client.on('connect', () => {
  console.log('Connected')
  client.subscribe([topic], () => {
    console.log(`Subscribe to topic '${topic}'`)
  })
})

トピックの購読に成功すると、 on
関数を使用して受信メッセージを監視することができます。新しいメッセージが到着すると、この関数のコールバック関数内で関連するトピックとメッセージを取得することができます。これにより、受信したメッセージを効果的に処理し、それに応じて応答することができます。

注:コールバック関数内で受信したメッセージはBuffer型であり、toString関数で文字列に変換する必要があります。

client.on('message', (topic, payload) => {   
console.log('Received Message:', topic, payload.toString()) 
}) 

MQTTメッセージの発行

以上のトピック購読とメッセージ監視が完了したら、メッセージを公開するための関数を書きます。

注:メッセージはMQTT接続が成功した後に公開する必要があるので、接続が成功した後のコールバック関数に記述しています。

client.on('connect', () => {
  client.publish(topic, 'nodejs mqtt test', { qos: 0, retain: false }, (error) => {
    if (error) {
      console.error(error)
    }
  })
})

MQTT接続を切断する

MQTT.js では、ブローカーとの接続を解除するために、 mqtt.Client
オブジェクトの end()
メソッドを使用する必要があります。このメソッドはリソースを解放し、接続を閉じます。このメソッドに true
というパラメータを渡すと強制的に切断され、 DISCONNECT
メッセージは送信されず、代わりに接続が直接切断されます。また、切断が完了した時点で呼び出されるコールバック関数を渡すことも可能です。

// Disconnect
client.end()

// Force disconnect
client.end(true)

// Callback for disconnection
client.end(false, {}, () => {
  console.log('client disconnected')
})

end()
メソッドの詳細については、公式ドキュメントを参照してください。

エラー処理

  • 接続エラー処理
client.on('error', (error) => {  
console.error('connection failed', error) 
}) 
  • 再接続のエラー処理
client.on('reconnect', (error) => {  
console.error('reconnect failed', error) 
}) 
  • サブスクリプションのエラー処理
client.on('connect', () => {
 client.subscribe('topic', subOpts, (error) => {
   if (error) {
     console.error('subscription failed', error)
   }
 })
})
  • パブリッシングエラー処理
client.on('connect', () => {
 client.publish('topic', 'hello mqtt', (error) => {
   if (error) {
     console.error('publish failed', error)
   }
 })
})

コンプリートコード

サーバー接続、トピック購読、メッセージ公開、受信のコードは以下の通りです。

const mqtt = require('mqtt')

const host = 'broker.emqx.io'
const port = '1883'
const clientId = `mqtt_${Math.random().toString(16).slice(3)}`

const connectUrl = `mqtt://${host}:${port}`

const client = mqtt.connect(connectUrl, {
  clientId,
  clean: true,
  connectTimeout: 4000,
  username: 'emqx',
  password: 'public',
  reconnectPeriod: 1000,
})

const topic = '/nodejs/mqtt'

client.on('connect', () => {
  console.log('Connected')

  client.subscribe([topic], () => {
    console.log(`Subscribe to topic '${topic}'`)
    client.publish(topic, 'nodejs mqtt test', { qos: 0, retain: false }, (error) => {
      if (error) {
        console.error(error)
      }
    })
  })
})

client.on('message', (topic, payload) => {
  console.log('Received Message:', topic, payload.toString())
})

このプロジェクトの完全なコードについては、GitHubで確認してください。

テスト

package.jsonファイルのscriptフィールドに起動スクリプトを1行追加します。

"scripts": { 
"start": "node index.js" 
} 

単純に npm start
でプロジェクトを実行すればいいのです。

npm start 

すると、コンソールの出力情報が以下のように表示されます:

image.png

クライアントがMQTTブローカーに正常に接続し、トピックをサブスクライブし、メッセージを受信し、正常にパブリッシュしたことがわかります。このとき、もう1つのクライアントとしてMQTT Client Tool - MQTTXを使用して、メッセージの公開と受信のテストを行うことにします。

image.png

MQTTXで送信されたメッセージがコンソールに出力されていることが確認できます。

image.png

Q&A

MQTTメッセージはどのようなフォーマットで送られてくるのですか?

Node.jsでMQTTを使用する場合、MQTTメッセージはBufferとして提供されます。これは、生のバイナリデータを扱うように設計されているためで、テキストだけでなく、あらゆる形式のデータを含む可能性があります。

Node.jsでBufferとして受信したMQTTメッセージをどのように扱えばよいですか?

Node.jsでBufferを扱うには、 toString()
メソッドを使用して文字列に変換します。元のメッセージがJSONオブジェクトだった場合、 JSON.parse()
で文字列をパースしてオブジェクトに戻す必要があるかもしれません。以下はその例です:

client.on('message', (topic, message) => {
  // message is a Buffer
  let strMessage = message.toString();
  let objMessage = JSON.parse(strMessage);
  console.log(objMessage);
})

送信されたMQTTメッセージがJSON形式でない場合はどうすればよいですか?

MQTTメッセージがJSONでない場合でも、 toString()
メソッドを使って文字列に変換することができます。ただし、内容がもともと文字列でない場合(例えばバイナリデータの場合)には、データの性質に応じて異なる処理をする必要があるかもしれません。

Node.jsのMQTTクライアントがメッセージを受信しない場合の対処法とは?

  • 正しいトピックを購読していることを確認してください。MQTTのトピックは大文字と小文字が区別され、完全に一致する必要があります。
  • ブローカーが起動していること、接続できることを確認します。
  • MQTT QoSレベル1または2を使用している場合、メッセージが同じQoSレベルで発行されることを確認してください。
  • クライアントで error
    イベントをリッスンして、エラーが投げられたかどうかを確認します。

Node.js MQTTアプリケーションの問題をデバッグするにはどうすればよいですか?

  • MQTTクライアントで「エラー」イベントを聞いてください。これは、問題についての有益な情報を提供することが多い。
  • コードの中でconsole.log文を使い、流れを確認したり、変数の内容を見たりします。
  • MQTTメッセージに問題がある場合は、'#'トピックを購読してみてください。これは、すべてのメッセージにマッチするワイルドカードトピックなので、公開されているすべてのメッセージを見ることができます。
  • 接続に問題がある場合は、ブローカーのログを確認し、そこに手がかりがないかどうか確認してください。
  • MQTTXのようなツールを使って、ブローカーに手動で接続し、トピックを発行/購読することを検討してください。これは、問題がNode.jsのコードにあるのか、ブローカーにあるのかを判断するのに役立ちます。

Node.js MQTTアドバンス

Express.jsなどのWebフレームワークでMQTTを利用する方法

Express.jsは、Node.jsを使用してWebアプリケーションやAPIを構築する、オープンソースのWebアプリケーションフレームワークです。現在、最も人気のあるNode.jsのWebフレームワークの1つで、Webアプリケーションを作成する際に高い柔軟性とスケーラビリティを備えています。

MQTTをExpress.jsに組み込むには、そのミドルウェアでMQTTの接続、公開、購読の操作を行うことができます。以下は簡単なサンプルコードです:

import express from 'express'
import * as mqtt from 'mqtt'

const app = express()
const mqttClient = mqtt.connect('mqtt://localhost:1883')

// Connect to the MQTT broker
mqttClient.on('connect', function () {
  console.log('Connected to MQTT broker')
})

// MQTT middleware for publishing and subscribing
app.use(function (req, res, next) {
  // Publish messages
  req.mqttPublish = function (topic, message) {
    mqttClient.publish(topic, message)
  }

  // Subscribe to topic
  req.mqttSubscribe = function (topic, callback) {
    mqttClient.subscribe(topic)
    mqttClient.on('message', function (t, m) {
      if (t === topic) {
        callback(m.toString())
      }
    })
  }
  next()
})

app.get('/', function (req, res) {
  // Publish
  req.mqttPublish('test', 'Hello MQTT!')

  // Subscribe
  req.mqttSubscribe('test', function (message) {
    console.log('Received message: ' + message)
  })

  res.send('MQTT is working!')
})

app.listen(3000, function () {
  console.log('Server is running on port 3000')
})

上記のコードでは、Express.jsアプリケーションを作成し、MQTTのパブリッシュとサブスクライブを処理するための req.mqttPublish()
と req.mqttSubscribe()
関数を定義しています。これらの関数は、MQTTトピックへのメッセージのパブリッシュとサブスクライブを行うためのルート処理で使用することができます。この例では、ルートパスにアクセスすると、「Hello MQTT!」メッセージが「test」トピックに公開され、その後、受信したメッセージを受信して処理するために購読されます。

これはあくまで簡単な実装例であり、実際のアプリケーションでは、複数のトピックの処理やリクエストパラメータの検証など、より複雑な処理が必要となる場合があります。

Node.jsでコマンドラインツールを構築する方法

Node.jsは堅牢なオープンソースエコシステムを誇り、開発者は様々なオープンソースライブラリを活用することで、特定のビジネス要件を満たすMQTTクライアントツールを迅速に作成することが可能です。さらに、Node.js は、 pkg
ツールを使用してプロジェクトを実行可能なファイルに簡単にパッケージ化できるため、クロスプラットフォームでの展開が可能です。例えば、Node.jsのコマンドラインツールライブラリであるCommander.jsを利用すれば、カスタマイズしたコマンドラインツールを構築して、ビジネス環境に統合することができます。

次に、Commander.jsを使用して、簡単なMQTTコマンドラインツールを構築する方法を説明します。このツールの特徴は、2つのコマンドです: pub
と sub
です。

  • pub
    コマンドは、ユーザーが指定したトピックにメッセージを公開することができます。
  • sub
    コマンドは、指定されたトピックを購読し、受信したメッセージを表示することを許可します。
// mqtt-cli.js

import { program } from 'commander'
import mqtt from 'mqtt'

// MQTT Broker URL
const brokerUrl = 'mqtt://localhost:1883'

// Define CLI commands
program
  .command('pub')
  .description('Publish message to the given topic')
  .option('-t, --topic <TOPIC>', 'the message topic')
  .option('-m, --message <BODY>', 'the message body')
  .action((options) => {
    const { topic, message } = options

    const client = mqtt.connect(brokerUrl)

    client.on('connect', () => {
      client.publish(topic, message, () => {
        console.log(`Published message "${message}" to topic "${topic}"`)
        client.end()
      })
    })
  })

program
  .command('sub')
  .description('Subscribe to the given topic and log incoming messages')
  .option('-t, --topic <TOPIC>', 'the message topic')
  .action((options) => {
    const { topic } = options

    const client = mqtt.connect(brokerUrl)

    client.on('connect', () => {
      console.log(`Subscribed to topic "${topic}"`)

      client.subscribe(topic, () => {
        client.on('message', (topic, message) => {
          console.log(`Received message "${message.toString()}" on topic "${topic}"`)
        })
      })
    })
  })

program.parse(process.argv)
# Subscribe to test topic 
node mqtt-cli.js sub -t test  
# Publish an MQTT message 
node mqtt-cli.js pub -t test -m 'Hello MQTT!' 

まとめ

これまで、Node.jsをMQTTクライアントとして、パブリックなMQTTブローカーに接続し、テストクライアントとMQTTサーバー間の接続、メッセージ発行、購読を実現してきました。

次に、MQTTガイドをチェックすることができます:EMQが提供する「Beginner to Advanced」シリーズで、MQTTプロトコルの機能を学び、MQTTのより高度なアプリケーションを探求し、MQTTアプリケーションとサービス開発を始めましょう。

元々は www.emqx.com で公開されました

4
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
5