当初、Eclipse Pahoを使ってサンプル作ろうと思いましたが、TLSで挫折しました。
以下の内容はEclipse Mosquittoのライブラリを使用する場合の例です。
ライブラリのインストール
Mosquittoの開発パッケージはOS標準リポジトリにも登録されていますが、なるべく最新のものをインストールしましょう。以前、古いバージョン使っていたら、SSL/TLS関連のセキュリティーホール対策の影響でBrokerに接続できなくなり、原因解析に時間を取られました...
CentOS
$ sudo wget http://download.opensuse.org/repositories/home:/oojah:/mqtt/CentOS_CentOS-7/home:oojah:mqtt.repo -O "/etc/yum.repos.d/Mosquitto.repo"
$ sudo yum install mosquitto-devel
Debian/Ubuntu Linux
$ sudo apt-get install software-properties-common
$ sudo add-apt-repository ppa:mosquitto-dev/mosquitto-ppa
$ sudo apt-get install libmosquitto-dev
ソースからコンパイルする場合と、BSDについてはそのうち暇な時に調べてみます...
サンプルコード
サンプルコードは、あまり難しいことは考えずに、とりあえず動けばいいや、ということを最優先にして、Eclipse Mosquittoのダウンロードページからダウンロードできるtarファイルのclientのソースを切り貼りしました。一応、AWS IoT相手でも通信できるはずです。
正しいコードを見たい方はtarファイルをダウンロードしてそちらを参照してください。
Eclipse Mosquittoのclientソースコードのありか
$ wget http://mosquitto.org/files/source/mosquitto-1.4.9.tar.gz
$ tar zxvf mosquitto-1.4.9.tar.gz
$ cd mosquitto-1.4.9/client/
Publisher
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <mosquitto.h>
#ifndef TRUE
#define TRUE 1
#endif
#ifndef FALSE
#define FALSE 0
#endif
char *topic = NULL;
char *message = NULL;
int connect_desire = TRUE;
/* debug mode flag */
int is_debug = FALSE;
/**
* Brokerとの接続成功時に実行されるcallback関数
*/
void on_connect(struct mosquitto *mosq, void *obj, int result)
{
if(is_debug) {
printf("%s(%d)\n", __FUNCTION__, __LINE__);
}
mosquitto_publish(mosq, NULL, topic, strlen(message), message, 0, false);
}
/**
* Brokerとの接続を切断した時に実行されるcallback関数
*/
void on_disconnect(struct mosquitto *mosq, void *obj, int rc)
{
if(is_debug) {
printf("%s(%d)\n", __FUNCTION__, __LINE__);
}
}
/**
* BrokerにMQTTメッセージ送信後に実行されるcallback関数
*/
static void on_publish(struct mosquitto *mosq, void *userdata, int mid)
{
connect_desire = FALSE;
mosquitto_disconnect(mosq);
}
/**
* コマンド引数エラー表示関数
*/
void usage()
{
printf("mqtt_pub -t <topic> -m <message> [-d]\n");
printf(" Required\n");
printf(" -t topic\n");
printf(" -m message\n");
printf(" SSL Option\n");
printf(" -C : CA cert file\n");
printf(" -c : client cert file\n");
printf(" -k : client private file\n");
printf(" Optional\n");
printf(" -p : port number(no ssl:1883:default, ssl:8883)\n");
printf(" -d : debug mode\n");
exit(EXIT_FAILURE);
}
/**
* mqtt_pubメイン関数
*/
int main(int argc, char *argv[])
{
int ret = 0;
int cmdopt = 0;
char *id = "mqtt/pub";
char *host = "localhost";
int port = 1883;
char *cafile = NULL;
char *certfile = NULL;
char *keyfile = NULL;
int keepalive = 60;
bool clean_session = true;
struct mosquitto *mosq = NULL;
while((cmdopt=getopt(argc, argv, "h:p:C:c:k:t:m:d")) > 0) {
switch(cmdopt) {
case 'h': /* broker uri */
host = (char*)strdup(optarg);
break;
case 'p': /* port number */
port = atoi(optarg);
break;
case 'C': /* cafile cert file */
cafile = (char*)strdup(optarg);
break;
case 'c': /* client cert file */
certfile = (char*)strdup(optarg);
break;
case 'k': /* client key file */
keyfile = (char*)strdup(optarg);
break;
case 't': /* Topic */
topic = (char*)strdup(optarg);
break;
case 'm': /* Message */
message = (char*)strdup(optarg);
break;
case 'd': /* debug mode */
is_debug = TRUE;
break;
default:
usage();
}
}
/* topicとmessageが指定されていない場合、引数NG */
if((topic == NULL) || (message == NULL)) {
usage();
}
/* クライアント証明書とクライアント秘密鍵はどちらか一方を
* 指定した場合は、他方の指定も必須
*/
if(
((certfile == NULL) && (keyfile != NULL)) ||
((certfile != NULL) && (keyfile == NULL))
) {
usage();
}
if(is_debug) {
printf(" %s\n", host);
printf(" %d\n", port);
printf(" %s\n", cafile);
printf(" %s\n", certfile);
printf(" %s\n", keyfile);
printf(" %s\n", topic);
printf(" %s\n", message);
}
mosquitto_lib_init();
mosq = mosquitto_new(id, clean_session, NULL);
if(!mosq){
fprintf(stderr, "Cannot create mosquitto object\n");
mosquitto_lib_cleanup();
return(EXIT_FAILURE);
}
mosquitto_connect_callback_set(mosq, on_connect);
mosquitto_disconnect_callback_set(mosq, on_disconnect);
mosquitto_publish_callback_set(mosq, on_publish);
if(cafile != NULL) {
ret = mosquitto_tls_set(mosq, cafile, NULL, certfile, keyfile, NULL);
if(ret != MOSQ_ERR_SUCCESS) {
printf("mosquitto_tls_set function is failed.\n");
}
mosquitto_tls_insecure_set(mosq, true);
}
if(mosquitto_connect_bind(mosq, host, port, keepalive, NULL)){
fprintf(stderr, "failed to connect broker.\n");
mosquitto_lib_cleanup();
return(EXIT_FAILURE);
}
do {
ret = mosquitto_loop_forever(mosq, -1, 1);
} while((ret == MOSQ_ERR_SUCCESS) && (connect_desire != FALSE));
mosquitto_destroy(mosq);
mosquitto_lib_cleanup();
return(EXIT_SUCCESS);
}
$ gcc -Wall -g mqtt_pub.c -o mqtt_pub -lmosquitto
Subscriber
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <mosquitto.h>
#ifndef TRUE
#define TRUE 1
#endif
#ifndef FALSE
#define FALSE 0
#endif
char *topic = NULL;
/* debug mode flag */
int is_debug = FALSE;
/**
* Brokerとの接続成功時に実行されるcallback関数
*/
void on_connect(struct mosquitto *mosq, void *obj, int result)
{
if(is_debug) {
printf("%s(%d)\n", __FUNCTION__, __LINE__);
}
mosquitto_subscribe(mosq, NULL, topic, 0);
}
/**
* Brokerとの接続を切断した時に実行されるcallback関数
*/
void on_disconnect(struct mosquitto *mosq, void *obj, int rc)
{
if(is_debug) {
printf("%s(%d)\n", __FUNCTION__, __LINE__);
}
}
/**
* メッセージ受信処理
*/
void on_message(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message)
{
if(is_debug) {
printf("%s(%d)\n", __FUNCTION__, __LINE__);
}
if(message->payloadlen){
printf("%s ", message->topic);
fwrite(message->payload, 1, message->payloadlen, stdout);
printf("\n");
}else{
printf("%s (null)\n", message->topic);
}
fflush(stdout);
}
/**
* コマンド引数エラー表示関数
*/
void usage()
{
printf("mqtt_pub -t <topic> [-d]\n");
printf(" Required\n");
printf(" -t topic\n");
printf(" SSL Option\n");
printf(" -C : CA cert file\n");
printf(" -c : client cert file\n");
printf(" -k : client private file\n");
printf(" Optional\n");
printf(" -p : port number(no ssl:1883:default, ssl:8883)\n");
printf(" -d : debug mode\n");
exit(EXIT_FAILURE);
}
/**
* mqtt_subメイン関数
*/
int main(int argc, char *argv[])
{
int ret = 0;
int cmdopt = 0;
char *id = "mqtt/sub";
char *host = "localhost";
int port = 1883;
char *cafile = NULL;
char *certfile = NULL;
char *keyfile = NULL;
int keepalive = 60;
bool clean_session = true;
struct mosquitto *mosq = NULL;
while((cmdopt=getopt(argc, argv, "h:p:C:c:k:t:m:d")) > 0) {
switch(cmdopt) {
case 'h': /* broker uri */
host = (char*)strdup(optarg);
break;
case 'p': /* port number */
port = atoi(optarg);
break;
case 'C': /* cafile cert file */
cafile = (char*)strdup(optarg);
break;
case 'c': /* client cert file */
certfile = (char*)strdup(optarg);
break;
case 'k': /* client key file */
keyfile = (char*)strdup(optarg);
break;
case 't': /* Topic */
topic = (char*)strdup(optarg);
break;
case 'd': /* debug mode */
is_debug = TRUE;
break;
default:
usage();
}
}
/* topicが指定されていない場合、引数NG */
if(topic == NULL) {
usage();
}
/* クライアント証明書とクライアント秘密鍵はどちらか一方を
* 指定した場合は、他方の指定も必須
*/
if(
((certfile == NULL) && (keyfile != NULL)) ||
((certfile != NULL) && (keyfile == NULL))
) {
usage();
}
if(is_debug) {
printf(" %s\n", host);
printf(" %d\n", port);
printf(" %s\n", cafile);
printf(" %s\n", certfile);
printf(" %s\n", keyfile);
printf(" %s\n", topic);
}
mosquitto_lib_init();
mosq = mosquitto_new(id, clean_session, NULL);
if(!mosq){
fprintf(stderr, "Cannot create mosquitto object\n");
mosquitto_lib_cleanup();
return(EXIT_FAILURE);
}
mosquitto_connect_callback_set(mosq, on_connect);
mosquitto_disconnect_callback_set(mosq, on_disconnect);
mosquitto_message_callback_set(mosq, on_message);
if(cafile != NULL) {
ret = mosquitto_tls_set(mosq, cafile, NULL, certfile, keyfile, NULL);
if(ret != MOSQ_ERR_SUCCESS) {
printf("mosquitto_tls_set function is failed.\n");
}
mosquitto_tls_insecure_set(mosq, true);
}
if(mosquitto_connect_bind(mosq, host, port, keepalive, NULL)){
fprintf(stderr, "failed to connect broker.\n");
mosquitto_lib_cleanup();
return(EXIT_FAILURE);
}
ret = mosquitto_loop_forever(mosq, -1, 1);
mosquitto_destroy(mosq);
mosquitto_lib_cleanup();
return(EXIT_SUCCESS);
}
$ gcc -Wall -g mqtt_sub.c -o mqtt_sub -lmosquitto
その他メモ
Subscriberで指定するトピック
Subscriberプログラムでトピックに「#」を指定すると、「すべてのトピック」の意味になります。Subscriberのコーディングをした時にトピック関連処理のバグかどうかの切り分けに役立つことが、たまに、あります。
一つのプログラムでPublisher/Subscriberの両方を実装する場合
-
Publisher/SubscriberごとにMosquittoのコネクションを貼るのではなく、1つのコネクションで両方の通信を行う必要があるようです。コーディングミスかもしれませんが、別々にコネクションを貼って通信しようとしたらプログラムが落ちました...
-
MQTT専用プログラムであれば特に気にする必要はないのですが、他にソケット通信をやらせたりしようとする場合は、シングルスレッドよりマルチスレッドの方が、格段にやりやすそうです。Subscriberの実装をselect待ちでやるのは結構面倒で、一応頑張ってはみましたが、ソケットから取得した電文解析で諦めました。