1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Scalaによる並列処理:Cats: IOデータ型

Last updated at Posted at 2024-09-15

Scalaでスレッド(Thread)を使いこなすために、基礎から学びつつ
Scalaの関数型ライブラリである、Cats-EffectのIOデータ型を理解するために参考にした記事の備忘録です。

Scalaを利用して並列処理を実装される際の参考になれば幸いです。

スレッドの基本的な考え方・使い方

スレッド(thread)とは、プログラムが処理を実行する単位をプログラマの必要に応じて増やせるもの

プログラムでのスレッドの使い道は、例えば以下のものです。

  • 違うことを同時に行う:あるスレッドでネットワーク通信をしつつ、別のスレッドではユーザが行った画面操作への処理ができます
  • 処理にかかる時間を短くする:大きな問題を同時に処理できるより小さな問題に分割し、小さな問題をスレッドで同時に処理して、全体時間を短縮します
  • 単位時間当たりの処理量を増やす:コンピュータにスレッドを同時に動かせるだけの性能があれば、単位時間当たりに処理できる量を増やせます

並行処理と並列処理

並行処理(concurrent processing)

1つのCPU(コア)スレッドを切り替えながら複数のタスクを実行すること。

人間の目には複数のタスクが同時に進んでいるように見える

実行するスレッドを切り替える必要があるのでコンテキストスイッチのコストがかかることに留意(=スレッドをたくさん作って、それらを並行処理するとコンテキストスイッチによる無駄な時間が多くなりパフォーマンスが悪化する)

並行処理はCPUのアイドル時間をなくすために用いる。
以下のようにCPUバウンドな処理とIOバウンドな処理を繰り返す例を思い浮かべると効果がわかりやすい

TASK_A: CPU処理ー>IO処理ー>CPU処理ー>IO処理

TASK_B: IO処理ー>CPU処理ー>IO処理ー>CPU処理

並列処理(multiple processing)

複数のコアを使って同時に複数のタスクを実行すること。

ネイティブスレッド と グリーンスレッド

ネイティブスレッド

https://ja.wikipedia.org/wiki/スレッド_(コンピュータ) 引用

ライトウェイトプロセス(light-weight process、LWP)または軽量プロセスとは、
スレッドを複数並行して実行するためのカーネル内の機構。
マルチプロセッシングにおいて、ひとつのプロセス内のスレッドを複数個同時に実行する仕組みである。
カーネルスレッドとLWPを総称してネイティブスレッドと呼ぶこともある。
カーネルレベルで実現されるスレッド。

コンテキストスイッチのコストはプロセス並みに大きい(マイクロ秒くらいらしい)
ネイティブスレッドはOSのスケジューラーによって資源割当を行うため、並列処理が可能

グリーンスレッド

ユーザ空間(ユーザープロセスの動作するアドレス空間)で実装されたスレッド機構のうち、特にVM上で動くものがグリーンスレッド。

OSのスケジューラーが関与しないため、マルチコア対応が不可能。
コンテキストスイッチのコストが低い(ナノ秒くらいらしい)ため1コアに対して大量のスレッド必要な場合などに有用(ref: C10K問題)

多くの言語でネイティブスレッドが標準で採用されているが、RubyとPythonはマルチコアの恩恵を受けにくいのと、Goはネイティブスレッドと軽量スレッドのハイブリッドパターンになっている。

  • Ruby・・GVLの機構により同時に実行されるネイティブスレッドは基本は1つのみ(ただし、ネットワークIO、ディスクIO等のAPIはその限りではないらしい) ※GVL(Giant VM lock)=GIL(グローバルインタプリタロック)
  • Python・・GILあり
  • Go・・ネイティブスレッド&軽量スレッド(goroutine)

PythonやRubyはスレッドセーフでないC言語のライブラリを使って作られているのでGILの機構が必要
Ruby、Python x 並列とかでググると、並列処理にスレッドを使えという記事が散見されるが、これらは誤り。マルチプロセスでやらないといけない。

ScalaのFutureで並列処理

Scalaでは scala.concurrent.Future というAPIが提供されており、これにより並列処理を実現できる。

.scala
object FutureApp extends App {
  import scala.concurrent.ExecutionContext.Implicits.global

  private val result = for {
    s <- Future("hello")
  } yield s

  Await.ready(result, Duration.Inf)
  result onComplete {
    case Success(s) => println(s)
    case Failure(e) => println("エラーが発生した: " + e.getMessage)
  }
}

Futureを生成すると、内部的にはJavaのRunnableに変換される。

このRunnableをExecutionContextが良い感じにスレッドに割り当ててくれる。
スレッドは先述した通り、OSのスケジューラーにより均等に割り当てられ、実行されていく。

※つまり、Futureはスレッドを新しく作るというわけではない

Futureの注意点

よく「Futureでブロッキングを発生させるな」という話がある

まず、Futureを実行するときにExecutionContextを求められるので脳死でimport scala.concurrent.ExecutionContext.Implicits.global をすることがあると思う。

globalで提供されるExecutionContextは標準で「利用可能なコア数の同数のスレッド」を持つ。

