0
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

[Rust] actixを使ってアクターモデルを実装してみた

Last updated at Posted at 2022-07-02

はじめに

actixを使ってアクターモデリングを実装している。
アクターモデルのライブラリといえばakkaが有名だ。
実際にakkaを使ってアクターモデル実装をしたことがあるのでその内容をRustを使って書き直してakkaとactixでどのような差分があるのかなどをみてみる。

アクターモデルとは

こちらを参照

actix以外の候補

redditのスレッドにRust製アクターモデル実装ライブラリをまとめたものがあった。

github starが1番多かったのがactixなのでactixを使ってみた。

実行環境

macOS Big Sur
cargo 1.61.0 (a028ae42f 2022-04-29)
Rust edition 2021

前提

webframeworkとして、axumを使用している

アクターの全体構成

特にこの構成にした意味はなく、実装したい内容を組み込むとこの構成になった。

supervisor actor ----> search actor  --> search engine
              ↑
              ↑
              ↑
timer actor --↑
  • supervisor actorはsearch actorを作成したり等の監視を行う
  • timer actorは、search actorが処理するものがなくなったらストップさせるために監視目的で定期的にsupervisor actorにメッセージを送っている。akkaではwatchWithみたいな監視用の便利なメソッドがあったが、actixにはなさそうだったので自前で監視する

実装例

以下githubリポジトリに全体のコードを置いている。
https://github.com/kinshotomoya/web-server

まずsupervisor actorとtimer actorを起動

// プロセス内で共有するモジュールを格納する
pub struct Modules {
    // 各ユースケースインスタンス
    pub project_usecase: ProjectUsecase<RepositoryImpl>,
    pub actor_usecase: ActorUsecase,
}

impl Modules {
    pub async fn new(settings: &Settings) -> Modules {
        let mysql_client: Arc<MysqlClient> = Arc::new(MysqlClient::new(settings));
        let repository_modules: RepositoryImpl = RepositoryImpl::new(mysql_client);
        let project_usecase: ProjectUsecase<RepositoryImpl> =
            ProjectUsecase::new(repository_modules);
        // ↓こいつ
        let mut supervisor_actor = SuperVisorActor::new();
        let message = supervisor_actor.initializing();
        let supervisor_actor = Arc::new(Supervisor::start(|_| supervisor_actor));
        // ↓こいつ
        let mut timer_actor = TimerActor::new();
        let timer_actor = Supervisor::start(|_| timer_actor);
        timer_actor
            .send(Message::CheckSearchActor {
                supervisor_actor_address: Arc::clone(&supervisor_actor),
            })
            .await;
        supervisor_actor
            .send(message)
            .await
            .expect("mailbox error")
            .expect("can not initialize supervisor actor");

        let actor_usecase: ActorUsecase = ActorUsecase::new(supervisor_actor);
        Self {
            project_usecase,
            actor_usecase,
        }
    }
}

supervisor actorの構成

FSMにしたいと思ったので、自分自身の状態を保持
子アクターであるsearch actorを管理するために作成したsearch actorを保持

pub struct SuperVisorActor {
    // 状態を持っておけば、finite state machineにできる
    state: State,
    child_actors: Arc<Mutex<HashMap<u64, Addr<SearchActor>>>>, // NOTE: Arcで包んだ参照を可変参照にするには、Mutexで包んであげる必要がある。lockして他のスレッドから参照されないようにする必要がある
}

timer actorの構成

定期的にsearch actorに生存確認メッセージを投げるために、intervalを保持

pub struct TimerActor {
    interval: Duration,
}

supervisor actorにメッセージを投げる

sendメソッドで先ほど作成したsupervisor actorにメッセージを投げている
非同期なので、async fnにする必要がある

pub struct ActorUsecase {
    supervisor_actor: Arc<Addr<SuperVisorActor>>,
}

impl ActorUsecase {
    // TODO: Resultが二重になっているのでflatにする
    pub async fn execute_actor(&self, project_id: u64) -> Result<Result<(), Error>, Error> {
        let res: Result<Result<(), Error>, Error> = self
            .supervisor_actor
            .send(Message::StartSearch { project_id })
            .await
            .map_err(|e| Error::SupervisorActorMailBoxError(e.to_string()));
        debug!("{:?}", res);
        res
    }

