LoginSignup
1
2

More than 3 years have passed since last update.

Cats Effect MVar を使った並行 Scala プログラミング

Last updated at Posted at 2019-05-26

『Haskell による並列・並行プログラミング』の第二部「並行Haskell」で多用される MVar のサンプルコードを、Cats Effect 版の Mvar を使って書いてみる記事。

はじめに

Haskell による並列・並行プログラミング(以下 PCPH 本)』といえば、Haskell にとどまらず関数型な並列・並行プログラミング一般に役に立つ古典1で、Scala を書く上でも参考になる。

関数型 Scala 界隈では、Haskell で実績のあるAPIや技法を取り入れることが多いが、PCPH 本に出てくる並列・並行関連の Haskell の型も、いくつかは既存の関数型 Scala ライブラリにも導入されている。

今回は、その中でも PCPH本 第二部「並行Haskell2」でよく使われている Mvar を、Cats Effect 版の MVar で試してみたい。

MVar 概要

箇条書きにすると以下のようなもの

  • 値が一つ入っているか、それとも空か、2つの状態をもつ容れ物
  • スレッド間で共有される
  • 自スレッドで MVar に値を出し入れできるが、、、
    • すでに値がある MVar に入れようとすると、別スレッドが値を取り出して空になるまでブロックされ
    • 空 MVar から値を取り出そうとすると、別スレッドが値を入れるまでブロックされる
  • Haskell では Control.Concurrent.MVar で提供されている。
  • Scala では Cats Effect で提供されている。ただし、Haskell では IO に固定されているエフェクトが、Cats Effect では F[_]: Concurrent と抽象化されている。

実装例

以下、PCPH 本の7章「並行制御の基本:スレッドとMVar」のサンプルコードを Scala + Cats Effect で書いてみる。

Cats Effect のバージョンは 1.3.0。その他はこのあたり

シンプルな書き込み/読み出し

PCPH 本で最初に紹介される一番シンプルな MVar の用例は以下のようなものになる。

main = do
  m <- newEmptyMVar       -- 空の MVar m を作る
  forkIO $ putMVar m 'x'  -- 別スレッドにフォークして m に'x'を入れる
  r <- takeMVar m         -- 元のスレッドで m から 'x' を取り出して
  print r                 -- 出力する

Cata Effect の IO を使うと、だいたい等価な Scala コードが以下のように書けるが、、、

object MVar1IOMain extends IOApp {
  override def run(args: List[String]): IO[ExitCode] = for {
    m <- MVar.empty[IO, Char] // 空の MVar を作る
    _ <- m.put('x').start     // 別スレッドにフォークして m に'x'を入れる
    r <- m.take               // 元のスレッドで m から 'x' を取り出して
    _ <- IO { println(r) }    // 出力する
  } yield ExitCode.Success
}

これを次の観点でちょっと書き換えると、、、

  • Concurrent のインスタンスがあればいいので、IO 決め打ちにする必要はない(Monix Task とかでも)。
  • 実験の利便性のため、IO(println(...)) に替えて、現在時刻やスレッドIDも出力する別途定義したinfo[F[_]](s: Any)(implicit F: Sync[F], C: Clock[F]): F[Unit] を使う。

以下のようになる。

def program[F[_]](implicit C: Concurrent[F], T: Timer[F]): F[Unit] = for {
  m <- MVar.empty[F, Char]    // 空の MVar を作る
  _ <- C.start { m.put('x') } // 'x' を MVar に入れる
  r <- m.take                 // MVar から値を取り出して
  _ <- info(r)                // 出力する
} yield ()

出力結果。

14:27:53.446 (thread 11): x

ソース

ロガー ~ スレッド間通信の例

もう少し具体的な例として、ログ出力の別スレッド化に MVar を利用するスレッド間通信サンプルが、PCPH 本では紹介されている。

ロガーに求められるフォースには以下のようなものがあるが、、、

  • ロガーを呼び出す側はログ出力が終わるまで待ちたくない。
  • アプリケーションの終了時には、最後のログ出力が終わるまで待ちたい。

例えば以下のように実装できる。

sealed trait LogCommand
case class Message(value: String) extends LogCommand
case class Stop(value: MVar[F, Unit]) extends LogCommand

case class Logger(m: MVar[F, LogCommand]) {
  def message(s: String): F[Unit] = m.put(Message(s))
  def start: F[Unit] = {
    def loop: F[Unit] = m.take flatMap {                     // MVar からコマンド取り出し
      case Message(msg) => info(msg) >> loop                 // メッセージを出力して再帰
      case Stop(s)      => info("logger: stop") >> s.put(()) // 呼び出しスレッドに通知
    }
    loop
  }
  def stop: F[Unit] = for {
    s <- MVar.empty[F, Unit] // ロガースレッド終了待ち通信のための MVar
    _ <- m.put(Stop(s))      // ロガースレッドに終了コマンドを送る
    _ <- s.take              // ロガースレッドが終了するのを待つ
  } yield ()
}

def initLogger: F[Logger] = for {
  m <- MVar.empty[F, LogCommand] // ロガーへのコマンドをやり取りする MVar
  l =  Logger(m)
  _ <- F.start(l.start) // ロガースレッドをフォークして開始
} yield l

先の単純な読み書きの例では、Haskell コードと Scala コードは同じようなものになったが、この例では Scala らしく少し OOP ぽく書いたので、PCPH 本の Haskell コードとは少し構成が違う。

