Conduit (1) - Haskell Streaming library - Qiita
Conduit (2) - Monadic composition - Qiita
Conduit (3) - Resource - Qiita
Haskell のStreaming ライブラリである Conduit についての記事です。
記事のソースは作者による以下の記事です。
Conduit Documents - snoyberg/conduit@github
APIは以下にあります。
conduit: Streaming data processing library
前回、Conduitの2つのcomponentを**.|** オペレーター で結合して、より大きなcomponentを作る方法をみました。しかしcomponentを結合するもう一つの方法がります。モナドの結合を使う方法です。
1. Source での結合
upstreamでモナド結合を使います。
module Monadic1
( runTest
) where
import Conduit
-- source :: Monad m => ConduitT i Int m ()
source :: ConduitT () Int IO () -- より具体的な型
source = do
yieldMany [1..10] -- A
yieldMany [11..20] -- B
runTest :: IO ()
runTest = runConduit $ source .| mapM_C print
ここでは2つのConditTモナド(yieldMany)を、通常のモナドのdo記法で並べてあります。結合されたsourceの動作は、まず初めに component A が1~10までdownstreamに流し、次に component B が11~20までを流す動作になります。
実際に動作させると以下のようになります。
stack build && stack exec test-exe
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
2. Sink での結合
downstreamでモナド結合を使います。
module Monadic2
( runTest
) where
import Conduit
sink :: Monad m => ConduitT Int o m (String, Int)
sink = do
x <- takeC 5 .| mapC show .| foldC -- C
y <- sumC -- D
return (x, y)
runTest :: IO ()
runTest = do
let res = runConduitPure $ yieldMany [1..10] .| sink
print res
sinkは2つのConduit componentが、モナドのdo記法で結合されています。結合されたcomponentは次のように動作します。
まず上流からは、1~10までの値が流れてきます。最初に component C が、先頭の値から5個消費します。残った5個のstreamの値を、component D が消費します。
実際に動作させると以下のような結果が得られます。
stack build && stack exec test-exe
("12345",40)
ちなみにfoldCとsumCは以下の通りです。
foldCはstreamの全値を結合(モノイド)します。
-- Monoidally combine all values in the stream.
foldC :: (Monad m, Monoid a) => ConduitT a o m a
sumCはstreamの全値を足しこみます。
-- Get the sum of all values in the stream.
sumC :: (Monad m, Num a) => ConduitT a o m a
3. transformer で使用
これまでupstream と downstreamでのモナド結合を見てきました。最後に、中間の位置でtransformer としてモナド結合を使う場合を見ます。
module Monadic3
( runTest
) where
import Conduit
trans :: Monad m => ConduitT Int Int m ()
trans = do
takeC 5 .| mapC (+ 1) -- E
mapC (* 2) -- F
runTest :: IO ()
runTest = runConduit $ yieldMany [1..10] .| trans .| mapM_C print
上流から1~10の値をstreamに流します。
それをtransで加工して、downstreamに流しprintします。
transでは、E componentでstreamの最初の5個の値をとり1を足し算してdownstreamに流します。
streamには別に5個の値が残りますが、これはF componentで2を掛け算してdownstreamに流します。
実際に動作させて確認してみましょう。
stack build && stack exec test-exe
2
3
4
5
6
12
14
16
18
20
今回は以上です。