Haskell
FRP
FRPNow
ACCESSDay 19

FRPNowをサーバで使ってみた

More than 3 years have passed since last update.

この記事は ACCESS Advent Calendar 2015 19日目の記事です。

ACCESSの @asukamirai です。

FRP(Functional reactive programming)をサーバのロジックとして使うのはどうなのだろう、ということで試しに感覚を掴むためにやってみました。

使用したFRPのライブラリは、ICFP2015に登場した論文FRPNow。言語はHaskellです。


確認環境など


  • stack 0.1.10.1


    • lts-3.18



  • frpnow-0.18

  • OS: Windows7 64bit


動機

FRPというと、なんとなくGUI的なものを扱うようなイメージが強かったのですが、その一方で、データフローでプログラミングを行うというのは、ストリーミング処理系のライブラリとも共通する部分がありそうだと思っていました。

先月行われた関数型ストリーム処理勉強会でも、ほんのわずかですがFRPという言葉が聞こえた気がします。

では、ストリーミング処理系をよく使いそうな場所のひとつであるサーバのロジックとしてFRPは役に立ったりするんだろうか、という疑問がはじまりです。

今回使用したFRPNowも基本的にはGUIを想定しているようで、サーバと組み合わせるためのコードは自作する必要がありました。サーバはwarpを使用しています。

なお、FRPの存在自体はだいぶ以前から知っていたのですが、それなりに手を出したのは今回が初めてです。


作成してみたもの


動作

数を数えるだけのサーバです。

実験目的なのでAPI設計はとても適当です。


コード

作成したコードはgithubにあります。あまりちゃんとは整備していませんが。(以下に貼り付けてあるコードとは僅かに違う部分があります)

2つのファイルに分かれています。


Counter.hs

