Posted at

Haskell(GHC)の並行処理における遅延評価の動作

More than 3 years have passed since last update.

haskell(GHC)でマルチスレッド処理をする場合、遅延評価がどう動くのかをシングルスレッドの場合以上によく知っておく必要があります。

『Haskellによる並列・並行プログラミング』 にはかなり多くの情報が書いてありますが、微妙にはっきり理解できなかった部分もあったので、実験を兼ねてまとめてみました。

遅延評価やサンクそのものについては解説しないので、必要なら他の解説や『Haskellによる並列・並行プログラミング』などを読んでください。


確認環境


  • haskell-platform-2014.2.0.0

  • stm-2.4.2

  • OS:Windows7 64bit (Ubuntu 14.04 64bit on VirtualBox on Windows7 64bitでも同じ結果となることは確認済み)


共通処理の準備

以降の実験で使う関数を準備しておきます。


Common.hs

module Common where

import qualified Control.Concurrent as CC
import Control.DeepSeq (deepseq)
import qualified Data.Time.Clock as Time
import System.IO.Unsafe (unsafePerformIO)

-- 文字列を経過時間、スレッドIDと共に表示する関数を生成
type Puts = String -> IO ()
mkPuts :: IO Puts
mkPuts = do
start <- Time.getCurrentTime
return $ \ str -> str `deepseq` do -- 先に引数を評価するためにdeepseqを使用
thid <- CC.myThreadId
now <- Time.getCurrentTime
let diffTime = now `Time.diffUTCTime` start
diff = fromIntegral (fromEnum diffTime) / 1000000000000 :: Float
timeStr = take 8 $ show diff
putStrLn $ timeStr ++ " " ++ show thid ++ " " ++ str

-- スレッドIDと文字列をデバッグ表示
trace :: Puts -> String -> a -> a
trace puts str v = unsafePerformIO $ puts str >> return v

-- numまでの数を足し合わせる。計算の最初と最後にデバッグ表示する。
count :: Puts -> Int -> Int
count puts num = trace puts "count start" $ go num 0
where
go 0 x = x `seq` trace puts "count end" x
go c x = go (c-1) (x+c)



case1 遅延評価の基本的な動作

単なる遅延評価の動作の確認です。まだ並行は出てきません。


case1.hs

module Main where

import qualified Common as C
import System.Environment (getArgs)

case1 :: C.Puts -> Int -> IO ()
case1 puts lazyVal = do
putStrLn "case1 遅延評価の基本的な動作"
puts $ "1st : " ++ show lazyVal -- 1つ目の表示には時間がかかる。
puts $ "2nd : " ++ show lazyVal -- 2つ目の表示はすぐ。

main :: IO ()
main = do
puts <- C.mkPuts
[numStr] <- getArgs
let num = read numStr :: Int
count = C.count puts
case1 puts (count num)



実行

ghc -Wall -O2 -threaded -rtsopts case1.hs

./case1 5000000000 +RTS -N4


  • 実行時の引数の5000000000は適当な数です。計算性能次第で4~6秒くらいで終わるようにすると、以降の実験でうまくいきます。

  • +RTS -N4は実際に4つまで同時並行で処理をさせるために渡しています。実験環境ではCPUコアが4つなので-N4を指定しています。


    • case1.hsでは意味がありませんが、以降の実験でも同じ前提で試すためここでも指定しています。




実行結果

case1 遅延評価の基本的な動作

0.0 ThreadId 3 count start
5.319609 ThreadId 3 count end
5.319609 ThreadId 3 1st : -5946744071209551616
5.319609 ThreadId 3 2nd : -5946744071209551616


case1 の実行結果から推測できること


  • 遅延評価なので、値が必要とされるときになってから評価が始まる


    • 根拠:"case1 遅延評価の基本的な動作"の表示後に"count start"が表示される



  • 一度評価された値を次に参照すると、前回評価された結果が返る


    • 根拠:1stと2ndの間はほとんど時間差がない




case2 遅延評価の値を複数のスレッドから参照する

遅延評価をマルチスレッドで参照した場合の動作の確認です。


case2.hs

module Main where

import qualified Common as C
import qualified Control.Concurrent as CC
import System.Environment (getArgs)

