Scalaでスレッド(Thread)を使いこなすために、基礎から学びつつ
Scalaの関数型ライブラリである、Cats-EffectのIOデータ型を理解するために参考にした記事の備忘録です。
Scalaを利用して並列処理を実装される際の参考になれば幸いです。
スレッドの基本的な考え方・使い方
スレッド(thread)とは、プログラムが処理を実行する単位をプログラマの必要に応じて増やせるもの
プログラムでのスレッドの使い道は、例えば以下のものです。
- 違うことを同時に行う:あるスレッドでネットワーク通信をしつつ、別のスレッドではユーザが行った画面操作への処理ができます
- 処理にかかる時間を短くする:大きな問題を同時に処理できるより小さな問題に分割し、小さな問題をスレッドで同時に処理して、全体時間を短縮します
- 単位時間当たりの処理量を増やす:コンピュータにスレッドを同時に動かせるだけの性能があれば、単位時間当たりに処理できる量を増やせます
並行処理と並列処理
並行処理(concurrent processing)
1つのCPU(コア)スレッドを切り替えながら複数のタスクを実行すること。
人間の目には複数のタスクが同時に進んでいるように見える
実行するスレッドを切り替える必要があるのでコンテキストスイッチのコストがかかることに留意(=スレッドをたくさん作って、それらを並行処理するとコンテキストスイッチによる無駄な時間が多くなりパフォーマンスが悪化する)
並行処理はCPUのアイドル時間をなくすために用いる。
以下のようにCPUバウンドな処理とIOバウンドな処理を繰り返す例を思い浮かべると効果がわかりやすい
TASK_A: CPU処理ー>IO処理ー>CPU処理ー>IO処理
TASK_B: IO処理ー>CPU処理ー>IO処理ー>CPU処理
並列処理(multiple processing)
複数のコアを使って同時に複数のタスクを実行すること。
ネイティブスレッド と グリーンスレッド
ネイティブスレッド
https://ja.wikipedia.org/wiki/スレッド_(コンピュータ) 引用
ライトウェイトプロセス(light-weight process、LWP)または軽量プロセスとは、
スレッドを複数並行して実行するためのカーネル内の機構。
マルチプロセッシングにおいて、ひとつのプロセス内のスレッドを複数個同時に実行する仕組みである。
カーネルスレッドとLWPを総称してネイティブスレッドと呼ぶこともある。
カーネルレベルで実現されるスレッド。
コンテキストスイッチのコストはプロセス並みに大きい(マイクロ秒くらいらしい)
ネイティブスレッドはOSのスケジューラーによって資源割当を行うため、並列処理が可能
グリーンスレッド
ユーザ空間(ユーザープロセスの動作するアドレス空間)で実装されたスレッド機構のうち、特にVM上で動くものがグリーンスレッド。
OSのスケジューラーが関与しないため、マルチコア対応が不可能。
コンテキストスイッチのコストが低い(ナノ秒くらいらしい)ため1コアに対して大量のスレッド必要な場合などに有用(ref: C10K問題)
多くの言語でネイティブスレッドが標準で採用されているが、RubyとPythonはマルチコアの恩恵を受けにくいのと、Goはネイティブスレッドと軽量スレッドのハイブリッドパターンになっている。
- Ruby・・GVLの機構により同時に実行されるネイティブスレッドは基本は1つのみ(ただし、ネットワークIO、ディスクIO等のAPIはその限りではないらしい) ※GVL(Giant VM lock)=GIL(グローバルインタプリタロック)
- Python・・GILあり
- Go・・ネイティブスレッド&軽量スレッド(goroutine)
PythonやRubyはスレッドセーフでないC言語のライブラリを使って作られているのでGILの機構が必要
Ruby、Python x 並列とかでググると、並列処理にスレッドを使えという記事が散見されるが、これらは誤り。マルチプロセスでやらないといけない。
ScalaのFutureで並列処理
Scalaでは scala.concurrent.Future というAPIが提供されており、これにより並列処理を実現できる。
object FutureApp extends App {
import scala.concurrent.ExecutionContext.Implicits.global
private val result = for {
s <- Future("hello")
} yield s
Await.ready(result, Duration.Inf)
result onComplete {
case Success(s) => println(s)
case Failure(e) => println("エラーが発生した: " + e.getMessage)
}
}
Futureを生成すると、内部的にはJavaのRunnableに変換される。
このRunnableをExecutionContextが良い感じにスレッドに割り当ててくれる。
スレッドは先述した通り、OSのスケジューラーにより均等に割り当てられ、実行されていく。
※つまり、Futureはスレッドを新しく作るというわけではない
Futureの注意点
よく「Futureでブロッキングを発生させるな」という話がある
まず、Futureを実行するときにExecutionContextを求められるので脳死でimport scala.concurrent.ExecutionContext.Implicits.global をすることがあると思う。
globalで提供されるExecutionContextは標準で「利用可能なコア数の同数のスレッド」を持つ。
このExecutionContext上でブロッキングする処理を入れてしまうと、単純に1スレッドが無駄に専有されてしまう。
この状況下ではCPUが処理をしていないのにも関わらず、スレッドに別の仕事をさせることができなくなり、パフォーマンスが劣化する。
ブロッキング含む処理をしたいときはどうしたら良いか
ブロッキング処理専用のExecutionContext(新しいスレッド)を作り、そこにFutureを割り当ててる。そうすると、globalのECを汚染することはない
ただ、OSのスケジューラーはスレッドに対して均等に処理時間を割り当てるのでスレッドを増やしすぎてもコンテキストスイッチのコストが上がってしまう。
for式の中で使うと並列化されない
for式は変換を直列化するので、Futureをfor式の中で使うと並列化されない
1. for式の中でFutureを使った場合
object FutureApp2 extends App {
import scala.concurrent.ExecutionContext.Implicits.global
private def task(name: String, milliSeconds: Int) = Future {
println(s"Thread: ${Thread.currentThread.getName}")
println(s"Task Name: $name")
Thread.sleep(milliSeconds)
name
}
private val start = System.currentTimeMillis
private val result = for {
_ <- task("task1", 1000)
_ <- task("task2", 1000)
} yield ()
Await.ready(result, Duration.Inf)
result onComplete {
case Success(_) => println("処理時間: " + (System.currentTimeMillis - start) + " ミリ秒")
case Failure(e) => println("エラーが発生した: " + e.getMessage)
}
}
// 結果
Thread: scala-execution-context-global-11
Task Name: task1
Thread: scala-execution-context-global-13
Task Name: task2
処理時間: 2229 ミリ秒
直列処理になるため処理時間: 2620 ミリ秒になる。
2. for式の外でFutureを使った場合
mainスレッドと別のスレッドで動くため、非同期に実行される。
object FutureOutApp extends App {
import scala.concurrent.ExecutionContext.Implicits.global
private def task(name: String, milliSeconds: Int) = Future {
println(s"Thread: ${Thread.currentThread.getName}")
println(s"Task Name: $name")
Thread.sleep(milliSeconds)
name
}
private val task1 = task("task1", 1000)
private val task2 = task("task2", 1000)
private val start = System.currentTimeMillis
private val result = for {
_ <- task1
_ <- task2
} yield ()
Await.ready(result, Duration.Inf)
result onComplete {
case Success(_) => println("処理時間: " + (System.currentTimeMillis - start) + " ミリ秒")
case Failure(e) => println("エラーが発生した: " + e.getMessage)
}
}
//結果
Thread: scala-execution-context-global-11
Task Name: task1
Thread: scala-execution-context-global-12
Task Name: task2
処理時間: 1007 ミリ秒
IOでHelloWorld
ようやく、cats.effect.IOをみていきます。
IO型の特徴について
遅延評価
IOのapplyメソッドのbodyに定義したタスクは遅延評価されます。
import cats.effect.IO
object IOApp extends App {
import cats.effect.unsafe.implicits.global
// この時点ではIOのbodyは評価されていない
val ioa = IO(println("Hello World!"))
print(ioa.unsafeRunSync())
}
// 結果
Hello World!
モナド
IOはモナドなので、for式で書くことができます。
import cats.effect.IO
object IOApp extends App {
import cats.effect.unsafe.implicits.global
(for {
_ <- IO.print("What's your name?")
x <- IO.readLine
_ <- IO.println(s"Hello, $x")
} yield ()).unsafeRunSync()
}
// 結果
What's your name?nishi
Hello, nishi
IOでエラーハンドリング
unsafeRunSync()メソッドはただ中身を評価するだけなので、安全ではありません。
試しにIOの中で例外を投げて、unsafeRunSync()メソッドで評価するとそのまま例外が投げられプログラムが終了します。
object IOErrorApp extends App {
import cats.effect.unsafe.implicits.global
val error = IO { throw new RuntimeException("error") }
error.unsafeRunSync()
}
/**
Exception in thread "main" java.lang.RuntimeException: error
at jp.sample.day1.IOErrorApp$.$anonfun$error$1(IOApp.scala:18)
at apply @ jp.sample.day1.IOErrorApp$.delayedEndpoint$jp$sample$day1$IOErrorApp$1(IOApp.scala:18)
*/
そのため、IO内でファイル参照やデータベース接続、ネットワーク接続などの外部との例外が起きる可能性がある処理をする場合には有効ではありません。
そこで、IOには例外をハンドリングするメソッドが用意されています。
- attempt: タスクの実行結果がEitherに包まれる
- handleErrorWith: Throwableをキャッチして例外処理をかける(FutureのrecoverWithみたいなやつ)
object IOErrorApp extends App {
import cats.effect.unsafe.implicits.global
private val io = IO("Hello World!")
private val error = IO(throw new RuntimeException("error"))
private val r = (for {
a <- io
_ <- error
} yield a)
println(r.attempt.unsafeRunSync())
println(r.handleErrorWith { error =>
IO.pure(error.getMessage)
}.unsafeRunSync())
}
// 結果
Left(java.lang.RuntimeException: error)
error
IOで非同期処理
IOはタスクを同期にも非同期にも実行することができます。
1. 同期処理
object StartSyncApp extends App {
import cats.effect.unsafe.implicits.global
private val start = System.currentTimeMillis
private val task = (v: Int, name: String) => IO {
println(s"Thread: ${Thread.currentThread.getName}")
println(s"Task Name: $name")
Thread.sleep(v)
name
}
(for {
// 同期実行
_ <- task(5000, "task1")
_ <- task(1000, "task2")
} yield (println("処理時間: " + (System.currentTimeMillis - start) + " ミリ秒")))
.unsafeRunSync()
}
// 結果
Thread: io-compute-3
Task Name: task1
Thread: io-compute-3
Task Name: task2
処理時間: 6464 ミリ秒
2. 非同期処理
タスクを非同期に実行するためには、IOに定義されているevalOnを用いて実行します。
(Cats Effect2ではshift?の記事がよく出てきました。間違いがあれば指摘をお願いします。)
startはタスクを指定したスレッドプール内で実行するように調整します。
joinは非同期で実行したタスクが完了したあと、結果を取得します。
object StartAsyncApp extends App {
import cats.effect.unsafe.implicits.global
private val task = (v: Int, name: String) => IO {
println(s"Thread: ${Thread.currentThread.getName}")
println(s"Task Name: $name")
Thread.sleep(v)
name
}
private val start = System.currentTimeMillis
(for {
f1 <- task(3000, "task1").start // ブロックされない処理
f2 <- task(1000, "task2").start
_ <- f1.join // 処理結果を取得
_ <- f2.join
} yield (println("処理時間: " + (System.currentTimeMillis - start) + " ミリ秒")))
.unsafeRunSync()
}
// 結果
Thread: io-compute-2
Thread: io-compute-0
Task Name: task2
Task Name: task1
処理時間: 3739 ミリ秒
IOを使ったスレッドのスケジューリング
ExecutionContextを IO.evalOn() メソッドに渡すことによって別スレッドでタスクを実行することができます。
以下のコードは、各タスクevalOnによってそれぞれ別スレッドで実行しています。
object IOAsyncApp extends App {
import cats.effect.unsafe.implicits.global
private val task = IO {
println(s"Thread: ${Thread.currentThread.getName}")
println("task")
}
private val ec1 = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1))
private val ec2 = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1))
private val ec3 = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1))
val r = task >> task >> task //同じスレッド
val r2 = task.evalOn(ec1) >> task.evalOn(ec2) >> task.evalOn(ec3) //異なるスレッドで実行
r.unsafeRunSync()
r2.unsafeRunSync()
}
/**
------r.unsafeRunSync()------
Thread: io-compute-2
task
Thread: io-compute-2
task
Thread: io-compute-2
task
------r2.unsafeRunSync()------
Thread: pool-1-thread-1 // pool1
task
Thread: pool-2-thread-1 // pool2
task
Thread: pool-3-thread-1 // pool3
tas
*/
実行するスレッドが切り替えられていることがわかるかと思います。
実践: IOでDynamoDBにデータを追加する
IO型で実際にDynamoDBにデータを追加する処理のサンプルコードの例を書いてみます。
まずは基底となるDynamoDBClientを使ってデータを追加する部分。
IOで返しているだけです。
protected def put2[E](item: Map[String, AttributeValue]): IO[Map[String, AttributeValue]] =
for {
_ <- IO(
dynamoDBClient.putItem(
PutItemRequest
.builder()
.tableName(tableName)
.item(item.asJava)
.build()
)
)
} yield item
次に利用する側のコードですが
IOで包んでいるだけでEitherやTryで書く場合と特に違いはないかなと思います。
def create2(device: Device): IO[Device] =
for {
item <- IO(toItem(device))
_ <- put2(item)
} yield device
最後の注意点としては、IOはunsafeRunSync()などを呼び出さないと実行されないので書き忘れに注意です。
"1件データを削除できる" in new WithFixture {
Given("1件データを追加する")
create2(
Device(
CompanyId("DUMMY_COMPANY"),
UuId("DUMMY_UUID"),
Os("DUMMY_OS"),
Some("西のデバイス"),
DateTimeUtil.toISOString(DateTimeUtil.now())
)
).handleErrorWith(error => {
IO.pure(SystemError("", error))
}).unsafeRunSync()unsafeRunSync()
}
まとめ
スレッドやScalaでの並列処理の書き方や
Cats EffectのIOについて簡単に紹介しました。
既存のScalaで書いてあるLambdaコードも、FutureやIOを上手く使いこなせればまだまだ最適化できると思いますので参考になれば幸いです。