AkkaのactorとErlangのプロセス間でメッセージパッシングしてみた

  • 26
    いいね
  • 0
    コメント
この記事は最終更新日から1年以上が経過しています。

AkkaのactorとErlangのプロセス間でメッセージパッシングしてみた

Scala Advent Calendar 2014の17日目です。よろしくお願いいたします。
「akkaがerlangの機能をどのようにパクっているか書きたいです」と書いて枠を確保させていただいたのですが、残念ながらErlangの勉強の進捗がダメなので急遽別テーマで記事を書くことになりました。
多くの方に楽しみにしていただいていたにも関わらず、このような結果となってしまって心苦しい限りです(◞‸◟)

https://twitter.com/scalajp_gitter/status/544853401050177536
https://twitter.com/scalajp_gitter/status/544853634844852224
https://twitter.com/scalajp_gitter/status/544853702079545344
https://twitter.com/scalajp_gitter/status/544853750133690371

私無謀にも「akka vs erlang」というタイトルでErlang Advent Calendar 2014の枠も押さえさせていただいておりますので、そちらで今回書きたかったことにも触れられたらと思っております。

追記: 「今回書きたかったこと」にはほとんど触れられませんでしたが記事公開しました。
Akka vs Erlang

分散Erlangについて

Akkaと同じく、Erlangのメッセージパッシングインターフェースにはlocation transparencyが備わっています。
スモールな実行例を示します。

$ erl -sname mofu
Erlang/OTP 17 [erts-6.2.1] [source] [64-bit] [smp:4:4] [async-threads:10] [hipe] [kernel-poll:false] [dtrace]

Eshell V6.2.1  (abort with ^G)
(mofu@okumin-no-Mac-mini)1> register(mofu_shell, self()).
true
(mofu@okumin-no-Mac-mini)2> receive
(mofu@okumin-no-Mac-mini)2>     {From, hello} -> From
(mofu@okumin-no-Mac-mini)2> end.

まずはmofuという名前でシェルを立ち上げ、自分自身を「mofu_shell」という名前で登録します。
こうすることで、AkkaのActorPathに対してメッセージを送るように、直接名前指定でメッセージパッシングができるようになります。
2番目の処理はメッセージを受信するもので、Akkaアクターのreceiveのようにパターンマッチで受け取るメッセージと、そのメッセージに対する処理を指定します。
今はまだメールボックスが空なので、何かが送られてくるまでシェルがブロックされます。

erl -sname poyo
Erlang/OTP 17 [erts-6.2.1] [source] [64-bit] [smp:4:4] [async-threads:10] [hipe] [kernel-poll:false] [dtrace]

Eshell V6.2.1  (abort with ^G)
(poyo@okumin-no-Mac-mini)1> net_adm:names().
{ok,[{"mofu",60995},{"poyo",60997}]}
(poyo@okumin-no-Mac-mini)2> {mofu_shell, 'mofu@okumin-no-Mac-mini'} ! {self(), hello}.
{<0.38.0>,hello}

シェルをもう一つ、今度はpoyoという名前で立ち上げます。
最初にローカルにいるErlang VMのHOSTを列挙(net_adm:names())してみます。mofuがいますね。
次に先ほど登録したmofuシェルのmofu_shellにメッセージを送ってみます。Akkaでいえば、ActorSelectionに対して「!」でメッセージを送信しているイメージです。
Akkaと違いErlangの「!」演算子は送信したメッセージを結果として返すので、「{<0.38.0>,hello}」という値が返ってきています。
「<0.38.0>」はプロセスID(AkkaでいうActorRef)で、「self()」が評価されたものです。ちなみにプロセスIDの最初の数字はホストを表し、自分と同じVMのプロセスの場合は必ず0となります。

(mofu@okumin-no-Mac-mini)2> receive
(mofu@okumin-no-Mac-mini)2>     {From, hello} -> From
(mofu@okumin-no-Mac-mini)2> end.
<7250.38.0>
(mofu@okumin-no-Mac-mini)3>

mofuシェルに移動すると、メッセージが届き、「<7250.38.0>」が返ってきていました。
poyoシェルで見たときは「<0.38.0>」でしたが、mofuシェルにとっては別VMに存在するプロセスなので、最初の数字が変わっていますね。
もちろん、このプロセスIDに対してメッセージを送信することもできます。

以上がErlangにおけるリモートとのメッセージパッシングの例です。
Akkaと同じ要領で通信できることが分かっていただけたと思います。

Erlactor

今回作ったAkkaアクターからErlangプロセスに対してメッセージパッシングするための簡単なライブラリです。

Erlactor

Erlangがすごい得意な知り合いOさんに、分散Erlangのプロトコルを実装したJavaライブラリが提供されていると教わったのでそれを使ってみました。

Erlangプロセスを表現したActorオブジェクトをScala側に作成することで、(送信可能なメッセージは制限されるが)普通のAkkaアクターとやりとりするのと同じインターフェースでメッセージパッシングを行うことができる、といった実装となっています。

詳細はgithubを読んでいただくとして……EchoサーバをErlangで、クライアントをAkkaで実装したサンプルアプリを作ってみたのでデモを載せておきます。

サーバ

-module(yamabiko_server).
-export([start_link/0]).

start_link() ->
    spawn_link(fun loop/0).