case2 :: C.Puts -> Int -> IO ()
case2 puts lazyVal = do
putStrLn "case2 遅延評価の値を複数のスレッドから参照する"
_thid1 <- CC.forkIO $ do
puts "thread1"
puts $ "thread1 : " ++ show lazyVal -- 最初にこのスレッドで評価する
_thid2 <- CC.forkIO $ do
puts "thread2"
CC.threadDelay 1000000 -- 1秒後に表示を試みる。
puts $ "thread2 : " ++ show lazyVal
_thid3 <- CC.forkIO $ do
puts "thread3"
CC.threadDelay 2000000 -- 2秒後に表示を試みる。
puts $ "thread3 : " ++ show lazyVal
CC.threadDelay $ 10 * 1000000
-- とりあえず10秒待って様子を見る。実用的にはasyncでwaitする方がよい。

main :: IO ()
main = do
puts <- C.mkPuts
[numStr] <- getArgs
let num = read numStr :: Int
count = C.count puts
case2 puts (count num)



実行

ghc -Wall -O2 -threaded -rtsopts case2.hs

./case2 5000000000 +RTS -N4


実行結果

case2 遅延評価の値を複数のスレッドから参照する

0.0 ThreadId 5 thread2
0.0 ThreadId 6 thread3
0.0 ThreadId 4 thread1
0.0 ThreadId 4 count start
5.413209 ThreadId 4 count end
5.413209 ThreadId 4 thread1 : -5946744071209551616
5.413209 ThreadId 5 thread2 : -5946744071209551616
5.413209 ThreadId 6 thread3 : -5946744071209551616


case2 の実行結果から推測できること


  • 最初に評価を開始したスレッドで評価が行われる


    • 根拠:"count start"と"count end"が"thread1"と同じスレッドIDで表示されている



  • 既に評価の開始された値を他のスレッドから参照した場合、その評価結果を待つ


    • 評価が終わっていなくても別々に評価を行ったりはしない

    • 根拠:"thread1","thread2","thread3"で結果が同時に表示されている




case3 評価中のスレッドをkillしてみる

case2で最初に評価を開始したスレッドで評価が行われることがわかりました。

では、そのスレッドが評価中に死んだ場合はどうなるのか、確認してみます。


case3.hs

module Main where

import qualified Common as C
import qualified Control.Concurrent as CC
import System.Environment (getArgs)

case3 :: C.Puts -> Int -> IO ()
case3 puts lazyVal = do
putStrLn "case3 評価中のスレッドをkillしてみる"
thid1 <- CC.forkIO $ do
puts "thread1"
puts $ "thread1 : " ++ show lazyVal -- 最初にこのスレッドで評価する
_thid2 <- CC.forkIO $ do
puts "thread2"
CC.threadDelay 1000000 -- 1秒後に表示を試みる。
puts $ "thread2 : " ++ show lazyVal
_thid3 <- CC.forkIO $ do
puts "thread3"
CC.threadDelay 2000000 -- 2秒後に表示を試みる。
puts $ "thread3 : " ++ show lazyVal
_ <- CC.forkIO $ do
-- 3秒後にスレッド1を殺す
CC.threadDelay $ 3 * 1000000
puts "kill thread1"
CC.killThread thid1
CC.threadDelay $ 10 * 1000000
-- とりあえず10秒待って様子を見る。実用的にはasyncでwaitする方がよい。

main :: IO ()
main = do
puts <- C.mkPuts
[numStr] <- getArgs
let num = read numStr :: Int
count = C.count puts
case3 puts (count num)



実行

ghc -Wall -O2 -threaded -rtsopts case3.hs

./case3 5000000000 +RTS -N4


実行結果

case3 評価中のスレッドをkillしてみる

0.0 ThreadId 4 thread1
0.0 ThreadId 6 thread3
0.0 ThreadId 5 thread2
0.0 ThreadId 4 count start
3.026405 ThreadId 7 kill thread1
5.335209 ThreadId 6 count end
5.335209 ThreadId 5 thread2 : -5946744071209551616
5.335209 ThreadId 6 thread3 : -5946744071209551616


