この記事はMarkLogic Advent Calendar 2017の16日目です。
はじめに
前回はMarkLogic9とNode.jsアプリケーションを連携し、座標情報を扱うところまで行いました。
今回はMQTTを導入してみようと思います。
MQTTとは
もうあちらこちらで解説されているので説明は不要かもしれませんが、MQTT(MQ Telemetry Transport)とは、TCP/IP上で動作する軽量なメッセージキュープロトコルです。Publish/Subscribe型のメッセージ配信モデルになります。
HTTPと比較して軽量なプロトコルのため、IoTなど制約の大きいハード、デバイスに適しています。
メッセージを送信するパブリッシャーと、それを受信するサブスクライバー、その仲介を担うブローカーの3者で構成されます。
パブリッシャーはブローカーにメッセージを送信し、それをブローカーが仲介してサブスクライバーに送信します。
メッセージの配布先はトピックというキーで決定されます。トピックはメッセージのカテゴリーのようなもので、"/"区切りで階層構造を表した文字列です。
予め、サブスクライバーは受信したいトピックに対してサブスクライブしておきます。
パブリッシャーはトピックとメッセージを送信します。
ブローカーは、そのトピックにサブスクライブしているサブスクライバーにメッセージを送信します。
MQTTについては以下のサイトが参考になります。
MQTT Version 3.1.1 OASIS Standard 29 October 2014
MQ Telemetry Transport (MQTT) V3.1 プロトコル仕様
やってみること
MarkLogic用のサブスクライバーを作成します。これはブローカーから受け取ったトピックとメッセージをDBに保存します。
ブローカーにはNode.js用のMQTTブローカーMoscaを使用します。
MQTTクライアントにはMQTT.jsを使用します。
今回、MQTTブローカーには、TLSによるサーバ認証とユーザ名・パスワードによる簡単なユーザ認証を導入します。
#環境
以下の環境を使用します。
環境 | バージョン |
---|---|
CentOS7 | 7.4.1708 |
Node.js | v8.9.1 |
npm | 5.5.1 |
Mosca | 2.7.0 |
MQTT.js | 2.14.0 |
MarkLogic9 | 9.0-3 |
MarkLogic Node.js Client API | 2.0.3 |
MQTTの環境構築
インストール
まずはMQTTブローカーのMoscaをインストールします。
# npm install mosca
続いて、MQTTクライアントのMQTT.jsをインストールします。
# npm install mqtt
TLS通信用のサーバ証明書(ブローカー用)の作成
本来はしかるべき認証局からサーバ証明書を発行してもらうべきですが、ここでは自前の認証局の自己署名証明書を作成し、そこからブローカー用のサーバ証明書を発行します。
認証局の証明書の作成
まず、認証局の自己署名証明書を作成します。
$ openssl req -new -x509 -days 365 -extensions v3_ca -keyout my_ca.key -out my_ca.crt
証明書の情報は以下のように設定しました。
項目名 | 設定値 |
---|---|
Country Name | JP |
State or Province Name | Tokyo-to |
Locality Name | Enter(空欄) |
Organization Name | Enter(空欄) |
Organizational Unit Name | Enter(空欄) |
Common Name | my-ca |
Email Address | Enter(空欄) |
ブローカー用のサーバ証明書の作成
ブローカー用のサーバ証明書を作成します。
まず、ブローカー用のRSA鍵を作成します。
$ openssl genrsa -out broker.key 2048
続いて、ブローカー用の鍵に対する証明書発行要求を作成します。
$ openssl req -out broker.csr -key broker.key -new
発行要求の情報は以下のように設定しました。
項目名 | 設定値 |
---|---|
Country Name | JP |
State or Province Name | Tokyo-to |
Locality Name | Enter(空欄) |
Organization Name | Enter(空欄) |
Organizational Unit Name | Enter(空欄) |
Common Name | my-broker |
Email Address | Enter(空欄) |
A challenge password | パスワード |
An optional company name | Enter(空欄) |
最後にブローカー用のサーバ証明書を発行します。
$ openssl x509 -req -in broker.csr -CA my_ca.crt -CAkey my_ca.key -CAcreateserial -out broker.crt -days 365
ブローカーのホスト名変更
ブローカーを動かすサーバのホスト名はブローカーの証明書のCN(Common Name)と一致させる必要があります。
上記の場合、ブローカーの証明書のCNは"my-broker"としたので、/etc/hostsにmy-brokerを記述します。
以下のような感じになります。
# vi /etc/hosts
127.0.0.1 my-broker localhost ・・・
・・・
ブローカーの作成
Moscaを使用してブローカーを作成します。
const mosca = require('mosca');
// ブローカーのサーバ証明書と秘密鍵
const SECURE_KEY = 'broker.key';
const SECURE_CERT = 'broker.crt';
// ブローカーの認証局の証明書
const CA_CERT = 'my_ca.crt';
// MQTTSの設定
const options = {
secure: {
port: 8443,
keyPath: SECURE_KEY,
certPath: SECURE_CERT,
}
};
const server = new mosca.Server(options);
// ユーザ認証
// ここでは、subscriberまたはpublisherというユーザ名・パスワードを受け付ける簡単な認証としておく。
server.authenticate = (client, username, password, callback) => {
if(
(username=='subscriber' && password == 'subscriber') ||
(username=='publisher' && password == 'publisher') ){
callback(null, true);
}else{
callback(null, false);
}
}
// サーバ起動時の処理
server.on('ready', () => {console.log('server started')});
// クライアントが接続したときの処理
server.on('clientConnected', (client) => {
console.log('client connected: ' + client.id)
});
// クライアントからメッセージを受信したときの処理
server.on('published', (packet) => {
console.log( 'published : ' + JSON.stringify(packet));
});
サブスクライバーの作成
MQTT.jsを使用してサブスクライバーを作成します。
このサブスクライバーは受信した全てのメッセージをMarkLogicに登録します。
const mqtt = require('mqtt');
const marklogic = require('marklogic');
const my = require('./my-connection.js');
const fs = require('fs');
// 認証局の証明書
const TRUSTED_CA = 'my_ca.crt';
// ユーザ認証とTLSの設定
const options = {
username: 'subscriber',
password: 'subscriber',
ca: [fs.readFileSync(TRUSTED_CA)],
rejectedUnauthorized: true
};
// TLS接続のため、mqttsとする。接続先のホスト名はサーバ証明書のホスト名(CN)と一致させること。
const client = mqtt.connect('mqtts://my-broker:8443', options);
const db = marklogic.createDatabaseClient(my.connInfo);
// 接続時の処理
client.on('connect', () => {
console.log('subscriber.connected.');
});
// トピックの受信登録。全てのトピックを受信対象とするため、ワイルドカードの'#'を指定する。
client.subscribe('#', (err, granted) => {
console.log('subscriber.subscribed.');
});
// メッセージの受信処理。受信したメッセージはMarkLogicに登録する。
client.on('message', (topic, message, packet) => {
// 登録するURI
let dir = "/mqtt/";
// MarkLogicに登録するJSONデータの作成。
let date = Date.now().toString();
let jsonStr = '{"receiveDate": ' + date + ',"topic":"' + topic + '", "message":' + message + '}';
db.documents.write(
{ extension: 'json',
directory: dir,
content: JSON.parse(jsonStr)
}
).result( (response) => {
console.log(JSON.stringify(response));
}, (error) => {
console.log(error);
});
});
パブリッシャーの作成
MQTT.jsを使用してパブリッシャーを作成します。1秒間隔で現在時刻を送信します。
const mqtt = require('mqtt');
const fs = require('fs');
// ユーザの認証情報
const username = 'publisher';
const password = 'publisher';
// 認証局の証明書
const TRUSTED_CA = 'my_ca.crt';
// ユーザ認証とTLSの設定
const options = {
username: username,
password: password,
ca: [fs.readFileSync(TRUSTED_CA)],
rejectedUnauthorized: true
};
// TLS接続のため、mqttsとする。接続先のホスト名はサーバ証明書のホスト名(CN)と一致させること。
const client = mqtt.connect('mqtts://my-broker:8443', options);
// 接続時の処理
client.on('connect', () => {
console.log('publisher.connected.');
});
// 1秒間隔でメッセージを送信する。
// メッセージは、usernameと現在時刻とする。
setInterval ( ()=> {
let time = Date.now().toString();
let message = '{"username":"' + username + '", "date":"' + time + '"}';
let topic = 'topic1/topic1-1/topic1-1-1';
client.publish(topic, message);
console.log('[publish] topic: ' + topic + ', message: ' + message);
}, 1000);
動作させてみる
作成したブローカー、サブスクライバー、パブリッシャーをNode.jsで動作させると、パブリッシャーが送信したメッセージがMarkLogicに登録されます。
MarkLogicのQueryConsole等で登録されたドキュメントを確認すると、以下のようになります。
for $i in xdmp:directory("/mqtt/", "infinity")
return $i
{"receiveDate":1512911948137, "topic":"topic1/topic1-1/topic1-1-1", "message":"1512911948136"}
{"receiveDate":1512911986236, "topic":"topic1/topic1-1/topic1-1-1", "message":"1512911986234"}
{"receiveDate":1512911983222, "topic":"topic1/topic1-1/topic1-1-1", "message":"1512911983220"}
・・・
おしまい
今回はTLSとユーザ認証に対応したMQTTを導入し、受信したメッセージをMarkLogicに登録するところまで実現しました。
MQTTはIoTデバイスの通信に適しています。デバイスに搭載された様々なセンサーからの情報をMQTTでサーバサイドに集約するような使い方が多いと思います。
センサーの情報はその種類によってスキーマが異なります。MarkLogicはスキーマレスであるため、様々な種類のセンサー情報をまとめて扱うことに適しています。異なるセンサー情報を横串で紐付けられるため、新たなセンサーの活用方法が見つけられると思います。
というか、MarkLogicにMQTTブローカーが搭載されていればもっと楽なんですけどね・・・。
次回予告
いよいよRaspberry Piを導入してみましょうか。