WebFlux で Server-Sent-Events ができるので、これと Redis を利用してリアルタイムなチャットのようなものを作ってみた。
環境
- Spring Boot 2.2.0.RC1
- (Spring Framework 5.2.0.RELEASE)
- (Spring Data Redis 2.2.0.RELEASE)
- Redis 5.0.5
Server-Sent-Events を行うには
body に Flux<ServerSentEvent>
を設定してやればいいらしい。
public class HelloHandler {
public RouterFunction<ServerResponse> routes() {
return RouterFunctions.route()
.GET("/sse", this::sse)
.build();
}
public Mono<ServerResponse> sse(ServerRequest request) {
return ServerResponse.ok()
.body(Flux.interval(Duration.ofMillis(1000)).take(10)
.map(l -> ServerSentEvent.builder(l).event("sse").build()), ServerSentEvent.class);
}
}
@SpringBootApplication
public class HelloWebfluxApplication {
public static void main(String[] args) {
SpringApplication.run(HelloWebfluxApplication.class, args);
}
@Bean
public RouterFunction<ServerResponse> routes() {
return new HelloHandler().routes();
}
}
curl で localhost:8080\sse
にアクセスすると、一定間隔でデータが取得できることが確認できる。
Redis の Pub/Sub を利用する
Flux
に対して動的にデータを追加できれば、チャットっぽくなりそう。
自力で実装できるような気もするが、Redis はリアクティブに対応した API を持っているので、この機能を利用して実装してみる。
Redis クライアントの lettuce をそのまま使うのではなく、Spring Data Redis Reactive を利用する。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis-reactive</artifactId>
</dependency>
チャットのメッセージは以下のクラスで表現する。
public class Message {
private String name;
private String text;
@JsonCreator
public Message(@JsonProperty("name") String name, @JsonProperty("text") String text) {
this.name = name;
this.text = text;
}
public String getName() {
return name;
}
public String getText() {
return text;
}
}
Jackson を利用して JSON でデータをやり取りするために以下のような設定を行う。
@SpringBootApplication
public class HelloWebfluxApplication {
public static void main(String[] args) {
SpringApplication.run(HelloWebfluxApplication.class, args);
}
@Bean
public RouterFunction<ServerResponse> routes(ReactiveRedisConnectionFactory factory) {
return new RedisHandler(reactiveRedisTemplate(factory)).route();
}
@Bean
public ReactiveRedisTemplate<String, Message> reactiveRedisTemplate(ReactiveRedisConnectionFactory factory) {
StringRedisSerializer keySerializer = new StringRedisSerializer();
Jackson2JsonRedisSerializer<Message> valueSerializer = new Jackson2JsonRedisSerializer<>(Message.class);
RedisSerializationContext.RedisSerializationContextBuilder<String, Message> builder =
RedisSerializationContext.newSerializationContext(keySerializer);
RedisSerializationContext<String, Message> context = builder.value(valueSerializer).build();
return new ReactiveRedisTemplate<>(factory, context);
}
}
参考: https://www.baeldung.com/spring-data-redis-reactive
続いて Handler を作成する。
public class RedisHandler {
private final ReactiveRedisTemplate<String, Message> template;
public RedisHandler(ReactiveRedisTemplate<String, Message> template) {
this.template = template;
}
public RouterFunction route() {
return RouterFunctions.route()
.GET("/redis/sse", this::sse)
.POST("/redis/post", this::post)
.build();
}
public Mono<ServerResponse> post(ServerRequest request) {
return request.bodyToMono(Message.class).flatMap(m ->
template.convertAndSend("messages", m).flatMap(l -> ServerResponse.ok().body(Mono.just("done"), String.class))
);
}
public Mono<ServerResponse> sse(ServerRequest request) {
return ServerResponse.ok()
.body(template.listenToChannel("messages").map(ReactiveSubscription.Message::getMessage)
.map(o -> ServerSentEvent.builder(o).event("messages").build()), ServerSentEvent.class);
}
}
ReactiveRedisTemplate
クラスの convertAndSend
メソッドで publish し、listenToChannel
メソッドで subscribe することができる。
listenToChannel
の戻り値が Flux<? extends ReactiveSubscription.Message<String, V>>
となっているので変換が必要となる。
参考: https://docs.spring.io/spring-data/redis/docs/2.2.0.RELEASE/reference/html/#redis:reactive:pubsub
localhost:8080/redis/sse にアクセスした状態で、localhost:8080/redis/post にデータを送ると、送ったデータが即座に表示されることがわかる。
データを保存したい
Redis の Pub/Sub 機能はメッセージをやり取りするためのもので、データを永続化するものではない。
今の状態では IRC のような感じで過去のメッセージを参照することができない。
なので、Redis にメッセージを保存するようにしたい。
subscriber を一つ追加して、メッセージが来たらデータを保存するようにしてみた。
public class RedisHandler {
private final ReactiveRedisTemplate<String, Message> template;
public RedisHandler(ReactiveRedisTemplate<String, Message> template) {
this.template = template;
this.template.listenToChannel("messages")
.flatMap(o -> this.template.opsForList().rightPush("messages", o.getMessage()))
.subscribe();
}
// 省略
}
このクラスでやるべき処理じゃないような気もするが、とりあえず実装してみた。
subscribe()
メソッドを実行するのを忘れると、データが保存されないので注意。
SSE を返却する処理では、保存されているデータをマージして返却するようにする。
public class RedisHandler {
// 省略
public Mono<ServerResponse> sse(ServerRequest request) {
Flux<Message> messages = this.template.opsForList().range("messages", 0, -1);
return ServerResponse.ok()
.body(messages.concatWith(template.listenToChannel("messages").map(ReactiveSubscription.Message::getMessage))
.map(o -> ServerSentEvent.builder(o).event("messages").build()), ServerSentEvent.class);
}
}
concatWith
を使って Flux
を結合している。これで、保存されたデータも取得できるようになった。
なお、データの保存に失敗したとしても接続済みのユーザにはメッセージが配信される。
Redis Stream を利用する
ReactiveRedisTemplate
に opsForStream
というメソッドがあり、Redis Stream という存在に気付いた。
Redis Stream は Pub/Sub とは違い、永続化されたデータを持つらしい。
ただし、Pub/Sub はサーバ(Redis)側からクライアントにプッシュするが、Redis Stream はクライアントからのポーリングが必要とのこと。
また、データ構造としてはリストのような形だが、ハッシュのように複数フィールドを保持することができるようだ。
https://qiita.com/Reddie_Japan/items/ee0eeddc49ffbc603cba
ポーリングするのは面倒だが、StreamReciever
というクラスがいい感じにポーリングして、無限ストリームにメッセージを emit してくれるらしい。
AutoConfigurer は無いっぽいので自分で Bean 定義する。
なお、Spring Data Redis 2.2.0 から Redis Stream をサポートしている。
@SpringBootApplication
public class HelloWebfluxApplication {
public static void main(String[] args) {
SpringApplication.run(HelloWebfluxApplication.class, args);
}
@Bean
public RouterFunction<ServerResponse> routes(ReactiveRedisConnectionFactory factory) {
return new RedisHandler(reactiveRedisTemplate(factory), streamReceiver(factory)).route();
}
@Bean
public ReactiveRedisTemplate<String, Message> reactiveRedisTemplate(ReactiveRedisConnectionFactory factory) {
StringRedisSerializer keySerializer = new StringRedisSerializer();
Jackson2JsonRedisSerializer<Message> valueSerializer = new Jackson2JsonRedisSerializer<>(Message.class);
RedisSerializationContext.RedisSerializationContextBuilder<String, Message> builder =
RedisSerializationContext.newSerializationContext(keySerializer);
RedisSerializationContext<String, Message> context = builder.value(valueSerializer).build();
return new ReactiveRedisTemplate<>(factory, context);
}
@Bean
public StreamReceiver streamReceiver(ReactiveRedisConnectionFactory factory) {
return StreamReceiver.create(factory);
}
}
以下のようにして利用するらしい。
public class RedisHandler {
private final ReactiveRedisTemplate<String, Message> template;
private final StreamReceiver<String, MapRecord<String, String, String>> receiver;
public RedisHandler(ReactiveRedisTemplate<String, Message> template, StreamReceiver streamReceiver) {
this.template = template;
this.receiver = streamReceiver;
}
public RouterFunction route() {
return RouterFunctions.route()
.GET("/stream", this::stream)
.POST("/stream", this::streamPost)
.build();
}
public Mono<ServerResponse> streamPost(ServerRequest request) {
return request.bodyToMono(Message.class).map(m -> {
Map<String, String> map = new HashMap<>();
map.put("name", m.getName());
map.put("text", m.getText());
return MapRecord.create("message-stream", map);
}).flatMap(o -> ServerResponse.ok().body(this.template.opsForStream().add(o), RecordId.class));
}
public Mono<ServerResponse> stream(ServerRequest request) {
return ServerResponse.ok().body(this.receiver.receive(StreamOffset.fromStart("message-stream"))
.map(e -> new Message(e.getValue().get("name"), e.getValue().get("text")))
.map(m -> ServerSentEvent.builder(m).event("stream").build()), ServerSentEvent.class);
}
}
Map
形式でデータを保存する MapRecord<S, K, V>
と、オブジェクト形式でデータを保存する ObjectRecord<S, V>
があるらしいが、後者の使い方がわからなかったので、いちいち Map
と Message
の変換処理を行っている。ださい。
データの登録は今までと変わらず ReactiveRedisTemplate
を利用するが、利用するメソッドが opsForStream().add()
に変わっている。
データの取得は StreamReceiver
の receive
メソッドを利用する。取得できるデータは Map<K,V>
なので Message
型への変換を行っている。
これを利用してみると投稿したメッセージは SSE を使ってリアルタイムに配信されるし、一度接続を切って繋ぎなおしても保存されたメッセージを取得できる。
さいごに
R2DBC を利用すれば RDBMS でも似たようなことができるのだろうか・・・?