Conduit (1) - Haskell Streaming library - Qiita
Conduit (2) - Monadic composition - Qiita
Conduit (3) - Resource - Qiita
Haskell のStreaming ライブラリである Conduit についての記事です。
記事のソースは作者による以下の記事です。
Conduit Documents - snoyberg/conduit@github
APIは以下にあります。
conduit: Streaming data processing library
前回まではもっぱらリストを扱う例でしたが、今回はファイルを扱います。
ConduitでResourceを扱うことにより、必要以上のリソースを長時間保持したり、長い処理の途中でエラーにあいリソースの解放に失敗したり、するような事態を避けることができるようになります。
1. ファイルのコピー(冗長版)
以下はファイルのコピーを行うプログラムです。ファイルの中身はByteStringとしてstreamを流れます。
module Resource1
( runTest
) where
import Conduit
import qualified System.IO as IO
import Data.ByteString (ByteString)
sourceFile' :: MonadResource m => FilePath -> ConduitT i ByteString m ()
sourceFile' fp =
bracketP (IO.openBinaryFile fp IO.ReadMode) IO.hClose sourceHandle
sinkFile' :: MonadResource m => FilePath -> ConduitT ByteString o m ()
sinkFile' fp =
bracketP (IO.openBinaryFile fp IO.WriteMode) IO.hClose sinkHandle
runTest :: IO ()
runTest = runResourceT
$ runConduit
$ sourceFile' "input.txt"
.| sinkFile' "output.txt"
runResourceT とbracketP で、処理の途中でエラーが起きようと、正常に終了しようと、resourceが解放されることが保証されます。
bracketP :: MonadResource m
=> IO a -- 最初に実行("resource獲得")
-> (a -> IO ()) -- 最後に実行("resource解放")
-> (a -> ConduitT i o m r) -- 実行されるべき処理本体
-> ConduitT i o m r
2. ファイルのコピー(シンプル版)
上のプログラムは少し煩雑すぎます。これはもっと簡略化して書くことができます。
「runResourceT . runConduit」 は runConduitRes と略記できます。
また sourceFile と sinkFile を使えば bracketP 行の記述を簡素化できます。
つまり以下のように書くことができます。
module Resource2
( runTest
) where
import Conduit
runTest :: IO ()
runTest = runConduitRes $ sourceFile "input.txt" .| sinkFile "output.txt"
3. ディレクトリ探索
以下はカレントディレクトリからディレクトリ探索を行い、Haskellのソースコードファイルを探し出し、all-haskell-filesというファイルにすべて書き出すプログラムです。
module Resource3
( runTest
) where
import Conduit
import System.FilePath (takeExtension)
runTest :: IO ()
runTest = runConduitRes
$ sourceDirectoryDeep True "."
.| filterC (\fp -> takeExtension fp == ".hs")
.| awaitForever sourceFile
.| sinkFile "all-haskell-files"
このプログラムには次のような素晴らしい点があります。
- 一度に2ファイルしかオープンしません:all-haskell-filesと現在読み込み中のファイルです。
- 探索中に必要最小限のdirectory handlesしかオープンしません。
- 途中でどんなエラーが起きようとも、resourceの後処理が行われます。
プログラムの処理の流れは以下のようなものです。
- sourceDirectoryDeep は与えられたディレクトリのコンテンツ(ディレクトリ・ファイル名)をstreamに流します。
- filterC でHaskellのファイルだけを選別して、下流に流します。
- awaitForever では、stream の中に *.hs のファイ名がある間ループして、sourceFileを呼び出します。
- sourceFileはstreamの中にfileコンテンツをbinary dataとして吐き出します。
- sinkFileはstreamに流れてきたコンテンツを全てall-haskell-filesに書き出します。
-- Deeply stream the contents of the given directory.
sourceDirectoryDeep :: MonadResource m
=> Bool -- Follow directory symlinks
-> FilePath --Root directory
-> ConduitT i FilePath m ()
-- Stream the contents of a file as binary data.
sourceFile :: MonadResource m => FilePath -> ConduitT i ByteString m ()
-- Stream all incoming data to the given file.
sinkFile :: MonadResource m => FilePath -> ConduitT ByteString o m ()
-- Wait for a single input value from upstream. If no data is available, returns Nothing.
-- Once await returns Nothing, subsequent calls will also return Nothing.
await :: Monad m => Consumer i m (Maybe i)
-- Wait for input forever, calling the given inner component for each piece of new input.
-- This function is provided as a convenience for the common pattern of awaiting input,
-- checking if it's Just and then looping.
awaitForever :: Monad m => (i -> ConduitT i o m r) -> ConduitT i o m ()
今回は以上です。