conduitの学習のため、Conduit Overview - School of Haskell を翻訳してみました。全体的に平易な英語で書かれているので、原文そのままでも問題なく読めると思いますが、これから学習する方の役に立てればと思い公開します。
なお、練習問題の解答は原文の方で確認して下さい。
Conduit Overview
conduit
はデータのストリーミングのための1つのソリューションで、一定のメモリの中でストリームの生成、変換、消費を可能にします。イベント駆動的ななやり方で、ファイルやネットワークインターフェイスの処理、構造化されたデータの解析に使用することができます。
このチュートリアルに加え、多くのトピックを含む conduitについてのスライド があります。
Synopsis
import Data.Conduit
import qualified Data.Conduit.List as CL
import qualified Data.Conduit.Binary as CB
main = do
-- Pure operations: summing numbers.
result <- CL.sourceList [1..10] $$ CL.fold (+) 0
print result
-- Exception safe file access: copy a file.
writeFile "input.txt" "This is a test."
runResourceT $ CB.sourceFile "input.txt" $$ CB.sinkFile "output.txt"
readFile "output.txt" >>= putStrLn
-- Perform transformations.
result <- CL.sourceList [1..10] $$ CL.map (+ 1) =$ CL.consume
print result
Features of conduit
-
conduit
は大きな (あるいは無限の) ストリームデータを一定のメモリの中で扱うことを可能にします。データの塊 (chunk) は全体を一度に読み込む必要はなく、一度に1ピースずつ扱われます。 -
conduit
はまた、決定的なリソースの利用を可能にします。ファイルディスクリプタのような乏しいリソースを扱うとき、conduit
はリソースが必要なくなったらすぐに再利用するように設計されています。遅延I/Oとは対照的に、一定のメモリを使用しますがリソースの消費は決定的です。(訳注: ここは、ちょっと意味が取れていません。どなたか校正おねがいします。) - リソースの使用はまた、例外安全です。ファイルハンドルは、例外の存在下でも必ずリサイクルされます。この機能の大部分は、後述する関連の
resourcet
パッケージによって提供されます。 - 組み込みのコンポーネントを組み合わせて、より複雑な構造を作り上げるのは簡単です。私たちの目標は、命令型の世界の処理の中への純粋なコードの組み込みの容易性を保つことです。
Basics
conduit
パッケージの主要なモジュールはData.Conduit
で、核となるデータ型やプリミティブなオペレーションを提供します。もう1つの汎用的に使用されるモジュールはData.Conduit.List
で、map
やfold
といった標準的なHaskellの概念に基づいた、一般的なケースのための多くのヘルパー関数を提供します。
conduitには、3つの主要な概念があります。Source
はデータのストリームを生成し、それを*下流 (downstream)に送ります。Sink
は上流 (upstream)*から来るデータのストリームを消費 (consume) し、戻り値を生成します。Synopsis節の最初の例では、sourceList
がInteger
のストリームを生成し、fold
がそれを消費して55という戻り値を生成しました。3つ目の概念はConduit
です。これは、上流から受け取った値のストリームを消費して、新しいストリームを生成して下流に送ります。Synopsis節では、map
の呼び出しがInteger
の1つのストリームを消費し、それぞれの値に1を加えてその結果を下流に送りました。
これらの3つの異なるコンポーネントを組み合わせるために、*接続 (connect)と結合 (fuse)*があります。接続の演算子は$$
です。これは、Source
とSink
を組み合わせて、前者の生成した値を後者に与え、最終的な結果を生成します。一方、結合は2つのコンポーネントをとって1つの新たなコンポーネントを生成します。例えば、=$
はConduit
とSink
を結合して新たなSink
を生成できます。これは、元々のConduit
と同じ値を消費し、元々のSink
と同じ結果を生成します。他の2つの結合の演算子は、Source
とConduit
を組み合わせて新たなSource
にする$=
、2つのConduit
を組み合わせて新たなConduit
にする=$=
です。
import Data.Conduit
import qualified Data.Conduit.List as CL
source :: Source IO Int -- produces a stream of Ints
source = CL.sourceList [1..4]
sink :: Sink String IO () -- consumes a stream of Strings, no result
sink = CL.mapM_ putStrLn
conduit :: Conduit Int IO String -- converts Ints into Strings
conduit = CL.map show
main = do
source $$ conduit =$ sink
-- alternatively, with the same meaning
source $= conduit $$ sink
Exercise
すべての入力される数を2倍するConduit
を書き、それを上記のコードに組み込みなさい。期待する結果を得られる接続と結合の演算子の使い方は複数あることに留意し、それらのいくつかを示しなさい。
Unified data type
水面下では、3つの核となるデータ型はすべて、1つの同じ型ConduitM
の単なるラッパーです。この1つの統一された型をラッピングすることで、多くのコードを再利用でき、より簡単にconduitの中のコンポーネントを組み合わせることができます。
ConduitM
は4つの型パラメータを取ります。上流から受け取る入力、下流に送る出力、基底モナド、戻り値です。特殊化された型は、以下のように定義されます。
type Source m a = ConduitM () a m () -- no meaningful input or return value
type Conduit a m b = ConduitM a b m () -- no meaningful return value
type Sink a m b = ConduitM a Void m b -- no meaningful output value
ConduitM
はモナド変換子です。それ故、基底モナドの操作を持ち上げる (lift) ことができます (後述の "Lift Operations" 参照)。そして、複数のコンポーネントを容易に組み合わせることができます。これは、より単純なコンポーネントから複雑なメカニズムを作り出すことを簡単にします。
ConduitM
データ型を直接扱うことは滅多にないでしょうが、エラーメッセージの中にはときどき現れるでしょう。
Primitives
conduit
ライブラリには、以下の3つのプリミティブな関数があります。
-
await
は、(入手可能であれば) 上流から1つの値を受け取ります。 -
yield
は、下流に1つの値を送ります。 -
leftover
は1つの値を上流のキューに戻し、次のawait
呼び出しで読み出されるようにします。
import Data.Conduit
import Control.Monad.IO.Class
source :: Source IO Int
source = do
yield 1
yield 2
yield 3
yield 4
conduit :: Conduit Int IO String
conduit = do
-- Get all of the adjacent pairs from the stream
mi1 <- await
mi2 <- await
case (mi1, mi2) of
(Just i1, Just i2) -> do
yield $ show (i1, i2)
leftover i2
conduit
_ -> return ()
sink :: Sink String IO ()
sink = do
mstr <- await
case mstr of
Nothing -> return ()
Just str -> do
liftIO $ putStrLn str
sink
main = source $$ conduit =$ sink
Exercises
-
yield
を用いてsourceList
を実装しなさい。sourceList :: Monad m => [a] -> Source m a sourceList = ???
-
conduit
ライブラリにはawaitForever
というヘルパー関数があります。awaitForever
を用いて上述のsink
を書き直しなさい。sink :: Sink String IO () sink = ???
-
あなた独自の
awaitForever
を実装しなさい。myAwaitForever :: Monad m => (a -> Conduit a m b) -> Conduit a m b myAwaitForever f = ???
Monadic chaining
上述の例は、標準的なモナドの連結 (monadic binding) を用いて、どのようにプリミティブ関数を組み合わせることができるかを示したものです。これは、単にプリミティブに適用するだけではなく、より大きなコンポーネントを組み合わせることも可能です。受け取った任意の値を3回出力する、"triple" Conduit
を考えてみましょう。
import Data.Conduit
import qualified Data.Conduit.List as CL
triple :: Monad m => Conduit a m a
triple = do
ma <- await
case ma of
Nothing -> return ()
Just a -> do
CL.sourceList [a, a, a]
triple
main = CL.sourceList [1..4] $$ triple =$ CL.mapM_ print
ミニ練習: 上記をawaitForever
を使って (より短くなるように) 書き直しなさい。
この例から分かるように、sourceList
のようなより高レベルの関数を、より大きな関数の中に組み込むことができます。1つ質問があるとすれば、どうしてConduit
の本体の中でSource
を使えるのか?ということかもしれません。それについては、後述の "Generalizing" の節の中でProducer
とConsumer
と共に議論しましょう。
練習: Int
のストリームを消費 (consume) するConduit
を書きなさい。それは、ストリームから最初のInt
を受け取り、それを後続の各々のInt
に対して掛けあわせた値を下流に送ります。この処理には、Data.Conduit.List.map
関数を使うとよいでしょう。
Lifting operations
Conduit
はモナド変換子なので、基底モナドがサポートする任意の操作を行うことができます。CL.mapM_ print
の使用により、暗黙的ですが私たちはそれを既に見ています。しかし、lift
やliftIO
を明示的に使用することもできます。また、使えるのはIO
モナドに限りません。State
モナドの例を見てみましょう。
import Control.Monad.State
import Data.Conduit
import qualified Data.Conduit.List as CL
source :: Source (State Int) Int
source = do
x <- lift get
if x <= 0
then return ()
else do
yield x
lift $ modify (\x -> x - 2)
source
conduit :: Conduit Int (State Int) (Int, Int)
conduit = awaitForever $ \i -> do
lift $ modify (+ 1)
x <- lift get
yield (i, x)
main :: IO ()
main = do
let result :: State Int [(Int, Int)]
result = source $$ conduit =$ CL.consume
print $ runState result 5
Generalizing
前述のtriple conduitにおいて、どうしてConduit
の中でSource
を使うことができたのでしょう?型チェックされないんでしょうか?結局のところ、Source
は入力の型が()
に制限されているのに対し、Conduit
は任意の入力の型を取れます。前述の例では入力はInt
であり、うまく行かないはずだったんですが。
その答えは、私たちがここで導入する最後の2つの型シノニムにあります。Producer
は一般化されたSource
です。()
の入力ストリームを消費する代わりに、 任意の 入力の型を消費することができます。従って、Producer
はSource
とConduit
の両方になることができます。同様に、Consumer
は任意の型を出力できます。従って、Consumer
はConduit
とSink
のどちらにもなれます。
この一般化のため、ほとんどのライブラリ関数はProducer
かConsumer
を使って書かれるでしょう。ユーザとしては、(Conduit
としての機能も使う必要があるのでなければ) たいていのコードでは単にSource
とSink
を使えばよいです。
そして、後からSource
をConduit
に変換したくなった場合 (例えば、他人のライブラリを使っているとき) には、toProducer、Sink
に対してはtoConsumerを使うことができます。
Termination
コンポーネントの一連の流れ (私たちはそれをパイプライン (pipeline) と呼びます) のライフタイムについてお話しましょう。パイプラインは、常に下流から駆動されます。これは、例えばSource
とSink
を接続した場合、Sink
から処理を始めるということです。
Sinkは、さらなる入力が必要となるまで処理を続けます。その後await
を呼んで、新しい入力が来るまで処理が中断されます。Sourceの方は、下流が入力を要求した時に起動され、値を生成すると下流に制御を戻します。Sink
が終端 (terminate) するとすぐに、パイプライン全体が終了します。
次の例は、パイプラインのコンポーネントが相互にどのように作用するかを示します。sink
へのパラメータを2から4に変更し、それが出力にどのような影響を与えるのかを確認してみて下さい。
import Data.Conduit
import Control.Monad.IO.Class
source = do
liftIO $ putStrLn "source: yielding 1"
yield 1
liftIO $ putStrLn "source: yielding 2"
yield 2
liftIO $ putStrLn "source: yielding 3"
yield 3
conduit = do
liftIO $ putStrLn "conduit calling await"
mx <- await
case mx of
Nothing -> liftIO $ putStrLn "Nothing left, exiting"
Just x -> do
liftIO $ putStrLn $ "conduit yielding " ++ show x
yield x
conduit
sink 0 = liftIO $ putStrLn "sink is finished, terminating"
sink i = do
liftIO $ putStrLn $ "sink: still waiting for " ++ show i
mx <- await
case mx of
Nothing -> liftIO $ putStrLn "sink: Nothing from upstream, exiting"
Just x -> do
liftIO $ putStrLn $ "sink received: " ++ show x
sink (i - 1)
main = source $$ conduit =$ sink 2
ここまでに述べたことに基づくと、ある大きな制約があります。Source
とConduit
は自分自身でクリーンアップ (clean-up) 処理をする方法がありません。下流にある任意のコンポーネントが終端するとすぐに、それらは終端させられるからです。これを解決するために、conduit
はターミネーター (terminator) の概念をサポートします。Source
またはConduit
が下流に値をyield
する毎に、追加的にクリーンアップ関数を含むことができます。Source
がyield
する毎に、前回生成したクリーンアップ関数を上書きします。簡単な例を見てみましょう。
import Data.Conduit
import qualified Data.Conduit.List as CL
source =
loop 1
where
loop i = do
yieldOr i $ putStrLn $ "Terminated when yielding: " ++ show i
loop $ i + 1
main = source $$ CL.isolate 7 =$ CL.mapM_ print
今までの例の中では、これはそんなに重要なことではなかったのですが、ファイルディスクリプタのような乏しいリソースを扱い始めると、使い終わったらすぐにディスクリプタを閉じる機能が必要になります。
あなたのコードベースに一々yieldOr
を入れていくのは、あまりに面倒でしょう。代わりに、addCleanup
関数を使うのが通常はより簡単です。これは、終了時に特定の関数が呼ばれることを保証します。クリーンアップ関数には、Bool
型のパラメータが与えられます。これがTrue
であればコンポーネントは正常に完了するまで動作したことを示し、False
であれば下流が先に終了したことを示します。
ある単純なファイルI/Oの例を示しましょう。下記のコードは文字を一度に1つずつ扱っており、従って非常に非効率であることに留意して下さい。現実世界のユースケースでは、Data.Conduit.Binary
を使うことを強く推奨します。
import System.IO
import Data.Conduit
import Control.Monad.IO.Class
import qualified Data.Conduit.List as CL
source = do
handle <- liftIO $ openFile "test.txt" ReadMode
addCleanup (const $ putStrLn "Closing handle" >> hClose handle) $ loop handle
where
loop handle = do
eof <- liftIO $ hIsEOF handle
if eof
then return ()
else do
c <- liftIO $ hGetChar handle
yield c
loop handle
main = source $$ CL.isolate 5 =$ CL.mapM_ print
This is a test.
Exception Safety
このaddCleanup
アプローチには、まだ1つ重大な欠陥があります。例外安全ではないということです!パイプラインの中の私たちのコンポーネントか、他の任意のコンポーネントによって例外が投げられると、Handle
は正常に閉じられません。
例外の存在下でも動作することを保証するために、最後の関数bracketP
を導入する必要があります。これは、標準のbracket
関数と非常によく似た働きをします。あなたはそれに、乏しいリソースを確保する関数、そのリソースを開放する関数、そのリソースを使って何らかの処理を行う内部関数の3つをを与えます。
前述の非効率なファイルリーダーを、bracketP
を使って書き直してみましょう。
import System.IO
import Data.Conduit
import Control.Monad.IO.Class
import qualified Data.Conduit.List as CL
import Control.Monad.Trans.Resource
-- Better with bracketP
source =
bracketP
(openFile "test.txt" ReadMode)
(\handle -> putStrLn "Closing handle" >> hClose handle)
loop
where
loop handle = do
eof <- liftIO $ hIsEOF handle
if eof
then return ()
else do
c <- liftIO $ hGetChar handle
yield c
loop handle
-- An exception-throwing sink.
exceptionalSink = do
c <- await
liftIO $ print c
error "This throws an exception"
-- We also need to call runResourceT
main = runResourceT $ source $$ exceptionalSink
(訳注: 最新のconduitではrunResourceTが別モジュールに分離しており?コンパイルエラーになるため、import文を追加しています)
runResourceT
を呼んでいることに留意して下さい。この関数を抜ける時点で、ブロック内部で確保されたすべてのリソースは開放されます。ResourceT
に関するさらなる情報については、Control.Monad.Trans.Resourceを参照して下さい。
Connect and resume
conduit
は反転した制御形式を採っています。あなたは、もはやプログラムの実行の流れを制御する必要はありません。その代わりに、個々のコンポーネントがいつ入力を必要としていつ出力を与えるのかを宣言的に記述します。そして、conduit
はすべてが正しい順序で回ることを保証します。多くのユースケースでは、これで十分です。しかしながら、いくつかのケースでは実行の流れをもっと制御したいことがあるかもしれません。"connect and resume"はそのような抜け道を提供します。
"connect and resume"は**再利用可能なソース (resumable source)**の概念を導入します。これは、部分的に動作を終えているが、別のSink
に再接続することで継続することができるSource
です。ResumableSource
を生成するためには、$$+
(connect-and-resume) 演算子を使用します。ResumableSource
を新しいSink
に接続して更新されたResumableSource
を得るために、$$++
演算子を使用します。最後に、$$+-
演算子を使用してResumableSource
を最後のSink
に接続します。
import Data.Conduit
import qualified Data.Conduit.List as CL
main = do
(rsrc1, result1) <- CL.sourceList [1..10] $$+ CL.take 3
(rsrc2, result2) <- rsrc1 $$++ CL.take 3
result3 <- rsrc2 $$+- CL.consume
print (result1, result2, result3)
ResumableSource
に関して注意しないといけない重要な点は、それに関連付けられたクリーンアップ関数があるかもしれないことです。そのため、必ず 最終的に$$+-
を呼ばないといけません。さもなければ、それらのリソースのクリーンアップが遅延するリスクを冒すことになります。
"connect and resume"は、通常はより複雑な制御フロー操作の中でのみ議題に上がります。従って、conduit
の通常の使用の中で出くわす可能性は少ないでしょう。これを利用するライブラリの1つはhttp-conduitです。そこでは、ResumableSource
はHTTP応答の本体を表すためにhttp関数によって返されます。
Further reading
- Data.Conduit データ型と核となる操作を定義する主要なモジュール
- Data.Conduit.List 汎用的なヘルパー関数のコレクション
- Data.Conduit.Network ネットワークのサーバとクライアントを生成する