5
2
お題は不問!Qiita Engineer Festa 2024で記事投稿!
Qiita Engineer Festa20242024年7月17日まで開催中!

Rust 100 Ex 🏃【33/37】 チャネルなしで実装・Syncの話 ~考察回です~

Last updated at Posted at 2024-07-17

前の記事

全記事一覧

100 Exercise To Learn Rust 演習第33回になります!

今回の関連ページ

[07_threads/13_without_channels] Arc<RwLock<T>> だけで実装!

本問題は今までのサーバークライアントシステムを全部なかったことにし(!)、 TicketStoreArc<RwLock<T>> で包んでしまおう!という回です。ほぼ何もする必要がありません!

tests/check.rs
// ...超省略!!...
use std::sync::{Arc, RwLock};

#[test]
fn works() {
    let store = todo!();

    // ...全部省略!...
}

一応全体(回答済み)
src/lib.rs
// TODO: You don't actually have to change anything in the library itself!
//  We mostly had to **remove** code (the client type, the launch function, the command enum)
//  that's no longer necessary.
//  Fix the `todo!()` in the testing code and see how the new design can be used.

pub mod data;
pub mod store;
src/data.rs
use crate::store::TicketId;
use ticket_fields::{TicketDescription, TicketTitle};

#[derive(Clone, Debug, PartialEq)]
pub struct Ticket {
    pub id: TicketId,
    pub title: TicketTitle,
    pub description: TicketDescription,
    pub status: Status,
}

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct TicketDraft {
    pub title: TicketTitle,
    pub description: TicketDescription,
}

#[derive(Clone, Debug, Copy, PartialEq, Eq)]
pub enum Status {
    ToDo,
    InProgress,
    Done,
}
src/store.rs
use std::collections::BTreeMap;
use std::sync::{Arc, RwLock};

use crate::data::{Status, Ticket, TicketDraft};

#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct TicketId(u64);

#[derive(Clone)]
pub struct TicketStore {
    tickets: BTreeMap<TicketId, Arc<RwLock<Ticket>>>,
    counter: u64,
}

impl TicketStore {
    pub fn new() -> Self {
        Self {
            tickets: BTreeMap::new(),
            counter: 0,
        }
    }

    pub fn add_ticket(&mut self, ticket: TicketDraft) -> TicketId {
        let id = TicketId(self.counter);
        self.counter += 1;
        let ticket = Ticket {
            id,
            title: ticket.title,
            description: ticket.description,
            status: Status::ToDo,
        };
        let ticket = Arc::new(RwLock::new(ticket));
        self.tickets.insert(id, ticket);
        id
    }

    pub fn get(&self, id: TicketId) -> Option<Arc<RwLock<Ticket>>> {
        self.tickets.get(&id).cloned()
    }
}
tests/check.rs
use std::sync::{Arc, RwLock};
use std::thread::spawn;

use ticket_fields::test_helpers::{ticket_description, ticket_title};
use without_channels::data::TicketDraft;
use without_channels::store::TicketStore;

#[test]
fn works() {
    let store = Arc::new(RwLock::new(TicketStore::new()));

    let store1 = store.clone();
    let client1 = spawn(move || {
        let draft = TicketDraft {
            title: ticket_title(),
            description: ticket_description(),
        };
        store1.write().unwrap().add_ticket(draft)
    });

    let store2 = store.clone();
    let client2 = spawn(move || {
        let draft = TicketDraft {
            title: ticket_title(),
            description: ticket_description(),
        };
        store2.write().unwrap().add_ticket(draft)
    });

    let ticket_id1 = client1.join().unwrap();
    let ticket_id2 = client2.join().unwrap();

    let reader = store.read().unwrap();

    let ticket1 = reader.get(ticket_id1).unwrap();
    assert_eq!(ticket_id1, ticket1.read().unwrap().id);

    let ticket2 = reader.get(ticket_id2).unwrap();
    assert_eq!(ticket_id2, ticket2.read().unwrap().id);

    // Patchパターン (storeは読み取りモードで、個別のチケットだけ修飾する場合) は確かめていないらしい...
}

解説

変更箇所は TicketStore も最初に Arc::new(Mutex::new(...)) で包むだけになります!

lib.rs
#[test]
fn works() {
    let store = Arc::new(RwLock::new(TicketStore::new()));
}

図で表すと次のようになった形です、サーバーを介す必要がなくなり、スッキリしました!(大嘘)

元々の構造 (前回最後)

もはや必要なのはクライアント(アクセスを行うスレッド)のみ

[RW] : 読み書き専有ロック
[RO] : 読み取り共用ロック

例の網羅性のために処理内容は変えていますが...うーん、カオス!

Book には「最もきめ細かい並行処理」みたいな感じで紹介されておりますが、「この方針が一番良い!」とはあまり思えないですね...実際、ストアのロックとチケットのロックは独立しており、ストアのロックを得たからといってチケットの変更操作までロックできていないという点が長所短所両方を抱えています。

PoCソースコード
tests/check.rs (追記)
// 上部省略

use std::sync::mpsc::{channel, Sender};

fn add_dummy_ticket(store: &mut TicketStore) -> TicketId {
    let draft = TicketDraft {
        title: ticket_title(),
        description: ticket_description(),
    };
    store.add_ticket(draft)
}

#[test]
fn ticket_op_in_store_locked() {
    let store = Arc::new(RwLock::new(TicketStore::new()));

    let (id_tx_12, id_rx_12): (Sender<TicketId>, _) = channel();
    let (tx_21, rx_21): (Sender<()>, _) = channel();

    let store1 = store.clone();
    let client1 = spawn(move || {
        let mut writer = store1.write().unwrap();
        let id1 = add_dummy_ticket(&mut writer);

        id_tx_12.send(id1).unwrap();

        drop(writer);

        rx_21.recv().unwrap();

        let mut writer = store1.write().unwrap();
        let id2 = add_dummy_ticket(&mut writer);

        id_tx_12.send(id2).unwrap();

        let _ = add_dummy_ticket(&mut writer);
    });

    let store2 = store.clone();
    let client2 = spawn(move || {
        let id1 = id_rx_12.recv().unwrap();

        let store_reader = store2.read().unwrap();

        let ticket1 = store_reader.get(id1).unwrap();

        let t1_reader = ticket1.read().unwrap();

        tx_21.send(()).unwrap();

        drop(store_reader);

        let _ = id_rx_12.recv().unwrap();

        println!("{:?}", *t1_reader);
    });

    client1.join().unwrap();
    client2.join().unwrap();
}

追加機能しかなく、削除機能がなければこの構造に特に問題はありません。しかし、削除できるようにするとどうでしょうか...? Arc は参照カウントが0になるまでデータを保持する構造体です。編集中のストアから対象チケットの Arc が削除されていても、クライアントは気づくことができません!自身が持つロックを解除し、 Arc の所有権を失った瞬間にアクセスできなくなります。あるいは削除を実行した他のクライアントから見ると、削除したはずなのに削除したという情報がまだロックを保有しているクライアントに渡らないという事態になります。

このように複雑にしてしまうぐらいなら、各チケットにまで Arc<RwLock<Ticket>> を設けるのではなく、ストアのみ Arc<Mutex<TicketStore>> として排他的にしたほうがデータ不整合の危険性は少ないですし、そもそも最初のようにサーバーで中央集権的に扱うほうがオプションが多そうです。

確かに並行処理の効率化という観点では今回の改良は良さそうですが、実際にプログラムを組む際はあまり取らない方針なんじゃないかと思いました。

[07_threads/14_sync] SyncトレイトとSendトレイト

Send に加えてここまで解説していなかった Sync の話題です!本エクササイズには問題はありません。というわけでBookの内容を参照しつつ軽い検証でも行おうと思います。

解説

Send は本記事群ではまともに説明してないかもしれないですが、これ以上はよいでしょう。スレッド間でその値を転送できることを示すマーカートレイトです。

Sync についてです。なぜ SyncSync という名前であり何に必要なのか...は何か深そうなので後で考えるとして、事実ベースで考えてみたいと思います。前回話した通りその値の参照が Send ならば Sync ...つまり impl Send for &Timpl Sync for T が同値です。

とりあえず、事実ベースで SendSync がついているかどうかを確認してみます。 RefCellMutexGuard に注目すると面白いのでした。Book に記載されている、それぞれの理由を見てみます!

RefCell

トレイト 可否 理由
Send :o: RefCell それ自体がスレッドを跨ぐ際は借用等がされていない状態であり、跨ぐ上で障害となる存在がない。 Send である。
Sync :x: 実体の共有参照 &RefCell には borrow_mut 能力(参照カウンターへのアクセス権)がある。共有参照ということは borrow_mut 能力が複数存在できる 。 カウンターへアクセスする権利は、(アトミックな操作なしでは)スレッドを跨げない。以上より &RefCell!Send であるから RefCell!Sync

RefCell 関連のPoCコード

1つ目 Playground: https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=33bb9c8b23cc13fa087294d217986b8d

Rust
use std::cell::RefCell;
use std::thread;

fn main() {
    let v = RefCell::new(10_usize);
    
    thread::spawn(move || {
        println!("{:?}", v.borrow());
    }).join().unwrap();
}

RefCellSend であることを確認するコードです。実体がスレッドをまたいでも何も問題はなさそうですね!

2つ目 Playground: https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=4b72ed909a3d5ac2e35b1bdafdd91235

Rust (コンパイルエラー)
use std::cell::RefCell;
use std::thread;
use std::sync::Arc;

fn main() {
    let v = RefCell::new(10_usize);
    let a = Arc::new(v);
    let a2 = Arc::clone(&a);

    thread::spawn(move || {
        println!("{:?}", a2.borrow());
    }).join().unwrap();
}

&RefCell!Send (つまり RefCell!Sync) であることを確かめるコードで、せっかくなので Arc さんに手伝ってもらいました! Arc で包まれることにより、 &RefCell はスレッドをまたいで存在しようとします。が!参照カウンターがレースコンディションを起こしてしまうので、本操作はダメです!それが検知されてコンパイルエラーになります。

検知の秘密は ArcSend にあります。 ArcT: Send + Sync の時にしか Send になってくれません!よってこのコードの Arc!Send となるので、コンパイルエラーになっています。

ちなみにRc

Rc は複製を行う処理がアトミックではないため他のスレッドに移るとレースコンディションが起きてしまい !Send なのでした。(参照は複数あれど)実体は単一である RefCell と異なり、 Rc は複数の実体が単一の可変な値(カウンター)へ依存してしまっているので、実体ですらスレッドを跨げません!

あるいは RefCell<T> それ自体は Deref<Target = T> を実装していないから Send にはできる、という解釈もよさそうです。 Deref<Target = T> でありながらスレッドを跨ぐには、アトミックな演算が必要そうです。

MutexGuard

トレイト 可否 理由
Send :x: ドロップ時に元の Mutex の参照カウンターを戻す責務を持っているが、スレッドを跨ぐとそれを遂行できない1!ゆえに !Send である。
Sync :o: &MutexGuard には 参照カウンター操作の責務がない ため別なスレッドに送っても特に問題ない。

MutexGuard 関連のPoCコード

Playground: https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=d731e57a8ace21df06fff7c620fcb22d

Rust
use std::thread;
use std::sync::{Mutex, Arc, MutexGuard};

fn thrd(amv: Arc<Mutex<String>>) {
    let mut v_mg: MutexGuard<'_, String> = amv.lock().unwrap();
    
    *v_mg = format!("{} {}", *v_mg, *v_mg);

    thread::scope(|s| {
        s.spawn(|| {
            println!("1: {}", *v_mg);
        });
        s.spawn(|| {
            println!("2: {}", *v_mg)
        });
    });

}

fn main() {
    let amv1 = Arc::new(Mutex::new("Beep".to_string()));
    let amv2 = amv1.clone();
    
    let h1 = thread::spawn(move || {thrd(amv1)});
    let h2 = thread::spawn(move || {thrd(amv2)});
    
    h1.join().unwrap();
    h2.join().unwrap();
}

&MutexGuardSend (MutexGuardSync) なことが意外だったので、これを確認するコードです。 'static でない参照でもコンパイルが通るよう、 thread::spawn を使用しました。

あくまでも MutexGuard のドロップは thrd 関数の最後で起き、それ以外ではカウンター操作が起きないため、このようにスレッドを跨いでも問題ないことが確認できました!

Sync という名称の軽い考察

最初に Sync の説明に出会ったとき、筆者はまだRustに慣れていなかったこともあって混乱しました。なぜなら「 &TSend なら TSync 」と言われても、「Arc を使わないと そもそも参照はスレッドを跨げなくないか...?」 と思っていたからでした。実際は第28回で見た通り、 &'static T であったり、あるいはスコープ付きスレッド( thread::scope )を使えばライフタイム制限がある何某でもスレッドを跨ぐことが可能だったわけですが、当時はここまでで述べたような事実ベースでそう説明されてもよくわからなかったです。そのため、 Sync という名称から一生懸命に意味を考えていた時期がありました :sweat_smile:

今となっては上記で具体的な必要性を確認できたのでこれ以上悩む必要はないかもしれませんが、当時の筆者のためにも、なぜ Sync という名称なのか、自分が納得できる理由を考えてみたいと思います。

TRPLSync の説明には「 Sync で複数のスレッドからのアクセスを許可する」「 Sync を実装した型は、複数のスレッドから参照されても安全であることを示唆します。」とあります。

図にしてみると...なるほど Sync (hronize) ですね!

共有参照は「実は不変とは限らない」という話を第29回にて取り扱いました。ここで RefCell を始めとした内部可変性の話が出てきたのでした。そうすると、内部可変性を介して、「 & による共有参照の変更をスレッドを跨いでどう伝えるか?」という問題が出てきます。

RefCell は自身の共有参照間において(あるいは Rc 達は彼らの間において)この変更をうまく伝える (シンクロ・同期させる) 能力がなく、 Mutex の参照(や Arc )はアトミック操作のおかげで参照カウンターの変更を同期できます。ここに、 Sync という名称の由来がありそうですね。とりあえずこの説明で納得できそうです!

祈手アンブラハンズ はおそらく &黎明卿ボンドルドへの共有参照 なのでしょう。そして、 精神隷属機ゾアホリック はそれ自体は Send かもしれませんが、 その参照アンブラハンズが「層を跨ぐと都合が悪い(シンクロできない)」ことから、少なくとも !Sync なのだと考えられます!

並行処理 (複数スレッドにまたがる処理) で気を付けること まとめ

7章では並行処理及びマルチスレッド処理を見ていきました。8章ではさらに進んで非同期処理を見ていきますが、ぶっちゃけ「並行処理の勘所」さえ分かっていればそこまで難しくありません!その勘所は、まさしく並行処理文脈で出てきた SendSync'static だと筆者は考えています。

'static ライフタイム

「スレッドはいつまで存在するかわからない」ことより、「スレッドの存在期限を限定する( std::thread::scope )」か、この 'static ライフタイムを持つ型のみやり取りする 、という話が出てきました。簡単に言えば、困ったら .clone() で解決です!

SendSync

「スレッドを跨げるか否か」を表すマーカートレイト群でした。 第36回でおそらく扱う通り、スレッドを跨いだりしやすい専用の型 ( tokio::sync::Mutex など) を活用できればこれらのマーカートレイトがついてくれて解決できる場合がありますが、その際はどのみち デッドロック に気を付ける必要はありそうでした。非同期処理ではスレッドを跨ぎまくりますが、裏を返すとこの2つのマーカートレイトを押さえておくことがポイントでしょう。

では次の問題に行きましょう!

次の記事: 【34】 async fn・非同期タスク生成 ~Rustの非同期入門~

  1. 第36回でも話す予定ですが(忘れてるかも...)、非同期処理ランタイムtokio用の MutexGuard は、スレッドをまたいでもこのカウンターを戻す能力を有しているらしく Send です。強いけどその分すごく重そう...

5
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
2