Spring BootでRedisというと、セッションストアやリザルトキャッシュとしての使用例が王道ですが、RedisはPub/Subの機能も持っています。

Spring Bootでは、RedisのPub/Subを扱う仕組みも提供されているので、使い方についてメモしました。
動作確認はSpring Boot 1.5.8.RELEASEで行っています。

使い方をまとめると、次のようになります。

  • Pub側
    • RedisTemplate#convertAndSend(String, Object) メソッドを使い、チャンネル名を指定してメッセージを送信(第一引数がチャンネル名、第二引数が送信したいメッセージ)
  • Sub側
    • MessageListener インタフェースを実装したクラスを定義して、メッセージを受信した時のロジックを記述する
  • Configuration
    • Subscriberを RedisMessagelistenerContainerに登録する
    • Pub -Redis - Subの間でやり取りするするためのRedisTemplate, Topic等を設定する

サンプル

今回は、文字列をPub/Subするサンプルを試してみました。

Webアプリを複数起動し、片方でPublishしたメッセージをもう片方でSubscribeする、という動作です。

依存関係は次のものになります 1

build.gradle
dependencies {
    compile 'org.springframework.boot:spring-boot-starter-web'
    compile 'org.springframework.boot:spring-boot-starter-cache'
    compile 'org.springframework.boot:spring-boot-starter-data-redis'
}

なお、以下のコードでは、Redisを利用するための基本的な設定については、Spring BootのAuto configurationに任せ、省略しています。
そのため、Redisの使うための基本設定(接続先やプールの設定など)については、自動設定が適用されており、 RedisConnectionFactory などのBeanについてはすでにインジェクション可能な状態にあることを前提としています。

Pub

まずは送信側のサンプルから。

RedisMessagePublisher.java
@Service
public class RedisMessagePublisher {

    private final StringRedisTemplate redisTemplate;

    private final ChannelTopic topic;

    public RedisMessagePublisher(StringRedisTemplate redisTemplate,
                                 ChannelTopic topic) {
        this.redisTemplate = redisTemplate;
        this.topic = topic;
    }

    public void publish(String message) {
        redisTemplate.convertAndSend(topic.getTopic(), message);
    }
}

メインのロジックは publish で、引数で渡された文字列をチャンネルに publish します。
RedisTemplate#convertAndSend メソッドの内部では、Redisに標準で用意されている PUBLISH コマンドが呼ばれます。

Sub

受信側のサンプルです。

RedisMessageSubscriber.java
public class RedisMessageSubscriber implements MessageListener {

    @Override
    public void onMessage(Message message, byte[] pattern) {
        System.out.println("message received : " + new String(message.getBody(), StandardCharsets.UTF_8));
    }
}

受信側は MessageListener インタフェースを実装したクラスになります。

チャンネルにpublishされてきたメッセージを標準出力に表示します。
メッセージ本体は Message#getBody メソッドで取得できますが、メソッドの戻り値はバイト列なので、文字列に戻してから出力しています。

Configuration

続いて、アプリを動かすための configuration です。

RedisConfig.java
@Configuration
public class RedisConfig {

    /**
     * Redis にアクセスするための RedisTemplate
     */
    @Bean
    public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory connectionFactory) {
        StringRedisTemplate redisTemplate = new StringRedisTemplate();
        redisTemplate.setConnectionFactory(connectionFactory);
        return redisTemplate;
    }

    /**
     * リスナ
     */
    @Bean
    public MessageListenerAdapter messageListener() {
        return new MessageListenerAdapter(new RedisMessageSubscriber());
    }

    /**
     * Pub/Subで利用するチャンネル
     */
    @Bean
    public ChannelTopic topic() {
        return new ChannelTopic("demo");
    }

    /**
     * リスナを登録
     */
    @Bean
    public RedisMessageListenerContainer redisContainer(RedisConnectionFactory connectionFactory,
                                                        MessageListenerAdapter messageListener) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();

        container.setConnectionFactory(connectionFactory);
        container.addMessageListener(messageListener(), topic());
        return container;
    }
}

Redisとやり取りするメッセージはただの文字列なので、Redisのアクセスは RedisTemplate ではなく StringRedisTemplate を使います。

ポイントは、RedisMessageListenerContainerの中でSubscriberを登録しているところです。

Controller

動作確認用のコントローラを作ります。

DemoController.java
@RestController
public class DemoController {

    private final RedisMessagePublisher publisher;

    public DemoController(RedisMessagePublisher publisher) {
        this.publisher = publisher;
    }

    @GetMapping("publish")
    public String publish(@RequestParam String message) {
        publisher.publish(message);

        return "published " + message;
    }
}

