conduitの学習のため、Conduit Overview - School of Haskell を翻訳してみました。全体的に平易な英語で書かれているので、原文そのままでも問題なく読めると思いますが、これから学習する方の役に立てればと思い公開します。
なお、練習問題の解答は原文の方で確認して下さい。
Conduit Overview
conduitはデータのストリーミングのための1つのソリューションで、一定のメモリの中でストリームの生成、変換、消費を可能にします。イベント駆動的ななやり方で、ファイルやネットワークインターフェイスの処理、構造化されたデータの解析に使用することができます。
このチュートリアルに加え、多くのトピックを含む conduitについてのスライド があります。
Synopsis
import Data.Conduit
import qualified Data.Conduit.List as CL
import qualified Data.Conduit.Binary as CB
main = do
    -- Pure operations: summing numbers.
    result <- CL.sourceList [1..10] $$ CL.fold (+) 0
    print result
    
    -- Exception safe file access: copy a file.
    writeFile "input.txt" "This is a test."
    runResourceT $ CB.sourceFile "input.txt" $$ CB.sinkFile "output.txt"
    readFile "output.txt" >>= putStrLn
    
    -- Perform transformations.
    result <- CL.sourceList [1..10] $$ CL.map (+ 1) =$ CL.consume
    print result
Features of conduit
- 
conduitは大きな (あるいは無限の) ストリームデータを一定のメモリの中で扱うことを可能にします。データの塊 (chunk) は全体を一度に読み込む必要はなく、一度に1ピースずつ扱われます。
- 
conduitはまた、決定的なリソースの利用を可能にします。ファイルディスクリプタのような乏しいリソースを扱うとき、conduitはリソースが必要なくなったらすぐに再利用するように設計されています。遅延I/Oとは対照的に、一定のメモリを使用しますがリソースの消費は決定的です。(訳注: ここは、ちょっと意味が取れていません。どなたか校正おねがいします。)
- リソースの使用はまた、例外安全です。ファイルハンドルは、例外の存在下でも必ずリサイクルされます。この機能の大部分は、後述する関連のresourcetパッケージによって提供されます。
- 組み込みのコンポーネントを組み合わせて、より複雑な構造を作り上げるのは簡単です。私たちの目標は、命令型の世界の処理の中への純粋なコードの組み込みの容易性を保つことです。
Basics
conduitパッケージの主要なモジュールはData.Conduitで、核となるデータ型やプリミティブなオペレーションを提供します。もう1つの汎用的に使用されるモジュールはData.Conduit.Listで、mapやfoldといった標準的なHaskellの概念に基づいた、一般的なケースのための多くのヘルパー関数を提供します。
conduitには、3つの主要な概念があります。Sourceはデータのストリームを生成し、それを*下流 (downstream)に送ります。Sinkは上流 (upstream)*から来るデータのストリームを消費 (consume) し、戻り値を生成します。Synopsis節の最初の例では、sourceListがIntegerのストリームを生成し、foldがそれを消費して55という戻り値を生成しました。3つ目の概念はConduitです。これは、上流から受け取った値のストリームを消費して、新しいストリームを生成して下流に送ります。Synopsis節では、mapの呼び出しがIntegerの1つのストリームを消費し、それぞれの値に1を加えてその結果を下流に送りました。
これらの3つの異なるコンポーネントを組み合わせるために、*接続 (connect)と結合 (fuse)*があります。接続の演算子は$$です。これは、SourceとSinkを組み合わせて、前者の生成した値を後者に与え、最終的な結果を生成します。一方、結合は2つのコンポーネントをとって1つの新たなコンポーネントを生成します。例えば、=$はConduitとSinkを結合して新たなSinkを生成できます。これは、元々のConduitと同じ値を消費し、元々のSinkと同じ結果を生成します。他の2つの結合の演算子は、SourceとConduitを組み合わせて新たなSourceにする$=、2つのConduitを組み合わせて新たなConduitにする=$=です。
import Data.Conduit
import qualified Data.Conduit.List as CL
source :: Source IO Int -- produces a stream of Ints
source = CL.sourceList [1..4]
sink :: Sink String IO () -- consumes a stream of Strings, no result
sink = CL.mapM_ putStrLn
conduit :: Conduit Int IO String -- converts Ints into Strings
conduit = CL.map show
main = do
    source $$ conduit =$ sink
    -- alternatively, with the same meaning
    source $= conduit $$ sink
