16
16

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

MQTT Client -C言語-

Last updated at Posted at 2016-08-20

当初、Eclipse Pahoを使ってサンプル作ろうと思いましたが、TLSで挫折しました。
以下の内容はEclipse Mosquittoのライブラリを使用する場合の例です。

ライブラリのインストール

Mosquittoの開発パッケージはOS標準リポジトリにも登録されていますが、なるべく最新のものをインストールしましょう。以前、古いバージョン使っていたら、SSL/TLS関連のセキュリティーホール対策の影響でBrokerに接続できなくなり、原因解析に時間を取られました...

CentOS

$ sudo wget http://download.opensuse.org/repositories/home:/oojah:/mqtt/CentOS_CentOS-7/home:oojah:mqtt.repo -O "/etc/yum.repos.d/Mosquitto.repo"
$ sudo yum install mosquitto-devel

Debian/Ubuntu Linux

$ sudo apt-get install software-properties-common
$ sudo add-apt-repository ppa:mosquitto-dev/mosquitto-ppa
$ sudo apt-get install libmosquitto-dev

ソースからコンパイルする場合と、BSDについてはそのうち暇な時に調べてみます...

サンプルコード

サンプルコードは、あまり難しいことは考えずに、とりあえず動けばいいや、ということを最優先にして、Eclipse Mosquittoのダウンロードページからダウンロードできるtarファイルのclientのソースを切り貼りしました。一応、AWS IoT相手でも通信できるはずです。

正しいコードを見たい方はtarファイルをダウンロードしてそちらを参照してください。

Eclipse Mosquittoのclientソースコードのありか

$ wget http://mosquitto.org/files/source/mosquitto-1.4.9.tar.gz
$ tar zxvf mosquitto-1.4.9.tar.gz
$ cd mosquitto-1.4.9/client/

Publisher

mqtt_pub.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <mosquitto.h>

#ifndef TRUE
#define TRUE 1
#endif

#ifndef FALSE
#define FALSE 0
#endif

char *topic   = NULL;
char *message = NULL;
int connect_desire = TRUE;
/* debug mode flag */
int is_debug = FALSE;

/**
 * Brokerとの接続成功時に実行されるcallback関数
 */
void on_connect(struct mosquitto *mosq, void *obj, int result)
{
    if(is_debug) {
        printf("%s(%d)\n", __FUNCTION__, __LINE__);
    }
    mosquitto_publish(mosq, NULL, topic, strlen(message), message, 0, false);
}

/**
 * Brokerとの接続を切断した時に実行されるcallback関数
 */
void on_disconnect(struct mosquitto *mosq, void *obj, int rc)
{
    if(is_debug) {
        printf("%s(%d)\n", __FUNCTION__, __LINE__);
    }
}

/**
 * BrokerにMQTTメッセージ送信後に実行されるcallback関数
 */
static void on_publish(struct mosquitto *mosq, void *userdata, int mid)
{
    connect_desire = FALSE;
    mosquitto_disconnect(mosq);
}

/**
 * コマンド引数エラー表示関数
 */
void usage()
{
    printf("mqtt_pub -t <topic> -m <message> [-d]\n");
    printf("  Required\n");
    printf("    -t topic\n");
    printf("    -m message\n");
    printf("  SSL Option\n");
    printf("    -C : CA cert file\n");
    printf("    -c : client cert file\n");
    printf("    -k : client private file\n");
    printf("  Optional\n");
    printf("    -p : port number(no ssl:1883:default, ssl:8883)\n");
    printf("    -d : debug mode\n");
    exit(EXIT_FAILURE);
}

/**
 * mqtt_pubメイン関数
 */
