LoginSignup
51
26

More than 5 years have passed since last update.

Rustでfutures用Mutexを自作してみる (シングルスレッド編)

Posted at

はじめに

なんか面白そうなので自作してみただけです。

これを書いて筆者はこんなことを学びました

  • futures/tokio のパーキング・通知処理まわり
  • 標準ライブラリの Mutex の実装の細かいところ
  • ポイズニングの詳細
  • async/await のコツ
  • tokio::runtime::current_thread の使い方

ソースコード

以下、ちょっとながい背景

futures とは

Rustは、実行時オーバーヘッドのある抽象化はできるだけオプショナルで提供するような設計になっています。軽量スレッドもそのひとつで、Rustの標準ライブラリはOSのスレッドを(プラットフォームの違いだけ吸収して)できるだけそのまま提供しています。

この軽量スレッドの機能を提供するのが、 futures/tokio です。 futures/tokio では、軽量スレッドをタスクと呼んでいます。 futures はタスクを組み立てるための枠組みを提供していて、 tokio は実際にそれを動かすのに必要なランタイムやI/Oライブラリなどを提供しています。

async/await の登場

現状の futures のかかえる問題を解決するため、現在 async/await 構文の導入が進められています。たとえば、現在の tokio::io::copy に相当する関数を async/await を用いて自作すると、およそ以下のようになります。

async fn copy<R, W>(mut reader: R, mut writer: W) -> Result<u64, io::Error>
where
    R: AsyncRead,
    W: AsyncWrite,
{
    let mut buf = [0; 1024];
    let mut count = 0;
    loop {
        let num_read = await!(io::read(&mut reader, &mut buf[..]).compat())?.2;
        if num_read == 0 {
            break;
        }
        await!(io::write_all(&mut writer, &buf[..num_read]).compat())?;
        count += num_read as u64;
    }
    Ok(count)
}

ライブラリの互換性やら何やらの問題で、完璧にシュッとしてるのとは言いがたいのと、 write_all を使っているのがちょっとズルいのが難点ですが、これを理想的な形に書き直すと以下のようになるはずです。

async fn copy<R, W>(mut reader: R, mut writer: W) -> Result<u64, io::Error>
where
    R: AsyncRead,
    W: AsyncWrite,
{
    let mut buf = [0; 1024];
    let mut count = 0;
    loop {
        let num_read = (await reader.aread(&mut buf))?;
        if num_read == 0 {
            break;
        }
        let mut pos = 0;
        while pos < num_read {
            let num_written = (await writer.awrite(&buf[pos..num_read]))?;
            pos += num_written;
        }
        count += num_read as u64;
    }
    Ok(count)
}

これを以下の普通の copy と比べてみてください。

fn copy<R, W>(mut reader: R, mut writer: W) -> Result<u64, io::Error>
where
    R: Read,
    W: Write,
{
    let mut buf = [0; 1024];
    let mut count = 0;
    loop {
        let num_read = reader.read(&mut buf)?;
        if num_read == 0 {
            break;
        }
        let mut pos = 0;
        while pos < num_read {
            let num_written = writer.write(&buf[pos..num_read])?;
            pos += num_written;
        }
        count += num_read as u64;
    }
    Ok(count)
}

かなり近い感じでプログラミングできることになります。(この世界が早くきてほしい)

ブロッキング処理にご注意

さて、async Rustで書いたほうがいい理由は、他の言語の軽量スレッドの利点と似ているので省略します。async Rustを書く上で気をつけなければいけないことは何でしょうか。それはブロッキング処理の取り扱いです。

async Rustの内部では、スレッドをブロックする処理は原則使用禁止です。そのかわり、 tokio などが提供する対応物を使います。それらはタスクをブロックするが、スレッドをブロックしないように設計されています。これらを使うための構文が await! だったというわけです。先ほどの例では Read/Write の対応物として AsyncRead/AsyncWrite がありました。

並行処理プリミティブ

I/Oの他によくブロックする処理として、並行処理プリミティブがあります。では futures/tokio ではどうするかというと、標準ライブラリにある mpsc チャネルの対応物が提供されています。

一方、 Mutex の対応物は提供されていません。 (ただし、 futures-0.1 には BiLock という Mutex の限定版があります)

