Akka実践バイブルをゆっくり読み解く 第2章最小のAkkaアプリケーション

Akka実践バイブルをゆっくり読み解く企画の第2章です。

第2章 最小のAkkaアプリケーション

この章から実際にAkkaを使ったコードを見ていく。
サンプルコードが公開されているので、それを利用しながら読み解いていく。
題材としてチケット販売アプリケーションを構築する。

あと、httpieというコマンドラインツールを利用しながらサンプルを動かしていくので、予めhttpieを入れておく。

sudo apt-get install httpie

Akkaを動かすのに必要なライブラリ

必要なライブラリは以下のように定義されている。

build.sbt
libraryDependencies ++= {
  val akkaVersion = "2.5.4"
  val akkaHttpVersion = "10.0.10"
  Seq(
    "com.typesafe.akka" %% "akka-actor"      % akkaVersion,
    "com.typesafe.akka" %% "akka-stream"     % akkaVersion,
    "com.typesafe.akka" %% "akka-http-core"  % akkaHttpVersion,
    "com.typesafe.akka" %% "akka-http"       % akkaHttpVersion,
    "com.typesafe.akka" %% "akka-http-spray-json" % akkaHttpVersion,
    "com.typesafe.akka" %% "akka-slf4j"      % akkaVersion,
    "ch.qos.logback"    %  "logback-classic" % "1.1.3",
    "com.typesafe.akka" %% "akka-testkit"    % akkaVersion   % "test",
    "org.scalatest"     %% "scalatest"       % "3.0.0"       % "test"
  )
}

実際にサンプルを動かしてみる

サンプルのREST APIは以下のような内容となっている。

説明 HTTPメソッド URL リクエストボディ ステータスコード
イベントの作成 POST /event/RHCP {"ticket":250} 201 Created
全イベントの取得 GET /events - 200 OK
チケットの購入 POST /events/RHCP/tickets {"tickets:2} 201 Created
イベントのキャンセル DELETE /events/RHCP - 200 OK

まずはアプリケーションを実行する。

sbt run

続けてhttpieを使ってhttpリクエストを投げてみる。

http POST localhost:5000/events/RHCP tickets:=10

ちゃんとレスポンスが返ってくることを確認できた。

HTTP/1.1 201 Created
Content-Length: 28
Content-Type: application/json
Date: Mon, 01 Jan 2018 11:30:51 GMT
Server: GoTicks.com REST API

{
    "name": "RHCP", 
    "tickets": 10
}

他の例も書籍のとおり動かしてみて、チケット売買のイメージを掴めたところでいよいよ本題へ。

アプリケーションでのActorの探求

今回はアクターの操作のうち、以下の2つについて触れていくことになる。

  • 生成
  • 送受信

Actorの生成

アプリケーションの各アクターは、メッセージを通じて互いにやり取りする。
その際のメッセージはアクターのコンパニオンオブジェクト内にcase class/case objectとしてまとめて定義される。

BoxOffice.scala
// BoxOfficeアクタークラス
class BoxOffice(implicit timeout: Timeout) extends Actor {
  ...
}

// BoxOfficeアクタークラスのコンパニオンオブジェクト
object BoxOfficee class CreateEvent(name: String, tickets: Int)
  // イベントを取得するメッセージ
  case class GetEvent(name: String)
  // 全てのイベントを要求するメッセージ
  case object GetEvents
  // イベントを作成するメッセージ
  case class GetTickets(event: String, tickets: Int)
  // イベントをキャンセルするメッセージ
  case class CancelEvent(name: String)

  ...
}
TicketSeller.scala
// TicketSellerアクタークラス
class TicketSeller(event: String) extends Actor {
  ...
}

// TicketSellerアクタークラスのコンパニオンオブジェクト
object TicketSeller {
  // TicketSellerにチケットを追加するメッセージ
  case class Add(tickets: Vector[Ticket])
  // TicketSellerからチケットを購入するメッセージ
  case class Buy(tickets: Int)
  // チケット
  case class Ticket(id: Int)

  ...
}

これらcase class/case objectとして定義されたメッセージたちは全てイミュータブルなメッセージとなる。

チケット販売を行うActor

Actorシステム全体の流れは以下のようになっている。
 RestApi
   ↓生成
 BoxOffice
   ↓生成
 TicketSeller

