当記事ではActix frameworkについての概要と使い方について説明します。
前提
actix: 0.7.3
Actix
ActixとはRustにおけるアクターモデルのフレームワークです。
アクターモデルの詳細については触れませんが、それぞれが専用のメールボックスを持ち、並行して非同期で動作するアクターと呼ばれる構造体群を起動させて、アクター同士でメッセージを送受信して処理を行う方式になります。
アクターとメッセージがオブジェクト指向におけるクラスとメソッドに似ていますが、アクターは並行性を兼ねている点が異なります。
メッセージは個々のアクターが持つメールボックスにキューイングされて、アクターが処理可能なタイミングで一つずつ取得して処理を行います。
そのため、アクターの処理自体はマルチスレッドにおけるロックなどの排他処理は考えずにシングルスレッドで構築することが出来ます。
また、アクター自体はどこかのスレッド上で動作しているのですが、メッセージパッシングの観点からは動作している場所を気にせずにメッセージを送信して処理するため、スレッドやロックについてあまり考えずに並行処理を組み立てられる点が大きな利点だと考えています。
アクターモデル実装はErlang/Elixirや、Scala/JavaにおけるAkkaが有名ですが、私がそれらを実際に触ったことが無いので比較等はできません。
認識している限りではまだリモートサーバー実行が出来ない(*1)等、Akka等と比較すると機能は少ない点があるようです。
よって、現状Actixが有効に作用するものとしては、シングルサーバー、シングルプロセスで動作するマルチスレッドを活かしたアプリケーションとなります。
尚、actix-webというWeb frameworkはこのActixシステムの上で構築されており、actix-webでActixによるアクターモデルを使用することも可能です。
*1) actix-remoteというリモートサーバーで使用できるようにするためのリポジトリは在りますが、開発は進んでいないようです。
System
SystemはActixにおけるランタイムとなります。
アクターはSystemが作成されるまでは起動することは出来ません。
System::newを呼び出すとSystemRunnerが返却されますが、アクターが動作する場所となるイベントループを開始して処理をブロックするためにSystemRunnerのrunメソッドを実行する必要があります。
イベントループを停止するには起動しているSystemインスタンスのstopメソッドを呼び出す必要があります。
現在のSystemインスタンス自体はActixイベントループ上であればSystem::currentで取得できるため、System::current().stop()という一連の処理で停止出来ます。
アクターを開始してメッセージを出力し、すぐに処理を終了する簡単なコード例を以下に記載します。
extern crate actix;
use actix::prelude::*;
// Testアクター構造体
struct Test;
// Actorトレイト実装
impl Actor for Test {
type Context = Context<Self>;
// Testアクター開始時に呼ばれる処理
fn started(&mut self, _ctx: &mut Self::Context) {
println!("started");
// System停止処理
System::current().stop();
}
}
fn main() {
// System作成
let system = System::new("test");
// Testアクター開始
Test.start();
// イベントループを開始し、System停止処理が呼ばれるまでブロックする
system.run();
}
Actor
ActixではActorトレイトを実装した構造体がアクターになります。
アクターは全て独立した実行コンテキスト上で動作します。
Actorトレイト実装時には関連型としてコンテキストを定義する必要があります。
現在設定できるのはContext<A>かSyncContext<A>の二種類となっています。
後述するSyncArbiterを使う場合はSyncContext<A>、それ以外はContext<A>を使う形になります。
Start
アクターを開始するためのメソッドとして以下3種類があります。
-
start- プログラムで作成した対象アクターを開始します。
-
start_default- Defaultトレイトが実装されている対象アクターをデフォルトで作成して開始します。
-
create- Contextを受け取って対象アクターを返却する関数を受け取り、その実行結果のアクターで開始します。
上記全てアドレスAddr<A>を戻り値とし、このアドレスを使用してメッセージ送信を行うことが出来ます。
メッセージについては後述するMessage欄で説明します。
これらのメソッドで開始した場合、メソッド実行時のArbiter上で動作します。
Arbiterについては後述しますが、メソッド実行時と同一スレッド/同一イベントループで動作することとなります。
Lifecycle
アクターにはStarted, Running, Stopping, Stoppedの4種類の状態(ライフサイクル)があります。
アクター起動直後はStartedとなり、Actorトレイトのstartedメソッドが呼ばれます。
startedメソッド呼び出しが完了するとRunningに移行します。
Running状態ではメッセージの受信が可能になります。
Running状態で以下の何れかが発生した際、Stopping状態に移行し、stoppingメソッドが呼ばれます。
stoppingメソッドではそのままStopped状態に移行するか、Running状態に移行するのかを戻り値で選択することが出来ます。デフォルトではStopped状態に移行となります。
- 対象アクター自身によって
Context::stopが呼び出される - 対象アクターに対する全てのアドレスがドロップされる
- Contextにイベントが登録されていない
Stopped状態に移行するとstoppedメソッドが呼ばれます。
stoppedメソッド処理が終了すると、対象アクターはドロップされます。
尚、stoppedメソッド内でContextに非同期処理を登録しても実行されずにドロップされます。
サンプルコード
ここまで説明した内容によるサンプルコードを記載します。
extern crate actix;
use actix::prelude::*;
# [derive(Default)]
struct Test;
impl Actor for Test {
type Context = Context<Self>;
fn started(&mut self, _ctx: &mut Self::Context) {
println!("started");
}
fn stopping(&mut self, _ctx: &mut Self::Context) -> Running {
println!("stopping");
Running::Stop
}
fn stopped(&mut self, _ctx: &mut Self::Context) {
println!("stopped");
System::current().stop();
}
}
fn main() {
let system = System::new("test");
// 戻り値のAddr<Test>を保持しないためすぐにdropされ、Testアクターはstopping状態に移行します
Test.start();
// Test::start_default();
// Test::create(|_| Test);
system.run();
}
Context
Contextはアクターの実行コンテキストを表します。
実行コンテキストはアクター単位で独立しています。
アクターを停止したり、非同期処理を行う際に使用します。
いくつかのメソッドを紹介します。
-
set_mailbox_capacity- メールボックスのサイズを変更します。デフォルトは16です。
-
stop- アクターをStopping状態に移行します。
-
terminate- アクターをStopped状態に移行します。
-
address- 自アクターの
Addr<A>を取得します。
- 自アクターの
-
spawn- 渡された
ActorFutureを実行します。実行をキャンセルするためのSpawnHandleを返却します。
- 渡された
-
cancel_future-
SpawnHandleを受け取り、対象非同期処理をキャンセルします。
-
-
wait- 渡された
ActorFutureを実行します。この非同期処理が完了するまでこの実行コンテキストに発生するイベントは待たされます。 - 例として
started内で非同期初期化を行うさいにこのメソッドを使用すると、初期化が完了するまでメッセージは処理されずに待たされる状況にすることが出来ます。
- 渡された
-
notify- 自分自身にメッセージを送信します。
-
add_stream- 非同期で複数のデータが返却される
Streamを登録します。これを使用するアクターは対象Streamの返却Item及びErrorに該当するStreamHandlerトレイトを実装する必要があり、StreamHandler::handleメソッドで処理されることになります。
- 非同期で複数のデータが返却される
-
add_message_stream-
add_streamに似ていますがエラーは無視されます。また、返却するItemはMessageトレイトを実装している必要があります。StreamHandlerは実装不要で、返却されるMessageを処理するHandlerで処理されます。
-
ActorFuture
Actixでは利便性の為にFutureにアクター自身とContextを紐付けたActorFutureが用意されています。
selfやContextにはSendが実装されていないため、通常のFutureを使用した非同期処理をContextで実行した際の結果を持ってselfやContextを操作することが出来ません。
そこで、対象のFutureをActorFutureに変換することでmapやthenに渡す関数で非同期結果と共にself及びContextを受け取り、操作することが出来るようになっています。
FutureにはWrapFutureトレイトが実装されているため、into_actorメソッドを使用してActorFutureに変換することが可能です。
サンプルコード
ここまで説明した内容によるサンプルコードを記載します。
TCP接続して文字列を送信すると、接続している全員にブロードキャストされるチャットサーバーです。
extern crate actix;
extern crate tokio;
use std::{
collections::HashMap,
io::BufReader,
net::SocketAddr
};
use actix::prelude::*;
use tokio::{
prelude::*,
io,
net::{TcpListener, TcpStream}
};
# [derive(Default)]
struct Chat {
writers: HashMap<SocketAddr, io::WriteHalf<TcpStream>>
}
impl Actor for Chat {
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
println!("started");
// TCPサーバー起動
let socket = "127.0.0.1:3000".parse().unwrap();
let listener = TcpListener::bind(&socket).unwrap();
ctx.add_stream(listener.incoming());
}
}
impl StreamHandler<TcpStream, io::Error> for Chat {
fn handle(&mut self, stream: TcpStream, ctx: &mut Self::Context) {
println!("Incoming user.");
// キーとして接続ユーザーのソケットアドレスを取得
let remote_addr = stream.peer_addr().unwrap();
// streamをreaderとwriterに分割
let (reader, writer) = stream.split();
// writerを保持
self.writers.insert(remote_addr.clone(), writer);
// linesを使用してreaderから入力されるデータを行単位StringのStreamに変換
let task = io::lines(BufReader::new(reader))
.map_err(|e| println!("Read error. [{}]", e))
.into_actor(self) // ActorStreamに変換
.map(|mut line, actor, _| {
// 行単位で入力処理
println!("Lines. [{}]", line);
// 改行を付与してbyte列に変換
line.push('\n');
let send_data = line.into_bytes();
// 入室者全員にブロードキャスト (同期処理)
for writer in actor.writers.values_mut() {
if let Err(e) = writer.write(&send_data) {
println!("Broadcast error. [{}]", e);
}
}
})
.finish() // ActorStreamをActorFutureに変換
.then(move |_, actor, _| {
// 退室処理
println!("Leave user.");
actor.writers.remove(&remote_addr);
actix::fut::ok(())
});
ctx.spawn(task);
}
}
fn main() {
let system = System::new("test");
let _addr = Chat::start_default();
system.run();
}
Address
Addr<A>はアクターに対する参照となっており、アクターに送信するメッセージはこのアドレス、又は後述するRecipient<M>を使用して送信することが出来ます。
アドレスはクローン可能であり、Send、Syncが実装されているためスレッドを跨いで引き渡すことが可能です。
アドレスは前述したアクター開始メソッドの戻り値、及びContextのaddressメソッド、又は後述するRegistry等から取得することが出来ます。
Addr<A>では3つのメッセージ送信メソッドが用意されています。
-
do_send- メールボックス容量を無視してメッセージを送信します。メールボックスがクローズしていた場合は何も起きません。メッセージの戻り値は取得できません。
-
try_send- メールボックス容量がフルであったり、クローズしていた場合は
Errが返ります。メッセージの戻り値は取得できません。
- メールボックス容量がフルであったり、クローズしていた場合は
-
send- メッセージの戻り値を持つ
Futureが返却され、非同期処理の後続処理を記述出来ます。 - 注意点として、返却される
FutureのItemがメッセージの戻り値、Errorはメッセージ送信のエラーを表すMailboxErrorとなります。例として送信メッセージの戻り値がResultの場合は、ItemとしてResultが返却される形となります。
- メッセージの戻り値を持つ
Recipient
Recipient<M>はAddr<A>の特殊系であり、Addr<A>のrecipientメソッドで取得することが出来ます。
Addr<A>の状態ではどのメッセージも送信することが出来ますが、Recipient<M>は一つのメッセージだけを送信することが出来るアドレスとなります。
どのような場合に便利かというと、複数のアクターに同じメッセージを送信する場合にAddr<A>形式だとAが実装アクターとなるためVecや配列で持つことが出来ませんが、Recipient<M>はMがメッセージとなるため持つことが可能となる事があります。
また、Actixに含まれているプロセスシグナルアクター(actix::actors::signal::ProcessSignals)で使用しているような、特定の場合に自身へとメッセージを送信してもらう場合に自身のRecipientを送信するSubscribeメッセージ等といった使い方があります。
Message
Messageトレイトはアクターが送受信するメッセージを表します。
構造体を定義してMessageトレイトを実装することでメッセージとなります。
Messageトレイト実装時には関連型Resultとして戻り値を指定する必要があります。
また、Messageトレイトはderiveして実装することも可能です。
// 通常の実装方法
struct TestMessage1;
impl Message for TestMessage1 {
type Result = ();
}
// 戻り値がResult型の場合
struct TestMessage2;
impl Message for TestMessage2 {
type Result = Result<u64, ::std::io::Error>;
}
// deriveした場合。戻り値が()の場合はrtype不要
# [derive(Message)]
struct TestMessage3;
// deriveで戻り値がResultの場合
# [derive(Message)]
# [rtype(result="Result<u64, ::std::io::Error>")]
struct TestMessage4;
Handler
Handler<M>トレイトはアクターに実装し、メッセージの処理を行います。
Handler<M>トレイトも関連型としてResultの指定が必要になります。
このResultはMessageのResultとは少し異なり、MessageResponseトレイトを実装した型である必要があります。
一般的なプリミティブな型やResult、Option、Box<Future<Item=I, Error=E>>(ResponseFuture<I, E>)等はデフォルトで実装されていますが、自作の構造体等は通常Result等で包んで返却することになります。
また、一つのメッセージで同期処理する場合と非同期処理する場合がある時はResponse<I, E>型を使用することも出来ます。
ResponseFutureやResponseで返却する場合、メッセージの関連型はFutureが解決された結果のResultである必要があります。
Box<Future<Item=I, Error=E>>では無い点に注意して下さい。
Handler<M>実装時にhandleメソッドの記述が必要であり、メッセージを受信した際はこのメソッドが呼ばれます。
Arbiter
Arbiterは一つのスレッドによる一つのイベントループを表します。
マルチスレッドで複数のイベントループを動作させたい場合は、Arbiterを作成してその上でActorを動作させるようにします。
ArbiterはSystemを作成したタイミングで一つ作成され、System::current().arbiter()でSystem Arbiterのアドレスを取得することが出来ます。
特に何も指定せずにActorを開始した場合は、開始処理が行われた際のArbiter上で開始します。
よって、明示的にArbiterを作成しない場合はシングルスレッドによる一つのイベントループで全てのActorが動作することになります。
Arbiterを開始するには二つの方法があります。
一つ目はArbiter::newで作成し、Arbiterのアドレスを取得する方法、二つ目はArbiter::startで新規Arbiter上で開始したいActorを指定する方法です。
Arbiter::startの戻り値は開始したActorのアドレスになります。
現在のArbiterとは別の作成済みArbiterでActorを開始したい場合は、対象のArbiterのアドレスにStartActorメッセージを送信することで可能です。
どのActorをどのスレッド(イベントループ)で動作させるかはプログラマがこのArbiterを使用して制御することになります。
この起動部分以外ではスレッドについてあまり考えずにコードを記述できるのがActix(アクターモデル)の良い点だと思います。
SyncArbiter
同じActorを複数別のArbiterで動作させて、メッセージの処理を分散させたい場合があると思います。
その際はこのSyncArbiterを使用することが出来ます。
ただし、SyncArbiterで動かせるのはSyncContextを使用するActorだけとなり、内部で非同期処理を行うことが出来ません。
具体的にはSyncContextにはAsyncContextトレイトが実装されていないため、ActorFuture等の非同期処理を実行することが出来ません。
そのため非同期処理を行った結果をselfに入れたりすることが出来ないという形です。
名前からも、主にCPUバウンドな処理を並列実行することを想定しているものと思われます。
Actorを初期化してから渡すことも出来ないため、初期化を非同期で行う必要があったり、失敗する可能性があるActorの場合は使い辛かったりします。
個人的には非同期Actorを複数まとめて起動できるような仕組みがあれば嬉しいです。
※ざっと見たところAkkaにはそういうのもあるようですね。
Supervisor
Supervisorは自身の上で動作するActorを監視し、停止した際に自動的に再起動を行うことが出来ます。
注意点としては、ここでいう停止とはActorの状態がStoppedになることを意味します。
panicが発生した際は現状拾われないようですので注意が必要です。
また、再起動と記載しましたが、実際に対象のActor自体が新規に作成されるのではなく、同じ
ActorでSupervised::restarting及びActor::startedが呼び出される形になります。
よって、再起動時に必要な初期化処理をこの2か所で制御する必要があります。
Actorの停止は明示的にContext::stopをする必要があるため、復帰不可能な状態になった場合にpanicさせるのではなくContext::stopさせるようにプログラムしておく必要があります。
panicが拾われないのは現状残念ですね。
尚、Supervisor上で動作させるためにはSupervisedトレイトを実装したActorである必要があります。
Registry
最後にRegistryを紹介します。
ここまでの説明ではアクターの起動とその時に取得したアドレスを自身で管理し、各アクターに必要な他アクターのアドレスをcloneして渡したりする必要があります。
Registryは名前の通りアドレスを管理するレジストリとなり、グローバル的に使用可能なアドレス保管庫となります。
レジストリは二種類存在します。
一つはSystemRegistryであり、システム全体で一意なレジストリになります。
SystemRegistryはSystem::current().registry()で取得することが出来ます。
こちらはSystemServiceトレイトを実装したアクターを管理することが出来ます。
もう一つはRegistryであり、Arbiter単位で一意なレジストリとなります。
RegistryはArbiter::registry()で取得することが出来ます。
こちらはArbiterServiceトレイトを実装したアクターを管理することが出来ます。
SystemServiceトレイトとArbiterServiceトレイトは殆ど同じものになります。
これらのトレイトの実装条件として、SupervisedトレイトとDefaultトレイトが実装されている必要があります。
それぞれのレジストリにはgetとsetというメソッドがあります。
getメソッドで対象アクターのアドレスを取得します。まだ対象アクターが起動していない場合は、Defaultでアクターが作成、開始されてそのアドレスが返ります。
事前に初期化などをしたい場合は、アクターを開始した後にsetメソッドを使用してアドレスを登録することも可能です。
尚、アドレスを取得する際は上記に記載したレジストリのgetメソッドを使用してもいいのですが、SystemService及びArbiterServiceで実装されているfrom_registryが便利です。
let addr = TestActor::from_registry()というような形でTestActorのアドレスをレジストリから取得することが出来ますので、私はこの方法が好みです。
後書き
私にとって初めてのアクターモデルでしたが、最初の設定時以外はあまりスレッドを意識せずにプログラムが出来たというのが感想です。
元々Rustはデータ競合をコンパイル時に発見出来るので、通常のマルチスレッド言語におけるデータ競合が問題になることは少ないと思うので、その点で若干利点は少ないのかも知れません。
リモートサーバー分散が出来ないのが痛いところですが、そこが出来るようになるとRustの利用領域がより大きくなるかと思います。
Rustを学んで初期の頃はFuture周りは非常にややこしいと思います。
Rust 2018でasync/await構文が来ると大きく改善されると期待しています。
Rustはとても良い言語だと思いますので広まって欲しいところです。