2
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

RustでMQTT

Posted at

image.png

Rustは、パフォーマンスと安全性、特に安全な並行処理に特化したマルチパラダイムプログラミング言語です。RustはC++に構文が似ていますが、borrow checkerを使用して参照を検証することでメモリ安全性を保証できます。Rustはガベージコレクションを使用せずにメモリ安全性を実現し、参照カウントはオプションです。

MQTTは、パブリッシュ/サブスクライブモデルに基づいた軽量IoTメッセージングプロトコルで、非常に少ないコードと帯域幅でネットワーク機器にリアルタイムで信頼性の高いメッセージサービスを提供できます。また、IoT、モバイルインターネット、スマートハードウェア、IoV、電力・エネルギー産業など幅広く使用されています。

この記事では、Rustプロジェクトでpaho-mqttクライアントライブラリの使用方法を主に紹介し、クライアントとMQTTブローカー間の接続、サブスクライブ、メッセージング、アンサブスクライブなどの実装方法について説明します。

プロジェクトの初期化

このプロジェクトはRust 1.44.0を使用して開発・テストされ、Cargo 1.44.0パッケージ管理ツールを使用して管理されています。以下のコマンドを使用して現在のRustのバージョンを確認できます。

~ rustc --version
rustc 1.44.0 (49cae5576 2020-06-01)

MQTTクライアントライブラリの選択

現在のRustで最も汎用性が高く、広く使用されているMQTTクライアントpaho-mqttです。最新バージョンの0.7.1は、MQTT v5、3.1.1、3.1をサポートし、標準TCP、SSL/TLS、WebSocketsを介したデータ転送、QoSサポート0、1、2などをサポートしています。

初期化プロジェクト

以下のコマンドを実行して、mqtt-exampleという名前の新しいRustプロジェクトを作成します。

~ cargo new mqtt-example
    Created binary (application) `mqtt-example` package

プロジェクトのCargo.tomlファイルを編集し、dependenciespaho-mqttライブラリのアドレスを追加し、サブスクライブ、パブリッシュのコードファイルに対応するバイナリファイルを指定します。

[dependencies]
paho-mqtt = { git = "https://github.com/eclipse/paho.mqtt.rust.git", branch = "master" }

[[bin]]
name = "sub"
path = "src/sub/main.rs"

[[bin]]
name = "pub"
path = "src/pub/main.rs"

Rust MQTTの使用

クライアント接続の作成

この記事では、EMQXが提供する無料の公開MQTTブローカーをテスト接続のMQTTブローカーとして使用します。このサービスはEMQXのMQTTクラウドサービスに基づいて作成されています。サーバーのアクセス情報は以下のとおりです:

  • ブローカー: broker.emqx.io
  • TCPポート: 1883
  • Websocketポート: 8083

MQTTブローカー接続パラメータの設定

MQTTブローカーの接続アドレス(ポートを含む)、トピック(ここでは2つのトピックを設定)、クライアントIDを設定します。

const DFLT_BROKER:&str = "tcp://broker.emqx.io:1883";
const DFLT_CLIENT:&str = "rust_publish";
const DFLT_TOPICS:&[&str] = &["rust/mqtt", "rust/test"];

MQTT接続ソースコード

MQTT接続ソースコードを記述し、バイナリファイルを実行する際にコマンドライン引数として接続アドレスを渡すことができます。通常、クライアントを作成し、broker.emqx.ioに接続する必要があります。

let host = env::args().nth(1).unwrap_or_else(||
    DFLT_BROKER.to_string()
);

// Define the set of options for the create.
// Use an ID for a persistent session.
let create_opts = mqtt::CreateOptionsBuilder::new()
    .server_uri(host)
    .client_id(DFLT_CLIENT.to_string())
    .finalize();

// Create a client.
let cli = mqtt::Client::new(create_opts).unwrap_or_else(|err| {
    println!("Error creating the client: {:?}", err);
    process::exit(1);
});

// Define the set of options for the connection.
let conn_opts = mqtt::ConnectOptionsBuilder::new()
    .keep_alive_interval(Duration::from_secs(20))
    .clean_session(true)
    .finalize();

// Connect and wait for it to complete or fail.
if let Err(e) = cli.connect(conn_opts) {
    println!("Unable to connect:\n\t{:?}", e);
    process::exit(1);
}

