前の記事
- 【0】 準備 ← 初回
- ...
- 【28】 リーク・スコープ付きスレッド ~ライフタイムに技あり!~ ← 前回
- 【29】 チャネル・参照の内部可変性 ~Rustの虎の子、mpscと
Rc<RefCell<T>>
~ ← 今回
全記事一覧
- 【0】 準備
- 【1】 構文・整数・変数
- 【2】 if・パニック・演習
- 【3】 可変・ループ・オーバーフロー
- 【4】 キャスト・構造体 (たまにUFCS)
- 【5】 バリデーション・モジュールの公開範囲 ~ → カプセル化!~
- 【6】 カプセル化の続きと所有権とセッター ~そして不変参照と可変参照!~
- 【7】 スタック・ヒープと参照のサイズ ~メモリの話~
- 【8】 デストラクタ(変数の終わり)・トレイト ~終わりと始まり~
- 【9】 Orphan rule (孤児ルール)・演算子オーバーロード・derive ~Empowerment 💪 ~
- 【10】 トレイト境界・文字列・Derefトレイト ~トレイトのアレコレ~
- 【11】 Sized トレイト・From トレイト・関連型 ~おもしろトレイトと関連型~
- 【12】 Clone・Copy・Dropトレイト ~覚えるべき主要トレイトたち~
- 【13】 トレイトまとめ・列挙型・match式 ~最強のトレイトの次は、最強の列挙型~
- 【14】 フィールド付き列挙型とOption型 ~チョクワガタ~
- 【15】 Result型 ~Rust流エラーハンドリング術~
- 【16】 Errorトレイトと外部クレート ~依存はCargo.tomlに全部お任せ!~
- 【17】 thiserror・TryFrom ~トレイトもResultも自由自在!~
- 【18】 Errorのネスト・慣例的な書き方 ~Rustらしさの目醒め~
- 【19】 配列・動的配列 ~スタックが使われる配列と、ヒープに保存できる動的配列~
- 【20】 動的配列のリサイズ・イテレータ ~またまたトレイト登場!~
- 【21】 イテレータ・ライフタイム ~ライフタイム注釈ようやく登場!~
- 【22】 コンビネータ・RPIT ~ 「
Iterator
トレイトを実装してるやつ」~ - 【23】
impl Trait
・スライス ~配列の欠片~ - 【24】 可変スライス・下書き構造体 ~構造体で状態表現~
- 【25】 インデックス・可変インデックス ~インデックスもトレイト!~
- 【26】 HashMap・順序・BTreeMap ~Rustの辞書型~
- 【27】 スレッド・'staticライフタイム ~並列処理に見るRustの恩恵~
- 【28】 リーク・スコープ付きスレッド ~ライフタイムに技あり!~
- 【29】 チャネル・参照の内部可変性 ~Rustの虎の子、mpscと
Rc<RefCell<T>>
~ - 【30】 双方向通信・リファクタリング ~返信用封筒を入れよう!~
- 【31】 上限付きチャネル・PATCH機能 ~パンクしないように制御!~
- 【32】
Send
・排他的ロック(Mutex
)・非対称排他的ロック(RwLock
) ~真打Arc<Mutex<T>>
登場~ - 【33】 チャネルなしで実装・Syncの話 ~考察回です~
- 【34】
async fn
・非同期タスク生成 ~Rustの非同期入門~ - 【35】 非同期ランタイム・Futureトレイト ~非同期のお作法~
- 【36】 ブロッキング・非同期用の実装・キャンセル ~ラストスパート!~
- 【37】 Axumでクラサバ! ~最終回~
- 【おまけ1】 Rustで勘違いしていたこと3選 🏄🌴 【100 Exercises To Learn Rust 🦀 完走記事 🏃】
- 【おまけ2】 【🙇 懺悔 🙇】Qiitanグッズ欲しさに1日に33記事投稿した話またはQiita CLIとcargo scriptを布教する的な何か
100 Exercise To Learn Rust 演習第29回になります!
今回の関連ページ
[07_threads/05_channels] チャネル (Multi Producer Single Consumer)
問題はこちらです。
use std::sync::mpsc::{Receiver, Sender};
pub mod data;
pub mod store;
pub enum Command {
Insert(todo!()),
}
// Start the system by spawning the server the thread.
// It returns a `Sender` instance which can then be used
// by one or more clients to interact with the server.
pub fn launch() -> Sender<Command> {
let (sender, receiver) = std::sync::mpsc::channel();
std::thread::spawn(move || server(receiver));
sender
}
// TODO: The server task should **never** stop.
// Enter a loop: wait for a command to show up in
// the channel, then execute it, then start waiting
// for the next command.
pub fn server(receiver: Receiver<Command>) {}
その他のファイル
use crate::store::TicketId;
use ticket_fields::{TicketDescription, TicketTitle};
#[derive(Clone, Debug, PartialEq)]
pub struct Ticket {
pub id: TicketId,
pub title: TicketTitle,
pub description: TicketDescription,
pub status: Status,
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct TicketDraft {
pub title: TicketTitle,
pub description: TicketDescription,
}
#[derive(Clone, Debug, Copy, PartialEq, Eq)]
pub enum Status {
ToDo,
InProgress,
Done,
}
use crate::data::{Status, Ticket, TicketDraft};
use std::collections::BTreeMap;
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct TicketId(u64);
#[derive(Clone)]
pub struct TicketStore {
tickets: BTreeMap<TicketId, Ticket>,
counter: u64,
}
impl TicketStore {
pub fn new() -> Self {
Self {
tickets: BTreeMap::new(),
counter: 0,
}
}
pub fn add_ticket(&mut self, ticket: TicketDraft) -> TicketId {
let id = TicketId(self.counter);
self.counter += 1;
let ticket = Ticket {
id,
title: ticket.title,
description: ticket.description,
status: Status::ToDo,
};
self.tickets.insert(id, ticket);
id
}
}
// TODO: Set `move_forward` to `true` in `ready` when you think you're done with this exercise.
// Feel free to call an instructor to verify your solution!
use channels::data::TicketDraft;
use channels::{launch, Command};
use std::time::Duration;
use ticket_fields::test_helpers::{ticket_description, ticket_title};
#[test]
fn a_thread_is_spawned() {
let sender = launch();
std::thread::sleep(Duration::from_millis(200));
sender
.send(Command::Insert(TicketDraft {
title: ticket_title(),
description: ticket_description(),
}))
// If the thread is no longer running, this will panic
// because the channel will be closed.
.expect("Did you actually spawn a thread? The channel is closed!");
}
#[test]
fn ready() {
// There's very little that we can check automatically in this exercise,
// since our server doesn't expose any **read** actions.
// We have no way to know if the inserts are actually happening and if they
// are happening correctly.
let move_forward = false; // 完了後 true に変更していますが、解説では省略
assert!(move_forward);
}
今回から、クラサバ(クライアント・サーバーシステム)とするべく、チケット管理システムを改修していく問題です!
クラサバを実現するには、サーバー用スレッド・クライアント用スレッド1を用意し、両スレッド間でデータ送受信を実現すると具合が良さそうです。
しかし前回までとは異なり、クライアント用スレッドはともかく、少なくともサーバー用スレッドは息が長いスレッドになりそうです。前回までで扱ってきた「スレッド生成時にクロージャに変数をキャプチャさせる」「スレッド終了時に join
して返された値を活用する」方法ではこのデータの送受信を行うのは難しいでしょう。
解説
というわけで、スレッド間でデータの送受信を行う新たな機能 mpsc
(Multi Producer Single Consumer) の登場です!クラサバに限らず、Rustにおけるスレッド間通信において主役級の働きをする仕組みです。
今回の問題は mpsc
を使ってみましょう演習です!
use std::sync::mpsc::{Receiver, Sender};
pub mod data;
pub mod store;
use data::TicketDraft;
use store::TicketStore;
pub enum Command {
Insert(TicketDraft),
}
pub fn launch() -> Sender<Command> {
let (sender, receiver) = std::sync::mpsc::channel();
std::thread::spawn(move || server(receiver));
sender
}
pub fn server(receiver: Receiver<Command>) {
let mut ticket_store = TicketStore::new();
// while let Ok(Command::Insert(ticket)) = receiver.recv() {
for Command::Insert(ticket) in receiver {
ticket_store.add_ticket(ticket);
dbg!(&ticket_store);
}
}
mpsc
に関しては正直筆者の下手な解説よりもわかりやすい TRPL の方を読んでほしいなと思いますが(Golangのチャネルモデルを参考にしているなど、面白いことが書いてあります。)、とりあえず回答範囲で必要なことについて解説したいと思います。
let (sender, receiver): (Sender<Command>, Receiver<Command>) = std::sync::mpsc::channel();
この行で、クライアントからサーバーへのコマンド Command
を送信するためのチャネルを作成しています。 Sender<Command>
はクライアント側、 Receiver<Command>
はサーバー側で持つ形になります。受信側となる Receiver
は誰に割り振ればよいのかわからないため単一で Clone
できませんが、一方で Sender
の方は複数スレッドから送信しても問題ないため Clone
が実装されており複製可能です。故に「mpsc」、マルチプロデューサシングルコンシューマと呼びます2!
TODOとなっている部分について、コマンドにより Insert(TicketDraft)
として下書きチケットをストアに挿入できるようにした上で、サーバーの処理を記述しています。
pub fn server(receiver: Receiver<Command>) {
let mut ticket_store = TicketStore::new();
// while let Ok(Command::Insert(ticket)) = receiver.recv() {
for Command::Insert(ticket) in receiver {
ticket_store.add_ticket(ticket);
dbg!(&ticket_store);
}
}
ここで、 receiver
にてコマンドを受信し取り出す方法は複数あります。一つはコメントアウトしてある while let
構文を使う方法で、この構文は条件式がパターンにマッチし続ける間繰り返しを行います。ここでは「 receiver.recv()
が Ok(Command::Insert(ticket))
にマッチし続ける間、ブロック内処理を繰り返す」という意味になっています。今回示した mpsc
の他、スタックやキュー、ヒープ構造等の何かから値を取り出し続ける処理をきれいに書けたりして競プロで重宝したりします。何気に筆者が好きな構文です!
また Receiver
は第20回で登場した IntoIterator
を実装しているため、 for
文に直接指定することも可能です!この場合、イテレートごとに Command::Insert(tikcet)
が得られるので、これを分割代入で受け取り、 for
文内で ticket_store
に登録しています。
[07_threads/06_interior_mutability] 参照の内部可変性 ( Rc<RefCell<T>>
)
内部可変性
問題に入る前に3、今回の Book は中々深い内容なので是非読んでほしいです(毎度のツッコミながら、Bookはそもそも全ページ気づきがありますね)。以下、Bookの内容をそのまま受け売りして解説しています。
先の問題の mpsc::Sender
は、不変参照であるにも関わらず「値をどこかしらへ保存する」という あからさまに可変であるかのような挙動をしています。値自体は Receiver
へ送られるため、UIとして見た場合 Sender
は不変であるほうがわかりやすいですが、実態としては「不変」参照ではないんじゃないか?という疑問が湧きます。
基本的には不変参照以下の要素が可変になることはない4のですが、 Sender
のような可変ながら不変参照として扱いたいという需要に応えるため、唯一の例外として UnsafeCell
を持つときに限り その UnsafeCell
の対象は例え不変参照以下でも可変にすることができます。これを「内部可変性」といいます。
そして「内部可変性」の存在により「不変参照」は実は不変とは限らず、以下のように捉えるべきらしいです。
- 直感的で大体合っている捉え方
-
&T
: 不変参照 -
&mut T
: 可変参照
-
- もう少し正確な捉え方
-
&T
: 共有参照 -
&mut T
: 排他参照
-
この例外的な扱いはコンパイラのために存在するらしく、 UnsafeCell
が付いているフィールドには最適化が施されない (逆に言うと UnsafeCell
が付いていない通常の不変参照は最適化される )そうです。第12回で扱ったように、マーカートレイト Copy
が付いていると複製に副作用がないと仮定しメモリをコピーするだけになったり、 Copy
と Drop
が排他的であったりと同様に、(今回はトレイトではなく型ですが)コンパイラが吐くバイナリの挙動に影響を及ぼす機構ということですね!
UnsafeCell
自体をそのまま使うことはあまりなく、 UnsafeCell
に類似した機能として Cell
や RefCell
といったラッパーの形で目にすることが大半でしょう。 Cell
と RefCell
の使い分けですが筆者もよく忘れるので、以下に現在の理解をメモしておきます(直感を優先し詳細な説明を省いています)。
-
Cell
: 構造体のCopy
なフィールドに対してゲッター・セッターのようなものを提供してくれる -
RefCell
: ランタイム時に参照の排他性を確認することで、内包する値への共有参照または排他参照を提供してくれる
Playground: https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=f0940f6b57e1e70cf15c946605fcadd4
問題
大分前置きが長くなりましたが、問題はこちらです。
// TODO: Use `Rc` and `RefCell` to implement `DropTracker<T>`, a wrapper around a value of type `T`
// that increments a shared `usize` counter every time the wrapped value is dropped.
use std::cell::RefCell;
use std::rc::Rc;
pub struct DropTracker<T> {
value: T,
counter: todo!(),
}
impl<T> DropTracker<T> {
pub fn new(value: T, counter: todo!()) -> Self {
Self { value, counter }
}
}
impl<T> Drop for DropTracker<T> {
fn drop(&mut self) {
todo!()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn it_works() {
let counter = Rc::new(RefCell::new(0));
let _ = DropTracker::new((), Rc::clone(&counter));
assert_eq!(*counter.borrow(), 1);
}
#[test]
fn multiple() {
let counter = Rc::new(RefCell::new(0));
{
let a = DropTracker::new(5, Rc::clone(&counter));
let b = DropTracker::new(6, Rc::clone(&counter));
}
assert_eq!(*counter.borrow(), 2);
}
}
解説
チケットストアからは一旦離れ、 Rc<RefCell<T>>
を利用して「 T
型の値がドロップした回数をカウントする DropTracker
なる構造体を作ろう」という問題です!
use std::cell::RefCell;
use std::rc::Rc;
pub struct DropTracker<T> {
value: T,
counter: Rc<RefCell<usize>>,
}
impl<T> DropTracker<T> {
pub fn new(value: T, counter: Rc<RefCell<usize>>) -> Self {
Self { value, counter }
}
}
impl<T> Drop for DropTracker<T> {
fn drop(&mut self) {
let DropTracker { counter, .. } = self;
*counter.borrow_mut() += 1;
}
}
上述した RefCell
に加え、 Rc
(Reference Counter、参照カウンタ) という構造体が登場しています。
Rc
は(語弊を恐れずに言うと)「所有権を」共有する構造体です。 Rc<T>
は Deref<Target = T>
を実装しているため、r: Rc<T>
に対し &r
とすることで内包する値への参照 &T
を得ることができます!
共有参照とどう違うのでしょうか...?ミソは所有権の所在です。普通の参照は参照先の値の「所有者」がいて、その上で複数の不変参照(共有参照)あるいは単一の可変参照(排他参照)が共有されています。
通常の値とその参照の関係
一方で Rc
は内包する値の所有者の所在(片付ける責任を持つ変数)が不明瞭になる構造体で、「 Rc
参照が残っている限り実体が生き続ける」構造体です!
Rc
(図がわかりにくかったら申し訳ないです...)
以上より Rc
は参照の存在を基準に値のライフタイムを定めたい時(例: リスト構造)に重宝します。これ、実は「ガベージコレクションの一種」です!そう、Rustには別にガベージコレクションが「ない」わけではないんですね...使いたい時に明示されるだけなのです。
Rc
は「複製されるたび、内部で参照数をカウントし、参照数が0になったら削除」という仕組みで動いています...「内部でカウント」...?そうです! Rc
もまた 内部可変性 を持った構造体です。
Rc<T>
はあくまでも Deref<Target = T>
のみ実装し、 DerefMut
は実装しないため、内包する値への可変参照を得たい場合更に別な構造体でラップする必要があります。今回の問題ではカウンターを増減させるために可変が必要ですね。ランタイム時に可変を得られれば良いので、 RefCell
で包んでいます。
このようにしてRust名物のややこしい構造体 Rc<RefCell<T>>
が登場します!今回の問題では、それぞれ次の目的で使われています。
-
Rc
: 複数のDropTracker
間でカウンターを共有するために使用。 最後のDropTracker
が消えるまでカウンターには生きていてもらいたい -
RefCell
: カウンターの共有参照から可変を得るため(内部可変性のため)に使用。 カウンターへの排他参照が可能なタイミングであれば可変として参照できる 。今回はDrop時に取得しカウントを増やしている
この辺の話題を扱っていると &T
なのか &Cell<T>
なのか &Rc<T>
なのか Ref<'_, T>
なのか、所有権はどうなのか借用条件はどうなのか等、まさしくRustの所有権パラダイムで迷子になる感覚がありますね...!その分他の回よりも力が入ってしまいました
...では次の問題に行きましょう!(強引)
次の記事: 【30】 双方向通信・リファクタリング ~返信用封筒を入れよう!~
登場したPlayground
(実際に無効化したことはないですが、)Rust Playground上のデータが喪失する可能性を鑑みて、一応記事にもソースコードを掲載することとしました。
use std::cell::{Cell, RefCell};
fn cell_example() {
struct Singleton {
counter_field: Cell<u32>,
}
fn count_up(s: &Singleton) {
let count = s.counter_field.get();
s.counter_field.set(count + 1);
}
fn print_singleton(s: &Singleton) {
println!("count: {}", s.counter_field.get());
}
let s = Singleton {
counter_field: Cell::new(0),
};
for _ in 0..10 {
print_singleton(&s);
count_up(&s);
}
print_singleton(&s);
}
fn refcell_example() {
fn count_up(c: &mut u32) {
*c += 1;
}
fn print_counter(c: &u32) {
println!("count: {}", c);
}
let counter: RefCell<u32> = RefCell::new(0);
for _ in 0..10 {
print_counter(&counter.borrow());
count_up(&mut counter.borrow_mut());
}
print_counter(&counter.borrow());
}
fn main() {
println!("cell_example");
cell_example();
println!("\nrefcell_example");
refcell_example();
}
use std::rc::{Rc, Weak};
use std::fmt::Display;
use std::cell::RefCell;
struct Node<T: Display> {
val: T,
next: RefCell<Option<Rc<Node<T>>>>,
prev: RefCell<Weak<Node<T>>>,
}
impl<T: Display> Node<T> {
fn new(val: T) -> Rc<Self> {
Rc::new(Self {
val,
next: RefCell::new(None),
prev: RefCell::new(Weak::new()),
})
}
fn get_val(&self) -> &T {
&self.val
}
fn chain(self: &Rc<Self>, other: Rc<Self>) {
*other.prev.borrow_mut() = Rc::downgrade(self);
*self.next.borrow_mut() = Some(other);
}
fn get_next(&self) -> Option<Rc<Self>> {
self.next.borrow().as_ref().cloned()
}
fn get_prev(&self) -> Option<Rc<Self>> {
self.prev.borrow().upgrade()
}
}
fn print_nodes<T: Display>(mut top: Rc<Node<T>>) {
print!("{}", top.get_val());
while let Some(r) = top.get_next() {
print!("->{}", r.get_val());
top = r;
}
println!("");
}
fn print_parents<T: Display>(mut last: Rc<Node<T>>) {
print!("{}", last.get_val());
while let Some(p) = last.get_prev() {
print!("<-{}", p.get_val());
last = p;
}
println!("");
}
impl<T: Display> Drop for Node<T> {
fn drop(&mut self) {
println!("Drop: {}", self.get_val());
}
}
fn main() {
let top = Node::new(0_usize);
let mut cursor = Rc::clone(&top);
for i in 1..=10 {
let new_node = Node::new(i);
cursor.chain(Rc::clone(&new_node));
print_nodes(Rc::clone(&top));
cursor = new_node;
}
println!("Parents:");
print_parents(cursor);
}
-
バックエンドエンジニア視点だとクライアントが同一マシンの別スレッドというのは何か違和感を感じますが...まぁ例なので良いですし今回作るようなケースは実際の事例としてなくはないと思います ↩
-
mpmc (Multi Producer Multi Consumer) も無いわけではなく、crossbeam にそれに該当するものがあったりします(要確認)。まぁ通常用途ではConsumer側は複製禁止の方がわかりやすいです。 ↩
-
問題に関係ない部分が長くなった&なんとなくノリで、いつもと異なり問題パート・解説パートの前に内容パートを入れてみました。本当なら各記事こういうレイアウトの方が実りがあるかもですね。 ↩
-
不変な実体(構造体)が持つ可変参照については、
UnsafeCell
がなくとも可変に扱えます。内部可変性はあくまでも不変参照下での話のようです。 ↩