まずはTicketSellerが呼び出される様子を確認する。

TicketSeller.scala
class TicketSeller(event: String) extends Actor {
  import TicketSeller._

  // チケットのリスト
  var tickets = Vector.empty[Ticket]

  def receive = {
    // Addメッセージを受け取ると、既存のチケットリストに新しいチケットを加える
    case Add(newTickets) => tickets = tickets ++ newTickets
    // リストからチケットを必要枚数分だけ取り出し、チケットが十分であればチケットを含むTicketsメッセージを返す。
    // 足りなければ空のTicketsメッセージを返す。
    case Buy(nrOfTickets) =>
      val entries = tickets.take(nrOfTickets)
      if(entries.size >= nrOfTickets) {
        sender() ! Tickets(event, entries)
        tickets = tickets.drop(nrOfTickets)
      } else sender() ! Tickets(event)
    // GetEventを受けとると、残りのチケット数を含むイベントを返す。
    case GetEvent => sender() ! Some(BoxOffice.Event(event, tickets.size))

    ...
  }
}

receiveメソッドでメッセージを受信しているのがわかる。
このAddとかBuyとかは、コンパニオンオブジェクト内にcase classで定義したものがパターンマッチされている。

それでは、次にBoxOfficeを見てみる。

BoxOffice.scala
class BoxOffice(implicit timeout: Timeout) extends Actor {
  import BoxOffice._
  import context._

  // contextを使ってTicketSellerを生成する。
  // 生成部分を別メソッドとして切り出しているのは、テストを容易にするため。
  def createTicketSeller(name: String) =
    context.actorOf(TicketSeller.props(name), name)

  def receive = {
    // CreateEventメッセージを受け取ると呼び出される
    case CreateEvent(name, tickets) =>

      def create() = {
        // TicketSellerを生成
        val eventTickets = createTicketSeller(name)
        // 指定枚数分のチケットを用意したコレクションを生成
        val newTickets = (1 to tickets).map { ticketId =>
          TicketSeller.Ticket(ticketId)
        }.toVector
        // TicketSellerにAddメッセージを送信する
        eventTickets ! TicketSeller.Add(newTickets)
        sender() ! EventCreated(Event(name, tickets))
      }

      // ここから返す内容が、RestApiのonSuccess内でパターンマッチされる
      context.child(name).fold(create())(_ => sender() ! EventExists)
    ...
}

createTicketSellerメソッド内で使用しているcontextは、AkkaのActorトレイト内で定義されているimplicitな値。

contextのchild(name)はOption[ActorRef]を返してくる。Option#foldは以下のように定義されている。

Option.scala
fold[B](ifEmpty: => B)(f: A => B): B=
  if (isEmpty) ifEmpty else f(this.get)

どうやらchild(name)でnameに指定したチケットが存在すればSome[A]を返してくれるようなので、

  • イベントが存在しない場合:createメソッド内でTicketSellerアクターのAddにメッセージを送信し、EventCreatedを返す
  • イベントが存在する場合:EventExistsを返す

となるらしい。

また、RestApiの子アクターBoxOfficeから孫アクターTicketSellerにメッセージを送信(上記TicketSeller.Addのように)する場合、TicketSellerはAddの結果をBoxOfficeではなく、RestApiに直接返すこととなる。
context.child(name).fold(create())(_ => sender() ! EventExists)はrecieveメソッドの最後に実行されることになるため、結果がそのまま返る。

次にBoxOfficeアクターのGetEventsを見てみる。

BoxOfffice.scala
case GetEvents =>
  import akka.pattern.ask
  import akka.pattern.pipe

  def getEvents = context.children.map { child =>
    // askは非同期操作を実現する
    // GetEvent(name)は自アクターにメッセージを送信している?
    self.ask(GetEvent(child.path.name)).mapTo[Option[Event]]
  }
  // 各TicketSellerへの非同期問い合わせの結果がOption[Event]として返ってくる。
  // それをIterableに格納して渡されるためFuture[Iterable[Option[Event]]]として渡される。
  def convertToEvents(f: Future[Iterable[Option[Event]]]) =
    f.map(_.flatten).map(l=> Events(l.toVector))