メッセージのパブリッシュ

ここでは、rust/mqttおよびrust/testの2つのトピックに合計5つのメッセージをパブリッシュします。ループの偶数か奇数かによります。

for num in 0..5 {
    let content =  "Hello world! ".to_string() + &num.to_string();
    let mut msg = mqtt::Message::new(DFLT_TOPICS[0], content.clone(), QOS);
    if num % 2 == 0 {
        println!("Publishing messages on the {:?} topic", DFLT_TOPICS[1]);
        msg = mqtt::Message::new(DFLT_TOPICS[1], content.clone(), QOS);
    } else {
        println!("Publishing messages on the {:?} topic", DFLT_TOPICS[0]);
    }
    let tok = cli.publish(msg);

            if let Err(e) = tok {
                    println!("Error sending message: {:?}", e);
                    break;
            }
}

サブスクライブ

クライアントが接続される前に、消費者を初期化する必要があります。ここでは消費者のメッセージキューをループ処理し、受信したメッセージのサブスクライブされたトピック名と内容を出力します。

fn subscribe_topics(cli: &mqtt::Client) {
    if let Err(e) = cli.subscribe_many(DFLT_TOPICS, DFLT_QOS) {
        println!("Error subscribes topics: {:?}", e);
        process::exit(1);
    }
}

fn main() {
    ...
    // Initialize the consumer before connecting.
    let rx = cli.start_consuming();
    ...
    // Subscribe topics.
    subscribe_topics(&cli);

    println!("Processing requests...");
    for msg in rx.iter() {
        if let Some(msg) = msg {
            println!("{}", msg);
        }
        else if !cli.is_connected() {
            if try_reconnect(&cli) {
                println!("Resubscribetopics...");
                subscribe_topics(&cli);
            } else {
                break;
            }
        }
    }
    ...
}

完全なソースコード

メッセージをパブリッシュするためのコード

use std::{
    env,
    process,
    time::Duration
};

extern crate paho_mqtt as mqtt;

const DFLT_BROKER:&str = "tcp://broker.emqx.io:1883";
const DFLT_CLIENT:&str = "rust_publish";
const DFLT_TOPICS:&[&str] = &["rust/mqtt", "rust/test"];
// Define the qos.
const QOS:i32 = 1;

fn main() {
    let host = env::args().nth(1).unwrap_or_else(||
        DFLT_BROKER.to_string()
    );

    // Define the set of options for the create.
    // Use an ID for a persistent session.
    let create_opts = mqtt::CreateOptionsBuilder::new()
        .server_uri(host)
        .client_id(DFLT_CLIENT.to_string())
        .finalize();

    // Create a client.
    let cli = mqtt::Client::new(create_opts).unwrap_or_else(|err| {
        println!("Error creating the client: {:?}", err);
        process::exit(1);
    });

    // Define the set of options for the connection.
    let conn_opts = mqtt::ConnectOptionsBuilder::new()
        .keep_alive_interval(Duration::from_secs(20))
        .clean_session(true)
        .finalize();

    // Connect and wait for it to complete or fail.
    if let Err(e) = cli.connect(conn_opts) {
        println!("Unable to connect:\n\t{:?}", e);
        process::exit(1);
    }

    // Create a message and publish it.
    // Publish message to 'test' and 'hello' topics.
    for num in 0..5 {
        let content =  "Hello world! ".to_string() + &num.to_string();
        let mut msg = mqtt::Message::new(DFLT_TOPICS[0], content.clone(), QOS);
        if num % 2 == 0 {
            println!("Publishing messages on the {:?} topic", DFLT_TOPICS[1]);
            msg = mqtt::Message::new(DFLT_TOPICS[1], content.clone(), QOS);
        } else {
            println!("Publishing messages on the {:?} topic", DFLT_TOPICS[0]);
        }
        let tok = cli.publish(msg);

                if let Err(e) = tok {
                        println!("Error sending message: {:?}", e);
                        break;
                }
    }


    // Disconnect from the broker.
    let tok = cli.disconnect(None);
    println!("Disconnect from the broker");
    tok.unwrap();
}

メッセージをサブスクライブのサンプルコード

