前の記事
- 【0】 準備 ← 初回
- ...
- 【31】 上限付きチャネル・PATCH機能 ~パンクしないように制御!~ ← 前回
- 【32】
Send・排他的ロック(Mutex)・非対称排他的ロック(RwLock) ~真打Arc<Mutex<T>>登場~ ← 今回
全記事一覧
- 【0】 準備
- 【1】 構文・整数・変数
- 【2】 if・パニック・演習
- 【3】 可変・ループ・オーバーフロー
- 【4】 キャスト・構造体 (たまにUFCS)
- 【5】 バリデーション・モジュールの公開範囲 ~ → カプセル化!~
- 【6】 カプセル化の続きと所有権とセッター ~そして不変参照と可変参照!~
- 【7】 スタック・ヒープと参照のサイズ ~メモリの話~
- 【8】 デストラクタ(変数の終わり)・トレイト ~終わりと始まり~
- 【9】 Orphan rule (孤児ルール)・演算子オーバーロード・derive ~Empowerment 💪 ~
- 【10】 トレイト境界・文字列・Derefトレイト ~トレイトのアレコレ~
- 【11】 Sized トレイト・From トレイト・関連型 ~おもしろトレイトと関連型~
- 【12】 Clone・Copy・Dropトレイト ~覚えるべき主要トレイトたち~
- 【13】 トレイトまとめ・列挙型・match式 ~最強のトレイトの次は、最強の列挙型~
- 【14】 フィールド付き列挙型とOption型 ~チョクワガタ~
- 【15】 Result型 ~Rust流エラーハンドリング術~
- 【16】 Errorトレイトと外部クレート ~依存はCargo.tomlに全部お任せ!~
- 【17】 thiserror・TryFrom ~トレイトもResultも自由自在!~
- 【18】 Errorのネスト・慣例的な書き方 ~Rustらしさの目醒め~
- 【19】 配列・動的配列 ~スタックが使われる配列と、ヒープに保存できる動的配列~
- 【20】 動的配列のリサイズ・イテレータ ~またまたトレイト登場!~
- 【21】 イテレータ・ライフタイム ~ライフタイム注釈ようやく登場!~
- 【22】 コンビネータ・RPIT ~
「 Iteratorトレイトを実装してるやつ」~ - 【23】
impl Trait・スライス ~配列の欠片~ - 【24】 可変スライス・下書き構造体 ~構造体で状態表現~
- 【25】 インデックス・可変インデックス ~インデックスもトレイト!~
- 【26】 HashMap・順序・BTreeMap ~Rustの辞書型~
- 【27】 スレッド・'staticライフタイム ~並列処理に見るRustの恩恵~
- 【28】 リーク・スコープ付きスレッド ~ライフタイムに技あり!~
- 【29】 チャネル・参照の内部可変性 ~Rustの虎の子、mpscと
Rc<RefCell<T>>~ - 【30】 双方向通信・リファクタリング ~返信用封筒を入れよう!~
- 【31】 上限付きチャネル・PATCH機能 ~パンクしないように制御!~
- 【32】
Send・排他的ロック(Mutex)・非対称排他的ロック(RwLock) ~真打Arc<Mutex<T>>登場~ - 【33】 チャネルなしで実装・Syncの話 ~考察回です~
- 【34】
async fn・非同期タスク生成 ~Rustの非同期入門~ - 【35】 非同期ランタイム・Futureトレイト ~非同期のお作法~
- 【36】 ブロッキング・非同期用の実装・キャンセル ~ラストスパート!~
- 【37】 Axumでクラサバ! ~最終回~
- 【おまけ1】 Rustで勘違いしていたこと3選 🏄🌴 【100 Exercises To Learn Rust 🦀 完走記事 🏃】
- 【おまけ2】 【🙇 懺悔 🙇】Qiitanグッズ欲しさに1日に33記事投稿した話またはQiita CLIとcargo scriptを布教する的な何か
100 Exercise To Learn Rust 演習第32回になります!
今回の関連ページ
[07_threads/11_locks] 排他的アクセス
前回エクササイズの問題点と、その解決策ミューテックス Mutex
前回エクササイズでは単にパッチを投げてそれをサーバー側が受け取り更新処理としていました。
今回のBookの最初にて指摘されている通り、こちらは「競合状態(レースコンディション)」の問題があります。2つのクライアントがほぼ同時に同じチケットに対する更新処理を送ってきたら、先に来た片方のクライアントの更新は虚無に消えてしまいます!
2つ解決策が思い浮かび、一つはバージョンで管理する方法です。
「楽観的並行処理」として、こちらは任意課題扱いになっていますね。分散システムでもよく見られるパターンですし、自分も最終回での実装にてこちらの方針を採用しています!
しかし、今回はもう一つの解決策を取ります。「ロック方式」です!変更を加えたいチケットに対して、チケットではなくそのチケットのロックであるMutex<Ticket> (MUTual EXclusion, ミューテックス, 相互排他)を取り、ロック中は他のクライアントからはアクセスできないようにします。ロックはチケットごとに用意してあり、今回サーバーからもらえるのはこのチケットへの参照( Arc<Mutex<Ticket>> )です!
というわけで、演習問題は上図の実現です!
問題
前置きが長くなりましたが、今回の問題はこちらです。src/lib.rs の方はご丁寧にも Arc<Mutex<T>> を使用するようにすでに改良が施されているため、 src/store.rs のみ改変することになります。
src/lib.rs には何も改変を加える必要はありませんが、今回問題に関わる部分を diff_rust マークアップでハイライトしておきます。追加行ではありません。
// TODO: Fill in the missing methods for `TicketStore`.
// Notice how we no longer need a separate update command: `Get` now returns a handle to the ticket
// which allows the caller to both modify and read the ticket.
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};
+ use std::sync::{Arc, Mutex};
use crate::data::{Ticket, TicketDraft};
use crate::store::{TicketId, TicketStore};
pub mod data;
pub mod store;
#[derive(Clone)]
pub struct TicketStoreClient {
sender: SyncSender<Command>,
}
impl TicketStoreClient {
pub fn insert(&self, draft: TicketDraft) -> Result<TicketId, OverloadedError> {
let (response_sender, response_receiver) = sync_channel(1);
self.sender
.try_send(Command::Insert {
draft,
response_channel: response_sender,
})
.map_err(|_| OverloadedError)?;
Ok(response_receiver.recv().unwrap())
}
+ pub fn get(&self, id: TicketId) -> Result<Option<Arc<Mutex<Ticket>>>, OverloadedError> {
let (response_sender, response_receiver) = sync_channel(1);
self.sender
.try_send(Command::Get {
id,
response_channel: response_sender,
})
.map_err(|_| OverloadedError)?;
Ok(response_receiver.recv().unwrap())
}
}
#[derive(Debug, thiserror::Error)]
#[error("The store is overloaded")]
pub struct OverloadedError;
pub fn launch(capacity: usize) -> TicketStoreClient {
let (sender, receiver) = sync_channel(capacity);
std::thread::spawn(move || server(receiver));
TicketStoreClient { sender }
}
enum Command {
Insert {
draft: TicketDraft,
response_channel: SyncSender<TicketId>,
},
Get {
id: TicketId,
+ response_channel: SyncSender<Option<Arc<Mutex<Ticket>>>>,
},
}
pub fn server(receiver: Receiver<Command>) {
let mut store = TicketStore::new();
loop {
match receiver.recv() {
Ok(Command::Insert {
draft,
response_channel,
}) => {
let id = store.add_ticket(draft);
let _ = response_channel.send(id);
}
Ok(Command::Get {
id,
response_channel,
}) => {
let ticket = store.get(id);
let _ = response_channel.send(ticket);
}
Err(_) => {
// There are no more senders, so we can safely break
// and shut down the server.
break;
}
}
}
}
use crate::data::{Status, Ticket, TicketDraft};
use std::collections::BTreeMap;
+ use std::sync::{Arc, Mutex};
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct TicketId(u64);
#[derive(Clone)]
pub struct TicketStore {
+ tickets: BTreeMap<TicketId, Arc<Mutex<Ticket>>>,
counter: u64,
}
impl TicketStore {
pub fn new() -> Self {
Self {
tickets: BTreeMap::new(),
counter: 0,
}
}
pub fn add_ticket(&mut self, ticket: TicketDraft) -> TicketId {
let id = TicketId(self.counter);
self.counter += 1;
let ticket = Ticket {
id,
title: ticket.title,
description: ticket.description,
status: Status::ToDo,
};
todo!();
id
}
// The `get` method should return a handle to the ticket
// which allows the caller to either read or modify the ticket.
pub fn get(&self, id: TicketId) -> Option<todo!()> {
todo!()
}
}
その他のファイル
use crate::store::TicketId;
use ticket_fields::{TicketDescription, TicketTitle};
#[derive(Clone, Debug, PartialEq)]
pub struct Ticket {
pub id: TicketId,
pub title: TicketTitle,
pub description: TicketDescription,
pub status: Status,
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct TicketDraft {
pub title: TicketTitle,
pub description: TicketDescription,
}
#[derive(Clone, Debug, Copy, PartialEq, Eq)]
pub enum Status {
ToDo,
InProgress,
Done,
}
use locks::data::{Status, TicketDraft};
use locks::launch;
use ticket_fields::test_helpers::{ticket_description, ticket_title};
#[test]
fn works() {
let client = launch(5);
let draft = TicketDraft {
title: ticket_title(),
description: ticket_description(),
};
let ticket_id = client.insert(draft.clone()).unwrap();
let ticket = client.get(ticket_id).unwrap().unwrap();
{
let mut ticket = ticket.lock().unwrap();
assert_eq!(ticket_id, ticket.id);
assert_eq!(ticket.status, Status::ToDo);
assert_eq!(ticket.title, draft.title);
assert_eq!(ticket.description, draft.description);
ticket.status = Status::InProgress;
}
let ticket = client.get(ticket_id).unwrap().unwrap();
{
let ticket = ticket.lock().unwrap();
assert_eq!(ticket_id, ticket.id);
assert_eq!(ticket.status, Status::InProgress);
}
}
解説
Arc<Mutex<Ticket>> を使用するように改変します!
use crate::data::{Status, Ticket, TicketDraft};
use std::collections::BTreeMap;
use std::sync::{Arc, Mutex};
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct TicketId(u64);
#[derive(Clone)]
pub struct TicketStore {
tickets: BTreeMap<TicketId, Arc<Mutex<Ticket>>>,
counter: u64,
}
impl TicketStore {
pub fn new() -> Self {
Self {
tickets: BTreeMap::new(),
counter: 0,
}
}
pub fn add_ticket(&mut self, ticket: TicketDraft) -> TicketId {
let id = TicketId(self.counter);
self.counter += 1;
let ticket = Ticket {
id,
title: ticket.title,
description: ticket.description,
status: Status::ToDo,
};
self.tickets.insert(id, Arc::new(Mutex::new(ticket)));
id
}
// The `get` method should return a handle to the ticket
// which allows the caller to either read or modify the ticket.
pub fn get(&self, id: TicketId) -> Option<Arc<Mutex<Ticket>>> {
self.tickets.get(&id).cloned()
}
}
add_thicket メソッドにて BTreeMap::insert に格納する際には、 Arc<Mutex<Ticket>> 型になるようにここで Arc と Mutex を作成しています。
get メソッドの方は、以前は &Ticket を返せばよかったので get(&id) で止まっていました。今回は Arc<Mutex<Ticket>> を返したいです。 get(&id) は結果が Some なら中身が &Arc<...> となっているため、この時だけ Arc::clone (Clone::clone) が呼ばれるように cloned メソッド を使い、 Arc を複製して他スレッドに渡せるようにします!
そして get でチケットへのハンドラを扱えるようになったため、前回追加した Update コマンド及びそれに関連する処理、 get_mut メソッドはもはや不要になりました!今後は、クライアントが勝手にロックして中身を書き換えられます。
Send と Rc<RefCell<T>> ・ Arc<Mutex<T>> 関係早見表
ここで「 Arc<Mutex<T>> は Rc<RefCell<T>> とほぼ同じ機能では...?」という定番質問があります。「マルチスレッドで使える(Send や Sync が付いているのでスレット跨ぎができる)のが Arc<Mutex<T>> 、使えないのが Rc<RefCell<T>> 」というのが簡単な回答になります。
Rc の内部カウンターは本記事1枚目の図のようなレースコンディションを起こしてしまう仕組みになってしまっており、 Arc はカウント操作をアトミックにすることでそれを克服しています。このスレッド間転送可能かどうかの印として、 Send マーカートレイトが用いられます。 Send がなければスレッドを跨ぐクロージャに渡すことができません!
両者の違いや Send マーカートレイトの詳細について Book、TRPLどちらにも説明があるので、解説はこちらに譲り、この先 Sync が登場することも踏まえ代わりに「Send・Sync早見表」を設置しておきます!
-
Send: 本体を別スレッドに送れる -
Sync: 本体への参照を別スレッドに送れる- つまり
impl Sync for Tとimpl Send for &Tが同値 - 意味的な部分は 次回 扱う予定
- つまり
: Send, Sync
: !Send, !Sync
ローカルスレッド向け
| 型 | Send | Sync | 機能 |
|---|---|---|---|
| Rc | 所有権シェア | ||
| RefCell | 動的可変参照1 | ||
| Ref |
RefCell::<T>::borrow で得られる Deref<Target = T>
|
||
| RefMut |
RefCell::<T>::borrow_mut で得られる DerefMut<Target = T> 2
|
マルチスレッド向け
| 型 | Send | Sync | 機能 |
|---|---|---|---|
| Arc | 所有権シェア | ||
| Mutex | 排他アクセス機能 | ||
| MutexGuard |
Mutex::<T>::lock で得られる Deref<Target = T> & DerefMut
|
RefCell が
で MutexGuard が
なのが興味深いですね...これについては次回にて考察したいと思います3!
RefCell と Mutex には挙動にも違いがあります。 RefCell はローカルスレッドでしかアクセスを許さない代わりに、可変参照・不変参照の制約をランタイム時に調べてくれて、満たさない場合パニックします( try_borrow* なら Result 化可能)。一方 Mutex の方は不変・可変を問わず「ロックを取ります」。このロックというのが厄介で、ロック中にロックを得ようとするとローカルスレッド内ですら(つまり RefCell と同様の使い方をした場合でも)パニックしてくれず デッドロックします 4。つまり扱いとしては Mutex の方が気をつけるべき点が多く5、むしろ違いとしてはこちらの方が大きそうですね。
Arc<Mutex<T>> は万能...?
ある値を取り扱うのに、いままでは所有権で苦しんでいたところに、 Arc のお陰で「複数のスレッド・変数を跨いで共有できる」し、 Mutex のお陰で「複数の変数から変更可能」になりました。これで他言語の"変数"とようやくトントンに色々できそうです!safe Rustの範囲で可変なグローバル変数のように扱うことも頑張ればできそうです。
そして、万能かと聞かれるとデッドロックするから答えは「No」になるのですが、つまるところ、「万能な可変グローバル変数」なんてものは人類が扱ってはいけないことを、長い型名とMutex のデッドロックしてしまう性質から教えてくれているんじゃないかと思います。
Rustの考えが豊富に詰め込まれているという点では、 Arc<Mutex<T>> は "真打" 感がありますね。適切なシーンで使えるようになりたいです(筆者の経験則的にはできるだけ mpsc を利用するのが正解...)。
[07_threads/12_rw_lock] 非対称排他的アクセス ( RwLock )
Mutex は読み書き両方を含めたロックしか取れませんが、 RwLock ならば(& のように)複数アクセスを認める読み取り専用ロックと、(&mut のように)専有アクセスしか認めない読み書きロックという器用な2つのロックを使い分けることができ、より RefCell と似た感じに扱えます! Mutex と RwLock の良し悪しは Book に書かれているのでそちらに譲ります。
マルチスレッド向け (続き)
| 型 | Send | Sync | 機能 |
|---|---|---|---|
| RwLock |
Mutex において、読み書き専有ロック(RwLockWriteGuard)か読み込み共有ロック(RwLockReadGuard)かを選べるようにしたもの |
||
| RwLockWriteGuard |
LwLock::<T>::read で得られる Deref<Target = T>
|
||
| RwLockReadGuard |
LwLock::<T>::write で得られる DerefMut<Target = T> 2
|
問題・解説
Mutex を RwLock に置換する必要があるだけでほとんど前問と変わりません 。 tests/check.rs で呼び出しているメソッドには変化があります。
一応ファイル (回答済み)
// TODO: Replace `Mutex` with `RwLock` in the `TicketStore` struct and
// all other relevant places to allow multiple readers to access the ticket store concurrently.
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::sync::{Arc, RwLock};
use crate::data::{Ticket, TicketDraft};
use crate::store::{TicketId, TicketStore};
pub mod data;
pub mod store;
#[derive(Clone)]
pub struct TicketStoreClient {
sender: SyncSender<Command>,
}
impl TicketStoreClient {
pub fn insert(&self, draft: TicketDraft) -> Result<TicketId, OverloadedError> {
let (response_sender, response_receiver) = sync_channel(1);
self.sender
.try_send(Command::Insert {
draft,
response_channel: response_sender,
})
.map_err(|_| OverloadedError)?;
Ok(response_receiver.recv().unwrap())
}
pub fn get(&self, id: TicketId) -> Result<Option<Arc<RwLock<Ticket>>>, OverloadedError> {
let (response_sender, response_receiver) = sync_channel(1);
self.sender
.try_send(Command::Get {
id,
response_channel: response_sender,
})
.map_err(|_| OverloadedError)?;
Ok(response_receiver.recv().unwrap())
}
}
#[derive(Debug, thiserror::Error)]
#[error("The store is overloaded")]
pub struct OverloadedError;
pub fn launch(capacity: usize) -> TicketStoreClient {
let (sender, receiver) = sync_channel(capacity);
std::thread::spawn(move || server(receiver));
TicketStoreClient { sender }
}
enum Command {
Insert {
draft: TicketDraft,
response_channel: SyncSender<TicketId>,
},
Get {
id: TicketId,
response_channel: SyncSender<Option<Arc<RwLock<Ticket>>>>,
},
}
fn server(receiver: Receiver<Command>) {
let mut store = TicketStore::new();
loop {
match receiver.recv() {
Ok(Command::Insert {
draft,
response_channel,
}) => {
let id = store.add_ticket(draft);
let _ = response_channel.send(id);
}
Ok(Command::Get {
id,
response_channel,
}) => {
let ticket = store.get(id);
let _ = response_channel.send(ticket);
}
Err(_) => {
// There are no more senders, so we can safely break
// and shut down the server.
break;
}
}
}
}
use crate::store::TicketId;
use ticket_fields::{TicketDescription, TicketTitle};
#[derive(Clone, Debug, PartialEq)]
pub struct Ticket {
pub id: TicketId,
pub title: TicketTitle,
pub description: TicketDescription,
pub status: Status,
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct TicketDraft {
pub title: TicketTitle,
pub description: TicketDescription,
}
#[derive(Clone, Debug, Copy, PartialEq, Eq)]
pub enum Status {
ToDo,
InProgress,
Done,
}
use crate::data::{Status, Ticket, TicketDraft};
use std::collections::BTreeMap;
use std::sync::{Arc, RwLock};
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct TicketId(u64);
#[derive(Clone)]
pub struct TicketStore {
tickets: BTreeMap<TicketId, Arc<RwLock<Ticket>>>,
counter: u64,
}
impl TicketStore {
pub fn new() -> Self {
Self {
tickets: BTreeMap::new(),
counter: 0,
}
}
pub fn add_ticket(&mut self, ticket: TicketDraft) -> TicketId {
let id = TicketId(self.counter);
self.counter += 1;
let ticket = Ticket {
id,
title: ticket.title,
description: ticket.description,
status: Status::ToDo,
};
let ticket = Arc::new(RwLock::new(ticket));
self.tickets.insert(id, ticket);
id
}
// The `get` method should return a handle to the ticket
// which allows the caller to either read or modify the ticket.
pub fn get(&self, id: TicketId) -> Option<Arc<RwLock<Ticket>>> {
self.tickets.get(&id).cloned()
}
}
use rwlock::data::{Status, TicketDraft};
use rwlock::launch;
use ticket_fields::test_helpers::{ticket_description, ticket_title};
#[test]
fn works() {
let client = launch(5);
let draft = TicketDraft {
title: ticket_title(),
description: ticket_description(),
};
let ticket_id = client.insert(draft.clone()).unwrap();
let ticket = client.get(ticket_id).unwrap().unwrap();
let lock1 = ticket.read().unwrap();
{
let ticket = ticket.read().unwrap();
assert_eq!(ticket_id, ticket.id);
assert_eq!(ticket.status, Status::ToDo);
assert_eq!(ticket.title, draft.title);
assert_eq!(ticket.description, draft.description);
}
drop(lock1);
let ticket = client.get(ticket_id).unwrap().unwrap();
{
let mut ticket = ticket.write().unwrap();
ticket.status = Status::InProgress;
}
}
では次の問題に行きましょう!
次の記事: 【33】 チャネルなしで実装・Syncの話 ~考察回です~
登場したPlayground
(実際に無効化したことはないですが、)Rust Playground上のデータが喪失する可能性を鑑みて、一応記事にもソースコードを掲載することとしました。
use std::sync::{Arc, Mutex, MutexGuard};
use std::fmt::Debug;
use std::cell::{RefCell, RefMut};
use std::rc::Rc;
pub fn get_lock_rrr<T: Debug>(rrr: Rc<RefCell<T>>, rec: bool) {
let rrr2 = Rc::clone(&rrr);
let rfmt: RefMut<'_, T> = rrr.borrow_mut();
if rec {
get_lock_rrr(rrr2, false);
}
println!("{:?}", *rfmt);
}
#[test]
#[should_panic(expected = "already borrowed")]
fn rrr_test() {
let rrr = Rc::new(RefCell::new(10));
get_lock_rrr(rrr, true);
}
pub fn get_lock_amr<T: Debug>(amr: Arc<Mutex<T>>, rec: bool) {
#[allow(unused_variables)]
let amr2 = Arc::clone(&amr);
let guard: MutexGuard<'_, T> = amr.lock().unwrap();
if rec {
// コメントを外すとデッドロック
// get_lock_amr(amr2, false);
}
println!("{:?}", *guard);
}
#[test]
fn amr_test() {
let amr = Arc::new(Mutex::new(10));
get_lock_amr(amr, true);
}
-
コンパイル時は不変参照だけど、ランタイム時に可変になれることより筆者が付けた呼称です。29回で扱ったように「不変参照に内部可変性を付与し、借用チェックをランタイムで行う機能」とか説明したほうがもう少し正確かもしれません。 ↩
-
Target = Tは前提トレイトのDerefから得られるもので、直接持っている関連型ではありませんが便宜上このように記載しています。 ↩ ↩2 -
メタいことを言えばここで解説しちゃうと次回話すネタがなくなってしまうのです... ↩
-
公式ドキュメントの解説だとパニックしてくれる場合もあるらしい(とにかく、値が返らないという点は変わらないと書いてある)ですが、Playground で示した環境だとデッドロックしました ↩
-
ランタイム時に想定していない挙動をするという点では
RefCellでもMutexでも変わらないですし、RefCellはそもそも使い所がイマイチなことが多いので、総括すると扱うにあたる体感の慎重さは両者間で変わらない気がします。 ↩