Concurrentなプログラミングの練習をしようとGoで書かれた例をいろいろHaskellで書いてみようと思う。ただのネタである。
今回の参考記事
まず直列実行するプログラムを書く
main = do
putLog "started."
-- 1秒かかるコマンド
putLog "sleep1 started."
sleep 1
putLog "sleep1 finished."
-- 2秒かかるコマンド
putLog "sleep2 started."
sleep 2
putLog "sleep2 finished."
-- 3秒かかるコマンド
putLog "sleep3 started."
sleep 3
putLog "sleep3 finished."
putLog "all finished."
sleep = threadDelay . (1000 * 1000 *)
putLog str = do
time <- getCurrentTime
let timeStr = formatTime defaultTimeLocale rfc822DateFormat time
printf "%s %s\n" timeStr str
なるべく同じようなコードになるようにsleep関数とputLog関数を用意した。
実行してみる。
Sat, 30 Apr 2016 03:39:42 UTC started.
Sat, 30 Apr 2016 03:39:42 UTC sleep1 started.
Sat, 30 Apr 2016 03:39:43 UTC sleep1 finished.
Sat, 30 Apr 2016 03:39:43 UTC sleep2 started.
Sat, 30 Apr 2016 03:39:45 UTC sleep2 finished.
Sat, 30 Apr 2016 03:39:45 UTC sleep3 started.
Sat, 30 Apr 2016 03:39:48 UTC sleep3 finished.
Sat, 30 Apr 2016 03:39:48 UTC all finished.
予定どおり6秒かかった。
これを並行化したい。
軽量スレッドを使って並行化し、3秒で終わるようにする。
HaskellにはGoルーチンはないが、forkIOを使うと軽量スレッドを使うことができる。IOアクションを渡せば並行処理をすることができる。
> :t forkIO
forkIO :: IO () -> IO ThreadId
IOアクションでない純粋関数の世界ではどうするのかというと、純粋間数な世界では、そもそも実行順序に左右されず結果は一定なので、並行化する必要はないと聞いたことがある。並列に処理すべきである。
main = do
putLog "started."
sleep1_finished <- newEmptyMVar
sleep2_finished <- newEmptyMVar
sleep3_finished <- newEmptyMVar
forkIO $ do
-- 1秒かかるコマンド
putLog "sleep1 started."
sleep 1
putLog "sleep1 finished."
putMVar sleep1_finished ()
forkIO $ do
-- 2秒かかるコマンド
putLog "sleep2 started."
sleep 2
putLog "sleep2 finished."
putMVar sleep2_finished ()
forkIO $ do
-- 3秒かかるコマンド
putLog "sleep3 started."
sleep 3
putLog "sleep3 finished."
putMVar sleep3_finished ()
takeMVar sleep1_finished
takeMVar sleep2_finished
takeMVar sleep3_finished
putLog "all finished."
sleep = threadDelay . (1000 * 1000 *)
putLog str = do
time <- getCurrentTime
let timeStr = formatTime defaultTimeLocale rfc822DateFormat time
printf "%s %s\n" timeStr str
スレッド同士が協調して動作するためにはMVarが使える
MVarは値が入ってくる前に値が取得されるまでスレッドがまってくれる。takeMVarをつかってスレッドの完了まつ。個々のスレッドは処理完了時にputMVarで()
を格納することで完了を通知する。Go言語で書いたコードとよく似ている
Sun, 1 May 2016 01:50:44 UTC started.
Sun, 1 May 2016 01:50:44 USTuSCnu ,ns ,l e1 e 1pM 1aM yas yt2 a02r10t61e 6d0 .10
:15:05:04:44 4U TUCT Cs lseleepe2p 3s tsatratretde.d
.
Sun, 1 May 2016 01:50:45 UTC sleep1 finished.
Sun, 1 May 2016 01:50:46 UTC sleep2 finished.
Sun, 1 May 2016 01:50:47 UTC sleep3 finished.
Sun, 1 May 2016 01:50:47 UTC all finished.
3秒で処理が終了していることがわかる。
Haskellの軽量スレッドはプリエンプティブなので、出力が混ざってしまっている。
Control.Concurrent.ChanというMVarをキューのようにつかえるようにしたものがあったので、出力したい内容をChanに詰めて、別のスレッドでChanの中身を取り出して出力するようにしてみた。
別スレッドで走らせる内容は以下の感じにしてみた。
data Log = End | Write String
logging :: Chan Log -> MVar () -> IO ()
logging chan finished = loop
where
loop = do
log <- readChan chan
case log of
End -> putMVar finished ()
Write str -> do
putStrLn str
loop
Tue, 3 May 2016 05:23:40 UTC started.
Tue, 3 May 2016 05:23:40 UTC sleep1 started.
Tue, 3 May 2016 05:23:40 UTC sleep2 started.
Tue, 3 May 2016 05:23:40 UTC sleep3 started.
Tue, 3 May 2016 05:23:41 UTC sleep1 finished.
Tue, 3 May 2016 05:23:42 UTC sleep2 finished.
Tue, 3 May 2016 05:23:43 UTC sleep3 finished.
Tue, 3 May 2016 05:23:43 UTC all finished.
うまくできた。Chanに詰める速度が、出力する速度より速く出来てしまう可能性がある。この時メモリが足りなくなる可能性があることは注意しないといけなさそう。
ログへの書き込みをブロックさせることも状況によっては検討するほうが良さそうです。
まとめ
forkIOとMVarは基本的な機能らしい。
これらを活用して並行のための機能はいろいろ作成されているようだ。ChanもMVarで実装されていた。
asyncなどを使うともっとシンプルにかけるらしいが、これもforkIOやMVarが使われている。
参考文献
ソースコード
import Control.Concurrent
import Control.Concurrent.MVar
import Data.Time.Clock
import Data.Time.Format
import Text.Printf
main = fast
fast = do
putLog "started."
sleep1_finished <- newEmptyMVar
sleep2_finished <- newEmptyMVar
sleep3_finished <- newEmptyMVar
forkIO $ do
-- 1秒かかるコマンド
putLog "sleep1 started."
sleep 1
putLog "sleep1 finished."
putMVar sleep1_finished ()
forkIO $ do
-- 2秒かかるコマンド
putLog "sleep2 started."
sleep 2
putLog "sleep2 finished."
putMVar sleep2_finished ()
forkIO $ do
-- 3秒かかるコマンド
putLog "sleep3 started."
sleep 3
putLog "sleep3 finished."
putMVar sleep3_finished ()
takeMVar sleep1_finished
takeMVar sleep2_finished
takeMVar sleep3_finished
putLog "all finished."
slow = do
putLog "started."
-- 1秒かかるコマンド
putLog "sleep1 started."
sleep 1
putLog "sleep1 finished."
-- 2秒かかるコマンド
putLog "sleep2 started."
sleep 2
putLog "sleep2 finished."
-- 3秒かかるコマンド
putLog "sleep3 started."
sleep 3
putLog "sleep3 finished."
putLog "all finished."
sleep = threadDelay . (1000 * 1000 *)
putLog str = do
time <- getCurrentTime
let timeStr = formatTime defaultTimeLocale rfc822DateFormat time
printf "%s %s\n" timeStr str
ログを別スレッドで書き込むほう
import Control.Concurrent
import Control.Concurrent.Chan
import Control.Concurrent.MVar
import Data.Time.Clock
import Data.Time.Format
import Text.Printf
data Log = End | Write String
main = fast
fast = do
chan <- newChan
logger_finished <- newEmptyMVar
forkIO $ logging chan logger_finished
let putLog str = putLogChan chan str
finishedLog = writeChan chan End >> takeMVar logger_finished
in do
putLog "started."
sleep1_finished <- newEmptyMVar
sleep2_finished <- newEmptyMVar
sleep3_finished <- newEmptyMVar
forkIO $ do
-- 1秒かかるコマンド
putLog "sleep1 started."
sleep 1
putLog "sleep1 finished."
putMVar sleep1_finished ()
forkIO $ do
-- 2秒かかるコマンド
putLog "sleep2 started."
sleep 2
putLog "sleep2 finished."
putMVar sleep2_finished ()
forkIO $ do
-- 3秒かかるコマンド
putLog "sleep3 started."
sleep 3
putLog "sleep3 finished."
putMVar sleep3_finished ()
takeMVar sleep1_finished
takeMVar sleep2_finished
takeMVar sleep3_finished
putLog "all finished."
finishedLog
sleep = threadDelay . (1000 * 1000 *)
logging :: Chan Log -> MVar () -> IO ()
logging chan finished = loop
where
loop = do
log <- readChan chan
case log of
End -> putMVar finished ()
Write str -> do
putStrLn str
loop
putLogChan :: Chan Log -> String -> IO ()
putLogChan chan str = do
time <- getCurrentTime
let timeStr = formatTime defaultTimeLocale rfc822DateFormat time
let log = printf "%s %s" timeStr str
writeChan chan $ Write log