7
7

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

ACCESSAdvent Calendar 2015

Day 19

FRPNowをサーバで使ってみた

Posted at

この記事は ACCESS Advent Calendar 2015 19日目の記事です。
ACCESSの @asukamirai です。

FRP(Functional reactive programming)をサーバのロジックとして使うのはどうなのだろう、ということで試しに感覚を掴むためにやってみました。
使用したFRPのライブラリは、ICFP2015に登場した論文FRPNow。言語はHaskellです。

確認環境など

  • stack 0.1.10.1
    • lts-3.18
  • frpnow-0.18
  • OS: Windows7 64bit

動機

FRPというと、なんとなくGUI的なものを扱うようなイメージが強かったのですが、その一方で、データフローでプログラミングを行うというのは、ストリーミング処理系のライブラリとも共通する部分がありそうだと思っていました。
先月行われた関数型ストリーム処理勉強会でも、ほんのわずかですがFRPという言葉が聞こえた気がします。
では、ストリーミング処理系をよく使いそうな場所のひとつであるサーバのロジックとしてFRPは役に立ったりするんだろうか、という疑問がはじまりです。

今回使用したFRPNowも基本的にはGUIを想定しているようで、サーバと組み合わせるためのコードは自作する必要がありました。サーバはwarpを使用しています。

なお、FRPの存在自体はだいぶ以前から知っていたのですが、それなりに手を出したのは今回が初めてです。

作成してみたもの

動作

数を数えるだけのサーバです。

実験目的なのでAPI設計はとても適当です。

コード

作成したコードはgithubにあります。あまりちゃんとは整備していませんが。(以下に貼り付けてあるコードとは僅かに違う部分があります)
2つのファイルに分かれています。

