LoginSignup
5
8

More than 3 years have passed since last update.

Spring WebFlux + Redis でリアルタイムチャットを作ってみた

Posted at

WebFlux で Server-Sent-Events ができるので、これと Redis を利用してリアルタイムなチャットのようなものを作ってみた。

環境

  • Spring Boot 2.2.0.RC1
  • (Spring Framework 5.2.0.RELEASE)
  • (Spring Data Redis 2.2.0.RELEASE)
  • Redis 5.0.5

Server-Sent-Events を行うには

body に Flux<ServerSentEvent> を設定してやればいいらしい。

public class HelloHandler {

    public RouterFunction<ServerResponse> routes() {
        return RouterFunctions.route()
                .GET("/sse", this::sse)
                .build();
    }

    public Mono<ServerResponse> sse(ServerRequest request) {
        return ServerResponse.ok()
                .body(Flux.interval(Duration.ofMillis(1000)).take(10)
                        .map(l -> ServerSentEvent.builder(l).event("sse").build()), ServerSentEvent.class);
    }
}
@SpringBootApplication
public class HelloWebfluxApplication {

    public static void main(String[] args) {
        SpringApplication.run(HelloWebfluxApplication.class, args);
    }

    @Bean
    public RouterFunction<ServerResponse> routes() {
        return new HelloHandler().routes();
    }
}

curl で localhost:8080\sse にアクセスすると、一定間隔でデータが取得できることが確認できる。

sse.gif

Redis の Pub/Sub を利用する

Flux に対して動的にデータを追加できれば、チャットっぽくなりそう。
自力で実装できるような気もするが、Redis はリアクティブに対応した API を持っているので、この機能を利用して実装してみる。
Redis クライアントの lettuce をそのまま使うのではなく、Spring Data Redis Reactive を利用する。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis-reactive</artifactId>
</dependency>

チャットのメッセージは以下のクラスで表現する。

public class Message {

    private String name;
    private String text;

    @JsonCreator
    public Message(@JsonProperty("name") String name, @JsonProperty("text") String text) {
        this.name = name;
        this.text = text;
    }

    public String getName() {
        return name;
    }

    public String getText() {
        return text;
    }

}

Jackson を利用して JSON でデータをやり取りするために以下のような設定を行う。

@SpringBootApplication
public class HelloWebfluxApplication {

    public static void main(String[] args) {
        SpringApplication.run(HelloWebfluxApplication.class, args);
    }

    @Bean
    public RouterFunction<ServerResponse> routes(ReactiveRedisConnectionFactory factory) {
        return new RedisHandler(reactiveRedisTemplate(factory)).route();
    }

    @Bean
    public ReactiveRedisTemplate<String, Message> reactiveRedisTemplate(ReactiveRedisConnectionFactory factory) {
        StringRedisSerializer keySerializer = new StringRedisSerializer();
        Jackson2JsonRedisSerializer<Message> valueSerializer = new Jackson2JsonRedisSerializer<>(Message.class);
        RedisSerializationContext.RedisSerializationContextBuilder<String, Message> builder =
                RedisSerializationContext.newSerializationContext(keySerializer);
        RedisSerializationContext<String, Message> context = builder.value(valueSerializer).build();
        return new ReactiveRedisTemplate<>(factory, context);
    }
}

参考: https://www.baeldung.com/spring-data-redis-reactive

続いて Handler を作成する。

public class RedisHandler {
    private final ReactiveRedisTemplate<String, Message> template;

    public RedisHandler(ReactiveRedisTemplate<String, Message> template) {
        this.template = template;
    }

    public RouterFunction route() {
        return RouterFunctions.route()
                .GET("/redis/sse", this::sse)
                .POST("/redis/post", this::post)
                .build();
    }

    public Mono<ServerResponse> post(ServerRequest request) {
        return request.bodyToMono(Message.class).flatMap(m ->
                template.convertAndSend("messages", m).flatMap(l -> ServerResponse.ok().body(Mono.just("done"), String.class))
        );
    }

    public Mono<ServerResponse> sse(ServerRequest request) {
        return ServerResponse.ok()
                .body(template.listenToChannel("messages").map(ReactiveSubscription.Message::getMessage)
                        .map(o -> ServerSentEvent.builder(o).event("messages").build()), ServerSentEvent.class);
    }
}

ReactiveRedisTemplate クラスの convertAndSend メソッドで publish し、listenToChannel メソッドで subscribe することができる。
listenToChannel の戻り値が Flux<? extends ReactiveSubscription.Message<String, V>> となっているので変換が必要となる。

参考: https://docs.spring.io/spring-data/redis/docs/2.2.0.RELEASE/reference/html/#redis:reactive:pubsub

localhost:8080/redis/sse にアクセスした状態で、localhost:8080/redis/post にデータを送ると、送ったデータが即座に表示されることがわかる。

pubsub.gif

データを保存したい

Redis の Pub/Sub 機能はメッセージをやり取りするためのもので、データを永続化するものではない。
今の状態では IRC のような感じで過去のメッセージを参照することができない。
なので、Redis にメッセージを保存するようにしたい。

subscriber を一つ追加して、メッセージが来たらデータを保存するようにしてみた。

public class RedisHandler {
    private final ReactiveRedisTemplate<String, Message> template;

