Help us understand the problem. What is going on with this article?

Akkaを勉強してみた(Scala)

More than 1 year has passed since last update.

はじめに

はじめまして。Livesense - 学 Advent Calendar 15日目担当をしますエンジニアのkazhitと申します。

今回のカレンダーのお題は「」。
何か書けそうなことはないかなと考えていたのですが、最近興味を持って触り始めたAkkaが他の方と内容もかぶりにくそうだと思ったので、約1ヶ月間の習内容を書いてみようと思います。

実際にAkkaを触りながら思いついたことを色々試しつつ勉強を進めているため、まだまだ基本部分の勉強中です。
初心者が書いた内容なので全然まとまっていないと思いますが、ご容赦ください。
あと、なんか間違っていたらすみません。

この記事を予約投稿しようと思ったら以下の本が発売されていました。
Akkaの日本語の本が全然なかったので即買い。
Akkaの事がちゃんと知りたい人は、この本を読みましょう!
Akka実践バイブル

環境

以下のような環境で行なっています。

  • macOSX 10.12.3
  • scala 2.11.7
  • akka-actor 2.4.12
  • akka-remote 2.4.12
  • play 2.5.10
  • IntelliJ IDEA

Akkaを勉強する目的

一番の目的はとても単純で、色々と話には聞いたことがあるが、実際にAkkaで出来ることは何があるのかが知りたいという好奇心。
また、2年ほど前に業務で少し使っていたScalaをほぼ忘れてしまい、初心者状態になっているため、その復習もしたいと考えました。

Scala

Scalaの復習は、いわゆる「コップ本(第3版)」などを読みながら行いました。
あとは実際にコードを打ちながら少しずつ思い出していきました。
今回は主に以下の機能がコード内に出てきます。
(一応すごく簡単ではありますが、コード中にコメントで説明を書いてたりはします。)

  • object
  • case class
  • パターンマッチ
  • Future
  • Either
  • implicit
  • 型パラメータ

Akkaとは

Akkaって何だろうということで、色々と調べてみました。自分の中ではざっくりと以下のような理解になっています。

  • Akka とは、 アクターモデル で並行処理が出来るライブラリ。
  • アクターモデル とは、以下の図ように複数の アクター(Actor) 同士が並行的にメッセージのやりとりを行うもの。
  • アクター(Actor) とは、非同期でメッセージの送受信(スレッド、プロセス、ネットワーク間)と処理が出来て、それぞれが状態を持っているオブジェクトで、アクター(Actor)同士は状態を気にしない。

スクリーンショット 2017-12-05 10.30.26.png

状態を気にしない。 良い響きですね。通常のマルチスレッドモデルでやっていると、アプリもプログラマーも寿命が短くなるだけですからね・・・。

Akkaで提供されている機能等は以下があります。

  • Actor
    • 上記と同じ
  • ActorSystem
    • Actorの管理者。Actorを取得したり作成したりするときに必要。
  • tell(!) ask(?)
    • tell : メッセージ送信メソッド。投げっぱなしで返答を待たない。 別名で!
    • ask : メッセージ送信メソッド。メッセージの返答を待てる。別名で?
  • ActorPath
    • ActorのURI。ActorPathからActorを取得したり、直接メッセージを送信することもできる。
  • Router
    • 複数のActorに対してメッセージをいい感じに振り分ける(Routing)もの。
  • SupervisorStrategy
    • Actorは親子関係と複数の子を持たせることが出来る。その子達の管理(停止したら再起動するかとかを決める)するもの。
  • Akka Cluster
    • (いまいちよく分かってないので、もう少しんでから書きたいと思います。)

次からは実際にAkkaを使用してやったことを書いていきます。

Akkaでやろうと思ったこと

AkkaActorが途中で動作出来なくなったらどうなるのか、どうすれば良いのかを知りたくて、色々と試してみました。

Actor内でC/C++ライブラリを動かす

