Scalaでは障害に強い非同期のフレームワークAkkaがあります。
例えば既存のバッチ処理にAkkaを導入する場合、
AkkaのActorモデルに合わせて全体の構成見直したほう奇麗に設計できますが、
大掛かりな修正になってしまうかもしれません。
非同期の処理単位での障害発生時のリトライや復旧を必要としない場合、
Akkaを導入しなくても、簡単に並列、非同期な処理を取り入れることができます。
サーバーのリソースがあまっていて、スケールせずに手軽に並列化したい場合に有効な手段です。
環境構築
実行環境は、Scalaの言語を開発しているTypesafeが提供しているActivatorで作ります。
1. Activator をダウンロード
2. testプロジェクトの作成
Activatorをダウンロードしたら、解凍して、
activatorコマンドでScalaのプロジェクトを作成します。
unzip typesafe-activator-1.2.10.zip
~/Downloads/activator new test
Fetching the latest list of templates...
Browse the list of templates: http://typesafe.com/activator/templates
Choose from these featured templates or enter a template name:
1) minimal-java
2) minimal-scala
3) play-java
4) play-scala
(hit tab to see a list of all templates)
テンプレートをどれを使うか聞いてくるので、2を選択してScalaのプロジェクトを作ります。
> 2
OK, application "test" is being created using the "minimal-scala" template.
testプロジェクトができました。
ll ./test
total 2384
-rw-r--r-- 1 N1407A003 staff 551 10 7 20:17 LICENSE
-rwxr--r-- 1 N1407A003 staff 9593 10 7 20:17 activator
-rw-r--r-- 1 N1407A003 staff 1188338 10 7 20:17 activator-launch-1.2.3.jar
-rwxr--r-- 1 N1407A003 staff 6974 10 7 20:17 activator.bat
-rw-r--r-- 1 N1407A003 staff 294 10 7 20:17 build.sbt
drwxr-xr-x 3 N1407A003 staff 102 10 7 20:17 project
drwxr-xr-x 4 N1407A003 staff 136 10 7 20:17 src
3.サンプルの実行
サンプルのハローワールドを実行してみます。
cd test
./activator run
[info] Running com.example.Hello
Hello, world!
[success] Total time: 3 s, completed 2014/10/07 21:31:41
src/main/scala/com/example/Hello.scalaが実行されました。
package com.example
object Hello {
def main(args: Array[String]): Unit = {
println("Hello, world!")
}
}
これで準備ができました。
Hello.scalaと同じ階層にサンプルを作って試していきます。
簡単にできる並列処理
Scalaではループ処理を簡単に並列化できます。
この例では、通常のループでは6秒かかっているものが、並列で実行することで2秒でおわります。
1.通常のループ
package com.example
object Parallel {
def main(args: Array[String]): Unit = {
(1 to 10).map { i => //ここの中がループ
println(i)
Thread.sleep(500) //1回の処理に500ミリ秒かかるとする
}
}
}
実行結果
cd test
./activator
> runMain com.example.Parallel
[info] Updating {file:/Users/N1407A003/work/test/}test...
[info] Resolving jline#jline;2.11 ...
[info] Done updating.
[info] Running com.example.Parallel
1
2
3
4
5
6
7
8
9
10
[success] Total time: 6 s, completed
2.並列に変更後
package com.example
object Parallel {
def main(args: Array[String]): Unit = {
//(1 to 10).map { i => //元の処理
(1 to 10).par.map { i => //parでparallel collection に変換
println(i)
Thread.sleep(500) //1回の処理に500ミリ秒かかるとする
}
}
}
実行結果
> runMain com.example.Parallel
[info] Running com.example.Parallel
1
8
6
7
9
2
3
10
4
5
[success] Total time: 2 s, completed
並列で実行する場合、コレクションの順番がバラバラに実行されます。
順番に処理する必要がない場合に有効です。
非同期処理
時間のかかるメソッドがそれぞれ独立した処理をしている場合、
それらを非同期実行することで、全体の処理時間を短縮できる場合があります。
1.ライブラリーの追加
依存性管理にscala-asyncを追加します。
name := """test"""
version := "1.0"
scalaVersion := "2.11.1"
// Change this to another test framework if you prefer
libraryDependencies ++= Seq(
"org.scalatest" %% "scalatest" % "2.1.6" % "test",
"org.scala-lang.modules" %% "scala-async" % "0.9.2"
)
Activatorのコンソールを起動している場合、プロジェクトの設定をリロードします。
> reload
[info] Loading project definition from /Users/N1407A003/work/test/project
[info] Set current project to test (in build file:/Users/N1407A003/work/test/)
2.同期実行
処理時間が5秒かかるfuncAと10秒かかるfuncBでは、順番に実行すると全体で15秒程度かかります。
package com.example
object Async {
def main(args: Array[String]): Unit = {
logic//15秒かかる
}
def funcA = {
println("funcA start")
Thread.sleep(5000);//5秒かかる処理
println("funcA end")
true
}
def funcB = {
println("funcB start")
Thread.sleep(10000)//10秒かかる処理
println("funcB end")
true
}
def logic = {
funcA //5秒かかる
funcB //10秒かかる
}
}
実行結果
> runMain com.example.Async
[info] Running com.example.Async
funcA start
funcA end
funcB start
funcB end
[success] Total time: 15 s, completed
3.非同期実行
これらのfuncAとfuncBを非同期に実行することで、funcAはfuncBの処理中に完了できます。
全体では10秒で処理を完了できます。
package com.example
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Future, Await}
import scala.async.Async.{async, await}
object Async {
def main(args: Array[String]): Unit = {
val finished = Await.result(asyncLogic,Duration.Inf)
if (finished) {
//AとBが両方完了したら実行する処理
println("finish")
}
}
def funcA = {
println("funcA start")
Thread.sleep(5000);//5秒かかる処理
println("funcA end")
true
}
def funcB = {
println("funcB start")
Thread.sleep(10000)//10秒かかる処理
println("funcB end")
true
}
def asyncLogic: Future[Boolean] = async {
val a = async {
funcA
}
val b = async {
funcB
}
//aとbの結果を待ってから返す
if(await(a)) await(b) else false
}
}
実行結果
> runMain com.example.Async
[info] Running com.example.Async
funcA start
funcB start
funcA end
funcB end
finish
[success] Total time: 10 s, completed