じゃあ Mutex は使えないのかというとそうではなく、ロック取得時間が長くなるような使い方をしなければスレッドをブロックしないので、通常の Mutex を部分的に使っても問題ないということだと思われます。逆に言えば、長期間ロックするような使い方になりそうな場合は、設計を見直してチャネルなどに置き換えたほうがいいということではないかと思います。

まあそれはそれとして、ないならいい練習問題なので作ってみようと思いました。以上背景。

通常のMutex<T> の要件

通常の Mutex<T> の要件は端的に言うと以下です。

  • 一度に最大一人だけがロックを取れる。
  • ロックを取ると &Mutex<T> から &mut T が手に入る。
    • & は共有参照、 &mut は専有参照であることに注意
  • try_lock では、ロックを取れなかった場合は即座に返る。
  • lock では、ロックを取れなかった場合はロックが取れるまでスレッドをブロックする。

それに加えて、Rustのパニック安全性のためにポイズニングという機構がついています。これは以下のようなものです。

  • ロックを取得中にパニックが発生した場合、poisonフラグが立てられる。 (処理中の内部状態が露出されているかもしれないというフラグ)
  • poisonフラグがついたものをロックしようとした場合、原則エラーになる。
  • ただし救済措置として、ポイズニングされたロックを明示的に取り出すことはできるようになっている。

今回、futuresで動くMutexを作るにあたって、 try_lock はあまり関係なく、 lock のほうが重要な役割を持っていそうです。

ちなみに肝心の実装ですが、以下のようになっています。

pub struct Mutex<T: ?Sized> {
    // Note that this mutex is in a *box*, not inlined into the struct itself.
    // Once a native mutex has been used once, its address can never change (it
    // can't be moved). This mutex type can be safely moved at any time, so to
    // ensure that the native mutex is used correctly we box the inner mutex to
    // give it a constant address.
    inner: Box<sys::Mutex>,
    poison: poison::Flag,
    data: UnsafeCell<T>,
}

この sys::Mutex はOSの提供するプリミティブ (pthread mutexなど) を共通のインターフェースで使えるようにした内部向け構造体です。つまり、排他制御自体はpthreadに完全におまかせということですね。

おそらくですが、 try_lock とスピンロックだけからなるmutexなら AtomicU8 などで自作することができるはずです。そうしないのは、スレッドを正しくブロックする機構が必要だからだと考えられます。

ちなみに parking_lot クレートの Mutex は実際に AtomicU8 を用いて実装されています。こちらは基本的にスピンロックを使いつつ、ロックやアンロックで時間にかかりそうなときはアドレスのハッシュ値をもとにグローバルの条件変数で待つような実装になっているようです(駐車場という名前も、おそらくそういう比喩)。

シングルスレッドのMutexを作る

「シングルスレッドのMutex」というと意味不明ですが、ここではシングルスレッド・マルチタスクの状況で、タスクに対してMutexのように動作するライブラリを作ることを考えます。本当はちゃんとした Mutex にしたいのですが、いきなり書くのはかなり難しい(タスク機構とスレッド並行性の両方を同時に考えないといけない)ので、いったんシングルスレッドでやって、futureに慣れていきたいと思います。

では、実際に実装していきます。

今回はnightlyでRust2018を使って作っていこうと思うので以下のようにします。(つまり、以下の実装は今後動かなくなるかもしれないということです)

$ rustup install nightly
$ cargo +nightly new --lib futures-mutex
$ cd futures-mutex
$ echo nightly > rust-toolchain

依存関係は、とりあえず以下のようにしておきます。

Cargo.toml
[dependencies]
futures-preview = "0.3.0-alpha.9"

あとでマルチスレッド版を作りたいという動機があったので、シングルスレッド版は unsync というモジュールに入れておくことにします。

src/lib.rs
pub mod unsync;
src/unsync.rs
// これから書く

Mutex型を作る

ポイズニングとブロックの話は置いておいて、最低限で作るとこんな感じになるはずです。

src/unsync.rs
use std::cell::{Cell, UnsafeCell};

pub struct Mutex<T: ?Sized> {
    locked: Cell<bool>,
    data: UnsafeCell<T>,
}

