10
7

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Haskellで並行化する方法: 6秒かかる処理を3秒にしよう

Last updated at Posted at 2016-05-08

Concurrentなプログラミングの練習をしようとGoで書かれた例をいろいろHaskellで書いてみようと思う。ただのネタである。

今回の参考記事

まず直列実行するプログラムを書く

main = do
  putLog "started."

  -- 1秒かかるコマンド
  putLog "sleep1 started."
  sleep 1
  putLog "sleep1 finished."

  -- 2秒かかるコマンド
  putLog "sleep2 started."
  sleep 2
  putLog "sleep2 finished."

  -- 3秒かかるコマンド
  putLog "sleep3 started."
  sleep 3
  putLog "sleep3 finished."

  putLog "all finished."

sleep = threadDelay . (1000 * 1000 *)

putLog str = do
  time <- getCurrentTime
  let timeStr = formatTime defaultTimeLocale rfc822DateFormat time
  printf "%s %s\n" timeStr str

なるべく同じようなコードになるようにsleep関数とputLog関数を用意した。

実行してみる。

Sat, 30 Apr 2016 03:39:42 UTC started.
Sat, 30 Apr 2016 03:39:42 UTC sleep1 started.
Sat, 30 Apr 2016 03:39:43 UTC sleep1 finished.
Sat, 30 Apr 2016 03:39:43 UTC sleep2 started.
Sat, 30 Apr 2016 03:39:45 UTC sleep2 finished.
Sat, 30 Apr 2016 03:39:45 UTC sleep3 started.
Sat, 30 Apr 2016 03:39:48 UTC sleep3 finished.
Sat, 30 Apr 2016 03:39:48 UTC all finished.

予定どおり6秒かかった。

これを並行化したい。

軽量スレッドを使って並行化し、3秒で終わるようにする。

HaskellにはGoルーチンはないが、forkIOを使うと軽量スレッドを使うことができる。IOアクションを渡せば並行処理をすることができる。

> :t forkIO
forkIO :: IO () -> IO ThreadId

IOアクションでない純粋関数の世界ではどうするのかというと、純粋間数な世界では、そもそも実行順序に左右されず結果は一定なので、並行化する必要はないと聞いたことがある。並列に処理すべきである。

main = do
  putLog "started."
  sleep1_finished <- newEmptyMVar
  sleep2_finished <- newEmptyMVar
  sleep3_finished <- newEmptyMVar

  forkIO $ do
    -- 1秒かかるコマンド
    putLog "sleep1 started."
    sleep 1
    putLog "sleep1 finished."
    putMVar sleep1_finished ()

  forkIO $ do
    -- 2秒かかるコマンド
    putLog "sleep2 started."
    sleep 2
    putLog "sleep2 finished."
    putMVar sleep2_finished ()

  forkIO $ do
    -- 3秒かかるコマンド
    putLog "sleep3 started."
    sleep 3
    putLog "sleep3 finished."
    putMVar sleep3_finished ()

  takeMVar sleep1_finished
  takeMVar sleep2_finished
  takeMVar sleep3_finished

  putLog "all finished."
  
sleep = threadDelay . (1000 * 1000 *)

putLog str = do
  time <- getCurrentTime
  let timeStr = formatTime defaultTimeLocale rfc822DateFormat time
  printf "%s %s\n" timeStr str  

スレッド同士が協調して動作するためにはMVarが使える
MVarは値が入ってくる前に値が取得されるまでスレッドがまってくれる。takeMVarをつかってスレッドの完了まつ。個々のスレッドは処理完了時にputMVarで()を格納することで完了を通知する。Go言語で書いたコードとよく似ている

Sun,  1 May 2016 01:50:44 UTC started.
Sun,  1 May 2016 01:50:44 USTuSCnu ,ns ,l  e1 e 1pM 1aM yas yt2 a02r10t61e 6d0 .10
:15:05:04:44 4U TUCT Cs lseleepe2p 3s tsatratretde.d
.
Sun,  1 May 2016 01:50:45 UTC sleep1 finished.
Sun,  1 May 2016 01:50:46 UTC sleep2 finished.
Sun,  1 May 2016 01:50:47 UTC sleep3 finished.
Sun,  1 May 2016 01:50:47 UTC all finished.

3秒で処理が終了していることがわかる。
Haskellの軽量スレッドはプリエンプティブなので、出力が混ざってしまっている。

Control.Concurrent.ChanというMVarをキューのようにつかえるようにしたものがあったので、出力したい内容をChanに詰めて、別のスレッドでChanの中身を取り出して出力するようにしてみた。

別スレッドで走らせる内容は以下の感じにしてみた。

data Log = End | Write String

logging :: Chan Log -> MVar () -> IO ()
logging chan finished = loop
  where
    loop = do
      log <- readChan chan
      case log of
        End -> putMVar finished ()
        Write str -> do
          putStrLn str
          loop