    public RedisHandler(ReactiveRedisTemplate<String, Message> template) {
        this.template = template;
        this.template.listenToChannel("messages")
                .flatMap(o -> this.template.opsForList().rightPush("messages", o.getMessage()))
                .subscribe();
    }

    // 省略
}

このクラスでやるべき処理じゃないような気もするが、とりあえず実装してみた。
subscribe() メソッドを実行するのを忘れると、データが保存されないので注意。

SSE を返却する処理では、保存されているデータをマージして返却するようにする。

public class RedisHandler {
    // 省略

    public Mono<ServerResponse> sse(ServerRequest request) {
        Flux<Message> messages = this.template.opsForList().range("messages", 0, -1);
        return ServerResponse.ok()
                .body(messages.concatWith(template.listenToChannel("messages").map(ReactiveSubscription.Message::getMessage))
                        .map(o -> ServerSentEvent.builder(o).event("messages").build()), ServerSentEvent.class);
    }
}

concatWith を使って Flux を結合している。これで、保存されたデータも取得できるようになった。

persistence.gif

なお、データの保存に失敗したとしても接続済みのユーザにはメッセージが配信される。

Redis Stream を利用する

ReactiveRedisTemplateopsForStream というメソッドがあり、Redis Stream という存在に気付いた。

Redis Stream は Pub/Sub とは違い、永続化されたデータを持つらしい。
ただし、Pub/Sub はサーバ(Redis)側からクライアントにプッシュするが、Redis Stream はクライアントからのポーリングが必要とのこと。

また、データ構造としてはリストのような形だが、ハッシュのように複数フィールドを保持することができるようだ。
https://qiita.com/Reddie_Japan/items/ee0eeddc49ffbc603cba

ポーリングするのは面倒だが、StreamReciever というクラスがいい感じにポーリングして、無限ストリームにメッセージを emit してくれるらしい。
AutoConfigurer は無いっぽいので自分で Bean 定義する。

なお、Spring Data Redis 2.2.0 から Redis Stream をサポートしている。

@SpringBootApplication
public class HelloWebfluxApplication {

    public static void main(String[] args) {
        SpringApplication.run(HelloWebfluxApplication.class, args);
    }

    @Bean
    public RouterFunction<ServerResponse> routes(ReactiveRedisConnectionFactory factory) {
        return new RedisHandler(reactiveRedisTemplate(factory), streamReceiver(factory)).route();
    }

    @Bean
    public ReactiveRedisTemplate<String, Message> reactiveRedisTemplate(ReactiveRedisConnectionFactory factory) {
        StringRedisSerializer keySerializer = new StringRedisSerializer();
        Jackson2JsonRedisSerializer<Message> valueSerializer = new Jackson2JsonRedisSerializer<>(Message.class);
        RedisSerializationContext.RedisSerializationContextBuilder<String, Message> builder =
                RedisSerializationContext.newSerializationContext(keySerializer);
        RedisSerializationContext<String, Message> context = builder.value(valueSerializer).build();
        return new ReactiveRedisTemplate<>(factory, context);
    }

    @Bean
    public StreamReceiver streamReceiver(ReactiveRedisConnectionFactory factory) {
        return StreamReceiver.create(factory);
    }

}

以下のようにして利用するらしい。

public class RedisHandler {
    private final ReactiveRedisTemplate<String, Message> template;
    private final StreamReceiver<String, MapRecord<String, String, String>> receiver;

    public RedisHandler(ReactiveRedisTemplate<String, Message> template, StreamReceiver streamReceiver) {
        this.template = template;
        this.receiver = streamReceiver;
    }

    public RouterFunction route() {
        return RouterFunctions.route()
                .GET("/stream", this::stream)
                .POST("/stream", this::streamPost)
                .build();
    }

    public Mono<ServerResponse> streamPost(ServerRequest request) {
        return request.bodyToMono(Message.class).map(m -> {
            Map<String, String> map = new HashMap<>();
            map.put("name", m.getName());
            map.put("text", m.getText());
            return MapRecord.create("message-stream", map);
        }).flatMap(o -> ServerResponse.ok().body(this.template.opsForStream().add(o), RecordId.class));
    }

    public Mono<ServerResponse> stream(ServerRequest request) {
        return ServerResponse.ok().body(this.receiver.receive(StreamOffset.fromStart("message-stream"))
                .map(e -> new Message(e.getValue().get("name"), e.getValue().get("text")))
                .map(m -> ServerSentEvent.builder(m).event("stream").build()), ServerSentEvent.class);
    }
}

Map 形式でデータを保存する MapRecord<S, K, V> と、オブジェクト形式でデータを保存する ObjectRecord<S, V> があるらしいが、後者の使い方がわからなかったので、いちいち MapMessage の変換処理を行っている。ださい。

データの登録は今までと変わらず ReactiveRedisTemplate を利用するが、利用するメソッドが opsForStream().add() に変わっている。
データの取得は StreamReceiverreceive メソッドを利用する。取得できるデータは Map<K,V> なので Message 型への変換を行っている。

これを利用してみると投稿したメッセージは SSE を使ってリアルタイムに配信されるし、一度接続を切って繋ぎなおしても保存されたメッセージを取得できる。

stream.gif

さいごに

R2DBC を利用すれば RDBMS でも似たようなことができるのだろうか・・・?

5
8
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
8