15
6

More than 3 years have passed since last update.

STMとReal-Time Queueを利用したマルチスレッドズンドコキヨシ

Last updated at Posted at 2019-12-18

STMとReal-Time Queueを利用したマルチスレッドズンドコキヨシ

巨大な機械

「さまざまな技術を使って作られています」「16個のセンサーと32個のモーターによって駆動する精密機械です」
その部屋を占領している巨大な機械のまえで博士は話しつづけた。
「このモーターは特注の品で、この機械のためにわざわざ取りよせました」
「何をする機械なんですか」
彼は冷蔵庫から卵を取りだすと、その機械の前面にある小さなくぼみにそっと置いた。機械ははげしくうなり、複雑にからみあった金属の棒ががちゃがちゃと動きはじめる。ボールペンのような部品が卵をつつきはじめ、両脇からのびてきたアームが卵をつかむ。つぎの瞬間、ガラス容器のなかに卵の中身が落ちる。機械は動きをとめる。沈黙が流れる。
「これだけですか」
「これだけです」

(民明書房刊「博士の異常な執着」より)

はじめに

3年くらい前に「ズンドコキヨシ」というものが流行った。

この記事であつかう内容

つぎの内容を説明する。

  • Haskellにおける変更可能な「変数」であるIORef
  • Haskellにおける並行処理(スレッドの生成)
  • 共有メモリと競合
  • ロックの問題点
    • デッドロックなど
  • ソフトウェアトランザクショナルメモリ(STM)の説明
  • 値が評価ずみなのか、サンクなのかを調べるやりかた
  • GHCiで「値の表示」に自分で定義した機能を使う方法
  • 短命なキュー
    • 要素の押しこみや取りだしに状態変化をともなう
  • 永続キュー
    • 要素の押しこみや取りだしが純粋な関数で実装されている
    • Batched Queue
      • たいていは平均すればO(1)
    • Banker's Queue
      • どのような使いかたをしても平均すればO(1)
    • Real-Time Queue
      • 平均しなくてもO(1)
  • Knuth-Morris-PrattアルゴリズムのHaskellでの実装
    • 文字列のなかから特定の並びを探索するアルゴリズム

STM、Real-Time Queue、Knuth-Morris-Prattアルゴリズムなどを使って、ズンドコキヨシを実装する。それぞれの機能について、あるていど、ていねいに説明していく。

サンプルコード

サンプルコードは、つぎのディレクトリに置いてある。参考にしてほしい。

GitHub: YoshikuniJujo/test_haskell/tribial/qiita/concurrent-zundoko

変更可能な「変数」であるIORef

多くの言語では変数の値は変化する。つまり状態をもつ。Haskellでは変数の値は変化しない。うれしいかどうかは別として、Haskellでも、ほかの言語にある変更可能な変数も用意されている。

たとえば、整数の並びの要素の総和をもとめてみよう。C言語だと、つぎのようになる。

sum.c
#include <stdio.h>

int sum(int *nums);

int
main(int argc, char *argv[])
{
        int nums[] = { 123, 456, 789, -1 };
        printf("%d\n", sum(nums));
        return 0;
}

int
sum(int *nums)
{
        int s = 0;
        for (int i = 0; nums[i] >= 0; i++) {
                s += nums[i]; }
        return s;
}

Rubyだと、つぎのようになるだろう。

sum.rb
def sum(nums)
        s = 0
        for n in nums do
                s += n
        end
        s
end

puts sum([123, 456, 789])

これを「おなじように考えて」Haskellで実装すると、こうなる。