動作確認

準備ができたので、動作を確認してみます。

まずはアプリをビルドします。

$ ./gradlew build

ポート8888でアプリを1つ起動します(xxx.jarのところは実際のファイル名に読み替えてください)

$ java -jar -Dserver.port=8888 build/lib/xxx.jar

別のコンソールを開いて、ポート9999でもう1つアプリを起動します。

$ java -jar -Dserver.port=9999 build/lib/xxx.jar

アプリが2つとも起動した状態で http://localhost:8888/publish?message=hello にアクセスします。
そうすると、コンソールに

receive message : hello

が出力されました。
クエリパラメータ messageの値を変えると、それに応じて出力内容も変わることが確認できます。
これで、Pub/Sub機能が使えていることが確認できました。

実際に動かしてみるとわかりますが、2つのアプリで同じメッセージが出力されます。
これは、両方のアプリがSubscribeしている状態だからです。送信者以外が受信できるようにするには、少し手を加える必要があるかもしれません。

Junit 5でテストしてみる

意図したとおり動作することを確認するために、テストコードが欲しくなります。

今回はJUnit 5を使ってテストを書いてみました。
先に完成形の build.gradle を示します(JUnit5、依存関係の設定が大変ですね…)

build.gradle
buildscript {
    ext {
        springBootVersion = '1.5.8.RELEASE'
    }
    repositories {
        mavenCentral()
        jcenter()
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
        classpath("org.springframework:springloaded:1.2.8.RELEASE")
        classpath("io.spring.gradle:dependency-management-plugin:1.0.3.RELEASE")
        classpath("org.junit.platform:junit-platform-gradle-plugin:1.0.2")
    }
}

apply plugin: 'java'
apply plugin: 'groovy'
apply plugin: 'idea'
apply plugin: 'spring-boot'
apply plugin: 'io.spring.dependency-management'
apply plugin: 'org.junit.platform.gradle.plugin'

sourceCompatibility = 1.8
targetCompatibility = 1.8

compileJava.options.encoding = 'UTF-8'

repositories {
    mavenCentral()
    maven { url 'https://jitpack.io' }
}

dependencies {
    compile 'org.springframework.boot:spring-boot-starter-web'
    compile 'org.springframework.boot:spring-boot-starter-cache'
    compile 'org.springframework.boot:spring-boot-starter-data-redis'
    compile 'com.github.ben-manes.caffeine:caffeine'

    testCompile 'org.springframework.boot:spring-boot-starter-test'
    testCompile 'org.junit.jupiter:junit-jupiter-api'
    testRuntime 'org.junit.jupiter:junit-jupiter-engine'
    testCompile 'com.github.sbrannen:spring-test-junit5:1.0.2'
}

Spring Boot and JUnit 5を参考に、spring-test-junit5というExtensionを追加しています。
これはSpring Bootのテストの仕組みとJUnit 5を統合するために必要です。余談ですが、SpringとJUnit 5のコミッタの方が作られたものみたいですね。

さて肝心のテストですが、次の通りに動作確認することにします。

  1. Publisherがメッセージを投げる
  2. Subscriberがメッセージを受け取る
  3. 2.で受け取ったメッセージが1.と同じことを検証する

プロダクトコードのSubscriberでは、受け取ったメッセージの検証がやりにくいです。
そこで、受け取ったメッセージの記録ができるような、テスト用のSubscriberを利用してあげることにします。

テストコードはこんな感じになりました。

RedisMessagePubSubTest.java
@SpringBootTest
@ExtendWith(SpringExtension.class)
public class RedisMessagePubSubTest {

    @Autowired
    RedisMessagePublisher publisher;

    static TestSubscriber subscriber = new TestSubscriber();

    @TestConfiguration
    static class TestConfig {

        @Bean
        public RedisMessageListenerContainer redisContainer(RedisConnectionFactory connectionFactory,
                                                            Topic topic) {
            MessageListenerAdapter messageListener = new MessageListenerAdapter(subscriber);

            RedisMessageListenerContainer container = new RedisMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
            container.addMessageListener(messageListener, topic);
            return container;
        }
    }

    static class TestSubscriber implements MessageListener {

        List<String> receivedMessages = new ArrayList<>();

        @Override
        public void onMessage(Message message, byte[] pattern) {
            receivedMessages.add(new String(message.getBody(), StandardCharsets.UTF_8)));
        }

        Optional<String> lastMessage() {
            if (receivedMessages.isEmpty()) {
                return Optional.empty();
            }
            return Optional.of(receivedMessages.get(receivedMessages.size() - 1));
        }
    }

    @Test
    void testPubSub() throws Exception {
        publisher.publish("first message");
        Thread.sleep(100L);

        assertEquals("first message", subscriber.lastMessage().orElse("Not Found"));

        publisher.publish("second message");
        Thread.sleep(100L);

        assertEquals("second message", subscriber.lastMessage().orElse("Not Found"));
    }
}

