30
23

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

非同期処理が書き込むメモリを守る(守れなかった)

Last updated at Posted at 2020-06-07

Rustのasync/.await勉強メモです

Update 6/7 14:20

let future = host_data.copy_from(&gpu_data);
drop(future);
println!("{:?}", host_data);

のようにすることで用意に回避出来てしまうため、以下のようなlifetimeで対策は効果が無いとのこと...(せっかくなので一応以下も残しておきます)

参考リンク

Motivation

1コア性能が伸びなくなった結果、計算性能を稼ぐために多くの計算機が複数の計算ユニットを協調させて使っています。例えばGPUを使った計算ではCPUにつながっているメインメモリの他にGPU側にもメモリが、CPUクラスタでは異なるノードのメインメモリがあり、それらへのコピーは一般に時間がかかります。これらの転送を待ったままでは計算資源がもったいないので、転送を非同期に実行するAPIがそれぞれの環境で用意されています。これをRustのasync/.awaitと合わせて上手く使う方法を説明することがこの記事の目的です。

途中で止められる計算

async/.await以前のRustは一通り把握している人向けのasync/.await入門です。ここでは上で説明した非同期メモリコピーの例に沿って説明していきます。
async/.awaitでは「途中で止められる計算」が出てきます。

let mut host_data = vec![0_u32; 10];
host_data.copy_from(&gpu_data); // GPU -> CPUにメモリをコピーするのは時間がかかる
println!("{:?}", host_data);

GPUにあるメモリをCPU側にコピーしたいとしましょう。このコピーには時間がかかりますが、このコピーの処理は別のハードウェアが制御してくれるのでその間CPUは特にやることがありません。そこでメモリのコピーを開始したらその処理はそこで一旦止まって、コピーが終わったころに再開するような処理が簡単に書けると嬉しいですね。それを実現するのが async/.awaitです

let copy_future = async {
  let mut host_data = vec![0_u32; 10];
  host_data.copy_from(&gpu_data).await;
  println!("{:?}", host_data);
};

途中で止められる「計算」を定義するのが async、途中で止まる所を指定するのが await です。asyncはクロージャと同様に計算を定義するのであって、この段階では実行されません

let copy_closure = || {
  let mut host_data = vec![0_u32; n];
  host_data.copy_from(&gpu_data);
  println!("{:?}", host_data);
};

のようにクロージャを定義しても実行されないのと似ていますね。クロージャが impl Fn()な型を返すのと同様に copy_futurestd::future::Future<Output=()> トレイトを実装した型になります。

さて、 copy_future には .await を挟んでコピーを開始するまでの前半部分と取得したデータを表示する後半部分があります。一般にasyncブロック内にはたくさん.awaitを書くことが出きるので、今回のように二つではなくたくさんの部分からなります。個々の部分の中では途中で中断されることはなく連続に処理されることが保証されていて、.awaitの切れ目部分では前の処理が終わっている事しか保証されず別のスレッドで動いているかもしれません。作業待ちの間に別の作業を始めたら、元の作業が終わっても新しく始めた作業が終わってないことはよくありますね?その時は別のスレッドが後半の仕事を引き受けて実行してくれます。
しかしここで注意があります。後半の処理を実行するには前半の処理で使った host_data を使う必要がありますが、これは別のスレッドで作られたデータかもしれません。単一のスレッドで上手く動作するものも複数スレッドの下では上手く動作するとは限らない(例えばstd::rc::Rc)ので、Rustでは「複数のスレッド間でデータを移動しても良い」属性である Send と「複数のスレッドから安全にデータを共有できる」属性である Sync を使って安全性を管理していたのでした。後半部分が別スレッドで実行するには host_dataSend 属性がついている必要があります。この場合は std::vec::Vec には Send がついているので大丈夫ですね。つまり copy_future の型は impl Future<Output=()> + Send になります。この属性は copy_future の方に反映されており、内部で Send で無い型を変数にもってしまうと copy_future の型から Send が外れます。