{-# LANGUAGE MultiWayIf #-}

{-# LANGUAGE OverloadedStrings #-}

module FRPNow.Counter where

import qualified Control.Concurrent as CC
import qualified Control.Concurrent.Async as AS
import qualified Control.Concurrent.Chan as Chan
import qualified Control.Concurrent.MVar as MV
import qualified Control.Exception as E
import qualified Control.FRPNow as FRP
import Control.FRPNow (Behavior, Event, EvStream, Now)
import qualified Control.Monad as M

type InputIF a = a -> IO ()

data Command =
StartCount
| StopCount
| AddCount Int
| GetCount (InputIF Int)
| Shutdown

instance Show Command where
show StartCount = "StartCount"
show StopCount = "StopCount"
show (AddCount a) = "AddCount " ++ show a
show (GetCount _) = "GetCount"
show Shutdown = "Shutdown"

getGetCount :: Command -> Maybe (InputIF Int)
getGetCount (GetCount inIF) = Just inIF
getGetCount _ = Nothing

getCountState :: Command -> Maybe Bool
getCountState StartCount = Just True
getCountState StopCount = Just False
getCountState _ = Nothing

getAddCount :: Command -> Maybe Int
getAddCount (AddCount x) = Just x
getAddCount _ = Nothing

getShutdown :: Command -> Maybe ()
getShutdown Shutdown = Just ()
getShutdown _ = Nothing

-- | カウンターをセットアップする。
setupCounter :: EvStream Command -> Now (Event ())
setupCounter inputStream = do
-- inputStreamを4つのEvStreamに分ける
-- StartCount/StopCountはTrue/Falseに変換してひとつのEvStreamに。
let addCountStream = getAddCount `FRP.filterMapEs` inputStream
countStateStream = getCountState `FRP.filterMapEs` inputStream
getCountStream = getGetCount `FRP.filterMapEs` inputStream
shutdownStream = getShutdown `FRP.filterMapEs` inputStream
-- 秒カウントのEvStreamを生成
secStream <- genSecStream
-- countStateStreamの最後のイベントの値を表すBehaviorを作成
countingState <- FRP.sampleNow $ False `FRP.fromChanges` countStateStream
-- countStateがTrueの間だけ、秒カウントするEvStreamを作成
let secCountStream = secStream `FRP.during` countingState
-- addCountStreamとsecCountStreamのイベントをマージしたEvStreamを作成
let countUpStream = secCountStream `FRP.merge` addCountStream
-- countUpStreamの内容を合算した状態を表すBehaviorを作成
countState <- FRP.sampleNow $ FRP.foldEs (+) 0 countUpStream
-- GetCountイベントに、そのときのcounterの値を応答させる
respondToGetCount countState `FRP.callStream` getCountStream
-- counterの値が変化するたびに、その値をコンソールに出力する
(putStrLn . ("count " ++) . show) `FRP.callIOStream` FRP.toChanges countState
-- 最初に来たShutdownイベントを取り出して返す
-- このイベントが届くとFRPの実行が終了する
FRP.sampleNow $ FRP.next shutdownStream
where
-- | GetCountコマンドに応答する
respondToGetCount :: Behavior Int -> [InputIF Int] -> Now ()
respondToGetCount countState inIFs = do
-- counterの値を採取する。
count <- FRP.sampleNow countState
-- コンソールに表示してGetCountイベントに応答
FRP.sync $ do
putStrLn $ "respond to GetCount : " ++ show count
-- 「同時」に届いたGetCountイベントはリストになって渡されるので全てに同じ値を渡す
M.forM_ inIFs ($ count)
-- | 1秒ごとにイベント(内容はIntの1)を発行するEvStreamを生成する。
-- 終了をサポートしていないので、放置するとリークする。今は気にしない。
genSecStream :: Now (EvStream Int)
genSecStream = do
(evs, emitEv) <- FRP.callbackStream
FRP.sync . M.void . AS.async . M.forever $ do
emitEv 1
CC.threadDelay 1000000
return evs

-- | 入力EvStreamを受け取るNowモナドを実行する。
-- 返り値は、入力EvStreamへの入力アクション。
runNow :: (EvStream a -> Now (Event x)) -> IO (InputIF a)
runNow now = do
-- frpnow側が多数のスレッドからの同時イベント発行に対応していない
-- ように見えるので、チャネルを設けてタイミング制御を行っている。
-- が、これで正しく動く保証はない。
mvEmitEv <- MV.newEmptyMVar
mvWait <- MV.newEmptyMVar
M.void . AS.async . FRP.runNowMaster $ do
(inputStream, emitEv) <- FRP.callbackStream
M.void . FRP.sync $ mvEmitEv `MV.putMVar` emitEv
MV.putMVar mvWait `FRP.callIOStream` inputStream
now inputStream
emitEv <- MV.takeMVar mvEmitEv
chan <- Chan.newChan
M.void . AS.async . E.handle handleException . M.forever $ do
a <- Chan.readChan chan
emitEv a
M.void $ MV.takeMVar mvWait
return $ Chan.writeChan chan
where
handleException (E.SomeException _err) = return ()

runCounter :: IO (InputIF Command)
runCounter = runNow setupCounter


だいたいの流れはコメントに書いています。

setupCounterがメインのロジックです。最初に実行されて回路のようなものが構築され、以降はその回路が動く、というイメージです。

詳しくおいかけていくとイベントのストリームを組み合わせたりして処理を作っているのがなんとなくわかるんじゃないかと思います。

runNowはFRPライブラリのrunNowMasterに入力のイベントストリームを渡す手段を提供するための仕組みを加えたようなものです。返り値のInputIFにCommandを与えると、それが構築した回路の入力になります。サーバ以外の用途でも、外から入力を受け取りたい場合には使いまわせる関数です。

コメントにも書いてある通り、これでいつでもちゃんと動くという確証はないのでお気をつけください。


Main.hs

{-# LANGUAGE OverloadedStrings #-}

module Main where

import qualified FRPNow.Counter as FRP

import qualified Network.Wai as Wai
import qualified Network.Wai.Handler.Warp as Warp
import qualified Network.HTTP.Types.Status as Status

import qualified Control.Concurrent.MVar as MVar

import qualified Data.ByteString.Lazy.Char8 as LBS
import Data.Monoid ((<>))
import qualified Data.Text as TXT

import System.Timeout (timeout)

main :: IO ()
main = do
let port = 3000
let setting = Warp.setPort port Warp.defaultSettings
putStrLn $ "start server port=" ++ show port
input <- FRP.runCounter
Warp.runSettings setting $ simpleApp input

simpleApp :: FRP.InputIF FRP.Command -> Wai.Application
simpleApp input req respond = do
let path = Wai.pathInfo req
putStrLn $ "----- " ++ show path
case path of
["add", numTxt] -> do
let numStr = TXT.unpack numTxt
num = read numStr
numBS = LBS.pack numStr
input $ FRP.AddCount num
respond $ Wai.responseLBS Status.status200 [] $ "AddCount " <> numBS <> " accepted"
["start"] -> do
input FRP.StartCount
respond $ Wai.responseLBS Status.status200 [] "StartCount accepted"
["stop"] -> do
input FRP.StopCount
respond $ Wai.responseLBS Status.status200 [] "StopCount accepted"
["shutdown"] -> do
input FRP.Shutdown
respond $ Wai.responseLBS Status.status200 [] "Shutdown accepted"
["get"] -> do
mv <- MVar.newEmptyMVar
input $ FRP.GetCount $ MVar.putMVar mv
mcount <- timeout 1000000 $ MVar.takeMVar mv
case mcount of
Just count -> respond . Wai.responseLBS Status.status200 [] . LBS.pack $ show count
Nothing -> respond $ Wai.responseLBS Status.status200 [] "timeout"
_ -> do
respond $ Wai.responseLBS Status.status200 [] "not supported command"


こちらはCounter.hsで実装したコードとサーバをくっつける部分のコードです。

runCounterを実行し、そこにサーバへのリクエストに応じたCommandを投げているだけです。


感想

この内容だとそれなりにきれいに書けた気がしますが、タイミング制御をきちんとやろうとするととたんに面倒になります。

例えば上記のコードでは、秒をカウントするイベントストリームは、内部的にはstartのコマンドを与える以前から動作し続けていて、そのイベントストリームのイベントがstart状態でのみ通るようになっています。

startコマンドを受け取った時点からきっちり1秒を数え始めようと思うと、コマンドを受け取った段階で初めて秒カウントのイベントストリームを生成し、stopコマンドが着たら破棄して、次のstartでまたイベントストリームを作り直す、ということが必要になります。実のところこれをするコードも試しに書いてみたのですが、非常に見難いコードになってしまいました。もしかするとFRPに慣れればうまく書けるのかもしれませんが。

また、そんなにたいした量のコードではありませんが、書くのにけっこうな時間がかかっています。

ドキュメント(例:EvStream)を見ても、どういうときにどういう関数を使えばいいのか一見しただけではわからないものも多く、慣れるまではだいぶ時間がかかりそうに感じました。

関数がもっと体系化されていればわかりやすいのかもしれません。この観点ではFRPNowではなく、 Sodiumあたりがうまくまとまっていそうなイメージをもっていますが、私は触れていません。

さて、では当初の疑問である「サーバのロジックとして使えるのか」というと、正直これだけだとまだなんともいえません。こういうものがうまくハマる場所はあるかもしれませんが、その場合でも慣れるまでに時間がかかる覚悟を持っておいたほうがよさそうな感触を持っています。また、他のライブラリの使用も検討したほうがよさそうです。

ACCESS Advent Calendar、明日は @_mirer さんです。


参考

FRPNowでFizzBuzzなど

Sodium(FRP)を使ってみた