この記事はScala Advent Calendar 2019の8日目の記事です。
前日は @miy1924 さんのはじめてのEitherTでした。
楽しいEitherTライフ、未来感が半端ないですね。
本日も未来感の溢れる話ですが、EitherTよりは身近な未来、scala.concurrent.Futureの話です。
対象読者
- Futureで並列処理を実装したい。
- for式を使うのは好きだ。
- コンパイルが通れば後はいい感じに動いてくれると信じている。
書かれていない事
- ExecutionContextの選び方
- パフォーマンス測定やパフォーマンスチューニングについて
注意事項
この記事はコードを一切コンパイルせずに「気持ちで」書かれています。
コンパイルが通らないコードや間違いなどはコメント・編集リクエストをお待ちしております。
Futureのお話
では本題のFutureの話に入っていきましょう。
Futureの生成
今回はサンプルコードなので、スレッドを占有する重い処理として有名なjava.lang.Thread.sleep
を使用します。
import scala.concurrent.{ExecutionContext, Future}
object Samples {
def future[A](id: String, sleepMills: Long, result: A)(
implicit ec: ExecutionContext
): Future[A] = Future {
println(s"ID: $id is started at ${LocalDateTime.now}")
Thread.sleep(sleepMills)
println(s"ID: $id is finished at ${LocalDateTime.now}")
result
}
}
replなどでSamples.future("a", 10, ())
などと実行すると、きっとメインスレッド以外でprintlnしてくれるはずです。
for式
さて、タイトルにもあるfor式です。
// implicitなExecutionContextをスコープに用意
val result = for {
a <- Samples.future("Future A", 3000, 42)
b <- Samples.future("Future B", 1000, "The answer to life")
} yield (a, b) // Future[(Int, String)]
こちらもreplなどで(たぶん)動作するはずですが、printされる内容の予測は可能でしょうか?
想定読者はこの挙動を理解していない人ですので、scalaつよつよニキや「僕はfor式に詳しいんだ」という方以外は、ぜひ一度replなどで実行して体験してみてください。
何が問題か
上記のコードを実行すると、"Future A"
の実行が完了してから"Future B"
の実行が開始されます。
これはfor式がmap
, flatMap
の糖衣構文で、実際には以下のようなコードとしてコンパイルされるためです。
val result = Samples.future("Future A", 3000, 42).flatMap { a =>
Samples.future("Future B", 1000, "The answer to life").map { b =>
(a, b)
}
}
型について考える
これだけではわかりづらいので、型シグネチャーだけを抽出してみます。
Future[Int] => (Int => (Future[String] => (String => (Int, String) => Future[(Int, String)])) => Future[(Int, String)]
なんだか余計にわからなくなってしまいましたね。
説明に不要な部分をそぎ落として、単純化します。
まずは元のコードを改行変更してもう一度。
Samples.future("Future A", 3000, 42) // (1)
.flatMap {
a => Samples.future("Future B", 1000, "The answer to life") // (2)
.map {
b => (a, b) // (3)
}
}
(1)
の部分は Future[Int]
(3)
の部分は (
Int, String) => Tuple2[Int, String]
です。
問題は2の部分なのですが、Future[String]
ではなくInt => Future[String]
となっています。
引数にInt
を受け取ってFuture[String]
を返す関数です。
当然引数を受け取るまでFuture
は生成されません。
Future[Int]
=> (Int => Future[String])
=> ((Int, String) => (Int, String))
=> Future[(Int, String)]
Int
やString
じゃなくても同じなので、A
やB
を使って短くすると:
F[A] => (A => F[B]) => ((A, B) => C) => F[C]
では、F[B]
をF[A]
と並列に実行するためにはどうすれば良いのでしょうか?
解決策
以下の型になる関数を用意します。
F[A] => F[B] => ((A, B) => C) => F[C]
A => F[B]
が関数になっているのが問題なので、関数をやめてしまえば良いということです。
object Samples {
// 以前のコードに追記
def zipF[A, B, C](fa: Future[A], fb: Future[B])(f: (A, B) => C)(
implicit ec: ExecutionContext
): Future[C] = for {
a <- fa
b <- fb
} yield f(a, b)
}
これを使って先程の処理を実行すると、並列に実行されるでしょうか?
// implicit ec
Samples.zipF(
Samples.future("Future A", 3000, 42),
Samples.future("Future B", 1000, "The answer to life")
)((a, b) => (a, b)) // Future[(Int, String)]
ecにも依存はするのですが、きっと並列に実行されたはずです。
Futureの実行開始について
先程のzipF
で、処理本体は元のコードと同じようにfor式が記述されていました。
それでも並列に実行されることに疑問を持った方もおられることかと思います。
scala標準に入っているFuture
は作成した瞬間から実行が開始される仕様になっており、zipF
に渡された時点でfor式開始よりも前に生成されているため、forの中では既に実行中(もしくは実行完了済み)のFuture
を返す関数が渡されます。
monixのTaskの場合
標準のFuture
以外を使用した場合は生成時に実行が開始されない可能性があります。
一例としてmonixのTask
を見ていきましょう。
import monix.eval.Task
object MonixSamples {
def task[A](id: String, mills: Long, result: A): Task[A] = Task.eval {
println(s"ID: $id is started at ${LocalDateTime.now}")
Thread.sleep(sleepMills)
println(s"ID: $id is finished at ${LocalDateTime.now}")
result
}
def zipF[A, B, C](fa: Task[A], fb: Task[B])(f: (A, B) => C): Task[C] =
for {
a <- fa
b <- fb
} yield f(a, b)
}
そもそも動くのかも疑問ですが、一応書いておきます。
// needs an implicit Scheduler here
val a = MonixSamples.task("Task A", 3000, 42)
val b = MonixSamples.task("Task B", 1000, "the universe")
MonixSamples.zipF(a, b)((a, b) => (a, b)).runAsync
並列実行
objectのTask
にparMap2
というのがあるので、こちらを呼び出します。
object MonixSamples {
// ...
def zipF[A, B, C](fa: Task[A], fb: Task[B])(f: (A, B) => C): Task[C] =
Task.parMap2(fa, fb)(f)
}
複雑な型について考えてみる。
単純なFutureを返すメソッドだけを相手に生活していても十分に複雑なのですが、社内が関数型ムードだったりするとさらに難しそうな型を相手にすることがあるかもしれません。
昨日の記事で出てきたようなEitherTや、DB接続などで平然と使われるKleisliなどの型だとどうなるのでしょうか?
EitherTで試してみる
EitherTは昨日の記事で詳しく解説されているので、importなどはそちらをご参考ください。
for {
a <- EitherT(Samples.future("EitherT A", 3000, 42))
b <- EitherT(Samples.future("EitherT B", 1000, "and everything.")
} yield (a, b)
Kleisli
Kleisli[I, F[_], A]
はI => F[A]
です。
試すのは皆様どうぞいい感じにお願い致します。
for式とflatMapを理解する
ここまで読んできて感の良い読者の皆様はお気付きかもしれませんが、for式またはflatMap
には以下のような性質があります。
1行目が完了してから2行目を開始する
JavaやRubyなどのメインストリームな言語をたしなむ方々からすると当たり前のように映るかもしれませんが、これは遅延評価や並列処理の世界では決して当たり前のことではありません。
どんなに並列に処理したい場合でもここは順番にやってねという強い意思表示なのです。
それに対してzipF
は、並列にすることが可能な型シグネチャーでした。
ただし、メソッドの型が同一でも並列に実行されるかどうかは実装しだいとなっていました。
Applicative
今までの流れからいくとたぶん我々は独自に定義したzipF
に相当する何かを使用したいのですが、これは一般に通用する型クラスとしてはApplicative
と呼ばれるものの中に定義されています。
注意:
Applicative
は必ずzipF
に相当する処理が実行できますが、zipF
ができるからといってApplicative
とは限りません。
詳しく知りたい方はApplicative Lawsなどで検索してください。
Applicativeの定義
Applicative
はscalaz
やcats
などに定義されているのですが、メソッド名にはかなりの違いがあります。
そのためこの記事では独自に定義して説明のためだけのメソッド名を使用します。
実際に利用する際には信頼できるライブラリーに含まれているものをご使用ください。
package typeclass
trait Applictive[F[_]] {
def point[A](a: A): F[A]
def ap[A, B](fa: F[A])(ff: F[A => B]): F[B]
final def zipF[A, B, C](fa: F[A], fb: F[B])(f: (A, B) => C): F[C] =
ap(fb)(ap(fa, point(f)))
}
object Applicative {
implicit def apply[F[_]: Applicative]: Applicative[F] = implicitly
implicit class ApplicativeSyntax[F[_]: Applicative, A](fa: F[A]) {
def apply[B, C](fb: F[B])(implicit ev: A =:= (B => C)): F[C] =
Applicative[F].ap(fb)(fa)
def zipF[B, C](fb: F[B])(f: (A, B) => C): F[C] =
Applicative[F].zipF(fa, fb)(f)
}
implicit def scalazApplicative[F[_]: scalaz.Applicative]: Applicative[F] = new Applicative[F] {
override def point[A](a: A): F[A] = scalaz.Applicative[F].point(a)
override def ap[A, B](fa: F[A])(ff: F[A => B]): F[B] =
scalaz.Applicative[F].ap(fa)(ff)
}
implicit def catsApplicative[F[_]: cats.Applicative]: Applicative[F] = new Applicative[F] {
override def point[A](a: A): F[A] = scalaz.Applicative[F].pure(a)
override def ap[A, B](fa: F[A])(ff: F[A => B]): F[B] =
cats.Applicative[F].ap(ff)(fa)
}
}
Applicativeの使用
このApplicative
を使用すると、EitherT
やKleisli
もいい感じに並列にできるかもしれない処理を実装できます。
object Samples {
// ...
implicit def eitherTApplicative[F[_]: Applicative, E]: Applicative[EitherT[E, F, *]] = new Applicative[EitherT[E, F, *]] {
override def point[A](a: A): EitherT[E, F, A] = EitherT.right(a)
override def ap[A, B](fa: EitherT[E, F, A])(ff: EitherT[E, F, A => B]): EitherT[E, F, B] = EitherT {
val fea: F[Either[E, A]] = fa.toEither
val fef: F[Either[E, A => B]] = ff.toEither
fef.zip2(fea) { (ef: Either[E, A => B], ea: Either[E, A]) =>
for {
f <- ef
a <- ea
} yield f(a)
}
}
}
implicit def kleisliApplicative[F[_]: Applicative, I]: Applicative[Kleisli[I, F, *]] = new Applicative[Kleisli[I, F, *]] {
override def point[A](a: A): Kleisli[I, F, A] = Kleisli(_ => a)
override def ap[A, B](fa: Kleisli[I, F, A])(ff: Kleisli[I, F, A => B]): Kleisli[I, F, B] = Kleisli { i =>
ff.run(i).zip2(fa.run(i)) { (f, a) => f(a) }
}
}
}
詳しく調べてはいませんが、scalaz
やcats
も同様の(しかももっと実行効率の良い)実装が存在しています。
業務ではそちらを使用しましょう。
よりみち: KleisliとState
scalaではDatabase接続にKleisliが使用される事があるという話をしましたが、Database接続では基本的に処理を実行するたびにDatabaseの状態が変かしていきます。
テーブルをロックしたりデータを追加・削除したり、当然の事だと思います。
ですが、純粋にimmutableな世界では引数がいつのまにか変化しているのはあまり好ましくありません。
忠実に関数の世界を追い求めるならば、Database接続はStateまたはIOと呼ばれるような型でくるむべきでしょう。
ところで、State[S, A]
という型はS => (S, A)
と変更後の状態を戻り値に返します。
Database操作などの一連の処理では変更後の状態に対して次の処理を実行したいため、初めの処理で変更された状態を取得するまでは次の処理が実行できません。
よってStateのApplicative実装は並列に処理を行なわない方が好ましいということになります。
IndexedStateなどで2つの変更をマージできることが保証できる場合などは並列に実行可能ですが、一般化は難しいのかと思われます。
def zipFState[F[_], S, S1, S2, A, B, C](fa: IndexedStateT[S, S1, F, A], fb: IndexedStateT[S, S2, F, B])(f: (A, B) => C): IndexedState[S, (S1, S2), F, C] = IndexedStateT { s =>
fa.run(s).zipF(fb.run(s)) { ((s1, a), (s2, b)) =>
((s1, s2) f(a, b))
}
}
Freeモナド
Free
モナドも基本的な話は同様なのですが、使い方が異なるので個別に取り上げようと思います。
Free[F[_], A]
のF
がApplicative
の場合は今までと同様に並列実行できるかもしれない形に定義できますが、一般的にscalazやcatsのFree
に渡したいF
はApplicative
ではないデータ構造のことがほとんどです。
例えばDatabase接続など、ビジネスロジックとインフラ実装を別にしたい部分などでFree
は利用されます。
final case class User(
id: User.ID
// , ...
)
object User {
type ID = String
sealed trait Repository[A]
object Repository {
final case class Read(id: ID) extends Repository[User]
final case class Update(target: User, newValue: User) extends Repository[Unit]
def read(id: ID): Free[Repository, User] = Free.lift(Read(id))
def update(target: User, newValue: User): Free[Repository, Unit] = Free.lift(Update(target, newValue))
}
}
またExtensible Effectsなどで利用する場合、たとえF[_]
とG[_]
がApplicative
であったとしてもF or G
はApplicative
にはなりません。
このような処理の場合、Free
に対してApplicative
な実装ができません。
つまり、どのような場合でも直列実行を強制するということになります。
Free Applicative
シンプルな(実行効率最適化を図っていない)Free
の実装は以下のようになります。
sealed trait Free[F[_], A] {
final def flatMap[B](f: A => Free[F, B]): Free[F, B] = this match {
case Pure(a) => f(a)
case Impure(fi, g) => Impure(fi, g.andThen(_.flatMap(f)))
}
}
final case class Pure[F[_], A](a: A) extends Free[F, A]
final case class Impure[F[_], I, A](fi: F[I], f: I => Free[F, A]) exntends Free[F, A]
難しい話はさておき、これと同様の考え方でApplicative
の処理zipF
をクラス実装するとApplicative
の性質を持つFree
のようなものを実装できます。
詳しくはFree Applicative
で検索するといろいろ引っかかるのではないかと思います。
実行効率の良い実装はそちらを参照してもらうとして、素直な実装を記述します。
stack overflowなど起こりえるので、そのまま使用するのはお勧めしません。
sealed trait Frap[F[_], A] { // FRee APplicative
def zipF[B, C](fb: Frap[F, B])(f: (A, B) => C) Frap[F, C] =
Frap.ZipF(this, fb, f)
def map[B](f: A => B): Frap[F, B] = zipF(Frap.pure(()))((a, _) => f(a))
def foldMap[G[_]: Applicative](nt: F ~> G): G[A] = this match {
case Frap.Pure(a) => Applicative[G].point(a)
case Frap.Lift(fa) => nt(fa)
case Frap.ZipF(fi, fj, f) =>
Applicative[G].zipF(fi.foldMap(nt), fj.foldMap(nt))(f)
}
}
object Frap {
final case class Pure[F[_], A](a: A) extends Frap[F, A]
final case class Lift[F[_], A](fa: F[A]) extends Frap[F, A]
final case class ZipF[I, J, F[_], A](fi: Frap[F, I], fj: Frap[F, J], f: (I, J) => A) extends Frap[F, A]
def pure[F[_], A](a: A): Frap[A] = Pure(a)
def liftF[F[_], A](fa: F[A]): Frap[A] = Lift(fa)
implicit def frapApplicative[F[_]]: Applicative[Frap[F, *]] = new Applicative[Frap[F, *]] {
override def point[A](a: A): Frap[F, A] = Frap(_ => a)
override def ap[A, B](fa: Frap[F, A])(ff: Frap[F, A => B]): Frap[F, B] =
ZipF[A => B, A, F, B](ff, fa, (f, a) => f(a))
}
}
上で出てきたF ~> G
はNaturalTransformatrion
と呼ばれるもので、F[A]
をA
の型によらずG[A]
に変換するものです。
これらを使用すれば、ビジネスロジック上は並列にできる処理を適切に実行器に伝える事ができます。
val read = Frap.liftF(User.Repository.read("user 1")
val update = Frap.liftF(User.Repository.Update(User("user 2"), User("user 3")))
val zipped = read.zipF(update)((a, b) => (a, b))
def nt(implicit ec: ExecutionContext) = new (User.Repository ~> Future) {
def apply[A](fa: User.Repository[A]): Future[A] = fa match {
case User.Repository.Read(id) => Samples.future(s"read($id)", 3000, User(id))
case User.Repository.Update(target, newValue) => Samples.future(s" update($target, $newValue)", 1000, ())
}
}
zipped.foldMap(nt)
この状態で合成してからであれば、Freeでの合成に入れても並列実行できるかもという情報を伝えることが可能となります。
また、実行器側でもReadが2並列なら1接続で取得するなどの最適化も頑張れば可能となります。
Repository.read
などはFrap[Repository, *]
を返すようにして、適時Extensible Effectsなどでラップするのが使いやすいかと思われます。
Extensible Effects上ではF or G or H
に対し、F ~> H
, G ~> H
した上で再度Effに格納する処理を行うとEff[H, A]
を作成可能なので、別Repositoryながら同一インフラのエンティティー群などはまとめて処理可能です。
また、F[_] or G[_]
をFrap
に入れた場合実行器を書くのは非常に大変ですが、原理上は並列処理可能です。
時間切れなのでここでまとめとします。
まとめ
- for式は直列実行。
- 並列実行できる(かもしれない)部分は明示が必要。
- ビジネスロジック上は並列化可能でも、インフラ上はできないこともある。
明日は~~@cactaceaeさんの「なにか書く」~~@todokrさんのPostgreSQLの識別子のtruncateをFlywayのCallbackとParser Combinatorで防ぐです。