2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

お題は不問!Qiita Engineer Festa 2024で記事投稿!
Qiita Engineer Festa20242024年7月17日まで開催中!

Rust 100 Ex 🏃【29/37】 チャネル・参照の内部可変性 ~Rustの虎の子、mpscと `Rc<RefCell<T>>`~

Last updated at Posted at 2024-07-17

前の記事

全記事一覧

100 Exercise To Learn Rust 演習第29回になります!

今回の関連ページ

[07_threads/05_channels] チャネル (Multi Producer Single Consumer)

問題はこちらです。

lib.rs
use std::sync::mpsc::{Receiver, Sender};

pub mod data;
pub mod store;

pub enum Command {
    Insert(todo!()),
}

// Start the system by spawning the server the thread.
// It returns a `Sender` instance which can then be used
// by one or more clients to interact with the server.
pub fn launch() -> Sender<Command> {
    let (sender, receiver) = std::sync::mpsc::channel();
    std::thread::spawn(move || server(receiver));
    sender
}

// TODO: The server task should **never** stop.
//  Enter a loop: wait for a command to show up in
//  the channel, then execute it, then start waiting
//  for the next command.
pub fn server(receiver: Receiver<Command>) {}
その他のファイル
data.rs
use crate::store::TicketId;
use ticket_fields::{TicketDescription, TicketTitle};

#[derive(Clone, Debug, PartialEq)]
pub struct Ticket {
    pub id: TicketId,
    pub title: TicketTitle,
    pub description: TicketDescription,
    pub status: Status,
}

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct TicketDraft {
    pub title: TicketTitle,
    pub description: TicketDescription,
}

#[derive(Clone, Debug, Copy, PartialEq, Eq)]
pub enum Status {
    ToDo,
    InProgress,
    Done,
}

store.rs
use crate::data::{Status, Ticket, TicketDraft};
use std::collections::BTreeMap;

#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct TicketId(u64);

#[derive(Clone)]
pub struct TicketStore {
    tickets: BTreeMap<TicketId, Ticket>,
    counter: u64,
}

impl TicketStore {
    pub fn new() -> Self {
        Self {
            tickets: BTreeMap::new(),
            counter: 0,
        }
    }

    pub fn add_ticket(&mut self, ticket: TicketDraft) -> TicketId {
        let id = TicketId(self.counter);
        self.counter += 1;
        let ticket = Ticket {
            id,
            title: ticket.title,
            description: ticket.description,
            status: Status::ToDo,
        };
        self.tickets.insert(id, ticket);
        id
    }
}
tests/insert.rs
// TODO: Set `move_forward` to `true` in `ready` when you think you're done with this exercise.
//  Feel free to call an instructor to verify your solution!
use channels::data::TicketDraft;
use channels::{launch, Command};
use std::time::Duration;
use ticket_fields::test_helpers::{ticket_description, ticket_title};

#[test]
fn a_thread_is_spawned() {
    let sender = launch();
    std::thread::sleep(Duration::from_millis(200));

    sender
        .send(Command::Insert(TicketDraft {
            title: ticket_title(),
            description: ticket_description(),
        }))
        // If the thread is no longer running, this will panic
        // because the channel will be closed.
        .expect("Did you actually spawn a thread? The channel is closed!");
}

#[test]
fn ready() {
    // There's very little that we can check automatically in this exercise,
    // since our server doesn't expose any **read** actions.
    // We have no way to know if the inserts are actually happening and if they
    // are happening correctly.
    let move_forward = false; // 完了後 true に変更していますが、解説では省略

    assert!(move_forward);
}

今回から、クラサバ(クライアント・サーバーシステム)とするべく、チケット管理システムを改修していく問題です!

クラサバを実現するには、サーバー用スレッド・クライアント用スレッド1を用意し、両スレッド間でデータ送受信を実現すると具合が良さそうです。

しかし前回までとは異なり、クライアント用スレッドはともかく、少なくともサーバー用スレッドは息が長いスレッドになりそうです。前回までで扱ってきた「スレッド生成時にクロージャに変数をキャプチャさせる」「スレッド終了時に join して返された値を活用する」方法ではこのデータの送受信を行うのは難しいでしょう。

解説

というわけで、スレッド間でデータの送受信を行う新たな機能 mpsc (Multi Producer Single Consumer) の登場です!クラサバに限らず、Rustにおけるスレッド間通信において主役級の働きをする仕組みです。

今回の問題は mpsc を使ってみましょう演習です!

lib.rs
use std::sync::mpsc::{Receiver, Sender};

pub mod data;
pub mod store;

use data::TicketDraft;
use store::TicketStore;

