Help us understand the problem. What is going on with this article?

Scalaの並列処理・非同期処理

More than 5 years have passed since last update.

Scalaでは障害に強い非同期のフレームワークAkkaがあります。
例えば既存のバッチ処理にAkkaを導入する場合、
AkkaのActorモデルに合わせて全体の構成見直したほう奇麗に設計できますが、
大掛かりな修正になってしまうかもしれません。

非同期の処理単位での障害発生時のリトライや復旧を必要としない場合、
Akkaを導入しなくても、簡単に並列、非同期な処理を取り入れることができます。

サーバーのリソースがあまっていて、スケールせずに手軽に並列化したい場合に有効な手段です。

環境構築

実行環境は、Scalaの言語を開発しているTypesafeが提供しているActivatorで作ります。

https://typesafe.com/platform/getstarted

1. Activator をダウンロード

http://downloads.typesafe.com/typesafe-activator/1.2.10/typesafe-activator-1.2.10.zip?_ga=1.61783916.137862477.1412681769

2. testプロジェクトの作成

Activatorをダウンロードしたら、解凍して、
activatorコマンドでScalaのプロジェクトを作成します。

unzip typesafe-activator-1.2.10.zip
~/Downloads/activator new test

Fetching the latest list of templates...

Browse the list of templates: http://typesafe.com/activator/templates
Choose from these featured templates or enter a template name:
  1) minimal-java
  2) minimal-scala
  3) play-java
  4) play-scala
(hit tab to see a list of all templates)

テンプレートをどれを使うか聞いてくるので、2を選択してScalaのプロジェクトを作ります。

> 2
OK, application "test" is being created using the "minimal-scala" template.

testプロジェクトができました。

ll ./test
total 2384
-rw-r--r--  1 N1407A003  staff      551 10  7 20:17 LICENSE
-rwxr--r--  1 N1407A003  staff     9593 10  7 20:17 activator
-rw-r--r--  1 N1407A003  staff  1188338 10  7 20:17 activator-launch-1.2.3.jar
-rwxr--r--  1 N1407A003  staff     6974 10  7 20:17 activator.bat
-rw-r--r--  1 N1407A003  staff      294 10  7 20:17 build.sbt
drwxr-xr-x  3 N1407A003  staff      102 10  7 20:17 project
drwxr-xr-x  4 N1407A003  staff      136 10  7 20:17 src

3.サンプルの実行

サンプルのハローワールドを実行してみます。

cd test
./activator run

[info] Running com.example.Hello 
Hello, world!
[success] Total time: 3 s, completed 2014/10/07 21:31:41

src/main/scala/com/example/Hello.scalaが実行されました。

src/main/scala/com/example/Hello.scala
package com.example

object Hello {
  def main(args: Array[String]): Unit = {
    println("Hello, world!")
  }
}

これで準備ができました。
Hello.scalaと同じ階層にサンプルを作って試していきます。

簡単にできる並列処理

Scalaではループ処理を簡単に並列化できます。
この例では、通常のループでは6秒かかっているものが、並列で実行することで2秒でおわります。

1.通常のループ

src/main/scala/com/example/Parallel.scala
package com.example

object Parallel {
  def main(args: Array[String]): Unit = {
     (1 to 10).map { i => //ここの中がループ
         println(i)
         Thread.sleep(500) //1回の処理に500ミリ秒かかるとする
     }
  }
}

実行結果

cd test
./activator
> runMain com.example.Parallel
[info] Updating {file:/Users/N1407A003/work/test/}test...
[info] Resolving jline#jline;2.11 ...
[info] Done updating.
[info] Running com.example.Parallel 
1
2
3
4
5
6
7
8
9
10
[success] Total time: 6 s, completed

2.並列に変更後

src/main/scala/com/example/Parallel.scala
package com.example

object Parallel {
  def main(args: Array[String]): Unit = {
     //(1 to 10).map { i => //元の処理
     (1 to 10).par.map { i =>  //parでparallel collection に変換
         println(i)
         Thread.sleep(500) //1回の処理に500ミリ秒かかるとする
     }
  }
}

実行結果

> runMain com.example.Parallel
[info] Running com.example.Parallel 
1
8
6
7
9
2
3
10
4
5
[success] Total time: 2 s, completed

並列で実行する場合、コレクションの順番がバラバラに実行されます。
順番に処理する必要がない場合に有効です。

非同期処理

時間のかかるメソッドがそれぞれ独立した処理をしている場合、
それらを非同期実行することで、全体の処理時間を短縮できる場合があります。

1.ライブラリーの追加

依存性管理にscala-asyncを追加します。

build.sbt
name := """test"""

version := "1.0"

scalaVersion := "2.11.1"

// Change this to another test framework if you prefer
libraryDependencies ++= Seq(
  "org.scalatest" %% "scalatest" % "2.1.6" % "test",
  "org.scala-lang.modules" %% "scala-async" % "0.9.2"
)

Activatorのコンソールを起動している場合、プロジェクトの設定をリロードします。

> reload
[info] Loading project definition from /Users/N1407A003/work/test/project
[info] Set current project to test (in build file:/Users/N1407A003/work/test/)

2.同期実行

処理時間が5秒かかるfuncAと10秒かかるfuncBでは、順番に実行すると全体で15秒程度かかります。

src/main/scala/com/example/Async.scala
package com.example

object Async {

  def main(args: Array[String]): Unit = {
     logic//15秒かかる
  }


  def funcA = {
    println("funcA start")
    Thread.sleep(5000);//5秒かかる処理 
    println("funcA end")
    true
  }

  def funcB = {
    println("funcB start")
    Thread.sleep(10000)//10秒かかる処理
    println("funcB end")
    true
  }

  def logic = {
    funcA //5秒かかる
    funcB //10秒かかる
  }
}

実行結果

> runMain com.example.Async
[info] Running com.example.Async 
funcA start
funcA end
funcB start
funcB end
[success] Total time: 15 s, completed

3.非同期実行

これらのfuncAとfuncBを非同期に実行することで、funcAはfuncBの処理中に完了できます。
全体では10秒で処理を完了できます。

package com.example

import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Future, Await}
import scala.async.Async.{async, await}

object Async {
  def main(args: Array[String]): Unit = {

     val finished = Await.result(asyncLogic,Duration.Inf)
     if (finished) {
       //AとBが両方完了したら実行する処理
       println("finish")
     }
  }

  def funcA = {
    println("funcA start")
    Thread.sleep(5000);//5秒かかる処理
    println("funcA end")
    true
  }

  def funcB = {
    println("funcB start")
    Thread.sleep(10000)//10秒かかる処理
    println("funcB end")
    true
  }

  def asyncLogic: Future[Boolean] = async {
     val a = async {
        funcA
     }

     val b = async {
        funcB
     }
     //aとbの結果を待ってから返す
     if(await(a)) await(b) else false
  }

}

実行結果

> runMain com.example.Async
[info] Running com.example.Async 
funcA start
funcB start
funcA end
funcB end
finish
[success] Total time: 10 s, completed

yhidai
idcf
未来をささえる、Your Innovative Partner
http://www.idcf.jp/cloud/
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away