0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

MQTTを使用してデバイスと通信し、そのデバイスに対してハートビートを送信するSpring Bootの実装

Posted at

実装の概要

  1. デバイスがMQTTを使ってトピック /device/register にメッセージを送信
    • メッセージの内容に deviceId を含む。
  2. Spring Bootがデバイスを登録
    • デバイスの deviceId を保持。
  3. スケジューラーでMQTT経由でハートビートを送信
    • 一定間隔(例: 30秒ごと)で /device/heartbeat/{deviceId} にMQTTメッセージを送信。

1. 必要なライブラリ(pom.xml

xml

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>


2. MQTT設定 (MqttConfig.java)

java

package com.example.mqtt;

import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

@Configuration
public class MqttConfig {

    private static final String MQTT_BROKER_URL = "tcp://localhost:1883";
    private static final String CLIENT_ID = "spring-mqtt-client";

    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[]{MQTT_BROKER_URL});
        options.setCleanSession(true);
        factory.setConnectionOptions(options);
        return factory;
    }

    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MqttPahoMessageDrivenChannelAdapter inboundAdapter(MqttPahoClientFactory mqttClientFactory) {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(CLIENT_ID + "_in", mqttClientFactory, "/device/register");
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler messageHandler() {
        return message -> {
            String payload = message.getPayload().toString();
            System.out.println("Received MQTT message: " + payload);
        };
    }

    @Bean
    public MessageChannel mqttOutputChannel() {
        return new DirectChannel();
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttOutputChannel")
    public MqttPahoMessageHandler outboundHandler(MqttPahoClientFactory mqttClientFactory) {
        MqttPahoMessageHandler handler = new MqttPahoMessageHandler(CLIENT_ID + "_out", mqttClientFactory);
        handler.setAsync(true);
        return handler;
    }
}


3. デバイス登録とハートビート送信 (DeviceService.java)

java

package com.example.mqtt;

import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import java.util.HashSet;
import java.util.Set;

@Service
public class DeviceService {

    private final MessageChannel mqttOutputChannel;
    private final Set<String> registeredDevices = new HashSet<>();

    public DeviceService(MessageChannel mqttOutputChannel) {
        this.mqttOutputChannel = mqttOutputChannel;
    }

    public void registerDevice(String deviceId) {
        registeredDevices.add(deviceId);
        System.out.println("Device registered: " + deviceId);
    }

    @Scheduled(fixedRate = 30000)
    public void sendHeartbeat() {
        for (String deviceId : registeredDevices) {
            String topic = "/device/heartbeat/" + deviceId;
            String payload = "heartbeat";

            Message<String> message = MessageBuilder.withPayload(payload)
                    .setHeader(MqttHeaders.TOPIC, topic)
                    .build();

            mqttOutputChannel.send(message);
            System.out.println("Sent heartbeat to: " + topic);
        }
    }
}


4. デバイス登録を処理する (MqttMessageHandler.java)

java
package com.example.mqtt;

import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.stereotype.Component;

@Component
public class MqttMessageHandler {

    private final DeviceService deviceService;

    public MqttMessageHandler(DeviceService deviceService) {
        this.deviceService = deviceService;
    }

    @ServiceActivator(inputChannel = "mqttInputChannel")
    public void handleMqttMessage(String payload) {
        System.out.println("Received device registration: " + payload);
        deviceService.registerDevice(payload);
    }
}


5. @EnableScheduling を追加 (MqttApplication.java)

java
package com.example.mqtt;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableScheduling
public class MqttApplication {
    public static void main(String[] args) {
        SpringApplication.run(MqttApplication.class, args);
    }
}


動作確認

1. MQTTブローカーの起動

ローカルで mosquitto を動かしている場合:

bash
mosquitto -c /etc/mosquitto/mosquitto.conf

Dockerで Eclipse Mosquitto を使う場合:

bash
docker run -d -p 1883:1883 eclipse-mosquitto


2. デバイスが登録を送信

MQTTクライアント(mosquitto_pub)でデバイスを登録:

bash
mosquitto_pub -h localhost -t "/device/register" -m "device123"

Spring Bootのコンソール:

yaml
Received device registration: device123
Device registered: device123


3. ハートビートの受信

30秒ごとに mosquitto_sub で受信を確認:

bash
mosquitto_sub -h localhost -t "/device/heartbeat/device123"

期待される出力:

nginx
heartbeat


まとめ

デバイスは MQTT を使って /device/register に登録

Spring Bootは登録されたデバイスを保存

30秒ごとに /device/heartbeat/{deviceId} にハートビートを送信

MQTTでデバイスと通信しながら、Spring Bootで管理・自動ハートビート送信を実装しました!🚀

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?