impl<T> Mutex<T> {
    pub fn new(inner: T) -> Self {
        Self {
            locked: Cell::new(false),
            data: UnsafeCell::new(inner),
        }
    }
}

先に簡単なものを2つ実装してしまいます。 into_innerget_mut は排他制御と関係ないので、すぐに実装できます。

src/unsync.rs
impl<T> Mutex<T> {
    pub fn into_inner(self) -> T {
        let Self { data, .. } = self;
        let inner = data.into_inner();
        inner
    }
}

impl<T: ?Sized> Mutex<T> {
    pub fn get_mut(&mut self) -> &mut T {
        let inner = unsafe { &mut *self.data.get() };
        inner
    }
}

MutexGuardを作る

Mutexのロックに成功したらMutexGuardが返ってきます。これは &mut T として動作しつつ、自動でロックを返すRAIIガードの役割を果たします。 DerefDerefMut を実装しておきます。

src/unsync.rs
use std::ops::{Deref, DerefMut};

pub struct MutexGuard<'a, T: ?Sized + 'a> {
    mutex: &'a Mutex<T>,
}

impl<'a, T: ?Sized + 'a> Deref for MutexGuard<'a, T> {
    type Target = T;
    fn deref(&self) -> &Self::Target {
        unsafe { &*self.mutex.data.get() }
    }
}

impl<'a, T: ?Sized + 'a> DerefMut for MutexGuard<'a, T> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        unsafe { &mut *self.mutex.data.get() }
    }
}

try_lock を実装する

前述のとおり、 try_lock はブロッキングが関与しないのであまり考えることがありません。

MutexGuard を作るときに locked フラグをセットします。そして、 MutexGuard がドロップされるときに locked フラグを落とします。もちろん、既に locked なときはロック失敗を返すようにします。

src/unsync.rs
impl<T: ?Sized> Mutex<T> {
    pub fn try_lock(&self) -> Option<MutexGuard<'_, T>> {
        if self.locked.get() {
            return None;
        }

        let guard = MutexGuard::new(self);
        Some(guard)
    }
}

impl<'a, T: ?Sized + 'a> MutexGuard<'a, T> {
    fn new(mutex: &'a Mutex<T>) -> Self {
        mutex.locked.set(true);
        Self { mutex }
    }
}

impl<'a, T: ?Sized + 'a> Drop for MutexGuard<'a, T> {
    fn drop(&mut self) {
        self.mutex.locked.set(false);
    }
}

poll_lock を実装する

lock の内部実装になる予定の poll_lock を先に作ります。

poll_locktry_lock とだいたい同じですが、futuresの Poll インターフェースにあわせた形で返却します。もっと大きな違いとして、 poll_lockwakerの登録をするという違いがあります。これがfuturesのブロッキングの仕組みに大きくかかわってきます。

これから実装する Future::poll は、処理を即座に実行できないときは NotReady を返して処理を中断します(そうすると、タスク側から見るとあたかもブロックしたかのように見える)。tokioランタイムは、 NotReady を返されたタスクを自動で再実行しません。wakerというハンドルに通知が来たときだけ、poll を再実行するという仕組みになっています。これにより、ビジーループを避けることができるようになっています。

さて、この場合、ロックを取得できなかったタスクに次のチャンスが回ってくるのは、誰かがロックを解放したときです。なので、そのタイミングで通知を行えるように、 LocalWaker というハンドルを保存しておきます。

src/lib.rs
// Future, LocakWaker, Poll などを使うときに必要
// (futures-0.3以降では一部APIが標準ライブラリに取り込まれているが、
// それがまだ安定化されていない)
#![feature(futures_api)]
src/unsync.rs
use futures::task::LocalWaker;

pub struct Mutex<T: ?Sized> {
    locked: Cell<bool>,
    waiters: Cell<Vec<LocalWaker>>, // 追加
    data: UnsafeCell<T>,
}

impl<T> Mutex<T> {
    pub fn new(inner: T) -> Self {
        Self {
            locked: Cell::new(false),
            waiters: Cell::new(Vec::new()), // 追加
            data: UnsafeCell::new(inner),
        }
    }
}

保存された LocalWaker を用いて、ロックの解放を通知します。

