Akka と Erlang を Reactive に組み合わせるためのライブラリを作りました #Akka #Erlang

  • 53
    いいね
  • 0
    コメント
この記事は最終更新日から1年以上が経過しています。

Akka と Erlang を Reactive に組み合わせるためのライブラリを作りました

Akka と Erlang を優雅にインテグレートするためのライブラリ、Ainterface を作ったので紹介いたします。

Ainterface とは

Ainterface は、Akka actor と Erlang プロセス間のメッセージパッシングを実現するためのライブラリです。Ainterface を用いると、分散Erlangのプロトコルを通して、Akka らしい文法で Erlang と通信を行うことができるようになります。

他言語から Erlang とおしゃべりする試みは他にもあります。例えば Java プログラムと Erlang 間でメッセージをやりとりするための Jinterface はその一つです。

去年の Scala アドヴェント・カレンダーに投稿した「AkkaのactorとErlangのプロセス間でメッセージパッシングしてみた」でも、Jinterface をラップして Akka actor から使用していました。いい感じのインターフェースが実現できたものの、Jinterface はイベント駆動な API でなかったりと、Akka と組み合わせるには少し筋が悪い面もありました。

その辺を考慮してより Reactive に実装しなおしたのが Ainterface となります。

Example

起動

まず Erlang node(及び EPMD)を立ち上げておきましょう。現在時点では long name にしか対応していません。

erl -name mofu
Erlang/OTP 17 [erts-6.2.1] [source] [64-bit] [smp:4:4] [async-threads:10] [hipe] [kernel-poll:false] [dtrace]

Eshell V6.2.1  (abort with ^G)
(mofu@okumin-mini.local)1>

AinterfaceSystem を起動します。一つの ActorSystem につき一つの AinterfaceSystem を作ることができます。ノード名などの設定はおなじみの hocon にて記述します。

$ sbt "project ainterface-sample" console
[info] Loading global plugins from /Users/okumin/.sbt/0.13/plugins
[info] Loading project definition from /Users/okumin/Documents/program/product/scala/akka/erlang-ainterface/project
[info] Set current project to erlang-ainterface (in build file:/Users/okumin/Documents/program/product/scala/akka/erlang-ainterface/)
[info] Set current project to ainterface-sample (in build file:/Users/okumin/Documents/program/product/scala/akka/erlang-ainterface/)
[info] Starting scala interpreter...
[info]
Welcome to Scala version 2.11.7 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_55).
Type in expressions to have them evaluated.
Type :help for more information.

scala> val config = com.typesafe.config.ConfigFactory.parseString("""
     |   akka {
     |   loglevel = "ERROR"
     |   ainterface {
     |     root-name = "ainterface-system"
     |     init.name = "ainterface-sample" // the node name
     |   }
     | }""")
config: com.typesafe.config.Config = Config(SimpleConfigObject({"akka":{"ainterface":{"init":{"name":"ainterface-sample"},"root-name":"ainterface-system"},"loglevel":"ERROR"}}))

scala> val system = akka.actor.ActorSystem("sample", config)
system: akka.actor.ActorSystem = akka://sample

scala> akka.ainterface.AinterfaceSystem.init(system) // initialize

Erlang プロセス作成

プロセスの実装は、ErlProcessActor をミックスインすることで行います。ErlProcessActor#process を通じて、Erlang プロセスとしての処理を呼び出すことができます。

class EchoActor extends ErlProcessActor {
  override def receive: Receive = {
    case ErlTuple(from: ErlPid, message) =>
      process.send(from, ErlTuple(process.self, message))
  }
}

サンプルプロジェクトにある ProcessActor を作成してみましょう。これは見ての通り、メッセージを介して Erlang 系 API を呼び出せるようにしたものです。

ついでに REPL から扱いやすくなるよう、下準備をしておきます。