まずは簡単なアプリを作り、その中で図のようにActorを2つ作成し、Actor1C/C++ライブラリを使用して処理をさせてみようと思いました。
この場合のActorは、内部でスレッドプールを持ち、Actor同士がメッセージのやりとりをします。
C/C++ライブラリを使おうと思った理由は、非常に重い処理などをC/C++で処理させたい人もいたり、場合によっては、ずっと昔からあるパンドラの箱のようなC/C++ライブラリを使用しないとダメなこともあるかもしれません。
そんなことを妄想しながらやってみました。

(以下の図は、あくまでも今回作成したアプリでのイメージです。コメントでご指摘頂いた通り、実際にはActorとスレッドは一対一に対応しているものではありません。)
スクリーンショット 2017-12-03 18.58.44.png

ただし、C/C++ライブラリは内部でちゃんと例外処理をしていないと、誰も例外をcatch出来ずに、突然亡くなる可能性があります。
今回の場合だと、亡くなると同時にプロセスも道連れにするので、アプリ自体が亡くなってしまいました。

ライブラリ内で完璧に例外処理をやればいいのでは。ということになりますが、まあ確かにその通りです。
ただ、ライブラリがパンドラの箱だった場合には、そんなことをするのはなかなか厳しいと思われます。
(ウチにはそんな遺産がなくて本当によかった)

(以下の図は、あくまでも今回作成したアプリでのイメージです。コメントでご指摘頂いた通り、実際にはActorとスレッドは一対一に対応しているものではありません。)
スクリーンショット 2017-12-03 19.05.18.png

Remote Actorを使う

アプリ自体をずっと生かした状態でどうにか出来ないかと思い、Remote Actorを使うことにしました。
Remote Actorを使えば、以下の図のようにActor1を別のプロセスに置いて、プロセス間でのメッセージ送受信が簡単に出来ます。
Remote Actorと言っても、akka-remoteライブラリを読み込んで、Actorを作る際の設定を変えるだけで簡単に使用できます。
これで、プロセスB内のC/C++ライブラリが亡くなってもプロセスAは生き続け、プロセスBを再起動すれば、また問題なく動き出します。
ActorPathが変わらなければ、Akkaが接続、再接続を勝手にやってくれるため、何も気にせず再起動するだけで良いです。

(以下の図は、あくまでも今回作成したアプリでのイメージです。コメントでご指摘頂いた通り、実際にはActorとスレッドは一対一に対応しているものではありません。)
スクリーンショット 2017-12-03 19.56.30.png

実際に作ったアプリ

というわけで、Remote Actorを使用してアプリを作ってみました。

概要

Playを使用したWebAppというウェブアプリと、Akkaだけを使用したシンプルなRemoteAppというアプリを作成しました。

WebAppClientObjectというオブジェクトと、ClientActorというActorを持ち、リクエストを受けたらClientObjectを使用してClientActorにメッセージ(ask)を送ります。
さらにClientActorRemoteActorにメッセージ(ask)を送ります。

RemoteAppRemoteActorとそのActor内でC/C++ライブラリを使用します。
RemoteActorClientActorからのメッセージを受け取るとC/C++ライブラリで処理をしてその結果をClientActorにメッセージ(tell)を返します。

スクリーンショット 2017-12-03 19.59.11.png

次から、WebAppRemoteAppの実際に書いたコードを紹介します。
(※コードの一部となりますので、これだけで全てが動くわけではありません。)

WebAppを作る

まずはWebAppから作成します。実際のプロジェクトの構成は以下のようにしました。

スクリーンショット 2017-12-04 10.32.04.png

Playのプロジェクトを作成

(コメントでのご指摘の通り、現在activatorは非推奨となっております。sbtを使用してください。playに関してはここ。この辺りはちゃんと調査した上で後ほど追記したいと思います。)
今回はTypesafe Activatorを使用します。brewなどでtypesafe-activatorをインストールして

$ activator new

6) play-scalaを選択します。これでPlayのプロジェクトが出来ます。

confファイルを作成