ユーザーエクスペリエンスを向上させるために、メッセージのサブスクリプションは切断され、接続が再確立された後にトピックが再サブスクライブされます。

use std::{
    env,
    process,
    thread,
    time::Duration
};

extern crate paho_mqtt as mqtt;

const DFLT_BROKER:&str = "tcp://broker.emqx.io:1883";
const DFLT_CLIENT:&str = "rust_subscribe";
const DFLT_TOPICS:&[&str] = &["rust/mqtt", "rust/test"];
// The qos list that match topics above.
const DFLT_QOS:&[i32] = &[0, 1];

// Reconnect to the broker when connection is lost.
fn try_reconnect(cli: &mqtt::Client) -> bool
{
    println!("Connection lost. Waiting to retry connection");
    for _ in 0..12 {
        thread::sleep(Duration::from_millis(5000));
        if cli.reconnect().is_ok() {
            println!("Successfully reconnected");
            return true;
        }
    }
    println!("Unable to reconnect after several attempts.");
    false
}

// Subscribes to multiple topics.
fn subscribe_topics(cli: &mqtt::Client) {
    if let Err(e) = cli.subscribe_many(DFLT_TOPICS, DFLT_QOS) {
        println!("Error subscribes topics: {:?}", e);
        process::exit(1);
    }
}

fn main() {
    let host = env::args().nth(1).unwrap_or_else(||
        DFLT_BROKER.to_string()
    );

    // Define the set of options for the create.
    // Use an ID for a persistent session.
    let create_opts = mqtt::CreateOptionsBuilder::new()
        .server_uri(host)
        .client_id(DFLT_CLIENT.to_string())
        .finalize();

    // Create a client.
    let mut cli = mqtt::Client::new(create_opts).unwrap_or_else(|err| {
        println!("Error creating the client: {:?}", err);
        process::exit(1);
    });

    // Initialize the consumer before connecting.
    let rx = cli.start_consuming();

    // Define the set of options for the connection.
    let lwt = mqtt::MessageBuilder::new()
        .topic("test")
        .payload("Consumer lost connection")
        .finalize();
    let conn_opts = mqtt::ConnectOptionsBuilder::new()
        .keep_alive_interval(Duration::from_secs(20))
        .clean_session(false)
        .will_message(lwt)
        .finalize();

    // Connect and wait for it to complete or fail.
    if let Err(e) = cli.connect(conn_opts) {
        println!("Unable to connect:\n\t{:?}", e);
        process::exit(1);
    }

    // Subscribe topics.
    subscribe_topics(&cli);

    println!("Processing requests...");
    for msg in rx.iter() {
        if let Some(msg) = msg {
            println!("{}", msg);
        }
        else if !cli.is_connected() {
            if try_reconnect(&cli) {
                println!("Resubscribe topics...");
                subscribe_topics(&cli);
            } else {
                break;
            }
        }
    }

    // If still connected, then disconnect now.
    if cli.is_connected() {
        println!("Disconnecting");
        cli.unsubscribe_many(DFLT_TOPICS).unwrap();
        cli.disconnect(None).unwrap();
    }
    println!("Exiting");
}

実行とテスト

バイナリファイルのコンパイル

以下のコマンドは、mqtt-example/target/debugディレクトリにsubpubのバイナリファイルを生成します。

cargo build

rustmqttbin.png

メッセージのサブスクリプション

subバイナリファイルを実行し、メッセージのパブリッシュを待ちます。
rustmqttsub1.png

メッセージの

pubバイナリファイルを実行すると、rust/testおよびrust/mqttトピックにそれぞれメッセージがパブリッシュされていることが確認できます。

rustmqttpub.png 同時に、メッセージのサブスクリプションでもパブリッシュされたメッセージが見えます。

rustmqttsub2.png

まとめ

paho-mqttのRust言語クライアントを使用してMQTTブローカーに接続し、テストクライアントとMQTTブローカー間の接続、メッセージのパブリッシュ、サブスクリプションを実装することが完了しました。

次に、EMQが提供するMQTTプロトコル簡単ガイドシリーズの記事をチェックして、MQTTプロトコルの機能を学び、MQTTのより高度なアプリケーションを探求し、MQTTアプリケーションおよびサービス開発を始めることができます。

2
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?