Edited at

Conduitの使い方

More than 1 year has passed since last update.

使えると便利、Conduitの大雑把な使い方です。


目次


使い始める前に

使い始める前に、Hackage: conduitから適切な関数を選択する為の最低限の知識として次の事を覚えましょう。


  • 何するものなの?

  • Conduitを構成するベースの型1つ

  • 基本の3つの型

  • 2つのオマケの型


何をするものなのか?

Conduitはストリームデータを処理する為のライブラリです。

ストリームデータ処理の流れを「データの取得」「データの加工」「最終処理」の3段階に分けて記述していきます。

使用するに当たってはモナド変換子の知識が多少必要になります。

liftとかMonadIOをとりあえず使える程度でOKです。


ベースの型

Conduitの中核となるベースの型の定義は下記の様になります。

data ConduitM i o m r

各型パラメータの意味は下記の様になります。


  • i は処理の入力として取る値の型

  • o は次の処理に流す値の型

  • m は基底モナドの型

  • r は処理終了後に返す値の型

ConduitMの部分はどうでもよく、各型パラメータの意味が次に説明する型の理解に必要となります。

なお、ConduitMはMonad, MonadTrans, MonadIO, Monoid等のインスタンスです


基本の3つの型+オマケの2つの型

先ほど出てきたConduitMはconduitライブラリで使用するストリームデータの処理の段階を表した3つの型を実装する為に使用されます。

ではその3つの型の型定義を見てみましょう。

type Source m o = ConduitM () o m ()

type Conduit i m o = ConduitM i o m ()
type Sink i m r = ConduitM i Void m r

ConduitMの型パラメータの意味と合わせますとそれぞれ次のような意味の型となります。


  • Source・Conduit・Sinkは全て任意の基底モナドを取ります。

  • Sourceは基底モナドの他に次の処理へ流す値の型だけを取ります。

    次の処理へデータを流す専用の型です。

  • Conduitは基底モナドの他に処理の入力の型と次の処理へ流すデータの型を取ります。

    入力を取って何らかの値を次へと流す処理を行う型です。

  • Sinkは基底モナドの他に入力の型と処理終了後に返す値の型を取ります。

    入力を取って、最終処理を行う型です。

この3種の型を"入力の型"と"次の処理へ流す型"を合わせて専用の演算子で結合し、Sinkの返す最終処理を受け取るのが基本的な使い方になります。

結合用の演算子には下記の様な物がありますが、詳しくは次項以降で使いながら説明していきます。

($$) :: Monad m => Source m a -> Sink a m b -> m b

($=) :: Monad m => Source m a -> Conduit a m b -> Source m b
(=$) :: Monad m => Conduit a m b -> Sink b m c -> Sink a m c
(=$=) :: Monad m => Conduit a m b -> ConduitM b c m r -> ConduitM a c m r

また上記の3つの型以外に次の2つの型があります。

type Producer m o = forall i. ConduitM i o m ()

type Consumer i m r = forall o. ConduitM i o m r

詳しくはHaddockのコメントを見れば分かりますが、Producer型が「ConduitにもなれるSource」型、Consumer型が「ConduitにもなれるSink」型を表します。


Sourceから全ての値をリストで取得する

import Data.Conduit (($$))

import qualified Data.Conduit.List as CL

main :: IO ()
main = do
xs <- CL.sourceList ['a'..'z'] $$ CL.consume
putStrLn xs

Sourceから全ての値をリストにして取得するにはData.Conduit.List.consumeを使用します。

型は下記の様になっています。

consume :: Monad m => Consumer a m [a]

さっそくConsumerです。

Conduitとして使用する訳でもない限り、Sinkに脳内置換しておきましょう。

また、上記サンプルコードで使用しているSourceを生成するData.Conduit.List.sourceList関数の型は下記の様になっています。

sourceList :: Monad m => [a] -> Producer m a

リストを取って、Producerを返す関数です。

これもConduitとして使用する訳でもない限り、Sourceに脳内置換しておきましょう。

そして上記2つのsourceListとconsumeを結合して戻り値を取得するのに使用する演算子が $$ です。

($$) :: Monad m => Source m a -> Sink a m b -> m b

