12
7

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

ScalaAdvent Calendar 2019

Day 8

Futureをfor式で動かす話

Last updated at Posted at 2019-12-08

この記事はScala Advent Calendar 2019の8日目の記事です。

前日は @miy1924 さんのはじめてのEitherTでした。
楽しいEitherTライフ、未来感が半端ないですね。

本日も未来感の溢れる話ですが、EitherTよりは身近な未来、scala.concurrent.Futureの話です。

対象読者

  • Futureで並列処理を実装したい。
  • for式を使うのは好きだ。
  • コンパイルが通れば後はいい感じに動いてくれると信じている。

書かれていない事

  • ExecutionContextの選び方
  • パフォーマンス測定やパフォーマンスチューニングについて

注意事項

この記事はコードを一切コンパイルせずに「気持ちで」書かれています。
コンパイルが通らないコードや間違いなどはコメント・編集リクエストをお待ちしております。

Futureのお話

では本題のFutureの話に入っていきましょう。

Futureの生成

今回はサンプルコードなので、スレッドを占有する重い処理として有名なjava.lang.Thread.sleepを使用します。

import scala.concurrent.{ExecutionContext, Future}

object Samples {
  def future[A](id: String, sleepMills: Long, result: A)(
    implicit ec: ExecutionContext
  ): Future[A] = Future {
    println(s"ID: $id is started at ${LocalDateTime.now}")
    Thread.sleep(sleepMills)
    println(s"ID: $id is finished at ${LocalDateTime.now}")
    result
  }
}

replなどでSamples.future("a", 10, ())などと実行すると、きっとメインスレッド以外でprintlnしてくれるはずです。

for式

さて、タイトルにもあるfor式です。

// implicitなExecutionContextをスコープに用意

val result = for {
  a <- Samples.future("Future A", 3000, 42)
  b <- Samples.future("Future B", 1000, "The answer to life")
} yield (a, b) // Future[(Int, String)]

こちらもreplなどで(たぶん)動作するはずですが、printされる内容の予測は可能でしょうか?
想定読者はこの挙動を理解していない人ですので、scalaつよつよニキや「僕はfor式に詳しいんだ」という方以外は、ぜひ一度replなどで実行して体験してみてください。

何が問題か

上記のコードを実行すると、"Future A"の実行が完了してから"Future B"の実行が開始されます。

これはfor式がmap, flatMapの糖衣構文で、実際には以下のようなコードとしてコンパイルされるためです。

val result = Samples.future("Future A", 3000, 42).flatMap { a =>
  Samples.future("Future B", 1000, "The answer to life").map { b =>
    (a, b)
  }
}

型について考える

これだけではわかりづらいので、型シグネチャーだけを抽出してみます。