F[_] = IO とすると以下のような実行コードになる。

def run(args: List[String]): IO[ExitCode] = for {
  l <- initLogger
  _ <- l.message("hello")
  _ <- l.message("bye")
  _ <- l.stop
} yield ExitCode.Success

ソース

電話帳 ~ 共有状態を可変ぽく扱う例

普通の関数型プログラミングで State/StateT モナドを扱うのと同じ感じで、並行プログラミングの共有状態も MVar で管理できる。PCPH 本では、電話帳へのエントリ追加と検索を例にした PhoneBook サンプルが載っているが、これも Scala で書いてみる。

type Name        = NonEmptyString // refind で提供されている非空文字列型
type PhoneNumber = NonEmptyString
type PhoneBook   = Map[Name, PhoneNumber]

case class PhoneBookState[F[_]: Concurrent](m: MVar[F, PhoneBook]) {
  def insert(n: Name, p: PhoneNumber): F[Unit] =
    m.take >>= (book => m.put(book + (n -> p)))

  def lookup(n: Name): F[Option[PhoneNumber]] = for {
    book <- m.take
    _    <- m.put(book) // すぐ戻す
  } yield book.get(n)
}
object PhoneBookState {
  def apply[F[_]: Concurrent]: F[PhoneBookState[F]] =
    MVar.of(Map.empty[Name, PhoneNumber]).map(PhoneBookState(_))
}

例えばこんなふうに読み書きできる。

def program[F[_]](implicit F: Concurrent[F], C: Clock[F]) = for {
  s <- PhoneBookState[F]
  _ <- (1 to 10000).toList.map(n => s.insert(s"Name$n", s"$n")).sequence.void
  _ <- s.lookup("Name999") >>= info[F]
  _ <- s.lookup("Unknown") >>= info[F]
} yield ()

ソース

MVar を使った Chan の実装例

ここまでの例は、「一個ずつ出し入れ」という MVar の性質そのままだったが、工夫すると無制限バッファやマルチキャストなどにも利用できる。

ポイントは次のようなものになる。

  1. 読み出し用と書き込み用に、それぞれ MVar を用意する(書き込み MVar は常に空=hole)。
  2. データ構造として、MVar を入れ子というか数珠つなぎにしたものを使う。

ポイント1 の Mvar のペアを持つ型を Chan とし、ポイント2 の MVar の連鎖を Stream 型として定義する。

Haskell では Control.Concurent.Chan としてすでに提供されているが3、 Scala で書くと以下のようになる。

type Stream[A] = MVar[F, Item[A]]
case class Item[A](head: A, tail: Stream[A])

case class Chan[A](reader: MVar[F, Stream[A]], writer: MVar[F, Stream[A]]) {

ちなみに F を省略して Stream を展開すると以下のように書ける。

MVar(Item(a1, MVar(Item(a2, MVar(Item(a3, MVar(...)))))))

Chan への操作としては、生成・読み・書きの他に、Chan を「複製」する dup があり、これを使うと書き込み側の hole を共有する空の Chan が作られ、挙動としては一つの Chan に書き込むと、他の複数の Chan 読み取り側で互いに他の読み取り操作に影響されることなく、Item を読み出すことができる。つまりマルチキャストができることになる。

case class Chan[A](reader: MVar[F, Stream[A]], writer: MVar[F, Stream[A]]) {
  // 読み出し側の先頭要素を取り出す
  def read = for {
    stream <- reader.take
    item   <- stream.read
    _      <- reader.put(item.tail) // tail を新たな読み取りストリームとして
  } yield item.head                 // head を返す

  // 書き込み側に値を入れる
  def write(a: A): F[Unit] = for {
    newHole <- MVar.empty[F, Item[A]]       // 書き込み側は常に空 MVar (hole)
    oldHole <- writer.take
    _       <- oldHole put Item(a, newHole) // 古い hole に値をいれて
    _       <- writer put newHole           // 空 MVar を新たな書き込み側とする
  } yield ()

  // Chan を複製する(writer を共有する空の Chan)
  def dup: F[Chan[A]] = for {
    hole      <- writer.read
    newReader <- MVar.of(hole)
  } yield copy(reader = newReader)
}
object Chan {
  // F[_]にラップするコンストラクタ
  def apply[A]: F[Chan[A]] = for {
    hole     <- MVar.empty[F, Item[A]] // 最初は空
    readVar  <- MVar.of(hole)
    writeVar <- MVar.of(hole)
  } yield Chan(readVar, writeVar)
}

動かして挙動を確認できるコードも Github に上げておいた。

おわりに

  • 仕様からも直感するが、やはり仕組み上、上手く使わないとデッドロックは起こしやすいらしい。こことかこことかで議論がある。
  • PCPH 本の構成としては MVar の先に STM が来る。これもいずれ試してみたい。

  1. 2013年08月の出版 

  2. 第一部が「並列Haskell」。ちなみに並列(parallel)は文字通り同時に複数タスクが実行されることで、並行(concurrent)は外観上(内部ではちょっとずつ切り替えながらでも)同時進行していること。 

  3. Monix の ConcurrentChannl は、Haskell の Chan にインスパイアされた面もあるが、大幅に改善された API らしい。https://twitter.com/alexelcu/status/1063901337568587776 

1
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
2