はじめに
MQTTをPython/Javaで使ってみたので投稿しています。
本記事は以下の3にあたります。まだの方は1をまずご覧ください。
- 【MQTT】コマンドベースでMQTTの導入(前々回)
- 【Python】PythonでMQTTのPub/Subをするクラスを実装した(前回)
- 【Java】JavaでMQTTのPub/Subをするクラスを実装した(本記事)
- 【ROS】MQTT通信するノードを実装した(次回)
ライブラリ、ブローカーのインストール
これは前々回の記事にあるのでまだインストールしていない人のために書いておきます。
ライブラリ、ブローカーのインストール
# Mosquitto(Broker)をインストール
$ sudo apt-get install mosquitto
# Mosquittoクライアントをインストール
$ sudo apt-get install mosquitto-clients
クライアントライブラリのインストール
Javaのクライアントライブラリのインストール方法は以下になります。
こちらのリンクから
org.eclipse.paho.client.mqttv3_1.2.3.jar
をダウンロードしてください。
PublisherとSubscriberと別スレッド処理のコード
Pythonの記事と同じように、ROSやOpenRTMなど、別で動作しているシステムから呼び出したいです。
PythonではSubscriberを立ち上げるさいにloop_start()
とすると別スレッドの処理が始まるようだったのですがJavaではそうではないようなので(ここは不明点です。どなたか簡単に別スレッドで処理ができるようでしたら教えてください。)別スレッドで処理をするクラスも作っています。
よって、本記事のプログラムは以下のプログラムから成り立ちます。
- Publisher(別システムから呼び出す)
- Subscriber(別スレッドとして呼び出される)
- Thread(Subscriberを呼び出す)
こちらの記事をとても参考にしました。
Publisher
package mqtt;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class MqttPublisher {
String broker = "";
String topic = "";
/**
* コンストラクタ
* @param brokerHostName
* @param publishTopic
*/
public MqttPublisher(String brokerHostName,String publishTopic) {
broker = "tcp://"+brokerHostName+":1883";
topic = publishTopic;
}
/**
* 引数をpublishする.
* @param publishMessage
*/
public void publish(String publishMessage) {
final int qos = 2;
final String clientId = "Publisher";
try {
MqttClient mqttClient = new MqttClient(broker, clientId, new MemoryPersistence());
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(false);
mqttClient.connect(connOpts);
MqttMessage message = new MqttMessage(publishMessage.getBytes());
message.setQos(qos);
// System.out.println("publish message");
// System.out.println("Topic : "+topic+", Message : "+message);
mqttClient.publish(topic, message);
mqttClient.disconnect();
mqttClient.close();
} catch(MqttException me) {
System.out.println("reason: " + me.getReasonCode());
System.out.println("message: " + me.getMessage());
System.out.println("localize: " + me.getLocalizedMessage());
System.out.println("cause: " + me.getCause());
System.out.println("exception: "+ me);
}
}
public static void main(String[] args) {
MqttPublisher publisher = new MqttPublisher("localhost","testTopic2");
publisher.publish("test");
}
}
main関数にあるように、インスタンス生成時にホスト名、トピック名を指定してpublish
メソッドでメッセージをpublishします。
Subscriber
package mqtt;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.sql.Timestamp;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class MqttSubscriber implements MqttCallback {
Timestamp recieveTime;
Timestamp lastTime;
String broker = "";
String topic = "";
/**
* コンストラクタ
* @param brokerHostName
* @param subscribeTopic
*/
public MqttSubscriber(String brokerHostName,String subscribeTopic) {
broker = "tcp://"+brokerHostName+":1883";
topic = subscribeTopic;
}
/**
* MQTTブローカーとの接続を失った時に呼び出される.
*/
@Override
public void connectionLost(Throwable cause) {
System.out.println("Connection lost");
System.exit(1);
}
/**
* メッセージを受信したときに呼び出される.
*/
@Override
public void messageArrived(String topic, MqttMessage message) throws MqttException {
// System.out.println("Message arrived");
// System.out.println("Topic:"+ topic);
// System.out.println("Message: " + new String(message.getPayload()));
recieveTime = new Timestamp(System.currentTimeMillis());
MqttThread.recieveData = new String(message.getPayload());
}
/**
* Subscribeしたか否かを判断する.
* @return isNewフラグ
*/
public boolean isNew() {
boolean flag = false;
if(recieveTime==lastTime) flag = false;
else flag = true;
lastTime=recieveTime;
return flag;
}
public static void main(String[] args) throws InterruptedException {
try {
MqttSubscriber subscriber = new MqttSubscriber("localhost","testTopic1");
subscriber.subscribe();
} catch(MqttException me) {
System.out.println("reason: " + me.getReasonCode());
System.out.println("message: " + me.getMessage());
System.out.println("localize: " + me.getLocalizedMessage());
System.out.println("cause: " + me.getCause());
System.out.println("exception: "+ me);
}
}
/**
* メッセージを受信する.
* 標準入力があるまで接続し続ける.
*
* @throws MqttException
* @throws InterruptedException
*/
public void subscribe() throws MqttException, InterruptedException {
//Subscribe設定
final int qos = 2;
final String clientId = "Subscribe";
MqttClient client = new MqttClient(broker, clientId, new MemoryPersistence());
client.setCallback(this);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(false);
System.out.println("Connecting to broker:"+broker);
client.connect(connOpts);
client.subscribe(topic, qos);
BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
try{
//標準入力を受け取るまで待ち続ける
br.readLine();
}catch(IOException e){
System.exit(1);
}
client.disconnect();
client.close();
System.out.println("Disconnected");
}
/**
* MqttCallbackに必要,subscribeからは呼び出されなさそう.
*/
@Override
public void deliveryComplete(IMqttDeliveryToken arg0) {
// TODO 自動生成されたメソッド・スタブ
}
}
こちらもpublisherと同様にインスタンス生成時にホスト名とトピック名を指定してsubscribe
メソッドでsubscribeを開始しします。
スレッド処理
Thread
を継承して、Subscribeの処理をスレッド処理で行うクラスです。
package mqtt;
import org.eclipse.paho.client.mqttv3.MqttException;
public class MqttThread extends Thread{
String broker = "";
String topic = "";
static MqttSubscriber subscriber;
public static String recieveData = "";
//コンストラクタ
public MqttThread(String brokerHostName,String subscribeTopic) {
broker = brokerHostName;
topic = subscribeTopic;
subscriber = new MqttSubscriber(broker, topic);
}
public void run() {
try {
subscriber.subscribe();
} catch(MqttException me) {
System.out.println("reason: " + me.getReasonCode());
System.out.println("message: " + me.getMessage());
System.out.println("localize: " + me.getLocalizedMessage());
System.out.println("cause: " + me.getCause());
System.out.println("exception: "+ me);
} catch (InterruptedException e) {
// TODO 自動生成された catch ブロック
e.printStackTrace();
}
}
public boolean isNew() {
boolean flag = false;
flag = subscriber.isNew();
return flag;
}
}
使用例
Subscribeの処理は別スレッド、Pythonと同様にsubscribe時に呼び出されるcallback関数内(messageArrived
関数)で処理を行います。
Publishの処理は都度関数を呼び出してpublishします。
package main;
import mqtt.MqttThread;
import mqtt.MqttPublisher;
public class testMQTTClient {
public static void main(String[] args) {
MqttThread mthread = new MqttThread("ホスト名","トピック名");
mthread.start();
// 1回だけpublish
MqttPublisher publisher = new MqttPublisher("ホスト名","トピック名");
publisher.publish("メッセージ内容");
if(mthread.isNew()){
System.out.println(mthread.recieveData);
}
}
}
コンストラクタ呼び出し時(インスタンス生成時)にホスト名とトピック名を指定します。
mthread.start()
でsubscribeの処理が別スレッドで動きます。
publisher.publish("メッセージ")
で1回だけpublishされます。
subscribeされたデータ内容にはmthread.receiveData
でアクセスできます。Pythonのものと同様にisNew()
関数とセットで使ってください。
前々回の記事のPub/Subの起動の部分を参考に、PublisherとSubsrriberを起動して動作確認できると思います。
おわりに
今回はJavaでの実装例を紹介しました。
Pub/Sub単体での動作ではなく、実際にほかのシステムから動かす際に使えるようにしてみました。