Future[Int] => (Int => (Future[String] => (String => (Int, String) => Future[(Int, String)])) => Future[(Int, String)]

なんだか余計にわからなくなってしまいましたね。
説明に不要な部分をそぎ落として、単純化します。
まずは元のコードを改行変更してもう一度。

Samples.future("Future A", 3000, 42) // (1)
  .flatMap {
    a => Samples.future("Future B", 1000, "The answer to life") // (2)
      .map {
        b => (a, b) // (3)
      }
  }

(1)の部分は Future[Int]
(3)の部分は (Int, String) => Tuple2[Int, String]です。

問題は2の部分なのですが、Future[String]ではなくInt => Future[String]となっています。
引数にIntを受け取ってFuture[String]を返す関数です。
当然引数を受け取るまでFutureは生成されません。

Future[Int]
  => (Int => Future[String])
  => ((Int, String) => (Int, String))
  => Future[(Int, String)]

IntStringじゃなくても同じなので、ABを使って短くすると:

F[A] => (A => F[B]) => ((A, B) => C) => F[C]

では、F[B]F[A]と並列に実行するためにはどうすれば良いのでしょうか?

解決策

以下の型になる関数を用意します。

F[A] => F[B] => ((A, B) => C) => F[C]

A => F[B]が関数になっているのが問題なので、関数をやめてしまえば良いということです。

object Samples {
  // 以前のコードに追記
  def zipF[A, B, C](fa: Future[A], fb: Future[B])(f: (A, B) => C)(
    implicit ec: ExecutionContext
  ): Future[C] = for {
     a <- fa
     b <- fb
  } yield f(a, b)
}

これを使って先程の処理を実行すると、並列に実行されるでしょうか?

// implicit ec

Samples.zipF(
  Samples.future("Future A", 3000, 42),
  Samples.future("Future B", 1000, "The answer to life")
)((a, b) => (a, b)) // Future[(Int, String)]

ecにも依存はするのですが、きっと並列に実行されたはずです。

Futureの実行開始について

先程のzipFで、処理本体は元のコードと同じようにfor式が記述されていました。
それでも並列に実行されることに疑問を持った方もおられることかと思います。

scala標準に入っているFutureは作成した瞬間から実行が開始される仕様になっており、zipFに渡された時点でfor式開始よりも前に生成されているため、forの中では既に実行中(もしくは実行完了済み)のFutureを返す関数が渡されます。

monixのTaskの場合

標準のFuture以外を使用した場合は生成時に実行が開始されない可能性があります。
一例としてmonixのTaskを見ていきましょう。

import monix.eval.Task

object MonixSamples {
  def task[A](id: String, mills: Long, result: A): Task[A] = Task.eval {
    println(s"ID: $id is started at ${LocalDateTime.now}")
    Thread.sleep(sleepMills)
    println(s"ID: $id is finished at ${LocalDateTime.now}")
    result
  }

  def zipF[A, B, C](fa: Task[A], fb: Task[B])(f: (A, B) => C): Task[C] =
    for {
      a <- fa
      b <- fb
    } yield f(a, b)
}

そもそも動くのかも疑問ですが、一応書いておきます。

// needs an implicit Scheduler here

val a = MonixSamples.task("Task A", 3000, 42)
val b = MonixSamples.task("Task B", 1000, "the universe")

MonixSamples.zipF(a, b)((a, b) => (a, b)).runAsync

並列実行

objectのTaskparMap2というのがあるので、こちらを呼び出します。

object MonixSamples {
  // ...

  def zipF[A, B, C](fa: Task[A], fb: Task[B])(f: (A, B) => C): Task[C] =
    Task.parMap2(fa, fb)(f)
}

複雑な型について考えてみる。

単純なFutureを返すメソッドだけを相手に生活していても十分に複雑なのですが、社内が関数型ムードだったりするとさらに難しそうな型を相手にすることがあるかもしれません。

昨日の記事で出てきたようなEitherTや、DB接続などで平然と使われるKleisliなどの型だとどうなるのでしょうか?

EitherTで試してみる

EitherTは昨日の記事で詳しく解説されているので、importなどはそちらをご参考ください。

for {
  a <- EitherT(Samples.future("EitherT A", 3000, 42))
  b <- EitherT(Samples.future("EitherT B", 1000, "and everything.")
} yield (a, b)

Kleisli

Kleisli[I, F[_], A]I => F[A]です。
試すのは皆様どうぞいい感じにお願い致します。

for式とflatMapを理解する

ここまで読んできて感の良い読者の皆様はお気付きかもしれませんが、for式またはflatMapには以下のような性質があります。

1行目が完了してから2行目を開始する

JavaやRubyなどのメインストリームな言語をたしなむ方々からすると当たり前のように映るかもしれませんが、これは遅延評価や並列処理の世界では決して当たり前のことではありません。

どんなに並列に処理したい場合でもここは順番にやってねという強い意思表示なのです。

それに対してzipFは、並列にすることが可能な型シグネチャーでした。
ただし、メソッドの型が同一でも並列に実行されるかどうかは実装しだいとなっていました。

Applicative

今までの流れからいくとたぶん我々は独自に定義したzipFに相当する何かを使用したいのですが、これは一般に通用する型クラスとしてはApplicativeと呼ばれるものの中に定義されています。

注意:

Applicativeは必ずzipFに相当する処理が実行できますが、zipFができるからといってApplicativeとは限りません。
詳しく知りたい方はApplicative Lawsなどで検索してください。

Applicativeの定義

Applicativescalazcatsなどに定義されているのですが、メソッド名にはかなりの違いがあります。
そのためこの記事では独自に定義して説明のためだけのメソッド名を使用します。
実際に利用する際には信頼できるライブラリーに含まれているものをご使用ください。

package typeclass

trait Applictive[F[_]] {
  def point[A](a: A): F[A]
  def ap[A, B](fa: F[A])(ff: F[A => B]): F[B]
  final def zipF[A, B, C](fa: F[A], fb: F[B])(f: (A, B) => C): F[C] =
    ap(fb)(ap(fa, point(f)))
}

object Applicative {
  implicit def apply[F[_]: Applicative]: Applicative[F] = implicitly

  implicit class ApplicativeSyntax[F[_]: Applicative, A](fa: F[A]) {
    def apply[B, C](fb: F[B])(implicit ev: A =:= (B => C)): F[C] =
      Applicative[F].ap(fb)(fa)
    def zipF[B, C](fb: F[B])(f: (A, B) => C): F[C] =
      Applicative[F].zipF(fa, fb)(f)
  }

  implicit def scalazApplicative[F[_]: scalaz.Applicative]: Applicative[F] = new Applicative[F] {
    override def point[A](a: A): F[A] = scalaz.Applicative[F].point(a)
    override def ap[A, B](fa: F[A])(ff: F[A => B]): F[B] =
      scalaz.Applicative[F].ap(fa)(ff)
  }

  implicit def catsApplicative[F[_]: cats.Applicative]: Applicative[F] = new Applicative[F] {
    override def point[A](a: A): F[A] = scalaz.Applicative[F].pure(a)
    override def ap[A, B](fa: F[A])(ff: F[A => B]): F[B] =
      cats.Applicative[F].ap(ff)(fa)
  }
}

Applicativeの使用

このApplicativeを使用すると、EitherTKleisliもいい感じに並列にできるかもしれない処理を実装できます。

object Samples {
  // ...

  implicit def eitherTApplicative[F[_]: Applicative, E]: Applicative[EitherT[E, F, *]] = new Applicative[EitherT[E, F, *]] {
    override def point[A](a: A): EitherT[E, F, A] = EitherT.right(a)
    override def ap[A, B](fa: EitherT[E, F, A])(ff: EitherT[E, F, A => B]): EitherT[E, F, B] = EitherT {
      val fea: F[Either[E, A]] = fa.toEither
      val fef: F[Either[E, A => B]] = ff.toEither
      fef.zip2(fea) { (ef: Either[E, A => B], ea: Either[E, A]) =>
        for {
          f <- ef
          a <- ea
        } yield f(a)
      }
    }
  }

  implicit def kleisliApplicative[F[_]: Applicative, I]: Applicative[Kleisli[I, F, *]] = new Applicative[Kleisli[I, F, *]] {
    override def point[A](a: A): Kleisli[I, F, A] = Kleisli(_ => a)
    override def ap[A, B](fa: Kleisli[I, F, A])(ff: Kleisli[I, F, A => B]): Kleisli[I, F, B] = Kleisli { i =>
      ff.run(i).zip2(fa.run(i)) { (f, a) => f(a) }
    }
  }
}

詳しく調べてはいませんが、scalazcatsも同様の(しかももっと実行効率の良い)実装が存在しています。
業務ではそちらを使用しましょう。

よりみち: KleisliとState

scalaではDatabase接続にKleisliが使用される事があるという話をしましたが、Database接続では基本的に処理を実行するたびにDatabaseの状態が変かしていきます。
テーブルをロックしたりデータを追加・削除したり、当然の事だと思います。

ですが、純粋にimmutableな世界では引数がいつのまにか変化しているのはあまり好ましくありません。
忠実に関数の世界を追い求めるならば、Database接続はStateまたはIOと呼ばれるような型でくるむべきでしょう。

ところで、State[S, A]という型はS => (S, A)と変更後の状態を戻り値に返します。
Database操作などの一連の処理では変更後の状態に対して次の処理を実行したいため、初めの処理で変更された状態を取得するまでは次の処理が実行できません。
よってStateのApplicative実装は並列に処理を行なわない方が好ましいということになります。

IndexedStateなどで2つの変更をマージできることが保証できる場合などは並列に実行可能ですが、一般化は難しいのかと思われます。

  def zipFState[F[_], S, S1, S2, A, B, C](fa: IndexedStateT[S, S1, F, A], fb: IndexedStateT[S, S2, F, B])(f: (A, B) => C): IndexedState[S, (S1, S2), F, C] = IndexedStateT { s =>
    fa.run(s).zipF(fb.run(s)) { ((s1, a), (s2, b)) =>
      ((s1, s2) f(a, b))
    }
  }

Freeモナド

Freeモナドも基本的な話は同様なのですが、使い方が異なるので個別に取り上げようと思います。

Free[F[_], A]FApplicativeの場合は今までと同様に並列実行できるかもしれない形に定義できますが、一般的にscalazやcatsのFreeに渡したいFApplicativeではないデータ構造のことがほとんどです。

例えばDatabase接続など、ビジネスロジックとインフラ実装を別にしたい部分などでFreeは利用されます。

final case class User(
  id: User.ID
  // , ...
)

object User {
  type ID = String

  sealed trait Repository[A]

  object Repository {
     final case class Read(id: ID) extends Repository[User]
     final case class Update(target: User, newValue: User) extends Repository[Unit]

    def read(id: ID): Free[Repository, User] = Free.lift(Read(id))
    def update(target: User, newValue: User): Free[Repository, Unit] = Free.lift(Update(target, newValue))
  }
}

またExtensible Effectsなどで利用する場合、たとえF[_]G[_]ApplicativeであったとしてもF or GApplicativeにはなりません。

このような処理の場合、Freeに対してApplicativeな実装ができません。
つまり、どのような場合でも直列実行を強制するということになります。

Free Applicative

シンプルな(実行効率最適化を図っていない)Freeの実装は以下のようになります。

sealed trait Free[F[_], A] {
  final def flatMap[B](f: A => Free[F, B]): Free[F, B] = this match {
    case Pure(a) => f(a)
    case Impure(fi, g) => Impure(fi, g.andThen(_.flatMap(f)))
  }
}

final case class Pure[F[_], A](a: A) extends Free[F, A]
final case class Impure[F[_], I, A](fi: F[I], f: I => Free[F, A]) exntends Free[F, A]

難しい話はさておき、これと同様の考え方でApplicativeの処理zipFをクラス実装するとApplicativeの性質を持つFreeのようなものを実装できます。
詳しくはFree Applicativeで検索するといろいろ引っかかるのではないかと思います。

実行効率の良い実装はそちらを参照してもらうとして、素直な実装を記述します。
stack overflowなど起こりえるので、そのまま使用するのはお勧めしません。

sealed trait Frap[F[_], A] { // FRee APplicative
  def zipF[B, C](fb: Frap[F, B])(f: (A, B) => C) Frap[F, C] =
    Frap.ZipF(this, fb, f)

  def map[B](f: A => B): Frap[F, B] = zipF(Frap.pure(()))((a, _) => f(a))

  def foldMap[G[_]: Applicative](nt: F ~> G): G[A] = this match {
    case Frap.Pure(a) => Applicative[G].point(a)
    case Frap.Lift(fa) => nt(fa)
    case Frap.ZipF(fi, fj, f) =>
      Applicative[G].zipF(fi.foldMap(nt), fj.foldMap(nt))(f)
  }
}

object Frap {
  final case class Pure[F[_], A](a: A) extends Frap[F, A]
  final case class Lift[F[_], A](fa: F[A]) extends Frap[F, A]
  final case class ZipF[I, J, F[_], A](fi: Frap[F, I], fj: Frap[F, J], f: (I, J) => A) extends Frap[F, A]

  def pure[F[_], A](a: A): Frap[A] = Pure(a)
  def liftF[F[_], A](fa: F[A]): Frap[A] = Lift(fa)

  implicit def frapApplicative[F[_]]: Applicative[Frap[F, *]] = new Applicative[Frap[F, *]] {
    override def point[A](a: A): Frap[F, A] = Frap(_ => a)
    override def ap[A, B](fa: Frap[F, A])(ff: Frap[F, A => B]): Frap[F, B] =
      ZipF[A => B, A, F, B](ff, fa, (f, a) => f(a))
  }
}

上で出てきたF ~> GNaturalTransformatrionと呼ばれるもので、F[A]Aの型によらずG[A]に変換するものです。

これらを使用すれば、ビジネスロジック上は並列にできる処理を適切に実行器に伝える事ができます。

val read = Frap.liftF(User.Repository.read("user 1")
val update = Frap.liftF(User.Repository.Update(User("user 2"), User("user 3")))
val zipped = read.zipF(update)((a, b) => (a, b))

def nt(implicit ec: ExecutionContext) = new (User.Repository ~> Future) {
  def apply[A](fa: User.Repository[A]): Future[A] = fa match {
    case User.Repository.Read(id) => Samples.future(s"read($id)", 3000, User(id))
    case User.Repository.Update(target, newValue) => Samples.future(s" update($target, $newValue)", 1000, ())
  }
}

zipped.foldMap(nt)

この状態で合成してからであれば、Freeでの合成に入れても並列実行できるかもという情報を伝えることが可能となります。

また、実行器側でもReadが2並列なら1接続で取得するなどの最適化も頑張れば可能となります。

Repository.readなどはFrap[Repository, *]を返すようにして、適時Extensible Effectsなどでラップするのが使いやすいかと思われます。
Extensible Effects上ではF or G or Hに対し、F ~> H, G ~> Hした上で再度Effに格納する処理を行うとEff[H, A]を作成可能なので、別Repositoryながら同一インフラのエンティティー群などはまとめて処理可能です。

また、F[_] or G[_]Frapに入れた場合実行器を書くのは非常に大変ですが、原理上は並列処理可能です。

時間切れなのでここでまとめとします。

まとめ

  • for式は直列実行。
  • 並列実行できる(かもしれない)部分は明示が必要。
  • ビジネスロジック上は並列化可能でも、インフラ上はできないこともある。

明日は~~@cactaceaeさんの「なにか書く」~~@todokrさんのPostgreSQLの識別子のtruncateをFlywayのCallbackとParser Combinatorで防ぐです。

12
7
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
12
7

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?