Summary
アクターモデルで重い処理を並列でしょりする
こんなかんじ
たとえば5秒かかるしょりがあるとします
// 5秒かかる処理
let take5sec =
System.Threading.Thread.Sleep 5000
この重い処理を10回おこなうとすると普通は50秒かかるかと思いますが、これを並列で処理すると50秒以下で処理できるようになります。
アクターモデルでの定形コード
// 並列処理する定型コード
// MailboxProcessorは長いのでActorと名前を変更
type Actor<'T> = MailboxProcessor<'T>
// 返信用のチャンネル
type Msg = AsyncReplyChannel<int>
let actorBehavior ( actor:Actor<Msg> ) =
// アクターを使い回すので再帰にしておく
let rec messageLoop() = async {
// メッセージハンドラのトリガー
let! reply = actor.Receive()
// 処理内容(重い処理)
take5sec
// とりあえずスレッドIDを返してみる
reply.Reply System.Threading.Thread.CurrentThread.ManagedThreadId
return! messageLoop()
}
messageLoop()
let futures actorAmount runnerAmount behavior =
// アクターをn個用意する
let roundRobinRouter actorAmount =
List.init actorAmount (fun _ -> Actor<Msg>.Start( behavior ) )
// アクターにラウンドロビンで処理を割り当てる
Seq.initInfinite( fun n -> n % (actorAmount) )
|> Seq.take runnerAmount
|> Seq.map ( fun i -> (roundRobinRouter actorAmount).[i].PostAndAsyncReply(id) )
実行してみる
// アクターを5個用意して10個のランナーを作成する
futures 5 10 actorBehavior
|> Async.Parallel
|> Async.RunSynchronously
|> printfn "%A"
結果
[Loading /Users/callmekohei/tmp/abcabc.fsx]
[|17; 15; 18; 21; 15; 17; 17; 17; 17; 15|]
namespace FSI_0010
val take5sec : unit
type Actor<'T> = MailboxProcessor<'T>
type Msg = AsyncReplyChannel<int>
val actorBehavior : actor:Actor<Msg> -> Async<'a>
val futures :
actorAmount:int ->
runnerAmount:int ->
behavior:(MailboxProcessor<Msg> -> Async<unit>) -> seq<Async<int>>
*** time : 5.197822 s ***
50秒かかるところが5秒で実施できた!