publishされたメッセージがsubscriberされるまでの時間は、ほとんど気にならないレベルですが、0ではないので、ちょっとだけウエイトを入れています。

あとはいつも通り、IDEやコマンドラインからテストを走らせることで、Greenになることが確認できます。

独自のメッセージオブジェクトを受けとるSubscriberを使う

上のサンプルではSubscriberは MessageListener インタフェースを実装したクラスでした。
受信したメッセージは単なるバイト列でしかないので、受信側が適切にメッセージを読み解く必要があるため、現実的なアプリケーションで使うにはけっこう辛いかもしれません。

Spring Bootでは、バイト列ではなく独自の型でPub/Sub間のメッセージをやり取りするための仕組みも用意されているので、試しました。

まずは準備として、Pub側とSub側でやり取りするメッセージを定義します。

RedisMessage.java
/**
 * メッセージオブジェクト
 */
public class RedisMessage implements Serializable {

    private final String body;

    public RedisMessage(String body) {
        this.body = body
    }

    public String getBody() {
        return body;
    }
}

ここでの注意点として、アプリとRedisの間でやり取りされるメッセージの実体はオブジェクトをシリアライズしたバイト列になるため2、シリアライズ・デシリアライズに失敗しないように、 Serializable インタフェースを実装しておきます。

続いてSubscriberを次のように変更します。

RedisMessageSubscriber.java
public class RedisMessageSubscriber {

    public void handleMessage(RedisMessage message) {
        System.out.println("message received : " + message.getBody());
    }
}

MessageListenerインタフェースを実装するのをやめて、先ほど作った RedisMessage を引数に取るメソッドを1つ定義するだけです。

次に RedisConfig を修正します。
以下のコードを追加し、JavaオブジェクトをRedisで扱うための RedisTempalte をBean登録するようにします。

RedisConfig.java

    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setConnectionFactory(connectionFactory);
        return redisTemplate;
    }

最後に、Publish側で以下2点を変更します。

  • StringRedisTemplate ではなく RedisTemplate をインジェクションする。
  • 文字列ではなくRedisMessage をpublishする。
RedisMessagePublisher.java
    private final RedisTemplate<String, Object> redisTemplate;

    private final ChannelTopic topic;

    public RedisMessagePublisher(RedisTemplate<String, Object> redisTemplate,
                                 ChannelTopic topic) {
        this.redisTemplate = redisTemplate;
        this.topic = topic;
    }

    public void publish(String message) {
        redisTemplate.convertAndSend(topic.getTopic(), new RedisMessage(message));
    }

後は同じように動作確認をします。
結果は同じになると思います。

ところで、新しいRedisMessageSubscriber は、クラスを継承したりインタフェースを実装したりしていない素のクラスです。にも関わらず、Redisにメッセージがpublishされたタイミングで、このクラスの handleMessage メソッドが適切に呼ばれています。
これはなぜかというと、 RedisConfig 中で設定している MessageListenerAdapter クラスのおかげです。
MessageListenerAdapter はその名前の通り、 MessageListener のアダプタになっており,
コンストラクタで渡されたオブジェクトによって以下のように、メソッドを呼び出すようになっています

  • MessageListenerインタフェースを実装したクラスの場合・・・MessageListener#onMessageを呼び出す。
  • それ以外の場合・・・handleMessage メソッドを呼び出す。
    • このメソッド名はデフォルト値であり、 MessageListenerAdapter.ORIGINAL_DEFAULT_LISTENER_METHOD で定義されています。
    • 別の名前のメソッドを使いたい場合、コンストラクタの第二引数でメソッド名を渡せばOKです。

その他:Subscribe側の設定パラメータ

Redisに対してSUBSCRIBEコマンドが発行された時(=RedisMessageListenerContainerが初期化された時)から、アプリはRedisに対して接続しっぱなしになります。

接続が切れた場合の再接続を試みるまでの待ち時間など、細かい設定は RedisMessageListenerContainer に対して設定できます。

おわりに

Spring Bootを使ったRedisのPub/Sub機能は、Spring Way (Spring Boot Way)で、シンプルに実現できることがわかりました。


  1. バージョン指定をしていないのは、Spring Bootのdependency managementプラグインで自動で決めているためです 

  2. 設定で変えることはできます。JSONを使うようにしたりとか。 

Sign up for free and join this conversation.
Sign Up
If you already have a Qiita account log in.