1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

お題は不問!Qiita Engineer Festa 2024で記事投稿!
Qiita Engineer Festa20242024年7月17日まで開催中!

Rust 100 Ex 🏃【32/37】 `Send`・排他的ロック(`Mutex`)・非対称排他的ロック(`RwLock`) ~真打 `Arc<Mutex<T>>` 登場~

Last updated at Posted at 2024-07-17

前の記事

全記事一覧

100 Exercise To Learn Rust 演習第32回になります!

今回の関連ページ

[07_threads/11_locks] 排他的アクセス

前回エクササイズの問題点と、その解決策ミューテックス Mutex

前回エクササイズでは単にパッチを投げてそれをサーバー側が受け取り更新処理としていました。

今回のBookの最初にて指摘されている通り、こちらは「競合状態(レースコンディション)」の問題があります。2つのクライアントがほぼ同時に同じチケットに対する更新処理を送ってきたら、先に来た片方のクライアントの更新は虚無に消えてしまいます!

2つ解決策が思い浮かび、一つはバージョンで管理する方法です。

「楽観的並行処理」として、こちらは任意課題扱いになっていますね。分散システムでもよく見られるパターンですし、自分も最終回での実装にてこちらの方針を採用しています!

しかし、今回はもう一つの解決策を取ります。「ロック方式」です!変更を加えたいチケットに対して、チケットではなくそのチケットのロックであるMutex<Ticket> (MUTual EXclusion, ミューテックス, 相互排他)を取り、ロック中は他のクライアントからはアクセスできないようにします。ロックはチケットごとに用意してあり、今回サーバーからもらえるのはこのチケットへの参照( Arc<Mutex<Ticket>> )です!

というわけで、演習問題は上図の実現です!

問題

前置きが長くなりましたが、今回の問題はこちらです。src/lib.rs の方はご丁寧にも Arc<Mutex<T>> を使用するようにすでに改良が施されているため、 src/store.rs のみ改変することになります。

src/lib.rs には何も改変を加える必要はありませんが、今回問題に関わる部分を diff_rust マークアップでハイライトしておきます。追加行ではありません。

src/lib.rs
// TODO: Fill in the missing methods for `TicketStore`.
//  Notice how we no longer need a separate update command: `Get` now returns a handle to the ticket
//  which allows the caller to both modify and read the ticket.
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};
+ use std::sync::{Arc, Mutex};

use crate::data::{Ticket, TicketDraft};
use crate::store::{TicketId, TicketStore};

pub mod data;
pub mod store;

#[derive(Clone)]
pub struct TicketStoreClient {
    sender: SyncSender<Command>,
}

impl TicketStoreClient {
    pub fn insert(&self, draft: TicketDraft) -> Result<TicketId, OverloadedError> {
        let (response_sender, response_receiver) = sync_channel(1);
        self.sender
            .try_send(Command::Insert {
                draft,
                response_channel: response_sender,
            })
            .map_err(|_| OverloadedError)?;
        Ok(response_receiver.recv().unwrap())
    }

+     pub fn get(&self, id: TicketId) -> Result<Option<Arc<Mutex<Ticket>>>, OverloadedError> {
        let (response_sender, response_receiver) = sync_channel(1);
        self.sender
            .try_send(Command::Get {
                id,
                response_channel: response_sender,
            })
            .map_err(|_| OverloadedError)?;
        Ok(response_receiver.recv().unwrap())
    }
}

#[derive(Debug, thiserror::Error)]
#[error("The store is overloaded")]
pub struct OverloadedError;

pub fn launch(capacity: usize) -> TicketStoreClient {
    let (sender, receiver) = sync_channel(capacity);
    std::thread::spawn(move || server(receiver));
    TicketStoreClient { sender }
}

enum Command {
    Insert {
        draft: TicketDraft,
        response_channel: SyncSender<TicketId>,
    },
    Get {
        id: TicketId,
+         response_channel: SyncSender<Option<Arc<Mutex<Ticket>>>>,
    },
}

