Rustのasync/.await勉強メモです
Update 6/7 14:20
let future = host_data.copy_from(&gpu_data);
drop(future);
println!("{:?}", host_data);
のようにすることで用意に回避出来てしまうため、以下のようなlifetimeで対策は効果が無いとのこと...(せっかくなので一応以下も残しておきます)
参考リンク
Motivation
1コア性能が伸びなくなった結果、計算性能を稼ぐために多くの計算機が複数の計算ユニットを協調させて使っています。例えばGPUを使った計算ではCPUにつながっているメインメモリの他にGPU側にもメモリが、CPUクラスタでは異なるノードのメインメモリがあり、それらへのコピーは一般に時間がかかります。これらの転送を待ったままでは計算資源がもったいないので、転送を非同期に実行するAPIがそれぞれの環境で用意されています。これをRustのasync/.awaitと合わせて上手く使う方法を説明することがこの記事の目的です。
途中で止められる計算
async/.await以前のRustは一通り把握している人向けのasync/.await入門です。ここでは上で説明した非同期メモリコピーの例に沿って説明していきます。
async/.awaitでは「途中で止められる計算」が出てきます。
let mut host_data = vec![0_u32; 10];
host_data.copy_from(&gpu_data); // GPU -> CPUにメモリをコピーするのは時間がかかる
println!("{:?}", host_data);
GPUにあるメモリをCPU側にコピーしたいとしましょう。このコピーには時間がかかりますが、このコピーの処理は別のハードウェアが制御してくれるのでその間CPUは特にやることがありません。そこでメモリのコピーを開始したらその処理はそこで一旦止まって、コピーが終わったころに再開するような処理が簡単に書けると嬉しいですね。それを実現するのが async/.awaitです
let copy_future = async {
let mut host_data = vec![0_u32; 10];
host_data.copy_from(&gpu_data).await;
println!("{:?}", host_data);
};
途中で止められる「計算」を定義するのが async
、途中で止まる所を指定するのが await
です。async
はクロージャと同様に計算を定義するのであって、この段階では実行されません
let copy_closure = || {
let mut host_data = vec![0_u32; n];
host_data.copy_from(&gpu_data);
println!("{:?}", host_data);
};
のようにクロージャを定義しても実行されないのと似ていますね。クロージャが impl Fn()
な型を返すのと同様に copy_future
は std::future::Future<Output=()>
トレイトを実装した型になります。
さて、 copy_future
には .await
を挟んでコピーを開始するまでの前半部分と取得したデータを表示する後半部分があります。一般にasync
ブロック内にはたくさん.await
を書くことが出きるので、今回のように二つではなくたくさんの部分からなります。個々の部分の中では途中で中断されることはなく連続に処理されることが保証されていて、.await
の切れ目部分では前の処理が終わっている事しか保証されず別のスレッドで動いているかもしれません。作業待ちの間に別の作業を始めたら、元の作業が終わっても新しく始めた作業が終わってないことはよくありますね?その時は別のスレッドが後半の仕事を引き受けて実行してくれます。
しかしここで注意があります。後半の処理を実行するには前半の処理で使った host_data
を使う必要がありますが、これは別のスレッドで作られたデータかもしれません。単一のスレッドで上手く動作するものも複数スレッドの下では上手く動作するとは限らない(例えばstd::rc::Rc
)ので、Rustでは「複数のスレッド間でデータを移動しても良い」属性である Send
と「複数のスレッドから安全にデータを共有できる」属性である Sync
を使って安全性を管理していたのでした。後半部分が別スレッドで実行するには host_data
に Send
属性がついている必要があります。この場合は std::vec::Vec
には Send
がついているので大丈夫ですね。つまり copy_future
の型は impl Future<Output=()> + Send
になります。この属性は copy_future
の方に反映されており、内部で Send
で無い型を変数にもってしまうと copy_future
の型から Send
が外れます。
FutureとLifetime
この copy_from
はどうやって実装すればいいでしょうか?非同期メモリコピーをしてくれるCの関数をFFIで呼び出すとするとこんな感じでしょうか:
trait Memcpy<T> {
fn copy_from(&mut self, src: &[T]) -> Pin<Box<dyn Future<Output=()> + Send>>;
}
impl<T> Memcpy<T> for [T] {
fn copy_from(&mut self, src: &[T]) -> Pin<Box<dyn Future<Output=()> + Send>> {
assert_eq!(self.len(), src.len());
// 非同期コピーを開始
let stream = ffi::Stream::new();
unsafe { ffi::copy_async(stream, self.len(), self.as_mut_ptr(), src.as_ptr()) };
// 非同期処理が終わったことを通知してくれる Future を作る
Box::pin(async {
tokio::task::spawn_blocking(move || {
stream.sync();
}).await.unwrap()
})
}
}
ここではコピーが終わるまでの処理を待つFutureをトレイトオブジェクト Box<dyn Future<Output = ()> + Send>>
で定義しています。本当は impl Future<Output=()> + Send
としたいのですが、現在トレイト内で impl Trait の機能は使えないのでBox
で動的に扱います。std::pin::Pin
が 必要なのは、非同期処理時に途中でメモリが移動してしまうことを防ぐ為です。
しかしこれだと self
と src
への参照はこの関数が終わった段階で開放されてしまいます。つまり
async {
let mut host_data = vec![0_u32; 10];
// 非同期処理が開始
let future = host_data.copy_from(&gpu_data); // &mut host_dataはここで開放されてしまう
// 非同期処理中に host_data にアクセスできてしまう
println!("{:?}", &host_data);
future.await; // ここまで &mut host_data を保持していて欲しい
}
のように非同期処理中のデータにアクセスできてしまいます。これはデータ競合状態になっているのでRustの意味でSafeになっていません。FFIの呼び出し時の unsafe
での処理が適切ではないわけですね。Cで実装する際はこのデータ競合状態を適切に回避することはプログラマの責任ですが、Rustではライブラリレベルでデータ競合を防ぐ事が期待されます(どうしても解決出来ないときは unsafe
で提供されますが)。
これを解決するには Future
にlifetimeを加えます
trait Memcpy<T> {
fn copy_from<'a>(&'a mut self, src: &'a [T]) -> Pin<Box<dyn Future<Output=()> + Send + 'a>>;
}
impl<T> Memcpy<T> for [T] {
fn copy_from<'a>(&'a mut self, src: &'a [T]) -> Pin<Box<dyn Future<Output=()> + Send + 'a>> { /* 全く同じ */ }
}
これで future
が消費されるまでは &mut host_data
が保持されるようになるので、
async {
let mut host_data = vec![0_u32; 10];
// 非同期処理が開始
let future = host_data.copy_from(&gpu_data);
println!("{:?}", &host_data); // &mut host_data があるので &で借りれない(のでコンパイルエラーになる)
future.await; // ここで &mut host_data が開放される
}
このように非同期処理により書き込まれるメモリにアクセスしてデータ競合を起こすことを、lifetimeを追加することでコンパイル時に検出できます。これでSafeな Memcpy::copy_from
を実装する事が出来ました。
なおこの型 Pin<Box<dyn Future<Output = T> + Send + 'a>>
は futures::future::BoxFuture<'a, T>
として定義されています
https://docs.rs/futures/0.3.5/futures/future/type.BoxFuture.html
まとめ
既存のCで提供されるFFIをsafe Rustとしてラップする方法について説明しました。
この内容は主にRustのGPGPUフレームワークである accel の開発の為に勉強した内容に基づいています(FFIはCUDA Driver APIに対応します)