2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Conduitで遊んでみた

Posted at
{-# LANGUAGE RankNTypes, BangPatterns, ViewPatterns #-}
module Algorithm.Fina.Helper (zipC, teeC, tapC, dupC, sma, ema, fir) where

import Data.Maybe (isJust, fromJust)
import Control.Applicative
import qualified Data.Text as T
import qualified Data.Conduit as C
import qualified Data.Conduit.List as C


{-| 遅延
  NOTE: 先頭にダミーを挟み込んでるだけなので、長さがその分伸びてしまう

>>> C.sourceList [1..10] C.=$= tapC 2 0 C.$$ C.consume
[0,0,1,2,3,4,5,6,7,8,9,10]

-}
tapC :: Monad m =>
    Int     -- 遅延サンプル数
    -> o    -- 遅延期間中のダミー出力値
    -> C.Pipe o o m ()
tapC n o | n < 0  = error "tapC: The n should ge. zero."
             | otherwise = C.haveMore (C.map id) (return ()) (replicate n o)



{-| 0次補完でオーバーサンプリング

>>> C.sourceList [1..9] C.=$= dupC 3 C.$$ C.consume
[1,1,1,2,2,2,3,3,3,4,4,4,5,5,5,6,6,6,7,7,7,8,8,8,9,9,9]

-}
dupC :: Monad m => 
    Int     -- 倍数
    -> C.Pipe a a m ()
dupC n | n <= 0 = error "dupC: The n should greater than zero."
       | otherwise = let dc = C.NeedInput (\x -> C.haveMore dc (return ()) (replicate n x)) (return ()) in dc


{-| 単純移動平均 -}
sma :: (Monad m, Real a, Fractional b) =>
    Int                 -- 平均回数
    -> C.Conduit a m b
sma n = C.conduitState
      (replicate (n - 1) 0)
      (\xs (realToFrac -> y) ->
        return $ let !z = (sum xs + y) / fromIntegral n
                  in C.StateProducing (y:init xs) [z]
      )
      (\_ -> return [])

{-| 指数平滑移動平均 -}
ema :: (Monad m, Real a, Fractional b) =>
    Int    -- 平均回数
    -> C.Conduit a m b
ema n = C.conduitState
          0
          (\y (realToFrac -> x) -> return $ let z = y + alpha * (x - y) in C.StateProducing z [z])
          (\_ -> return [])
  where alpha = 2 / fromIntegral (n + 1)


type Queue a = ([a], [a])
emptyQ = ([], [])
isEmptyQ ([], []) = True
isEmptyQ _ = False
singletonQ x = ([x], [])
enQ y ([], ys) = (reverse (y:ys), [])
enQ y (xs, ys) = (xs, y:ys)
deQ ([], _) = undefined
deQ (_:[], ys) = (reverse ys, [])
deQ (_:xs, ys) = (xs, ys)
headQ ([], _) = undefined
headQ (x:_, _) = x

{-| 入力を2つのPipeに食わせて、出力をfで束ねて出力するpipe

>>> C.sourceList [0..10] C.=$= (zipC (,) (C.map ((+ 2))) (C.map negate)) C.$$ C.consume
[(2,0),(3,-1),(4,-2),(5,-3),(6,-4),(7,-5),(8,-6),(9,-7),(10,-8),(11,-9),(12,-10)]

>>> C.sourceList [0..10] C.=$= (zipC (,) (tapC 2 (-1)) (C.map id)) C.$$ C.consume
[(-1,0),(-1,1),(0,2),(1,3),(2,4),(3,5),(4,6),(5,7),(6,8),(7,9),(8,10)]

>>> C.sourceList [0..10] C.=$= (zipC (,) (dupC 2) (C.map id)) C.$$ C.consume
[(-1,0),(-1,1),(0,2),(1,3),(2,4),(3,5),(4,6),(5,7),(6,8),(7,9),(8,10)]

--タプルじゃなくてリストにまで拡張したい...そしてconcurrentへ
joinC :: Monad m => [C.Pipe i o m ()] -> CPipe i [Maybe o] m ()
parC :: Monad m => [C.Pipe i o m ()] -> CPile [i] [o] m ()

splitC :: C.Pipe i a m () -> (C.Source m a, C.Source m a)
  let (s1,s2) = splitC (c1 =$= c2 =$= c3)
  s1 =$= c4 $$ mysink1
  s2 =$= c5 $$ mysink2
って方が使いやすいかったかな?

-}
zipC :: forall a b c i m. (Monad m) => (a -> b -> c) -> C.Pipe i a m () -> C.Pipe i b m () -> C.Pipe i c m ()
zipC f p1 p2 = zc f emptyQ p1 p2
  where
    -- NOTE: 引数をひっくり返しながら動くので、型シグネチャを書いて多相にしないと駄目
    zc :: Monad m => (a -> b -> c) -> ([i], [i]) -> C.Pipe i a m () -> C.Pipe i b m () -> C.Pipe i c m ()
    -- 出力完成
    zc f b (C.HaveOutput na ca oa) (C.HaveOutput nb cb ob) = C.HaveOutput (zc f b na nb) (ca >> cb) (f oa ob)
    --
    zc f b (C.PipeM mp1 ca) (C.PipeM mp2 cb) = C.PipeM (do { p1 <- mp1; p2 <- mp2; return $ zc f b p1 p2 }) (ca >> cb)
    zc f b (C.PipeM mp clean) p2 = C.PipeM (do { p <- mp; return $ zc f b p p2 }) clean
    zc f b p1 (C.PipeM mp clean) = C.PipeM (do { p <- mp; return $ zc f b p1 p }) clean
    --
    zc f b (C.NeedInput g p1) p2
      | isEmptyQ b  = case p2 of
                        C.NeedInput h p2' -> C.NeedInput (\x -> zc f b (g x) (h x)) (zc f b p1 p2')  -- 需要が合致しているので産地直送するように最適化
                        _ -> C.NeedInput (\x -> zc (flip f) (singletonQ x) p2 (g x)) (zc f b p1 p2) -- NOTE: flipの畳み込みはいつ起こる?
      | otherwise   = zc f (deQ b) (g $ headQ b) p2
    zc f b p1 (C.NeedInput h p2) = C.NeedInput (\x -> zc f (enQ x b) p1 (h x)) (zc f b p1 p2)
    -- FIXME: 片方が閉じたとき、もう片方はどうする?
    zc _ _ (C.Done _ _) _ = C.Done Nothing ()
    zc _ _ _ (C.Done _ _) = C.Done Nothing ()