    pub fn new(supervisor_actor: Arc<Addr<SuperVisorActor>>) -> Self {
        Self { supervisor_actor }
    }
}

supervisor actorでメッセージを受け取る

  • actixでは、受け取ったメッセージ毎にレスポンス型を厳格に定義しないといけないので、別のメッセージを受け取りたい場合には別のHandlerを定義する必要がある。
  • FSMにしたいもがきが↓コードから見て取れる
  • cloneはしたくなかったので、Arcを駆使している
impl Handler<Message> for SuperVisorActor {
    // acto・コンテキスト内にアクセスしたい場合には、ResponseFutureではなくResponseActFutureを戻り方として定義する
    type Result = ResponseActFuture<Self, Result<(), Error>>;

    // SuperVisorActorが受け取ったメッセージ毎にこのhandleメソッドが呼ばれる
    fn handle(&mut self, msg: Message, ctx: &mut Self::Context) -> Self::Result {
        let ctx_address = Arc::new(ctx.address());
        let child_actors = Arc::clone(&self.child_actors);
        // actorの状態によって処理を変える
        match self.state {
            State::Idle => {
                self.state = State::Active;
                Box::pin(
                    async move { Ok(()) }
                        .into_actor(self)
                        .map(|res, _act, _ctx| {
                            // 自分自身に同じメッセージをもう一回投げている
                            // ResponseActFutureを使うことで、↓のようにcontextを利用できる
                            _ctx.notify(msg);
                            res
                        }),
                )
            }
            State::Active => {
                // https://users.rust-lang.org/t/actix-await-for-send-in-handle/47844
                // https://github.com/actix/actix/issues/438
                Box::pin(
                    async move {
                        // ここで何かしらの非同期処理を行う
                        SuperVisorActor::execute_message(msg, ctx_address, child_actors).await
                    }
                    .into_actor(self)
                    .map(|res, act, _ctx| res),
                )
            }
        }
    }
}

↑で呼んでいるexecute_messageメソッドがこれ

  • 保持しているハッシュマップに対象search actorが存在しない場合には、search actorを新規で作成・起動している
  • ハッシュマップに存在していればハッシュマップからsearch actorのaddressを取得しメッセージを投げる
    async fn execute_message(
        msg: Message,
        reply_to: Arc<Addr<SuperVisorActor>>,
        child_actors: Arc<Mutex<HashMap<u64, Addr<SearchActor>>>>,
    ) -> Result<(), Error> {
        match msg {
            Message::StartSearch { project_id } => {
                // 子アクターをすでに保持していなかったら作成するロジック
                //   作成したらHashMapに格納する => ここではできない(Arc<HashMap>になっているので)
                //   なのでこの関数の戻り値を作成したsearchActorのアドレスとidにする
                let mut locked_map = child_actors.lock().unwrap();
                match locked_map.get(&project_id) {
                    Some(search_actor) => {
                        search_actor
                            .send(search_actor::Message::Execute { project_id })
                            .await;
                        Ok(())
                    }
                    None => {
                        let mut search_actor = SearchActor::new(project_id, reply_to);
                        let message = search_actor.initializing();
                        // NOTE: cpu boundな計算をするときは、そのactorをSyncArbiter::startで別スレッドで実行するべき!
                        // 参考:https://actix.rs/book/actix/sec-5-arbiter.html#system-and-arbiter
                        // let search_actor = SyncArbiter::start(1, search_actor);
                        let search_actor = Supervisor::start(|_| search_actor);
                        let res: Result<(), Error> = search_actor
                            .send(message)
                            .await
                            .map_err(|e| Error::SearchActorMailBoxError(e.to_string()))?;
                        search_actor
                            .send(search_actor::Message::Execute { project_id })
                            .await;
                        locked_map.insert(project_id, search_actor);
                        res
                    }
                }
            }
            Message::CompletedSearch => {
                debug!("complete!");
                Ok(())
            }
        }
    }

timer actorでの監視

  • run_intervalメソッドで、定期的に引数に渡したクロージャを実行する
    • supervisor actorにCheckSearchActorメッセージを送っている