src/unsync.rs
impl<'a, T: ?Sized + 'a> Drop for MutexGuard<'a, T> {
    fn drop(&mut self) {
        self.mutex.locked.set(false);
        let mut waiters = self.mutex.waiters.replace(Vec::new());
        for waiter in waiters.drain(..) {
            waiter.wake();
        }
        self.mutex.waiters.replace(waiters);
    }
}

Cell から一旦取り出して元に戻しているのは個人的な趣味にすぎないので、 RefCell とかでもいいと思います。

さて、ここでちょっと不思議に思うところがあるかもしれません。**たくさんの人がロックを待機していても、実際に次に取得できるのは1人なんだから、1つだけ取り出して通知すればよいのでは?** ただ、これはやめておいたほうがよさそうです。**futuresでは基本的に、通知とpoll呼び出しが1対1に対応している保証はない**からです。これはjoinとselectのせいです。
  • join を使うと、同一タスク内で複数のfutureを同時に待機し、両方の完了を待って次に進むことができます。この場合、タスクが通知を受けても実際の由来がわからないため、両方をpollする必要があります。これにより、「通知を受けていないが poll が再実行される」という現象が起こりえます。
  • select を使うと、同一タスク内で複数のfutureを同時に待機し、どちらか片方の完了を待って次に進むことができます。この場合、どちらか一方のfutureが通知を予約していても、もう一方のfutureの完了をもってタスクが進捗し、通知を予約していた側のfutureは早々に破棄されている、という順番になることがあります。これにより、「通知を受けたものの、それに対応する poll はもはや呼ばれない」という現象が起こりえます。

……と思って調べてみたのですが、 mpsc のbounded queueの実装ではあまり考えずにnotify anyをしていますね。もしかしたらそれでいいのかもしれません。とりあえずStack Overflowの質問にしておきました。

本稿では、とりあえず全員に通知するつもりで実装を続けることにします。通知しすぎても、パフォーマンス上不利になりうるだけで、定性的な問題はありません。

ロック解放時の通知は実装したので、それを使って poll_lock を実装します。

src/unsync.rs
use futures::task::{LocalWaker, Poll};

impl<T: ?Sized> Mutex<T> {
    pub fn poll_lock(&self, lw: &LocalWaker) -> Poll<MutexGuard<'_, T>> {
        if self.locked.get() {
            let mut waiters = self.waiters.replace(Vec::new());
            waiters.push(lw.clone());
            self.waiters.replace(waiters);
            return Poll::Pending;
        }

        let guard = MutexGuard::new(self);
        Poll::Ready(guard)
    }
}

pub にするかどうかはお好みですが、現状ではpoll系の関数も公開する傾向にありそうです。

この関数はだいたい try_lock と似てますが、ロックが取得できなかったときは LocalWaker のインスタンスを解放待ちリストに登録しています。

lock を実装する

poll_lock のラッパーとして lock を実装しましょう。 lock はタスクをブロックするので、 MutexGuard 自体ではなく、そのfutureを返します。そのための構造体も作ります。

src/lib.rs
// arbitrary_self_types: Pin<&mut Self> をレシーバにするために必要
// pin: Pin型を使うために必要
#![feature(arbitrary_self_types, pin, futures_api)]
src/unsync.rs
use std::pin::Pin;

// Future型が必要になってきたので、preludeを丸ごと入れる
use futures::prelude::*;

impl<T: ?Sized> Mutex<T> {
    // ラッパーを返すだけ
    pub fn lock(&self) -> MutexAcquire<'_, T> {
        MutexAcquire { mutex: self }
    }
}

pub struct MutexAcquire<'a, T: ?Sized + 'a> {
    mutex: &'a Mutex<T>,
}

impl<'a, T: ?Sized + 'a> Future for MutexAcquire<'a, T> {
    type Output = MutexGuard<'a, T>;
    // Pinの扱いは色々面倒だけど、今回は&selfにしか興味がないので普通に使える
    fn poll(self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Self::Output> {
        self.mutex.poll_lock(lw)
    }
}

これでMutexとしての最低限の機能が整いました。

ポイズニングを実装する

Mutex の使用感を再現するために、ポイズニングを実装しましょう。

シングルスレッドではあまり有り難みがないかもしれませんが、本稿は最終的にはマルチスレッド化を目指すので、そこら辺も考慮しています。

