Haskell
分散処理

Cloud Haskellについて調べてみた

Cloud Haskellは"Towards Haskell in the Cloud" by Jeff Epstein, Andrew Black, and Simon Peyton Jones という論文で提案された、Haskellで分散環境で動くプログラムを書くためのDSLです。その実装としてdistributed-processというライブラリがあります。論文の著者であるJeffを中心に今も活発に開発が続けられており、歴史については公式によくまとまっています

Cloud Haskell 公式サイト

Basic messaging

Cloud Haskellが採用しているのはErlangに影響されたメッセージパッシング型の通信モデルです。Erlangとの大きな違いはメッセージに型がついているところでしょう。

ping :: Process ()
ping = do
  Pong partner <- expect
  self <- getSelfPid
  send partner (Ping self)
  ping


data Ping = Ping ProcessId deriving (Generic, Typeable)

data Pong = Pong ProcessId deriving (Generic, Typeable)

instance Binary Ping
instance Binary Pong

この例は論文から引用してきたものです。pingという関数はPongのメッセージが送られてくるのを待ち、受け取ったら送り元にPingのメッセージを返すという動作を繰り返すプロセスの例です。

expect :: Serializable a => Process a

expectの型は上記のようになっており、他のプロセスから送られてきたメッセージを受け取ることができます。注目したいのは受け取るメッセージの型が決められているところで、上の例の場合aとしてPongが推論されるためPongメッセージが来るまで待ち続けます。つまりexpectでブロックしているタイミングでPingメッセージが来ても処理が進むことはなく、確実にPongのメッセージを受け取ることができます。(複数のメッセージを受け取る方法は後で紹介します。)

send :: Serializable a => ProcessId -> a -> Process ()

sendの型はこの様になっていて、Serializableのインスタンスであれば何でもメッセージとして送ることができます。

class (Binary a, Typeable a) => Serializable a

instance (Binary a, Typeable a) => Serializable a

Serializableはこのように定義されており、BinaryTypeableのインスタンスであればSerializableのインスタンスとなる事がわかります。

本来であればプロセス間の通信やノード間の通信にどういったプロトコルを使うかはバイナリのレベルでちゃんと設計して実装しなければいけませんが、Cloud Haskellはそこを抽象化してくれており、開発者はProcessモナドに包まれた処理を記述してSerializableのインスタンスとなるようなHaskellの値をメッセージとして送りあうコードを書くだけで、複数のプロセス・複数のノードで分散して動くようなプログラムを簡単に実装することができるのです。

以下にCloud Haskellで提供されているインターフェースの一覧を載せておきます(論文より抜粋、一部distributed-processに合わせて修正)。

-----------------------------
-- Basic messaging
-----------------------------

instance Monad Process

instance MonadIO Process

send :: Serializable a => ProcessId -> a -> Process ()

expect :: Serializable a => Process a

-----------------------------
-- Advanced messaging
-----------------------------

data Match b

receiveWait :: [Match b] -> Process b

receiveTimeout :: Int -> [Match b] -> Process (Maybe b)

match :: Serializable a => (a -> Process b) -> Match b

matchIf :: Serializable a => (a -> Bool) -> (a -> Process b) -> Match b

matchUnknown :: Process b -> Match b

-----------------------------
-- Channels
-----------------------------

newChan :: Serializable a => Process (SendPort a, ReceivePort a)

sendChan :: Serializable a => SendPort a -> a -> Process ()

receiveChan :: Serializable a => ReceivePort a -> Process a

mergePortsBiased :: Serializable a => [ReceivePort a] -> Process (ReceivePort a)

mergePortsRR :: Serializable a => [ReceivePort a] -> Process (ReceivePort a)

-----------------------------
-- Process management
-----------------------------

spawn :: NodeId -> Closure (Process ()) -> Process ProcessId

terminate :: Process a

getSelfPid :: Process ProcessId