pub enum Command {
    Insert(TicketDraft),
}

pub fn launch() -> Sender<Command> {
    let (sender, receiver) = std::sync::mpsc::channel();
    std::thread::spawn(move || server(receiver));
    sender
}

pub fn server(receiver: Receiver<Command>) {
    let mut ticket_store = TicketStore::new();

    // while let Ok(Command::Insert(ticket)) = receiver.recv() {
    for Command::Insert(ticket) in receiver {
        ticket_store.add_ticket(ticket);

        dbg!(&ticket_store);
    }
}

mpsc に関しては正直筆者の下手な解説よりもわかりやすい TRPL の方を読んでほしいなと思いますが(Golangのチャネルモデルを参考にしているなど、面白いことが書いてあります。)、とりあえず回答範囲で必要なことについて解説したいと思います。

Rust (抜粋と追記)
let (sender, receiver): (Sender<Command>, Receiver<Command>) = std::sync::mpsc::channel();

この行で、クライアントからサーバーへのコマンド Command を送信するためのチャネルを作成しています。 Sender<Command> はクライアント側、 Receiver<Command> はサーバー側で持つ形になります。受信側となる Receiver は誰に割り振ればよいのかわからないため単一で Clone できませんが、一方で Sender の方は複数スレッドから送信しても問題ないため Clone が実装されており複製可能です。故に「mpsc」、マルチプロデューサシングルコンシューマと呼びます2

TODOとなっている部分について、コマンドにより Insert(TicketDraft) として下書きチケットをストアに挿入できるようにした上で、サーバーの処理を記述しています。

Rust
pub fn server(receiver: Receiver<Command>) {
    let mut ticket_store = TicketStore::new();

    // while let Ok(Command::Insert(ticket)) = receiver.recv() {
    for Command::Insert(ticket) in receiver {
        ticket_store.add_ticket(ticket);

        dbg!(&ticket_store);
    }
}

ここで、 receiver にてコマンドを受信し取り出す方法は複数あります。一つはコメントアウトしてある while let 構文を使う方法で、この構文は条件式がパターンにマッチし続ける間繰り返しを行います。ここでは「 receiver.recv()Ok(Command::Insert(ticket)) にマッチし続ける間、ブロック内処理を繰り返す」という意味になっています。今回示した mpsc の他、スタックやキュー、ヒープ構造等の何かから値を取り出し続ける処理をきれいに書けたりして競プロで重宝したりします。何気に筆者が好きな構文です!

また Receiver第20回で登場した IntoIterator を実装しているため、 for 文に直接指定することも可能です!この場合、イテレートごとに Command::Insert(tikcet) が得られるので、これを分割代入で受け取り、 for 文内で ticket_store に登録しています。

[07_threads/06_interior_mutability] 参照の内部可変性 ( Rc<RefCell<T>> )

内部可変性

問題に入る前に3、今回の Book は中々深い内容なので是非読んでほしいです(毎度のツッコミながら、Bookはそもそも全ページ気づきがありますね)。以下、Bookの内容をそのまま受け売りして解説しています。

先の問題の mpsc::Sender は、不変参照であるにも関わらず「値をどこかしらへ保存する」という あからさまに可変であるかのような挙動をしています。値自体は Receiver へ送られるため、UIとして見た場合 Sender は不変であるほうがわかりやすいですが、実態としては「不変」参照ではないんじゃないか?という疑問が湧きます。

基本的には不変参照以下の要素が可変になることはない4のですが、 Sender のような可変ながら不変参照として扱いたいという需要に応えるため、唯一の例外として UnsafeCellを持つときに限り その UnsafeCell の対象は例え不変参照以下でも可変にすることができます。これを「内部可変性」といいます。

そして「内部可変性」の存在により「不変参照」は実は不変とは限らず、以下のように捉えるべきらしいです。

  • 直感的で大体合っている捉え方
    • &T: 不変参照
    • &mut T: 可変参照
  • もう少し正確な捉え方
    • &T: 共有参照
    • &mut T: 排他参照

この例外的な扱いはコンパイラのために存在するらしく、 UnsafeCell が付いているフィールドには最適化が施されない (逆に言うと UnsafeCell が付いていない通常の不変参照は最適化される )そうです。第12回で扱ったように、マーカートレイト Copy が付いていると複製に副作用がないと仮定しメモリをコピーするだけになったり、 CopyDrop が排他的であったりと同様に、(今回はトレイトではなく型ですが)コンパイラが吐くバイナリの挙動に影響を及ぼす機構ということですね!

UnsafeCell 自体をそのまま使うことはあまりなく、 UnsafeCell に類似した機能として CellRefCell といったラッパーの形で目にすることが大半でしょう。 CellRefCell の使い分けですが筆者もよく忘れるので、以下に現在の理解をメモしておきます(直感を優先し詳細な説明を省いています)。

  • Cell: 構造体の Copy なフィールドに対してゲッター・セッターのようなものを提供してくれる
  • RefCell: ランタイム時に参照の排他性を確認することで、内包する値への共有参照または排他参照を提供してくれる

Playground: https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=f0940f6b57e1e70cf15c946605fcadd4

問題

大分前置きが長くなりましたが、問題はこちらです。

lib.rs
// TODO: Use `Rc` and `RefCell` to implement `DropTracker<T>`, a wrapper around a value of type `T`
//  that increments a shared `usize` counter every time the wrapped value is dropped.

use std::cell::RefCell;
use std::rc::Rc;

pub struct DropTracker<T> {
    value: T,
    counter: todo!(),
}

impl<T> DropTracker<T> {
    pub fn new(value: T, counter: todo!()) -> Self {
        Self { value, counter }
    }
}

impl<T> Drop for DropTracker<T> {
    fn drop(&mut self) {
        todo!()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn it_works() {
        let counter = Rc::new(RefCell::new(0));
        let _ = DropTracker::new((), Rc::clone(&counter));
        assert_eq!(*counter.borrow(), 1);
    }

    #[test]
    fn multiple() {
        let counter = Rc::new(RefCell::new(0));

        {
            let a = DropTracker::new(5, Rc::clone(&counter));
            let b = DropTracker::new(6, Rc::clone(&counter));
        }

        assert_eq!(*counter.borrow(), 2);
    }
}

解説

チケットストアからは一旦離れ、 Rc<RefCell<T>> を利用して「 T 型の値がドロップした回数をカウントする DropTracker なる構造体を作ろう」という問題です!

lib.rs
use std::cell::RefCell;
use std::rc::Rc;

pub struct DropTracker<T> {
    value: T,
    counter: Rc<RefCell<usize>>,
}

impl<T> DropTracker<T> {
    pub fn new(value: T, counter: Rc<RefCell<usize>>) -> Self {
        Self { value, counter }
    }
}

impl<T> Drop for DropTracker<T> {
    fn drop(&mut self) {
        let DropTracker { counter, .. } = self;

        *counter.borrow_mut() += 1;
    }
}

上述した RefCell に加え、 Rc (Reference Counter、参照カウンタ) という構造体が登場しています。

Rc は(語弊を恐れずに言うと)「所有権を」共有する構造体です。 Rc<T>Deref<Target = T> を実装しているため、r: Rc<T> に対し &r とすることで内包する値への参照 &T を得ることができます!

共有参照とどう違うのでしょうか...?ミソは所有権の所在です。普通の参照は参照先の値の「所有者」がいて、その上で複数の不変参照(共有参照)あるいは単一の可変参照(排他参照)が共有されています。

通常の値とその参照の関係

一方で Rc は内包する値の所有者の所在(片付ける責任を持つ変数)が不明瞭になる構造体で、「 Rc 参照が残っている限り実体が生き続ける」構造体です!

Rc

(図がわかりにくかったら申し訳ないです...)

以上より Rc は参照の存在を基準に値のライフタイムを定めたい時(例: リスト構造)に重宝します。これ、実は「ガベージコレクションの一種」です!そう、Rustには別にガベージコレクションが「ない」わけではないんですね...使いたい時に明示されるだけなのです。

Rc は「複製されるたび、内部で参照数をカウントし、参照数が0になったら削除」という仕組みで動いています...「内部でカウント」...?そうです! Rc もまた 内部可変性 を持った構造体です。

Rc<T> はあくまでも Deref<Target = T> のみ実装し、 DerefMut は実装しないため、内包する値への可変参照を得たい場合更に別な構造体でラップする必要があります。今回の問題ではカウンターを増減させるために可変が必要ですね。ランタイム時に可変を得られれば良いので、 RefCell で包んでいます。

このようにしてRust名物のややこしい構造体 Rc<RefCell<T>> が登場します!今回の問題では、それぞれ次の目的で使われています。

  • Rc: 複数の DropTracker 間でカウンターを共有するために使用。 最後の DropTracker が消えるまでカウンターには生きていてもらいたい
  • RefCell: カウンターの共有参照から可変を得るため(内部可変性のため)に使用。 カウンターへの排他参照が可能なタイミングであれば可変として参照できる 。今回はDrop時に取得しカウントを増やしている

この辺の話題を扱っていると &T なのか &Cell<T> なのか &Rc<T> なのか Ref<'_, T> なのか、所有権はどうなのか借用条件はどうなのか等、まさしくRustの所有権パラダイムで迷子になる感覚がありますね...!その分他の回よりも力が入ってしまいました :bow:

...では次の問題に行きましょう!(強引)

次の記事: 【30】 双方向通信・リファクタリング ~返信用封筒を入れよう!~

登場したPlayground

(実際に無効化したことはないですが、)Rust Playground上のデータが喪失する可能性を鑑みて、一応記事にもソースコードを掲載することとしました。

URL: https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=f0940f6b57e1e70cf15c946605fcadd4

Rust
use std::cell::{Cell, RefCell};

fn cell_example() {
    struct Singleton {
        counter_field: Cell<u32>,
    }

    fn count_up(s: &Singleton) {
        let count = s.counter_field.get();
        s.counter_field.set(count + 1);
    }
    
    fn print_singleton(s: &Singleton) {
        println!("count: {}", s.counter_field.get());
    }
    
    let s = Singleton {
        counter_field: Cell::new(0),
    };

    for _ in 0..10 {
        print_singleton(&s);
        count_up(&s);
    }
    
    print_singleton(&s);
}

fn refcell_example() {
    fn count_up(c: &mut u32) {
        *c += 1;
    }
    
    fn print_counter(c: &u32) {
        println!("count: {}", c);
    }

    let counter: RefCell<u32> = RefCell::new(0);
    
    for _ in 0..10 {
        print_counter(&counter.borrow());
        count_up(&mut counter.borrow_mut());
    }
    
    print_counter(&counter.borrow());
}

fn main() {
    println!("cell_example");
    cell_example();
    println!("\nrefcell_example");
    refcell_example();
}

URL: https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=f2de4bfa5ae20a590686fa02dd3a577a

Rust
use std::rc::{Rc, Weak};
use std::fmt::Display;
use std::cell::RefCell;

struct Node<T: Display> {
    val: T,
    next: RefCell<Option<Rc<Node<T>>>>,
    prev: RefCell<Weak<Node<T>>>,
}

impl<T: Display> Node<T> {
    fn new(val: T) -> Rc<Self> {
        Rc::new(Self {
            val,
            next: RefCell::new(None),
            prev: RefCell::new(Weak::new()),
        })
    }
    
    fn get_val(&self) -> &T {
        &self.val
    }
    
    fn chain(self: &Rc<Self>, other: Rc<Self>) {
        *other.prev.borrow_mut() = Rc::downgrade(self);
        *self.next.borrow_mut() = Some(other);
    }
    
    fn get_next(&self) -> Option<Rc<Self>> {
        self.next.borrow().as_ref().cloned()
    }
    
    fn get_prev(&self) -> Option<Rc<Self>> {
        self.prev.borrow().upgrade()
    }
}

fn print_nodes<T: Display>(mut top: Rc<Node<T>>) {
    print!("{}", top.get_val());

    while let Some(r) = top.get_next() {
        print!("->{}", r.get_val());
        top = r;
    }
    
    println!("");
}

fn print_parents<T: Display>(mut last: Rc<Node<T>>) {
    print!("{}", last.get_val());
    
    while let Some(p) = last.get_prev() {
        print!("<-{}", p.get_val());
        last = p;
    }
    
    println!("");
}

impl<T: Display> Drop for Node<T> {
    fn drop(&mut self) {
        println!("Drop: {}", self.get_val());
    }
}

fn main() {
    let top = Node::new(0_usize);
    let mut cursor = Rc::clone(&top);

    for i in 1..=10 {
        let new_node = Node::new(i);
        cursor.chain(Rc::clone(&new_node));
        
        print_nodes(Rc::clone(&top));

        cursor = new_node;
    }
    
    println!("Parents:");
    print_parents(cursor);
}
  1. バックエンドエンジニア視点だとクライアントが同一マシンの別スレッドというのは何か違和感を感じますが...まぁ例なので良いですし今回作るようなケースは実際の事例としてなくはないと思います

  2. mpmc (Multi Producer Multi Consumer) も無いわけではなく、crossbeam にそれに該当するものがあったりします(要確認)。まぁ通常用途ではConsumer側は複製禁止の方がわかりやすいです。

  3. 問題に関係ない部分が長くなった&なんとなくノリで、いつもと異なり問題パート・解説パートの前に内容パートを入れてみました。本当なら各記事こういうレイアウトの方が実りがあるかもですね。

  4. 不変な実体(構造体)が持つ可変参照については、 UnsafeCell がなくとも可変に扱えます。内部可変性はあくまでも不変参照下での話のようです。

2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?