case3 の実行結果から推測できること


  • 評価中のスレッドが死んだ場合、他のスレッドが評価を引き継ぐ


    • 根拠:"count start"と"count end"が異なるスレッドIDで表示されている




case4 スレッドkillにより中断された遅延評価を再開させる

case3 ではスレッドが殺されたとき、既に他のスレッドが評価を待っている状態でした。

では、他にスレッドが待っていない場合はどうなるのか、確認してみます。


case4.hs

module Main where

import qualified Common as C
import qualified Control.Concurrent as CC
import System.Environment (getArgs)

case4 :: C.Puts -> Int -> IO ()
case4 puts lazyVal = do
putStrLn "case4 スレッドkillにより中断された遅延評価を再開させる"
thid1 <- CC.forkIO $ do
puts "thread1"
puts $ "thread1 : " ++ show lazyVal
_ <- CC.forkIO $ do
-- 3秒後にスレッド1を殺す
CC.threadDelay $ 3 * 1000000
puts "kill thread1"
CC.killThread thid1
-- 8秒後にスレッド2、3を起動
CC.threadDelay $ 8 * 1000000
puts "start thread2 and thread3"
_thid2 <- CC.forkIO $ do
puts "thread2"
puts $ "thread2 : " ++ show lazyVal
_thid3 <- CC.forkIO $ do
puts "thread3"
CC.threadDelay $ 1000000 -- 1秒後に表示を試みる。
puts $ "thread3 : " ++ show lazyVal
CC.threadDelay $ 10 * 1000000
-- とりあえず10秒待って様子を見る。実用的にはasyncでwaitする方がよい。

main :: IO ()
main = do
puts <- C.mkPuts
[numStr] <- getArgs
let num = read numStr :: Int
count = C.count puts
case4 puts (count num)



実行

ghc -Wall -O2 -threaded -rtsopts case4.hs

./case4 5000000000 +RTS -N4


実行結果

case4 スレッドkillにより中断された遅延評価を再開させる

0.0 ThreadId 4 thread1
0.0 ThreadId 4 count start
3.026405 ThreadId 5 kill thread1
8.018414 ThreadId 3 start thread2 and thread3
8.018414 ThreadId 7 thread3
8.018414 ThreadId 6 thread2
10.34281 ThreadId 6 count end
10.34281 ThreadId 6 thread2 : -5946744071209551616
10.34281 ThreadId 7 thread3 : -5946744071209551616


case4 の実行結果から推測できること


  • 評価中のスレッドが死んだ場合、評価はいったん停止するが、他のスレッドが評価を再開すると残りの計算を行う


    • 途中で引き継ぐスレッドがいなくなっても、評価途中の状態は破棄されずに残り、再開することができる

    • 根拠:"count start"から"thread1"を殺すまでの時間と、"thread2"がスタートしてから"count end"までの時間の合計が、評価にかかる時間とだいたい一致している



これはもしかすると、扱いに注意が必要な動作かもしれません。(問題になるケースは必ずしも多くはないと思いますが)

大きなメモリを必要とする評価を途中で止めてしまうと、意図せずに大きなメモリを占有したままになってしまう可能性があるということです。


case5 IORef/MVar/TVarへの書き込みと読みだし

少し話は変わって、IORefやMVar、TVarへの書き込みや読み出しで評価がどうなるのかの確認です。


case5.hs

module Main where

import qualified Common as C
import qualified Control.Concurrent.MVar as MVar
import qualified Control.Concurrent.STM as STM
import qualified Data.IORef as Ref
import System.Environment (getArgs)

case5A :: C.Puts -> Int -> IO ()
case5A puts lazyVal = do
putStrLn "case5A IORefへの書き込みと読みだし"
ref <- Ref.newIORef 0
puts "phase1"
Ref.writeIORef ref lazyVal
puts "phase2"
lazyVal' <- Ref.readIORef ref
puts "phase3"
puts $ show lazyVal'

case5B :: C.Puts -> Int -> IO ()
case5B puts lazyVal = do
putStrLn "case5B MVarへの書き込みと読みだし"
mv <- MVar.newEmptyMVar
puts "phase1"
MVar.putMVar mv lazyVal
puts "phase2"
lazyVal' <- MVar.takeMVar mv
puts "phase3"
puts $ show lazyVal'