FutureとLifetime

この copy_from はどうやって実装すればいいでしょうか?非同期メモリコピーをしてくれるCの関数をFFIで呼び出すとするとこんな感じでしょうか:

trait Memcpy<T> {
  fn copy_from(&mut self, src: &[T]) -> Pin<Box<dyn Future<Output=()> + Send>>;
}

impl<T> Memcpy<T> for [T] {
  fn copy_from(&mut self, src: &[T]) -> Pin<Box<dyn Future<Output=()> + Send>> {
    assert_eq!(self.len(), src.len());

    // 非同期コピーを開始
    let stream = ffi::Stream::new();
    unsafe { ffi::copy_async(stream, self.len(), self.as_mut_ptr(), src.as_ptr()) };

    // 非同期処理が終わったことを通知してくれる Future を作る
    Box::pin(async {
        tokio::task::spawn_blocking(move || {
            stream.sync();
        }).await.unwrap()
    })
  }
}

ここではコピーが終わるまでの処理を待つFutureをトレイトオブジェクト Box<dyn Future<Output = ()> + Send>> で定義しています。本当は impl Future<Output=()> + Send としたいのですが、現在トレイト内で impl Trait の機能は使えないのでBox で動的に扱います。std::pin::Pin が 必要なのは、非同期処理時に途中でメモリが移動してしまうことを防ぐ為です。

しかしこれだと selfsrc への参照はこの関数が終わった段階で開放されてしまいます。つまり

async {
  let mut host_data = vec![0_u32; 10];

  // 非同期処理が開始
  let future = host_data.copy_from(&gpu_data); // &mut host_dataはここで開放されてしまう

  // 非同期処理中に host_data にアクセスできてしまう
  println!("{:?}", &host_data);

  future.await; // ここまで &mut host_data を保持していて欲しい
}

のように非同期処理中のデータにアクセスできてしまいます。これはデータ競合状態になっているのでRustの意味でSafeになっていません。FFIの呼び出し時の unsafe での処理が適切ではないわけですね。Cで実装する際はこのデータ競合状態を適切に回避することはプログラマの責任ですが、Rustではライブラリレベルでデータ競合を防ぐ事が期待されます(どうしても解決出来ないときは unsafe で提供されますが)。

これを解決するには Future にlifetimeを加えます

trait Memcpy<T> {
  fn copy_from<'a>(&'a mut self, src: &'a [T]) -> Pin<Box<dyn Future<Output=()> + Send + 'a>>;
}

impl<T> Memcpy<T> for [T] {
  fn copy_from<'a>(&'a mut self, src: &'a [T]) -> Pin<Box<dyn Future<Output=()> + Send + 'a>> { /* 全く同じ */ }
}

これで future が消費されるまでは &mut host_data が保持されるようになるので、

async {
  let mut host_data = vec![0_u32; 10];

  // 非同期処理が開始
  let future = host_data.copy_from(&gpu_data);

  println!("{:?}", &host_data); // &mut host_data があるので &で借りれない(のでコンパイルエラーになる)

  future.await; // ここで &mut host_data が開放される
}

このように非同期処理により書き込まれるメモリにアクセスしてデータ競合を起こすことを、lifetimeを追加することでコンパイル時に検出できます。これでSafeな Memcpy::copy_from を実装する事が出来ました。

なおこの型 Pin<Box<dyn Future<Output = T> + Send + 'a>>futures::future::BoxFuture<'a, T> として定義されています
https://docs.rs/futures/0.3.5/futures/future/type.BoxFuture.html

まとめ

既存のCで提供されるFFIをsafe Rustとしてラップする方法について説明しました。
この内容は主にRustのGPGPUフレームワークである accel の開発の為に勉強した内容に基づいています(FFIはCUDA Driver APIに対応します)

30
23
5

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
30
23

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?