Scala
Akka
Akka-Stream

Akkaことはじめ(2)

Akkaを使う上で理解しておくとよいライブラリ、コンポーネントについて紹介します。いろいろあると思いますが、Typesafe Configライブラリ、ルーティング、ストリーミング処理、デプロイツールなどを解説します。

Typesafe Config

Scalaでよく使われるコンフィグ設定を読み込むためのライブラリ。src/resourcesフォルダなどのクラスパスに含まれるコンフィグファイルを自動的に読み込んでくれます。公式サイトはこちら

インポートのためには、build.sbtに以下を入力します。ただし、akka-actorのパッケージにも含まれているようで、その場合は以下の設定は必須ではなようです。

build.sbt
libraryDependencies += "com.typesafe" % "config" % "1.3.2"

コンフィグファイルの名称は以下にしておく必要があります。起動パラメータで変更できますが、その必要はないでしょう。公式サイトによると、ライブラリやフレームワークの設定はrefarence.conf、アプリケーション固有の設定はapplication.confに記載するべきとのことです。

  • application.conf (HOCON形式の設定プロパティ)
  • application.json (JSON形式の設定プロパティ)
  • application.properties (Javaのプロパティファイル形式の設定プロパティ)
  • reference.conf (HOCON形式の設定プロパティ)

HOCONはtypesafe社が提唱・実装している形式のファイルです。JSONでもよいですが、可読性が高いのでこちらを使うとよいと思います。

例えば、以下のように記載します。適当ですが、MQTTの設定ファイルだと考えてください。ちなみに、url=${?HOSTNAME}などと書くことによって、該当する環境変数を取得することもできます。この書き方だと、その環境変数が存在する場合のみオーバーライドされます。

src/resources/application.conf
mqtt {
  url = "localhost"
  port = 1883
  clientId = "test"
  useTLS = false
}

アプリ側からは以下のように取得します。

main.scala
  val config = ConfigFactory.load()
  val url = config.getString("mqtt.url")
  val port = config.getInt("mqtt.port")
  val tls = config.getBoolean("mqtt.useTLS")

AkkaでのTypesafe Config

Akkaを利用する場合は、自動的にConfigFactory.load()が使われてアクターシステムが生成されます。独自のconfigをアクターシステムの引数として設定することも可能です。Loggerの設定や、次に説明するルータの設定などはTypesafe Configを使って設定することが推奨されています。以下のコードは上記と等価です。

main.scala
  val system = ActorSystem("testSystem")
  val config = system.settings.config
  val url = config.getString("mqtt.url")
  val port = config.getInt("mqtt.port")
  val tls = config.getBoolean("mqtt.useTLS")

ルーティング

Akkaでは、Actorのスケールが容易にできるようになっています。同じタスクのActorがいくつもあった際に、それぞれのメールボックスへの適切にメッセージを振り分ける仕組みがルーティングです。ルーティングを行う主体であるActorであるルーターで、振り分ける先をルーティーと呼びます。そしてどのように振り分けるか決定するのがルーターロジックです。Akkaには利用可能なルーターが多くありますが、一般的なものは以下です。ちなみにPoolというのは、ルーターがルーティーを管理するという種別です。システムがルーティーを管理するGroupという種別もあります。

  • RoundRobinPool : 生成したルーティーに順番にタスクを振り分ける
  • BalancingPool : アイドル状態のルーティーに順次配信する
  • ConsistentHashingPool : コンシステントハッシュを使用してルーティーを選択

一番シンプルなプールルーターの生成方法は以下です。

RouterExample.scala
import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem, Props}
import akka.routing.BalancingPool
import scala.io.StdIn

class PrintMyActorRefActor extends Actor with ActorLogging {
  override def receive: Receive = {
    case "hello" 
      log.info("Hello world")
  }
}


object RouterExample extends App {

  val system = ActorSystem("testSystem")
  val router = system.actorOf(BalancingPool(5).props(Props[PrintMyActorRefActor]), "poolRouter")

  for (i <- 1 to 100) {
    router ! "hello"
  }

  println(">>> Press ENTER to exit <<<")
  try StdIn.readLine()
  finally system.terminate()
}

BalancingPoolの引数で生成するルーティーの数を決めています。上記の例はハードコーディングしていますが、Typesafe Configを使った設定の仕方が推奨されています。ルーター名である"poolRouter"がコンフィグと一致していないと失敗するので注意です。ちなみに、Resizerを使って生成するインスタンスの数を動的に変えることもできます。詳細は公式サイトを参照ください。

application.conf
akka.actor.deployment {
  /poolRouter {
    router = balancing-pool
    nr-of-instances = 5
  }
}
RouterExample.scala
 val system = ActorSystem("testSystem")
 val router = system.actorOf(FromConfig.props(Props[PrintMyActorRefActor]), "poolRouter")

Akka Streams

Akkaを使ってActorで連続的なデータソースを処理するためには、Akka Streamsが使えます。Sourceと呼ばれる入力と、Sinkと呼ばれる出力、そしてそれらをつないでフォーマット変換などを行うのがFlowです。それらをつないだものはRunnableGraphと呼ばれます。RunnableGraphはActorMaterializerと呼ばれる、ストリーミングの調整機構で実行される際にAkkaのActorに変換されます。何ともとっつきにくいので、まずは公式サイトここで勉強するのが良いでしょう。

正直なところ、かなり理解までにハードルが高そうなので、これは別途記事を設けるつもりです。

Alpakka

Akka Streamsを使って構築されたライブラリ群で、MQTTだのAMQPだのkafkaを容易につないでくれます。Akka Streamsをそのまま使うより、まずはこちらでconnectorを探すのが良いかなと考えています。

sbt-native-packagerによるデプロイ

ScalaやAkkaで作ったアプリケーションをどのようにデプロイするのが良いでしょう?普通はscalaをインストールした環境にgithubでソースをクローンして、sbt runをするみたいなことを考えるとは思いますが、それらを簡略化する素晴らしいツールが提供されています。これを使うと、Docker含めた様々な環境に合わせたデプロイ形式に自動的に変換してくれます。具体的には以下の形式に変換可能です。

  • Universal zip,tar.gz, xz archives
  • deb and rpm packages for Debian/RHEL based systems
  • dmg for OSX
  • msi for Windows
  • docker images

sbt-native-packager 公式サイト

sbtの追加のコマンドとして定義するために、以下のようにプラグインを定義します。

plugins.sbt
addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.3.6")
build.sbt
enablePlugins(JavaAppPackaging)

アプリケーションを作ったら、以下のようなコマンドを打つと、targetフォルダ以下に指定したディストリビューションの形式で出力してくれます。docker imageなどは便利ですね(dockerの事前インストールは必須)。

# universal zip
sbt universal:packageBin

# debian package
sbt debian:packageBin

# rpm package
sbt rpm:packageBin

# docker image
sbt docker:publishLocal

ふぅ、奥が深い。。