4
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Conduit (2) - Monadic composition

Last updated at Posted at 2020-04-18

Conduit (1) - Haskell Streaming library - Qiita
Conduit (2) - Monadic composition - Qiita
Conduit (3) - Resource - Qiita

Haskell のStreaming ライブラリである Conduit についての記事です。

記事のソースは作者による以下の記事です。
Conduit Documents - snoyberg/conduit@github

APIは以下にあります。
conduit: Streaming data processing library

前回、Conduitの2つのcomponentを**.|** オペレーター で結合して、より大きなcomponentを作る方法をみました。しかしcomponentを結合するもう一つの方法がります。モナドの結合を使う方法です。

1. Source での結合

upstreamでモナド結合を使います。

Monadic1.hs
module Monadic1
    ( runTest
    ) where

import Conduit

-- source :: Monad m => ConduitT i Int m ()
source :: ConduitT () Int IO ()  -- より具体的な型
source = do
    yieldMany [1..10]    -- A
    yieldMany [11..20]   -- B

runTest :: IO ()
runTest = runConduit $ source .| mapM_C print

ここでは2つのConditTモナド(yieldMany)を、通常のモナドのdo記法で並べてあります。結合されたsourceの動作は、まず初めに component A が1~10までdownstreamに流し、次に component B が11~20までを流す動作になります。

実際に動作させると以下のようになります。

stack build && stack exec test-exe

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

2. Sink での結合

downstreamでモナド結合を使います。

Monadic2.hs
module Monadic2
    ( runTest
    ) where

import Conduit

sink :: Monad m => ConduitT Int o m (String, Int)
sink = do
    x <- takeC 5 .| mapC show .| foldC     -- C
    y <- sumC                              -- D
    return (x, y)

runTest :: IO ()
runTest = do
    let res = runConduitPure $ yieldMany [1..10] .| sink
    print res

sinkは2つのConduit componentが、モナドのdo記法で結合されています。結合されたcomponentは次のように動作します。
まず上流からは、1~10までの値が流れてきます。最初に component C が、先頭の値から5個消費します。残った5個のstreamの値を、component D が消費します。

実際に動作させると以下のような結果が得られます。

stack build && stack exec test-exe

("12345",40)

ちなみにfoldCとsumCは以下の通りです。

foldCはstreamの全値を結合(モノイド)します。

foldC
-- Monoidally combine all values in the stream.
foldC :: (Monad m, Monoid a) => ConduitT a o m a

sumCはstreamの全値を足しこみます。

sumC
-- Get the sum of all values in the stream.
sumC :: (Monad m, Num a) => ConduitT a o m a

3. transformer で使用

これまでupstream と downstreamでのモナド結合を見てきました。最後に、中間の位置でtransformer としてモナド結合を使う場合を見ます。

Monadic3.hs
module Monadic3
    ( runTest
    ) where

import Conduit

trans :: Monad m => ConduitT Int Int m ()
trans = do
    takeC 5 .| mapC (+ 1)    -- E
    mapC (* 2)               -- F

runTest :: IO ()
runTest = runConduit $ yieldMany [1..10] .| trans .| mapM_C print

上流から1~10の値をstreamに流します。
それをtransで加工して、downstreamに流しprintします。

transでは、E componentでstreamの最初の5個の値をとり1を足し算してdownstreamに流します。
streamには別に5個の値が残りますが、これはF componentで2を掛け算してdownstreamに流します。

実際に動作させて確認してみましょう。

stack build && stack exec test-exe

2
3
4
5
6
12
14
16
18
20

今回は以上です。

4
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?