16
Help us understand the problem. What are the problem?

More than 5 years have passed since last update.

posted at

updated at

MQTT Client -C言語-

当初、Eclipse Pahoを使ってサンプル作ろうと思いましたが、TLSで挫折しました。
以下の内容はEclipse Mosquittoのライブラリを使用する場合の例です。

ライブラリのインストール

Mosquittoの開発パッケージはOS標準リポジトリにも登録されていますが、なるべく最新のものをインストールしましょう。以前、古いバージョン使っていたら、SSL/TLS関連のセキュリティーホール対策の影響でBrokerに接続できなくなり、原因解析に時間を取られました...

CentOS

$ sudo wget http://download.opensuse.org/repositories/home:/oojah:/mqtt/CentOS_CentOS-7/home:oojah:mqtt.repo -O "/etc/yum.repos.d/Mosquitto.repo"
$ sudo yum install mosquitto-devel

Debian/Ubuntu Linux

$ sudo apt-get install software-properties-common
$ sudo add-apt-repository ppa:mosquitto-dev/mosquitto-ppa
$ sudo apt-get install libmosquitto-dev

ソースからコンパイルする場合と、BSDについてはそのうち暇な時に調べてみます...

サンプルコード

サンプルコードは、あまり難しいことは考えずに、とりあえず動けばいいや、ということを最優先にして、Eclipse Mosquittoのダウンロードページからダウンロードできるtarファイルのclientのソースを切り貼りしました。一応、AWS IoT相手でも通信できるはずです。

正しいコードを見たい方はtarファイルをダウンロードしてそちらを参照してください。

Eclipse Mosquittoのclientソースコードのありか

$ wget http://mosquitto.org/files/source/mosquitto-1.4.9.tar.gz
$ tar zxvf mosquitto-1.4.9.tar.gz
$ cd mosquitto-1.4.9/client/

Publisher

mqtt_pub.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <mosquitto.h>

#ifndef TRUE
#define TRUE 1
#endif

#ifndef FALSE
#define FALSE 0
#endif

char *topic   = NULL;
char *message = NULL;
int connect_desire = TRUE;
/* debug mode flag */
int is_debug = FALSE;

/**
 * Brokerとの接続成功時に実行されるcallback関数
 */
void on_connect(struct mosquitto *mosq, void *obj, int result)
{
    if(is_debug) {
        printf("%s(%d)\n", __FUNCTION__, __LINE__);
    }
    mosquitto_publish(mosq, NULL, topic, strlen(message), message, 0, false);
}

/**
 * Brokerとの接続を切断した時に実行されるcallback関数
 */
void on_disconnect(struct mosquitto *mosq, void *obj, int rc)
{
    if(is_debug) {
        printf("%s(%d)\n", __FUNCTION__, __LINE__);
    }
}

/**
 * BrokerにMQTTメッセージ送信後に実行されるcallback関数
 */
static void on_publish(struct mosquitto *mosq, void *userdata, int mid)
{
    connect_desire = FALSE;
    mosquitto_disconnect(mosq);
}

/**
 * コマンド引数エラー表示関数
 */
void usage()
{
    printf("mqtt_pub -t <topic> -m <message> [-d]\n");
    printf("  Required\n");
    printf("    -t topic\n");
    printf("    -m message\n");
    printf("  SSL Option\n");
    printf("    -C : CA cert file\n");
    printf("    -c : client cert file\n");
    printf("    -k : client private file\n");
    printf("  Optional\n");
    printf("    -p : port number(no ssl:1883:default, ssl:8883)\n");
    printf("    -d : debug mode\n");
    exit(EXIT_FAILURE);
}

/**
 * mqtt_pubメイン関数
 */