第一引数にSourceを、第二引数にSinkを取り、基底モナドに戻り値を包んで返します。

Source $$ Sink

簡単…ですよね?


複数のSourceの全ての値をリストで取得する

conduitを使っていたらSourceを返す複数の関数の戻り値を全て処理したいシーンがあるかもしれません。

そんな時はSourceの型が同じであればSourceを全て結合して処理をすることも可能です。

import Data.Conduit (Source, ($$))

import qualified Data.Conduit.List as CL
import Data.Monoid ((<>))

srcA :: Monad m => Source m Int
srcA = CL.sourceList [1..10]

srcB :: Monad m => Source m Int
srcB = CL.sourceList [11..20]

main :: IO ()
main = do
xs <- (srcA <> srcB) $$ CL.consume -- ()は見やすさの為に付けています。
print xs

Sourceの基になっているConduitMがConduitM i m o ()としてMonoidのインスタンスになっている為、SourceとConduit、そして一部のSinkがMonoidとして結合可能です。

MonoidとしてA <> Bの様に結合されると、Aから順(左から順)に使用されていきます。

Aの処理が終了すると、次にBを使用して処理を継続していきます。

その結果、上記のコードは1から20までのリストを標準出力に出力することになります。


自作Sourceを作成する

Hackage: conduitに用意されているSourceを生成する関数では作れないSourceが欲しくなる場面も多々出てくると思います。

そのような場面に備えて自作のSourceを作れるようにします。

最低限必要なのはお馴染みのMonadとData.Conduit.yield関数です。

import Control.Monad.IO.Class (MonadIO, liftIO)

import Data.Conduit (Source, ($$), yield)
import qualified Data.Conduit.List as CL

mySrc :: (Monad m, MonadIO m) => Source m String
mySrc = do
x <- liftIO $ getLine
if x == "END"
then return ()
else yield x >> mySrc

main :: IO ()
main = mySrc $$ CL.mapM_ putStrLn

上記のコードはSourceが標準入力から読み込んだ1行を次へと流し、それをSinkであるCL.mapM_ putStrLnが標準出力へと出力するコードです。

mySrc関数が自作のSourceです。

Sourceの基であるConduitMがMonadのインスタンスなので、モナドとして書くことが出来ます。

Sourceの他にConduit・Sinkも同様にモナドとして書くことが出来ます。

何故実現出来るのかは省きますが、このモナドとして書いているSource(又はConduit)の文脈でyieldを使う事で次の処理へと値を流すことが出来ます(※returnではありません)。

これは値を溜め込んでSourceの実行が終わってから次へと流すのではなく、yieldが呼ばれた時点で次の処理へと値が流れます。

このyieldがあることにより、mySrcの様に再帰等で繰り返しを書くことが出来ます。

yieldの型は下記の様になります。

yield :: Monad m => o -> ConduitM i o m ()


出力可能な値の残っているSourceを再利用する

出力可能な値の残っているSourceを再利用したい時はここまでに出てきた演算子ではなく、次の演算子を使用します。



  • $$+演算子。$$とほぼ同じ働きをするが、再利用可能なSource(ResumableSource)も返す部分が違います。


  • $$++演算子。$$+とほぼ同じ働きをするが、引数にSourceではなくResumableSourceを取ります。


  • $$+-演算子。$$とほぼ同じ働きをするが、引数にSourceではなくResumableSourceを取ります。

各演算子及びResumableSourceの型:

data ResumableSource m o

($$+) :: Monad m => Source m a -> Sink a m b -> m (ResumableSource m a, b)
($$++) :: Monad m => ResumableSource m a -> Sink a m b -> m (ResumableSource m a, b)
($$+-) :: Monad m => ResumableSource m a -> Sink a m b -> m b

サンプルコード:

import Data.Conduit (($$+), ($$++), ($$+-))

import qualified Data.Conduit.List as CL

main :: IO ()
main = do
(resumableSrc0, xs) <- CL.sourceList [1..25] $$+ CL.take 10
display xs

(resumableSrc1, ys) <- resumableSrc0 $$++ CL.take 10
display ys

