Cloud Haskellは"Towards Haskell in the Cloud" by Jeff Epstein, Andrew Black, and Simon Peyton Jones という論文で提案された、Haskellで分散環境で動くプログラムを書くためのDSLです。その実装としてdistributed-processというライブラリがあります。論文の著者であるJeffを中心に今も活発に開発が続けられており、歴史については公式によくまとまっています。
Basic messaging
Cloud Haskellが採用しているのはErlangに影響されたメッセージパッシング型の通信モデルです。Erlangとの大きな違いはメッセージに型がついているところでしょう。
ping :: Process ()
ping = do
Pong partner <- expect
self <- getSelfPid
send partner (Ping self)
ping
data Ping = Ping ProcessId deriving (Generic, Typeable)
data Pong = Pong ProcessId deriving (Generic, Typeable)
instance Binary Ping
instance Binary Pong
この例は論文から引用してきたものです。pingという関数はPongのメッセージが送られてくるのを待ち、受け取ったら送り元にPingのメッセージを返すという動作を繰り返すプロセスの例です。
expect :: Serializable a => Process a
expectの型は上記のようになっており、他のプロセスから送られてきたメッセージを受け取ることができます。注目したいのは受け取るメッセージの型が決められているところで、上の例の場合aとしてPongが推論されるためPongメッセージが来るまで待ち続けます。つまりexpectでブロックしているタイミングでPingメッセージが来ても処理が進むことはなく、確実にPongのメッセージを受け取ることができます。(複数のメッセージを受け取る方法は後で紹介します。)
send :: Serializable a => ProcessId -> a -> Process ()
sendの型はこの様になっていて、Serializableのインスタンスであれば何でもメッセージとして送ることができます。
class (Binary a, Typeable a) => Serializable a
instance (Binary a, Typeable a) => Serializable a
Serializableはこのように定義されており、BinaryとTypeableのインスタンスであればSerializableのインスタンスとなる事がわかります。
本来であればプロセス間の通信やノード間の通信にどういったプロトコルを使うかはバイナリのレベルでちゃんと設計して実装しなければいけませんが、Cloud Haskellはそこを抽象化してくれており、開発者はProcessモナドに包まれた処理を記述してSerializableのインスタンスとなるようなHaskellの値をメッセージとして送りあうコードを書くだけで、複数のプロセス・複数のノードで分散して動くようなプログラムを簡単に実装することができるのです。
以下にCloud Haskellで提供されているインターフェースの一覧を載せておきます(論文より抜粋、一部distributed-processに合わせて修正)。
-----------------------------
-- Basic messaging
-----------------------------
instance Monad Process
instance MonadIO Process
send :: Serializable a => ProcessId -> a -> Process ()
expect :: Serializable a => Process a
-----------------------------
-- Advanced messaging
-----------------------------
data Match b
receiveWait :: [Match b] -> Process b
receiveTimeout :: Int -> [Match b] -> Process (Maybe b)
match :: Serializable a => (a -> Process b) -> Match b
matchIf :: Serializable a => (a -> Bool) -> (a -> Process b) -> Match b
matchUnknown :: Process b -> Match b
-----------------------------
-- Channels
-----------------------------
newChan :: Serializable a => Process (SendPort a, ReceivePort a)
sendChan :: Serializable a => SendPort a -> a -> Process ()
receiveChan :: Serializable a => ReceivePort a -> Process a
mergePortsBiased :: Serializable a => [ReceivePort a] -> Process (ReceivePort a)
mergePortsRR :: Serializable a => [ReceivePort a] -> Process (ReceivePort a)
-----------------------------
-- Process management
-----------------------------
spawn :: NodeId -> Closure (Process ()) -> Process ProcessId
terminate :: Process a
getSelfPid :: Process ProcessId
getSelfNode :: Process NodeId
-----------------------------
-- Process monitoring
-----------------------------
link :: ProcessId -> Process ()
monitor :: ProcessId -> Process MonitorRef
Advanced messaging
matchとreceiveWaitを組み合わせれば複数の型のメッセージを同時に待つことができます。例えば足し算と掛け算とその結果を表す型を以下のように定義します。
data Add = Add ProcessId Double Double deriving (Generic, Typeable)
data Multiply = Multiply ProcessId Double Double deriving (Generic, Typeable)
data Result = Result Double deriving (Generic, Typeable)
instance Binary Add
instance Binary Multiply
instance Binary Result
このAddとMultiplyのメッセージを待って結果を返すようなプロセスは以下のように書くことができます。
calc :: Process ()
calc = forever $
receiveWait [ match $ \(Add pid x y) -> send pid (Result (x + y))
, match $ \(Multiply pid x y) -> send pid (Result (x * y))
]
matchIfを使えば受け取ったメッセージが第一引数の条件を満たすときだけ処理を行うといったことも可能です。
Cloud Haskellではこのようにメッセージごとに型を定義します。例えばAddとMutilplyは、
data MathOp = Add ...
| Multiply ...
のように和型として書けるかもしれませんが、expectやreceiveWaitが型で受け取るメッセージを振り分けていることを考えると得策ではありません。更に和型に新しいメッセージを追加したとすると分散している全てのノードのプログラムを一斉に書き換える必要が出てくるため変更に対して弱くなってしまいます。
Channels
前述のPingやAddなどのメッセージでは別のメッセージを送り返すために送り元のProcessIdがメッセージに含まれていました。Cloud Haskellにはチャネルという概念があり送信チャネルと受信チャネルと作ることでProcessIdを引き回す工夫をする必要がなくなり、更にメッセージの型を固定することもできます。
例えばpingの例は
ping2 :: SendPort Ping2 -> ReceivePort Pong2 -> Process ()
ping2 pingout pongin = do
receiveChan pongin
sendChan pingout Ping2
ping2 pingout pongin
data Ping2 = Ping2 deriving (Generic, Typeable)
data Pong2 = Pong2 deriving (Generic, Typeable)
instance Binary Ping2
instance Binary Pong2
のように書くことができるでしょう。
ポイントはメッセージに ProcessId を含める必要がなくなったことと、SendPort Ping2, ReceivePort Pong2のように書く送受信チャネルでどの種類のメッセージが送られてくるのかを方で表現することができるところです。
またSendPortはSerializableのインスタンスになっているため、メッセージに含めて他のプロセスに送信することも可能です。
Process management
spawnはProcessの値を実行するための関数です。
spawn :: NodeId -> Closure (Process ()) -> Process ProcessId
改めてspawnの型を見ると、実行するプロセスはClosureという型に包まれています。実はspawnはNodeIdに別のノードのIDを指定することができて、つまり自分のノードで作成したプロセスを別のノードに実行させることが可能です。この時に、作成したプロセスの中に自分の環境への参照(例えばプロセスの外で定義ている変数)が残されたままだと別のノード上でうまく実行することができません。このようなプロセスが参照している環境をまとめて詰め込んだものがClosureというわけです。
実際にClosureの値を作るときはTemplate Haskellを使います。
ping :: Process ()
ping = do
Pong partner <- expect
self <- getSelfPid
send partner (Ping self)
ping
remotable ['ping]
main = do
...
...
spawn nid $(mkStaticClosure 'ping)
別のノードで実行したい関数を予めremotableで宣言しておくのがポイントです。今回の例ではpingは引数を取らなかったのでmkStaticClosureを使いましたが、引数を取るプロセスをClosureに包むときはmkClosureを使います。
Control.Distributed.Process.Node
ところで上の例では...として誤魔化しましたが、実際にProcessを実行する処理を一回書いてみましょう。Control.Distributed.Process.Nodeを使えば、ローカルのノードでプロセスを立ち上げることができます。以下にping/pongの完全なExampleを載せます。
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE TemplateHaskell #-}
import Control.Concurrent (threadDelay)
import Data.Binary
import Data.Typeable (Typeable)
import GHC.Generics (Generic)
import Control.Distributed.Process
import Control.Distributed.Process.Node
import Network.Transport.TCP (createTransport, defaultTCPParameters)
-------------------------
-- Messages
-------------------------
data Ping = Ping ProcessId deriving (Generic, Typeable)
data Pong = Pong ProcessId deriving (Generic, Typeable)
instance Binary Ping
instance Binary Pong
-------------------------
-- Processes
-------------------------
ping :: Process ()
ping = do
Pong partner <- expect
say "received Pong"
liftIO $ threadDelay 2000000
self <- getSelfPid
send partner (Ping self)
ping
pong :: Process ()
pong = do
Ping partner <- expect
say "received Ping"
self <- getSelfPid
send partner (Pong self)
pong
main :: IO ()
main = do
let (host, port) = ("127.0.0.1", "8000")
Right transport <- createTransport host port ((,) host) defaultTCPParameters
localNode <- newLocalNode transport initRemoteTable
runProcess localNode $ do
pingPID <- spawnLocal ping
pongPID <- spawnLocal pong
send pingPID (Ping pongPID)
receiveWait []
newLocalNodeを使ってノードを立ち上げrunProcessを使ってそのノード上でプロセスを実行しています。
distributed-process-simplelocalnet
distributed-process-simplelocalnetはdistributed-processに依存したライブラリで、ローカルネットにSlaveとなるノードを立てておくとそれらのノードに対してマスターノードからプロセスを送って分散実行することができる仕組みを簡単に実装することができます。UDPマルチキャストによるローカルネットでのノードの探索とマスタースレイブの仕組みを提供してくれているライブラリです。
master :: Backend -> [NodeId] -> Process ()
master backend slaves = do
-- Do something interesting with the slaves
liftIO . putStrLn $ "Slaves: " ++ show slaves
-- Terminate the slaves when the master terminates (this is optional)
terminateAllSlaves backend
main :: IO ()
main = do
args <- getArgs
case args of
["master", host, port] -> do
backend <- initializeBackend host port initRemoteTable
startMaster backend (master backend)
["slave", host, port] -> do
backend <- initializeBackend host port initRemoteTable
startSlave backend
これはControl.Distributed.Process.Backend.SimpleLocalnetに記載されている例です。Slaveとして実行されたときはstartSlaveを実行し、単純な待機状態となります。Slaveをいくつか立ち上げた後にMasterを実行すると、master関数の引数にSlaveのNodeIdが渡され、そのNodeIdを使ってSlaveノードにプロセスをspawnしたりすることができます。例では単純にterminateAllSlavesを実行して待機状態のSlaveをkillしています。
まとめ
CloudHaskellを使えば分散環境で動作するプログラムを簡単に作れることがわかりました。distributed-process系のライブラリは他にもdistributed-process-client-serverやdistributed-process-p2pなどがあり名前の通りのネットワークが楽に作れるようになっています。
ところで、この記事を書いてる途中で検証していてdistributed-process-simplelocalnetを使ってSlaveノードにプロセスを実行させるのには成功したのですが、自分でforkIOを使って複数のLocalNodeを立ててプロセスをspawnさせるのには失敗してしまいました。そもそもリモートのノードに任意のプロセスを実行させる機能は信頼できないP2P環境などであればOffにしたいと思うのでこの挙動はありがたいのですが、どういう条件であればspawnさせることができるのか誰か詳しい人がいたらコメント等で教えていただけると幸いです ![]()
