7
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Conduit (1) - Haskell Streaming library

Last updated at Posted at 2020-04-11

Conduit (1) - Haskell Streaming library - Qiita
Conduit (2) - Monadic composition - Qiita
Conduit (3) - Resource - Qiita

Haskell のStreaming ライブラリである Conduit についての記事です。

記事のソースは作者による以下の記事です。
Conduit Documents - snoyberg/conduit@github

APIは以下にあります。
conduit: Streaming data processing library

1. テスト環境構築

まずはstackプロジェクトを作成します。

stack new test
cd test

動作を確認します。

stack build && stack exec test-exe

以下のような出力が得られます。

someFunc

それではソースコードを修正してConduitの最初のサンプルコードを試してみます。
stackプロジェクトの、最初のソースを確認しておきます。

メインは以下の通りです。

app/Main.hs
module Main where

import qualified Synopsis       as S0
import qualified List1          as S1
import qualified List2          as S2
import qualified List3          as S3

main :: IO ()
main = S0.runTest

メインから以下の関数が呼ばれます。(以下必要に応じてS2.runTestやS3.runTestを呼ぶように書き換えます。)

Synopsis.hs
module Synopsis
    ( runTest
    ) where

import Conduit

runTest :: IO ()
runTest = do
    -- Pure operations: summing numbers.
    print $ runConduitPure $ yieldMany [1..10] .| sumC

    -- Exception safe file access: copy a file.
    writeFile "input.txt" "This is a test." -- create the source file
    runConduitRes $ sourceFileBS "input.txt" .| sinkFile "output.txt" -- actual copying
    readFile "output.txt" >>= putStrLn -- prove that it worked

    -- Perform transformations.
    print $ runConduitPure $ yieldMany [1..10] .| mapC (+ 1) .| sinkList

必要なライブラリを設定します。

package.yaml
dependencies:
- base >= 4.7 && < 5
- conduit

再コンパイルして実行します。

stack build && stack exec test-exe

以下のような結果が得られました。これでConduitのテスト環境が得られました。

stack exec test-exe

55
This is a test.
[2,3,4,5,6,7,8,9,10,11]

2. Conduitの基本

Conduitはザックリ言えば、以下のようなものです。

  • Conduit は日本語で「導管」の意味です
  • Conduit はデータの流れ(streams) を扱います。
  • pipeline の各componentは、上流(upstream)からのデータを消費(consume)し、データを生産(produce)し、下流(downstream)へ送ります。

2-1. conduit pipeline

pipelineの例を見てみましょう。

pipeline-example
runConduit $ yieldMany [1..10] .| mapC show .| mapM_C print
  • このpipelineは3つのcomponentから構成される:yieldMany [1..10], mapC show, mapM_C print
  • これらのcomponentは .| オペレーターで結合され、ひとつのpipelineとなっている。
  • mapC show の観点からみれば、yieldMany [1..10]upstream であり、mapM_C printdownstream です。

以下は2つのcomponentが結合され、より大きな1つのcomponentを生み出しています。このcomponentに注目しましょう。

component1
yieldMany [1..10] .| mapC show
  • yieldManyupstream からは、何も consume しません。Int streamproduce してます。
  • mapC showInt streamconsume し、String streamproduce しています。
  • 結果的に、結合された1つのcomponentは、 upstream からは、何も consume せず、String streamproduce しています。
signatures
yieldMany [1..10] :: ConduitT ()  Int    IO ()
mapC show         :: ConduitT Int String IO ()

ConduitT は Conduit の コアdatatypeです。それはmonad transformerであり IO とかの base monad を含んでいます。

ConduitTの4つのパラメータ:

  • 第1引数 : upstream value, または input を意味する
    • yieldMany の場合は () :upstream からは、何も consume しないことを意味する
    • mapC の場合は Int : Int streamconsume することを意味する
  • 第2引数 : downstream value, または output を意味する
    • yieldMany の場合は Int : mapC の第1引数にマッチ
    • mapC の場合は String : mapCのoutputはStringであることを意味する
  • 第3引数 : Monad transformer ConduitT のbase monad. この場合、IO モナド。
  • 第4引数 : componentのresult の type。

*補足:Monad transformerについては以下の過去記事を参照してください。
【Control.Monad.Trans】(1) Identityモナド - Qiita

視点を変えます。 .| オペレーターに視点を移します:

(.|)
(.|) :: Monad m
     => ConduitT a b m ()  -- A component
     -> ConduitT b c m r   -- B component
     -> ConduitT a c m r   -- C component
  • Aのoutput typeとBのinput typeは等しい。
  • Aのresultは無視して、Bのresultを保持する。
  • AとBを結合したCについて。AとCのinputのtypeは等しい。BとCのoutputのtypeは等しい。
  • A,B,Cは同じbase monadで走る。

結果として、.| オペレーターで結合されたcomponentの型は以下のようになります。

component1
yieldMany [1..10] .| mapC show   :: ConduitT () String IO ()

また、最初のpipelineにもどって、3番目のcomponentに注目すると以下の型が得られます。

mapM_C
mapM_C :: Monad m => (a -> m ()) -> ConduitT a o m ()
print :: Show a => a -> IO ()
mapM_C print :: Show a => ConduitT a o IO ()

最終的にこのpipelineの型は以下のようになります。

pipeline-signatures
yieldMany [1..10] .| mapC show .| mapM_C print  :: ConduitT ()  o IO ()

結論です。runConduitの説明は次に行いますが、このpipelineをrunConduitで走らせると、以下のようにIO()モナドの中で走ることになります。