loop() ->
    receive
        {From, identity} -> % プロセスを取得するプロトコル
            send(From, self()),
            loop();
        {From, Term} -> % Echo
            send(From, Term),
            loop();
        Unexpected ->
            io:format("Received an unexpected message, ~s~n", [Unexpected]),
            loop()
    end.

send({Pid, Ref}, Message) ->
    Pid ! {Ref, Message}.

クライアント

object Sample {
  val system = ActorSystem("erlactor")
  val yamabikoClient = system.actorOf(Props(classOf[YamabikoClient]), name = "yamabiko-client")

  implicit val timeout = Timeout(3.seconds)

  case class YamabikoRequest(name: RemoteProcessName, term: ErlangTerm)
  case class KillYamabiko(name: RemoteProcessName)

  class YamabikoClient extends Actor with ActorLogging {
    private[this] val extension = ErlactorExtension(system)

    override def receive: Receive = {
      case YamabikoRequest(name, term) =>
        val response = for {
          process <- extension.resolveByName(name)
          response <- (process ? Ask(term)).mapTo[ErlangTerm]
        } yield response
        response.onComplete {
          case Success(t) =>
            log.info(s"Yamabiko returned. $t")
          case Failure(e) =>
            log.error(e, "Yamabiko failed.")
        }
    }
  }
}

デモ

まずEchoサーバを起動します。

$ erl -sname mofu                                                                   
Erlang/OTP 17 [erts-6.2.1] [source] [64-bit] [smp:4:4] [async-threads:10] [hipe] [kernel-poll:false] [dtrace]

Eshell V6.2.1  (abort with ^G)
(mofu@okumin-no-Mac-mini)1> c(yamabiko_server).
{ok,yamabiko_server}
(mofu@okumin-no-Mac-mini)2> Pid = yamabiko_server:start_link().
<0.45.0>
(mofu@okumin-no-Mac-mini)3> register(server, Pid).
true

「mofu」という名前でVMを起動して、サーバプロセスを「server」という名前で登録します。

sbt console                                                                ⏎ ✹ ✭
Use of ~/.sbtconfig is deprecated, please migrate global settings to /usr/local/etc/sbtopts
[info] Loading global plugins from /Users/okumin/.sbt/0.13/plugins
[info] Loading project definition from /Users/okumin/Documents/program/product/erlactor/project
[info] Set current project to erlactor (in build file:/Users/okumin/Documents/program/product/erlactor/)
[info] Starting scala interpreter...
[info]
Welcome to Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_55).
Type in expressions to have them evaluated.
Type :help for more information.

scala> import sample.Sample._
import sample.Sample._

scala> import erlactor.ErlactorSelection._
import erlactor.ErlactorSelection._

scala> import erlactor.ErlangTerm._
import erlactor.ErlangTerm._

scala> sample.Sample // 初期化
res0: sample.Sample.type = sample.Sample$@2d147608

クライアントも立ち上げると、

(mofu@okumin-no-Mac-mini)4> net_adm:ping('erlactor@okumin-no-Mac-mini').
pong

Erlang側からAkkaにつながるようになりました。ちなみにこのpingなしだとこれ以降の処理がなぜか動きません。

scala> yamabikoClient ! YamabikoRequest(RemoteProcessName(node = "mofu", name = "server"), ErlangList("yahoo!"))

scala> [INFO] [12/17/2014 10:28:26.152] [erlactor-akka.actor.default-dispatcher-6] [akka://erlactor/user/erlactor-selection] Waiting the response...
[INFO] [12/17/2014 10:28:26.169] [erlactor-akka.actor.default-dispatcher-6] [akka://erlactor/user/erlactor-selection] Get Erlactor Actor[akka://erlactor/user/erlactor-selection/erlactor-mofu@okumin-no-Mac-mini-45-0-1#-1919978540]
[INFO] [12/17/2014 10:28:26.179] [ForkJoinPool-2-worker-5] [akka://erlactor/user/yamabiko-client] Yamabiko returned. ErlangTuple2(ErlangRef(erlactor@okumin-no-Mac-mini,[I@6fa33b9d,2),ErlangList(List(ErlangInteger(121), ErlangInteger(97), ErlangInteger(104), ErlangInteger(111), ErlangInteger(111), ErlangInteger(33))))

scala> yamabikoClient ! YamabikoRequest(RemoteProcessName(node = "mofu", name = "server"), ErlangTrue)

scala> [INFO] [12/17/2014 10:29:07.341] [ForkJoinPool-2-worker-1] [akka://erlactor/user/yamabiko-client] Yamabiko returned. ErlangTuple2(ErlangRef(erlactor@okumin-no-Mac-mini,[I@3a34b8fd,2),ErlangAtom(true))

送信したメッセージがちゃんと返ってきました。成功です。

JInterfaceの感想

API自体はとてもシンプルで、使いやすいライブラリでした。
PINGやnode一覧を取得するAPIも用意されているので、デバッグも簡単に行うことができました。
気になったのは以下の2点でしょうか。

  • Erlang側から一度pingを撃ってもらわないとなぜか接続できない
  • メールボックスの中身を取り出すAPIがスレッドをブロックする

分散Erlangのプロトコルは公開されているので、プロトコルを読んで自分で実装してみるのも楽しそうだなと思いました。

この投稿は Scala Advent Calendar 201417日目の記事です。