pub fn server(receiver: Receiver<Command>) {
    let mut store = TicketStore::new();
    loop {
        match receiver.recv() {
            Ok(Command::Insert {
                draft,
                response_channel,
            }) => {
                let id = store.add_ticket(draft);
                let _ = response_channel.send(id);
            }
            Ok(Command::Get {
                id,
                response_channel,
            }) => {
                let ticket = store.get(id);
                let _ = response_channel.send(ticket);
            }
            Err(_) => {
                // There are no more senders, so we can safely break
                // and shut down the server.
                break;
            }
        }
    }
}
src/store.rs
use crate::data::{Status, Ticket, TicketDraft};
use std::collections::BTreeMap;
+ use std::sync::{Arc, Mutex};

#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct TicketId(u64);

#[derive(Clone)]
pub struct TicketStore {
+     tickets: BTreeMap<TicketId, Arc<Mutex<Ticket>>>,
    counter: u64,
}

impl TicketStore {
    pub fn new() -> Self {
        Self {
            tickets: BTreeMap::new(),
            counter: 0,
        }
    }

    pub fn add_ticket(&mut self, ticket: TicketDraft) -> TicketId {
        let id = TicketId(self.counter);
        self.counter += 1;
        let ticket = Ticket {
            id,
            title: ticket.title,
            description: ticket.description,
            status: Status::ToDo,
        };
        todo!();
        id
    }

    // The `get` method should return a handle to the ticket
    // which allows the caller to either read or modify the ticket.
    pub fn get(&self, id: TicketId) -> Option<todo!()> {
        todo!()
    }
}
その他のファイル
src/data.rs
use crate::store::TicketId;
use ticket_fields::{TicketDescription, TicketTitle};

#[derive(Clone, Debug, PartialEq)]
pub struct Ticket {
    pub id: TicketId,
    pub title: TicketTitle,
    pub description: TicketDescription,
    pub status: Status,
}

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct TicketDraft {
    pub title: TicketTitle,
    pub description: TicketDescription,
}

#[derive(Clone, Debug, Copy, PartialEq, Eq)]
pub enum Status {
    ToDo,
    InProgress,
    Done,
}
tests/check.rs
use locks::data::{Status, TicketDraft};
use locks::launch;
use ticket_fields::test_helpers::{ticket_description, ticket_title};

#[test]
fn works() {
    let client = launch(5);
    let draft = TicketDraft {
        title: ticket_title(),
        description: ticket_description(),
    };
    let ticket_id = client.insert(draft.clone()).unwrap();

    let ticket = client.get(ticket_id).unwrap().unwrap();
    {
        let mut ticket = ticket.lock().unwrap();
        assert_eq!(ticket_id, ticket.id);
        assert_eq!(ticket.status, Status::ToDo);
        assert_eq!(ticket.title, draft.title);
        assert_eq!(ticket.description, draft.description);

        ticket.status = Status::InProgress;
    }

    let ticket = client.get(ticket_id).unwrap().unwrap();
    {
        let ticket = ticket.lock().unwrap();
        assert_eq!(ticket_id, ticket.id);
        assert_eq!(ticket.status, Status::InProgress);
    }
}

解説

Arc<Mutex<Ticket>> を使用するように改変します!

lib.rs
use crate::data::{Status, Ticket, TicketDraft};
use std::collections::BTreeMap;
use std::sync::{Arc, Mutex};

#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct TicketId(u64);

#[derive(Clone)]
pub struct TicketStore {
    tickets: BTreeMap<TicketId, Arc<Mutex<Ticket>>>,
    counter: u64,
}

impl TicketStore {
    pub fn new() -> Self {
        Self {
            tickets: BTreeMap::new(),
            counter: 0,
        }
    }

    pub fn add_ticket(&mut self, ticket: TicketDraft) -> TicketId {
        let id = TicketId(self.counter);
        self.counter += 1;
        let ticket = Ticket {
            id,
            title: ticket.title,
            description: ticket.description,
            status: Status::ToDo,
        };
        self.tickets.insert(id, Arc::new(Mutex::new(ticket)));
        id
    }

    // The `get` method should return a handle to the ticket
    // which allows the caller to either read or modify the ticket.
    pub fn get(&self, id: TicketId) -> Option<Arc<Mutex<Ticket>>> {
        self.tickets.get(&id).cloned()
    }
}

add_thicket メソッドにて BTreeMap::insert に格納する際には、 Arc<Mutex<Ticket>> 型になるようにここで ArcMutex を作成しています。