int main(int argc, char *argv[])
{
    int   ret           = 0;
    int   cmdopt        = 0;
    char *id            = "mqtt/pub";
    char *host          = "localhost";
    int   port          = 1883;
    char *cafile        = NULL;
    char *certfile      = NULL;
    char *keyfile       = NULL;
    int   keepalive     = 60;
    bool  clean_session = true;
    struct mosquitto *mosq = NULL;

    while((cmdopt=getopt(argc, argv, "h:p:C:c:k:t:m:d")) > 0) {
        switch(cmdopt) {
        case 'h':               /* broker uri */
            host = (char*)strdup(optarg);
            break;
        case 'p':               /* port number */
            port = atoi(optarg);
            break;
        case 'C':               /* cafile cert file */
            cafile = (char*)strdup(optarg);
            break;
        case 'c':               /* client cert file */
            certfile = (char*)strdup(optarg);
            break;
        case 'k':               /* client key file */
            keyfile = (char*)strdup(optarg);
            break;
        case 't':               /* Topic */
            topic = (char*)strdup(optarg);
            break;
        case 'm':               /* Message */
            message = (char*)strdup(optarg);
            break;
        case 'd':               /* debug mode */
            is_debug = TRUE;
            break;
        default:
            usage();
        }
    }
    /* topicとmessageが指定されていない場合、引数NG */
    if((topic == NULL) || (message == NULL)) {
        usage();
    }
    /* クライアント証明書とクライアント秘密鍵はどちらか一方を
     * 指定した場合は、他方の指定も必須
     */
    if(
       ((certfile == NULL) && (keyfile != NULL)) ||
       ((certfile != NULL) && (keyfile == NULL))
    ) {
        usage();
    }

    if(is_debug) {
        printf("  %s\n", host);
        printf("  %d\n", port);
        printf("  %s\n", cafile);
        printf("  %s\n", certfile);
        printf("  %s\n", keyfile);
        printf("  %s\n", topic);
        printf("  %s\n", message);
    }

    mosquitto_lib_init();
    mosq = mosquitto_new(id, clean_session, NULL);
    if(!mosq){
        fprintf(stderr, "Cannot create mosquitto object\n");
        mosquitto_lib_cleanup();
        return(EXIT_FAILURE);
    }
    mosquitto_connect_callback_set(mosq, on_connect);
    mosquitto_disconnect_callback_set(mosq, on_disconnect);
    mosquitto_publish_callback_set(mosq, on_publish);

    if(cafile != NULL) {
        ret = mosquitto_tls_set(mosq, cafile, NULL, certfile, keyfile, NULL);
        if(ret != MOSQ_ERR_SUCCESS) {
            printf("mosquitto_tls_set function is failed.\n");
        }
        mosquitto_tls_insecure_set(mosq, true);
    }

    if(mosquitto_connect_bind(mosq, host, port, keepalive, NULL)){
        fprintf(stderr, "failed to connect broker.\n");
        mosquitto_lib_cleanup();
        return(EXIT_FAILURE);
    }

    do {
        ret = mosquitto_loop_forever(mosq, -1, 1);
    } while((ret == MOSQ_ERR_SUCCESS) && (connect_desire != FALSE));

    mosquitto_destroy(mosq);
    mosquitto_lib_cleanup();

    return(EXIT_SUCCESS);
}
$ gcc -Wall -g mqtt_pub.c -o mqtt_pub -lmosquitto

Subscriber

mqtt_sub.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <mosquitto.h>

#ifndef TRUE
#define TRUE 1
#endif

#ifndef FALSE
#define FALSE 0
#endif

char *topic   = NULL;
/* debug mode flag */
int is_debug = FALSE;

/**
 * Brokerとの接続成功時に実行されるcallback関数
 */
void on_connect(struct mosquitto *mosq, void *obj, int result)
{
    if(is_debug) {
        printf("%s(%d)\n", __FUNCTION__, __LINE__);
    }
    mosquitto_subscribe(mosq, NULL, topic, 0);
}

/**
 * Brokerとの接続を切断した時に実行されるcallback関数
 */
void on_disconnect(struct mosquitto *mosq, void *obj, int rc)
{
    if(is_debug) {
        printf("%s(%d)\n", __FUNCTION__, __LINE__);
    }
}

/**
 * メッセージ受信処理
 */
void on_message(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message)
{
    if(is_debug) {
        printf("%s(%d)\n", __FUNCTION__, __LINE__);
    }

    if(message->payloadlen){
        printf("%s ", message->topic);
        fwrite(message->payload, 1, message->payloadlen, stdout);
        printf("\n");
    }else{
        printf("%s (null)\n", message->topic);
    }
    fflush(stdout);
}

/**
 * コマンド引数エラー表示関数
 */
void usage()
{
    printf("mqtt_pub -t <topic> [-d]\n");
    printf("  Required\n");
    printf("    -t topic\n");
    printf("  SSL Option\n");
    printf("    -C : CA cert file\n");
    printf("    -c : client cert file\n");
    printf("    -k : client private file\n");
    printf("  Optional\n");
    printf("    -p : port number(no ssl:1883:default, ssl:8883)\n");
    printf("    -d : debug mode\n");
    exit(EXIT_FAILURE);
}