(&&&) :: forall a b i m. (Monad m) => C.Pipe i a m () -> C.Pipe i b m () -> C.Pipe i (a, b) m ()
(&&&) = zipC (,)


{-| tee

>>> C.sourceList [0..5] `teeC` (C.map ((+) 1), C.map negate) C.$$ C.consume
[(1,0),(2,-1),(3,-2),(4,-3),(5,-4),(6,-5)]

 -}
teeC :: Monad m => C.Pipe i a m () -> (C.Pipe a b m (), C.Pipe a c m ()) -> C.Pipe i (b,c) m ()
teeC p1 (p2, p3) = p1 C.=$= (p2 &&& p3)

{-| 3タップのFIRフィルタ
  状態付きがあればtapだのzipだのは要らなかったのだった

>>> C.sourceList [60,60,60,80,40,60,60] C.=$= fir 0.25 0.5 0.25 C.$$ C.consume
[15.0,45.0,60.0,65.0,65.0,55.0,55.0]
 -}
fir :: forall a m. (Num a, Monad m) => a -> a -> a -> C.Conduit a m a
fir a1 a2 a3=
  C.conduitState
    (0, 0)
    (\(z1, z2) x -> return $ C.StateProducing (x, z1) [x * a1 + z1 * a2 + z2 * a3])
    (\_ -> return [])