case5C :: C.Puts -> Int -> IO ()
case5C puts lazyVal = do
putStrLn "case5C TVarへの書き込みと読みだし"
tv <- STM.newTVarIO 0
puts "phase1"
STM.atomically $ STM.writeTVar tv lazyVal
puts "phase2"
lazyVal' <- STM.atomically $ STM.readTVar tv
puts "phase3"
puts $ show lazyVal'

main :: IO ()
main = do
puts <- C.mkPuts
[numStr] <- getArgs
let num = read numStr :: Int
count = C.count puts
case5A puts (count num)
case5B puts (count num)
case5C puts (count num)



実行

ghc -Wall -O2 -threaded -rtsopts case5.hs

./case5 5000000000 +RTS -N4


実行結果

case5A IORefへの書き込みと読みだし

0.0 ThreadId 3 phase1
0.0 ThreadId 3 phase2
0.0 ThreadId 3 phase3
0.0 ThreadId 3 count start
5.428809 ThreadId 3 count end
5.428809 ThreadId 3 -5946744071209551616
case5B MVarへの書き込みと読みだし
5.428809 ThreadId 3 phase1
5.428809 ThreadId 3 phase2
5.428809 ThreadId 3 phase3
5.428809 ThreadId 3 count start
10.73282 ThreadId 3 count end
10.73282 ThreadId 3 -5946744071209551616
case5C TVarへの書き込みと読みだし
10.73282 ThreadId 3 phase1
10.73282 ThreadId 3 phase2
10.73282 ThreadId 3 phase3
10.73282 ThreadId 3 count start
16.00562 ThreadId 3 count end
16.00562 ThreadId 3 -5946744071209551616


case5 の実行結果から推測できること


  • IORefもMVarもTVarも、書き込みや読み出しでは、中身は評価されない


    • 根拠:どれも"phase3"の後に"count start"が表示されている



IORefへの書き込みや読み出しでは評価されないので、評価を別スレッドに任せたりすることもできます。

これは基本といえば基本なのですが、ちゃんと知っておかないといけない動作です。

たとえば以下のようなコードでは、中身が評価されないため、サンクがどんどん長くなってメモリを圧迫してやがてメモリ不足に陥ります。


memory_leak1.hs

module Main where

import qualified Control.Monad as M
import qualified Data.IORef as Ref

main :: IO ()
main = do
ref <- Ref.newIORef (1 :: Int)
M.forever $ do
i <- Ref.readIORef ref
Ref.writeIORef ref (i + 1)


おそらく多くの場合、IORefやMVarやTVarに書き込む前に評価した方がよいのではないかと思われます。

たとえば以下のように書き込みの前に評価すれば、メモリが圧迫されることはありません。(この場合、永久に動き続けることになります)


no_memory_leak1.hs

module Main where

 
import qualified Control.Monad as M
import qualified Data.IORef as Ref

main :: IO ()
main = do
ref <- Ref.newIORef (1 :: Int)
M.forever $ do
i <- Ref.readIORef ref
i `seq` Ref.writeIORef ref (i + 1)



case6 atomicModifyIORefの動作

またまた少し話は飛びますが、次はatomicModifyIORefの動作の確認です。

『Haskellによる並列・並行プログラミング』では一番最後のほうに出てきて「データがひとつなら実はこれが一番性能いいんだ」みたいに言われる憎いヤツですね。

atomicModifyIORefがやっていることは基本的にはCAF(compare and swap)です。

よくあるCAFの使い方は、以下のようなものです。


  1. 値を読み出す。

  2. その値を使用して更新後の値を計算する。

  3. 更新後の値をCAFで書き込む。(他のスレッドから対象のメモリ内容が更新されているとCAFが失敗する)

  4. 書き込みに失敗した場合は、1~3を繰り返す。

このような処理をすることにより、対象のメモリの内容の更新について楽観的ロックによる排他性を実現することができます。

しかし、2の計算が長い時間を要するものになると、他のスレッドから更新される可能性が高くなり、CAFの失敗率が上がってしまいます。そうすると、何度も計算を繰り返すことになってパフォーマンスが低下してしまいます。

atomicModifyIORefでは、更新後の「値」はその場では計算せず、未評価のままサンクをCAFで書き込みます。

