LoginSignup
1
1

More than 5 years have passed since last update.

(F#)重い処理を並列で処理する(Actor Model)

Posted at

Summary

アクターモデルで重い処理を並列でしょりする

こんなかんじ

たとえば5秒かかるしょりがあるとします

// 5秒かかる処理
let take5sec =
    System.Threading.Thread.Sleep 5000

この重い処理を10回おこなうとすると普通は50秒かかるかと思いますが、これを並列で処理すると50秒以下で処理できるようになります。

アクターモデルでの定形コード


// 並列処理する定型コード

// MailboxProcessorは長いのでActorと名前を変更
type Actor<'T> = MailboxProcessor<'T>
// 返信用のチャンネル
type Msg = AsyncReplyChannel<int>

let actorBehavior ( actor:Actor<Msg> ) =
    // アクターを使い回すので再帰にしておく
    let rec messageLoop() = async {
        // メッセージハンドラのトリガー
        let! reply = actor.Receive()
        // 処理内容(重い処理)
        take5sec
        // とりあえずスレッドIDを返してみる
        reply.Reply System.Threading.Thread.CurrentThread.ManagedThreadId
        return! messageLoop()
    }
    messageLoop()

let futures actorAmount runnerAmount behavior =

    // アクターをn個用意する
    let roundRobinRouter actorAmount =
        List.init actorAmount (fun _ -> Actor<Msg>.Start( behavior ) )

    // アクターにラウンドロビンで処理を割り当てる
    Seq.initInfinite( fun n -> n % (actorAmount) )
    |> Seq.take runnerAmount
    |> Seq.map ( fun i -> (roundRobinRouter actorAmount).[i].PostAndAsyncReply(id) )

実行してみる

// アクターを5個用意して10個のランナーを作成する
futures 5 10 actorBehavior
|> Async.Parallel
|> Async.RunSynchronously
|> printfn "%A"

結果


[Loading /Users/callmekohei/tmp/abcabc.fsx]
[|17; 15; 18; 21; 15; 17; 17; 17; 17; 15|]
namespace FSI_0010
  val take5sec : unit
  type Actor<'T> = MailboxProcessor<'T>
  type Msg = AsyncReplyChannel<int>
  val actorBehavior : actor:Actor<Msg> -> Async<'a>
  val futures :
    actorAmount:int ->
      runnerAmount:int ->
        behavior:(MailboxProcessor<Msg> -> Async<unit>) -> seq<Async<int>>


*** time : 5.197822 s ***

50秒かかるところが5秒で実施できた!

参考

Python: Pykka でアクターモデルについて学ぶ

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1