1. termoshtt

    Posted

    termoshtt
Changes in title
+非同期処理が書き込むメモリを守る
Changes in tags
Changes in body
Source | HTML | Preview
@@ -0,0 +1,124 @@
+Rustのasync/.await勉強メモです
+
+Motivation
+-----------
+1コア性能が伸びなくなった結果、計算性能を稼ぐために多くの計算機が複数の計算ユニットを協調させて使っています。例えばGPUを使った計算ではCPUにつながっているメインメモリの他にGPU側にもメモリが、CPUクラスタでは異なるノードのメインメモリがあり、それらへのコピーは一般に時間がかかります。これらの転送を待ったままでは計算資源がもったいないので、転送を非同期に実行するAPIがそれぞれの環境で用意されています。これをRustのasync/.awaitと合わせて上手く使う方法を説明することがこの記事の目的です。
+
+途中で止められる計算
+----------------
+
+async/.await以前のRustは一通り把握している人向けのasync/.await入門です。ここでは上で説明した非同期メモリコピーの例に沿って説明していきます。
+async/.awaitでは「途中で止められる計算」が出てきます。
+
+```rust
+let mut host_data = vec![0_u32; n];
+host_data.copy_from(&gpu_data); // GPU -> CPUにメモリをコピーするのは時間がかかる
+println!("{:?}", host_data);
+```
+
+GPUにあるメモリをCPU側にコピーしたいとしましょう。このコピーには時間がかかりますが、このコピーの処理は別のハードウェアが制御してくれるのでその間CPUは特にやることがありません。そこでメモリのコピーを開始したらその処理はそこで一旦止まって、コピーが終わったころに再開するような処理が簡単に書けると嬉しいですね。それを実現するのが async/.awaitです
+
+```rust
+let copy_future = async {
+ let mut host_data = vec![0_u32; n];
+ host_data.copy_from(&gpu_data).await;
+ println!("{:?}", host_data);
+};
+```
+
+途中で止められる「計算」を定義するのが `async`、途中で止まる所を指定するのが `await` です。`async`はクロージャと同様に計算を定義するのであって、この段階では実行されません
+
+```rust
+let copy_closure = || {
+ let mut host_data = vec![0_u32; n];
+ host_data.copy_from(&gpu_data);
+ println!("{:?}", host_data);
+};
+```
+
+のようにクロージャを定義しても実行されないのと似ていますね。クロージャが `impl Fn()`な型を返すのと同様に `copy_future` は `std::future::Future<Output=()>` トレイトを実装した型になります。
+
+さて、 `copy_future` には `.await` を挟んでコピーを開始するまでの前半部分と取得したデータを表示する後半部分があります。一般に`async`ブロック内にはたくさん`.await`を書くことが出きるので、今回のように二つではなくたくさんの部分からなります。個々の部分の中では途中で中断されることはなく連続に処理されることが保証されていて、`.await`の切れ目部分では前の処理が終わっている事しか保証されず別のスレッドで動いているかもしれません。作業待ちの間に別の作業を始めたら、元の作業が終わっても新しく始めた作業が終わってないことはよくありますね?その時は別のスレッドが後半の仕事を引き受けて実行してくれます。
+しかしここで注意があります。後半の処理を実行するには前半の処理で使った `host_data` を使う必要がありますが、これは別のスレッドで作られたデータかもしれません。単一のスレッドで上手く動作するものも複数スレッドの下では上手く動作するとは限らない(例えば`std::rc::Rc`)ので、Rustでは「複数のスレッド間でデータを移動しても良い」属性である `Send` と「複数のスレッドから安全にデータを共有できる」属性である `Sync` を使って安全性を管理していたのでした。後半部分が別スレッドで実行するには `host_data` に `Send` 属性がついている必要があります。この場合は `std::vec::Vec` には `Send` がついているので大丈夫ですね。つまり `copy_future` の型は `impl Future<Output=()> + Send` になります。この属性は `copy_future` の方に反映されており、内部で `Send` で無い型を変数にもってしまうと `copy_future` の型から `Send` が外れます。
+
+FutureとLifetime
+----------------
+
+この `copy_from` はどうやって実装すればいいでしょうか?非同期メモリコピーをしてくれるCの関数をFFIで呼び出すとするとこんな感じでしょうか:
+
+```rust
+trait Memcpy {
+ fn copy_from(&mut self, &src: [T]) -> Pin<Box<dyn Future<Output=()> + Send>>;
+}
+
+impl Memcpy for [T] {
+ fn copy_from(&mut self, &src: [T]) -> Pin<Box<dyn Future<Output=() + Send> {
+ assert_eq!(self.len(), src.len());
+
+ // 非同期コピーを開始
+ let stream = ffi::Stream::new();
+ unsafe { ffi::copy_async(stream, self.len(), self.as_mut_ptr(), src.as_ptr()) };
+
+ // 非同期処理が終わったことを通知してくれる Future を作る
+ Box::pin(tokio::task::spawn_blocking(move || {
+ stream.sync();
+ }))
+ }
+}
+```
+
+ここではコピーが終わるまでの処理を待つFutureをトレイトオブジェクト `Box<dyn Future<Output = ()> + Send>` で定義しています。本当は `impl Future<Output=()> + Send` としたいのですが、現在トレイト内で impl Trait の機能は使えないので`Box` で動的に扱います。`std::pin::Pin` が 必要なのは、非同期処理時に途中でメモリが移動してしまうことを防ぐ為です。
+
+しかしこれだと `self` と `src` への参照はこの関数が終わった段階で開放されてしまいます。つまり
+
+```rust
+async {
+ let mut host_data = vec![0_u32; n];
+
+ // 非同期処理が開始
+ let future = host_data.copy_from(&gpu_data); // &mut host_dataはここで開放されてしまう
+
+ // 非同期処理中に host_data にアクセスできてしまう
+ println!("{:?}", &host_data);
+
+ future.await; // ここまで &mut host_data を保持していて欲しい
+}
+```
+のように非同期処理中のデータにアクセスできてしまいます。これはデータ競合状態になっているのでRustの意味でSafeになっていません。FFIの呼び出し時の `unsafe` での処理が適切ではないわけですね。Cで実装する際はこのデータ競合状態を適切に回避することはプログラマの責任ですが、Rustではライブラリレベルでデータ競合を防ぐ事が期待されます(どうしても解決出来ないときは `unsafe` で提供されますが)。
+
+これを解決するには `Future` にlifetimeを加えます
+
+```rust
+trait Memcpy {
+ fn copy_from<'a>(&'a mut self, &'a src: [T]) -> Pin<Box<dyn Future<Output=()> + Send + 'a>>;
+}
+
+impl Memcpy for [T] {
+ fn copy_from<'a>(&'a mut self, &'a src: [T]) -> Pin<Box<dyn Future<Output=() + Send + 'a> { /* 全く同じ */ }
+}
+```
+
+これで `future` が消費されるまでは `&mut host_data` が保持されるようになるので、
+
+```rust
+async {
+ let mut host_data = vec![0_u32; n];
+
+ // 非同期処理が開始
+ let future = host_data.copy_from(&gpu_data);
+
+ println!("{:?}", &host_data); // &mut host_data があるので &で借りれない(のでコンパイルエラーになる)
+
+ future.await; // ここで &mut host_data が開放される
+}
+```
+
+このように非同期処理により書き込まれるメモリにアクセスしてデータ競合を起こすことを、lifetimeを追加することでコンパイル時に検出できます。これでSafeな `Memcpy::copy_from` を実装する事が出来ました。
+
+なおこの型 `Pin<Box<dyn Future<Output = T> + Send + 'a>` は `futures::future::BoxFuture<'a, T>` として定義されています
+https://docs.rs/futures/0.3.5/futures/future/type.BoxFuture.html
+
+まとめ
+------
+既存のCで提供されるFFIをsafe Rustとしてラップする方法について説明しました。
+この内容は主にRustのGPGPUフレームワークである [accel](https://gitlab.com/termoshtt/accel) の開発の為に勉強した内容に基づいています(FFIは[CUDA Driver API](https://docs.nvidia.com/cuda/cuda-driver-api/index.html)に対応します)