impl Handler<Message> for TimerActor {
    type Result = Result<(), Error>;

    fn handle(&mut self, msg: Message, ctx: &mut Self::Context) -> Self::Result {
        match msg {
            Message::CheckSearchActor {
                supervisor_actor_address,
            } => {
                // run_intervalでasync functionを実行したいならarbiterスレッドを作成して、そいつにasync taskを投げるようにする
                ctx.run_interval(self.interval, move |timer_actor, context| {
                    // 参考:https://www.reddit.com/r/rust/comments/srfho0/help_with_actix_arbiter/
                    let supervisor_actor_address_clone = Arc::clone(&supervisor_actor_address);
                    context.spawn(
                        async move {
                            supervisor_actor_address_clone
                                .send(supervisor_actor::Message::CheckSearchActor)
                                .await;
                        }
                        .into_actor(timer_actor),
                    ); // futureをactorにしている
                });
            }
        }
        Ok(())
    }
}

CheckSearchActorメッセージを受け取ったsupervisor actorは?

  • 自分が保持している子アクター(search actors)全てに生存確認メッセージを送る処理をしている。
  • ハッシュマップに存在するものをfor loopで処理しても良いが、スレッドを掴んだままにしたくないので再起的に自分自身にメッセージを投げている(LoopExeute)
    • (別のメッセージがsupervisor actorに送られた時にfor loopが終わるまで処理されなくなりスループットが落ちる)
    async fn execute_message(
        msg: Message,
        reply_to: Arc<Addr<SuperVisorActor>>,
        child_actors: Arc<Mutex<HashMap<u64, Addr<SearchActor>>>>,
    ) -> Result<(), Error> {
        match msg {
            Message::CheckSearchActor => {
                // 保持しているsearchActorに生存確認messageを投げる
                // スレッドを占有しないようにfor文で投げるんじゃなくて、再帰で投げる
                // 自分自身にmessageを投げる
                let res: Result<(), Error> = reply_to
                    .send(Message::LoopExecute { child_actors })
                    .await
                    .map_err(|e| Error::SupervisorActorMailBoxError(e.to_string()))?;
                res
            }
            Message::LoopExecute { child_actors } => {
                let child_actors_clone = Arc::clone(&child_actors);
                let mut locked_map = child_actors_clone.lock().unwrap();
                if locked_map.len() != 0usize {
                    debug!("manage child actors: {:?}", locked_map);
                    let key = locked_map.keys().next().copied().unwrap_or(0); // copied()・・・Option<&A> -> Option<A>
                    let search_actor_address = locked_map.get(&key);
                    if let Some(search_actor) = search_actor_address {
                        let res: SearchActorStatus = search_actor
                            .send(search_actor::CheckStatusMessage::Check)
                            .await
                            .map_err(|e| Error::SupervisorActorMailBoxError(e.to_string()))??;
                        match res {
                            SearchActorStatus::Idle => locked_map.remove(&key),
                            _ => None,
                        };
                    }
                    drop(locked_map);
                    // NOTE: ↓でさらにLoopExecute messageを投げることで、locked_mapがdead lockを起こしていたので、スレッドが止まってしまっていた。
                    // 明示的にlockを解消するためにdropさせている。
                    reply_to.send(Message::LoopExecute { child_actors }).await;
                } else {
                    debug!("child actor count is zero");
                }
                Ok(())
            }
        }
    }

ざっとこれくらい

感想

FSM(Finite State Machine)での実装がよくわからず

  • akkaでは、受け取れないメッセージを格納するbuffer機構があるのだが、actixではないので自分で実装するしかなさそう、、、
  • また受け取るメッセージ毎にエラー型を定義しHandlerを実装しないといけないのでめんどくさい、、、

同じアクター内での非同期処理がアクターモデルの良さを潰している??

  • supervisor actorでメッセージを受け取った後にsearch actorにメッセージを投げる際には非同期になるのだが、supervisor actor自身が保持するオブジェクトをArc<Mutex<>>にする必要があり、アクターモデルの良さである「マルチスレッド環境でよくある競合を気にしなくても良い」メリットがないのでは??

  • lock取得・解除等を結局実装しないといけないので、これは自分の実装が悪いからか?

参考

0
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?