Rustは、パフォーマンスと安全性、特に安全な並行処理に特化したマルチパラダイムプログラミング言語です。RustはC++に構文が似ていますが、borrow checkerを使用して参照を検証することでメモリ安全性を保証できます。Rustはガベージコレクションを使用せずにメモリ安全性を実現し、参照カウントはオプションです。
MQTTは、パブリッシュ/サブスクライブモデルに基づいた軽量IoTメッセージングプロトコルで、非常に少ないコードと帯域幅でネットワーク機器にリアルタイムで信頼性の高いメッセージサービスを提供できます。また、IoT、モバイルインターネット、スマートハードウェア、IoV、電力・エネルギー産業など幅広く使用されています。
この記事では、Rustプロジェクトでpaho-mqttクライアントライブラリの使用方法を主に紹介し、クライアントとMQTTブローカー間の接続、サブスクライブ、メッセージング、アンサブスクライブなどの実装方法について説明します。
プロジェクトの初期化
このプロジェクトはRust 1.44.0を使用して開発・テストされ、Cargo 1.44.0パッケージ管理ツールを使用して管理されています。以下のコマンドを使用して現在のRustのバージョンを確認できます。
~ rustc --version
rustc 1.44.0 (49cae5576 2020-06-01)
MQTTクライアントライブラリの選択
現在のRustで最も汎用性が高く、広く使用されているMQTTクライアントはpaho-mqtt
です。最新バージョンの0.7.1
は、MQTT v5、3.1.1、3.1をサポートし、標準TCP、SSL/TLS、WebSocketsを介したデータ転送、QoSサポート0、1、2などをサポートしています。
初期化プロジェクト
以下のコマンドを実行して、mqtt-example
という名前の新しいRustプロジェクトを作成します。
~ cargo new mqtt-example
Created binary (application) `mqtt-example` package
プロジェクトのCargo.toml
ファイルを編集し、dependencies
にpaho-mqtt
ライブラリのアドレスを追加し、サブスクライブ、パブリッシュのコードファイルに対応するバイナリファイルを指定します。
[dependencies]
paho-mqtt = { git = "https://github.com/eclipse/paho.mqtt.rust.git", branch = "master" }
[[bin]]
name = "sub"
path = "src/sub/main.rs"
[[bin]]
name = "pub"
path = "src/pub/main.rs"
Rust MQTTの使用
クライアント接続の作成
この記事では、EMQXが提供する無料の公開MQTTブローカーをテスト接続のMQTTブローカーとして使用します。このサービスはEMQXのMQTTクラウドサービスに基づいて作成されています。サーバーのアクセス情報は以下のとおりです:
- ブローカー: broker.emqx.io
- TCPポート: 1883
- Websocketポート: 8083
MQTTブローカー接続パラメータの設定
MQTTブローカーの接続アドレス(ポートを含む)、トピック(ここでは2つのトピックを設定)、クライアントIDを設定します。
const DFLT_BROKER:&str = "tcp://broker.emqx.io:1883";
const DFLT_CLIENT:&str = "rust_publish";
const DFLT_TOPICS:&[&str] = &["rust/mqtt", "rust/test"];
MQTT接続ソースコード
MQTT接続ソースコードを記述し、バイナリファイルを実行する際にコマンドライン引数として接続アドレスを渡すことができます。通常、クライアントを作成し、broker.emqx.io
に接続する必要があります。
let host = env::args().nth(1).unwrap_or_else(||
DFLT_BROKER.to_string()
);
// Define the set of options for the create.
// Use an ID for a persistent session.
let create_opts = mqtt::CreateOptionsBuilder::new()
.server_uri(host)
.client_id(DFLT_CLIENT.to_string())
.finalize();
// Create a client.
let cli = mqtt::Client::new(create_opts).unwrap_or_else(|err| {
println!("Error creating the client: {:?}", err);
process::exit(1);
});
// Define the set of options for the connection.
let conn_opts = mqtt::ConnectOptionsBuilder::new()
.keep_alive_interval(Duration::from_secs(20))
.clean_session(true)
.finalize();
// Connect and wait for it to complete or fail.
if let Err(e) = cli.connect(conn_opts) {
println!("Unable to connect:\n\t{:?}", e);
process::exit(1);
}
メッセージのパブリッシュ
ここでは、rust/mqtt
およびrust/test
の2つのトピックに合計5つのメッセージをパブリッシュします。ループの偶数か奇数かによります。
for num in 0..5 {
let content = "Hello world! ".to_string() + &num.to_string();
let mut msg = mqtt::Message::new(DFLT_TOPICS[0], content.clone(), QOS);
if num % 2 == 0 {
println!("Publishing messages on the {:?} topic", DFLT_TOPICS[1]);
msg = mqtt::Message::new(DFLT_TOPICS[1], content.clone(), QOS);
} else {
println!("Publishing messages on the {:?} topic", DFLT_TOPICS[0]);
}
let tok = cli.publish(msg);
if let Err(e) = tok {
println!("Error sending message: {:?}", e);
break;
}
}
サブスクライブ
クライアントが接続される前に、消費者を初期化する必要があります。ここでは消費者のメッセージキューをループ処理し、受信したメッセージのサブスクライブされたトピック名と内容を出力します。
fn subscribe_topics(cli: &mqtt::Client) {
if let Err(e) = cli.subscribe_many(DFLT_TOPICS, DFLT_QOS) {
println!("Error subscribes topics: {:?}", e);
process::exit(1);
}
}
fn main() {
...
// Initialize the consumer before connecting.
let rx = cli.start_consuming();
...
// Subscribe topics.
subscribe_topics(&cli);
println!("Processing requests...");
for msg in rx.iter() {
if let Some(msg) = msg {
println!("{}", msg);
}
else if !cli.is_connected() {
if try_reconnect(&cli) {
println!("Resubscribetopics...");
subscribe_topics(&cli);
} else {
break;
}
}
}
...
}
完全なソースコード
メッセージをパブリッシュするためのコード
use std::{
env,
process,
time::Duration
};
extern crate paho_mqtt as mqtt;
const DFLT_BROKER:&str = "tcp://broker.emqx.io:1883";
const DFLT_CLIENT:&str = "rust_publish";
const DFLT_TOPICS:&[&str] = &["rust/mqtt", "rust/test"];
// Define the qos.
const QOS:i32 = 1;
fn main() {
let host = env::args().nth(1).unwrap_or_else(||
DFLT_BROKER.to_string()
);
// Define the set of options for the create.
// Use an ID for a persistent session.
let create_opts = mqtt::CreateOptionsBuilder::new()
.server_uri(host)
.client_id(DFLT_CLIENT.to_string())
.finalize();
// Create a client.
let cli = mqtt::Client::new(create_opts).unwrap_or_else(|err| {
println!("Error creating the client: {:?}", err);
process::exit(1);
});
// Define the set of options for the connection.
let conn_opts = mqtt::ConnectOptionsBuilder::new()
.keep_alive_interval(Duration::from_secs(20))
.clean_session(true)
.finalize();
// Connect and wait for it to complete or fail.
if let Err(e) = cli.connect(conn_opts) {
println!("Unable to connect:\n\t{:?}", e);
process::exit(1);
}
// Create a message and publish it.
// Publish message to 'test' and 'hello' topics.
for num in 0..5 {
let content = "Hello world! ".to_string() + &num.to_string();
let mut msg = mqtt::Message::new(DFLT_TOPICS[0], content.clone(), QOS);
if num % 2 == 0 {
println!("Publishing messages on the {:?} topic", DFLT_TOPICS[1]);
msg = mqtt::Message::new(DFLT_TOPICS[1], content.clone(), QOS);
} else {
println!("Publishing messages on the {:?} topic", DFLT_TOPICS[0]);
}
let tok = cli.publish(msg);
if let Err(e) = tok {
println!("Error sending message: {:?}", e);
break;
}
}
// Disconnect from the broker.
let tok = cli.disconnect(None);
println!("Disconnect from the broker");
tok.unwrap();
}
メッセージをサブスクライブのサンプルコード
ユーザーエクスペリエンスを向上させるために、メッセージのサブスクリプションは切断され、接続が再確立された後にトピックが再サブスクライブされます。
use std::{
env,
process,
thread,
time::Duration
};
extern crate paho_mqtt as mqtt;
const DFLT_BROKER:&str = "tcp://broker.emqx.io:1883";
const DFLT_CLIENT:&str = "rust_subscribe";
const DFLT_TOPICS:&[&str] = &["rust/mqtt", "rust/test"];
// The qos list that match topics above.
const DFLT_QOS:&[i32] = &[0, 1];
// Reconnect to the broker when connection is lost.
fn try_reconnect(cli: &mqtt::Client) -> bool
{
println!("Connection lost. Waiting to retry connection");
for _ in 0..12 {
thread::sleep(Duration::from_millis(5000));
if cli.reconnect().is_ok() {
println!("Successfully reconnected");
return true;
}
}
println!("Unable to reconnect after several attempts.");
false
}
// Subscribes to multiple topics.
fn subscribe_topics(cli: &mqtt::Client) {
if let Err(e) = cli.subscribe_many(DFLT_TOPICS, DFLT_QOS) {
println!("Error subscribes topics: {:?}", e);
process::exit(1);
}
}
fn main() {
let host = env::args().nth(1).unwrap_or_else(||
DFLT_BROKER.to_string()
);
// Define the set of options for the create.
// Use an ID for a persistent session.
let create_opts = mqtt::CreateOptionsBuilder::new()
.server_uri(host)
.client_id(DFLT_CLIENT.to_string())
.finalize();
// Create a client.
let mut cli = mqtt::Client::new(create_opts).unwrap_or_else(|err| {
println!("Error creating the client: {:?}", err);
process::exit(1);
});
// Initialize the consumer before connecting.
let rx = cli.start_consuming();
// Define the set of options for the connection.
let lwt = mqtt::MessageBuilder::new()
.topic("test")
.payload("Consumer lost connection")
.finalize();
let conn_opts = mqtt::ConnectOptionsBuilder::new()
.keep_alive_interval(Duration::from_secs(20))
.clean_session(false)
.will_message(lwt)
.finalize();
// Connect and wait for it to complete or fail.
if let Err(e) = cli.connect(conn_opts) {
println!("Unable to connect:\n\t{:?}", e);
process::exit(1);
}
// Subscribe topics.
subscribe_topics(&cli);
println!("Processing requests...");
for msg in rx.iter() {
if let Some(msg) = msg {
println!("{}", msg);
}
else if !cli.is_connected() {
if try_reconnect(&cli) {
println!("Resubscribe topics...");
subscribe_topics(&cli);
} else {
break;
}
}
}
// If still connected, then disconnect now.
if cli.is_connected() {
println!("Disconnecting");
cli.unsubscribe_many(DFLT_TOPICS).unwrap();
cli.disconnect(None).unwrap();
}
println!("Exiting");
}
実行とテスト
バイナリファイルのコンパイル
以下のコマンドは、mqtt-example/target/debug
ディレクトリにsub
、pub
のバイナリファイルを生成します。
cargo build
メッセージのサブスクリプション
sub
バイナリファイルを実行し、メッセージのパブリッシュを待ちます。
メッセージの
pub
バイナリファイルを実行すると、rust/test
およびrust/mqtt
トピックにそれぞれメッセージがパブリッシュされていることが確認できます。
同時に、メッセージのサブスクリプションでもパブリッシュされたメッセージが見えます。
まとめ
paho-mqttのRust言語クライアントを使用してMQTTブローカーに接続し、テストクライアントとMQTTブローカー間の接続、メッセージのパブリッシュ、サブスクリプションを実装することが完了しました。
次に、EMQが提供するMQTTプロトコル簡単ガイドシリーズの記事をチェックして、MQTTプロトコルの機能を学び、MQTTのより高度なアプリケーションを探求し、MQTTアプリケーションおよびサービス開発を始めることができます。