はじめに
なんか面白そうなので自作してみただけです。
これを書いて筆者はこんなことを学びました
-
futures
/tokio
のパーキング・通知処理まわり - 標準ライブラリの
Mutex
の実装の細かいところ - ポイズニングの詳細
-
async
/await
のコツ -
tokio::runtime::current_thread
の使い方
ソースコード
以下、ちょっとながい背景
futures
とは
Rustは、実行時オーバーヘッドのある抽象化はできるだけオプショナルで提供するような設計になっています。軽量スレッドもそのひとつで、Rustの標準ライブラリはOSのスレッドを(プラットフォームの違いだけ吸収して)できるだけそのまま提供しています。
この軽量スレッドの機能を提供するのが、 futures
/tokio
です。 futures
/tokio
では、軽量スレッドをタスクと呼んでいます。 futures
はタスクを組み立てるための枠組みを提供していて、 tokio
は実際にそれを動かすのに必要なランタイムやI/Oライブラリなどを提供しています。
async
/await
の登場
現状の futures
のかかえる問題を解決するため、現在 async
/await
構文の導入が進められています。たとえば、現在の tokio::io::copy
に相当する関数を async
/await
を用いて自作すると、およそ以下のようになります。
async fn copy<R, W>(mut reader: R, mut writer: W) -> Result<u64, io::Error>
where
R: AsyncRead,
W: AsyncWrite,
{
let mut buf = [0; 1024];
let mut count = 0;
loop {
let num_read = await!(io::read(&mut reader, &mut buf[..]).compat())?.2;
if num_read == 0 {
break;
}
await!(io::write_all(&mut writer, &buf[..num_read]).compat())?;
count += num_read as u64;
}
Ok(count)
}
ライブラリの互換性やら何やらの問題で、完璧にシュッとしてるのとは言いがたいのと、 write_all
を使っているのがちょっとズルいのが難点ですが、これを理想的な形に書き直すと以下のようになるはずです。
async fn copy<R, W>(mut reader: R, mut writer: W) -> Result<u64, io::Error>
where
R: AsyncRead,
W: AsyncWrite,
{
let mut buf = [0; 1024];
let mut count = 0;
loop {
let num_read = (await reader.aread(&mut buf))?;
if num_read == 0 {
break;
}
let mut pos = 0;
while pos < num_read {
let num_written = (await writer.awrite(&buf[pos..num_read]))?;
pos += num_written;
}
count += num_read as u64;
}
Ok(count)
}
これを以下の普通の copy
と比べてみてください。
fn copy<R, W>(mut reader: R, mut writer: W) -> Result<u64, io::Error>
where
R: Read,
W: Write,
{
let mut buf = [0; 1024];
let mut count = 0;
loop {
let num_read = reader.read(&mut buf)?;
if num_read == 0 {
break;
}
let mut pos = 0;
while pos < num_read {
let num_written = writer.write(&buf[pos..num_read])?;
pos += num_written;
}
count += num_read as u64;
}
Ok(count)
}
かなり近い感じでプログラミングできることになります。(この世界が早くきてほしい)
ブロッキング処理にご注意
さて、async Rustで書いたほうがいい理由は、他の言語の軽量スレッドの利点と似ているので省略します。async Rustを書く上で気をつけなければいけないことは何でしょうか。それはブロッキング処理の取り扱いです。
async Rustの内部では、スレッドをブロックする処理は原則使用禁止です。そのかわり、 tokio
などが提供する対応物を使います。それらはタスクをブロックするが、スレッドをブロックしないように設計されています。これらを使うための構文が await!
だったというわけです。先ほどの例では Read
/Write
の対応物として AsyncRead
/AsyncWrite
がありました。
並行処理プリミティブ
I/Oの他によくブロックする処理として、並行処理プリミティブがあります。では futures
/tokio
ではどうするかというと、標準ライブラリにある mpsc
チャネルの対応物が提供されています。
一方、 Mutex
の対応物は提供されていません。 (ただし、 futures-0.1
には BiLock
という Mutex
の限定版があります)
じゃあ Mutex
は使えないのかというとそうではなく、ロック取得時間が長くなるような使い方をしなければスレッドをブロックしないので、通常の Mutex
を部分的に使っても問題ないということだと思われます。逆に言えば、長期間ロックするような使い方になりそうな場合は、設計を見直してチャネルなどに置き換えたほうがいいということではないかと思います。
まあそれはそれとして、ないならいい練習問題なので作ってみようと思いました。以上背景。
通常のMutex<T>
の要件
通常の Mutex<T>
の要件は端的に言うと以下です。
- 一度に最大一人だけがロックを取れる。
- ロックを取ると
&Mutex<T>
から&mut T
が手に入る。-
&
は共有参照、&mut
は専有参照であることに注意
-
-
try_lock
では、ロックを取れなかった場合は即座に返る。 -
lock
では、ロックを取れなかった場合はロックが取れるまでスレッドをブロックする。
それに加えて、Rustのパニック安全性のためにポイズニングという機構がついています。これは以下のようなものです。
- ロックを取得中にパニックが発生した場合、poisonフラグが立てられる。 (処理中の内部状態が露出されているかもしれないというフラグ)
- poisonフラグがついたものをロックしようとした場合、原則エラーになる。
- ただし救済措置として、ポイズニングされたロックを明示的に取り出すことはできるようになっている。
今回、futuresで動くMutexを作るにあたって、 try_lock
はあまり関係なく、 lock
のほうが重要な役割を持っていそうです。
ちなみに肝心の実装ですが、以下のようになっています。
pub struct Mutex<T: ?Sized> {
// Note that this mutex is in a *box*, not inlined into the struct itself.
// Once a native mutex has been used once, its address can never change (it
// can't be moved). This mutex type can be safely moved at any time, so to
// ensure that the native mutex is used correctly we box the inner mutex to
// give it a constant address.
inner: Box<sys::Mutex>,
poison: poison::Flag,
data: UnsafeCell<T>,
}
この sys::Mutex
はOSの提供するプリミティブ (pthread mutexなど) を共通のインターフェースで使えるようにした内部向け構造体です。つまり、排他制御自体はpthreadに完全におまかせということですね。
おそらくですが、 try_lock
とスピンロックだけからなるmutexなら AtomicU8
などで自作することができるはずです。そうしないのは、スレッドを正しくブロックする機構が必要だからだと考えられます。
ちなみに parking_lot
クレートの Mutex
は実際に AtomicU8
を用いて実装されています。こちらは基本的にスピンロックを使いつつ、ロックやアンロックで時間にかかりそうなときはアドレスのハッシュ値をもとにグローバルの条件変数で待つような実装になっているようです(駐車場という名前も、おそらくそういう比喩)。
シングルスレッドのMutexを作る
「シングルスレッドのMutex」というと意味不明ですが、ここではシングルスレッド・マルチタスクの状況で、タスクに対してMutexのように動作するライブラリを作ることを考えます。本当はちゃんとした Mutex
にしたいのですが、いきなり書くのはかなり難しい(タスク機構とスレッド並行性の両方を同時に考えないといけない)ので、いったんシングルスレッドでやって、futureに慣れていきたいと思います。
では、実際に実装していきます。
今回はnightlyでRust2018を使って作っていこうと思うので以下のようにします。(つまり、以下の実装は今後動かなくなるかもしれないということです)
$ rustup install nightly
$ cargo +nightly new --lib futures-mutex
$ cd futures-mutex
$ echo nightly > rust-toolchain
依存関係は、とりあえず以下のようにしておきます。
[dependencies]
futures-preview = "0.3.0-alpha.9"
あとでマルチスレッド版を作りたいという動機があったので、シングルスレッド版は unsync
というモジュールに入れておくことにします。
pub mod unsync;
// これから書く
Mutex型を作る
ポイズニングとブロックの話は置いておいて、最低限で作るとこんな感じになるはずです。
use std::cell::{Cell, UnsafeCell};
pub struct Mutex<T: ?Sized> {
locked: Cell<bool>,
data: UnsafeCell<T>,
}
impl<T> Mutex<T> {
pub fn new(inner: T) -> Self {
Self {
locked: Cell::new(false),
data: UnsafeCell::new(inner),
}
}
}
先に簡単なものを2つ実装してしまいます。 into_inner
と get_mut
は排他制御と関係ないので、すぐに実装できます。
impl<T> Mutex<T> {
pub fn into_inner(self) -> T {
let Self { data, .. } = self;
let inner = data.into_inner();
inner
}
}
impl<T: ?Sized> Mutex<T> {
pub fn get_mut(&mut self) -> &mut T {
let inner = unsafe { &mut *self.data.get() };
inner
}
}
MutexGuardを作る
Mutexのロックに成功したらMutexGuardが返ってきます。これは &mut T
として動作しつつ、自動でロックを返すRAIIガードの役割を果たします。 Deref
と DerefMut
を実装しておきます。
use std::ops::{Deref, DerefMut};
pub struct MutexGuard<'a, T: ?Sized + 'a> {
mutex: &'a Mutex<T>,
}
impl<'a, T: ?Sized + 'a> Deref for MutexGuard<'a, T> {
type Target = T;
fn deref(&self) -> &Self::Target {
unsafe { &*self.mutex.data.get() }
}
}
impl<'a, T: ?Sized + 'a> DerefMut for MutexGuard<'a, T> {
fn deref_mut(&mut self) -> &mut Self::Target {
unsafe { &mut *self.mutex.data.get() }
}
}
try_lock
を実装する
前述のとおり、 try_lock
はブロッキングが関与しないのであまり考えることがありません。
MutexGuard
を作るときに locked
フラグをセットします。そして、 MutexGuard
がドロップされるときに locked
フラグを落とします。もちろん、既に locked
なときはロック失敗を返すようにします。
impl<T: ?Sized> Mutex<T> {
pub fn try_lock(&self) -> Option<MutexGuard<'_, T>> {
if self.locked.get() {
return None;
}
let guard = MutexGuard::new(self);
Some(guard)
}
}
impl<'a, T: ?Sized + 'a> MutexGuard<'a, T> {
fn new(mutex: &'a Mutex<T>) -> Self {
mutex.locked.set(true);
Self { mutex }
}
}
impl<'a, T: ?Sized + 'a> Drop for MutexGuard<'a, T> {
fn drop(&mut self) {
self.mutex.locked.set(false);
}
}
poll_lock
を実装する
lock
の内部実装になる予定の poll_lock
を先に作ります。
poll_lock
は try_lock
とだいたい同じですが、futuresの Poll
インターフェースにあわせた形で返却します。もっと大きな違いとして、 poll_lock
はwakerの登録をするという違いがあります。これがfuturesのブロッキングの仕組みに大きくかかわってきます。
これから実装する Future::poll
は、処理を即座に実行できないときは NotReady
を返して処理を中断します(そうすると、タスク側から見るとあたかもブロックしたかのように見える)。tokioランタイムは、 NotReady
を返されたタスクを自動で再実行しません。wakerというハンドルに通知が来たときだけ、poll
を再実行するという仕組みになっています。これにより、ビジーループを避けることができるようになっています。
さて、この場合、ロックを取得できなかったタスクに次のチャンスが回ってくるのは、誰かがロックを解放したときです。なので、そのタイミングで通知を行えるように、 LocalWaker
というハンドルを保存しておきます。
// Future, LocakWaker, Poll などを使うときに必要
// (futures-0.3以降では一部APIが標準ライブラリに取り込まれているが、
// それがまだ安定化されていない)
#![feature(futures_api)]
use futures::task::LocalWaker;
pub struct Mutex<T: ?Sized> {
locked: Cell<bool>,
waiters: Cell<Vec<LocalWaker>>, // 追加
data: UnsafeCell<T>,
}
impl<T> Mutex<T> {
pub fn new(inner: T) -> Self {
Self {
locked: Cell::new(false),
waiters: Cell::new(Vec::new()), // 追加
data: UnsafeCell::new(inner),
}
}
}
保存された LocalWaker
を用いて、ロックの解放を通知します。
impl<'a, T: ?Sized + 'a> Drop for MutexGuard<'a, T> {
fn drop(&mut self) {
self.mutex.locked.set(false);
let mut waiters = self.mutex.waiters.replace(Vec::new());
for waiter in waiters.drain(..) {
waiter.wake();
}
self.mutex.waiters.replace(waiters);
}
}
Cell
から一旦取り出して元に戻しているのは個人的な趣味にすぎないので、 RefCell
とかでもいいと思います。
さて、ここでちょっと不思議に思うところがあるかもしれません。たくさんの人がロックを待機していても、実際に次に取得できるのは1人なんだから、1つだけ取り出して通知すればよいのでは?
ただ、これはやめておいたほうがよさそうです。futuresでは基本的に、通知とpoll呼び出しが1対1に対応している保証はないからです。これはjoinとselectのせいです。
join
を使うと、同一タスク内で複数のfutureを同時に待機し、両方の完了を待って次に進むことができます。この場合、タスクが通知を受けても実際の由来がわからないため、両方をpollする必要があります。これにより、「通知を受けていないがpoll
が再実行される」という現象が起こりえます。select
を使うと、同一タスク内で複数のfutureを同時に待機し、どちらか片方の完了を待って次に進むことができます。この場合、どちらか一方のfutureが通知を予約していても、もう一方のfutureの完了をもってタスクが進捗し、通知を予約していた側のfutureは早々に破棄されている、という順番になることがあります。これにより、「通知を受けたものの、それに対応するpoll
はもはや呼ばれない」という現象が起こりえます。
……と思って調べてみたのですが、 mpsc
のbounded queueの実装ではあまり考えずにnotify anyをしていますね。もしかしたらそれでいいのかもしれません。とりあえずStack Overflowの質問にしておきました。
本稿では、とりあえず全員に通知するつもりで実装を続けることにします。通知しすぎても、パフォーマンス上不利になりうるだけで、定性的な問題はありません。
ロック解放時の通知は実装したので、それを使って poll_lock
を実装します。
use futures::task::{LocalWaker, Poll};
impl<T: ?Sized> Mutex<T> {
pub fn poll_lock(&self, lw: &LocalWaker) -> Poll<MutexGuard<'_, T>> {
if self.locked.get() {
let mut waiters = self.waiters.replace(Vec::new());
waiters.push(lw.clone());
self.waiters.replace(waiters);
return Poll::Pending;
}
let guard = MutexGuard::new(self);
Poll::Ready(guard)
}
}
pub
にするかどうかはお好みですが、現状ではpoll系の関数も公開する傾向にありそうです。
この関数はだいたい try_lock
と似てますが、ロックが取得できなかったときは LocalWaker
のインスタンスを解放待ちリストに登録しています。
lock
を実装する
poll_lock
のラッパーとして lock
を実装しましょう。 lock
はタスクをブロックするので、 MutexGuard
自体ではなく、そのfutureを返します。そのための構造体も作ります。
// arbitrary_self_types: Pin<&mut Self> をレシーバにするために必要
// pin: Pin型を使うために必要
#![feature(arbitrary_self_types, pin, futures_api)]
use std::pin::Pin;
// Future型が必要になってきたので、preludeを丸ごと入れる
use futures::prelude::*;
impl<T: ?Sized> Mutex<T> {
// ラッパーを返すだけ
pub fn lock(&self) -> MutexAcquire<'_, T> {
MutexAcquire { mutex: self }
}
}
pub struct MutexAcquire<'a, T: ?Sized + 'a> {
mutex: &'a Mutex<T>,
}
impl<'a, T: ?Sized + 'a> Future for MutexAcquire<'a, T> {
type Output = MutexGuard<'a, T>;
// Pinの扱いは色々面倒だけど、今回は&selfにしか興味がないので普通に使える
fn poll(self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Self::Output> {
self.mutex.poll_lock(lw)
}
}
これでMutexとしての最低限の機能が整いました。
ポイズニングを実装する
Mutex
の使用感を再現するために、ポイズニングを実装しましょう。
シングルスレッドではあまり有り難みがないかもしれませんが、本稿は最終的にはマルチスレッド化を目指すので、そこら辺も考慮しています。
まずは、 Mutex
がポイズンされたかどうかのフラグを作ります。
use std::thread;
pub struct Mutex<T: ?Sized> {
locked: Cell<bool>,
poisoned: Cell<bool>, // 追加
waiters: Cell<Vec<LocalWaker>>,
data: UnsafeCell<T>,
}
impl<T> Mutex<T> {
pub fn new(inner: T) -> Self {
Self {
locked: Cell::new(false),
poisoned: Cell::new(false), // 追加
waiters: Cell::new(Vec::new()),
data: UnsafeCell::new(inner),
}
}
}
impl<T: ?Sized> Mutex<T> {
pub fn is_poisoned(&self) -> bool {
self.poisoned.get()
}
}
pub struct MutexGuard<'a, T: ?Sized + 'a> {
mutex: &'a Mutex<T>,
is_panicking: bool, // 追加
}
impl<'a, T: ?Sized + 'a> MutexGuard<'a, T> {
fn new(mutex: &'a Mutex<T>) -> Self {
mutex.locked.set(true);
Self {
mutex,
is_panicking: thread::panicking(), // 追加
}
}
}
impl<'a, T: ?Sized + 'a> Drop for MutexGuard<'a, T> {
fn drop(&mut self) {
self.mutex.locked.set(false);
// この処理が追加。
// ロックをホールド中にパニックになったときはフラグを立てる
if !self.is_panicking && thread::panicking() {
self.mutex.poisoned.set(true);
}
let mut waiters = self.mutex.waiters.replace(Vec::new());
for waiter in waiters.drain(..) {
waiter.wake();
}
self.mutex.waiters.replace(waiters);
}
}
あとは、今まで作った関数の戻り値を、「ポイズンされていなければ Ok
, ポイズンされていたら Err
」となるように置き換えていきます。ポイズン関係の型を自作するのは面倒なので std::sync
から借りてきます。
use std::sync::{LockResult, PoisonError, TryLockError, TryLockResult};
impl<T> Mutex<T> {
// 戻り値変更
pub fn into_inner(self) -> LockResult<T> {
// poisonedを取得
let Self { poisoned, data, .. } = self;
let poisoned = poisoned.into_inner();
let inner = data.into_inner();
// 戻り値変更
if poisoned {
Err(PoisonError::new(inner))
} else {
Ok(inner)
}
}
}
impl<T: ?Sized> Mutex<T> {
// 戻り値変更
pub fn poll_lock(&self, lw: &LocalWaker) -> Poll<LockResult<MutexGuard<'_, T>>> {
if self.locked.get() {
let mut waiters = self.waiters.replace(Vec::new());
waiters.push(lw.clone());
self.waiters.replace(waiters);
return Poll::Pending;
}
let guard = MutexGuard::new(self);
// 戻り値変更
if self.poisoned.get() {
Poll::Ready(Err(PoisonError::new(guard)))
} else {
Poll::Ready(Ok(guard))
}
}
// 戻り値変更
pub fn try_lock(&self) -> TryLockResult<MutexGuard<'_, T>> {
if self.locked.get() {
// 戻り値変更
return Err(TryLockError::WouldBlock);
}
let guard = MutexGuard::new(self);
// 戻り値変更
if self.poisoned.get() {
Err(PoisonError::new(guard).into())
} else {
Ok(guard)
}
}
// 戻り値変更
pub fn get_mut(&mut self) -> LockResult<&mut T> {
let inner = unsafe { &mut *self.data.get() };
// 戻り値変更
if self.poisoned.get() {
Err(PoisonError::new(inner))
} else {
Ok(inner)
}
}
}
impl<'a, T: ?Sized + 'a> Future for MutexAcquire<'a, T> {
// ここを変更
type Output = LockResult<MutexGuard<'a, T>>;
}
せっかくpanic safeにしたので、 UnwindSafe
/RefUnwindSafe
を実装しておきます。
use std::panic::{RefUnwindSafe, UnwindSafe};
impl<T: ?Sized> UnwindSafe for Mutex<T> {}
impl<T: ?Sized> RefUnwindSafe for Mutex<T> {}
諸々を実装する
おまけとして、 Mutex
にあると便利な実装をいくつか書いておきます。まず From
/Default
実装。
impl<T> From<T> for Mutex<T> {
fn from(x: T) -> Self {
Mutex::new(x)
}
}
impl<T: Default> Default for Mutex<T> {
fn default() -> Self {
Mutex::new(T::default())
}
}
それから、 Debug
/Display
の実装です。 Mutex
は Debug
を実装していますが、ロックが取得できなかったときは <locked>
と表示するようにしています。
use std::fmt;
impl<T: ?Sized + fmt::Debug> fmt::Debug for Mutex<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
// Debug はポイズンしてても表示する
let guard = match self.try_lock() {
Ok(guard) => Ok(guard),
Err(TryLockError::Poisoned(err)) => Ok(err.into_inner()),
Err(TryLockError::WouldBlock) => Err(()),
};
if let Ok(guard) = guard {
f.debug_struct("Mutex")
// &dyn Debugが要求されるので二重参照をとる (よくあるテク)
.field("data", &(&guard as &T))
.finish()
} else {
// <locked> と表示するためのダミー構造体を作る
struct LockedPlaceholder;
impl fmt::Debug for LockedPlaceholder {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.write_str("<locked>")
}
}
f.debug_struct("Mutex")
.field("data", &LockedPlaceholder)
.finish()
}
}
}
impl<'a, T: ?Sized + fmt::Debug + 'a> fmt::Debug for MutexGuard<'a, T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
// これは本家Mutexとちょっと実装が違うけど、多分こっちのほうが嬉しい
f.debug_struct("MutexGuard")
.field("data", &(self as &T))
.finish()
}
}
impl<'a, T: ?Sized + fmt::Display + 'a> fmt::Display for MutexGuard<'a, T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
<T as fmt::Display>::fmt(self, f)
}
}
// Future用ラッパーは普通にDebugをderiveしておけばだいたいOK
#[derive(Debug)]
pub struct MutexAcquire<'a, T: ?Sized + 'a> {
mutex: &'a Mutex<T>,
}
ちょっと例を書いてみる
あんまり複雑でちゃんとした例を思いつかないので、食事する哲学者を書いて終わりにしようと思います。
[dev-dependencies]
futures-preview = { version = "0.3.0-alpha.9", features = ["tokio-compat"] }
futures-test-preview = "0.3.0-alpha.9"
tokio = "0.1.11"
rand = "0.5.5"
#![feature(async_await, await_macro, pin, futures_api)]
use std::rc::Rc;
use futures::prelude::*;
use rand::prelude::*;
// 今回はtokioの標準ランタイムではなく、current_threadランタイムを使う。
// 今回作ったMutexがシングルスレッド用だったため、こちらでないと動かせない。
use tokio::runtime::current_thread::{Runtime, spawn};
use futures_mutex::unsync::Mutex;
use futures_test::future::FutureTestExt;
// スケジューリングを強制的に狂わせる。
// ありがたいことにcurrent_threadランタイムは決定的に動作するので、
// そのままだと簡単な例ではデッドロックが再現しないことがある。
// なので適当なタイミングでこれを呼んで強制的にスケジューリングをランダムにする。
async fn jitter() {
let num = thread_rng().gen_range(0, 10);
for _ in 0..num {
// pending_onceはfutures_testにある。1回休みをすることができて便利
await!(async {}.pending_once());
}
}
async fn main2() {
let resources = (0..5_i32).map(|i| Rc::new(Mutex::new(i))).collect::<Vec<_>>();
for i in 0..5 {
let res0 = resources[i].clone();
let res1 = resources[(i + 1) % 5].clone();
spawn(async move {
for _ in 0..100 {
let lock0 = await!(res0.lock()).unwrap();
await!(jitter());
eprintln!("Thread {}: acquired {}", i, *lock0);
let lock1 = await!(res1.lock()).unwrap();
await!(jitter());
eprintln!("Thread {}: acquired {}", i, *lock1);
// ここら辺のdropは勝手にやるから基本的には不要。
// jitterの呼び出しをするために入れている。
drop(lock1);
await!(jitter());
drop(lock0);
await!(jitter());
}
println!("Thread {}: done!", i);
Ok(())
}.boxed().compat());
// ↑futures-0.3用のFutureを0.1に変換するおまじない
// Resultを返さないと `compat` を解決できないので注意
}
}
fn main() {
let mut rt = Runtime::new().unwrap();
rt.spawn(async {
await!(main2());
Ok(())
}.boxed().compat());
// ↑futures-0.3用のFutureを0.1に変換するおまじない
// Resultを返さないと `compat` を解決できないので注意
rt.run().unwrap();
}
実際に実行してみると、だいたいどこかで止まります。
$ cargo run --example unsync_philosopher
Running `target/debug/examples/unsync_philosopher`
Thread 0: acquired 0
Thread 3: acquired 3
Thread 2: acquired 2
Thread 0: acquired 1
Thread 3: acquired 4
Thread 1: acquired 1
Thread 4: acquired 4
Thread 0: acquired 0
Thread 2: acquired 3
Thread 3: acquired 3
(中略)
Thread 3: acquired 3
Thread 1: acquired 2
Thread 2: acquired 2
Thread 0: acquired 1
Thread 4: acquired 0
Thread 1: acquired 1
Thread 0: acquired 0
Thread 3: acquired 4
Thread 4: acquired 4
Thread 3: acquired 3
^C
デッドロックしないバージョンも書いてみます。
#![feature(async_await, await_macro, pin, futures_api)]
use std::rc::Rc;
use futures::prelude::*;
use rand::prelude::*;
use tokio::runtime::current_thread::{Runtime, spawn};
use futures_mutex::unsync::Mutex;
use futures_test::future::FutureTestExt;
async fn jitter() {
let num = thread_rng().gen_range(0, 10);
for _ in 0..num {
await!(async {}.pending_once());
}
}
async fn main2() {
let resources = (0..5_i32).map(|i| Rc::new(Mutex::new(i))).collect::<Vec<_>>();
for i in 0..5 {
// res0, res1 のとり方だけ異なる
let (res0, res1) = if i == 4 {
(resources[0].clone(), resources[4].clone())
} else {
(resources[i].clone(), resources[i + 1].clone())
};
spawn(async move {
for _ in 0..100 {
let lock0 = await!(res0.lock()).unwrap();
await!(jitter());
eprintln!("Thread {}: acquired {}", i, *lock0);
let lock1 = await!(res1.lock()).unwrap();
await!(jitter());
eprintln!("Thread {}: acquired {}", i, *lock1);
drop(lock1);
await!(jitter());
drop(lock0);
await!(jitter());
}
println!("Thread {}: done!", i);
Ok(())
}.boxed().compat());
}
}
fn main() {
let mut rt = Runtime::new().unwrap();
rt.spawn(async {
await!(main2());
Ok(())
}.boxed().compat());
rt.run().unwrap();
}
これは実行するとちゃんと終わります。
$ cargo run --example unsync_philosopher_ok
Running `target/debug/examples/unsync_philosopher_ok`
Thread 3: acquired 3
Thread 3: acquired 4
Thread 1: acquired 1
Thread 0: acquired 0
Thread 2: acquired 2
Thread 2: acquired 3
Thread 3: acquired 3
Thread 3: acquired 4
Thread 1: acquired 2
Thread 2: acquired 2
(中略)
Thread 0: acquired 0
Thread 0: acquired 1
Thread 0: acquired 0
Thread 0: acquired 1
Thread 0: acquired 0
Thread 0: acquired 1
Thread 0: acquired 0
Thread 0: acquired 1
Thread 0: acquired 0
Thread 0: acquired 1
Thread 0: done!
今日は力尽きたのでここまでです。やる気とかが残ってたらマルチスレッド版も作ってみようと思います。