まずは、 Mutex がポイズンされたかどうかのフラグを作ります。

src/unsync.rs
use std::thread;

pub struct Mutex<T: ?Sized> {
    locked: Cell<bool>,
    poisoned: Cell<bool>, // 追加
    waiters: Cell<Vec<LocalWaker>>,
    data: UnsafeCell<T>,
}

impl<T> Mutex<T> {
    pub fn new(inner: T) -> Self {
        Self {
            locked: Cell::new(false),
            poisoned: Cell::new(false), // 追加
            waiters: Cell::new(Vec::new()),
            data: UnsafeCell::new(inner),
        }
    }
}

impl<T: ?Sized> Mutex<T> {
    pub fn is_poisoned(&self) -> bool {
        self.poisoned.get()
    }
}

pub struct MutexGuard<'a, T: ?Sized + 'a> {
    mutex: &'a Mutex<T>,
    is_panicking: bool, // 追加
}

impl<'a, T: ?Sized + 'a> MutexGuard<'a, T> {
    fn new(mutex: &'a Mutex<T>) -> Self {
        mutex.locked.set(true);
        Self {
            mutex,
            is_panicking: thread::panicking(), // 追加
        }
    }
}

impl<'a, T: ?Sized + 'a> Drop for MutexGuard<'a, T> {
    fn drop(&mut self) {
        self.mutex.locked.set(false);
        // この処理が追加。
        // ロックをホールド中にパニックになったときはフラグを立てる
        if !self.is_panicking && thread::panicking() {
            self.mutex.poisoned.set(true);
        }

        let mut waiters = self.mutex.waiters.replace(Vec::new());
        for waiter in waiters.drain(..) {
            waiter.wake();
        }
        self.mutex.waiters.replace(waiters);
    }
}

あとは、今まで作った関数の戻り値を、「ポイズンされていなければ Ok, ポイズンされていたら Err」となるように置き換えていきます。ポイズン関係の型を自作するのは面倒なので std::sync から借りてきます。

src/unsync.rs
use std::sync::{LockResult, PoisonError, TryLockError, TryLockResult};

impl<T> Mutex<T> {
    // 戻り値変更
    pub fn into_inner(self) -> LockResult<T> {
        // poisonedを取得
        let Self { poisoned, data, .. } = self;
        let poisoned = poisoned.into_inner();
        let inner = data.into_inner();
        // 戻り値変更
        if poisoned {
            Err(PoisonError::new(inner))
        } else {
            Ok(inner)
        }
    }
}

impl<T: ?Sized> Mutex<T> {
    // 戻り値変更
    pub fn poll_lock(&self, lw: &LocalWaker) -> Poll<LockResult<MutexGuard<'_, T>>> {
        if self.locked.get() {
            let mut waiters = self.waiters.replace(Vec::new());
            waiters.push(lw.clone());
            self.waiters.replace(waiters);
            return Poll::Pending;
        }

        let guard = MutexGuard::new(self);
        // 戻り値変更
        if self.poisoned.get() {
            Poll::Ready(Err(PoisonError::new(guard)))
        } else {
            Poll::Ready(Ok(guard))
        }
    }

    // 戻り値変更
    pub fn try_lock(&self) -> TryLockResult<MutexGuard<'_, T>> {
        if self.locked.get() {
            // 戻り値変更
            return Err(TryLockError::WouldBlock);
        }

        let guard = MutexGuard::new(self);
        // 戻り値変更
        if self.poisoned.get() {
            Err(PoisonError::new(guard).into())
        } else {
            Ok(guard)
        }
    }

    // 戻り値変更
    pub fn get_mut(&mut self) -> LockResult<&mut T> {
        let inner = unsafe { &mut *self.data.get() };
        // 戻り値変更
        if self.poisoned.get() {
            Err(PoisonError::new(inner))
        } else {
            Ok(inner)
        }
    }
}

impl<'a, T: ?Sized + 'a> Future for MutexAcquire<'a, T> {
    // ここを変更
    type Output = LockResult<MutexGuard<'a, T>>;
}

せっかくpanic safeにしたので、 UnwindSafe/RefUnwindSafe を実装しておきます。

src/unsync.rs
use std::panic::{RefUnwindSafe, UnwindSafe};