int main(int argc, char *argv[])
{
    int   ret           = 0;
    int   cmdopt        = 0;
    char *id            = "mqtt/pub";
    char *host          = "localhost";
    int   port          = 1883;
    char *cafile        = NULL;
    char *certfile      = NULL;
    char *keyfile       = NULL;
    int   keepalive     = 60;
    bool  clean_session = true;
    struct mosquitto *mosq = NULL;

    while((cmdopt=getopt(argc, argv, "h:p:C:c:k:t:m:d")) > 0) {
        switch(cmdopt) {
        case 'h':               /* broker uri */
            host = (char*)strdup(optarg);
            break;
        case 'p':               /* port number */
            port = atoi(optarg);
            break;
        case 'C':               /* cafile cert file */
            cafile = (char*)strdup(optarg);
            break;
        case 'c':               /* client cert file */
            certfile = (char*)strdup(optarg);
            break;
        case 'k':               /* client key file */
            keyfile = (char*)strdup(optarg);
            break;
        case 't':               /* Topic */
            topic = (char*)strdup(optarg);
            break;
        case 'm':               /* Message */
            message = (char*)strdup(optarg);
            break;
        case 'd':               /* debug mode */
            is_debug = TRUE;
            break;
        default:
            usage();
        }
    }
    /* topicとmessageが指定されていない場合、引数NG */
    if((topic == NULL) || (message == NULL)) {
        usage();
    }
    /* クライアント証明書とクライアント秘密鍵はどちらか一方を
     * 指定した場合は、他方の指定も必須
     */
    if(
       ((certfile == NULL) && (keyfile != NULL)) ||
       ((certfile != NULL) && (keyfile == NULL))
    ) {
        usage();
    }

    if(is_debug) {
        printf("  %s\n", host);
        printf("  %d\n", port);
        printf("  %s\n", cafile);
        printf("  %s\n", certfile);
        printf("  %s\n", keyfile);
        printf("  %s\n", topic);
        printf("  %s\n", message);
    }

    mosquitto_lib_init();
    mosq = mosquitto_new(id, clean_session, NULL);
    if(!mosq){
        fprintf(stderr, "Cannot create mosquitto object\n");
        mosquitto_lib_cleanup();
        return(EXIT_FAILURE);
    }
    mosquitto_connect_callback_set(mosq, on_connect);
    mosquitto_disconnect_callback_set(mosq, on_disconnect);
    mosquitto_publish_callback_set(mosq, on_publish);

    if(cafile != NULL) {
        ret = mosquitto_tls_set(mosq, cafile, NULL, certfile, keyfile, NULL);
        if(ret != MOSQ_ERR_SUCCESS) {
            printf("mosquitto_tls_set function is failed.\n");
        }
        mosquitto_tls_insecure_set(mosq, true);
    }

    if(mosquitto_connect_bind(mosq, host, port, keepalive, NULL)){
        fprintf(stderr, "failed to connect broker.\n");
        mosquitto_lib_cleanup();
        return(EXIT_FAILURE);
    }

    do {
        ret = mosquitto_loop_forever(mosq, -1, 1);
    } while((ret == MOSQ_ERR_SUCCESS) && (connect_desire != FALSE));

    mosquitto_destroy(mosq);
    mosquitto_lib_cleanup();

    return(EXIT_SUCCESS);
}
$ gcc -Wall -g mqtt_pub.c -o mqtt_pub -lmosquitto

Subscriber

mqtt_sub.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <mosquitto.h>

#ifndef TRUE
#define TRUE 1
#endif

#ifndef FALSE
#define FALSE 0
#endif

char *topic   = NULL;
/* debug mode flag */
int is_debug = FALSE;

/**
 * Brokerとの接続成功時に実行されるcallback関数
 */
void on_connect(struct mosquitto *mosq, void *obj, int result)
{
    if(is_debug) {
        printf("%s(%d)\n", __FUNCTION__, __LINE__);
    }
    mosquitto_subscribe(mosq, NULL, topic, 0);
}

/**
 * Brokerとの接続を切断した時に実行されるcallback関数
 */
void on_disconnect(struct mosquitto *mosq, void *obj, int rc)
{
    if(is_debug) {
        printf("%s(%d)\n", __FUNCTION__, __LINE__);
    }
}

/**
 * メッセージ受信処理
 */
void on_message(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message)
{
    if(is_debug) {
        printf("%s(%d)\n", __FUNCTION__, __LINE__);
    }

    if(message->payloadlen){
        printf("%s ", message->topic);
        fwrite(message->payload, 1, message->payloadlen, stdout);
        printf("\n");
    }else{
        printf("%s (null)\n", message->topic);
    }
    fflush(stdout);
}

/**
 * コマンド引数エラー表示関数
 */
void usage()
{
    printf("mqtt_pub -t <topic> [-d]\n");
    printf("  Required\n");
    printf("    -t topic\n");
    printf("  SSL Option\n");
    printf("    -C : CA cert file\n");
    printf("    -c : client cert file\n");
    printf("    -k : client private file\n");
    printf("  Optional\n");
    printf("    -p : port number(no ssl:1883:default, ssl:8883)\n");
    printf("    -d : debug mode\n");
    exit(EXIT_FAILURE);
}