ActorSystemを作成するためのコンフィグファイルを作成します。

client.conf
akka {
  loglevel = "INFO"
  actor {
    provider = "akka.remote.RemoteActorRefProvider"
  }
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "127.0.0.1"
      port = 2155
    }
    log-sent-messages = on
    log-received-messages = on
  }
}
app.remote-app.remote-actor = "akka.tcp://remote-app@127.0.0.1:2150/user/remote"

ルーティング

routesファイルにshowアクションを追加します。

routes
GET     /:number                    controllers.HomeController.show(number: Long)

送受信データの定義

送受信するメッセージオブジェクトを定義します。

common.scala
object common {
  // パターンマッチで使用できるようにcase class
  case class SendToClient(msg: Long) // ClientActorへ送信する用
  case class SendToRemote(msg: Long) // RemoteActorへ送信する用
  case class ResponseMsg(msg: String)// 返答用
}

Actorの作成

ClientActorを作成します。

ClientActor.scala
class ClientActor extends Actor {
  // askのタイムアウトを設定する
  // askの定義を見ると implicit引数でtimeoutがあります。
  // implicitでtimeoutを定義すると、コンパイラがTimeout型のimplicit定義を探し、勝手に引数に繋げてくれます。
  implicit val timeout: Timeout = Timeout(2000 milliseconds)

  // configファイルに書いてあるRemoteActorのActorPathから、RemoteActorを取得
  val remoteActor: ActorSelection = context.actorSelection(ClientObject.config.getString("app.remote-app.remote-actor"))

  override def receive: Receive = {
    case SendToClient(send: Long) =>
      // askのタイムアウトがFailureではなく、例外で返ってくるため try catchする(仕様)
      try {
        val tempSender = sender
        val f = remoteActor ? SendToRemote(send) // RemoteActorにaskでメッセージ送信。timeoutは上でimplicitで定義
        f.onComplete { // 処理が終わればコールされる
          case Success(s) =>
            Logger.info(s"ClientActor : Success : $s")
            tempSender ! s // ClientObjectに返す
          case Failure(e) =>
            Logger.info(s"ClientActor : Failure : $e")
            throw e
        }
        Await.ready(f, timeout.duration) // 処理が終わるのを待つ
      } catch  {
        // だいたいタイムアウトが入る
        case e: Exception =>
          Logger.info(s"ClientActor : Exception : $e")
          throw e
      }

    case any =>
      Logger.info(s"Unknown message $any from $sender")
  }
}

ClientObjectを作成

ClientObjectを作成します。
ClientActorを使用してメッセージの送受信を行なっています。

ClientObject.scala
object ClientObject {
  // askのタイムアウトを設定する
  implicit val timeout: Timeout = Timeout(3000 milliseconds)

  val configFile: String = getClass.getClassLoader.getResource("client.conf").getFile
  val config: Config = ConfigFactory.parseFile(new File(configFile))
  // configファイルからActorSystemを作成する
  val system = ActorSystem("client-system", config)
  // ActorSystemを使って、ClientActorを作成する
  val clientActor: ActorRef = system.actorOf(Props[ClientActor], name = "client")

  // Long型の整数を受け取り、Eitherで結果を返す
  // 失敗時にはLeftにエラーのオブジェクト(Throwable)、成功時にはRightにその結果(String)
  // 別に逆にしても問題ないが、正解は英語でRightととも言うので・・・
  def sendNumberToRemote(n: Long): Either[Throwable, String] = {
    try {
      val f = clientActor ? SendToClient(n) // ClientActorにaskでメッセージ送信
      Await.ready(f, timeout.duration) // 処理が終わるのを待つ
      f.value.get match { // 現在のFutureの状態を取得する
        case Success(ResponseMsg(s: String)) =>
          Logger.info(s"ClientObject : Success : Right($s)")
          Right(s) // 成功なのでRightに結果を入れて返す
        case Failure(e) =>
          Logger.info(s"ClientObject : Failure : Left($e)")
          Left(e) // 失敗なのでLeftにThrowableを入れて返す
        case _ =>
          Logger.info(s"ClientObject : ??? : Right(Unknown)")
          Right("Unknown")
      }
    } catch {
      case e: Exception =>
        Logger.info(s"ClientObject : Exception : Left($e)")
        Left(e) // 例外発生したので、失敗としてLeftにThrowableを入れて返す
    }
  }
}