サンクを作る時間は実際に値を計算するよりも早いので、例え長い時間を要する計算内容であっても、すぐにCAFが実行されます。これによりCAFの成功率は高くなり、やり直しによるロスを最小限に抑えることができます。

atomicModifyIORefは遅延評価を非常にうまく活かした機能だといえると思います。


case6.hs

module Main where

import qualified Common as C
import qualified Control.Concurrent as CC
import qualified Data.IORef as Ref
import System.Environment (getArgs)

case6 :: C.Puts -> Int -> Int -> Int -> Int -> IO ()
case6 puts lazyVal1 lazyVal2 lazyVal3 lazyVal4 = do
putStrLn "case6 atomicModifyIORefの動作"
ref <- Ref.newIORef 0
puts "phase1"
Ref.atomicModifyIORef ref $ \ c -> (c + lazyVal1, ())
puts "phase2"
Ref.atomicModifyIORef ref $ \ c -> (c + lazyVal2, ())
puts "phase3"
Ref.atomicModifyIORef ref $ \ c -> (c + lazyVal3, ())
puts "phase4"
Ref.atomicModifyIORef ref $ \ c -> (c + lazyVal4, ())
puts "phase5"
lazyVal <- Ref.readIORef ref
CC.threadDelay $ 5 * 1000000
puts $ show lazyVal
puts "phase6"

main :: IO ()
main = do
puts <- C.mkPuts
[numStr] <- getArgs
let num = read numStr :: Int
count = C.count puts
case6 puts (count num) (count num) (count num) (count num)



実行

ghc -Wall -O2 -threaded -rtsopts case6.hs

./case6 5000000000 +RTS -N4


実行結果

case6 atomicModifyIORefの動作

0.0 ThreadId 3 phase1
0.0 ThreadId 3 phase2
0.0 ThreadId 3 phase3
1.560000 ThreadId 3 phase4
1.560000 ThreadId 3 phase5
5.023208 ThreadId 3 count start
10.45201 ThreadId 3 count end
10.45201 ThreadId 3 count start
15.97442 ThreadId 3 count end
15.97442 ThreadId 3 count start
21.41883 ThreadId 3 count end
21.41883 ThreadId 3 count start
26.81644 ThreadId 3 count end
26.81644 ThreadId 3 -5340232211128654848
26.81644 ThreadId 3 phase6


case6 の実行結果から推測できること



  • atomicModifyIORefでは中身は評価されない


    • 根拠:"phase5"と"phase6"の間に全ての評価が行われている




atomicModifyIORefの注意点、サンクの連鎖について

case5で確認したように、atomicModifyIORefを使用せずとも、誰かが評価しなければサンクは無限に連鎖します。

しかし、atomicModifyIORefのhackageの説明に記載があるようにcase5の場合には無かった微妙な落とし穴もあります。


atomicModifyIORef does not apply the function strictly. This is important to know even if all you are doing is replacing the value. For example, this will leak memory:


Haskellによる並列・並行プログラミング(p.289)のatomicModifyIORefについての記述にも上記と同じ内容ではありませんが「atomicModifyIORef'を使えばサンクの連鎖が発生しない」といった記載があります)

例えば以下のような処理ではサンクは連鎖せず、無限にメモリを消費することはありません。


no_memory_leak2.hs

module Main where

import qualified Control.Monad as M
import qualified Data.IORef as Ref

main :: IO ()
main = do
ref <- Ref.newIORef (1 :: Int)
M.forever $ do
_i <- Ref.readIORef ref
Ref.writeIORef ref 2


しかし、atomicModifyIORefのhackageの説明に記載されているように、以下のコードはサンクが連鎖し、メモリを無限に消費します。


memory_leak2.hs

module Main where

import qualified Control.Monad as M
import qualified Data.IORef as Ref

main :: IO ()
main = do
ref <- Ref.newIORef (1 :: Int)
M.forever $ Ref.atomicModifyIORef ref (\_ -> (2, ()))


両方とも、過去の値を捨てて新しい固定値を書き込んでいるのに、動作に違いが出てしまいました。

このような差異が生じるのは、atomicModifyIORefが、与えられた関数の内容に関わらず、連鎖したサンク(前の値への参照を持ったサンク)を生成して書き込むためです。