/**
 * mqtt_subメイン関数
 */
int main(int argc, char *argv[])
{
    int   ret           = 0;
    int   cmdopt        = 0;
    char *id            = "mqtt/sub";
    char *host          = "localhost";
    int   port          = 1883;
    char *cafile        = NULL;
    char *certfile      = NULL;
    char *keyfile       = NULL;
    int   keepalive     = 60;
    bool  clean_session = true;
    struct mosquitto *mosq = NULL;

    while((cmdopt=getopt(argc, argv, "h:p:C:c:k:t:m:d")) > 0) {
        switch(cmdopt) {
        case 'h':               /* broker uri */
            host = (char*)strdup(optarg);
            break;
        case 'p':               /* port number */
            port = atoi(optarg);
            break;
        case 'C':               /* cafile cert file */
            cafile = (char*)strdup(optarg);
            break;
        case 'c':               /* client cert file */
            certfile = (char*)strdup(optarg);
            break;
        case 'k':               /* client key file */
            keyfile = (char*)strdup(optarg);
            break;
        case 't':               /* Topic */
            topic = (char*)strdup(optarg);
            break;
        case 'd':               /* debug mode */
            is_debug = TRUE;
            break;
        default:
            usage();
        }
    }
    /* topicが指定されていない場合、引数NG */
    if(topic == NULL) {
        usage();
    }
    /* クライアント証明書とクライアント秘密鍵はどちらか一方を
     * 指定した場合は、他方の指定も必須
     */
    if(
       ((certfile == NULL) && (keyfile != NULL)) ||
       ((certfile != NULL) && (keyfile == NULL))
    ) {
        usage();
    }

    if(is_debug) {
        printf("  %s\n", host);
        printf("  %d\n", port);
        printf("  %s\n", cafile);
        printf("  %s\n", certfile);
        printf("  %s\n", keyfile);
        printf("  %s\n", topic);
    }

    mosquitto_lib_init();
    mosq = mosquitto_new(id, clean_session, NULL);
    if(!mosq){
        fprintf(stderr, "Cannot create mosquitto object\n");
        mosquitto_lib_cleanup();
        return(EXIT_FAILURE);
    }
    mosquitto_connect_callback_set(mosq, on_connect);
    mosquitto_disconnect_callback_set(mosq, on_disconnect);
    mosquitto_message_callback_set(mosq, on_message);

    if(cafile != NULL) {
        ret = mosquitto_tls_set(mosq, cafile, NULL, certfile, keyfile, NULL);
        if(ret != MOSQ_ERR_SUCCESS) {
            printf("mosquitto_tls_set function is failed.\n");
        }
        mosquitto_tls_insecure_set(mosq, true);
    }

    if(mosquitto_connect_bind(mosq, host, port, keepalive, NULL)){
        fprintf(stderr, "failed to connect broker.\n");
        mosquitto_lib_cleanup();
        return(EXIT_FAILURE);
    }

    ret = mosquitto_loop_forever(mosq, -1, 1);

    mosquitto_destroy(mosq);
    mosquitto_lib_cleanup();

    return(EXIT_SUCCESS);
}
$ gcc -Wall -g mqtt_sub.c -o mqtt_sub -lmosquitto

その他メモ

Subscriberで指定するトピック

Subscriberプログラムでトピックに「#」を指定すると、「すべてのトピック」の意味になります。Subscriberのコーディングをした時にトピック関連処理のバグかどうかの切り分けに役立つことが、たまに、あります。

一つのプログラムでPublisher/Subscriberの両方を実装する場合

  • Publisher/SubscriberごとにMosquittoのコネクションを貼るのではなく、1つのコネクションで両方の通信を行う必要があるようです。コーディングミスかもしれませんが、別々にコネクションを貼って通信しようとしたらプログラムが落ちました...

  • MQTT専用プログラムであれば特に気にする必要はないのですが、他にソケット通信をやらせたりしようとする場合は、シングルスレッドよりマルチスレッドの方が、格段にやりやすそうです。Subscriberの実装をselect待ちでやるのは結構面倒で、一応頑張ってはみましたが、ソケットから取得した電文解析で諦めました。

Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Sign upLogin
16
Help us understand the problem. What are the problem?