このExecutionContext上でブロッキングする処理を入れてしまうと、単純に1スレッドが無駄に専有されてしまう。
この状況下ではCPUが処理をしていないのにも関わらず、スレッドに別の仕事をさせることができなくなり、パフォーマンスが劣化する。

ブロッキング含む処理をしたいときはどうしたら良いか

ブロッキング処理専用のExecutionContext(新しいスレッド)を作り、そこにFutureを割り当ててる。そうすると、globalのECを汚染することはない

ただ、OSのスケジューラーはスレッドに対して均等に処理時間を割り当てるのでスレッドを増やしすぎてもコンテキストスイッチのコストが上がってしまう。

for式の中で使うと並列化されない

for式は変換を直列化するので、Futureをfor式の中で使うと並列化されない

1. for式の中でFutureを使った場合

.scala
object FutureApp2 extends App {
  import scala.concurrent.ExecutionContext.Implicits.global

  private def task(name: String, milliSeconds: Int) = Future {
    println(s"Thread: ${Thread.currentThread.getName}")
    println(s"Task Name: $name")
    Thread.sleep(milliSeconds)
    name
  }

  private val start = System.currentTimeMillis
  private val result = for {
    _ <- task("task1", 1000)
    _ <- task("task2", 1000)
  } yield ()

  Await.ready(result, Duration.Inf)
  result onComplete {
    case Success(_) => println("処理時間: " + (System.currentTimeMillis - start) + " ミリ秒")
    case Failure(e) => println("エラーが発生した: " + e.getMessage)
  }
}

// 結果
Thread: scala-execution-context-global-11
Task Name: task1
Thread: scala-execution-context-global-13
Task Name: task2
処理時間 2229 ミリ秒

直列処理になるため処理時間: 2620 ミリ秒になる。

2. for式の外でFutureを使った場合

mainスレッドと別のスレッドで動くため、非同期に実行される。

.scala
object FutureOutApp extends App {
  import scala.concurrent.ExecutionContext.Implicits.global

  private def task(name: String, milliSeconds: Int) = Future {
    println(s"Thread: ${Thread.currentThread.getName}")
    println(s"Task Name: $name")
    Thread.sleep(milliSeconds)
    name
  }

  private val task1 = task("task1", 1000)
  private val task2 = task("task2", 1000)

  private val start = System.currentTimeMillis
  private val result = for {
    _ <- task1
    _ <- task2
  } yield ()

  Await.ready(result, Duration.Inf)
  result onComplete {
    case Success(_) => println("処理時間: " + (System.currentTimeMillis - start) + " ミリ秒")
    case Failure(e) => println("エラーが発生した: " + e.getMessage)
  }
}
//結果
Thread: scala-execution-context-global-11
Task Name: task1
Thread: scala-execution-context-global-12
Task Name: task2
処理時間 1007 ミリ秒

IOでHelloWorld

ようやく、cats.effect.IOをみていきます。
IO型の特徴について

遅延評価

IOのapplyメソッドのbodyに定義したタスクは遅延評価されます。

.scala
import cats.effect.IO

object IOApp extends App {
  import cats.effect.unsafe.implicits.global

  // この時点ではIOのbodyは評価されていない
  val ioa = IO(println("Hello World!"))

  print(ioa.unsafeRunSync())
}
// 結果
Hello World!

モナド

IOはモナドなので、for式で書くことができます。

.scala
import cats.effect.IO

object IOApp extends App {
  import cats.effect.unsafe.implicits.global

  (for {
    _ <- IO.print("What's your name?")
    x <- IO.readLine
    _ <- IO.println(s"Hello, $x")
  } yield ()).unsafeRunSync()
}

// 結果
What's your name?nishi
Hello, nishi

IOでエラーハンドリング

unsafeRunSync()メソッドはただ中身を評価するだけなので、安全ではありません。
試しにIOの中で例外を投げて、unsafeRunSync()メソッドで評価するとそのまま例外が投げられプログラムが終了します。

.scala
object IOErrorApp extends App {
  import cats.effect.unsafe.implicits.global

  val error = IO { throw new RuntimeException("error") }
  error.unsafeRunSync()
}

/**
Exception in thread "main" java.lang.RuntimeException: error
	at jp.sample.day1.IOErrorApp$.$anonfun$error$1(IOApp.scala:18)
	at apply @ jp.sample.day1.IOErrorApp$.delayedEndpoint$jp$sample$day1$IOErrorApp$1(IOApp.scala:18)
*/

そのため、IO内でファイル参照やデータベース接続、ネットワーク接続などの外部との例外が起きる可能性がある処理をする場合には有効ではありません。

そこで、IOには例外をハンドリングするメソッドが用意されています。

  • attempt: タスクの実行結果がEitherに包まれる
  • handleErrorWith: Throwableをキャッチして例外処理をかける(FutureのrecoverWithみたいなやつ)
.scala
object IOErrorApp extends App {
  import cats.effect.unsafe.implicits.global

  private val io = IO("Hello World!")
  private val error = IO(throw new RuntimeException("error"))

  private val r = (for {
    a <- io
    _ <- error
  } yield a)