実際にサンクを評価するまでは、連鎖が切れていることを判別できないため、連鎖が発生してしまうわけです。

これはatomicModifyIORefに限った話ではなく、例えば以下のように関数を渡すような方式のコードを書けば発生しうる現象です。


memory_leak3.hs

module Main where

import qualified Control.Monad as M
import qualified Data.IORef as Ref

-- 最適化によりインライン展開されてしまうとサンクの連鎖が切れるので、
-- わざとインライン展開しないように指定しています。
{-# NOINLINE modify #-}
modify :: Ref.IORef a -> (a -> a) -> IO ()
modify ref f = do
i <- Ref.readIORef ref
Ref.writeIORef ref $ f i

main :: IO ()
main = do
ref <- Ref.newIORef (1 :: Int)
M.forever $ modify ref (\_ -> 2)


これはIORefに限った話ではなく、MVarやTVarでも発生しえますので同じ注意が必要です。

いずれにせよ、意図的に評価を遅延させたい特殊なケースを除けば、値を評価してから書き込む/atomicModifyIORef'を使う方が安全です。


case7 atomicModifyIORef'の動作

というわけで、atomicModifyIORef'の動作の確認です。

atomicModifyIORef'は、atomicModifyIORefを実行後にその結果を評価させます。(atomicModifyIORef'の実装


case7.hs

module Main where

import qualified Common as C
import qualified Data.IORef as Ref
import System.Environment (getArgs)

case7 :: C.Puts -> Int -> Int -> Int -> Int -> IO ()
case7 puts lazyVal1 lazyVal2 lazyVal3 lazyVal4 = do
putStrLn "case7 atomicModifyIORef'の動作"
ref <- Ref.newIORef 0
puts "phase1"
Ref.atomicModifyIORef' ref $ \ c -> (c + lazyVal1, ())
puts "phase2"
Ref.atomicModifyIORef' ref $ \ c -> (c + lazyVal2, ())
puts "phase3"
Ref.atomicModifyIORef' ref $ \ c -> (c + lazyVal3, ())
puts "phase4"
Ref.atomicModifyIORef' ref $ \ c -> (c + lazyVal4, ())
puts "phase5"
lazyVal <- Ref.readIORef ref
puts $ show lazyVal
puts "phase6"

main :: IO ()
main = do
puts <- C.mkPuts
[numStr] <- getArgs
let num = read numStr :: Int
count = C.count puts
case7 puts (count num) (count num) (count num) (count num)



実行

ghc -Wall -O2 -threaded -rtsopts case7.hs

./case7 5000000000 +RTS -N4


実行結果

case7 atomicModifyIORef'の動作

0.0 ThreadId 3 phase1
0.0 ThreadId 3 count start
5.382009 ThreadId 3 count end
5.382009 ThreadId 3 phase2
5.382009 ThreadId 3 count start
10.82641 ThreadId 3 count end
10.82641 ThreadId 3 phase3
10.82641 ThreadId 3 count start
16.23963 ThreadId 3 count end
16.23963 ThreadId 3 phase4
16.23963 ThreadId 3 count start
21.71523 ThreadId 3 count end
21.71523 ThreadId 3 phase5
21.71523 ThreadId 3 -5340232211128654848
21.73083 ThreadId 3 phase6


case7 の実行結果から推測できること



  • atomicModifyIORef'は実行時に結果が評価され、評価が完了するまで戻らない


    • 根拠:各phaseの間に"count start"と"count end"が表示されている




case8 atomicModifyIORef'でサンクの連鎖を処理するスレッドの一つをkillする

case7までの確認でatomicModifyIORef'を使えばすぐに値が評価されるためサンクが無限に連鎖しないことはわかりました。

しかし、無制限に連鎖しないだけで、まったく連鎖しないわけではありません。

例えばスレッドが複数あってそれらが同じIORefに対してatomicModifyIORef'を使えば、最大でスレッド数分のサンクが連鎖し、他のスレッドのサンク評価待ち状態になることはありえます。

そして、そのスレッドがcase3,4のように値の評価中もしくは評価前に死ぬこともありえます。

case8はそのような場合、どういった動作になるのかの確認です。

これまでにわかったことの複合したケースなので、結果は予想可能です。

試しにどういう動作になるか予想してみるとよいと思います。


case8.hs

module Main where

import qualified Common as C
import qualified Control.Concurrent as CC
import qualified Data.IORef as Ref
import System.Environment (getArgs)

case8 :: C.Puts -> Int -> Int -> Int -> Int -> IO ()
case8 puts lazyVal1 lazyVal2 lazyVal3 lazyVal4 = do
putStrLn "case8 atomicModifyIORef'でサンクの連鎖を処理するスレッドの一つをkillする"
ref <- Ref.newIORef 0
puts "phase1"
_thid1 <- CC.forkIO $ do
puts "thread1"
Ref.atomicModifyIORef' ref $ \ c -> (c + lazyVal1, ())
puts "thread1 end"
puts "phase2"
CC.threadDelay $ 1000000
thid2 <- CC.forkIO $ do
puts "thread2"
Ref.atomicModifyIORef' ref $ \ c -> (c + lazyVal2, ())
puts "thread2 end"
puts "phase3"
CC.threadDelay $ 1000000
puts "kill thread2"
CC.killThread thid2 -- スレッド2を起動して1秒後に殺す
_thid3 <- CC.forkIO $ do
puts "thread3"
Ref.atomicModifyIORef' ref $ \ c -> (c + lazyVal3, ())
puts "thread3 end"
puts "phase4"
CC.threadDelay $ 1000000
_thid4 <- CC.forkIO $ do
puts "thread4"
Ref.atomicModifyIORef' ref $ \ c -> (c + lazyVal4, ())
puts "thread4 end"
puts "phase5"
lazyVal <- Ref.readIORef ref
puts "phase6"
puts $ show lazyVal
puts "phase7"

main :: IO ()
main = do
puts <- C.mkPuts
[numStr] <- getArgs
let num = read numStr :: Int
count = C.count puts
case8 puts (count num) (count num) (count num) (count num)



実行

ghc -Wall -O2 -threaded -rtsopts case8.hs

./case8 5000000000 +RTS -N4


実行結果

case8 atomicModifyIORef'でサンクの連鎖を処理するスレッドの一つをkillする

0.0 ThreadId 3 phase1
0.0 ThreadId 4 thread1
0.0 ThreadId 3 phase2
0.0 ThreadId 4 count start
1.014001 ThreadId 3 phase3
1.014001 ThreadId 5 thread2
2.028003 ThreadId 3 kill thread2
2.028003 ThreadId 3 phase4
2.028003 ThreadId 6 thread3
3.073205 ThreadId 7 thread4
3.073205 ThreadId 3 phase5
3.073205 ThreadId 3 phase6
5.444409 ThreadId 4 count end
5.444409 ThreadId 4 thread1 end
5.444409 ThreadId 6 count start
10.88881 ThreadId 6 count end
10.88881 ThreadId 6 count start
16.36442 ThreadId 6 count end
16.36442 ThreadId 6 thread3 end
16.36442 ThreadId 7 count start
21.66843 ThreadId 7 count end
21.66843 ThreadId 7 thread4 end
21.66843 ThreadId 3 -5340232211128654848
21.66843 ThreadId 3 phase7

さて、予想通りになりましたでしょうか?


まとめ

それぞれのcaseの結論をまとめました(一部書き方を変えています)。


  • 遅延評価なので、値が必要とされるときになってから評価が始まる

  • 一度評価された値を次に参照すると、前回評価された結果が返る

  • 最初に評価を開始したスレッドで評価が行われる

  • 既に評価の開始された値を他のスレッドから参照した場合、その評価結果を待つ

  • 評価中のスレッドが死んだ場合、評価途中の状態でいったん停止する

  • 評価が途中で停止した値を参照すると、そのスレッドが残りの計算を行う

  • IORefもMVarもTVarも、書き込みや読み出しでは、中身は評価されない


  • atomicModifyIORefでは中身は評価されない


  • atomicModifyIORef'は実行時に結果が評価され、評価が完了するまで戻らない

どこそこに書かれているものもありますが、これはあくまでGHCの動作を実験結果から推測したものに過ぎない点にはご注意ください。