Exercise
すべての入力される数を2倍するConduitを書き、それを上記のコードに組み込みなさい。期待する結果を得られる接続と結合の演算子の使い方は複数あることに留意し、それらのいくつかを示しなさい。
Unified data type
水面下では、3つの核となるデータ型はすべて、1つの同じ型ConduitMの単なるラッパーです。この1つの統一された型をラッピングすることで、多くのコードを再利用でき、より簡単にconduitの中のコンポーネントを組み合わせることができます。
ConduitMは4つの型パラメータを取ります。上流から受け取る入力、下流に送る出力、基底モナド、戻り値です。特殊化された型は、以下のように定義されます。
type Source m a = ConduitM () a m () -- no meaningful input or return value
type Conduit a m b = ConduitM a b m () -- no meaningful return value
type Sink a m b = ConduitM a Void m b -- no meaningful output value
ConduitMはモナド変換子です。それ故、基底モナドの操作を持ち上げる (lift) ことができます (後述の "Lift Operations" 参照)。そして、複数のコンポーネントを容易に組み合わせることができます。これは、より単純なコンポーネントから複雑なメカニズムを作り出すことを簡単にします。
ConduitMデータ型を直接扱うことは滅多にないでしょうが、エラーメッセージの中にはときどき現れるでしょう。
Primitives
conduitライブラリには、以下の3つのプリミティブな関数があります。
- 
awaitは、(入手可能であれば) 上流から1つの値を受け取ります。
- 
yieldは、下流に1つの値を送ります。
- 
leftoverは1つの値を上流のキューに戻し、次のawait呼び出しで読み出されるようにします。
import Data.Conduit
import Control.Monad.IO.Class
source :: Source IO Int
source = do
    yield 1
    yield 2
    yield 3
    yield 4
    
conduit :: Conduit Int IO String
conduit = do
    -- Get all of the adjacent pairs from the stream
    mi1 <- await
    mi2 <- await
    case (mi1, mi2) of
        (Just i1, Just i2) -> do
            yield $ show (i1, i2)
            leftover i2
            conduit
        _ -> return ()
            
sink :: Sink String IO ()
sink = do
    mstr <- await
    case mstr of
        Nothing -> return ()
        Just str -> do
            liftIO $ putStrLn str
            sink
            
main = source $$ conduit =$ sink
Exercises
- 
yieldを用いてsourceListを実装しなさい。sourceList :: Monad m => [a] -> Source m a sourceList = ???
- 
conduitライブラリにはawaitForeverというヘルパー関数があります。awaitForeverを用いて上述のsinkを書き直しなさい。sink :: Sink String IO () sink = ???
- 
あなた独自の awaitForeverを実装しなさい。myAwaitForever :: Monad m => (a -> Conduit a m b) -> Conduit a m b myAwaitForever f = ???
Monadic chaining
上述の例は、標準的なモナドの連結 (monadic binding) を用いて、どのようにプリミティブ関数を組み合わせることができるかを示したものです。これは、単にプリミティブに適用するだけではなく、より大きなコンポーネントを組み合わせることも可能です。受け取った任意の値を3回出力する、"triple" Conduit を考えてみましょう。
import Data.Conduit
import qualified Data.Conduit.List as CL
triple :: Monad m => Conduit a m a
triple = do
    ma <- await
    case ma of
        Nothing -> return ()
        Just a -> do
            CL.sourceList [a, a, a]
            triple
main = CL.sourceList [1..4] $$ triple =$ CL.mapM_ print
ミニ練習: 上記をawaitForeverを使って (より短くなるように) 書き直しなさい。
この例から分かるように、sourceListのようなより高レベルの関数を、より大きな関数の中に組み込むことができます。1つ質問があるとすれば、どうしてConduitの本体の中でSourceを使えるのか?ということかもしれません。それについては、後述の "Generalizing" の節の中でProducerとConsumerと共に議論しましょう。
練習: Intのストリームを消費 (consume) するConduitを書きなさい。それは、ストリームから最初のIntを受け取り、それを後続の各々のIntに対して掛けあわせた値を下流に送ります。この処理には、Data.Conduit.List.map関数を使うとよいでしょう。
Lifting operations
Conduitはモナド変換子なので、基底モナドがサポートする任意の操作を行うことができます。CL.mapM_ printの使用により、暗黙的ですが私たちはそれを既に見ています。しかし、liftやliftIOを明示的に使用することもできます。また、使えるのはIOモナドに限りません。Stateモナドの例を見てみましょう。
import Control.Monad.State
import Data.Conduit
import qualified Data.Conduit.List as CL
source :: Source (State Int) Int
source = do
    x <- lift get
    if x <= 0
        then return ()
        else do
            yield x
            lift $ modify (\x -> x - 2)
            source
            
conduit :: Conduit Int (State Int) (Int, Int)
conduit = awaitForever $ \i -> do
    lift $ modify (+ 1)
    x <- lift get
    yield (i, x)