sum_ioref.hs
{-# OPTIONS_GHC -Wall -fno-warn-tabs #-}

import Data.IORef

main :: IO()
main = print =<< mySum [123, 456, 789]

mySum :: [Int] -> IO Int
mySum nums = do
        s <- newIORef 0
        (modifyIORef s . (+)) `mapM_` nums
        readIORef s

関数newIORefで初期値0の「可変変数」をつくる。関数modifyIORefで第2引数の関数で「可変変数」のなかみを変更する。それをリストの要素すべてに対してくりかえすので、関数mapM_を使っている。最後に関数readIORefで「可変変数」のなかみを取り出している。もちろん、こんなコードを書くのは「どうかと思う」わけだ。そもそもPrelude.sumを使えばいいとか、Data.List.foldl'あたりを使うという話もあるわけだけど、そうでなくても、つぎのように書くべきだろう。

sum_sane.hs
{-# OPTIONS_GHC -Wall -fno-warn-tabs #-}

main :: IO ()
main = print $ mySum [123, 456, 789]

mySum :: [Int] -> Int
mySum = sm 0
        where
        sm s [] = s
        sm s (n : ns) = (sm $! s + n) ns

本質的にはつぎのようなロジックだ。

sm s [] = s
sm s (n : ns) = sm (s + n) ns

「変数の値を変化させる」必要はない。多くの場合に「変更可能な変数」を使う必要はないが、もしかするとIORefを使うことで、コードがすっきりすることもあるかもしれない。対話環境でいろいろ試してみよう。

% stack ghci
Prelude> :module Data.IORef
Prelude Data.IORef> num <- newIORef 123
Prelude Data.IORef> readIORef num
123
Prelude Data.IORef> writeIORef num 456
Prelude Data.IORef> readIORef num
456
Prelude Data.IORef> modifyIORef num (* 2)
Prelude Data.IORef> readIORef num
912

Haskellにおける並行処理(スレッド)

GHCは「軽量スレッドによる並行処理」をサポートしている。マルチスレッドプログラミングは、かんたんに実装できる。

threads.hs
{-# OPTIONS_GHC -Wall -fno-warn-tabs #-}

import Control.Monad
import Control.Concurrent

main :: IO ()
main = do
        void . forkIO . forever $ threadDelay 200000 >> putStrLn "foo"
        void . forkIO . forever $ threadDelay 300000 >> putStrLn "bar"
        threadDelay 5000000

これを実行すると、つぎのように表示される。

foo
bar
foo
foo
bar
foo
bar
foo
bafro
o
foo
bar
.
.
.

0.2秒ごとにfooを出力するスレッドと0.3秒ごとにbarを出力するスレッドとが並行して実行されている。5秒後にはメインのスレッドが終了することで全体が終了する。

共有メモリと競合

「変更可能な変数」と「並行実行」というのは「サンポール」と「ハイター」のようなものだ。混ぜてみよう(なお、この実験は十分に安全を確保したうえで行っています)。

danger.hs
{-# OPTIONS_GHC -Wall -fno-warn-tabs #-}

import Control.Monad
import Control.Concurrent
import Data.IORef

type Account = IORef Int

newAccount :: Int -> IO Account
newAccount = newIORef

withdraw :: Account -> Int -> IO ()
withdraw acc amnt = do
        bl <- readIORef acc
        threadDelay 100000
        writeIORef acc $ bl - amnt

deposit :: Account -> Int -> IO ()
deposit acc amnt = do
        threadDelay 50000
        bl <- readIORef acc
        threadDelay 100000
        writeIORef acc $ bl + amnt

inquiry :: Account -> IO Int
inquiry = readIORef

main :: IO ()
main = do
        acc <- newAccount 1000
        void . forkIO $ withdraw acc 500
        void . forkIO $ deposit acc 500
        threadDelay 1000000
        print =<< inquiry acc 

関数threadDelayを使って「問題が生じやすい」ようにしてある。現実的なシチュエーションとしては「時間のかかる計算」をしていることを想像してほしい。実行してみる。

% stack ghc -- -e main danger.hs
1500

たとえば、こういうことだ。

  • はじめの口座の残高は1000円
  • 商品の代金として500円引き落とされた
  • 同時にATMで500円入金した
  • 結果として口座の残高は1500円になった
  • ^_^

「うれしい」けど。なにが起きているのだろうか。

  • 出金スレッドが残高をチェックする(1000円)
  • 入金スレッドが残高をチェックする(1000円)
  • 出金スレッドが500円マイナスした残高を書き込む(500円)
  • 入金スレッドが500円プラスした残高を書き込む(1500円)

こういうことだ。口座の残高を調べる操作と、変更後の残高を書き込む操作とが、まぜこぜになってしまっている。

ロックという解

口座の残高を正しく更新するためには、「現在の残高の参照」と「新しい残高の設定」とをセットにして、ほかの操作が入りこまないようにすればいい。たとえば、それぞれの口座ごとに「旗」を用意しておき、口座の残高を変更する操作を行うときには、その「旗」を手もとに置くようにすれば、それぞれの口座について、ふたつ以上のスレッドが同時に変更するということはなくなる。つまり、つぎのようになる。

  • 出金スレッドが、その口座の「旗」を手もとに置く
  • 出金スレッドが残高をチェックする(1000円)
  • 入金スレッドが、その口座の「旗」を手もとに置こうとするが、ない
  • 出金スレッドが500円マイナスした残高を書き込む(500円)
  • 出金スレッドが「旗」をかえす
  • 入金スレッドが「旗」を手もとに置く
  • 入金スレッドが残高をチェックする(500円)
  • 入金スレッドが500円プラスした残高を書き込む(1000円)
  • 入金スレッドが「旗」をかえす

ロックのための「旗を置く場所」として、HaskellではMVarが使える。MVarには値を置いたり、値を取りだしたりすることができる。値を置くとMVarは「値を保持している状態」になり、値を取りだすとMVarは「空の状態」になる。

Hackage:Control.Concurrent.MVar

値を含むMVarの作成、値の取りだし、値の設定には、それぞれつぎのような関数が用意されている。

newMVar :: a -> IO (MVar a)
takeMVar :: MVar a -> IO a
putMVar :: MVar a -> a -> IO ()

これを使って銀行口座の例を書き直すと、つぎのようになる。

mutex.hs
{-# OPTIONS_GHC -Wall -fno-warn-tabs #-}

import Control.Monad
import Control.Concurrent
import Data.IORef

data Account = Account (MVar ()) (IORef Int)

newAccount :: Int -> IO Account
newAccount bl = Account <$> newMVar () <*> newIORef bl

withdraw :: Account -> Int -> IO ()
withdraw (Account m acc) amnt = do
        takeMVar m
        bl <- readIORef acc
        threadDelay 100000
        writeIORef acc $ bl - amnt
        putMVar m ()

deposit :: Account -> Int -> IO ()
deposit (Account m acc) amnt = do
        threadDelay 50000
        takeMVar m
        bl <- readIORef acc
        threadDelay 100000
        writeIORef acc $ bl + amnt
        putMVar m ()

inquiry :: Account -> IO Int
inquiry (Account _ r) = readIORef r

main :: IO ()
main = do
        acc <- newAccount 1000
        void . forkIO $ withdraw acc 500
        void . forkIO $ deposit acc 500
        threadDelay 1000000
        print =<< inquiry acc

関数withdrawとdepositで処理のはじめにtakeMVar mで旗を手もとに置き、おわりにはputMVar m ()で旗をもどしている。これで、それぞれの処理の途中で、たがいに相手の処理にわりこむことはない。実行してみる。

% stack ghc -- -e main mutex.hs
1000

残高が1000円だった口座から500円引き落として、おなじその口座に500円入金した結果として、その口座の残高は1000円になった。ちなみに、ここでは説明の都合上、MVarをロックとして使ったが、この場合にはMVarそのものに口座残高を記録したほうが、かんたんだ。

ロックの問題

下記の論文を参考にした。

Simon Peyton Jones (2007) "Beautiful concurrency"

複数の口座間で預金を移動させることを考える。話の都合でロックとアンロックを関数withdrawやdepositとは独立した関数にする。

transfer_lock.hs
{-# OPTIONS_GHC -Wall -fno-warn-tabs #-}

import Control.Monad
import Control.Concurrent
import Data.IORef

data Account = Account (MVar ()) (IORef Int)

newAccount :: Int -> IO Account
newAccount amnt = Account <$> newMVar () <*> newIORef amnt

lock :: Account -> IO ()
lock (Account m _) = takeMVar m

unlock :: Account -> IO ()
unlock (Account m _) = putMVar m ()

withdraw :: Account -> Int -> IO ()
withdraw (Account _ a) amnt = do
        bl <- readIORef a
        writeIORef a $ bl - amnt

deposit :: Account -> Int -> IO ()
deposit (Account _ a) amnt = do
        bl <- readIORef a
        writeIORef a $ bl + amnt

inquiry :: Account -> IO Int
inquiry (Account _ a) = readIORef a

それぞれ、新しい口座の作成、口座のロック、アンロック、引きだし、預けいれ、残高照会のための関数だ。複数の口座の残高の合計を照会する関数を追加する。

transfer_lock.hs
inquiryAll :: [Account] -> IO Int
inquiryAll accs = do
        lock `mapM_` accs
        sum <$> (inquiry `mapM` accs) <* unlock `mapM_` accs

預金が減る

口座間で預金を移動させる関数を書く。

transfer_lock.hs
transfer :: Account -> Account -> Int -> IO ()
transfer a b amnt = do
        lock a
        withdraw a amnt
        unlock a
        threadDelay 500000
        lock b
        deposit b amnt
        unlock b

try :: IO ()
try = do
        a <- newAccount 800
        b <- newAccount 300
        void . forkIO $ transfer a b 500
        threadDelay 250000
        print =<< inquiryAll [a, b]

tryは800円を入金した口座と300円を入金した口座を作成し、前者から後者に500円移動させつつ、両者の口座の残高を表示する動作だ。試してみよう。

% stack ghci transfer_lock.hs
> try
600

本当は1100円あるはずなのに、600円しかないことになっている。

預金は減らない

預金が減ってしまったのは、預金を移動の途中の状態を取得してしまったためだ。引きだしもとの口座と、預けいれさきの口座とを「預金の移動」の最初から最後までロックしなくてはならない。

transfer_lock.hs
transfer' :: Account -> Account -> Int -> IO ()
transfer' a b amnt = do
        lock a
        threadDelay 500000
        lock b
        withdraw a amnt
        threadDelay 500000
        deposit b amnt
        unlock a
        unlock b

try' :: IO ()
try' = do
        a <- newAccount 800
        b <- newAccount 300
        void . forkIO $ transfer' a b 500
        threadDelay 250000
        print =<< inquiryAll [a, b]

試してみよう。

> :reload
> try'
1100

預金は減らなくなった。

動かなくなる

さて、こんどは預金の移動を複数並行して行うことを考える。

transfer_lock.hs
deadlock :: IO ()
deadlock = do
        a <- newAccount 800
        b <- newAccount 300
        void . forkIO $ transfer' a b 500
        transfer' b a 200

ひとつめの口座からふたつめの口座に500円移動させるのと同時に、ふたつめの口座からひとつめの口座に200円移動させようとしている。試してみよう。

> :reload
> deadlock

動かなくなる。Ctrl-C(Linuxの場合)などで強制終了しよう。なにが起きているのだろうか。

  • スレッド1が口座Aをロック
  • スレッド2が口座Bをロック
  • スレッド1が口座Bがアンロックされるのを待つ
  • スレッド2が口座Aがアンロックされるのを待つ
  • 待つ
  • 待つ
  • ...
  • 待ち続ける

これがデッドロックだ。ロックを使って状態を管理するとき、このような問題に注意する必要がある。この問題があつかいにくいのは、せっかく関数で処理を抽象化しているのに、関数の実装の詳細についての知識が問題解決に必要になることだ。たとえば、関数transfer'が口座のロックを「どのような順番でしているのか」などの情報が必要になる。

ソフトウェアトランザクショナルメモリ(STM)という解

「ロック」を生のままで使うのは、「メモリ」を生のままで使うようなものだ。人間にとって「確保したすべてのメモリを解放」することを徹底することはむずかしい。すべての「ロック」に対して「アンロック」をするのはむずかしい。さらに悪いことに、ロックの場合、それぞれの資源ごとに「ロックする順番」を守らないと、うえでみたような、デッドロックを引き起こすことになる。

STMを使えば、「あちこちの資源を順番を守りながらロックし、使い終わったらロックを解除して」という操作をするかわりに、「ここからここまでの作業はひとまとまりだよ」と、印をつけてやるだけでいい。デッドロックの心配もない。

預金を移動する例

STMを使えばtransferは、つぎのように書ける。

transfer a b amnt = atomically $ do
        withdraw a amnt
        deposit b amnt

ひとつめの口座からの引きだしと、ふたつめの口座からの預けいれを「ひとまとまりの作業」として行うように、atomicallyという印をつけている。実際のコードをみてみよう。

transfer_stm.hs
{-# OPTIONS_GHC -Wall -fno-warn-tabs #-}

import Control.Monad
import Control.Concurrent
import Control.Concurrent.STM

type Account = TVar Int

newAccount :: Int -> STM Account
newAccount = newTVar

withdraw :: Account -> Int -> STM ()
withdraw acc amnt = do
        bl <- readTVar acc
        writeTVar acc $ bl - amnt

deposit :: Account -> Int -> STM ()
deposit acc amnt = do
        bl <- readTVar acc
        writeTVar acc $ bl + amnt

inquiry :: Account -> STM Int
inquiry = readTVar

inquiryAll :: [Account] -> IO Int
inquiryAll accs = atomically $ sum <$> (inqury `mapM` accs)

transfer :: Account -> Account -> Int -> IO ()
transfer a b amnt = atomically $ withdraw a amnt >> deposit b amnt

try :: IO ()
try = do
        a <- atomically $ newAccount 800
        b <- atomically $ newAccount 300
        void . forkIO $ transfer a b 500
        inquiryAll [a, b] >>= print

noDeadlock :: IO ()
noDeadlock = do
        a <- atomically $ newAccount 800
        b <- atomically $ newAccount 300
        void . forkIO $ transfer a b 500
        transfer b a 200
        print =<< atomically ((,) <$> inquiry a <*> inquiry b)
        threadDelay 500000
        print =<< atomically ((,) <$> inquiry a <*> inquiry b)

試してみる。

% stack ghci transfer_stm.hs
> try
1100
> noDeadlock
(1000,100)
(500,600)

STMでは「変化する値」を表現するのにTVar a型の値を利用する。また、操作はSTMモナド内に書かれる。上記のコードでは、つぎの関数が使われている。

atomically :: STM a -> IO a

newTVar :: a -> STM (TVar a)
readTVar :: TVar a -> STM a
writeTVAr :: TVar a -> a -> STM ()

STMでは操作はSTMモナド内で組み立てられる。組み立てた操作を「ひとまとまりの操作」として、IOモナド内に置かれる動作に変換するのが関数atomicallyだ。残りのみっつは、ほかの「可変な変数」とおなじように、それぞれ「作成」「読みだし」「書きこみ」となっている。

retry

複数のプロセスで状態を共有しているときに、条件が満たされるまで待ちたいということは、よくある。たとえば、「リストが空でなくなるまで待って、リストの先頭の要素を表示する」などだ。このときに、つぎのような操作retryが使える。

retry :: STM a

リストが空でなくなるのを待って、その先頭を表示する例は、つぎのようになる。

wait_head.hs
{-# LANGUAGE LambdaCase #-}
{-# OPTIONS_GHC -Wall -fno-warn-tabs #-}

import Control.Monad
import Control.Concurrent
import Control.Concurrent.STM

main :: IO ()
main = do
        lst <- atomically $ newTVar []
        void . forkIO $ threadDelay 3000000 >>
                atomically (writeTVar lst "Hello, world!")
        chr <- atomically $ readTVar lst >>= \case
                [] -> retry
                c : _ -> pure c
        print chr

制御が操作retryに到達すると、関数atomicallyの引数である操作全体がやりなおされる。ここでは、リストが空でなくなるまで、プロセスが停止することになる。

意味論について

atomicallyの引数となった操作には、つぎの性質がある。

  • 不可分である
  • 隔離されている

不可分である

ほかのプロセスからは、操作の途中の状態はみえない。つまり、ほかのプロセスにみえるのは「操作のまえ」の状態、または「操作のあと」の状態のみだ。

隔離されている

操作の途中で、ほかのプロセスによる変更の影響をうけない。

実装のしかたについて

現在のバージョンでどのように実装されているかは知らないが、実装のしかたのひとつとして「楽観的実行」というやりかたがある。

楽観的実行

「変更可能な変数」(以下「変数」とする)について、とくにロックなどを考えずに読みこみを行う。「ひとまとまりの操作」を実行していくが、変数への書きこみは「おおもとの変数に」ではなく「一時的な保存場所に」おこなう。「ひとまとまりの操作」が終了したところで、「読みこんだ変数」についてチェックして、ほかのプロセスによる変更がなければ「おおもとの変数」を「一時的な保存場所」に記録しておいた値で置き換える。「ひとまとまりの操作」が終了したところで、「読みこんだ変数」に変更があった場合、「一時的な保存場所」の記録は破棄し、はじめからやりなおす。

IOモナドを直接、使うのではなく、STMモナドが用意されている理由として、「操作は破棄できなくてはならない」ということがある。「変数への書きこみ」は破棄することが可能だけど、たとえば端末に文字を表示する動作は破棄することはできないため、「やりなおし」のときに表示が重複してしまう。

STMのまとめ

生のメモリ管理に対してGCがあるように、生のロックの管理に対してSTMがある。ひとつひとつの「変更可能な変数」に対して「ロック」「ロックの解除」を手作業でやるかわりに、「この操作はひとまとまり」という印をつけておけば、デッドロックなどの問題に悩まされることなく、共有メモリにつきものの競合を避けることができる。STMを使用する側としては、「不可分である」ということと「隔離されている」という性質があることだけを意識しておけばいい。実装方法のひとつとして「楽観的実行」がある。これは、競合を気にせずに変数を読みこんで操作を実行するが、書きこみのときに「読みこんだ変数」が変更されていないことを確認し、変更があれば操作を破棄し、もう一度やりなおすというやりかただ。

Hackage Control.Concurrent.STM

キュー

下記の書籍を参考にした。

Purely Functional Data Structures - Chris Okasaki

キューとは

「先に入れたものが、先に出てくる」データ構造。筒のなかにビー玉を押しこむイメージ。

「短命なデータ構造」と「永続データ構造」について

短命なデータ構造

短命なデータ構造ではデータ構造を更新すると、もともとのデータ構造そのものが変化する。データ構造の更新のあとでは、値のまえの版は存在しない。

永続データ構造

永続データ構造では、データ構造の更新は「新しい値」を作成することだ。意味論的には「すべての値をコピーしたうえで、新しくコピーされた値を変更する」のとおなじことだ。実際には、「変更部分」以外の多くを共有することで、時間や空間的な面で効率的な実装になっていることが多い。

短命な連結リスト

つぎのようなファイルephemeral_list.hsを用意する。

ephemeral_list.hs
{-# LANGUAGE LambdaCase #-}
{-# OPTIONAL_GHC -Wall -fno-warn-tabs #-}

import Prelude hiding (head)

import Control.Concurrent.STM

短命な連結リストは、つぎのように定義できる。

ephemeral_list.hs
type List a = TVar (Cons a)
data Cons a = Nil | Cons a (List a)

TVar aは(実装は異なるかもしれないが)、a型の値が格納されているメモリ領域へのポインタと考えることができる。Cons a型の値を、ここではコンスセルと呼ぶことにすると、List aはコンスセルへのポインタであり、コンスセルは空(Nil)であるか、またはa型の値と「コンスセルへのポインタ」のペアになっている。

空のリストを用意する機能を定義する。

ephemeral_list.hs
newList :: STM (List a)
newList = newTVar Nil

consとhead

先頭への要素の追加と、先頭からの要素の取りだしの機能を定義する。

ephemeral_list.hs
cons :: a -> List a -> STM ()
cons x l = writeTVar l . Cons x =<< newTVar =<< readTVar l

head :: List a -> STM a
head l = readTVar l >>= \case
        Nil -> retry
        Cons x t -> x <$ (writeTVar l =<< readTVar t)

consは、readTVar lでCons a型の値を取りだして、newTVarでそれへのポインタpを作成し、それをCons x pのようなかたちにしてから、writeTVarでlがそれを指すようにしている。

headはreadTVar lで取りだした値がNilならretryとし、Cons x tならばxをかえしつつ、tの指すCons a型の値をlが指すようにする。試してみよう。

% stack ghci ephemeral_list.hs
> lst <- atomically newList :: IO (List Int)
> atomically $ cons 123 lst
> atomically $ cons 456 lst
> atomically $ cons 789 lst
> atomically $ head lst
789
> atomically $ head lst
456
> atomically $ head lst
123

リストの先頭に順に値を追加していき、つぎに、できたリストについて先頭から値を順に取りだしている。空リストから値を取りだそうとするとどうなるだろうか。

> :module + Control.Concurrent
> forkIO $ print =<< atomically (head lst)
ThreadId 418
> atomically $ cons 321 lst
321

新しいスレッドをつくり、そのなかでリストの先頭を表示させようとしている。head lstは空リストではretryとなるので、リストに値が追加されるまでは、スレッドは停止する。そのあとcons 321 lstでリストに値を追加すると、その段階で先頭の値が表示される。

snoc

要素を末尾に追加する機能を定義する。

ephemeral_list.hs
snoc :: List a -> a -> STM ()
snoc l x = readTVar l >>= \case
        Nil -> writeTVar l . Cons x =<< newTVar Nil
        Cons _ t -> snoc t x

lがNil(空のコンスセル)を指している場合には、追加する値と「空のコンスセルを指すポインタ」をペアにしたコンスセルを作成して、lがそれを指すようにする。そうでなければ、コンスセルのポインタ部分に対して、snoc自体を実行する。試してみよう。

> :reload
> lst <- atomically newList :: IO (List Int)
> atomically $ snoc lst 123
> atomically $ snoc lst 456
> atomically $ snoc lst 789
> atomically $ head lst
123
> atomically $ head lst
456
> atomically $ head lst
789

この機能の実行には、追加のために要素すべてをスキャンする必要があるので要素数nに対してO(n)時間かかる。

短命なキュー

連結リストの末尾への追加を実行するのにかかる時間がO(n)になってしまうのは、「追加するべき末尾」をみつけるのに、要素を順にたどっていく必要があるからだ。末尾のポインタへのポインタを保持しておくことで、末尾への追加をO(1)時間で実行できるようにすることができる。そのような構造を「短命なキュー」として使うことができる。まずは、ファイルephemeral_queue.hsを用意する。

ephemeral_queue.hs
{-# LANGUAGE LambdaCase #-}
{-# OPTIONS_GHC -Wall -fno-warn-tabs #-}

import Prelude hiding (head)

import Control.Concurrent.STM

短命なキューは、つぎのように定義できる。

ephemeral_queue.hs
data Queue a = Queue (List a) (TVar (List a))

type List a = TVar (Cons a)
data Cons a = Nil | Cons a (List a)

新しいキューを作成する機能を定義する。

ephemeral_queue.hs
newQueue :: STM (Queue a)
newQueue = newTVar Nil >>= \l -> Queue l <$> newTVar l

空のコンスセルへのポインタlを作成し、Queueの第1引数にはポインタlを、第2引数にはポインタlへのポインタを、それぞれ指定する。

snocとhead

末尾への追加と先頭からの取りだしの機能を定義する。

ephemeral_queue.hs
snoc :: Queue a -> a -> STM ()
snoc (Queue _ end) x = newTVar Nil >>= \new -> do
        (`writeTVar` Cons x new) =<< readTVar end
        writeTVar end new

head :: Queue a -> STM a
head (Queue lst _) = readTVar lst >>= \case
        Nil -> retry
        Cons x t -> x <$ (writeTVar lst =<< readTVar t)

関数headは連結リストでの定義と、だいたいおなじだ。関数snocでは、まずはnewTVar Nilで新しい末尾を生成し、それを追加する値xとペアにしてコンスセルを作る。そのコンスセルをreadTVar endで取りだした「古い末尾のポインタ」が指すようにする(writeTVar)。最後に「末尾のポインタを指すポインタ」が「新しい末尾のポインタ」を指すようにする(writeTVar end new)。試してみよう。

% stack ghci ephemeral_queue.hs
> :module + Control.Monad
> q <- atomically newQueue :: IO (Queue Int)
> atomically $ (q `snoc`) `mapM_` [1 .. 15]
> atomically $ head q
1
> atomically $ head q
2
> atomically $ head q
3
> atomically . replicateM 10 $ head q
[4,5,6,7,8,9,10,11,12,13]

うまく定義できているようだ。stmパッケージに含まれるSTM用のチャネルであるTChanは、だいたい上記のような作りになっている。

Hackage Control.Concurrent.STM.TChan

永続キュー

ズンドコキヨシを実装するのに、上記の短命なキュー(あるいは、本質的におなじものであるTChan)を使ってもいい。しかし、ここでは、純粋な関数で表現できる範囲を広げるために永続キューを使うことにする。たとえば、スタックであればリストを使うことで純粋な関数であつかうことができる。おなじようなデータ構造をキューにも用意したい。

永続キューのAPI

いくつかのモジュールを作成するので、Stackのプロジェクトを作成する。

% stack new try-persistent-queue
% cd try-persistent-queue

永続キューのAPIを定義する。

src/Queue.hs
{-# OPTIONS_GHC -Wall -fno-warn-tabs #-}

module Queue (Queue(..), ConsQueue(..), head, tail, snocAll, consAll) where

import Prelude hiding (head, tail)

import Data.List (foldl')

class Queue q where
        empty :: q a
        snoc :: q a -> a -> q a
        uncons :: q a -> Maybe (a, q a)

class Queue q => ConsQueue q where
        cons :: a -> q a -> q a

head :: Queue q => q a -> Maybe a
head = (fst <$>) . uncons

tail :: Queue q => q a -> Maybe (q a)
tail = (snd <$>) . uncons

snocAll :: Queue q => q a -> [a] -> q a
snocAll = foldl' snoc

consAll :: ConsQueue q => [a] -> q a -> q a
consAll = flip $ foldr cons

型クラスQueueには、キューとして必要なAPIが定義されている。基本的な使いかたとしては、空のキュー(empty)を用意して、関数snocで要素を末尾から追加していく。関数unconsで先頭と残りとにわける。キューの多くは、かんたんに効率的な「先頭への追加」を実装できるので、クラスConsQueueに関数consを用意した。

関数headとtailは関数unconsの返り値のかたほうを捨てたもの。また、便利なのでリストの全要素を追加する関数snocAllとconsAllも定義した。

リストをキューとして使う

要素の追加を末尾に対しておこない、取りだしを先頭からおこなえば、リストはキューとして使える\(^^)/<ワーイ。

src/ListQueue.hs
{-# OPTIONS_GHC -Wall -fno-warn-tabs #-}

module ListQueue (
        ListQueue, empty, snoc, uncons, cons, head, tail, snocAll, consAll
        ) where

import Prelude hiding (head, tail)

import Queue (Queue(..), ConsQueue(..), head, tail, snocAll, consAll)

newtype ListQueue a = ListQueue [a] deriving Show

snocList :: [a] -> a -> [a]
snocList [] y = [y]
snocList (x : xs) y = x : snocList xs y

instance Queue ListQueue where
        empty = ListQueue []
        snoc (ListQueue xs) x = ListQueue $ snocList xs x
        uncons (ListQueue []) = Nothing
        uncons (ListQueue (x : xs)) = Just (x, ListQueue xs)

instance ConsQueue ListQueue where
        cons x (ListQueue xs) = ListQueue $ x : xs

試してみよう。

% stack ghci
> snocAll empty [1 .. 15] :: ListQueue Int
ListQueue [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15]
> uncons it
Just (1,ListQueue [2,3,4,5,6,7,8,9,10,11,12,13,14,15])
> uncons . snd =<< it
Just (2,ListQueue [3,4,5,6,7,8,9,10,11,12,13,14,15])
> :module + Data.List
> unfoldr ListQueue.uncons . snd <$> it
Just [3,4,5,6,7,8,9,10,11,12,13,14,15]

末尾への追加と先頭からの取りだしが、うまく定義できているようだ。

性能解析

さて、このキューは実用的だろうか。つまり、それなりの数の要素について、ちゃんと使えるだろうか。性能解析(プロファイリング)をしてみよう。まずは、性能解析用のモジュールを作成する。ここで定義する関数は、あるていど使い回しができる。

src/ProfileQueue.hs
{-# LANGUAGE ScopedTypeVariables, KindSignatures #-}
{-# LANGUAGE TypeApplications, AllowAmbiguousTypes #-}
{-# OPTIONS_GHC -Wall -fno-warn-tabs #-}

module ProfileQueue (profileWhole, queueN) where

import Data.List (unfoldr)

import Queue

profileWhole :: forall q . Queue q => Int -> IO ()
profileWhole n = do
        putStrLn $ show n ++ " and " ++ show (n * 10) ++ " elem queues"
        print . sum $ {-# SCC "BUILD_AND_BREAK_N_ELEM_QUEUE" #-}
                unfoldr uncons (queueN @q n)
        print . sum $ {-# SCC "BUILD_AND_BREAK_N*10_ELEM_QUEUE" #-}
                unfoldr uncons (queueN @q (n * 10))

queueN :: Queue q => Int -> q Int
queueN n = snocAll empty [1 .. n]

SCCプラグマによって、性能解析のときのコストセンターを指定することができる。関数profileWholeはTypeApplications拡張をつかってprofileWhole @ListQueueのようにすることで、使うキューのタイプを変えることができる。このモジュールを使って、ListQueueの性能解析をしてみる。まずは、実行可能ファイルを作成するためにpackage.yamlを編集する。executable:に、つぎのように追加する。

package.yaml
  profileListQueue:
    main: profileListQueue.hs
    source-dirs: app
    ghc-options:
    - -threaded
    - -rtsopts
    - -with-rtsopts=-N
    dependencies:
    - try-persistent-queue

実行可能ファイルのソースコードを追加する。

app/profileListQueue.hs
{-# LANGUAGE TypeApplications #-}
{-# OPTIONS_GHC -Wall -fno-warn-tabs #-}

module Main where

import ProfileQueue
import ListQueue

main :: IO ()
main = profileWhole @ListQueue 1000

試してみる。

% stack build --profile
% stack exec --profile profileListQueue -- +RTS -p
1000 and 1000 elem queues
500500
50005000

結果はprofileListQueue.profに書きだされる。結果をみると、つぎのようになっている。

  • total time = 1.63 secs (1629 ticks @ 1000 us, 1 processor)
  • BUILD_AND_BREAK_N*10_ELEM_QUEUE 98.6% time
  • BUILD_AND_BREAK_N_ELEM_QUEUE 1.1% time

また、snocListの呼びだし回数は、つぎのようになる。

  • BUILD_AND_BREAK_N*10_ELEM_QUEUE 50005000
  • BUILD_AND_BREAK_N_ELEM_QUEUE 500500

1000要素のキューをつくって、すべての要素を取りだすのと、10000要素のキューをつくって、すべての要素を取りだすのとでは、後者のほうに100倍時間がかかる。つまりO(n^2)時間かかっていることになる。snocListの実行には、リストの末尾までの走査が必要でO(n)時間かかるためだ。

Batched Queue

キューに要素を追加するのに、概存の要素数nに対してO(n)時間かかるのでは、「うーん、ちょっと使えないかも」という感じがある。ひとつ要素を追加するくらいのことはO(1)時間でやりたい。永続データ構造であるためには、データ構造そのものを変化させるわけにはいかない。この制約はプログラミングの複雑さの制御のためには、とてもいい制約であるけれど、この制約を満足させつつO(1)のキューを作るのはむずかしい。

そこで考えた。「n回の処理のうち一回だけ時間nかかって、あとは時間1で処理できるようなアルゴリズム」ならば、「平均すれば時間2で処理できる」ことになる。全体として考えれば、ひとつひとつの処理はO(1)で終わってることになるよね、と。このように「処理の回数と全体の時間とからもとめた平均で考えたとき」のO(x)を「償却...時間」と呼ぶ。今回はO(1)なので償却定数時間と呼ぶ。

償却定数時間に対して「償却しないでも定数時間」であるような処理にかかる時間を最悪定数時間と(ここでは)呼ぶ。worst case constant timeの訳だ。「最悪定数時間ではなく償却定数時間でいい」とすると、話はすこしかんたんになる。リストをふたつ使えばいい。

src/BatchedQueue.hs
{-# OPTIONS_GHC -Wall -fno-warn-tabs #-}

module BatchedQueue (
        BatchedQueue, empty, snoc, uncons, head, tail, snocAll, consAll ) where

import Prelude hiding (head, tail)

import Queue

data BatchedQueue a = BatchedQueue [a] [a] deriving Show

instance Queue BatchedQueue where
        empty = BatchedQueue [] []
        snoc (BatchedQueue f r) x = BatchedQueue f (x : r)
        uncons (BatchedQueue [] []) = Nothing
        uncons (BatchedQueue (x : f) r) = Just (x, BatchedQueue f r)
        uncons (BatchedQueue [] r) = uncons $ BatchedQueue (rev r) []

instance ConsQueue BatchedQueue where
        cons x (BatchedQueue f r) = BatchedQueue (x : f) r

rev :: [a] -> [a]
rev = rv [] where rv s [] = s; rv s (x : xs) = rv (x : s) xs

押し込み(snoc)は単に後ろがわのリストの先頭に要素を追加するだけだ。連続して押し込みをすると、要素が逆順に格納されることに注意する。取り出し(uncons)のほうに工夫がある。前がわのリストが空でなければ、そこから先頭を取り出す。もしも、それが空リストであれば、後ろがわのリストを逆順にして、前がわにもってくる。標準的な関数reverseを使わずに、独自に定義した関数revを使っているのは、性能解析の結果を読みやすくするためだ。このように独自に定義しておけばローカルな関数rvが何回呼ばれているのかを調べることができる。試してみる。

% stack ghci
> q = snocAll empty [1 .. 15] :: BatchedQueue Int
> q
BatchedQueue [] [15,14,13,12,11,10,9,8,7,6,5,4,3,2,1]
> uncons q
Just (1,BatchedQueue [2,3,4,5,6,7,8,9,10,11,12,13,14,15] [])
> uncons . snd =<< it
Just (2,BatchedQueue [3,4,5,6,7,8,9,10,11,12,13,14,15] [])
> :module + Data.List
> unfoldr BatchedQueue.uncons . snd <$> it
Just [3,4,5,6,7,8,9,10,11,12,13,14,15]

キューとして機能しているようだ。

性能解析

性能解析をしてみる。まずはpackage.yamlに追記する。

package.yaml
  profileBatchedQueue:
    main: profileBatchedQueue.hs
    source-dirs: app
    ghc-options:
    - -threaded
    - -rtsopts
    - -with-rtsopts=-N
    dependencies:
    - try-persistent-queue

モジュールProfileQueueを利用すれば、性能解析用の実行可能ファイルは、かんたんに作ることができる。

app/profileBatchedQueue.hs
{-# LANGUAGE TypeApplications #-}
{-# OPTIONS_GHC -Wall -fno-warn-tabs #-}

module Main where

import ProfileQueue
import BatchedQueue

main :: IO ()
main = profileWhole @BatchedQueue 100000

コンパイルして実行してみよう。

% stack build --profile
% stack exec --profile profileBatchedQueue -- +RTS -p
100000 and 1000000 elem queues
5000050000
500000500000

結果をみてみよう。ファイルprofileBatchedQueue.profを参照する。

  • total time = 0.10 secs (97 ticks @ 1000 us, 1 processor)
  • BUILD_AND_BREAK_N*10_ELEM_QUEU 77.3% time
  • BUILD_AND_BREAK_N_ELEM_QUEU 8.2% time

1000000(100万)要素のキューで100000(10万)要素のキューの10倍時間がかかっている。なので、ひとつひとつの操作(追加と取りだし)を考えると平均して定数時間(償却定数時間)になっている。厳密な話ではないけれど、n個の要素を追加してn個の要素を取りだすことを考えると、O(1)の処理が2n個あることになる。そのあいだに、O(n)時間かかる関数reverseを1回使っている。2n回の処理のなかで1回だけO(n)時間の処理があるので、償却定数時間になる。

実は、この実装では償却定数時間にはならない

この実装は実のところ、永続データ構造としては償却定数時間で処理ができるとは言えない。O(n)の処理だけをくりかえし実行することができるからだ。そうすると償却してもO(n)時間になる。

この実装を「短命なキュー」として使えば償却定数時間になる。「短命なキュー」であれば、一度処理すれば「それ自体が変形してしまう」ので、おなじキューに対する処理はありえないからだ。「短命なキュー」としてSTMで使う例をみてみよう。ファイルpackage.yamlのdependenciesにstmを追加する。

package.yaml
dependencies:
- base >= 4.7 &&& < 5
- stm

STMモナドで使える関数を定義する。

src/EphemeralBatchedQueue.hs
{-# LANGUAGE LambdaCase #-}
{-# OPTIONS_GHC -Wall -fno-warn-tabs #-}

module EphemeralBatchedQueue (Q.BatchedQueue, newQueue, snoc, head) where

import Prelude hiding (head)

import Control.Concurrent.STM

import qualified BatchedQueue as Q

newQueue :: STM (TVar (Q.BatchedQueue a))
newQueue = newTVar Q.empty

snoc :: TVar (Q.BatchedQueue a) -> a -> STM ()
snoc q x = modifyTVar q (`Q.snoc` x)

head :: TVar (Q.BatchedQueue a) -> STM a
head q = Q.uncons <$> readTVar q >>= \case
        Nothing -> retry
        Just (x, t) -> x <$ writeTVar q t

このように定義してモジュールEphemeralBatchedQueueから公開される関数newQueue, snoc, headだけしか使わなければ、このキューに対する処理は償却定数時間であるといえる。試してみよう。

% stack ghci
> :module EphemeralBatchedQueue Control.Concurrent.STM
> q <- atomically newQueue :: IO (TVar (BatchedQueue Int))
> atomically $ snoc q 123
> atomically $ snoc q 456
> atomically $ snoc q 789
> atomically $ EphemeralBatchedQueue.head q
123
> atomically $ EphemeralBatchedQueue.head q
456
> atomically $ EphemeralBatchedQueue.head q
789

この構造は本質的にはモジュールControl.Concurrent.STM.TQueueのデータ構造TQueueとおなじだ。

hackage: Control.Concurrent.STM.TQueue

データ構造ListQueueを永続データ構造として使うなら、このキューに対する処理は償却定数時間ではない。コードを書いて、性能解析をしてみよう。

package.yamlのexecutablesに追加する。

package.yaml
  profileNotAmortizedConstant:
    main: profileNotAmortizedConstant.hs
    source-dirs: app
    ghc-options:
    - -threaded
    - -rtsopts
    - -with-rtsopts=-N
    dependencies:
    - try-persistent-queue

つぎのようなモジュールMainを定義する。

app/profileNotAmortizedConstant.hs
{-# LANGUAGE TypeApplications #-}
{-# OPTIONS_GHC -Wall -fno-warn-tabs #-}

module Main where

import Prelude hiding (head)

import ProfileQueue
import BatchedQueue

main :: IO ()
main = do
        {-# SCC "2000_ELEM_QUEUE" #-} trial 2000
        {-# SCC "4000_ELEM_QUEUE" #-} trial 4000

trial :: Int -> IO ()
trial n = print
        $ sum <$> sequence (timesEvaluate n head $ queueN @BatchedQueue n)

timesEvaluate :: Int -> (a -> b) -> a -> [b]
timesEvaluate n _ _ | n < 1 = []
timesEvaluate n f x = f x : timesEvaluate (n - 1) f x

timesEvaluateはメモ化されないようにしながら、おなじ関数適用をくりかえす関数。BatchedQueueでは「空のキューに要素をたくさん追加したうえで、ひとつめの要素を取りだす」ときにO(n)の関数reverseの評価が起こる。要素数nのキューで、この処理をn回行うとO(n^2)時間かかる。これは償却してもO(n)時間になる。試してみよう。

% stack build --profile
% stack exec --profile profileNotAmortizedConstant -- +RTS -p

性能解析の結果はファイルprofileNotAmortizedConstant.profに書きこまれる。

  • total time = 0.25 secs (247 ticks @ 1000 us, 1 processor)
  • 2000_ELEM_QUEUE 20.6% time
    • 関数rvの呼び出し回数は4002000回
  • 4000_ELEM_QUEUE 78.9% time
    • 関数rvの呼び出し回数は16004000回

要素数を2倍にしたところ、所要時間は4倍になっている。キューの先頭を取りだす処理のうち、一番時間のかかる処理はリストの反転が必要な、要素の追加終了後のはじめの1回目であり、O(n)時間かかる。これをn回おこなっているので、O(n^2)時間かかっているということだ。

サンクをつぶさないで表示する

永続データ構造としてあつかったとしても、償却定数時間で押しこみや取りだしができるようにするにはどうすればいいだろうか。答えは「遅延評価を利用する」ことだ。「遅延評価を利用する」ということは、データ構造のなかで「何が評価されていて、何が評価されていない(サンクである)か」が重要になる。で、GHCiで表示しながら説明をしていきたいのだけど、もしも関数showが値をすべて表示するようなかたちになっていると、サンクをすべて評価してしまう。そうすると、本来の動きを説明できない。なので、ここでは「評価ずみ」か「評価ずみでない(サンクである)」かをチェックして「評価ずみ」のときだけ表示する関数を作成する。

そもそもHaskellの値とかサンクとかって何なのだろう

Haskellの値とかサンクとかって何なのだろう。この問いに対する答えはいろいろあって、意味論的な話と実装という観点からの答えは異なると思うけど、ここでは低レベルの実装という観点から答えてみる。その答えはクロージャだ。GHCでは値の「クロージャとしての情報」を取りだす機能が公開されている。パッケージghc-heapを使うのでpackage.yamlに、つぎのように追加する。

package.yaml
dependencies:
- base >= 4.7 && < 5
- stm
- ghc-heap

パッケージghc-heapのモジュールGHC.Exts.Heapに定義されている機能getClosureDataを使うと値やサンクがクロージャとして、どのような状態であるのかをみることができる。

Hackage: GHC.Exts.Heap.getClosureData

getClosureData :: a -> IO Closure

試してみよう。

% stack ghci
> :module GHC.Exts.Heap
> n <- read <$> getLine :: IO Int
(123と入力しエンターキーを押す)
123
> getClosureData n
ThunkClosure {info = StgInfoTable {entry = Nothing, ptrs = 2, nptrs = 0, tipe = THUNK_2_0, strlen = 0, code = Nothing}, ptrArgs = [0x0000004201f934a0,0x00000042021bd930], dataArgs = []}
> n
123
> getClosureData n
BlackholeClosure {info = StgInfoTable {entry = Nothing, ptrs = 1, nptrs = 0, tipe = BLACKHOLE, strlen = 0, code = Nothing}, indirectee = 0x000000420214f8d0/1}
(しばらく待つ)
> getClosureData n
ConstrClosure {info = StgInfoTable {entry = Nothing, ptrs = 0, nptrs = 1, tipe = CONSTR_0_1, strlen = 0, code = Nothing}, ptrArgs = [], dataArgs = [123], pkg = "ghc-prim", modl = "GHC.Types", name = "I#"}

BlackholeClosure { ... }である期間は試行によって異なる。すぐにConstrClosure { ... }になることもあれば、かなり時間がかかることもある(僕の環境では)。

getClosureData xについて返り値が値ThunkClosure { ... }であれば値xはサンクをであり、値BlackholeClosure { ... }であれば値xは(たぶん)ほかのスレッドで評価中の値であり、値ConstrClosure { ... }であれば評価ずみの値である。

Closureの種類は最新のghc-heapで26種類ある。

  • ConstrClosure
  • FunClosure
  • ThunkClosure
  • ...
  • BlackholeClosure
  • ...
  • FloatClosure
  • DoubleClosure
  • OtherClosure
  • UnsupportedClosure

このなかで、普通の値を表すクロージャのなかで評価ずみである状態を意味するのはConstrClosureである。またBlackholeClosureは「ほかのスレッドで評価中」の状態なので、これも今回の用途では評価ずみと考えることにする。

リストの「評価ずみ」の部分までを文字列化する関数

モジュールShowLazyListを用意する。

src/ShowLazyList.hs
{-# LANGUAGE LambdaCase #-}
{-# OPTIONS_GHC -Wall -fno-warn-tabs #-}

module ShowLazyList (showLazyList) where

import GHC.Exts.Heap (GenClosure(..), getClosureData)
import Control.Arrow (second)
import Data.Bool (bool)

値が「評価ずみ」かどうかを返す関数を定義する。

src/ShowLazyList.hs
isEvaluated :: a -> IO Bool
isEvaluated x = (<$> getClosureData x) $ \case
        ConstrClosure {} -> True
        BlackholeClosure {} -> True
        _ -> False

これを使って、リストの「評価されているところ」までを文字列化したリストを返す関数を定義する。

src/ShowLazyList.hs
showLazyList :: Show a => [a] -> IO (Bool, [String])
showLazyList xs = (=<< isEvaluated xs) . bool (pure (False, [])) $ case xs of
        [] -> pure (True, [])
        x : xs' -> second (show x :) <$> showLazyList xs'

試してみよう。

% stack ghci
> ns = [0 .. 15] :: [Int]
> showLazyList ns
(False,[])
> ns !! 5
5
> showLazyList ns
(False,["0","1","2","3","4","5"])
> ns
[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15]
> showLazyList ns
(True,["0","1","2","3","4","5","6","7","8","9","10","11","12","13","14","15"])

GHCiで表示に自分で定義した機能を使う

GHCの対話環境であるGHCiでは、値の表示に機能print :: Show a => a -> IO ()を使う。オプション-interactive-print=fooのようにすると、値の表示に機能foo :: SomeClass a => a -> IO ()を使うように変更できる。

デフォルトの表示機能である機能printはprint = putStrLn . showのような定義だ。これは純粋な関数showによって文字列化したものを機能putStrLnで表示している。しかし、サンクかどうかで表示のしかたを変化させるには純粋な関数showではなく、show' :: a -> IO Stringのような機能を利用してprint' x = putStrLn =<< show' xのように定義された機能print'を使いたい。つぎのような機能を定義していく。

print' :: Printable a => a -> IO ()

モジュールPrintableをつくり、クラスPrintableを定義する。

src/Printable.hs
{-# LANGUAGE FlexibleInstances, UndecidableInstances #-}
{-# OPTIONS_GHC -Wall -fno-warn-tabs #-}

module Printable where

class Printable a where show' :: a -> IO String

print' :: Printable a => a -> IO ()
print' x = putStrLn =<< show' x

instance {-# OVERLAPPABLE #-} Show a => Printable a where
        show' = pure . show

クラスPrintableの関数show'はshow' xのように引数をあたえると「文字列をかえす動作」をかえすクラス関数だ。それを利用して機能print'を定義している。すでにクラスShowのインスタンスである型については自動的にクラスPrintableのインスタンスになるようにした。

さらに、「解説のあとのほう」で「キューをあらわす型のタプルのMaybe値」を表示したいので、つぎのようなインスタンス宣言も追加する。

src/Printable.hs
instance Printable a => Printable (Maybe a) where
        show' (Just x) = ("Just (" ++) . (++ ")") <$> show' x
        show' Nothing = pure "Nothing"

instance (Printable a, Printable b) => Printable (a, b) where
        show' (x, y) =
                (\s t -> "(" ++ s ++ "," ++ t ++ ")") <$> show' x <*> show' y

それぞれ、Maybe値やタプルに対して、 それらの要素が型クラスPrintableのインスタンスであれば、それらもそのインスタンスになるように定義した。

遅延リストを「評価を強制せずに」表示する

この関数を使って遅延リストを「評価を強制せずに」表示する機能を作ってみよう。

src/LazyList.hs
{-# OPTIONS_GHC -Wall -fno-warn-tabs #-}

module LazyList where

import Data.Bool
import Data.List

import ShowLazyList
import Printable

newtype LazyList a = LazyList [a]

instance Show a => Printable (LazyList a) where
        show' (LazyList xs) = do
                (e, s) <- showLazyList xs
                pure $ "LazyList [" ++
                        intercalate "," s ++ bool ".." "" e ++ "]"

試してみよう。GHCiにオプション-interactive-print=print'をつけて呼び出す。

% stack ghci --ghc-options="-interactive-print=print'"
> atoz = ['a' .. 'z']
> LazyList atoz
LazyList [..]
> atoz !! 5
'f'
> LazyList atoz
LazyList ['a','b','c','d','e','f'..]
> atoz
"abcdefghijklmnopqrstuvwxyz"
> LazyList atoz
LazyList ['a','b','c','d','e','f','g','h','i','j','k','l','m','n','o','p','q','r','s','t','u','v','w','x','y','z']

評価したところまでが表示されているのがわかる。

Banker's Queue

評価を強制せずに表示する関数が作れるようになった。遅延評価を生かした償却定数時間のキューを定義してみよう。

基本的にはBatched Queueとおなじように、ふたつのリストをつかう。Batched Queueでは「前がわ」のリストが空になってから、「後ろがわ」のリストをひっくり返して、「前がわ」のリストにした。Banker's Queueでは、いつでも、つぎの関係が成り立つようにする。

  • [前がわのリストの要素数] >= [後ろがわのリストの要素数]

この関係が成り立たなくなる前に「後ろがわのリスト」をひっくり返して、「前がわのリスト」の末尾に追加する。

src/BankersQueue.hs
{-# OPTIONS_GHC -Wall -fno-warn-tabs #-}

module BankersQueue (BankersQueue, empty, snoc, uncons, cons, head, tail) where

import Prelude hiding (head, tail)

import Control.Monad (when)
import Data.Bool (bool)
import Data.List (intercalate)

import Queue (Queue(..), ConsQueue(..), head, tail)
import ShowLazyList (showLazyList)
import Printable (Printable(..))

data BankersQueue a = BankersQueue Int [a] Int [a]

instance Queue BankersQueue where
        empty = BankersQueue 0 [] 0 []
        snoc (BankersQueue lf f lr r) x
                | lf <= lr =
                        BankersQueue (lf + lr + 1) (f ++ reverse (x : r)) 0 []
                | otherwise = BankersQueue lf f (lr + 1) (x : r)
        uncons (BankersQueue 0 _ 0 _) = Nothing
        uncons (BankersQueue lf (x : f) lr r)
                | lf <= lr = Just
                        (x, BankersQueue (lf + lr - 1) (f ++ reverse r) 0 [])
                | otherwise = Just (x, BankersQueue (lf - 1) f lr r)
        uncons (BankersQueue _ [] _ _) = error "never occur"

instance ConsQueue BankersQueue where
        cons x (BankersQueue lf f lr r) = BankersQueue (lf + 1) (x : f) lr r

showBankersQueue :: Show a => BankersQueue a -> IO String
showBankersQueue (BankersQueue _ f _ r) = do
        (ef, sf) <- showLazyList f
        (er, sr) <- showLazyList r
        when (not er) $ error "rear list should not have thunk"
        pure $ "BankersQueue [" ++ intercalate "," sf ++
                bool ".." "|" ef ++ intercalate "," (reverse sr) ++ "]"

instance Show a => Printable (BankersQueue a) where
        show' = showBankersQueue
コードの説明

lfとlrとは、それぞれ、前のリストと後ろのリストの要素数を表す。関数snocやunconsでは「前のリストの要素数」が「後ろのリストの要素数」とおなじになったら、「後ろのリスト」をひっくり返して、前のリストの末尾に追加している。

関数showBankersQueueでは「前のリスト」の評価されているところまでと、「後ろのリスト」をひっくり返したものとを表示する。「後ろのリスト」のなかには(オブジェクトコードでなら)リストのサンクは作られないはずなので、そこにサンクがあればエラーとするようにしてある。

試してみる

ここでは対話環境にオプション-fobject-codeをつけて立ち上げる。GHCiは(オブジェクトコードにアクセスできないときには)ソースコードをバイトコードにコンパイルするが、それだと追加のサンクが作られてしまうようで、今回の用途には向かないためだ。

また、表示にはまえに定義したprint' :: Printable a => a -> IO ()を利用するので、interactive-print=print'も指定する。

% stack ghci --ghc-options="-interactive-print=print' -fobject-code"
> :module BankersQueue
> :module + Data.List
> empty `snoc` 123 :: BankersQueue Int
BankersQueue [..]
> it `snoc` 456
BankersQueue [..456]
> it `snoc` 789
BankersQueue [..]
> it `snoc` 987
BankersQueue [..987]
> it `snoc` 654
BankersQueue [..987,654]
> it `snoc` 321
BankersQueue [..987,654,321]
> BankersQueue.uncons it
Just (123,BankersQueue [..])
> BankersQueue.uncons . snd =<< it
Just (456,BankersQueue [..])
> BankersQueue.uncons . snd =<< it
Just (789,BankersQueue [..])
> BankersQueue.uncons . snd =<< it
Just (987,BankersQueue [654,321|])
> BankersQueue.uncons . snd =<< it
Just (654,BankersQueue [321|])
> BankersQueue.uncons . snd =<< it
Just (321,BankersQueue[|])

押しこんだ順に取りだせている。

BatchedQueueで生じた問題が生じない理由

0から14までの15個の整数をキューに押しこみ、それをひとつずつ取りだしていってみよう。

> foldl' snoc empty [0 .. 13] :: BankersQueue Int
BankersQueue [..7,8,9,10,11,12,13]
> q = it `snoc` 14
> q
BankersQueue [..]
> BankersQueue.uncons q
Just (0,BankersQueue [..])
> BankersQueue.uncons . snd =<< it
Just (1,BankersQueue [..])
> BankersQueue.uncons . snd =<< it
Just (2,BankersQueue [..])
> BankersQueue.uncons . snd =<< it
Just (3,BankersQueue [..])
> BankersQueue.uncons . snd =<< it
Just (4,BankersQueue [..])
> BankersQueue.uncons . snd =<< it
Just (5,BankersQueue [..])
> Just (6, q') = BankersQueue.uncons . snd =<< it
> q'
BankersQueue [..]
> BankersQueue.uncons q'
Just (7,BankersQueue [8,9,10,11,12,13,14|])
> q'
BankersQueue [7,8,9,10,11,12,13,14|]

0から13まではfoldl'でまとめて押しこんだ。そのときの状態は、つぎのとおり。

  • 前側のリスト: サンク - 評価すると[0,1,2,3,4,5,6]
  • 後側のリスト: [13,12,11,10,9,8,7]

14を追加すると、つぎのようになる。

  • 前側のリスト: サンク - [0,1,2,3,4,5,6] ++ reverse [14,13,12,11,10,9,8,7]
  • 後側のリスト: []

ここから0から6まで償却定数時間で取りだせたとする。で、7を取りだそうとしたときにreverseが実行されるので8の時間がかかる。より一般的には(2 ^ n - 1)の取りだしのあとに(2 ^ n)の時間がかかる取り出しが生じる。償却O((2 ^ n) / (2 ^ n))となり、これは償却O(1)になる。

たしかに償却定数時間になっている。しかしBatched Queueで生じた問題は解決しただろうか。0から6まで取りだしたあとのキューを保存しておいて、ここから何度も7を取りだしたら償却してもO(n)にならないだろうか。ここに「遅延評価 + メモ化」が効いてくる。7を取りだした時点でreverse [14,13,...]のサンクは評価されているので、2度目からはreverseを実行する必要はない。

Real-Time Queue

永続データ構造として使っても償却定数時間で処理できるキューができた。多くの使いかたで、ここまでできれば問題ない。しかし、使い道によっては「償却で」ではなく「最悪時間で」定数時間で処理したいこともある。Banker's Queueでは評価の遅延を利用した。Real-Time Queueでは「評価の遅延」を管理する。つまり、いつ値を評価するかをスケジューリングすることで、式reverse xsの評価にかかる時間を、それぞれの操作に分散させる。

リストをすこしずつひっくり返す

評価をスケジューリングしようと思っても、式reverse xsでは結果の1番はじめの値を取りだそうとすれば、全体が評価されてしまう。

% stack ghci --ghc-options="-interactive-print=print' -fobject-code"
> xs = reverse [0 .. 10] :: [Int]
> LazyList xs
LazyList [..]
> xs !! 0
10
> LazyList xs
LazyList [10,9,8,7,6,5,4,3,2,1,0]

そこで、つぎのような仕組みを考える。

  • リストをみっつ用意する(xs, ys, a)
  • 返り値はxs ++ reverse ysとおなじ
  • リストxsから先頭のひとつを取りだすたびに、リストysの先頭からリストaにひとつ要素を移動させる
  • リストxsとysがおなじ長さであれば、リストxsの要素を末尾まで取りだしたとき、ysの要素はすべて逆順でリストaに移されているはず

コードにしてみよう。

src/RealtimeQueue.hs
{-# OPTIONS_GHC -Wall -fno-warn-tabs #-}

module RealtimeQueue where

rotate :: [a] -> [a] -> [a] -> [a]
rotate [] ys a = reverse ys ++ a
rotate xs [] a = xs ++ a
rotate (x : xs) (y : ys) a = x : rotate xs ys (y : a)

試してみる。

% stack ghci --ghc-options="-interactive-print=print' -fobject-code"
> :module RealtimeQueue LazyList
> xs = rotate [0 .. 6] [7 .. 14] [] :: [Int]
> LazyList xs
LazyList [..]
> take 3 xs
[0,1,2]
> LazyList xs
LazyList [0,1,2..]

このとき、xsはつぎのようなかたちになっている。

0 : 1 : 2 : rotate [3, 4, 5, 6] [10, 11, 12, 13, 14] [9, 8, 7]

さらに続けよう。

> take 7 xs
[0,1,2,3,4,5,6]
> LazyList xs
LazyList [0,1,2,3,4,5,6..]

このときのxsは、つぎのとおり。

0 : 1 : 2 : 3 : 4 : 5 : 6 : rotate [] [14] [13, 12, 11, 10, 9, 8, 7]

で。

> take 8 xs
[0,1,2,3,4,5,6,14]
> LazyList xs
LazyList [0,1,2,3,4,5,6,14..]

xsは、つぎのようになっている。

0 : 1 : 2 : 3 : 4 : 5 : 6 : 14 : ([] ++ [13, 12, 11, 10, 9, 8, 7])

つぎで、最後まで評価される。

> take 9 xs
[0,1,2,3,4,5,6,14,13]
> LazyList xs
LazyList [0,1,2,3,4,5,6,14,13,12,11,10,9,8,7]

rotate xs ys []を評価することを考える。リストxsの部分をひとつ取りだすたびにysの要素をひとつずつ「反転されたリスト」に追加していくということだ。

評価をスケジュールする

関数reverseの代わりに関数rotateをつかえば、リストの要素をひとつ評価するたびに、反転をすこしずつ実行してくれる。Real-time Queueでは要素の押しこみと取りだしのたびに、ひとつずつ、この部分的な反転を実行したい。そのために、前がわのリストとおなじものへの参照をキューのなかに保持しておく。そちらへの評価を強制すれば、前がわのリストへの評価を強制することになる。

まずは、関数rotateよりもうえの部分を書きかえる。

src/RealtimeQueue.hs
{-# OPTIONS_GHC -Wall -fno-warn-tabs #-}

module RealtimeQueue (RTQueue, empty, snoc, uncons, cons, head, tail) where

import Prelude hiding (head, tail)

import Control.Monad (when)
import Data.Bool (bool)
import Data.List (intercalate)

import Queue (Queue(..), ConsQueue(..), head, tail)
import ShowLazyList (showLazyList)
import Printable (Printable(..))

関数rotateよりしたの部分を書く。

src/RealtimeQueue.hs
data RTQueue a = RTQueue [a] [a] [a]

showRTQueue :: Show a => RTQueue a -> IO String
showRTQueue (RTQueue f r _) = do
        (ef, sf) <- showLazyList f
        (er, sr) <- showLazyList r
        when (not er) $ error "rear list should not have thunk"
        pure $ "RTQueue [" ++ intercalate "," sf ++
                bool ".." "|" ef ++ intercalate "," (reverse sr) ++ "]"

instance Show a => Printable (RTQueue a) where show' = showRTQueue

instance Queue RTQueue where
        empty = RTQueue [] [] []
        snoc (RTQueue f r []) x = RTQueue f' [] f'
                where f' = rotate f (x : r) []
        snoc (RTQueue f r (_ : s)) x = RTQueue f (x : r) s
        uncons (RTQueue [] [] []) = Nothing
        uncons (RTQueue (x : f) r []) = Just (x, RTQueue f' [] f')
                where f' = rotate f r []
        uncons (RTQueue (x : f) r (_ : s)) = Just (x, RTQueue f r s)
        uncons (RTQueue [] [] (_ : _)) = error "never occur"
        uncons (RTQueue [] (_ : _) _) = error "never occur"

instance ConsQueue RTQueue where
        cons x (RTQueue f r s) = RTQueue (x : f) r (x : s)
コードの説明

Banker's Queueとだいたい似ている。異なるのはf ++ reverse rではなく、rotate f r []としている部分と、RTQueueのみっつめの引数である「前がわのリストとおなじものへの参照」のあつかいだ。関数snocでの要素の押しこみと関数unconsによる要素の取りだしのたびに、この部分のリストから値を取りだしている。「値を取りだす」たびに、リストのその位置までが評価される。

試してみる

試してみよう。

% stack ghci --ghc-options="-interactive-print=print' -fobject-code"
> :module RealtimeQueue Data.List
> foldl' snoc empty [0 .. 16] :: RTQueue Int
RTQueue [0,1..15,16]
> RealtimeQueue.uncons it
Just (0,RTQueue [1,2..15,16])
> RealtimeQueue.uncons . snd =<< it
Just (1,RTQueue [2,3..15,16])
> RealtimeQueue.uncons . snd =<< it
Just (2,RTQueue [3,4..15,16])
> RealtimeQueue.uncons . snd =<< it
Just (3,RTQueue [4,5..15,16])
> RealtimeQueue.uncons . snd =<< it
Just (4,RTQueue [5,6..15,16])
> RealtimeQueue.uncons . snd =<< it
Just (5,RTQueue [6,7..15,16])
> RealtimeQueue.uncons . snd =<< it
Just (6,RTQueue [7,8,9,10,11,12,13,14|15,16])
> RealtimeQueue.uncons . snd =<< it
Just (7,RTQueue [8,9,10,11,12,13,14|15,16])

要素の押し込みと取り出しごとに、「反転」がすこしずつ実行される。

Knuth-Morris-Prattアルゴリズム

さて、文字(などの)列から特定の並びを探すアルゴリズムといえば、Boyer-MooreアルゴリズムかKnuth-Morris-Pratt(以下KMP)アルゴリズムあたりが有名だ。ここでは後者を採用する。くわしくは、つぎの記事で解説してある。

Knuth-Morris-PrattアルゴリズムのHaskellによる実装の解説

ここでは、最終的なコードを紹介し、ざっと解説する。解説用のプロジェクトを作成する。

% stack new try-knuth-morris-pratt-algorithm
% cd try-knuth-morris-pratt-algorithm

モジュールを用意する。

src/KnuthMorrisPratt.hs
{-# LANGUAGE LambdaCase #-}
{-# OPTIONS_GHC -Wall -fno-warn-tabs #-}

module KnuthMorrisPratt where

入力ごとに状態を変化させていく状態機械(オートマトン)を生成するのが、このアルゴリズムの肝だ。状態をあらわす代数的データ型を定義する。

src/KnuthMorrisPratt.hs
data Rep a = Null | Node a (Rep a) (Rep a)

第1引数に初期状態、第2引数に現在の状態、第3引数に「文字」をとり、新しい状態をかえす関数stepを定義する。

src/KnuthMorrisPratt.hs
step :: Eq a => Rep [a] -> Rep [a] -> a -> Rep [a]
step rt = op
        where
        op Null _ = rt
        op (Node [] l _) x = op l x
        op (Node (v : _) l r) x
                | v == x = r
                | otherwise = op l x

状態がNullであれば初期状態にもどる。Nodeの第一引数はパターンの「残り」の部分だ。残りがなければ「マッチに失敗」して、木の左の枝に進み、もういちどマッチを試す。パターンの最初の文字と入力を比較して、おなじなら右の枝に進む。異なれば左の枝に進み、もういちどマッチを試す。このへんで、buildしておこう。

% stack build

Knuth-Morris-Prattアルゴリズムの「Knuth」にあたる部分を定義する。これは、たとえば"ababc"のような並びに対して、"aba"までマッチして、つぎの'b'でマッチに失敗したときに、その入力は'b'ではあり得ないということを利用して、比較回数を減らす最適化だ。関数nextを定義する。

src/KnuthMorrisPratt.hs
next :: Eq a => Rep [a] -> a -> Rep [a]
next t@Null _ = t
next t@(Node [] _ _) _ = t
next t@(Node (v : _) l _) x | v == x = next l x | otherwise = t

関数nextは第1引数に状態をあらわす木をとり、第2引数に「マッチに失敗したときの、並びのパターンのほうの文字x」をとる。もしもxが新しい状態での比較対象であるvに等しいならば、古い状態とのマッチと同様にマッチに失敗するので、「失敗したときに進む状態」である「左の枝のさきの状態」で、その状態を置き換える。

さいごに、状態をあらわす木を組み立てる関数grepを定義する。

src/KnuthMorrisPratt.hs
grep :: Eq a => Rep [a] -> Rep [a] -> [a] -> Rep [a]
grep _ l [] = Node [] l Null
grep rt l va@(v : vs) = Node va (next l v) (grep rt (step rt l v) vs)

ポイントは「失敗したときの遷移先」である左の枝に、「『現在マッチしている文字列から先頭をおとした文字列』へのマッチによる状態遷移の結果」が置かれているということだ。

状態を引数にとり「マッチが終了」しているかどうかをかえす関数okを定義する。

src/KnuthMorrisPratt.hs
ok :: Rep [a] -> Bool
ok = \case Null -> False; Node vs _ _ -> null vs

Repの第1引数が空文字列であれば、マッチすべき文字は残っていないということだ。これで、基本的な関数はそろった。あとは、これらを組み合わせればいい。このまま使ってもいいが、つぎのような関数を定義したほうが使いやすい。

initialState :: Eq a => [a] -> KmpState a
nextState :: Eq a => KmpState a -> a -> KmpState a
found :: KmpState a -> Bool

代数的データ型KmpStateと、これらの関数を定義する。

KnuthMorrisPratt.hs
data KmpState a = KmpState { rootRep :: Rep [a], currentRep :: Rep [a] }

initialState :: Eq a => [a] -> KmpState a
initialState ws = KmpState root root where root = grep root Null ws

nextState :: Eq a => KmpState a -> a -> KmpState a
nextState st x = st { currentRep = step (rootRep st) (currentRep st) x }

found :: KmpState a -> Bool
found = ok . currentRep

代数的データ型KmpStateは、初期状態と現在の状態とをまとめたもの。関数initialState, nextState, foundは、それぞれ、関数grep, step, okのラッパーになっている。モジュール宣言を書き直す。

src/KnuthMorrisPratt.hs
module KnuthMorrisPratt (KmpState, initialState, nextState, found) where

試してみよう。

% stack ghci
> (fst <$>) . filter (found . snd) . zip [0 ..] $ scanl nextState (initialState "abcab") "ababcabcab"
[7,10]

文字列"ababcabcab"のなかで"abcab"を探している。マッチする位置を探す並びの最後の文字が、1から数えて何番目にあるかを示している。

マルチスレッドズンドコキヨシ

ようやく本題にはいる。マルチスレッドズンドコキヨシをつくっていこう。

仕様

オリジナル版の仕様

オリズナルのズンドコキヨシの仕様は、(おそらく)つぎのようになっている。

  • ズンとドコをランダムに生成する
    • 生成されたズンとドコはその順にターミナルに表示する
  • ズン、ズン 、ズン、ズン、ドコの並びがあらわれたとき
    • キ・ヨ・シ!と表示して終了

マルチスレッド版の仕様(と実装?)

マルチスレッド版では、つぎのような仕様(実装?)にする。

  • ズンとドコのうち、それぞれひとつを生成する、ふたつのスレッドを走らせる
    • それぞれ、ズンやドコを表示する間隔は1μ秒から100m秒のあいだでランダムにする
    • 生成されたズンやドコはローカルなキューに押しこまれる
  • 別のスレッドでローカルなキューから取りだされたズンやドコは出力用のキューに押しこまれる
    • ズン、ズン、ズン、ズン、ドコというパターンが出現したところで、出力用のキューにキ・ヨ・シ!を押しこんで、このスレッドは終了する
  • メインのスレッドから上記の3スレッドをフォークする
  • メインのスレッドは出力用キューからズンやドコを取り出して表示する
    • キ・ヨ・シ!を取り出したところで、それを表示して終了

マルチスレッドズンドコキヨシを書く

プロジェクトの作成と必要なモジュールのコピー

まずは、プロジェクトを作成する。

% stack new concurrent-zundoko

Real-Time QueueのモジュールとKnuth-Morris-Prattアルゴリズムのモジュールとをコピーしておく。コピーするモジュールは、つぎのとおり。

  • Queue.hs
  • RealtimeQueue.hs
  • ShowLazyList.hs
  • Printable.hs
  • KnuthMorrisPratt.hs

これらをディレクトリsrc/下に置く。GNU/Linuxを使っているのであれば、たとえば、つぎのようにコピーする。

% cp try-persistent-queue/src/Queue.hs concurrent-zundoko/src/
% cp try-persistent-queue/src/RealtimeQueue.hs concurrent-zundoko/src/
% cp try-persistent-queue/src/ShowLazyList.hs concurrent-zundoko/src/
% cp try-persistent-queue/src/Printable.hs concurrent-zundoko/src/
% cp try-knuth-morris-pratt-algorithm/src/KnuthMorrisPratt.hs concurrent-zundoko/src/

ディレクトリconcurrent-zundoko/下に移動して、ファイルpackage.yamlのdependenciesにパッケージghc-heap, stm, randomのみっつを追加する。

package.yaml
dependencies:
- base >= 4.7 && < 5
- ghc-heap
- stm
- random

buildしておこう。

% stack build

補助的なモジュール

ちょっとした便利関数を定義する。

src/Tips.hs
{-# OPTIONS_GHC -Wall -fno-warn-tabs #-}

module Tips (forkForever, forkLoopIf, loopIf) where

import Control.Monad (void, forever)
import Control.Concurrent (forkIO)
import Data.Bool (bool)

forkForever :: IO () -> IO ()
forkForever = void . forkIO . forever

forkLoopIf :: IO Bool -> IO ()
forkLoopIf = void . forkIO . loopIf

loopIf :: Monad m => m Bool -> m ()
loopIf act = bool (return ()) (loopIf act) =<< act

機能forkForeverはスレッドを生成して、そのスレッドのなかで、あたえられた動作を、いつまでもくりかえす機能だ。機能forkLoopIfはおなじような機能をもつが、動作がTrueをかえすあいだだけくりかえす。STMモナド内でReal-Time Queueを使用するのに便利な関数も定義しておこう。

src/RealtimeQueueStm.hs
{-# LANGUAGE LambdaCase #-}
{-# OPTIONS_GHC -Wall -fno-warn-tabs #-}

module RealtimeQueueStm (TRTQueue, newQueue, enqueue, dequeue) where

import Control.Concurrent.STM
import RealtimeQueue

type TRTQueue a = TVar (RTQueue a)

newQueue :: STM (TRTQueue a)
newQueue = newTVar empty

enqueue :: TRTQueue a -> a -> STM ()
enqueue q = modifyTVar q . flip snoc

dequeue :: TRTQueue a -> STM a
dequeue q = uncons <$> readTVar q >>= \case
        Nothing -> retry
        Just (x, xs) -> x <$ writeTVar q xs

Real-Time Queueを格納するTVarを作成し(newQueue)、それに値を押しこん(enqueue)だり、そこから値を取りだし(dequeue)たりする関数だ。

中心的なモジュールZunDoko

モジュールZunDokoを用意する。

src/ZunDoko.hs
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE TypeFamilies, DefaultSignatures #-}
{-# LANGUAGE FlexibleContexts #-}
{-# OPTIONS_GHC -Wall -fno-warn-tabs #-}

module ZunDoko where

import Control.Arrow
import Control.Concurrent
import Control.Concurrent.STM hiding (check)
import System.Random

import Tips
import RealtimeQueue
import RealtimeQueueStm
import KnuthMorrisPratt

必要なモジュールがそろっていることを確認する意味でも、ここでいちどbuildしておこう。

% stack build

「終了」値を追加する型クラス

ズン、ドコ、キ・ヨ・シ!のみっつを表現する型には、つぎのふたつの候補がある。ひとつめを示す。

data ZunDoko = Zun | Doko | Kiyoshi

もうひとつは、つぎのようになる。

type ZunDokoKiyoshi = Maybe ZunDoko
data ZunDoko = Zun | Doko

つまり、ZunDoko型の値のひとつとして終了を意味するKiyoshiを定義するか、あるいは終了を意味する値としてはMaybe ZunDoko型のNothingを使うか、だ。どちらも捨てがたい。どちらも捨てがたいので、どちらにも対応できるような関数を定義することにする。

そのために、「既存の型に終了をあらわす値を追加する」型クラスを定義する。

src/ZunDoko.hs
class ToEndable a where
        type PreEndable a
        endable :: PreEndable a -> a
        endValue :: a

        default endable :: PreEndable a ~ a => PreEndable a -> a
        endable = id

instance ToEndable (Maybe a) where
        type PreEndable (Maybe a) = a
        endable = Just
        endValue = Nothing

クラスToEndableを定義した。PreEndable aは終了値を追加するまえの型をあらわす。関数endableは終了値を追加するまえの型の値から、終了値を追加したあとの型の値への変換関数。値endValueは終了値だ。関数endableは終了値の追加の前後の型がおなじときには、デフォルトで関数idになる(default endable :: ...)。

Maybe a型はクラスToEndableのインスタンスであり、通常の値はJust _であり、終了値はNothingとなる。

スレッドをランダムなμ秒のあいだだけ休止させる

スレッドをランダムなμ秒のあいだだけ休止させる関数ruffleを定義する。

src/ZunDoko.hs
ruffle :: Int -> IO ()
ruffle n = randomRIO (1, n) >>= threadDelay

引数nは休止させるμ秒の上限だ。

キューにはいっている要素の並びをチェックする

ここがズンドコキヨシの心臓部分ともいえる。並びのパターンをチェックする関数だ。KMPアルゴリズムにおける状態の値とキューをとり、チェックした要素のリストと新しい状態をかえす。もとめる並びが検出されたとき、新しい状態はかえさずにNothingとする。

src/ZunDoko.hs
check :: Eq a => KmpState a -> RTQueue a -> ([a], Maybe (KmpState a))
check st q = case uncons q of
        Nothing -> ([], Just st)
        Just (z, q') -> let st' = st `nextState` z in
                if found st' then ([z], Nothing) else (z :) `first` check st' q'

関数unconsでキューを先頭の要素zと残りのキューq'にわける。要素zを入力にして状態stを新しい状態st'に更新する。状態st'が「みつかった」状態であれば、要素zのみを要素とするリストとNothingのタプルをかえす。もし状態st'が「みつかった」状態でなければ、残りのキューq'に対して探索を続けて、それがかえした値のタプルのひとつめであるリストに要素zを追加する。

それぞれのスレッドを生成する

それぞれのスレッドを生成して、出力用のキューをかえす動作を作成する。

src/ZunDoko.hs
zundoko :: (ToEndable e, Eq (PreEndable e)) =>
        [PreEndable e] -> [PreEndable e] -> IO (TVar (RTQueue e))
zundoko ts pt = do
        ql <- atomically newQueue
        (forkForever . (ruffle 100000 >>) . atomically . enqueue ql) `mapM_` ts
        kmpst <- atomically . newTVar $ initialState pt
        qo <- atomically newQueue
        forkLoopIf . atomically $ do
                st <- readTVar kmpst
                q <- readTVar ql; writeTVar ql empty
                case check st q of
                        ([], _) -> retry
                        (zs, st') -> (enqueue qo . endable) `mapM_` zs >> maybe
                                (False <$ enqueue qo endValue)
                                ((True <$) . writeTVar kmpst) st'
        pure qo

関数zundokoの第1引数tsは走らせるスレッドの数だけの値を要素とするリストだ。それぞれのスレッドが、それぞれの値をキューに押しこむ。第2引数ptは探索する並びのパターンだ。

ローカルなキューを生成して、それぞれの値をキューに押しこみつづける、それぞれのスレッドを走らせる。KMPアルゴリズムの初期状態と出力用のキューを生成する。forkLoopIfの引数が終了のパターンを探索する動作だ。つぎのような動作になる。

  • KMPアルゴリズムの状態を取り出す
  • 現在のキューを取り出したうえで、空のキューで置き換える
  • 関数checkでパターンを探索する
    • キューになにも入っていないときはretryで値の取得からやりなおす
    • 関数checkの返り値であるタプルの第1要素(zs)の要素をすべて出力用のタプルに押しこむ
    • 第2要素はKMPアルゴリズムの新しい状態だ
      • Nothingは「検出」という意味。「終了」値を押しこんでスレッドを終了(Falseをかえす)する
      • Just値なら、なかみの新しい状態で古い状態を置き換える
  • 出力用のキューを、この動作の呼び出しもとにわたす

モジュール宣言に公開リストを書き加える。

src/ZunDoko.hs
module ZunDoko (ToEndable(..), zundoko) where

ズン、ドコ、キ・ヨ・シ!をあらわす代数的データ型

モジュールMainを用意する。

app/Main.hs
{-# LANGUAGE LambdaCase, TypeFamilies #-}
{-# OPTIONS_GHC -Wall -fno-warn-tabs #-}

module Main where

import Control.Concurrent.STM

import Tips
import RealtimeQueueStm
import ZunDoko

main :: IO ()
main = putStrLn "dummy"

必要なモジュールの導入などをしている。できるだけbuildしながら、開発を進めたいのでダミーの動作mainを定義してある。

ズン、ドコ、キ・ヨ・シ!をあらわす代数的データ型を定義する。

app/Main.hs
data ZunDoko = Zun | Doko | Kiyoshi deriving (Show, Eq)

instance ToEndable ZunDoko where
        type PreEndable ZunDoko = ZunDoko
        endValue = Kiyoshi

say :: ZunDoko -> String
say = \case Zun -> "ズン"; Doko -> "ドコ"; Kiyoshi -> "キ・ヨ・シ!"

終了を示す値もZunDoko型のなかにいれてしまう実装にした。理由は「終了」の値についても(キ・ヨ・シ!という)「表示」があるので、それについても関数sayでまとめてあつかったほうが、きれいだからだ。型ZunDokoは自身をPreEndable _とするToEndableクラスのインスタンスだ。終了を意味する値endValueはKiyoshiとする。

関数main

関数zundokoに必要な引数をあたえると出力用のキューが手に入る。このキューからズン、ドコ、キ・ヨ・シ!を取り出して、1行ずつ表示していく。取り出した値がキ・ヨ・シ!だったら全体を終了させる。

app/Main.hs
main :: IO ()
main = zundoko [Zun, Doko] [Zun, Zun, Zun, Zun, Doko] >>= \q -> loopIf
        $ (<$) <$> (/= endValue) <*> putStrLn . say =<< atomically (dequeue q)

あまり、よくないと思いつつも、こういうコードを「きれいだなぁ」と感じてしまう。よくない。展開しよう。

app/Main.hs
main = do
        q <- zundoko [Zun, Doko] [Zun, Zun, Zun, Zun, Doko]
        loopIf $ do
                z <- atomically $ dequeue q
                putStrLn $ say z
                pure $ z /= endValue

ということ。最近は関数returnの代わりに関数pureを使うのがマイブームだ。関数zundokoに「それぞれのスレッドがキューに押しこむ値のリスト」と「終了パターン」を引数としてあたえる。返り値である出力用のキューで変数qを束縛する。関数loopIfは引数である動作がTrueをかえすあいだ、その動作をくりかえす。くりかえされる動作のなかでは、キューqから要素を取りだして、それをputStrLn $ say zで表示したうえで、終了するかどうかのブール値であるz /= endValueをかえす。

試してみよう。

% stack build
% stack exec concurrent-zundoko-exe
ドコ
ズン
ズン
ドコ
ドコ
ズン
ドコ
ドコ
ドコ
ズン
ドコ
ドコ
ズン
ズン
ズン
ズン
ズン
ドコ
キ・ヨ・シ!

おまけ(PPAP)

終了をNothingで表現するほうの例も挙げておく。まずはpackage.yamlのexecutableに追加。

package.yaml
  ppap:
    main: ppap.hs
    source-dirs: app
    ghc-options:
    - -threaded
    - -rtsopts
    - -with-rtsopts=-N
    dependencies:
    - concurrent-zundoko

モジュールMainを書く。

app/ppap.hs
{-# LANGUAGE LambdaCase #-}
{-# OPTIONS_GHC -Wall -fno-warn-tabs #-}

module Main where

import Control.Concurrent.STM
import Data.Maybe

import Tips
import RealtimeQueueStm
import ZunDoko

data PPAP = Pen | Pineapple | Apple deriving (Show, Eq)

say :: PPAP -> String
say = \case Pen -> "ペン"; Pineapple -> "パイナッポー"; Apple -> "アッポー"

main :: IO ()
main = zundoko [Pen, Pineapple, Apple] [Pen, Pineapple, Apple, Pen] >>= \q ->
        loopIf $ (<$) <$> isJust <*> maybe (return ()) (putStrLn . say)
                =<< atomically (dequeue q)

試してみよう。

% stack exec ppap
パイナッポー
アッポー
ペン
パイナッポー
アッポー
パイナッポー
パイナッポー
アッポー
ペン
パイナッポー
アッポー
ペン

まとめ

長くなってしまった。ズンドコキヨシをマルチスレッドにしたら、おもしろそうだと思ったのだけど、実際におもしろかった。STMとReal-Time Queueを紹介するだけのつもりだったのが、「終了判定」のところのコードが、あまりきれいに書けなかったことから、Knuth-Morris-Prattアルゴリズムを書かざるをえなくなり、「関数プログラミング 珠玉のアルゴリズムデザイン」をひっぱり出してきて、第17章を熟読することになった。

また、Real-Time Queueを解説するうえで、関数show(というかprint?)がサンクをつぶしまくるのが気になったために、モジュールGHC.Exts.Heapの関数getClosureDataを使った。

あと、すでに長くなりすぎた記事なのでそのままにするが、本当なら「無限ループ」しているスレッドをほったらかしにしておくのはよくない。機能killThread :: ThreadId -> IO ()あたりで後始末をするべきだろう。

15
6
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
15
6