Tue,  3 May 2016 05:23:40 UTC started.
Tue,  3 May 2016 05:23:40 UTC sleep1 started.
Tue,  3 May 2016 05:23:40 UTC sleep2 started.
Tue,  3 May 2016 05:23:40 UTC sleep3 started.
Tue,  3 May 2016 05:23:41 UTC sleep1 finished.
Tue,  3 May 2016 05:23:42 UTC sleep2 finished.
Tue,  3 May 2016 05:23:43 UTC sleep3 finished.
Tue,  3 May 2016 05:23:43 UTC all finished.

うまくできた。Chanに詰める速度が、出力する速度より速く出来てしまう可能性がある。この時メモリが足りなくなる可能性があることは注意しないといけなさそう。

ログへの書き込みをブロックさせることも状況によっては検討するほうが良さそうです。

まとめ

forkIOとMVarは基本的な機能らしい。
これらを活用して並行のための機能はいろいろ作成されているようだ。ChanもMVarで実装されていた。
asyncなどを使うともっとシンプルにかけるらしいが、これもforkIOやMVarが使われている。

参考文献

ソースコード

import Control.Concurrent
import Control.Concurrent.MVar
import Data.Time.Clock
import Data.Time.Format
import Text.Printf

main = fast

fast = do
  putLog "started."
  sleep1_finished <- newEmptyMVar
  sleep2_finished <- newEmptyMVar
  sleep3_finished <- newEmptyMVar

  forkIO $ do
    -- 1秒かかるコマンド
    putLog "sleep1 started."
    sleep 1
    putLog "sleep1 finished."
    putMVar sleep1_finished ()

  forkIO $ do
    -- 2秒かかるコマンド
    putLog "sleep2 started."
    sleep 2
    putLog "sleep2 finished."
    putMVar sleep2_finished ()

  forkIO $ do
    -- 3秒かかるコマンド
    putLog "sleep3 started."
    sleep 3
    putLog "sleep3 finished."
    putMVar sleep3_finished ()

  takeMVar sleep1_finished
  takeMVar sleep2_finished
  takeMVar sleep3_finished

  putLog "all finished."

slow = do
  putLog "started."

  -- 1秒かかるコマンド
  putLog "sleep1 started."
  sleep 1
  putLog "sleep1 finished."

  -- 2秒かかるコマンド
  putLog "sleep2 started."
  sleep 2
  putLog "sleep2 finished."

  -- 3秒かかるコマンド
  putLog "sleep3 started."
  sleep 3
  putLog "sleep3 finished."

  putLog "all finished."

sleep = threadDelay . (1000 * 1000 *)

putLog str = do
  time <- getCurrentTime
  let timeStr = formatTime defaultTimeLocale rfc822DateFormat time
  printf "%s %s\n" timeStr str

ログを別スレッドで書き込むほう

import Control.Concurrent
import Control.Concurrent.Chan
import Control.Concurrent.MVar
import Data.Time.Clock
import Data.Time.Format
import Text.Printf

data Log = End | Write String

main = fast

fast = do
  chan <- newChan
  logger_finished <- newEmptyMVar
  forkIO $ logging chan logger_finished
  let putLog str = putLogChan chan str
      finishedLog = writeChan chan End >> takeMVar logger_finished
    in do
    putLog "started."
    sleep1_finished <- newEmptyMVar
    sleep2_finished <- newEmptyMVar
    sleep3_finished <- newEmptyMVar

    forkIO $ do
      -- 1秒かかるコマンド
      putLog "sleep1 started."
      sleep 1
      putLog "sleep1 finished."
      putMVar sleep1_finished ()

    forkIO $ do
      -- 2秒かかるコマンド
      putLog "sleep2 started."
      sleep 2
      putLog "sleep2 finished."
      putMVar sleep2_finished ()

    forkIO $ do
      -- 3秒かかるコマンド
      putLog "sleep3 started."
      sleep 3
      putLog "sleep3 finished."
      putMVar sleep3_finished ()

    takeMVar sleep1_finished
    takeMVar sleep2_finished
    takeMVar sleep3_finished

    putLog "all finished."
    finishedLog

sleep = threadDelay . (1000 * 1000 *)

logging :: Chan Log -> MVar () -> IO ()
logging chan finished = loop
  where
    loop = do
      log <- readChan chan
      case log of
        End -> putMVar finished ()
        Write str -> do
          putStrLn str
          loop

putLogChan :: Chan Log -> String -> IO ()
putLogChan chan str = do
  time <- getCurrentTime
  let timeStr = formatTime defaultTimeLocale rfc822DateFormat time
  let log = printf "%s %s" timeStr str
  writeChan chan $ Write log
10
7
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
10
7

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?