main :: IO ()
main = do
    let result :: State Int [(Int, Int)]
        result = source $$ conduit =$ CL.consume
    print $ runState result 5
Generalizing
前述のtriple conduitにおいて、どうしてConduitの中でSourceを使うことができたのでしょう?型チェックされないんでしょうか?結局のところ、Sourceは入力の型が()に制限されているのに対し、Conduitは任意の入力の型を取れます。前述の例では入力はIntであり、うまく行かないはずだったんですが。
その答えは、私たちがここで導入する最後の2つの型シノニムにあります。Producerは一般化されたSourceです。()の入力ストリームを消費する代わりに、 任意の 入力の型を消費することができます。従って、ProducerはSourceとConduitの両方になることができます。同様に、Consumerは任意の型を出力できます。従って、ConsumerはConduitとSinkのどちらにもなれます。
この一般化のため、ほとんどのライブラリ関数はProducerかConsumerを使って書かれるでしょう。ユーザとしては、(Conduitとしての機能も使う必要があるのでなければ) たいていのコードでは単にSourceとSinkを使えばよいです。
そして、後からSourceをConduitに変換したくなった場合 (例えば、他人のライブラリを使っているとき) には、toProducer、Sinkに対してはtoConsumerを使うことができます。
Termination
コンポーネントの一連の流れ (私たちはそれをパイプライン (pipeline) と呼びます) のライフタイムについてお話しましょう。パイプラインは、常に下流から駆動されます。これは、例えばSourceとSinkを接続した場合、Sinkから処理を始めるということです。
Sinkは、さらなる入力が必要となるまで処理を続けます。その後awaitを呼んで、新しい入力が来るまで処理が中断されます。Sourceの方は、下流が入力を要求した時に起動され、値を生成すると下流に制御を戻します。Sinkが終端 (terminate) するとすぐに、パイプライン全体が終了します。
次の例は、パイプラインのコンポーネントが相互にどのように作用するかを示します。sinkへのパラメータを2から4に変更し、それが出力にどのような影響を与えるのかを確認してみて下さい。
import Data.Conduit
import Control.Monad.IO.Class
source = do
    liftIO $ putStrLn "source: yielding 1"
    yield 1
    liftIO $ putStrLn "source: yielding 2"
    yield 2
    liftIO $ putStrLn "source: yielding 3"
    yield 3
    
conduit = do
    liftIO $ putStrLn "conduit calling await"
    mx <- await
    case mx of
        Nothing -> liftIO $ putStrLn "Nothing left, exiting"
        Just x -> do
            liftIO $ putStrLn $ "conduit yielding " ++ show x
            yield x
            conduit
            
sink 0 = liftIO $ putStrLn "sink is finished, terminating"
sink i = do
    liftIO $ putStrLn $ "sink: still waiting for " ++ show i
    mx <- await
    case mx of
        Nothing -> liftIO $ putStrLn "sink: Nothing from upstream, exiting"
        Just x -> do
            liftIO $ putStrLn $ "sink received: " ++ show x
            sink (i - 1)
            
main = source $$ conduit =$ sink 2
ここまでに述べたことに基づくと、ある大きな制約があります。SourceとConduitは自分自身でクリーンアップ (clean-up) 処理をする方法がありません。下流にある任意のコンポーネントが終端するとすぐに、それらは終端させられるからです。これを解決するために、conduitはターミネーター (terminator) の概念をサポートします。SourceまたはConduitが下流に値をyieldする毎に、追加的にクリーンアップ関数を含むことができます。Sourceがyieldする毎に、前回生成したクリーンアップ関数を上書きします。簡単な例を見てみましょう。
import Data.Conduit
import qualified Data.Conduit.List as CL
source =
    loop 1
  where
    loop i = do
        yieldOr i $ putStrLn $ "Terminated when yielding: " ++ show i
        loop $ i + 1
        
main = source $$ CL.isolate 7 =$ CL.mapM_ print
今までの例の中では、これはそんなに重要なことではなかったのですが、ファイルディスクリプタのような乏しいリソースを扱い始めると、使い終わったらすぐにディスクリプタを閉じる機能が必要になります。
あなたのコードベースに一々yieldOrを入れていくのは、あまりに面倒でしょう。代わりに、addCleanup関数を使うのが通常はより簡単です。これは、終了時に特定の関数が呼ばれることを保証します。クリーンアップ関数には、Bool型のパラメータが与えられます。これがTrueであればコンポーネントは正常に完了するまで動作したことを示し、Falseであれば下流が先に終了したことを示します。
ある単純なファイルI/Oの例を示しましょう。下記のコードは文字を一度に1つずつ扱っており、従って非常に非効率であることに留意して下さい。現実世界のユースケースでは、Data.Conduit.Binaryを使うことを強く推奨します。
import System.IO
import Data.Conduit
import Control.Monad.IO.Class
import qualified Data.Conduit.List as CL
source = do
    handle <- liftIO $ openFile "test.txt" ReadMode
    addCleanup (const $ putStrLn "Closing handle" >> hClose handle) $ loop handle
  where
    loop handle = do
        eof <- liftIO $ hIsEOF handle
        if eof
            then return ()
            else do
                c <- liftIO $ hGetChar handle
                yield c
                loop handle
                
