Cloud Haskellは"Towards Haskell in the Cloud" by Jeff Epstein, Andrew Black, and Simon Peyton Jones という論文で提案された、Haskellで分散環境で動くプログラムを書くためのDSLです。その実装としてdistributed-processというライブラリがあります。論文の著者であるJeffを中心に今も活発に開発が続けられており、歴史については公式によくまとまっています。
Basic messaging
Cloud Haskellが採用しているのはErlangに影響されたメッセージパッシング型の通信モデルです。Erlangとの大きな違いはメッセージに型がついているところでしょう。
ping :: Process ()
ping = do
Pong partner <- expect
self <- getSelfPid
send partner (Ping self)
ping
data Ping = Ping ProcessId deriving (Generic, Typeable)
data Pong = Pong ProcessId deriving (Generic, Typeable)
instance Binary Ping
instance Binary Pong
この例は論文から引用してきたものです。ping
という関数はPong
のメッセージが送られてくるのを待ち、受け取ったら送り元にPing
のメッセージを返すという動作を繰り返すプロセスの例です。
expect :: Serializable a => Process a
expect
の型は上記のようになっており、他のプロセスから送られてきたメッセージを受け取ることができます。注目したいのは受け取るメッセージの型が決められているところで、上の例の場合a
としてPong
が推論されるためPong
メッセージが来るまで待ち続けます。つまりexpect
でブロックしているタイミングでPing
メッセージが来ても処理が進むことはなく、確実にPong
のメッセージを受け取ることができます。(複数のメッセージを受け取る方法は後で紹介します。)
send :: Serializable a => ProcessId -> a -> Process ()
send
の型はこの様になっていて、Serializable
のインスタンスであれば何でもメッセージとして送ることができます。
class (Binary a, Typeable a) => Serializable a
instance (Binary a, Typeable a) => Serializable a
Serializable
はこのように定義されており、Binary
とTypeable
のインスタンスであればSerializable
のインスタンスとなる事がわかります。
本来であればプロセス間の通信やノード間の通信にどういったプロトコルを使うかはバイナリのレベルでちゃんと設計して実装しなければいけませんが、Cloud Haskell
はそこを抽象化してくれており、開発者はProcess
モナドに包まれた処理を記述してSerializable
のインスタンスとなるようなHaskellの値をメッセージとして送りあうコードを書くだけで、複数のプロセス・複数のノードで分散して動くようなプログラムを簡単に実装することができるのです。
以下にCloud Haskellで提供されているインターフェースの一覧を載せておきます(論文より抜粋、一部distributed-process
に合わせて修正)。
-----------------------------
-- Basic messaging
-----------------------------
instance Monad Process
instance MonadIO Process
send :: Serializable a => ProcessId -> a -> Process ()
expect :: Serializable a => Process a
-----------------------------
-- Advanced messaging
-----------------------------
data Match b
receiveWait :: [Match b] -> Process b
receiveTimeout :: Int -> [Match b] -> Process (Maybe b)
match :: Serializable a => (a -> Process b) -> Match b
matchIf :: Serializable a => (a -> Bool) -> (a -> Process b) -> Match b
matchUnknown :: Process b -> Match b
-----------------------------
-- Channels
-----------------------------
newChan :: Serializable a => Process (SendPort a, ReceivePort a)
sendChan :: Serializable a => SendPort a -> a -> Process ()
receiveChan :: Serializable a => ReceivePort a -> Process a
mergePortsBiased :: Serializable a => [ReceivePort a] -> Process (ReceivePort a)
mergePortsRR :: Serializable a => [ReceivePort a] -> Process (ReceivePort a)
-----------------------------
-- Process management
-----------------------------
spawn :: NodeId -> Closure (Process ()) -> Process ProcessId
terminate :: Process a
getSelfPid :: Process ProcessId
getSelfNode :: Process NodeId
-----------------------------
-- Process monitoring
-----------------------------
link :: ProcessId -> Process ()
monitor :: ProcessId -> Process MonitorRef
Advanced messaging
match
とreceiveWait
を組み合わせれば複数の型のメッセージを同時に待つことができます。例えば足し算と掛け算とその結果を表す型を以下のように定義します。
data Add = Add ProcessId Double Double deriving (Generic, Typeable)
data Multiply = Multiply ProcessId Double Double deriving (Generic, Typeable)
data Result = Result Double deriving (Generic, Typeable)
instance Binary Add
instance Binary Multiply
instance Binary Result
このAdd
とMultiply
のメッセージを待って結果を返すようなプロセスは以下のように書くことができます。
calc :: Process ()
calc = forever $
receiveWait [ match $ \(Add pid x y) -> send pid (Result (x + y))
, match $ \(Multiply pid x y) -> send pid (Result (x * y))
]
matchIf
を使えば受け取ったメッセージが第一引数の条件を満たすときだけ処理を行うといったことも可能です。
Cloud Haskellではこのようにメッセージごとに型を定義します。例えばAdd
とMutilply
は、
data MathOp = Add ...
| Multiply ...
のように和型として書けるかもしれませんが、expect
やreceiveWait
が型で受け取るメッセージを振り分けていることを考えると得策ではありません。更に和型に新しいメッセージを追加したとすると分散している全てのノードのプログラムを一斉に書き換える必要が出てくるため変更に対して弱くなってしまいます。
Channels
前述のPing
やAdd
などのメッセージでは別のメッセージを送り返すために送り元のProcessId
がメッセージに含まれていました。Cloud Haskellにはチャネルという概念があり送信チャネルと受信チャネルと作ることでProcessId
を引き回す工夫をする必要がなくなり、更にメッセージの型を固定することもできます。
例えばping
の例は
ping2 :: SendPort Ping2 -> ReceivePort Pong2 -> Process ()
ping2 pingout pongin = do
receiveChan pongin
sendChan pingout Ping2
ping2 pingout pongin
data Ping2 = Ping2 deriving (Generic, Typeable)
data Pong2 = Pong2 deriving (Generic, Typeable)
instance Binary Ping2
instance Binary Pong2
のように書くことができるでしょう。
ポイントはメッセージに ProcessId
を含める必要がなくなったことと、SendPort Ping2
, ReceivePort Pong2
のように書く送受信チャネルでどの種類のメッセージが送られてくるのかを方で表現することができるところです。
またSendPort
はSerializable
のインスタンスになっているため、メッセージに含めて他のプロセスに送信することも可能です。
Process management
spawn
はProcess
の値を実行するための関数です。
spawn :: NodeId -> Closure (Process ()) -> Process ProcessId
改めてspawn
の型を見ると、実行するプロセスはClosure
という型に包まれています。実はspawn
はNodeId
に別のノードのIDを指定することができて、つまり自分のノードで作成したプロセスを別のノードに実行させることが可能です。この時に、作成したプロセスの中に自分の環境への参照(例えばプロセスの外で定義ている変数)が残されたままだと別のノード上でうまく実行することができません。このようなプロセスが参照している環境をまとめて詰め込んだものがClosure
というわけです。
実際にClosure
の値を作るときはTemplate Haskellを使います。
ping :: Process ()
ping = do
Pong partner <- expect
self <- getSelfPid
send partner (Ping self)
ping
remotable ['ping]
main = do
...
...
spawn nid $(mkStaticClosure 'ping)
別のノードで実行したい関数を予めremotable
で宣言しておくのがポイントです。今回の例ではping
は引数を取らなかったのでmkStaticClosure
を使いましたが、引数を取るプロセスをClosure
に包むときはmkClosure
を使います。
Control.Distributed.Process.Node
ところで上の例では...
として誤魔化しましたが、実際にProcess
を実行する処理を一回書いてみましょう。Control.Distributed.Process.Node
を使えば、ローカルのノードでプロセスを立ち上げることができます。以下にping/pongの完全なExampleを載せます。
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE TemplateHaskell #-}
import Control.Concurrent (threadDelay)
import Data.Binary
import Data.Typeable (Typeable)
import GHC.Generics (Generic)
import Control.Distributed.Process
import Control.Distributed.Process.Node
import Network.Transport.TCP (createTransport, defaultTCPParameters)
-------------------------
-- Messages
-------------------------
data Ping = Ping ProcessId deriving (Generic, Typeable)
data Pong = Pong ProcessId deriving (Generic, Typeable)
instance Binary Ping
instance Binary Pong
-------------------------
-- Processes
-------------------------
ping :: Process ()
ping = do
Pong partner <- expect
say "received Pong"
liftIO $ threadDelay 2000000
self <- getSelfPid
send partner (Ping self)
ping
pong :: Process ()
pong = do
Ping partner <- expect
say "received Ping"
self <- getSelfPid
send partner (Pong self)
pong
main :: IO ()
main = do
let (host, port) = ("127.0.0.1", "8000")
Right transport <- createTransport host port ((,) host) defaultTCPParameters
localNode <- newLocalNode transport initRemoteTable
runProcess localNode $ do
pingPID <- spawnLocal ping
pongPID <- spawnLocal pong
send pingPID (Ping pongPID)
receiveWait []
newLocalNode
を使ってノードを立ち上げrunProcess
を使ってそのノード上でプロセスを実行しています。
distributed-process-simplelocalnet
distributed-process-simplelocalnet
はdistributed-process
に依存したライブラリで、ローカルネットにSlaveとなるノードを立てておくとそれらのノードに対してマスターノードからプロセスを送って分散実行することができる仕組みを簡単に実装することができます。UDPマルチキャストによるローカルネットでのノードの探索とマスタースレイブの仕組みを提供してくれているライブラリです。
master :: Backend -> [NodeId] -> Process ()
master backend slaves = do
-- Do something interesting with the slaves
liftIO . putStrLn $ "Slaves: " ++ show slaves
-- Terminate the slaves when the master terminates (this is optional)
terminateAllSlaves backend
main :: IO ()
main = do
args <- getArgs
case args of
["master", host, port] -> do
backend <- initializeBackend host port initRemoteTable
startMaster backend (master backend)
["slave", host, port] -> do
backend <- initializeBackend host port initRemoteTable
startSlave backend
これはControl.Distributed.Process.Backend.SimpleLocalnetに記載されている例です。Slaveとして実行されたときはstartSlave
を実行し、単純な待機状態となります。Slaveをいくつか立ち上げた後にMasterを実行すると、master
関数の引数にSlaveのNodeId
が渡され、そのNodeId
を使ってSlaveノードにプロセスをspawn
したりすることができます。例では単純にterminateAllSlaves
を実行して待機状態のSlaveをkillしています。
まとめ
CloudHaskellを使えば分散環境で動作するプログラムを簡単に作れることがわかりました。distributed-process
系のライブラリは他にもdistributed-process-client-serverやdistributed-process-p2pなどがあり名前の通りのネットワークが楽に作れるようになっています。
ところで、この記事を書いてる途中で検証していてdistributed-process-simplelocalnet
を使ってSlaveノードにプロセスを実行させるのには成功したのですが、自分でforkIOを使って複数のLocalNodeを立ててプロセスをspawnさせるのには失敗してしまいました。そもそもリモートのノードに任意のプロセスを実行させる機能は信頼できないP2P環境などであればOffにしたいと思うのでこの挙動はありがたいのですが、どういう条件であればspawnさせることができるのか誰か詳しい人がいたらコメント等で教えていただけると幸いです