scala> val process = system.actorOf(akka.actor.Props[ainterface.ProcessActor], "process")
process: akka.actor.ActorRef = Actor[akka://sample/user/process#-94975417]

scala> import ainterface.ProcessActorProtocol._
import ainterface.ProcessActorProtocol._

scala> import akka.ainterface.datatype._
import akka.ainterface.datatype._

scala> import akka.pattern.ask
import akka.pattern.ask

scala> import scala.concurrent._
import scala.concurrent._

scala> import scala.concurrent.duration._
import scala.concurrent.duration._

scala> import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.ExecutionContext.Implicits.global

scala> implicit val timeout = akka.util.Timeout(1.second)
timeout: akka.util.Timeout = Timeout(1 second)

scala> def await(x: Future[Any]): Any = Await.result(x, 1.second)
await: (x: scala.concurrent.Future[Any])Any

ProcessActor は Akka actor であると同時に Erlang プロセスでもあります。その証拠に、pid を持っています。

scala> val self = await(process ? SelfPid).asInstanceOf[ErlPid]
self: akka.ainterface.datatype.ErlPid = ErlPid(ErlAtom(ainterface-sample@okumin-mini.local), 1, 0, 3)

メッセージパッシング

名前をつけてみましょう。erlang:register/2 に相当する処理を行います。

scala> process ! Register(ErlAtom("process"), self)

これで Erlang プロセスからメッセージを送ることができますね。pid とともにメッセージを送ります。

(mofu@okumin-mini.local)1> {process, 'ainterface-sample@okumin-mini.local'} ! {hello, self()}.
{hello,<0.38.0>}

届きました。返信してみましょう。

scala> val Some(ErlTuple(message, pid: ErlPid)) = await(process ? Receive)
message: akka.ainterface.datatype.ErlTerm = ErlAtom(hello)
pid: akka.ainterface.datatype.ErlPid = ErlPid(ErlAtom(mofu@okumin-mini.local), 38, 0, 2)

scala> process ! Send(pid, ErlAtom("okey-dokey"))

Erlang 側にも届きました。

(mofu@okumin-mini.local)2> flush().
Shell got 'okey-dokey'
ok

リンク

もう一つ、link を試してみましょう。

scala> process ! Link(pid)

Erlang 側のプロセスを殺してみます。

(mofu@okumin-mini.local)3> exit(self(), tsuraidesu).
** exception exit: tsuraidesu

exit が伝播しました。

scala> [ERROR] [08/19/2015 01:24:57.295] [sample-akka.actor.default-dispatcher-2] [akka://sample/user/process] ** exception exit: ErlAtom(tsuraidesu)
akka.ainterface.datatype.ErlExit: ** exception exit: ErlAtom(tsuraidesu)
    at akka.ainterface.ErlProcessContext$class.exit(ErlProcessContext.scala:153)
    at akka.ainterface.ErlProcessActor$$anon$1.exit(ErlProcessActor.scala:9)
    at akka.ainterface.ErlProcessActor$class.akka$ainterface$ErlProcessActor$$handleControlMessage(ErlProcessActor.scala:60)
    at akka.ainterface.ErlProcessActor$class.aroundReceive(ErlProcessActor.scala:35)
    at ainterface.ProcessActor.aroundReceive(ProcessActor.scala:9)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

pid を再取得してみると、Akka 側のプロセスも再起動している(pid が新しくなっている)ことが分かります。若干ややこしいですが、この再起動は Erlang でなく Akka の Supervisor により引き起こされています。

scala> await(process ? SelfPid)
res4: Any = ErlPid(ErlAtom(ainterface-sample@okumin-mini.local), 2, 0, 3)

以上、メッセージパッシングとリンクを例に、Erlang プロセスとの通信を行ってみました。

用途

現実的な使い道は皆無だと思います。

ミドルウェア的な機能が必要とされる箇所は Erlang で実装し、複雑なドメインロジックは Scala で記述する、といったコラボレーションを妄想したことはありました。

Erlang はそのソフトリアルタイムな性質やホットコードローディングにより止まらないシステムが実現しやすい言語(参考: Akka vs Erlang)ですが、表現力に限界があると感じます。一方、Scala は非常にパワフルな記述力を持っています。

なので、これらを組み合わせることで、何かがよくなる可能性はゼロではないと思います。

しかし、今となっては Elixir を使えばすべての問題が解決できるらしいです。それゆえ、わざわざ Ainterface を用いて変則的な構成にチャレンジする動機は強くないでしょう。

今後

分散 Erlang には興味深いモジュールがたくさんあるので、読んだら実装したいなと思っています。

感想

実装にはそれなりの困難を感じました。分散Erlangのプロトコルは公開されているから楽勝だと思って書き始めてみたものの、以下のような壁がありました。

  • ドキュメントで触れられていない仕様がかなりあった
  • そもそも P2P ハンドシェイクは難しい

苦労はしましたが、成果物としての Ainterface には概ね満足です。Akka は Erlang の設計をパクって作られているので、API としては自然で使いやすいものができたと思います。

最後に

頑張って作ったので星をください。

Ainterface