  println(r.attempt.unsafeRunSync())
  println(r.handleErrorWith { error =>
    IO.pure(error.getMessage)
  }.unsafeRunSync())
}

// 結果
Left(java.lang.RuntimeException: error)
error

IOで非同期処理

IOはタスクを同期にも非同期にも実行することができます。

1. 同期処理

.scala
object StartSyncApp extends App {
  import cats.effect.unsafe.implicits.global

  private val start = System.currentTimeMillis
  private val task = (v: Int, name: String) => IO {
    println(s"Thread: ${Thread.currentThread.getName}")
    println(s"Task Name: $name")
    Thread.sleep(v)
    name
  }

  (for {
    // 同期実行
    _ <- task(5000, "task1")
    _ <- task(1000, "task2")
  } yield (println("処理時間: " + (System.currentTimeMillis - start) + " ミリ秒")))
    .unsafeRunSync()
}

// 結果
Thread: io-compute-3
Task Name: task1
Thread: io-compute-3
Task Name: task2
処理時間 6464 ミリ秒

2. 非同期処理

タスクを非同期に実行するためには、IOに定義されているevalOnを用いて実行します。
(Cats Effect2ではshift?の記事がよく出てきました。間違いがあれば指摘をお願いします。)

startはタスクを指定したスレッドプール内で実行するように調整します。
joinは非同期で実行したタスクが完了したあと、結果を取得します。

.scala
object StartAsyncApp extends App {
  import cats.effect.unsafe.implicits.global

  private val task = (v: Int, name: String) => IO {
    println(s"Thread: ${Thread.currentThread.getName}")
    println(s"Task Name: $name")
    Thread.sleep(v)
    name
  }

  private val start = System.currentTimeMillis
  (for {
      f1 <- task(3000, "task1").start // ブロックされない処理
      f2 <- task(1000, "task2").start
      _ <- f1.join // 処理結果を取得
      _ <- f2.join
    } yield (println("処理時間: " + (System.currentTimeMillis - start) + " ミリ秒")))
  .unsafeRunSync()
}

// 結果
Thread: io-compute-2
Thread: io-compute-0
Task Name: task2
Task Name: task1
処理時間 3739 ミリ秒

IOを使ったスレッドのスケジューリング

ExecutionContextを IO.evalOn() メソッドに渡すことによって別スレッドでタスクを実行することができます。

以下のコードは、各タスクevalOnによってそれぞれ別スレッドで実行しています。

.scala
object IOAsyncApp extends App {
  import cats.effect.unsafe.implicits.global
  private val task = IO {
    println(s"Thread: ${Thread.currentThread.getName}")
    println("task")
  }

  private val ec1 = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1))
  private val ec2 = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1))
  private val ec3 = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1))

  val r = task >> task >> task //同じスレッド
  val r2 = task.evalOn(ec1) >> task.evalOn(ec2) >> task.evalOn(ec3) //異なるスレッドで実行
  r.unsafeRunSync() 
  r2.unsafeRunSync()
}

/**
------r.unsafeRunSync()------
Thread: io-compute-2
task
Thread: io-compute-2
task
Thread: io-compute-2
task

------r2.unsafeRunSync()------
Thread: pool-1-thread-1 // pool1
task
Thread: pool-2-thread-1 // pool2
task
Thread: pool-3-thread-1 // pool3
tas
*/

実行するスレッドが切り替えられていることがわかるかと思います。

実践: IOでDynamoDBにデータを追加する

IO型で実際にDynamoDBにデータを追加する処理のサンプルコードの例を書いてみます。

まずは基底となるDynamoDBClientを使ってデータを追加する部分。
IOで返しているだけです。

.scala
  protected def put2[E](item: Map[String, AttributeValue]): IO[Map[String, AttributeValue]] =
    for {
      _ <- IO(
        dynamoDBClient.putItem(
          PutItemRequest
            .builder()
            .tableName(tableName)
            .item(item.asJava)
            .build()
        )
      )
    } yield item

次に利用する側のコードですが
IOで包んでいるだけでEitherやTryで書く場合と特に違いはないかなと思います。

.scala
  def create2(device: Device): IO[Device] =
    for {
      item     <- IO(toItem(device))
      _ <- put2(item)
    } yield device

最後の注意点としては、IOはunsafeRunSync()などを呼び出さないと実行されないので書き忘れに注意です。

.scala
"1件データを削除できる" in new WithFixture {
  Given("1件データを追加する")
  create2(
    Device(
      CompanyId("DUMMY_COMPANY"),
      UuId("DUMMY_UUID"),
      Os("DUMMY_OS"),
      Some("西のデバイス"),
      DateTimeUtil.toISOString(DateTimeUtil.now())
    )
  ).handleErrorWith(error => {
     IO.pure(SystemError("", error))
  }).unsafeRunSync()unsafeRunSync()
}

まとめ

スレッドやScalaでの並列処理の書き方や
Cats EffectのIOについて簡単に紹介しました。

既存のScalaで書いてあるLambdaコードも、FutureやIOを上手く使いこなせればまだまだ最適化できると思いますので参考になれば幸いです。

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?