こちらの記事で紹介した、対戦型のオセロ実装ですが、記事中にAkkaやSSEの実装の話が全く出て来なかったので、少し紹介したいと思います!
SSEとは
Server-Sent Event
の略です。
技術的な詳細はMozillaのサイトの解説が詳しいです。
サーバ側から非同期にメッセージを送信出来る仕組みで、通常のリクエスト/レスポンスだと、クライアント側が起点となってリクエストしますが、SSEを使うとサーバ側が起点となってイベントを送信できます。
よく比較される技術としてWebSocketがありますが、こちらはクライアント/サーバ間で双方向の通信が可能となっています。SSEはあくまでサーバからクライアント方向にのみ動作します。
akka-httpでSSEする
SSEはHTTPプロトコル上で動作しますので、サーバ側実装も簡単です。
akka-http
でもakka-stream
と組み合わせて流れ続けるストリームを作り、それをクライアント側に接続することで実現できます。公式ドキュメントに実装例があります。
以下は、実際にオセロの実装で使った実装のルーティング部分の抜粋です。
events/
以下に来たリクエストをSSEとして扱っています。
path("events" / IntNumber) { id =>
get {
complete {
Source
.actorRef[GameEvent](10, OverflowStrategy.dropBuffer)
.map(gameEvent => ServerSentEvent(gameEvent.asJson.noSpaces))
.keepAlive(1.second, () => ServerSentEvent.heartbeat)
.mapMaterializedValue { actorRef =>
subscribeGameEvent(actorRef, GameId(id))
}
}
}
},
Source.actorRef[T](...)
で、ActorRef
をマテリアライズするSourceを作成します。このActorRef
が指す実体のActor
は、T型のメッセージを受け取り、下流に流します。メッセージはバッファリングされますが、backpressureは効かないことに注意してください。つまり、下流が詰まっているのにメッセージを送り続けると、溢れます。
詳細はドキュメントを確認してください。
https://doc.akka.io/docs/akka/current/stream/operators/Source/actorRef.html
リクエストが来るたびに上記の新しいActor
が生成され、そのActorRef
が上流側のマテリアライズ対象となり、その下流はSSEとしてクライアントにつながっているわけです。
では、どうやって、このActor
にメッセージを送るのか?
メッセージを送信するにはActorRefが必要ですが、手に入れるには、マテリアライズする必要があります。しかし、akka-http
が要求するのはSource型。sinkしてマテリアライズしてしまうと閉じたGraphになってしまい、akka-http
側でsinkすることが出来ません。
そこで使うのが、mapMaterializedValue
メソッドです。このメソッドを使えば、sinkすることなく、マテリアライズされる値を受け取る無名関数を差し込むことが出来ます。
前述のコードでは、このメソッドを利用して手に入れたActorRef
をsubscribeGameEvent
メソッドに引き渡し、イベントを購読させています。
subscribeGameEvent
は、こんな感じです。
def subscribeGameEvent(subscriber: ActorRef, gameId: GameId): Unit =
gameEventActors.get(gameId).foreach(_ ! Subscribe(subscriber, gameId))
gameEventActors
は、各Game毎のイベントを発行するActor
のActorRef
のMapで、GameId
がキーになっています。
このActor
にSubscribe
メッセージを送る(メンバに購読側のActorRef
を持っている)ことで、指定したActorRef
にそのゲームのイベントが流れてくるようになっています。
Game
やActor
のライフサイクルの課題がありますが、これについては別の機会があれば紹介します。
Scala.jsでSSEして、scalajs-reactに結合する
SSE部分については、あまり特別なことはありません。前述のMozillaのドキュメントにもある通り、ブラウザ側に標準のAPIがあり、Scala.js側でもscalajs-domの中にfacadeが組み込まれているので、サクッと使えます。
scalajs-react
側に引き渡すために使いやすいよう、EventSource
を軽くラップしています。
trait EventSourceConnection {
val baseUrl: String
private var eventSource: Option[EventSource] = None
def open(path: String, callback: MessageEvent => Unit): EventSourceConnection = eventSource match {
case None =>
val es = new EventSource(baseUrl + path)
es.onmessage = callback(_)
eventSource = Some(es)
this
case Some(_) => this
}
def close: EventSourceConnection = eventSource match {
case Some(es) =>
es.close()
eventSource = None
this
case _ => this
}
}
ラップした理由としては、EventSource
のインスタンスの生成/管理を、下位のコンポーネント(ここではGameComponent
)で行いたくないからです。外部接続やファイルハンドルなど、初期化から破棄までのライフサイクルを持ったリソース系は、ともすれば閉じ忘れや複数オープンなど、潜在的なリークや不具合の温床になりやすく、願わくば上位で保持して下位に貸与するような管理をしたいです。
ということで、今回はこのEventSourceConnection
のインスタンスをRootのコンポーネントにprops
経由で渡しています。
実際に使用するのはゲーム内なので、このインスタンスはそのままRootを経由してGameComponent
に受け渡されます。接続はコンポーネント初期化時にオープンします。このコンポーネントが使用されるときには必ず特定のゲーム、つまりGameId
が確定しているので、エンドポイントも決まっているからです。
メッセージを受け取り、ハンドリングしている処理はこんな感じです。
def handleMessageEvent(messageEvent: MessageEvent): Unit = {
import io.circe.generic.auto._
import io.circe.parser._
decode[GameEvent](messageEvent.data.toString).foreach {
e => $.props.flatMap(p => p.handler(ReceiveEvent(p.participantId, e))).runNow()
}
}
SSEから流れてくるメッセージはプレーンなテキストですので、任意のライブラリを使ってシリアライズ/デシリアライズします。
今回はcirceを使いました。
コンポーネント内部でのハンドリングなので、BackendScope
が使えます。
この辺の詳細は、公式ドキュメントで確認してください。
オセロの実装方針として、状態に変化を与えるような処理は全てAction
として、上の階層で網羅的に扱うので、ここでも受け取ったSSEのイベントをReceiveEvent
という専用のAction
に包んで上位に流します。
Action
をハンドリングするためのhandlerについては、冒頭でも紹介した記事で触れているので良ければ読んでみてください。
ここで、handlerの取得がBackendScope
を経由したprops
から行われていることに注意してください。値の書き換えだけでなく、値の取得も副作用です。そしてそれは、scalajs-react
では、CallbackTo
という、専用の型に包まれます。(Callbackについてはこちらを参照してください)
このCallbackTo
にはmap/flatMapをはじめとする、いわゆるIOっぽいコンビネータが生えているので、これらを使って、Action
の処理を包んだCallback
(=CallbackTo[Unit]
) を作ります。
で、ここも注意ですが、IO同様、包んだだけでは実行されないので、runNow()
を明示的に呼んでいます。
これで晴れてSSE経由で受け取ったイベントをscalajs-react内で取り込んでハンドリングすることが出来ました。
あとがき
サーバサイドにAkka(akka-http/akka-stream)、クライアントサイドにScala.js(scalajs-react)を使ってSSEする実装をざっくりと紹介しました。
Akkaはアクターモデル、scalajs-reactはFP寄りなアプローチで、それぞれ性格は違いますが、それぞれ互いに得意な分野を生かしてゲームのリアルタイムなイベントデータの受信という目的を実現しています。
必要な要件に応じて適材適所で技術やパラダイムを選択することが出来るのがScalaの魅力であり、また難しいところでもあると思います。この記事が、少しでもそういった技術選択の参考になれば幸いです。