get メソッドの方は、以前は &Ticket を返せばよかったので get(&id) で止まっていました。今回は Arc<Mutex<Ticket>> を返したいです。 get(&id) は結果が Some なら中身が &Arc<...> となっているため、この時だけ Arc::clone (Clone::clone) が呼ばれるように cloned メソッド を使い、 Arc を複製して他スレッドに渡せるようにします!

そして get でチケットへのハンドラを扱えるようになったため、前回追加した Update コマンド及びそれに関連する処理、 get_mut メソッドはもはや不要になりました!今後は、クライアントが勝手にロックして中身を書き換えられます。

SendRc<RefCell<T>>Arc<Mutex<T>> 関係早見表

ここで「 Arc<Mutex<T>>Rc<RefCell<T>> とほぼ同じ機能では...?」という定番質問があります。「マルチスレッドで使える(SendSync が付いているのでスレット跨ぎができる)のが Arc<Mutex<T>> 、使えないのが Rc<RefCell<T>> 」というのが簡単な回答になります。

Rc の内部カウンターは本記事1枚目の図のようなレースコンディションを起こしてしまう仕組みになってしまっており、 Arc はカウント操作をアトミックにすることでそれを克服しています。このスレッド間転送可能かどうかの印として、 Send マーカートレイトが用いられます。 Send がなければスレッドを跨ぐクロージャに渡すことができません!

両者の違いや Send マーカートレイトの詳細について BookTRPLどちらにも説明があるので、解説はこちらに譲り、この先 Sync が登場することも踏まえ代わりに「Send・Sync早見表」を設置しておきます!

  • Send: 本体を別スレッドに送れる
  • Sync: 本体への参照を別スレッドに送れる
    • つまり impl Sync for Timpl Send for &T が同値
    • 意味的な部分は 次回 扱う予定

:o: : Send, Sync
:x: : !Send, !Sync

ローカルスレッド向け

Send Sync 機能
Rc :x: :x: 所有権シェア
RefCell :o: :x: 動的可変参照1
Ref :x: :x: RefCell::<T>::borrow で得られる Deref<Target = T>
RefMut :x: :x: RefCell::<T>::borrow_mut で得られる DerefMut<Target = T> 2

マルチスレッド向け

Send Sync 機能
Arc :o: :o: 所有権シェア
Mutex :o: :o: 排他アクセス機能
MutexGuard :x: :o: Mutex::<T>::lock で得られる Deref<Target = T> & DerefMut

RefCell:o: :x:MutexGuard:x: :o: なのが興味深いですね...これについては次回にて考察したいと思います3

RefCellMutex には挙動にも違いがあります。 RefCell はローカルスレッドでしかアクセスを許さない代わりに、可変参照・不変参照の制約をランタイム時に調べてくれて、満たさない場合パニックします( try_borrow* なら Result 化可能)。一方 Mutex の方は不変・可変を問わず「ロックを取ります」。このロックというのが厄介で、ロック中にロックを得ようとするとローカルスレッド内ですら(つまり RefCell同様の使い方をした場合でも)パニックしてくれず デッドロックします 4。つまり扱いとしては Mutex の方が気をつけるべき点が多く5、むしろ違いとしてはこちらの方が大きそうですね。

Arc<Mutex<T>> は万能...?

ある値を取り扱うのに、いままでは所有権で苦しんでいたところに、 Arc のお陰で「複数のスレッド・変数を跨いで共有できる」し、 Mutex のお陰で「複数の変数から変更可能」になりました。これで他言語の"変数"とようやくトントンに色々できそうです!safe Rustの範囲で可変なグローバル変数のように扱うことも頑張ればできそうです。

そして、万能かと聞かれるとデッドロックするから答えは「No」になるのですが、つまるところ、「万能な可変グローバル変数」なんてものは人類が扱ってはいけないことを、長い型名とMutex のデッドロックしてしまう性質から教えてくれているんじゃないかと思います。

Rustの考えが豊富に詰め込まれているという点では、 Arc<Mutex<T>> は "真打" 感がありますね。適切なシーンで使えるようになりたいです(筆者の経験則的にはできるだけ mpsc を利用するのが正解...)。

[07_threads/12_rw_lock] 非対称排他的アクセス ( RwLock )

