0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

akka-http + Scala.jsでSSEする

Posted at

こちらの記事で紹介した、対戦型のオセロ実装ですが、記事中にAkkaやSSEの実装の話が全く出て来なかったので、少し紹介したいと思います!

SSEとは

Server-Sent Eventの略です。
技術的な詳細はMozillaのサイトの解説が詳しいです。

サーバ側から非同期にメッセージを送信出来る仕組みで、通常のリクエスト/レスポンスだと、クライアント側が起点となってリクエストしますが、SSEを使うとサーバ側が起点となってイベントを送信できます。

よく比較される技術としてWebSocketがありますが、こちらはクライアント/サーバ間で双方向の通信が可能となっています。SSEはあくまでサーバからクライアント方向にのみ動作します。

akka-httpでSSEする

SSEはHTTPプロトコル上で動作しますので、サーバ側実装も簡単です。
akka-httpでもakka-streamと組み合わせて流れ続けるストリームを作り、それをクライアント側に接続することで実現できます。公式ドキュメントに実装例があります。

以下は、実際にオセロの実装で使った実装のルーティング部分の抜粋です。
events/以下に来たリクエストをSSEとして扱っています。

        path("events" / IntNumber) { id =>
          get {
            complete {
              Source
                .actorRef[GameEvent](10, OverflowStrategy.dropBuffer)
                .map(gameEvent => ServerSentEvent(gameEvent.asJson.noSpaces))
                .keepAlive(1.second, () => ServerSentEvent.heartbeat)
                .mapMaterializedValue { actorRef =>
                  subscribeGameEvent(actorRef, GameId(id))
                }
            }
          }
        },

Source.actorRef[T](...)で、ActorRefをマテリアライズするSourceを作成します。このActorRefが指す実体のActorは、T型のメッセージを受け取り、下流に流します。メッセージはバッファリングされますが、backpressureは効かないことに注意してください。つまり、下流が詰まっているのにメッセージを送り続けると、溢れます。
詳細はドキュメントを確認してください。
https://doc.akka.io/docs/akka/current/stream/operators/Source/actorRef.html

リクエストが来るたびに上記の新しいActorが生成され、そのActorRefが上流側のマテリアライズ対象となり、その下流はSSEとしてクライアントにつながっているわけです。

では、どうやって、このActorにメッセージを送るのか?
メッセージを送信するにはActorRefが必要ですが、手に入れるには、マテリアライズする必要があります。しかし、akka-httpが要求するのはSource型。sinkしてマテリアライズしてしまうと閉じたGraphになってしまい、akka-http側でsinkすることが出来ません。

そこで使うのが、mapMaterializedValueメソッドです。このメソッドを使えば、sinkすることなく、マテリアライズされる値を受け取る無名関数を差し込むことが出来ます。
前述のコードでは、このメソッドを利用して手に入れたActorRefsubscribeGameEventメソッドに引き渡し、イベントを購読させています。

subscribeGameEventは、こんな感じです。

  def subscribeGameEvent(subscriber: ActorRef, gameId: GameId): Unit =
    gameEventActors.get(gameId).foreach(_ ! Subscribe(subscriber, gameId))

gameEventActorsは、各Game毎のイベントを発行するActorActorRefのMapで、GameIdがキーになっています。
このActorSubscribeメッセージを送る(メンバに購読側のActorRefを持っている)ことで、指定したActorRefにそのゲームのイベントが流れてくるようになっています。

GameActorのライフサイクルの課題がありますが、これについては別の機会があれば紹介します。

Scala.jsでSSEして、scalajs-reactに結合する

SSE部分については、あまり特別なことはありません。前述のMozillaのドキュメントにもある通り、ブラウザ側に標準のAPIがあり、Scala.js側でもscalajs-domの中にfacadeが組み込まれているので、サクッと使えます。
scalajs-react側に引き渡すために使いやすいよう、EventSourceを軽くラップしています。

trait EventSourceConnection {
  val baseUrl: String
  private var eventSource: Option[EventSource] = None
  def open(path: String, callback: MessageEvent => Unit): EventSourceConnection = eventSource match {
    case None =>
      val es = new EventSource(baseUrl + path)
      es.onmessage = callback(_)
      eventSource = Some(es)
      this
    case Some(_) => this
  }
  def close: EventSourceConnection = eventSource match {
    case Some(es) =>
      es.close()
      eventSource = None
      this
    case _ => this
  }
}

ラップした理由としては、EventSourceのインスタンスの生成/管理を、下位のコンポーネント(ここではGameComponent)で行いたくないからです。外部接続やファイルハンドルなど、初期化から破棄までのライフサイクルを持ったリソース系は、ともすれば閉じ忘れや複数オープンなど、潜在的なリークや不具合の温床になりやすく、願わくば上位で保持して下位に貸与するような管理をしたいです。
ということで、今回はこのEventSourceConnectionのインスタンスをRootのコンポーネントにprops経由で渡しています。

実際に使用するのはゲーム内なので、このインスタンスはそのままRootを経由してGameComponentに受け渡されます。接続はコンポーネント初期化時にオープンします。このコンポーネントが使用されるときには必ず特定のゲーム、つまりGameIdが確定しているので、エンドポイントも決まっているからです。

メッセージを受け取り、ハンドリングしている処理はこんな感じです。

    def handleMessageEvent(messageEvent: MessageEvent): Unit = {
      import io.circe.generic.auto._
      import io.circe.parser._
      decode[GameEvent](messageEvent.data.toString).foreach {
        e => $.props.flatMap(p => p.handler(ReceiveEvent(p.participantId, e))).runNow()
      }
    }

SSEから流れてくるメッセージはプレーンなテキストですので、任意のライブラリを使ってシリアライズ/デシリアライズします。
今回はcirceを使いました。

コンポーネント内部でのハンドリングなので、BackendScopeが使えます。
この辺の詳細は、公式ドキュメントで確認してください。

オセロの実装方針として、状態に変化を与えるような処理は全てActionとして、上の階層で網羅的に扱うので、ここでも受け取ったSSEのイベントをReceiveEventという専用のActionに包んで上位に流します。
Actionをハンドリングするためのhandlerについては、冒頭でも紹介した記事で触れているので良ければ読んでみてください。

ここで、handlerの取得がBackendScopeを経由したpropsから行われていることに注意してください。値の書き換えだけでなく、値の取得も副作用です。そしてそれは、scalajs-reactでは、CallbackToという、専用の型に包まれます。(Callbackについてはこちらを参照してください)
このCallbackToにはmap/flatMapをはじめとする、いわゆるIOっぽいコンビネータが生えているので、これらを使って、Actionの処理を包んだCallback(=CallbackTo[Unit]) を作ります。
で、ここも注意ですが、IO同様、包んだだけでは実行されないので、runNow()を明示的に呼んでいます。

これで晴れてSSE経由で受け取ったイベントをscalajs-react内で取り込んでハンドリングすることが出来ました。

あとがき

サーバサイドにAkka(akka-http/akka-stream)、クライアントサイドにScala.js(scalajs-react)を使ってSSEする実装をざっくりと紹介しました。
Akkaはアクターモデル、scalajs-reactはFP寄りなアプローチで、それぞれ性格は違いますが、それぞれ互いに得意な分野を生かしてゲームのリアルタイムなイベントデータの受信という目的を実現しています。

必要な要件に応じて適材適所で技術やパラダイムを選択することが出来るのがScalaの魅力であり、また難しいところでもあると思います。この記事が、少しでもそういった技術選択の参考になれば幸いです。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?