/**
 * mqtt_subメイン関数
 */
int main(int argc, char *argv[])
{
    int   ret           = 0;
    int   cmdopt        = 0;
    char *id            = "mqtt/sub";
    char *host          = "localhost";
    int   port          = 1883;
    char *cafile        = NULL;
    char *certfile      = NULL;
    char *keyfile       = NULL;
    int   keepalive     = 60;
    bool  clean_session = true;
    struct mosquitto *mosq = NULL;

    while((cmdopt=getopt(argc, argv, "h:p:C:c:k:t:m:d")) > 0) {
        switch(cmdopt) {
        case 'h':               /* broker uri */
            host = (char*)strdup(optarg);
            break;
        case 'p':               /* port number */
            port = atoi(optarg);
            break;
        case 'C':               /* cafile cert file */
            cafile = (char*)strdup(optarg);
            break;
        case 'c':               /* client cert file */
            certfile = (char*)strdup(optarg);
            break;
        case 'k':               /* client key file */
            keyfile = (char*)strdup(optarg);
            break;
        case 't':               /* Topic */
            topic = (char*)strdup(optarg);
            break;
        case 'd':               /* debug mode */
            is_debug = TRUE;
            break;
        default:
            usage();
        }
    }
    /* topicが指定されていない場合、引数NG */
    if(topic == NULL) {
        usage();
    }
    /* クライアント証明書とクライアント秘密鍵はどちらか一方を
     * 指定した場合は、他方の指定も必須
     */
    if(
       ((certfile == NULL) && (keyfile != NULL)) ||
       ((certfile != NULL) && (keyfile == NULL))
    ) {
        usage();
    }

    if(is_debug) {
        printf("  %s\n", host);
        printf("  %d\n", port);
        printf("  %s\n", cafile);
        printf("  %s\n", certfile);
        printf("  %s\n", keyfile);
        printf("  %s\n", topic);
    }

    mosquitto_lib_init();
    mosq = mosquitto_new(id, clean_session, NULL);
    if(!mosq){
        fprintf(stderr, "Cannot create mosquitto object\n");
        mosquitto_lib_cleanup();
        return(EXIT_FAILURE);
    }
    mosquitto_connect_callback_set(mosq, on_connect);
    mosquitto_disconnect_callback_set(mosq, on_disconnect);
    mosquitto_message_callback_set(mosq, on_message);

    if(cafile != NULL) {
        ret = mosquitto_tls_set(mosq, cafile, NULL, certfile, keyfile, NULL);
        if(ret != MOSQ_ERR_SUCCESS) {
            printf("mosquitto_tls_set function is failed.\n");
        }
        mosquitto_tls_insecure_set(mosq, true);
    }

    if(mosquitto_connect_bind(mosq, host, port, keepalive, NULL)){
        fprintf(stderr, "failed to connect broker.\n");
        mosquitto_lib_cleanup();
        return(EXIT_FAILURE);
    }

    ret = mosquitto_loop_forever(mosq, -1, 1);

    mosquitto_destroy(mosq);
    mosquitto_lib_cleanup();

    return(EXIT_SUCCESS);
}
$ gcc -Wall -g mqtt_sub.c -o mqtt_sub -lmosquitto

その他メモ

Subscriberで指定するトピック

Subscriberプログラムでトピックに「#」を指定すると、「すべてのトピック」の意味になります。Subscriberのコーディングをした時にトピック関連処理のバグかどうかの切り分けに役立つことが、たまに、あります。

一つのプログラムでPublisher/Subscriberの両方を実装する場合

  • Publisher/SubscriberごとにMosquittoのコネクションを貼るのではなく、1つのコネクションで両方の通信を行う必要があるようです。コーディングミスかもしれませんが、別々にコネクションを貼って通信しようとしたらプログラムが落ちました...

  • MQTT専用プログラムであれば特に気にする必要はないのですが、他にソケット通信をやらせたりしようとする場合は、シングルスレッドよりマルチスレッドの方が、格段にやりやすそうです。Subscriberの実装をselect待ちでやるのは結構面倒で、一応頑張ってはみましたが、ソケットから取得した電文解析で諦めました。

16
16
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
16
16

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?