Mutex は読み書き両方を含めたロックしか取れませんが、 RwLock ならば(& のように)複数アクセスを認める読み取り専用ロックと、(&mut のように)専有アクセスしか認めない読み書きロックという器用な2つのロックを使い分けることができ、より RefCell と似た感じに扱えます! MutexRwLock の良し悪しは Book に書かれているのでそちらに譲ります。

マルチスレッド向け (続き)

Send Sync 機能
RwLock :o: :o: Mutex において、読み書き専有ロック(RwLockWriteGuard)か読み込み共有ロック(RwLockReadGuard)かを選べるようにしたもの
RwLockWriteGuard :x: :o: LwLock::<T>::read で得られる Deref<Target = T>
RwLockReadGuard :x: :o: LwLock::<T>::write で得られる DerefMut<Target = T> 2

問題・解説

MutexRwLock に置換する必要があるだけでほとんど前問と変わりませんtests/check.rs で呼び出しているメソッドには変化があります。

一応ファイル (回答済み)
src/lib.rs
// TODO: Replace `Mutex` with `RwLock` in the `TicketStore` struct and
//  all other relevant places to allow multiple readers to access the ticket store concurrently.
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::sync::{Arc, RwLock};

use crate::data::{Ticket, TicketDraft};
use crate::store::{TicketId, TicketStore};

pub mod data;
pub mod store;

#[derive(Clone)]
pub struct TicketStoreClient {
    sender: SyncSender<Command>,
}

impl TicketStoreClient {
    pub fn insert(&self, draft: TicketDraft) -> Result<TicketId, OverloadedError> {
        let (response_sender, response_receiver) = sync_channel(1);
        self.sender
            .try_send(Command::Insert {
                draft,
                response_channel: response_sender,
            })
            .map_err(|_| OverloadedError)?;
        Ok(response_receiver.recv().unwrap())
    }

    pub fn get(&self, id: TicketId) -> Result<Option<Arc<RwLock<Ticket>>>, OverloadedError> {
        let (response_sender, response_receiver) = sync_channel(1);
        self.sender
            .try_send(Command::Get {
                id,
                response_channel: response_sender,
            })
            .map_err(|_| OverloadedError)?;
        Ok(response_receiver.recv().unwrap())
    }
}

#[derive(Debug, thiserror::Error)]
#[error("The store is overloaded")]
pub struct OverloadedError;

pub fn launch(capacity: usize) -> TicketStoreClient {
    let (sender, receiver) = sync_channel(capacity);
    std::thread::spawn(move || server(receiver));
    TicketStoreClient { sender }
}

enum Command {
    Insert {
        draft: TicketDraft,
        response_channel: SyncSender<TicketId>,
    },
    Get {
        id: TicketId,
        response_channel: SyncSender<Option<Arc<RwLock<Ticket>>>>,
    },
}

fn server(receiver: Receiver<Command>) {
    let mut store = TicketStore::new();
    loop {
        match receiver.recv() {
            Ok(Command::Insert {
                draft,
                response_channel,
            }) => {
                let id = store.add_ticket(draft);
                let _ = response_channel.send(id);
            }
            Ok(Command::Get {
                id,
                response_channel,
            }) => {
                let ticket = store.get(id);
                let _ = response_channel.send(ticket);
            }
            Err(_) => {
                // There are no more senders, so we can safely break
                // and shut down the server.
                break;
            }
        }
    }
}
src/data.rs
use crate::store::TicketId;
use ticket_fields::{TicketDescription, TicketTitle};

#[derive(Clone, Debug, PartialEq)]
pub struct Ticket {
    pub id: TicketId,
    pub title: TicketTitle,
    pub description: TicketDescription,
    pub status: Status,
}

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct TicketDraft {
    pub title: TicketTitle,
    pub description: TicketDescription,
}

#[derive(Clone, Debug, Copy, PartialEq, Eq)]
pub enum Status {
    ToDo,
    InProgress,
    Done,
}
src/store.rs
use crate::data::{Status, Ticket, TicketDraft};
use std::collections::BTreeMap;
use std::sync::{Arc, RwLock};

#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct TicketId(u64);

#[derive(Clone)]
pub struct TicketStore {
    tickets: BTreeMap<TicketId, Arc<RwLock<Ticket>>>,
    counter: u64,
}