  pipe(convertToEvents(Future.sequence(getEvents))) to sender()

pipeは処理完了時に値をFutureで包んでアクターに送信する。この場合はRestApiに送信している(?)。
送信する内容はPipeableFuture[T]という型となるらしい。初見の型なので詳細はよくわからないけど、後の章で詳しく教えてくれるようなのでここは華麗にスルーしてみる。

Actorシステムのルート

ここまでBoxOffice/TicketSellerを見てきたけど、これらアクターを支配しているルートのアクターがRestApi。
 RestApi ←コイツのこと
   ↓生成
 BoxOffice
   ↓生成
 TicketSeller

中身はこんなカンジになっている。

RestApi.scala
class RestApi(system: ActorSystem, timeout: Timeout) extends RestRoutes {
  implicit val requestTimeout = timeout
  implicit def executionContext = system.dispatcher

  def createBoxOffice = system.actorOf(BoxOffice.props, BoxOffice.name)
}

trait RestRoutes extends BoxOfficeApi with EventMarshalling {
  import StatusCodes._

  def routes: Route = eventsRoute ~ eventRoute ~ ticketsRoute

  ...

  def eventRoute =
    pathPrefix("events" / Segment) { event =>
      pathEndOrSingleSlash {
        post {
          // POST /events/:event
          entity(as[EventDescription]) { ed =>
            onSuccess(createEvent(event, ed.tickets)) {
              // BoxOfficeApiのcreateEventを実行し、成功した場合はFutureの中身によってパターンマッチング

              // イベント作成に成功
              case BoxOffice.EventCreated(event) => complete(Created, event)  // 201 Createdを返す

              // イベントが既に存在した
              case BoxOffice.EventExists =>
                val err = Error(s"$event event exists already.")
                complete(BadRequest, err)  // 404 BadRequestを返す
            }
          }
        } ~
        get {
          // GET /events/:event
          ...
        } ~
        delete {
          // DELETE /events/:event
          ...
        }
      }
    }

  ...
}

trait BoxOfficeApi {
  import BoxOffice._

  def createBoxOffice(): ActorRef

  implicit def executionContext: ExecutionContext
  implicit def requestTimeout: Timeout

  lazy val boxOffice = createBoxOffice()

  def createEvent(event: String, nrOfTickets: Int) =
    // BoxOfficeアクターにCreateEventメッセージを送信
    boxOffice.ask(CreateEvent(event, nrOfTickets))
      .mapTo[EventResponse]

   ...
}

リクエストのURLを解析して、pathPrefixが該当するメソッド(eventRouteなど)が呼び出されるようになっている。
実行する処理が決まったら、BoxOfficeApi内に定義されているアクター呼び出しを実行する。そして、返された結果がonSuccess内でパターンマッチされ、結果を生成するようになっている。
ちなみに、リクエストパラーメータ、およびレスポンスボディの処理については、JSONを自動的に相互変換する仕組みが存在するため、それを利用している。

クラウドへのアップ

Herokuへのデプロイ手順が記載されているが、これはまた気が向いたら・・・

個人的まとめ

ここまでで以下の理解が曖昧。今後の章でスッキリするんだろうか?

不明なヤツら モヤっとポイント ★解決★
! メッセージ送信を示すメソッド。askを使う場合との違いがよくわからん。理解曖昧だけど大丈夫なのか???とりあえずActorRefに定義されているのは確認した! 補足は後述
sender() 送り元、つまり親アクターをしてしているっぽい。ただし、親というのはActorシステムのルートのアクターを示すので、BoxOfficeから見てもTicketSellerから見ても、親はRestApi。 補足は後述
child 自分が生成した子アクターを示しているっぽい。BoxOfficeアクターから見たTicketSellerアクターたちがchildで、Stringで渡される文字列をキーに識別している?

httpie入れたは良いけど、あんまり活躍しなかったなー。

2018.01.03 追記
@j_kugiya さんにコメント頂いて、モヤっとが晴れました。

不明だったヤツら 解説
! メッセージ送信を示すメソッド。tellメソッドと同意。!/tellがUnit型なのに大して、askを使うとFuture[Any]な戻り値を得ることができる。
sender() 送り元のアクターを示すActorRef。

今後の展開の予告

今回、チケット管理システムを扱ったが、チケットをメモリー上で保持するだけでは実運用には耐えられない。チケットを記憶するための永続的なストレージが必要となる。
この問題については今後解決の方法を教えてくれるようなので期待しときます。