impl<T: ?Sized> UnwindSafe for Mutex<T> {}
impl<T: ?Sized> RefUnwindSafe for Mutex<T> {}

諸々を実装する

おまけとして、 Mutex にあると便利な実装をいくつか書いておきます。まず From/Default 実装。

src/unsync.rs
impl<T> From<T> for Mutex<T> {
    fn from(x: T) -> Self {
        Mutex::new(x)
    }
}

impl<T: Default> Default for Mutex<T> {
    fn default() -> Self {
        Mutex::new(T::default())
    }
}

それから、 Debug/Display の実装です。 MutexDebug を実装していますが、ロックが取得できなかったときは <locked> と表示するようにしています。

src/unsync.rs
use std::fmt;

impl<T: ?Sized + fmt::Debug> fmt::Debug for Mutex<T> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Debug はポイズンしてても表示する
        let guard = match self.try_lock() {
            Ok(guard) => Ok(guard),
            Err(TryLockError::Poisoned(err)) => Ok(err.into_inner()),
            Err(TryLockError::WouldBlock) => Err(()),
        };
        if let Ok(guard) = guard {
            f.debug_struct("Mutex")
                // &dyn Debugが要求されるので二重参照をとる (よくあるテク)
                .field("data", &(&guard as &T))
                .finish()
        } else {
            // <locked> と表示するためのダミー構造体を作る
            struct LockedPlaceholder;
            impl fmt::Debug for LockedPlaceholder {
                fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
                    f.write_str("<locked>")
                }
            }
            f.debug_struct("Mutex")
                .field("data", &LockedPlaceholder)
                .finish()
        }
    }
}


impl<'a, T: ?Sized + fmt::Debug + 'a> fmt::Debug for MutexGuard<'a, T> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // これは本家Mutexとちょっと実装が違うけど、多分こっちのほうが嬉しい
        f.debug_struct("MutexGuard")
            .field("data", &(self as &T))
            .finish()
    }
}

impl<'a, T: ?Sized + fmt::Display + 'a> fmt::Display for MutexGuard<'a, T> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        <T as fmt::Display>::fmt(self, f)
    }
}

// Future用ラッパーは普通にDebugをderiveしておけばだいたいOK
#[derive(Debug)]
pub struct MutexAcquire<'a, T: ?Sized + 'a> {
    mutex: &'a Mutex<T>,
}

ちょっと例を書いてみる

あんまり複雑でちゃんとした例を思いつかないので、食事する哲学者を書いて終わりにしようと思います。

Cargo.toml
[dev-dependencies]
futures-preview = { version = "0.3.0-alpha.9", features = ["tokio-compat"] }
futures-test-preview = "0.3.0-alpha.9"
tokio = "0.1.11"
rand = "0.5.5"
examples/unsync_philosopher.rs
#![feature(async_await, await_macro, pin, futures_api)]

use std::rc::Rc;

use futures::prelude::*;
use rand::prelude::*;

// 今回はtokioの標準ランタイムではなく、current_threadランタイムを使う。
// 今回作ったMutexがシングルスレッド用だったため、こちらでないと動かせない。
use tokio::runtime::current_thread::{Runtime, spawn};
use futures_mutex::unsync::Mutex;
use futures_test::future::FutureTestExt;

// スケジューリングを強制的に狂わせる。
// ありがたいことにcurrent_threadランタイムは決定的に動作するので、
// そのままだと簡単な例ではデッドロックが再現しないことがある。
// なので適当なタイミングでこれを呼んで強制的にスケジューリングをランダムにする。
async fn jitter() {
    let num = thread_rng().gen_range(0, 10);
    for _ in 0..num {
        // pending_onceはfutures_testにある。1回休みをすることができて便利
        await!(async {}.pending_once());
    }
}