pipeline-example
runConduit $ yieldMany [1..10] .| mapC show .| mapM_C print :: IO ()

runConduitで、pipelineの最後のcomponentのbase monadのresultが返ります。この場合IOモナドでresult ()が返ることになります。

2-2. runConduit で monad を取り出す

再度 .| オペレーターを見てみましょう。

(.|)
(.|) :: Monad m            -- infixr 2 右結合
     => ConduitT a b m ()  -- A component
     -> ConduitT b c m r   -- B component
     -> ConduitT a c m r   -- C component

結合前の最後のcomponent の base monad (m r)が、そのまま結合された Conduit のbase monadになっています。これはpipelineを流れてきたデータを最後のcomponentで集計して、result値としてmonad (m r)を作り出すイメージです。 具体的には runConduit でpiplelineを走らせ、result値としての monad を取り出します。

runConduit のsignatureは以下の通りです。

runConduit
runConduit :: Monad m => ConduitT () Void m r -> m r

runConduit は、入力としてスタンドアロンcomponentをとり、結果としてmonad (m r)を取り出します。スタンドアロンcomponentとは、upstreamからは何も消費せず、dowunstream へ何も生産しないものを意味しています。
このsignatureでは、upstreamからは何も消費しないことは () で示されています。他方、downstream へ何も流さないことは Void で示されています。()とVoidの使い分けは、とりあえずはこのまま覚えておくとして、詳細を知りたい方は以下のリンクを読むことを勧められています。
To Void or to void. - FPComplete

runConduitPure のsignatureは以下の通りです。ただ単に base monad を Identity としただけです。

runConduitPure
runConduitPure :: ConduitT () Void Identity r -> r

3. スマートでない List としての Conduit

スマートでない List としての Conduitの使い方です。ConduitをListの代替物として利用することが可能です。Listを同じような機能を提供してくれますが、単にListの代替として使うには、ちょっと使いにくいインターフェースです。

3-1. runConduitPure

以下の例では、runConduitPurepureconduit pipeline を走らせます。ここでpureとはモナドを含んでいないことを意味しています。この場合、runConduitPureはListを返します。printでそれを出力します。

List2.hs
module List2
    ( runTest
    ) where

import Conduit

runTest :: IO ()
runTest = do
    putStrLn "List version:"
    print $ takeWhile (< 18) $ map (* 2) $ take 10 [1..]
    putStrLn ""
    putStrLn "Conduit version:"
    print $ runConduitPure
          $ yieldMany [1..]
         .| takeC 10
         .| mapC (* 2)
         .| takeWhileC (< 18)
         .| sinkList

{-
stack build && stack exec test-exe

List version:
[2,4,6,8,10,12,14,16]

Conduit version:
[2,4,6,8,10,12,14,16]
-}

3-2. runConduit

以下の例では、runConduitconduit pipeline を走らせます。このpipelineは最後にモナドアクション printを含んでいます。mapM_ の Conduitバージョンである mapM_C を利用しています。

List3.hs
module List3
    ( runTest
    ) where

import Conduit

runTest :: IO ()
runTest = do
    putStrLn "List version:"
    mapM_ print $ takeWhile (< 18) $ map (* 2) $ take 10 [1..]
    putStrLn ""
    putStrLn "Conduit version:"
    runConduit
          $ yieldMany [1..]
         .| takeC 10
         .| mapC (* 2)
         .| takeWhileC (< 18)
         .| mapM_C print

{-
stack build && stack exec test-exe

List version:
2
4
6
8
10
12
14
16

Conduit version:
2
4
6
8
10
12
14
16
-}

4. Signatures - 補足

これまでの登場人物の Signatures をまとめたものです

Signatures
data ConduitT i o m r

-- Run a pipeline until processing completes.
runConduit :: Monad m => ConduitT () Void m r -> m r

-- Run a pure pipeline until processing completes, i.e. a pipeline with Identity as the base monad. 
-- This is equivalient to runIdentity . runConduit.
runConduitPure :: ConduitT () Void Identity r -> r

-- Run a pipeline which acquires resources with ResourceT, and then run the ResourceT transformer. 
-- This is equivalent to runResourceT . runConduit.
runConduitRes :: MonadUnliftIO m => ConduitT () Void (ResourceT m) r -> m r


(.|) :: Monad m	 
     => ConduitM a b m ()	-- upstream
     -> ConduitM b c m r	-- downstream
     -> ConduitM a c m r	 


-- Stream the contents of a file as binary data.
sourceFileBS :: MonadResource m => FilePath -> ConduitT i ByteString m ()

-- Yield each of the values contained by the given MonoFoldable.
-- This will work on many data structures, including lists, ByteStrings, and Vectors.
yieldMany :: (Monad m, MonoFoldable mono) => mono -> ConduitT i (Element mono) m ()

-- Consume all values from the stream and return as a list. 
-- Note that this will pull all values into memory.
sinkList :: Monad m => ConduitT a o m [a]

-- Stream all incoming data to the given file
sinkFile :: MonadResource m => FilePath -> ConduitT ByteString o m ()

-- Get the sum of all values in the stream.
sumC :: (Monad m, Num a) => ConduitT a o m a

-- Stream up to n number of values downstream.
takeC :: Monad m => Int -> ConduitT a a m ()

-- Apply a transformation to all values in a stream. 
mapC :: Monad m => (a -> b) -> ConduitT a b m ()

-- Apply the action to all values in the stream.
mapM_C :: Monad m => (a -> m ()) -> ConduitT a o m ()
7
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
7
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?