PlayFrameworkでチャットアプリを実装してみた話。WebSocket、Actor、Akka-Streamなど未知なものを多く使ったので、間違いや助言があれば指摘お願いします。ソースはGitHubにあります。
目的
- チャット機能の実現
- Scalaに慣れる
- WebSocketを知る
チャット機能は以下のイメージ。ブラウザがメッセージを送信すると、WebSocket接続しているブラウザ全てでそのメッセージを共有することができる。ただそれだけ。
環境
- PlayFramework2.5
- Scala2.11
参考
- Play 2.5.x documentation
- Akka Documentation
- south37さんの記事
- kodukiさんの記事
- WebSocket のチャットサーバを Akka Streams の MergeHub と BroadcastHub で実装する
WebSocketを使う
チャットアプリの実現にはWebSocketを使うのが良さそう。WebSocketとは、ブラウザとサーバの双方向通信用のプロトコル(通信規格)のことをいう。通常ブラウザがサーバから何かしらの情報を取得したいとき、サーバにリクエストし返却されたレスポンスから情報を得る。つまりサーバの情報に更新があっても、ブラウザはリクエストしない限りサーバに更新があったことを知らない仕組み。しかしWebSocketを使用すると、リクエストしなくでもサーバ発信でブラウザに情報を送信することができるという。
公式サイトを見ると
PlayでWebSocketを使用するにはFlowを使用すると、Playがよしなに動いてくれるようです。とりあえず公式ドキュメントのコードをそのままコピペしてみました。以下が公式のコードです
import play.api.libs.json.JsValue
import play.api.mvc._
import play.api.libs.streams._
class Controller4 @Inject() (implicit system: ActorSystem, materializer: Materializer) {
import akka.actor._
class MyWebSocketActor(out: ActorRef) extends Actor {
import play.api.libs.json.JsValue
def receive = {
case msg: JsValue =>
out ! msg
}
}
object MyWebSocketActor {
def props(out: ActorRef) = Props(new MyWebSocketActor(out))
}
def socket = WebSocket.accept[JsValue, JsValue] { request =>
ActorFlow.actorRef(out => MyWebSocketActor.props(out))
}
}
ブラウザから送信されたメッセージはActorを継承したMyWebSocketActorのreceiveで受け取り、コネクションしているブラウザにメッセージを送信するようです。こんなに簡単にチャットが実装できるのかと感動し、動作をさせてみました。結論から言うと、期待した動作をしてくれませんでした。メッセージ送信したブラウザにはメッセージが送信されますが、他のブラウザにはメッセージが送信されませんでした。Actorの知識が乏しいのでハッキリとした理由はわかりませんが、ある記事によるとWebSocketの接続口を動的に増減させるには別の仕組みが必要だそうです
Akka-Stream を使う
参考サイトによると、Akka-Streamを使用するとチャット機能の実現ができるという。Akka-StreamとはAkkaが提供するライブラリで、Akkaの得意な並行、分散などの機能を利用してストリーム処理を行うことができる割と新しいライブラリのよう。ストリームやアクターなどに詳しくはないけれど、チャット機能実現のために使うことにした。
Akka-StreamではSource、Flow、Sink などといった部品を組み合わせてGraphというものを作りStream処理を定義するのが基本だそうです。
絵に描けばわかった気にはなります。Sourceからデータを流しFlowでデータ処理、Sinkに処理後のデータを溜める。イメージはシンプルなものですが、実際は用途に合わせて複雑なGraphが出来上がると思います。処理部分にはFlow以外にもさまざまな形の部品があるようです。今回は送信されたメッセージには特に加工しないので、SourceとSinkでGraphを作ります。
package chat
import java.util.concurrent.atomic.AtomicReference
import javax.inject.{ Inject, Singleton }
import akka.actor.ActorSystem
import akka.stream.scaladsl.{ BroadcastHub, Flow, Keep, MergeHub, Sink }
import akka.stream.{ KillSwitches, Materializer, UniqueKillSwitch }
import akka.stream.scaladsl.{ Sink, Source }
import akka.NotUsed
import scala.collection.mutable.{ Map => MutableMap }
import scala.concurrent.duration._
case class Room(roomId: String, bus: Flow[Message, Message, UniqueKillSwitch])
@Singleton
class RoomClient @Inject()(implicit val materializer: Materializer, implicit val system: ActorSystem) {
def chatRoom(roomId: String): Room = synchronized {
RoomClient.roomPool.get.get(roomId) match {
case Some(room) =>
room
case None =>
val room = create(roomId)
RoomClient.roomPool.get() += (roomId -> room)
room
}
}
private def create(roomId: String): Room = {
val (sink, source) =
MergeHub.source[Message](perProducerBufferSize = 16)
.toMat(BroadcastHub.sink(bufferSize = 256))(Keep.both)
.run()
source.runWith(Sink.ignore)
val bus = Flow.fromSinkAndSource(sink, source)
.joinMat(KillSwitches.singleBidi[Message, Message])(Keep.right)
.backpressureTimeout(3.seconds)
Room(roomId, bus)
}
}
object RoomClient {
val roomPool = new AtomicReference[MutableMap[String, Room]](MutableMap[String, Room]())
}
入力された部屋番号(roomId)がプールされていない場合、Roomを作成します。RoomはSourceとSinkをまとめたFlowを持っています。
呼び出し元のコントローラです。
package controllers
import akka.actor.ActorSystem
import akka.stream.Materializer
import akka.stream.scaladsl.{ Flow, Keep, Source }
import javax.inject._
import play.api._
import play.api.libs.concurrent.Execution.Implicits.defaultContext
import play.api.libs.streams.ActorFlow
import play.api.mvc._
import play.api.libs.json.JsValue
import chat._
import chat.actors._
import play.api.libs.iteratee.Concurrent
@Singleton
class ChatController @Inject() (implicit system: ActorSystem, materializer: Materializer, roomClient: RoomClient) extends Controller {
def get = Action { implicit request =>
Ok(views.html.chat())
}
def ws(roomId: String) = WebSocket.accept[JsValue, JsValue] { request =>
val room = roomClient.chatRoom(roomId)
val userInput = ActorFlow.actorRef[JsValue, Message](RequestActor.props)
val userOutPut = ActorFlow.actorRef[Message, JsValue](ResponseActor.props)
userInput.viaMat(room.bus)(Keep.right).viaMat(userOutPut)(Keep.right)
}
}
ブラウザとはJSONでやりとりします。RequestActorはJSONをMessageに、ResponseActorはMessageをJSONにします。RequestActorとResponseActor間はMessageを使ってRoomを経由しています。この辺は参考サイトを見て作っているのでザックリとしか理解できていませんが、プールされているRoomを接続しているユーザで共有するのでメッセージも共有できるものだと思っています。この辺りの理解も課題です。
これで動作させると期待通りの動作をしてくれました。
画面側のHTMLやJavaScriptは載せませんが、GitHubにはあります。
最後に
WebSocketでのサーバとのやりとりは新鮮で楽しめました。WebSocketの学習が目的でしたが、気付けばAkkaの学習に時間を割いていました。Akka-Streamに関しては良いことを書いている記事が多くあるようなので、今後のためにもAkka-Streamも含めAkkaの知識を深めたいと思います。