実装の概要
-
デバイスがMQTTを使ってトピック
/device/register
にメッセージを送信- メッセージの内容に
deviceId
を含む。
- メッセージの内容に
-
Spring Bootがデバイスを登録
- デバイスの
deviceId
を保持。
- デバイスの
-
スケジューラーでMQTT経由でハートビートを送信
- 一定間隔(例: 30秒ごと)で
/device/heartbeat/{deviceId}
にMQTTメッセージを送信。
- 一定間隔(例: 30秒ごと)で
1. 必要なライブラリ(pom.xml
)
xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
2. MQTT設定 (MqttConfig.java
)
java
package com.example.mqtt;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
@Configuration
public class MqttConfig {
private static final String MQTT_BROKER_URL = "tcp://localhost:1883";
private static final String CLIENT_ID = "spring-mqtt-client";
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[]{MQTT_BROKER_URL});
options.setCleanSession(true);
factory.setConnectionOptions(options);
return factory;
}
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
@Bean
public MqttPahoMessageDrivenChannelAdapter inboundAdapter(MqttPahoClientFactory mqttClientFactory) {
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter(CLIENT_ID + "_in", mqttClientFactory, "/device/register");
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler messageHandler() {
return message -> {
String payload = message.getPayload().toString();
System.out.println("Received MQTT message: " + payload);
};
}
@Bean
public MessageChannel mqttOutputChannel() {
return new DirectChannel();
}
@Bean
@ServiceActivator(inputChannel = "mqttOutputChannel")
public MqttPahoMessageHandler outboundHandler(MqttPahoClientFactory mqttClientFactory) {
MqttPahoMessageHandler handler = new MqttPahoMessageHandler(CLIENT_ID + "_out", mqttClientFactory);
handler.setAsync(true);
return handler;
}
}
3. デバイス登録とハートビート送信 (DeviceService.java
)
java
package com.example.mqtt;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.util.HashSet;
import java.util.Set;
@Service
public class DeviceService {
private final MessageChannel mqttOutputChannel;
private final Set<String> registeredDevices = new HashSet<>();
public DeviceService(MessageChannel mqttOutputChannel) {
this.mqttOutputChannel = mqttOutputChannel;
}
public void registerDevice(String deviceId) {
registeredDevices.add(deviceId);
System.out.println("Device registered: " + deviceId);
}
@Scheduled(fixedRate = 30000)
public void sendHeartbeat() {
for (String deviceId : registeredDevices) {
String topic = "/device/heartbeat/" + deviceId;
String payload = "heartbeat";
Message<String> message = MessageBuilder.withPayload(payload)
.setHeader(MqttHeaders.TOPIC, topic)
.build();
mqttOutputChannel.send(message);
System.out.println("Sent heartbeat to: " + topic);
}
}
}
4. デバイス登録を処理する (MqttMessageHandler.java
)
java
package com.example.mqtt;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.stereotype.Component;
@Component
public class MqttMessageHandler {
private final DeviceService deviceService;
public MqttMessageHandler(DeviceService deviceService) {
this.deviceService = deviceService;
}
@ServiceActivator(inputChannel = "mqttInputChannel")
public void handleMqttMessage(String payload) {
System.out.println("Received device registration: " + payload);
deviceService.registerDevice(payload);
}
}
5. @EnableScheduling
を追加 (MqttApplication.java
)
java
package com.example.mqtt;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication
@EnableScheduling
public class MqttApplication {
public static void main(String[] args) {
SpringApplication.run(MqttApplication.class, args);
}
}
動作確認
1. MQTTブローカーの起動
ローカルで mosquitto
を動かしている場合:
bash
mosquitto -c /etc/mosquitto/mosquitto.conf
Dockerで Eclipse Mosquitto
を使う場合:
bash
docker run -d -p 1883:1883 eclipse-mosquitto
2. デバイスが登録を送信
MQTTクライアント(mosquitto_pub
)でデバイスを登録:
bash
mosquitto_pub -h localhost -t "/device/register" -m "device123"
Spring Bootのコンソール:
yaml
Received device registration: device123
Device registered: device123
3. ハートビートの受信
30秒ごとに mosquitto_sub
で受信を確認:
bash
mosquitto_sub -h localhost -t "/device/heartbeat/device123"
期待される出力:
nginx
heartbeat
まとめ
✅ デバイスは MQTT を使って /device/register
に登録
✅ Spring Bootは登録されたデバイスを保存
✅ 30秒ごとに /device/heartbeat/{deviceId}
にハートビートを送信
MQTTでデバイスと通信しながら、Spring Bootで管理・自動ハートビート送信を実装しました!🚀