getSelfNode :: Process NodeId

-----------------------------
-- Process monitoring
-----------------------------

link :: ProcessId -> Process ()

monitor :: ProcessId -> Process MonitorRef

Advanced messaging

matchreceiveWaitを組み合わせれば複数の型のメッセージを同時に待つことができます。例えば足し算と掛け算とその結果を表す型を以下のように定義します。

data Add = Add ProcessId Double Double deriving (Generic, Typeable)

data Multiply = Multiply ProcessId Double Double deriving (Generic, Typeable)

data Result = Result Double deriving (Generic, Typeable)

instance Binary Add
instance Binary Multiply
instance Binary Result

このAddMultiplyのメッセージを待って結果を返すようなプロセスは以下のように書くことができます。

calc :: Process ()
calc = forever $
  receiveWait [ match $ \(Add      pid x y) -> send pid (Result (x + y))
              , match $ \(Multiply pid x y) -> send pid (Result (x * y))
              ]

matchIfを使えば受け取ったメッセージが第一引数の条件を満たすときだけ処理を行うといったことも可能です。

Cloud Haskellではこのようにメッセージごとに型を定義します。例えばAddMutilplyは、

data MathOp = Add ...
            | Multiply ...

のように和型として書けるかもしれませんが、expectreceiveWaitが型で受け取るメッセージを振り分けていることを考えると得策ではありません。更に和型に新しいメッセージを追加したとすると分散している全てのノードのプログラムを一斉に書き換える必要が出てくるため変更に対して弱くなってしまいます。

Channels

前述のPingAddなどのメッセージでは別のメッセージを送り返すために送り元のProcessIdがメッセージに含まれていました。Cloud Haskellにはチャネルという概念があり送信チャネルと受信チャネルと作ることでProcessIdを引き回す工夫をする必要がなくなり、更にメッセージの型を固定することもできます。

例えばpingの例は

ping2 :: SendPort Ping2 -> ReceivePort Pong2 -> Process ()
ping2 pingout pongin = do
  receiveChan pongin
  sendChan pingout Ping2
  ping2 pingout pongin

data Ping2 = Ping2 deriving (Generic, Typeable)

data Pong2 = Pong2 deriving (Generic, Typeable)

instance Binary Ping2
instance Binary Pong2

のように書くことができるでしょう。

ポイントはメッセージに ProcessId を含める必要がなくなったことと、SendPort Ping2, ReceivePort Pong2のように書く送受信チャネルでどの種類のメッセージが送られてくるのかを方で表現することができるところです。

またSendPortSerializableのインスタンスになっているため、メッセージに含めて他のプロセスに送信することも可能です。

Process management

spawnProcessの値を実行するための関数です。

spawn :: NodeId -> Closure (Process ()) -> Process ProcessId

改めてspawnの型を見ると、実行するプロセスはClosureという型に包まれています。実はspawnNodeIdに別のノードのIDを指定することができて、つまり自分のノードで作成したプロセスを別のノードに実行させることが可能です。この時に、作成したプロセスの中に自分の環境への参照(例えばプロセスの外で定義ている変数)が残されたままだと別のノード上でうまく実行することができません。このようなプロセスが参照している環境をまとめて詰め込んだものがClosureというわけです。

実際にClosureの値を作るときはTemplate Haskellを使います。

ping :: Process ()
ping = do
  Pong partner <- expect
  self <- getSelfPid
  send partner (Ping self)
  ping