ControllerとView

HomeControllershowアクションを追加します。

HomeController.scala
  def show(number: Long) = Action {
    // Eitherはパターンマッチで以下のようにRightとLeftの場合で処理できる
    ClientObject.sendNumberToRemote(number) match {
      case Right(s) => Ok(views.html.show(s"Show => $number * 2 = $s")) // 成功時
      case Left(e) => Ok(views.html.show(s"Error!!! : $e")) // 失敗時
    }
  }

Viewshowアクションのためのshow.scala.htmlを追加する。

show.scala.html
@(message: String)

<!DOCTYPE html>
<html lang="ja">
    <head>
        <meta charset="utf-8">
        <title>Akka Remote 学習</title>
    </head>
    <body>
        @message
    </body>
</html>

RemoteAppを作る

次にRemoteAppを作ります。プロジェクトの構成は以下になります。

スクリーンショット 2017-12-04 10.33.26.png

最小限のAkkaのプロジェクトを作成する

(コメントでのご指摘の通り、現在activatorは非推奨となっております。sbtを使用してください。)

$ activator new

2) minimal-akka-scala-seedを選択。

confファイルを作成

WebAppと同様に、ActorSystemを作成するためのコンフィグファイルを作成します。

remote.conf
akka {
  loglevel = "INFO"
  actor {
    provider = "akka.remote.RemoteActorRefProvider"
  }
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "127.0.0.1"
      port = 2150
    }
    log-sent-messages = on
    log-received-messages = on
  }
}

RemoteAppを作成

ここでRemoteActorを作成しています。

RemoteApp.scala
object RemoteApp extends App {
  val configFile = getClass.getClassLoader.getResource("remote.conf").getFile
  val config = ConfigFactory.parseFile(new File(configFile))
  val system = ActorSystem("remote-app", config)
  val remote = system.actorOf(Props[RemoteActor], name="remote")
  system.log.info("Remote is ready")
}

JNIでC/C++ライブラリを使用

今回はC/C++ライブラリを呼び出すために、JNIを使用してみました。
scalaからは簡単にjavaを呼び出すことができます。
JNIは書くのが面倒なので、JNAを使って頂いても問題ありません。

CpplibJNI.java
public class CpplibJNI {
    static {
        System.loadLibrary("cpplib"); // 頭の"lib"と拡張子はいらない
    }
    public native String cpplib(int number);
}

Actorを作成

RemoteActorを作成します。

RemoteActor.scala
class RemoteActor extends Actor with ActorLogging {
  override def receive: Receive = {
    case SendToRemote(l: Long) =>
      log.info(s"Remote received $l from $sender")
      // javaのオブジェクトを作成し、メソッドをコール
      val jni = new CpplibJNI
      val str: String = jni.cpplib(l.toInt)
      sender ! ResponseMsg(str)
    case any =>
      log.info(s"Remote received unknown message $any from $sender")
  }
}

C/C++ライブラリを作る

ここはちょっと自分用メモ

ヘッダファイルを作成

javahを使用して、C/C++ライブラリ作成のためのヘッダファイルを作成します。
RemoteAppのディレクトリで、以下のコマンドを実行します。

$ javah -classpath {classesのパス} -jni CpplibJNI

パッケージ名_クラス名.hというファイルが作成されるはずです。
自分は、jni というパッケージの下にCpplibJNIクラスを作成したので、jni_CpplibJNI.hが出来ました。
ヘッダの中身を見ると、Java_パッケージ名_クラス名_メソッド名の定義があるはずです。
自分は、Java_jni_CpplibJNI_cpplibという名前の関数定義が出来ていました。
後はこの名前のC関数を作成すれば良いだけです。