impl TicketStore {
    pub fn new() -> Self {
        Self {
            tickets: BTreeMap::new(),
            counter: 0,
        }
    }

    pub fn add_ticket(&mut self, ticket: TicketDraft) -> TicketId {
        let id = TicketId(self.counter);
        self.counter += 1;
        let ticket = Ticket {
            id,
            title: ticket.title,
            description: ticket.description,
            status: Status::ToDo,
        };
        let ticket = Arc::new(RwLock::new(ticket));
        self.tickets.insert(id, ticket);
        id
    }

    // The `get` method should return a handle to the ticket
    // which allows the caller to either read or modify the ticket.
    pub fn get(&self, id: TicketId) -> Option<Arc<RwLock<Ticket>>> {
        self.tickets.get(&id).cloned()
    }
}
tests/check.rs
use rwlock::data::{Status, TicketDraft};
use rwlock::launch;
use ticket_fields::test_helpers::{ticket_description, ticket_title};

#[test]
fn works() {
    let client = launch(5);
    let draft = TicketDraft {
        title: ticket_title(),
        description: ticket_description(),
    };
    let ticket_id = client.insert(draft.clone()).unwrap();

    let ticket = client.get(ticket_id).unwrap().unwrap();
    let lock1 = ticket.read().unwrap();
    {
        let ticket = ticket.read().unwrap();
        assert_eq!(ticket_id, ticket.id);
        assert_eq!(ticket.status, Status::ToDo);
        assert_eq!(ticket.title, draft.title);
        assert_eq!(ticket.description, draft.description);
    }

    drop(lock1);

    let ticket = client.get(ticket_id).unwrap().unwrap();
    {
        let mut ticket = ticket.write().unwrap();
        ticket.status = Status::InProgress;
    }
}

では次の問題に行きましょう!

次の記事: 【33】 チャネルなしで実装・Syncの話 ~考察回です~

登場したPlayground

(実際に無効化したことはないですが、)Rust Playground上のデータが喪失する可能性を鑑みて、一応記事にもソースコードを掲載することとしました。

URL: https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=e13197b4fa6bee7489e8513f86d8f768

Rust
use std::sync::{Arc, Mutex, MutexGuard};
use std::fmt::Debug;
use std::cell::{RefCell, RefMut};
use std::rc::Rc;

pub fn get_lock_rrr<T: Debug>(rrr: Rc<RefCell<T>>, rec: bool) {
    let rrr2 = Rc::clone(&rrr);
    
    let rfmt: RefMut<'_, T> = rrr.borrow_mut();

    if rec {
        get_lock_rrr(rrr2, false);
    }
    
    println!("{:?}", *rfmt);
}

#[test]
#[should_panic(expected = "already borrowed")]
fn rrr_test() {
    let rrr = Rc::new(RefCell::new(10));
    
    get_lock_rrr(rrr, true);
}

pub fn get_lock_amr<T: Debug>(amr: Arc<Mutex<T>>, rec: bool) {
    #[allow(unused_variables)]
    let amr2 = Arc::clone(&amr);

    let guard: MutexGuard<'_, T> = amr.lock().unwrap();

    if rec {
        // コメントを外すとデッドロック
        // get_lock_amr(amr2, false);
    }
    
    println!("{:?}", *guard);
}

#[test]
fn amr_test() {
    let amr = Arc::new(Mutex::new(10));
    
    get_lock_amr(amr, true);
}
  1. コンパイル時は不変参照だけど、ランタイム時に可変になれることより筆者が付けた呼称です。29回で扱ったように「不変参照に内部可変性を付与し、借用チェックをランタイムで行う機能」とか説明したほうがもう少し正確かもしれません。

  2. Target = T は前提トレイトの Deref から得られるもので、直接持っている関連型ではありませんが便宜上このように記載しています。 2

  3. メタいことを言えばここで解説しちゃうと次回話すネタがなくなってしまうのです...

  4. 公式ドキュメントの解説だとパニックしてくれる場合もあるらしい(とにかく、値が返らないという点は変わらないと書いてある)ですが、Playground で示した環境だとデッドロックしました

  5. ランタイム時に想定していない挙動をするという点では RefCell でも Mutex でも変わらないですし、 RefCell はそもそも使い所がイマイチなことが多いので、総括すると扱うにあたる体感の慎重さは両者間で変わらない気がします。

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?