{-|
 -}
tsv :: Monad m => C.Conduit T.Text m [T.Text]
tsv = C.map (T.split ((==) '\t'))

{-| [a]な入力の先頭n個とそれ以外を受け取るConduitに分割する
    Excelで見る事前提なCSVなんかだと、nカラムおきに複数チャンネルが重畳されてたりするから、
    その分割用。
    NOTE: これだと、片方のチャンネルがDoneになった時点で両方とも止まってしまうのでは?
 -}
multiChannelTSV :: Monad m => Int -> C.Conduit [c] m a -> C.Conduit [c] m b -> C.Conduit [c] m (a,b)
multiChannelTSV n pa pb = (C.map (take n) C.=$= pa) &&& (C.map (drop n) C.=$= pb)


{-| Eitherを使って分岐するPipe
    NOTE: 入力を与えていないのに出力が出てくるようなPipeの場合、両方とも出力を
          持つケースがあるが、その場合は左側から優先して取り出す。
 -}
splitEitherC :: forall a b c d m. Monad m => C.Conduit a m b -> C.Conduit c m d -> C.Conduit (Either a c) m (Either b d)
splitEitherC (C.HaveOutput na ca oa) p2 = C.HaveOutput (splitEitherC na p2) ca (Left oa)
splitEitherC p1 (C.HaveOutput nb cb ob) = C.HaveOutput (splitEitherC p1 nb) cb (Right ob)
splitEitherC (C.PipeM ma ca) p2 = C.PipeM (do { p1 <- ma; return $ splitEitherC p1 p2 }) ca
splitEitherC p1 (C.PipeM mb cb) = C.PipeM (do { p2 <- mb; return $ splitEitherC p1 p2 }) cb
splitEitherC (C.Done _ _) _ = C.Done Nothing ()
splitEitherC _ (C.Done _ _) = C.Done Nothing ()
splitEitherC p1@(C.NeedInput f ca) p2@(C.NeedInput g cb)
  = C.NeedInput (\x -> case x of { Left xx -> splitEitherC (f xx) p2; Right xx -> splitEitherC p1 (g xx) }) (splitEitherC ca cb)


eitherJoinC :: forall a m. Monad m => C.Conduit (Either a a) m a
eitherJoinC = C.map (either id id)

{-|

>>> C.sourceList [Left 1,Left 2,Right 3, Left 4, Right 5, Right 6] C.=$= (C.map ((+) 2) ||| C.map ((*) 2)) C.$$ C.consume
[3,4,6,6,10,12]
-}
(|||) :: forall a b c m. Monad m => C.Conduit a m c -> C.Conduit b m c -> C.Conduit (Either a b) m c
(|||) x y = splitEitherC x y C.=$= eitherJoinC


{-|
 -}
maybeC :: forall a b m. Monad m => C.Conduit a m b -> C.Conduit (Maybe a) m (Maybe b)
maybeC (C.HaveOutput na ca oa) = C.HaveOutput (maybeC na) ca (Just oa)
maybeC (C.PipeM ma ca) = C.PipeM (do { p <- ma; return $ maybeC p }) ca
maybeC (C.Done _ _) = C.Done Nothing ()
maybeC p@(C.NeedInput f ca) = C.NeedInput (maybe (C.HaveOutput (maybeC p) (return ()) Nothing) $ maybeC . f) (maybeC ca)

fromJustC :: forall a m. Monad m => C.Conduit (Maybe a) m a
fromJustC = C.filter isJust C.=$= C.map fromJust

(?=$=) :: forall a b c d m. Monad m => C.Pipe b (Maybe a) m () -> C.Pipe a c m d -> C.Pipe b c m d
(?=$=) b c = b C.=$= fromJustC C.=$= c

applyC :: forall a b m. Monad m => C.Conduit (a -> b, a) m b
applyC = C.map (\(f, x) -> f x) -- もうArrowでやれというか...
2
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?