main = source $$ CL.isolate 5 =$ CL.mapM_ print
This is a test.
Exception Safety
このaddCleanupアプローチには、まだ1つ重大な欠陥があります。例外安全ではないということです!パイプラインの中の私たちのコンポーネントか、他の任意のコンポーネントによって例外が投げられると、Handleは正常に閉じられません。
例外の存在下でも動作することを保証するために、最後の関数bracketPを導入する必要があります。これは、標準のbracket関数と非常によく似た働きをします。あなたはそれに、乏しいリソースを確保する関数、そのリソースを開放する関数、そのリソースを使って何らかの処理を行う内部関数の3つをを与えます。
前述の非効率なファイルリーダーを、bracketPを使って書き直してみましょう。
import System.IO
import Data.Conduit
import Control.Monad.IO.Class
import qualified Data.Conduit.List as CL
import Control.Monad.Trans.Resource
-- Better with bracketP
source =
    bracketP
        (openFile "test.txt" ReadMode)
        (\handle -> putStrLn "Closing handle" >> hClose handle)
        loop
  where
    loop handle = do
        eof <- liftIO $ hIsEOF handle
        if eof
            then return ()
            else do
                c <- liftIO $ hGetChar handle
                yield c
                loop handle
-- An exception-throwing sink.
exceptionalSink = do
    c <- await
    liftIO $ print c
    error "This throws an exception"
-- We also need to call runResourceT
main = runResourceT $ source $$ exceptionalSink
(訳注: 最新のconduitではrunResourceTが別モジュールに分離しており?コンパイルエラーになるため、import文を追加しています)
runResourceTを呼んでいることに留意して下さい。この関数を抜ける時点で、ブロック内部で確保されたすべてのリソースは開放されます。ResourceTに関するさらなる情報については、Control.Monad.Trans.Resourceを参照して下さい。
Connect and resume
conduitは反転した制御形式を採っています。あなたは、もはやプログラムの実行の流れを制御する必要はありません。その代わりに、個々のコンポーネントがいつ入力を必要としていつ出力を与えるのかを宣言的に記述します。そして、conduitはすべてが正しい順序で回ることを保証します。多くのユースケースでは、これで十分です。しかしながら、いくつかのケースでは実行の流れをもっと制御したいことがあるかもしれません。"connect and resume"はそのような抜け道を提供します。
"connect and resume"は**再利用可能なソース (resumable source)**の概念を導入します。これは、部分的に動作を終えているが、別のSinkに再接続することで継続することができるSourceです。ResumableSourceを生成するためには、$$+ (connect-and-resume) 演算子を使用します。ResumableSourceを新しいSinkに接続して更新されたResumableSourceを得るために、$$++演算子を使用します。最後に、$$+- 演算子を使用してResumableSourceを最後のSinkに接続します。
import Data.Conduit
import qualified Data.Conduit.List as CL
main = do
    (rsrc1, result1) <- CL.sourceList [1..10] $$+ CL.take 3
    (rsrc2, result2) <- rsrc1 $$++ CL.take 3
    result3 <- rsrc2 $$+- CL.consume
    print (result1, result2, result3)
ResumableSourceに関して注意しないといけない重要な点は、それに関連付けられたクリーンアップ関数があるかもしれないことです。そのため、必ず 最終的に$$+-を呼ばないといけません。さもなければ、それらのリソースのクリーンアップが遅延するリスクを冒すことになります。
"connect and resume"は、通常はより複雑な制御フロー操作の中でのみ議題に上がります。従って、conduitの通常の使用の中で出くわす可能性は少ないでしょう。これを利用するライブラリの1つはhttp-conduitです。そこでは、ResumableSourceはHTTP応答の本体を表すためにhttp関数によって返されます。
Further reading
- Data.Conduit データ型と核となる操作を定義する主要なモジュール
- Data.Conduit.List 汎用的なヘルパー関数のコレクション
- Data.Conduit.Network ネットワークのサーバとクライアントを生成する