ビルドする

以下のcppファイルを作成します。(.cでも良いです)

cpplib.cpp
#include <iostream>
#include <stdio.h>
#include "jni_CpplibJNI.h"

JNIEXPORT jstring JNICALL Java_jni_CpplibJNI_cpplib(JNIEnv *env, jobject obj, jint jNumber) {
  int iNum = (int)jNumber;
  if (iNum == 0) {
    int zero = 0;
    int ret = 8 / zero; // 0が来たら0除算で亡くなってもらう
  }
  char buffer_c[256];
  // int型をもらって倍にしてから、stringを返す
  sprintf (buffer_c, "%d", (iNum * 2));
  return env->NewStringUTF(buffer_c);
}

では、gccを使用してビルドします。
JNIのパス指定をすること。
Macの場合はライブラリの命名規則として、lib*****.dylib とする必要があります。

$ gcc -dynamiclib -I /Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/include -I /Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/include/darwin -o libcpplib.dylib cpplib.cpp

アプリを動かす

RemoteAppを起動

(コメントでのご指摘の通り、現在activatorは非推奨となっております。sbtを使用してください。)

-Djava.library.path={C/C++ライブラリのパス}を忘れないように。

$ activator run -Xms512M -Xmx1024M -Xss1M -XX:+CMSClassUnloadingEnabled -Djava.library.path={C/C++ライブラリのパス}

WebAppを起動

(コメントでのご指摘の通り、現在activatorは非推奨となっております。sbtを使用してください。)

$ activator run -Xms512M -Xmx1024M -Xss1M -XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=256M

使ってみる

ブラウザで http://localhost:9000/35
以下のように倍の数字が表示されればOK。

スクリーンショット 2017-12-03 21.19.51.png

ブラウザで http://localhost:9000/0
以下のようにタイムアウトが出て、RemoteAppは落ちていても、WebAppは生きています。
再度RemoteAppを起動すればまた倍の値を表示できます。

スクリーンショット 2017-12-03 21.22.28.png

ハマったところ

実装していて少し詰まったところです。

型パラメータ

Akkaではなくscalaの話なのですが、実は送受信データの定義で、最初は以下のように型パラメータを使用していました。
(必要ないと思ったので、途中で変えました。)

common.scala
package object common {
  case class SendToClient[A](msg: A)
  case class SendToRemote[A](msg: A)
  case class ResponseMsg[A](msg: A)
}

その際に、受信側の処理として、次のような書き方をしてしまいました。