Counter.hs
{-# LANGUAGE MultiWayIf #-}
{-# LANGUAGE OverloadedStrings #-}

module FRPNow.Counter where

import qualified Control.Concurrent        as CC
import qualified Control.Concurrent.Async  as AS
import qualified Control.Concurrent.Chan   as Chan
import qualified Control.Concurrent.MVar   as MV
import qualified Control.Exception         as E
import qualified Control.FRPNow            as FRP
import           Control.FRPNow            (Behavior, Event, EvStream, Now)
import qualified Control.Monad             as M

type InputIF a = a -> IO ()

data Command =
      StartCount
    | StopCount
    | AddCount Int
    | GetCount (InputIF Int)
    | Shutdown

instance Show Command where
    show StartCount   = "StartCount"
    show StopCount    = "StopCount"
    show (AddCount a) = "AddCount " ++ show a
    show (GetCount _) = "GetCount"
    show Shutdown     = "Shutdown"

getGetCount :: Command -> Maybe (InputIF Int)
getGetCount (GetCount inIF) = Just inIF
getGetCount _               = Nothing

getCountState :: Command -> Maybe Bool
getCountState StartCount = Just True
getCountState StopCount  = Just False
getCountState _          = Nothing

getAddCount :: Command -> Maybe Int
getAddCount (AddCount x) = Just x
getAddCount _            = Nothing

getShutdown :: Command -> Maybe ()
getShutdown Shutdown = Just ()
getShutdown _        = Nothing

-- | カウンターをセットアップする。
setupCounter :: EvStream Command -> Now (Event ())
setupCounter inputStream = do
    -- inputStreamを4つのEvStreamに分ける
    -- StartCount/StopCountはTrue/Falseに変換してひとつのEvStreamに。
    let addCountStream   = getAddCount   `FRP.filterMapEs` inputStream
        countStateStream = getCountState `FRP.filterMapEs` inputStream
        getCountStream   = getGetCount   `FRP.filterMapEs` inputStream
        shutdownStream   = getShutdown   `FRP.filterMapEs` inputStream
    -- 秒カウントのEvStreamを生成
    secStream <- genSecStream
    -- countStateStreamの最後のイベントの値を表すBehaviorを作成
    countingState <- FRP.sampleNow $ False `FRP.fromChanges` countStateStream
    -- countStateがTrueの間だけ、秒カウントするEvStreamを作成
    let secCountStream   = secStream `FRP.during` countingState
    -- addCountStreamとsecCountStreamのイベントをマージしたEvStreamを作成
    let countUpStream    = secCountStream `FRP.merge` addCountStream
    -- countUpStreamの内容を合算した状態を表すBehaviorを作成
    countState <- FRP.sampleNow $ FRP.foldEs (+) 0 countUpStream
    -- GetCountイベントに、そのときのcounterの値を応答させる
    respondToGetCount countState `FRP.callStream` getCountStream
    -- counterの値が変化するたびに、その値をコンソールに出力する
    (putStrLn . ("count " ++) . show) `FRP.callIOStream` FRP.toChanges countState
    -- 最初に来たShutdownイベントを取り出して返す
    -- このイベントが届くとFRPの実行が終了する
    FRP.sampleNow $ FRP.next shutdownStream
    where
        -- | GetCountコマンドに応答する
        respondToGetCount :: Behavior Int -> [InputIF Int] -> Now ()
        respondToGetCount countState inIFs = do
            -- counterの値を採取する。
            count <- FRP.sampleNow countState
            -- コンソールに表示してGetCountイベントに応答
            FRP.sync $ do
                putStrLn $ "respond to GetCount : " ++ show count
                -- 「同時」に届いたGetCountイベントはリストになって渡されるので全てに同じ値を渡す
                M.forM_ inIFs ($ count)
        -- | 1秒ごとにイベント(内容はIntの1)を発行するEvStreamを生成する。
        --   終了をサポートしていないので、放置するとリークする。今は気にしない。
        genSecStream :: Now (EvStream Int)
        genSecStream = do
            (evs, emitEv) <- FRP.callbackStream
            FRP.sync . M.void . AS.async . M.forever $ do
                emitEv 1
                CC.threadDelay 1000000
            return evs

-- | 入力EvStreamを受け取るNowモナドを実行する。
--   返り値は、入力EvStreamへの入力アクション。
runNow :: (EvStream a -> Now (Event x)) -> IO (InputIF a)
runNow now = do
    -- frpnow側が多数のスレッドからの同時イベント発行に対応していない
    -- ように見えるので、チャネルを設けてタイミング制御を行っている。
    -- が、これで正しく動く保証はない。
    mvEmitEv <- MV.newEmptyMVar
    mvWait   <- MV.newEmptyMVar
    M.void . AS.async . FRP.runNowMaster $ do
        (inputStream, emitEv) <- FRP.callbackStream
        M.void . FRP.sync $ mvEmitEv `MV.putMVar` emitEv
        MV.putMVar mvWait `FRP.callIOStream` inputStream
        now inputStream
    emitEv <- MV.takeMVar mvEmitEv
    chan <- Chan.newChan
    M.void . AS.async . E.handle handleException . M.forever $ do
        a <- Chan.readChan chan
        emitEv a
        M.void $ MV.takeMVar mvWait
    return $ Chan.writeChan chan
    where
        handleException (E.SomeException _err) = return ()

runCounter :: IO (InputIF Command)
runCounter = runNow setupCounter

だいたいの流れはコメントに書いています。

setupCounterがメインのロジックです。最初に実行されて回路のようなものが構築され、以降はその回路が動く、というイメージです。
詳しくおいかけていくとイベントのストリームを組み合わせたりして処理を作っているのがなんとなくわかるんじゃないかと思います。

runNowはFRPライブラリのrunNowMasterに入力のイベントストリームを渡す手段を提供するための仕組みを加えたようなものです。返り値のInputIFにCommandを与えると、それが構築した回路の入力になります。サーバ以外の用途でも、外から入力を受け取りたい場合には使いまわせる関数です。
コメントにも書いてある通り、これでいつでもちゃんと動くという確証はないのでお気をつけください。

Main.hs
{-# LANGUAGE OverloadedStrings #-}

module Main where

import qualified FRPNow.Counter as FRP

import qualified Network.Wai               as Wai
import qualified Network.Wai.Handler.Warp  as Warp
import qualified Network.HTTP.Types.Status as Status

import qualified Control.Concurrent.MVar as MVar

import qualified Data.ByteString.Lazy.Char8 as LBS
import           Data.Monoid                ((<>))
import qualified Data.Text                  as TXT

import           System.Timeout (timeout)

main :: IO ()
main = do
    let port = 3000
    let setting = Warp.setPort port Warp.defaultSettings
    putStrLn $ "start server port=" ++ show port
    input <- FRP.runCounter
    Warp.runSettings setting $ simpleApp input

simpleApp :: FRP.InputIF FRP.Command -> Wai.Application
simpleApp input req respond = do
    let path = Wai.pathInfo req
    putStrLn $ "----- " ++ show path
    case path of
        ["add", numTxt] -> do
            let numStr = TXT.unpack numTxt
                num    = read numStr
                numBS  = LBS.pack numStr
            input $ FRP.AddCount num
            respond $ Wai.responseLBS Status.status200 [] $ "AddCount " <> numBS <> " accepted"
        ["start"] -> do
            input FRP.StartCount
            respond $ Wai.responseLBS Status.status200 [] "StartCount accepted"
        ["stop"] -> do
            input FRP.StopCount
            respond $ Wai.responseLBS Status.status200 [] "StopCount accepted"
        ["shutdown"] -> do
            input FRP.Shutdown
            respond $ Wai.responseLBS Status.status200 [] "Shutdown accepted"
        ["get"] -> do
            mv <- MVar.newEmptyMVar
            input $ FRP.GetCount $ MVar.putMVar mv
            mcount <- timeout 1000000 $ MVar.takeMVar mv
            case mcount of
                Just count -> respond . Wai.responseLBS Status.status200 [] . LBS.pack $ show count
                Nothing    -> respond $ Wai.responseLBS Status.status200 [] "timeout"
        _ -> do
            respond $ Wai.responseLBS Status.status200 [] "not supported command"

こちらはCounter.hsで実装したコードとサーバをくっつける部分のコードです。
runCounterを実行し、そこにサーバへのリクエストに応じたCommandを投げているだけです。

感想

この内容だとそれなりにきれいに書けた気がしますが、タイミング制御をきちんとやろうとするととたんに面倒になります。
例えば上記のコードでは、秒をカウントするイベントストリームは、内部的にはstartのコマンドを与える以前から動作し続けていて、そのイベントストリームのイベントがstart状態でのみ通るようになっています。
startコマンドを受け取った時点からきっちり1秒を数え始めようと思うと、コマンドを受け取った段階で初めて秒カウントのイベントストリームを生成し、stopコマンドが着たら破棄して、次のstartでまたイベントストリームを作り直す、ということが必要になります。実のところこれをするコードも試しに書いてみたのですが、非常に見難いコードになってしまいました。もしかするとFRPに慣れればうまく書けるのかもしれませんが。

また、そんなにたいした量のコードではありませんが、書くのにけっこうな時間がかかっています。
ドキュメント(例:EvStream)を見ても、どういうときにどういう関数を使えばいいのか一見しただけではわからないものも多く、慣れるまではだいぶ時間がかかりそうに感じました。
関数がもっと体系化されていればわかりやすいのかもしれません。この観点ではFRPNowではなく、 Sodiumあたりがうまくまとまっていそうなイメージをもっていますが、私は触れていません。

さて、では当初の疑問である「サーバのロジックとして使えるのか」というと、正直これだけだとまだなんともいえません。こういうものがうまくハマる場所はあるかもしれませんが、その場合でも慣れるまでに時間がかかる覚悟を持っておいたほうがよさそうな感触を持っています。また、他のライブラリの使用も検討したほうがよさそうです。

ACCESS Advent Calendar、明日は @_mirer さんです。

参考

FRPNowでFizzBuzzなど
Sodium(FRP)を使ってみた

7
7
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
7
7

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?