はじめに
はじめまして。Livesense - 学 Advent Calendar 15日目担当をしますエンジニアのkazhitと申します。
今回のカレンダーのお題は「学」。
何か書けそうなことはないかなと考えていたのですが、最近興味を持って触り始めたAkkaが他の方と内容もかぶりにくそうだと思ったので、約1ヶ月間の学習内容を書いてみようと思います。
実際にAkkaを触りながら思いついたことを色々試しつつ勉強を進めているため、まだまだ基本部分の勉強中です。
初心者が書いた内容なので全然まとまっていないと思いますが、ご容赦ください。
あと、なんか間違っていたらすみません。
この記事を予約投稿しようと思ったら以下の本が発売されていました。
Akkaの日本語の本が全然なかったので即買い。
Akkaの事がちゃんと知りたい人は、この本を読みましょう!
Akka実践バイブル
環境
以下のような環境で行なっています。
- macOSX 10.12.3
- scala 2.11.7
- akka-actor 2.4.12
- akka-remote 2.4.12
- play 2.5.10
- IntelliJ IDEA
Akkaを勉強する目的
一番の目的はとても単純で、色々と話には聞いたことがあるが、実際にAkkaで出来ることは何があるのかが知りたいという好奇心。
また、2年ほど前に業務で少し使っていたScalaをほぼ忘れてしまい、初心者状態になっているため、その復習もしたいと考えました。
Scala
Scalaの復習は、いわゆる「コップ本(第3版)」などを読みながら行いました。
あとは実際にコードを打ちながら少しずつ思い出していきました。
今回は主に以下の機能がコード内に出てきます。
(一応すごく簡単ではありますが、コード中にコメントで説明を書いてたりはします。)
- object
- case class
- パターンマッチ
- Future
- Either
- implicit
- 型パラメータ
Akkaとは
Akka
って何だろうということで、色々と調べてみました。自分の中ではざっくりと以下のような理解になっています。
- Akka とは、 アクターモデル で並行処理が出来るライブラリ。
- アクターモデル とは、以下の図ように複数の アクター(Actor) 同士が並行的にメッセージのやりとりを行うもの。
- アクター(Actor) とは、非同期でメッセージの送受信(スレッド、プロセス、ネットワーク間)と処理が出来て、それぞれが状態を持っているオブジェクトで、アクター(Actor)同士は状態を気にしない。
状態を気にしない。 良い響きですね。通常のマルチスレッドモデルでやっていると、アプリもプログラマーも寿命が短くなるだけですからね・・・。
Akka
で提供されている機能等は以下があります。
-
Actor
- 上記と同じ
-
ActorSystem
- Actorの管理者。Actorを取得したり作成したりするときに必要。
-
tell(!) ask(?)
- tell : メッセージ送信メソッド。投げっぱなしで返答を待たない。 別名で
!
- ask : メッセージ送信メソッド。メッセージの返答を待てる。別名で
?
- tell : メッセージ送信メソッド。投げっぱなしで返答を待たない。 別名で
-
ActorPath
- ActorのURI。ActorPathからActorを取得したり、直接メッセージを送信することもできる。
-
Router
- 複数のActorに対してメッセージをいい感じに振り分ける(Routing)もの。
-
SupervisorStrategy
- Actorは親子関係と複数の子を持たせることが出来る。その子達の管理(停止したら再起動するかとかを決める)するもの。
-
Akka Cluster
- (いまいちよく分かってないので、もう少し学んでから書きたいと思います。)
次からは実際にAkka
を使用してやったことを書いていきます。
Akkaでやろうと思ったこと
Akka
のActor
が途中で動作出来なくなったらどうなるのか、どうすれば良いのかを知りたくて、色々と試してみました。
Actor内でC/C++ライブラリを動かす
まずは簡単なアプリを作り、その中で図のようにActor
を2つ作成し、Actor1
でC/C++ライブラリ
を使用して処理をさせてみようと思いました。
この場合のActor
は、内部でスレッドプールを持ち、Actor
同士がメッセージのやりとりをします。
C/C++ライブラリ
を使おうと思った理由は、非常に重い処理などをC/C++で処理させたい人もいたり、場合によっては、ずっと昔からあるパンドラの箱のようなC/C++ライブラリ
を使用しないとダメなこともあるかもしれません。
そんなことを妄想しながらやってみました。
(以下の図は、あくまでも今回作成したアプリでのイメージです。コメントでご指摘頂いた通り、実際にはActorとスレッドは一対一に対応しているものではありません。)
ただし、C/C++ライブラリ
は内部でちゃんと例外処理をしていないと、誰も例外をcatch出来ずに、突然亡くなる可能性があります。
今回の場合だと、亡くなると同時にプロセスも道連れにするので、アプリ自体が亡くなってしまいました。
ライブラリ内で完璧に例外処理をやればいいのでは。ということになりますが、まあ確かにその通りです。
ただ、ライブラリがパンドラの箱だった場合には、そんなことをするのはなかなか厳しいと思われます。
(ウチにはそんな遺産がなくて本当によかった)
(以下の図は、あくまでも今回作成したアプリでのイメージです。コメントでご指摘頂いた通り、実際にはActorとスレッドは一対一に対応しているものではありません。)
Remote Actorを使う
アプリ自体をずっと生かした状態でどうにか出来ないかと思い、Remote Actor
を使うことにしました。
Remote Actor
を使えば、以下の図のようにActor1
を別のプロセスに置いて、プロセス間でのメッセージ送受信が簡単に出来ます。
Remote Actor
と言っても、akka-remote
ライブラリを読み込んで、Actor
を作る際の設定を変えるだけで簡単に使用できます。
これで、プロセスB
内のC/C++ライブラリ
が亡くなってもプロセスA
は生き続け、プロセスB
を再起動すれば、また問題なく動き出します。
ActorPath
が変わらなければ、Akka
が接続、再接続を勝手にやってくれるため、何も気にせず再起動するだけで良いです。
(以下の図は、あくまでも今回作成したアプリでのイメージです。コメントでご指摘頂いた通り、実際にはActorとスレッドは一対一に対応しているものではありません。)
実際に作ったアプリ
というわけで、Remote Actor
を使用してアプリを作ってみました。
概要
Playを使用したWebApp
というウェブアプリと、Akka
だけを使用したシンプルなRemoteApp
というアプリを作成しました。
WebApp
はClientObject
というオブジェクトと、ClientActor
というActor
を持ち、リクエストを受けたらClientObject
を使用してClientActor
にメッセージ(ask
)を送ります。
さらにClientActor
はRemoteActor
にメッセージ(ask
)を送ります。
RemoteApp
はRemoteActor
とそのActor
内でC/C++ライブラリ
を使用します。
RemoteActor
でClientActor
からのメッセージを受け取るとC/C++ライブラリ
で処理をしてその結果をClientActor
にメッセージ(tell
)を返します。
次から、WebApp
、RemoteApp
の実際に書いたコードを紹介します。
(※コードの一部となりますので、これだけで全てが動くわけではありません。)
WebAppを作る
まずはWebApp
から作成します。実際のプロジェクトの構成は以下のようにしました。
Playのプロジェクトを作成
(コメントでのご指摘の通り、現在activator
は非推奨となっております。sbt
を使用してください。playに関してはここ。この辺りはちゃんと調査した上で後ほど追記したいと思います。)
今回はTypesafe Activator
を使用します。brewなどでtypesafe-activator
をインストールして
$ activator new
6) play-scala
を選択します。これでPlayのプロジェクトが出来ます。
confファイルを作成
ActorSystem
を作成するためのコンフィグファイルを作成します。
akka {
loglevel = "INFO"
actor {
provider = "akka.remote.RemoteActorRefProvider"
}
remote {
enabled-transports = ["akka.remote.netty.tcp"]
netty.tcp {
hostname = "127.0.0.1"
port = 2155
}
log-sent-messages = on
log-received-messages = on
}
}
app.remote-app.remote-actor = "akka.tcp://remote-app@127.0.0.1:2150/user/remote"
ルーティング
routes
ファイルにshow
アクションを追加します。
GET /:number controllers.HomeController.show(number: Long)
送受信データの定義
送受信するメッセージオブジェクトを定義します。
object common {
// パターンマッチで使用できるようにcase class
case class SendToClient(msg: Long) // ClientActorへ送信する用
case class SendToRemote(msg: Long) // RemoteActorへ送信する用
case class ResponseMsg(msg: String)// 返答用
}
Actorの作成
ClientActor
を作成します。
class ClientActor extends Actor {
// askのタイムアウトを設定する
// askの定義を見ると implicit引数でtimeoutがあります。
// implicitでtimeoutを定義すると、コンパイラがTimeout型のimplicit定義を探し、勝手に引数に繋げてくれます。
implicit val timeout: Timeout = Timeout(2000 milliseconds)
// configファイルに書いてあるRemoteActorのActorPathから、RemoteActorを取得
val remoteActor: ActorSelection = context.actorSelection(ClientObject.config.getString("app.remote-app.remote-actor"))
override def receive: Receive = {
case SendToClient(send: Long) =>
// askのタイムアウトがFailureではなく、例外で返ってくるため try catchする(仕様)
try {
val tempSender = sender
val f = remoteActor ? SendToRemote(send) // RemoteActorにaskでメッセージ送信。timeoutは上でimplicitで定義
f.onComplete { // 処理が終わればコールされる
case Success(s) =>
Logger.info(s"ClientActor : Success : $s")
tempSender ! s // ClientObjectに返す
case Failure(e) =>
Logger.info(s"ClientActor : Failure : $e")
throw e
}
Await.ready(f, timeout.duration) // 処理が終わるのを待つ
} catch {
// だいたいタイムアウトが入る
case e: Exception =>
Logger.info(s"ClientActor : Exception : $e")
throw e
}
case any =>
Logger.info(s"Unknown message $any from $sender")
}
}
ClientObjectを作成
ClientObject
を作成します。
ClientActor
を使用してメッセージの送受信を行なっています。
object ClientObject {
// askのタイムアウトを設定する
implicit val timeout: Timeout = Timeout(3000 milliseconds)
val configFile: String = getClass.getClassLoader.getResource("client.conf").getFile
val config: Config = ConfigFactory.parseFile(new File(configFile))
// configファイルからActorSystemを作成する
val system = ActorSystem("client-system", config)
// ActorSystemを使って、ClientActorを作成する
val clientActor: ActorRef = system.actorOf(Props[ClientActor], name = "client")
// Long型の整数を受け取り、Eitherで結果を返す
// 失敗時にはLeftにエラーのオブジェクト(Throwable)、成功時にはRightにその結果(String)
// 別に逆にしても問題ないが、正解は英語でRightととも言うので・・・
def sendNumberToRemote(n: Long): Either[Throwable, String] = {
try {
val f = clientActor ? SendToClient(n) // ClientActorにaskでメッセージ送信
Await.ready(f, timeout.duration) // 処理が終わるのを待つ
f.value.get match { // 現在のFutureの状態を取得する
case Success(ResponseMsg(s: String)) =>
Logger.info(s"ClientObject : Success : Right($s)")
Right(s) // 成功なのでRightに結果を入れて返す
case Failure(e) =>
Logger.info(s"ClientObject : Failure : Left($e)")
Left(e) // 失敗なのでLeftにThrowableを入れて返す
case _ =>
Logger.info(s"ClientObject : ??? : Right(Unknown)")
Right("Unknown")
}
} catch {
case e: Exception =>
Logger.info(s"ClientObject : Exception : Left($e)")
Left(e) // 例外発生したので、失敗としてLeftにThrowableを入れて返す
}
}
}
ControllerとView
HomeController
にshow
アクションを追加します。
def show(number: Long) = Action {
// Eitherはパターンマッチで以下のようにRightとLeftの場合で処理できる
ClientObject.sendNumberToRemote(number) match {
case Right(s) => Ok(views.html.show(s"Show => $number * 2 = $s")) // 成功時
case Left(e) => Ok(views.html.show(s"Error!!! : $e")) // 失敗時
}
}
View
にshow
アクションのためのshow.scala.html
を追加する。
@(message: String)
<!DOCTYPE html>
<html lang="ja">
<head>
<meta charset="utf-8">
<title>Akka Remote 学習</title>
</head>
<body>
@message
</body>
</html>
RemoteAppを作る
次にRemoteAppを作ります。プロジェクトの構成は以下になります。
最小限のAkkaのプロジェクトを作成する
(コメントでのご指摘の通り、現在activator
は非推奨となっております。sbt
を使用してください。)
$ activator new
2) minimal-akka-scala-seed
を選択。
confファイルを作成
WebApp
と同様に、ActorSystem
を作成するためのコンフィグファイルを作成します。
akka {
loglevel = "INFO"
actor {
provider = "akka.remote.RemoteActorRefProvider"
}
remote {
enabled-transports = ["akka.remote.netty.tcp"]
netty.tcp {
hostname = "127.0.0.1"
port = 2150
}
log-sent-messages = on
log-received-messages = on
}
}
RemoteAppを作成
ここでRemoteActor
を作成しています。
object RemoteApp extends App {
val configFile = getClass.getClassLoader.getResource("remote.conf").getFile
val config = ConfigFactory.parseFile(new File(configFile))
val system = ActorSystem("remote-app", config)
val remote = system.actorOf(Props[RemoteActor], name="remote")
system.log.info("Remote is ready")
}
JNIでC/C++ライブラリを使用
今回はC/C++ライブラリ
を呼び出すために、JNI
を使用してみました。
scala
からは簡単にjava
を呼び出すことができます。
JNI
は書くのが面倒なので、JNA
を使って頂いても問題ありません。
public class CpplibJNI {
static {
System.loadLibrary("cpplib"); // 頭の"lib"と拡張子はいらない
}
public native String cpplib(int number);
}
Actorを作成
RemoteActor
を作成します。
class RemoteActor extends Actor with ActorLogging {
override def receive: Receive = {
case SendToRemote(l: Long) =>
log.info(s"Remote received $l from $sender")
// javaのオブジェクトを作成し、メソッドをコール
val jni = new CpplibJNI
val str: String = jni.cpplib(l.toInt)
sender ! ResponseMsg(str)
case any =>
log.info(s"Remote received unknown message $any from $sender")
}
}
C/C++ライブラリを作る
ここはちょっと自分用メモ
ヘッダファイルを作成
javah
を使用して、C/C++ライブラリ
作成のためのヘッダファイルを作成します。
RemoteApp
のディレクトリで、以下のコマンドを実行します。
$ javah -classpath {classesのパス} -jni CpplibJNI
パッケージ名_クラス名.h
というファイルが作成されるはずです。
自分は、jni
というパッケージの下にCpplibJNI
クラスを作成したので、jni_CpplibJNI.h
が出来ました。
ヘッダの中身を見ると、Java_パッケージ名_クラス名_メソッド名
の定義があるはずです。
自分は、Java_jni_CpplibJNI_cpplib
という名前の関数定義が出来ていました。
後はこの名前のC関数を作成すれば良いだけです。
ビルドする
以下のcppファイルを作成します。(.cでも良いです)
#include <iostream>
#include <stdio.h>
#include "jni_CpplibJNI.h"
JNIEXPORT jstring JNICALL Java_jni_CpplibJNI_cpplib(JNIEnv *env, jobject obj, jint jNumber) {
int iNum = (int)jNumber;
if (iNum == 0) {
int zero = 0;
int ret = 8 / zero; // 0が来たら0除算で亡くなってもらう
}
char buffer_c[256];
// int型をもらって倍にしてから、stringを返す
sprintf (buffer_c, "%d", (iNum * 2));
return env->NewStringUTF(buffer_c);
}
では、gcc
を使用してビルドします。
JNIのパス指定をすること。
Macの場合はライブラリの命名規則として、lib*****.dylib
とする必要があります。
$ gcc -dynamiclib -I /Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/include -I /Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/include/darwin -o libcpplib.dylib cpplib.cpp
アプリを動かす
RemoteAppを起動
(コメントでのご指摘の通り、現在activator
は非推奨となっております。sbt
を使用してください。)
-Djava.library.path={C/C++ライブラリのパス}
を忘れないように。
$ activator run -Xms512M -Xmx1024M -Xss1M -XX:+CMSClassUnloadingEnabled -Djava.library.path={C/C++ライブラリのパス}
WebAppを起動
(コメントでのご指摘の通り、現在activator
は非推奨となっております。sbt
を使用してください。)
$ activator run -Xms512M -Xmx1024M -Xss1M -XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=256M
使ってみる
ブラウザで http://localhost:9000/35
以下のように倍の数字が表示されればOK。
ブラウザで http://localhost:9000/0
以下のようにタイムアウトが出て、RemoteApp
は落ちていても、WebApp
は生きています。
再度RemoteApp
を起動すればまた倍の値を表示できます。
ハマったところ
実装していて少し詰まったところです。
型パラメータ
Akka
ではなくscala
の話なのですが、実は送受信データの定義で、最初は以下のように型パラメータを使用していました。
(必要ないと思ったので、途中で変えました。)
package object common {
case class SendToClient[A](msg: A)
case class SendToRemote[A](msg: A)
case class ResponseMsg[A](msg: A)
}
その際に、受信側の処理として、次のような書き方をしてしまいました。
override def receive: Receive = {
case l: SendToRemote[Long] =>
これをビルドすると、warning
が出てしまいます。
これは、 クラスや関数で実行時に指定される抽象型の情報は、Javaの型消去によって判定できないらしく、この書き方ではコンパイル時に型の情報が消えてしまい、判定が出来ないという警告のようです。
non-variable type argument Long in type pattern services.common.SendToRemote[Long] is unchecked since it is eliminated by erasure
というわけで、次のように書けば判定が出来るようです。
override def receive: Receive = {
case SendToRemote(l: Long) =>
Future
val f = remoteActor ? SendToRemote(send)
このval f
にはFuture
が入ります。Future
とは 非同期に処理される結果が入ったもの だそうです。
非同期で処理されるため、以下で処理を待っています。
Await.ready(f, timeout.duration)
処理が終わると、
f.onComplete {
...
}
で結果を取得できます。
ClientObject
での処理のように、Await.ready
で処理をまった後にFuture.value.get
で、現在のFuture
の値を取得するようなやり方もあります。
sender
sender
とはActor
内のreceive
処理でメッセージを受け取った際の送信元Actor
が入っています。
返信する際には、このsender
にtell
ask
します。
val tempSender = sender
についてですが、なぜわざわざここでsender
を保持しているのか。
試しに try {
の直後と、case Success(s) =>
の直後で sender
の中身をログに出してみます。
-
try {
の直後
try {
Logger.info(s"$sender") // ←ここでログ
val tempSender = sender
...
=> Actor[akka://client-system/temp/$a]
-
case Success(s) =>
の直後
case Success(s) =>
Logger.info(s"$sender") // ←ここでログ
Logger.info(s"ClientActor : Success : $s")
tempSender ! s
=> Actor[akka://client-system/deadLetters]
全然違うものが入っています。
よく考えてみると、try {
の直後は、ClientObject
から来たメッセージで、case Success(s) =>
の直後は、RemoteActor
から来たメッセージとなり、送ったActor
が違うので当然です。
なので、ここでClientObject
からのsender
を一旦保持させています。
(なんか良い方法ないのか。そもそもなんか設計おかしいかも。)
RemoteActor
でも同様にログを取ってみました。
class RemoteActor extends Actor with ActorLogging {
override def receive: Receive = {
case SendToRemote(l: Long) =>
log.info(s"$sender") // ←ここでログ
=> Actor[akka.tcp://client-system@127.0.0.1:2155/temp/$b]
こちらもtry {
の直後とよく似た/temp/$b
というActorからメッセージが来ているようです。
どうやらask
の場合は返信を受け取る用の一時的なActor
が/temp
の下に作成されているようです。
もう一つの/deadLetters
ですが、これは配信することが出来なかったメッセージを扱うActor
だそうです。
RemoteActor
はtell
でメッセージを送信しています。
tell
は返答を受け取らないので、/deadLetters
が送信してくれて、これに対してメッセージ返信しても配信することが出来なかったと処理するようにしている感じでしょうか。
というわけで、さっとClientObject
にだけ/temp
のActor
追記すると、以下のようなイメージですかね。
そうなると実は以下のように直接RemoteActor
とメッセージのやりとりをすれば、ClientActor
は全く必要が無くなります。
では、なぜ今回のような構成にしたのかというと・・・
次にやろうと思っていたことが、以下のようにRouter
を使用して複数のActor
と繋ぎたいと思っていたため、単純にClientActor
をRouter
に差し替えるだけで動くように出来そうなので、このようにしました。
実際にRouter
を使ってみたら変わるかもですが。
この先はまだ勉強中で、今回は以上となります。
おまけ
IntelliJ IDEA
でPlay
のプロジェクトを動かしていると、たまに以下のようなエラーが出てPlay
のアプリが起動しなくなることがあります。
そんな時は File
=> Invalidate Caches / Restart...
をすると直ります。
BootException: Cannot redefine component. ID: org.scala-sbt-compiler-interface-0.13.11-bin_2.11.7__52.0, files: ~~~~
まとめ
色々と勉強を進めていると、Actor
を作って繋げて切り離しすということが簡単に出来、Akka
(というかアクターモデル?)はかなり柔軟性が高いと感じました。
柔軟性高い=とんでもない設計が生まれる可能性もありますが・・・。
今後は、アプリが機能を失わずに動かし続けるために、複数のActor
を起動し、それをRouter
管理し、さらにActor
を監視して、もし動いていなければ再起動させるというようなことをしたいと考えています。
まだAkka Cluster
についてちゃんと勉強していないので、これもやりたいですね。
今後もちまちまと勉強を進めていきたいと思います。
あと、コードを整理したらGitHubにあげます!
おわりに
なんか全体的に自分用のメモっぽい感じになってしまいましたが・・・。
こうやって学習内容を書いていくことは、自分の理解の確認と整理ができるので大切ですね。
また機会あれば書きたいと思います。
そうだ Scala Matsuri 2018 行こう