RemoteActor.scala
  override def receive: Receive = {
    case l: SendToRemote[Long] =>

これをビルドすると、warningが出てしまいます。
これは、 クラスや関数で実行時に指定される抽象型の情報は、Javaの型消去によって判定できないらしく、この書き方ではコンパイル時に型の情報が消えてしまい、判定が出来ないという警告のようです。

warning
non-variable type argument Long in type pattern services.common.SendToRemote[Long] is unchecked since it is eliminated by erasure

というわけで、次のように書けば判定が出来るようです。

RemoteActor.scala
  override def receive: Receive = {
    case SendToRemote(l: Long) =>

Future

ClientActor.scala
  val f = remoteActor ? SendToRemote(send)

このval fにはFutureが入ります。Futureとは 非同期に処理される結果が入ったもの だそうです。
非同期で処理されるため、以下で処理を待っています。

ClientActor.scala
  Await.ready(f, timeout.duration)

処理が終わると、

ClientActor.scala
  f.onComplete {
    ...
  }

で結果を取得できます。

ClientObjectでの処理のように、Await.readyで処理をまった後にFuture.value.getで、現在のFutureの値を取得するようなやり方もあります。

sender

senderとはActor内のreceive処理でメッセージを受け取った際の送信元Actorが入っています。
返信する際には、このsendertell askします。

ClientActor.scala
  val tempSender = sender

についてですが、なぜわざわざここでsenderを保持しているのか。

試しに try {の直後と、case Success(s) =>の直後で senderの中身をログに出してみます。

  • try { の直後
ClientActor.scala
      try {
        Logger.info(s"$sender") // ←ここでログ
        val tempSender = sender
        ...

=> Actor[akka://client-system/temp/$a]

  • case Success(s) =>の直後
ClientActor.scala
          case Success(s) =>
            Logger.info(s"$sender") // ←ここでログ
            Logger.info(s"ClientActor : Success : $s")
            tempSender ! s

=> Actor[akka://client-system/deadLetters]

全然違うものが入っています。

よく考えてみると、try {の直後は、ClientObjectから来たメッセージで、case Success(s) =>の直後は、RemoteActorから来たメッセージとなり、送ったActorが違うので当然です。
なので、ここでClientObjectからのsenderを一旦保持させています。
(なんか良い方法ないのか。そもそもなんか設計おかしいかも。)

RemoteActorでも同様にログを取ってみました。

RemoteActor.scala
class RemoteActor extends Actor with ActorLogging {
  override def receive: Receive = {
    case SendToRemote(l: Long) =>
      log.info(s"$sender") // ←ここでログ

=> Actor[akka.tcp://client-system@127.0.0.1:2155/temp/$b]
こちらもtry {の直後とよく似た/temp/$bというActorからメッセージが来ているようです。

どうやらaskの場合は返信を受け取る用の一時的なActor/tempの下に作成されているようです。

もう一つの/deadLettersですが、これは配信することが出来なかったメッセージを扱うActorだそうです。
RemoteActortellでメッセージを送信しています。
tellは返答を受け取らないので、/deadLettersが送信してくれて、これに対してメッセージ返信しても配信することが出来なかったと処理するようにしている感じでしょうか。

というわけで、さっとClientObjectにだけ/tempActor追記すると、以下のようなイメージですかね。

スクリーンショット 2017-12-03 22.09.16.png

そうなると実は以下のように直接RemoteActorとメッセージのやりとりをすれば、ClientActorは全く必要が無くなります。

スクリーンショット 2017-12-03 22.16.03.png

では、なぜ今回のような構成にしたのかというと・・・
次にやろうと思っていたことが、以下のようにRouterを使用して複数のActorと繋ぎたいと思っていたため、単純にClientActorRouterに差し替えるだけで動くように出来そうなので、このようにしました。
実際にRouterを使ってみたら変わるかもですが。

スクリーンショット 2017-12-03 22.16.20.png

この先はまだ勉強中で、今回は以上となります。

おまけ

IntelliJ IDEAPlayのプロジェクトを動かしていると、たまに以下のようなエラーが出てPlayのアプリが起動しなくなることがあります。
そんな時は File => Invalidate Caches / Restart...をすると直ります。

error
BootException: Cannot redefine component. ID: org.scala-sbt-compiler-interface-0.13.11-bin_2.11.7__52.0, files: ~~~~

まとめ

色々と勉強を進めていると、Actorを作って繋げて切り離しすということが簡単に出来、Akka(というかアクターモデル?)はかなり柔軟性が高いと感じました。
柔軟性高い=とんでもない設計が生まれる可能性もありますが・・・。
今後は、アプリが機能を失わずに動かし続けるために、複数のActorを起動し、それをRouter管理し、さらにActorを監視して、もし動いていなければ再起動させるというようなことをしたいと考えています。
まだAkka Clusterについてちゃんと勉強していないので、これもやりたいですね。
今後もちまちまと勉強を進めていきたいと思います。
あと、コードを整理したらGitHubにあげます!

おわりに

なんか全体的に自分用のメモっぽい感じになってしまいましたが・・・。
こうやって習内容を書いていくことは、自分の理解の確認と整理ができるので大切ですね。
また機会あれば書きたいと思います。

そうだ Scala Matsuri 2018 行こう

Why do not you register as a user and use Qiita more conveniently?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away