remotable ['ping]


main = do
  ...
    ...
    spawn nid $(mkStaticClosure 'ping)

別のノードで実行したい関数を予めremotableで宣言しておくのがポイントです。今回の例ではpingは引数を取らなかったのでmkStaticClosureを使いましたが、引数を取るプロセスをClosureに包むときはmkClosureを使います。

Control.Distributed.Process.Node

ところで上の例では...として誤魔化しましたが、実際にProcessを実行する処理を一回書いてみましょう。Control.Distributed.Process.Nodeを使えば、ローカルのノードでプロセスを立ち上げることができます。以下にping/pongの完全なExampleを載せます。

{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE TemplateHaskell #-}

import Control.Concurrent (threadDelay)
import Data.Binary
import Data.Typeable (Typeable)
import GHC.Generics (Generic)

import Control.Distributed.Process
import Control.Distributed.Process.Node
import Network.Transport.TCP (createTransport, defaultTCPParameters)

-------------------------
-- Messages
-------------------------

data Ping = Ping ProcessId deriving (Generic, Typeable)

data Pong = Pong ProcessId deriving (Generic, Typeable)

instance Binary Ping
instance Binary Pong

-------------------------
-- Processes
-------------------------

ping :: Process ()
ping = do
  Pong partner <- expect
  say "received Pong"
  liftIO $ threadDelay 2000000
  self <- getSelfPid
  send partner (Ping self)
  ping


pong :: Process ()
pong = do
  Ping partner <- expect
  say "received Ping"
  self <- getSelfPid
  send partner (Pong self)
  pong


main :: IO ()
main = do
  let (host, port) = ("127.0.0.1", "8000")
  Right transport <- createTransport host port ((,) host) defaultTCPParameters
  localNode <- newLocalNode transport initRemoteTable
  runProcess localNode $ do
    pingPID <- spawnLocal ping
    pongPID <- spawnLocal pong
    send pingPID (Ping pongPID)
    receiveWait []

newLocalNodeを使ってノードを立ち上げrunProcessを使ってそのノード上でプロセスを実行しています。

distributed-process-simplelocalnet

distributed-process-simplelocalnetdistributed-processに依存したライブラリで、ローカルネットにSlaveとなるノードを立てておくとそれらのノードに対してマスターノードからプロセスを送って分散実行することができる仕組みを簡単に実装することができます。UDPマルチキャストによるローカルネットでのノードの探索とマスタースレイブの仕組みを提供してくれているライブラリです。

master :: Backend -> [NodeId] -> Process ()
master backend slaves = do
  -- Do something interesting with the slaves
  liftIO . putStrLn $ "Slaves: " ++ show slaves
  -- Terminate the slaves when the master terminates (this is optional)
  terminateAllSlaves backend

main :: IO ()
main = do
  args <- getArgs

  case args of
    ["master", host, port] -> do
      backend <- initializeBackend host port initRemoteTable
      startMaster backend (master backend)
    ["slave", host, port] -> do
      backend <- initializeBackend host port initRemoteTable
      startSlave backend

これはControl.Distributed.Process.Backend.SimpleLocalnetに記載されている例です。Slaveとして実行されたときはstartSlaveを実行し、単純な待機状態となります。Slaveをいくつか立ち上げた後にMasterを実行すると、master関数の引数にSlaveのNodeIdが渡され、そのNodeIdを使ってSlaveノードにプロセスをspawnしたりすることができます。例では単純にterminateAllSlavesを実行して待機状態のSlaveをkillしています。

まとめ

CloudHaskellを使えば分散環境で動作するプログラムを簡単に作れることがわかりました。distributed-process系のライブラリは他にもdistributed-process-client-serverdistributed-process-p2pなどがあり名前の通りのネットワークが楽に作れるようになっています。

ところで、この記事を書いてる途中で検証していてdistributed-process-simplelocalnetを使ってSlaveノードにプロセスを実行させるのには成功したのですが、自分でforkIOを使って複数のLocalNodeを立ててプロセスをspawnさせるのには失敗してしまいました。そもそもリモートのノードに任意のプロセスを実行させる機能は信頼できないP2P環境などであればOffにしたいと思うのでこの挙動はありがたいのですが、どういう条件であればspawnさせることができるのか誰か詳しい人がいたらコメント等で教えていただけると幸いです :pray: