Conduit (1) - Haskell Streaming library - Qiita
Conduit (2) - Monadic composition - Qiita
Conduit (3) - Resource - Qiita
Haskell のStreaming ライブラリである Conduit についての記事です。
記事のソースは作者による以下の記事です。
Conduit Documents - snoyberg/conduit@github
APIは以下にあります。
conduit: Streaming data processing library
1. テスト環境構築
まずはstackプロジェクトを作成します。
stack new test
cd test
動作を確認します。
stack build && stack exec test-exe
以下のような出力が得られます。
someFunc
それではソースコードを修正してConduitの最初のサンプルコードを試してみます。
stackプロジェクトの、最初のソースを確認しておきます。
メインは以下の通りです。
module Main where
import qualified Synopsis as S0
import qualified List1 as S1
import qualified List2 as S2
import qualified List3 as S3
main :: IO ()
main = S0.runTest
メインから以下の関数が呼ばれます。(以下必要に応じてS2.runTestやS3.runTestを呼ぶように書き換えます。)
module Synopsis
( runTest
) where
import Conduit
runTest :: IO ()
runTest = do
-- Pure operations: summing numbers.
print $ runConduitPure $ yieldMany [1..10] .| sumC
-- Exception safe file access: copy a file.
writeFile "input.txt" "This is a test." -- create the source file
runConduitRes $ sourceFileBS "input.txt" .| sinkFile "output.txt" -- actual copying
readFile "output.txt" >>= putStrLn -- prove that it worked
-- Perform transformations.
print $ runConduitPure $ yieldMany [1..10] .| mapC (+ 1) .| sinkList
必要なライブラリを設定します。
dependencies:
- base >= 4.7 && < 5
- conduit
再コンパイルして実行します。
stack build && stack exec test-exe
以下のような結果が得られました。これでConduitのテスト環境が得られました。
stack exec test-exe
55
This is a test.
[2,3,4,5,6,7,8,9,10,11]
2. Conduitの基本
Conduitはザックリ言えば、以下のようなものです。
- Conduit は日本語で「導管」の意味です
- Conduit はデータの流れ(streams) を扱います。
- pipeline の各componentは、上流(upstream)からのデータを消費(consume)し、データを生産(produce)し、下流(downstream)へ送ります。
2-1. conduit pipeline
pipelineの例を見てみましょう。
runConduit $ yieldMany [1..10] .| mapC show .| mapM_C print
- このpipelineは3つのcomponentから構成される:yieldMany [1..10], mapC show, mapM_C print
- これらのcomponentは .| オペレーターで結合され、ひとつのpipelineとなっている。
- mapC show の観点からみれば、yieldMany [1..10] は upstream であり、mapM_C print は downstream です。
以下は2つのcomponentが結合され、より大きな1つのcomponentを生み出しています。このcomponentに注目しましょう。
yieldMany [1..10] .| mapC show
- yieldMany は upstream からは、何も consume しません。Int stream を produce してます。
- mapC show は Int stream を consume し、String stream を produce しています。
- 結果的に、結合された1つのcomponentは、 upstream からは、何も consume せず、String stream を produce しています。
yieldMany [1..10] :: ConduitT () Int IO ()
mapC show :: ConduitT Int String IO ()
ConduitT は Conduit の コアdatatypeです。それはmonad transformerであり IO とかの base monad を含んでいます。
ConduitTの4つのパラメータ:
- 第1引数 : upstream value, または input を意味する
- yieldMany の場合は () :upstream からは、何も consume しないことを意味する
- mapC の場合は Int : Int stream を consume することを意味する
- 第2引数 : downstream value, または output を意味する
- yieldMany の場合は Int : mapC の第1引数にマッチ
- mapC の場合は String : mapCのoutputはStringであることを意味する
- 第3引数 : Monad transformer ConduitT のbase monad. この場合、IO モナド。
- 第4引数 : componentのresult の type。
*補足:Monad transformerについては以下の過去記事を参照してください。
【Control.Monad.Trans】(1) Identityモナド - Qiita
視点を変えます。 .| オペレーターに視点を移します:
(.|) :: Monad m
=> ConduitT a b m () -- A component
-> ConduitT b c m r -- B component
-> ConduitT a c m r -- C component
- Aのoutput typeとBのinput typeは等しい。
- Aのresultは無視して、Bのresultを保持する。
- AとBを結合したCについて。AとCのinputのtypeは等しい。BとCのoutputのtypeは等しい。
- A,B,Cは同じbase monadで走る。
結果として、.| オペレーターで結合されたcomponentの型は以下のようになります。
yieldMany [1..10] .| mapC show :: ConduitT () String IO ()
また、最初のpipelineにもどって、3番目のcomponentに注目すると以下の型が得られます。
mapM_C :: Monad m => (a -> m ()) -> ConduitT a o m ()
print :: Show a => a -> IO ()
mapM_C print :: Show a => ConduitT a o IO ()
最終的にこのpipelineの型は以下のようになります。
yieldMany [1..10] .| mapC show .| mapM_C print :: ConduitT () o IO ()
結論です。runConduitの説明は次に行いますが、このpipelineをrunConduitで走らせると、以下のようにIO()モナドの中で走ることになります。
runConduit $ yieldMany [1..10] .| mapC show .| mapM_C print :: IO ()
runConduitで、pipelineの最後のcomponentのbase monadのresultが返ります。この場合IOモナドでresult ()が返ることになります。
2-2. runConduit で monad を取り出す
再度 .| オペレーターを見てみましょう。
(.|) :: Monad m -- infixr 2 右結合
=> ConduitT a b m () -- A component
-> ConduitT b c m r -- B component
-> ConduitT a c m r -- C component
結合前の最後のcomponent の base monad (m r)が、そのまま結合された Conduit のbase monadになっています。これはpipelineを流れてきたデータを最後のcomponentで集計して、result値としてmonad (m r)を作り出すイメージです。 具体的には runConduit でpiplelineを走らせ、result値としての monad を取り出します。
runConduit のsignatureは以下の通りです。
runConduit :: Monad m => ConduitT () Void m r -> m r
runConduit は、入力としてスタンドアロンcomponentをとり、結果としてmonad (m r)を取り出します。スタンドアロンcomponentとは、upstreamからは何も消費せず、dowunstream へ何も生産しないものを意味しています。
このsignatureでは、upstreamからは何も消費しないことは () で示されています。他方、downstream へ何も流さないことは Void で示されています。()とVoidの使い分けは、とりあえずはこのまま覚えておくとして、詳細を知りたい方は以下のリンクを読むことを勧められています。
To Void or to void. - FPComplete
runConduitPure のsignatureは以下の通りです。ただ単に base monad を Identity としただけです。
runConduitPure :: ConduitT () Void Identity r -> r
3. スマートでない List としての Conduit
スマートでない List としての Conduitの使い方です。ConduitをListの代替物として利用することが可能です。Listを同じような機能を提供してくれますが、単にListの代替として使うには、ちょっと使いにくいインターフェースです。
3-1. runConduitPure
以下の例では、runConduitPureで pure な conduit pipeline を走らせます。ここでpureとはモナドを含んでいないことを意味しています。この場合、runConduitPureはListを返します。printでそれを出力します。
module List2
( runTest
) where
import Conduit
runTest :: IO ()
runTest = do
putStrLn "List version:"
print $ takeWhile (< 18) $ map (* 2) $ take 10 [1..]
putStrLn ""
putStrLn "Conduit version:"
print $ runConduitPure
$ yieldMany [1..]
.| takeC 10
.| mapC (* 2)
.| takeWhileC (< 18)
.| sinkList
{-
stack build && stack exec test-exe
List version:
[2,4,6,8,10,12,14,16]
Conduit version:
[2,4,6,8,10,12,14,16]
-}
3-2. runConduit
以下の例では、runConduit で conduit pipeline を走らせます。このpipelineは最後にモナドアクション printを含んでいます。mapM_ の Conduitバージョンである mapM_C を利用しています。
module List3
( runTest
) where
import Conduit
runTest :: IO ()
runTest = do
putStrLn "List version:"
mapM_ print $ takeWhile (< 18) $ map (* 2) $ take 10 [1..]
putStrLn ""
putStrLn "Conduit version:"
runConduit
$ yieldMany [1..]
.| takeC 10
.| mapC (* 2)
.| takeWhileC (< 18)
.| mapM_C print
{-
stack build && stack exec test-exe
List version:
2
4
6
8
10
12
14
16
Conduit version:
2
4
6
8
10
12
14
16
-}
4. Signatures - 補足
これまでの登場人物の Signatures をまとめたものです
data ConduitT i o m r
-- Run a pipeline until processing completes.
runConduit :: Monad m => ConduitT () Void m r -> m r
-- Run a pure pipeline until processing completes, i.e. a pipeline with Identity as the base monad.
-- This is equivalient to runIdentity . runConduit.
runConduitPure :: ConduitT () Void Identity r -> r
-- Run a pipeline which acquires resources with ResourceT, and then run the ResourceT transformer.
-- This is equivalent to runResourceT . runConduit.
runConduitRes :: MonadUnliftIO m => ConduitT () Void (ResourceT m) r -> m r
(.|) :: Monad m
=> ConduitM a b m () -- upstream
-> ConduitM b c m r -- downstream
-> ConduitM a c m r
-- Stream the contents of a file as binary data.
sourceFileBS :: MonadResource m => FilePath -> ConduitT i ByteString m ()
-- Yield each of the values contained by the given MonoFoldable.
-- This will work on many data structures, including lists, ByteStrings, and Vectors.
yieldMany :: (Monad m, MonoFoldable mono) => mono -> ConduitT i (Element mono) m ()
-- Consume all values from the stream and return as a list.
-- Note that this will pull all values into memory.
sinkList :: Monad m => ConduitT a o m [a]
-- Stream all incoming data to the given file
sinkFile :: MonadResource m => FilePath -> ConduitT ByteString o m ()
-- Get the sum of all values in the stream.
sumC :: (Monad m, Num a) => ConduitT a o m a
-- Stream up to n number of values downstream.
takeC :: Monad m => Int -> ConduitT a a m ()
-- Apply a transformation to all values in a stream.
mapC :: Monad m => (a -> b) -> ConduitT a b m ()
-- Apply the action to all values in the stream.
mapM_C :: Monad m => (a -> m ()) -> ConduitT a o m ()