zs <- resumableSrc1 $$+- CL.take 10
display zs

display :: [Int] -> IO ()
display xs = do
print xs
putStrLn "-----"


Sinkに流れてきた値を全て出力する

Data.Conduit.List.mapM_を使うと簡単です。

import Data.Conduit (($$))

import qualified Data.Conduit.List as CL

main :: IO ()
main = CL.sourceList ['a'..'z'] $$ CL.mapM_ print


自作Sinkを作成する

自作Sourceとほぼ同じ様に作れますが、yieldではなくawaitを使用します。

yieldが次の処理へと値を流す関数だったのに対し、awaitを流れてきた値を受け取る関数です。

awaitの型

await :: Monad m => Consumer i m (Maybe i)

サンプルコード

import Control.Monad.IO.Class (MonadIO, liftIO)

import Data.Conduit (Sink, ($$), await)
import qualified Data.Conduit.List as CL

-- | 入力を標準出力へ出力し、受け取った値の総数を返します。3の倍数でアホになります。
mySink :: (Monad m, MonadIO m) => Sink Int m Int
mySink = go 0
where
go l = do
n <- await
case n of
Nothing -> return l
Just n' -> do
if n' `mod` 3 == 0
then liftIO $ putStrLn "(^q^)"
else liftIO $ print n'
go $ l + 1

main :: IO ()
main = (CL.sourceList [1..30] $$ mySink) >>= print


Conduitで流れているデータを加工する

Conduitを使うと、SourceからSinkへ流れるデータを途中で加工することが出来ます。

import Data.Char (toUpper, toLower)

import Data.Conduit (($$), ($=), (=$), (=$=))
import qualified Data.Conduit.List as CL

main :: IO ()
main = do
CL.sourceList ['a'..'z'] $= CL.map toUpper $$ CL.mapM_ print
CL.sourceList ['A'..'Z'] $$ CL.map toLower =$ CL.mapM_ print
CL.sourceList ['a'..'z'] $= CL.map toUpper =$= CL.map fromEnum $$ CL.mapM_ print

ConduitをSourceやSinkと繋げる方法は2種類あり、ConduitをSourceと連結させるかSinkと連結させるかで使用する演算子が変わります。

SourceとConduitの場合は$=演算子を使用し、結合した結果の型はSourceになります。


ConduitとSinkの場合は=$演算子を使用し、結合した結果の型はSinkになります。


Conduit同士を結合することも可能で、使用する演算子は=$=になります。

各演算子の型は下記の様になります。

($=) :: Monad m => Source m a -> Conduit a m b -> Source m b

(=$) :: Monad m => Conduit a m b -> Sink b m c -> Sink a m c
(=$=) :: Monad m => Conduit a m b -> ConduitM b c m r -> ConduitM a c m r


自作Conduitの作成

Conduitの自作はSourceとSinkの両方を合わせたような感じで行います。

つまり、データの取得はawait、データの送出はyieldで行います。

import Control.Monad (when)

import Data.Conduit (Conduit, ($$), ($=), await, yield)
import qualified Data.Conduit.List as CL

-- | 入力を3つ毎に区切り、そのタプルを送出します。
-- 取得できた入力が3つ未満の場合は切り捨てます。
tuple3 :: Monad m => Conduit a m (a, a, a)
tuple3 = do
xs <- CL.take 3
case xs of
[a,b,c] -> yield (a,b,c) >> tuple3
_ -> return ()

-- | 入力の値が昇順になっている間だけ値を送出します。
ascend :: (Monad m, Ord i) => Conduit i m i
ascend = await >>= maybe (return ()) go
where
go i = do
yield i
await >>= maybe (return ()) (\j -> when (i <= j) $ go j)

main :: IO ()
main = CL.sourceList (['a'..'s'] ++ ['a'..'d']) $= ascend $= tuple3 $$ CL.mapM_ print

-- 出力結果:
-- ('a','b','c')
-- ('d','e','f')
-- ('g','h','i')
-- ('j','k','l')
-- ('m','n','o')
-- ('p','q','r')

tuple3のように、Consumerを利用して自作のConduitを構築することも出来ます(同様の事はSourceとSinkでも出来ます)。