async fn main2() {
    let resources = (0..5_i32).map(|i| Rc::new(Mutex::new(i))).collect::<Vec<_>>();
    for i in 0..5 {
        let res0 = resources[i].clone();
        let res1 = resources[(i + 1) % 5].clone();
        spawn(async move {
            for _ in 0..100 {
                let lock0 = await!(res0.lock()).unwrap();
                await!(jitter());
                eprintln!("Thread {}: acquired {}", i, *lock0);

                let lock1 = await!(res1.lock()).unwrap();
                await!(jitter());
                eprintln!("Thread {}: acquired {}", i, *lock1);

                // ここら辺のdropは勝手にやるから基本的には不要。
                // jitterの呼び出しをするために入れている。
                drop(lock1);
                await!(jitter());
                drop(lock0);
                await!(jitter());
            }
            println!("Thread {}: done!", i);
            Ok(())
        }.boxed().compat());
        // ↑futures-0.3用のFutureを0.1に変換するおまじない
        // Resultを返さないと `compat` を解決できないので注意
    }
}

fn main() {
    let mut rt = Runtime::new().unwrap();
    rt.spawn(async {
        await!(main2());
        Ok(())
    }.boxed().compat());
    // ↑futures-0.3用のFutureを0.1に変換するおまじない
    // Resultを返さないと `compat` を解決できないので注意
    rt.run().unwrap();
}

実際に実行してみると、だいたいどこかで止まります。

$ cargo run --example unsync_philosopher
     Running `target/debug/examples/unsync_philosopher`
Thread 0: acquired 0
Thread 3: acquired 3
Thread 2: acquired 2
Thread 0: acquired 1
Thread 3: acquired 4
Thread 1: acquired 1
Thread 4: acquired 4
Thread 0: acquired 0
Thread 2: acquired 3
Thread 3: acquired 3
(中略)
Thread 3: acquired 3
Thread 1: acquired 2
Thread 2: acquired 2
Thread 0: acquired 1
Thread 4: acquired 0
Thread 1: acquired 1
Thread 0: acquired 0
Thread 3: acquired 4
Thread 4: acquired 4
Thread 3: acquired 3
^C

デッドロックしないバージョンも書いてみます。

examples/unsync_philosopher_ok.rs
#![feature(async_await, await_macro, pin, futures_api)]

use std::rc::Rc;

use futures::prelude::*;
use rand::prelude::*;

use tokio::runtime::current_thread::{Runtime, spawn};
use futures_mutex::unsync::Mutex;
use futures_test::future::FutureTestExt;

async fn jitter() {
    let num = thread_rng().gen_range(0, 10);
    for _ in 0..num {
        await!(async {}.pending_once());
    }
}

async fn main2() {
    let resources = (0..5_i32).map(|i| Rc::new(Mutex::new(i))).collect::<Vec<_>>();
    for i in 0..5 {
        // res0, res1 のとり方だけ異なる
        let (res0, res1) = if i == 4 {
            (resources[0].clone(), resources[4].clone())
        } else {
            (resources[i].clone(), resources[i + 1].clone())
        };
        spawn(async move {
            for _ in 0..100 {
                let lock0 = await!(res0.lock()).unwrap();
                await!(jitter());
                eprintln!("Thread {}: acquired {}", i, *lock0);

                let lock1 = await!(res1.lock()).unwrap();
                await!(jitter());
                eprintln!("Thread {}: acquired {}", i, *lock1);

                drop(lock1);
                await!(jitter());
                drop(lock0);
                await!(jitter());
            }
            println!("Thread {}: done!", i);
            Ok(())
        }.boxed().compat());
    }
}

fn main() {
    let mut rt = Runtime::new().unwrap();
    rt.spawn(async {
        await!(main2());
        Ok(())
    }.boxed().compat());
    rt.run().unwrap();
}

これは実行するとちゃんと終わります。

$ cargo run --example unsync_philosopher_ok
     Running `target/debug/examples/unsync_philosopher_ok`
Thread 3: acquired 3
Thread 3: acquired 4
Thread 1: acquired 1
Thread 0: acquired 0
Thread 2: acquired 2
Thread 2: acquired 3
Thread 3: acquired 3
Thread 3: acquired 4
Thread 1: acquired 2
Thread 2: acquired 2
(中略)
Thread 0: acquired 0
Thread 0: acquired 1
Thread 0: acquired 0
Thread 0: acquired 1
Thread 0: acquired 0
Thread 0: acquired 1
Thread 0: acquired 0
Thread 0: acquired 1
Thread 0: acquired 0
Thread 0: acquired 1
Thread 0: done!

今日は力尽きたのでここまでです。やる気とかが残ってたらマルチスレッド版も作ってみようと思います。

51
26
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
51
26