1
0
お題は不問!Qiita Engineer Festa 2024で記事投稿!
Qiita Engineer Festa20242024年7月17日まで開催中!

Rust 100 Ex 🏃【31/37】 上限付きチャネル・PATCH機能 ~パンクしないように制御!~

Last updated at Posted at 2024-07-17

前の記事

全記事一覧

100 Exercise To Learn Rust 演習第31回になります!今回も技術的に難しい点は少なめです。

今回の関連ページ

[07_threads/09_bounded] 上限付きチャネル

問題はこちらです。

src/lib.rs
// TODO: Convert the implementation to use bounded channels.
use crate::data::{Ticket, TicketDraft};
use crate::store::{TicketId, TicketStore};
use std::sync::mpsc::{Receiver, Sender};

pub mod data;
pub mod store;

#[derive(Clone)]
pub struct TicketStoreClient {
    sender: todo!(),
}

impl TicketStoreClient {
    pub fn insert(&self, draft: TicketDraft) -> Result<TicketId, todo!()> {
        todo!()
    }

    pub fn get(&self, id: TicketId) -> Result<Option<Ticket>, todo!()> {
        todo!()
    }
}

pub fn launch(capacity: usize) -> TicketStoreClient {
    todo!();
    std::thread::spawn(move || server(receiver));
    todo!()
}

enum Command {
    Insert {
        draft: TicketDraft,
        response_channel: todo!(),
    },
    Get {
        id: TicketId,
        response_channel: todo!(),
    },
}

pub fn server(receiver: Receiver<Command>) {
    let mut store = TicketStore::new();
    loop {
        match receiver.recv() {
            Ok(Command::Insert {
                draft,
                response_channel,
            }) => {
                let id = store.add_ticket(draft);
                todo!()
            }
            Ok(Command::Get {
                id,
                response_channel,
            }) => {
                let ticket = store.get(id);
                todo!()
            }
            Err(_) => {
                // There are no more senders, so we can safely break
                // and shut down the server.
                break;
            }
        }
    }
}
その他のファイル
src/data.rs
use crate::store::TicketId;
use ticket_fields::{TicketDescription, TicketTitle};

#[derive(Clone, Debug, PartialEq)]
pub struct Ticket {
    pub id: TicketId,
    pub title: TicketTitle,
    pub description: TicketDescription,
    pub status: Status,
}

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct TicketDraft {
    pub title: TicketTitle,
    pub description: TicketDescription,
}

#[derive(Clone, Debug, Copy, PartialEq, Eq)]
pub enum Status {
    ToDo,
    InProgress,
    Done,
}
store.rs
use crate::data::{Status, Ticket, TicketDraft};
use std::collections::BTreeMap;

#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct TicketId(u64);

#[derive(Clone)]
pub struct TicketStore {
    tickets: BTreeMap<TicketId, Ticket>,
    counter: u64,
}

impl TicketStore {
    pub fn new() -> Self {
        Self {
            tickets: BTreeMap::new(),
            counter: 0,
        }
    }

    pub fn add_ticket(&mut self, ticket: TicketDraft) -> TicketId {
        let id = TicketId(self.counter);
        self.counter += 1;
        let ticket = Ticket {
            id,
            title: ticket.title,
            description: ticket.description,
            status: Status::ToDo,
        };
        self.tickets.insert(id, ticket);
        id
    }

    pub fn get(&self, id: TicketId) -> Option<&Ticket> {
        self.tickets.get(&id)
    }
}
tests/insert.rs
use bounded::data::{Status, TicketDraft};
use bounded::launch;
use ticket_fields::test_helpers::{ticket_description, ticket_title};

#[test]
fn works() {
    let client = launch(5);
    let draft = TicketDraft {
        title: ticket_title(),
        description: ticket_description(),
    };
    let ticket_id = client.insert(draft.clone()).unwrap();

    let client2 = client.clone();
    let ticket = client2.get(ticket_id).unwrap().unwrap();
    assert_eq!(ticket_id, ticket.id);
    assert_eq!(ticket.status, Status::ToDo);
    assert_eq!(ticket.title, draft.title);
    assert_eq!(ticket.description, draft.description);
}

解説

Book には「無制限にメッセージを送れる channel だと送信者側が多すぎる場合メモリがパンクしてしまう可能性があるから、プロダクトコードでは 絶対に 使うな!制限付きの sync_channel を使え!」という強い思想が書かれています。というわけで、クライアントからサーバーへのチャネルを sync_channel に置き換えましょう。

src/lib.rs
// TODO: Convert the implementation to use bounded channels.
use crate::data::{Ticket, TicketDraft};
use crate::store::{TicketId, TicketStore};
use std::sync::mpsc::{channel, sync_channel, Receiver, Sender, SyncSender};

pub mod data;
pub mod store;

#[derive(Clone)]
pub struct TicketStoreClient {
    sender: SyncSender<Command>,
}

impl TicketStoreClient {
    pub fn insert(&self, draft: TicketDraft) -> Result<TicketId, String> {
        let (sender, receiver) = channel();
        self.sender
            .send(Command::Insert {
                draft,
                response_channel: sender,
            })
            .map_err(|e| format!("{:?}", e))?;
        Ok(receiver.recv().unwrap())
    }

    pub fn get(&self, id: TicketId) -> Result<Option<Ticket>, String> {
        let (sender, receiver) = channel();
        self.sender
            .send(Command::Get {
                id,
                response_channel: sender,
            })
            .map_err(|e| format!("{:?}", e))?;
        Ok(receiver.recv().unwrap())
    }
}

pub fn launch(capacity: usize) -> TicketStoreClient {
    let (sender, receiver) = sync_channel(capacity);
    std::thread::spawn(move || server(receiver));
    TicketStoreClient { sender }
}

#[derive(Debug)]
enum Command {
    Insert {
        draft: TicketDraft,
        response_channel: Sender<TicketId>,
    },
    Get {
        id: TicketId,
        response_channel: Sender<Option<Ticket>>,
    },
}

fn server(receiver: Receiver<Command>) {
    let mut store = TicketStore::new();
    loop {
        match receiver.recv() {
            Ok(Command::Insert {
                draft,
                response_channel,
            }) => {
                let id = store.add_ticket(draft);
                response_channel.send(id).unwrap();
            }
            Ok(Command::Get {
                id,
                response_channel,
            }) => {
                let ticket = store.get(id);
                response_channel.send(ticket.cloned()).unwrap();
            }
            Err(_) => {
                // There are no more senders, so we can safely break
                // and shut down the server.
                break;
            }
        }
    }
}

模範解答 の方ではサーバーからクライアントへの返送用チャネルも sync_channel にしていますが、サーバー以外にクライアントにメッセージを送る存在はいませんし、一度しか送らないので筆者はそのままにしてみました、地味なこだわり...

[07_threads/10_patch] PATCH (更新) 機能

問題はこちらです。

lib.rs
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};

// TODO: Implement the patching functionality.
use crate::data::{Ticket, TicketDraft, TicketPatch};
use crate::store::{TicketId, TicketStore};

pub mod data;
pub mod store;

#[derive(Clone)]
pub struct TicketStoreClient {
    sender: SyncSender<Command>,
}

impl TicketStoreClient {
    pub fn insert(&self, draft: TicketDraft) -> Result<TicketId, OverloadedError> { /* 省略 */ }

    pub fn get(&self, id: TicketId) -> Result<Option<Ticket>, OverloadedError> { /* 省略 */ }

    pub fn update(&self, ticket_patch: TicketPatch) -> Result<(), OverloadedError> { todo!() }
}

#[derive(Debug, thiserror::Error)]
#[error("The store is overloaded")]
pub struct OverloadedError;

pub fn launch(capacity: usize) -> TicketStoreClient {
    let (sender, receiver) = sync_channel(capacity);
    std::thread::spawn(move || server(receiver));
    TicketStoreClient { sender }
}

enum Command {
    Insert {
        draft: TicketDraft,
        response_channel: SyncSender<TicketId>,
    },
    Get {
        id: TicketId,
        response_channel: SyncSender<Option<Ticket>>,
    },
    Update {
        patch: TicketPatch,
        response_channel: SyncSender<()>,
    },
}

pub fn server(receiver: Receiver<Command>) {
    let mut store = TicketStore::new();
    loop {
        match receiver.recv() {
            Ok(Command::Insert {
                draft,
                response_channel,
            }) => {
                let id = store.add_ticket(draft);
                let _ = response_channel.send(id);
            }
            Ok(Command::Get {
                id,
                response_channel,
            }) => {
                let ticket = store.get(id);
                let _ = response_channel.send(ticket.cloned());
            }
            Ok(Command::Update {
                patch,
                response_channel,
            }) => {
                todo!()
            }
            Err(_) => {
                // There are no more senders, so we can safely break
                // and shut down the server.
                break;
            }
        }
    }
}

新たな構造体 TicketPatch の定義はこんな感じです。

src/data.rs (抜粋)
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct TicketPatch {
    pub id: TicketId,
    pub title: Option<TicketTitle>,
    pub description: Option<TicketDescription>,
    pub status: Option<Status>,
}
その他のファイル
src/data.rs
use crate::store::TicketId;
use ticket_fields::{TicketDescription, TicketTitle};

#[derive(Clone, Debug, PartialEq)]
pub struct Ticket {
    pub id: TicketId,
    pub title: TicketTitle,
    pub description: TicketDescription,
    pub status: Status,
}

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct TicketDraft {
    pub title: TicketTitle,
    pub description: TicketDescription,
}

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct TicketPatch {
    pub id: TicketId,
    pub title: Option<TicketTitle>,
    pub description: Option<TicketDescription>,
    pub status: Option<Status>,
}

#[derive(Clone, Debug, Copy, PartialEq, Eq)]
pub enum Status {
    ToDo,
    InProgress,
    Done,
}
src/store.rs
use crate::data::{Status, Ticket, TicketDraft};
use std::collections::BTreeMap;

#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct TicketId(u64);

#[derive(Clone)]
pub struct TicketStore {
    tickets: BTreeMap<TicketId, Ticket>,
    counter: u64,
}

impl TicketStore {
    pub fn new() -> Self {
        Self {
            tickets: BTreeMap::new(),
            counter: 0,
        }
    }

    pub fn add_ticket(&mut self, ticket: TicketDraft) -> TicketId {
        let id = TicketId(self.counter);
        self.counter += 1;
        let ticket = Ticket {
            id,
            title: ticket.title,
            description: ticket.description,
            status: Status::ToDo,
        };
        self.tickets.insert(id, ticket);
        id
    }

    pub fn get(&self, id: TicketId) -> Option<&Ticket> {
        self.tickets.get(&id)
    }

    pub fn get_mut(&mut self, id: TicketId) -> Option<&mut Ticket> {
        self.tickets.get_mut(&id)
    }
}
tests/check.rs
use patch::data::{Status, TicketDraft, TicketPatch};
use patch::launch;
use ticket_fields::test_helpers::{ticket_description, ticket_title};

#[test]
fn works() {
    let client = launch(5);
    let draft = TicketDraft {
        title: ticket_title(),
        description: ticket_description(),
    };
    let ticket_id = client.insert(draft.clone()).unwrap();

    let ticket = client.get(ticket_id).unwrap().unwrap();
    assert_eq!(ticket_id, ticket.id);
    assert_eq!(ticket.status, Status::ToDo);
    assert_eq!(ticket.title, draft.title);
    assert_eq!(ticket.description, draft.description);

    let patch = TicketPatch {
        id: ticket_id,
        title: None,
        description: None,
        status: Some(Status::InProgress),
    };
    client.update(patch).unwrap();

    let ticket = client.get(ticket_id).unwrap().unwrap();
    assert_eq!(ticket.id, ticket_id);
    assert_eq!(ticket.status, Status::InProgress);
}

「更新処理がなかったから追加しよう!」という問題です。

解説

Book 曰く、「クラサバではなくシングルスレッドだった今までは get_mut で可変参照を得てそのまま変更してしまえば良かったけど、 可変参照をクライアントに送るわけにはいかない ので、代わりにクライアント側から更新用データを送ってもらいサーバー側で更新しよう!」とのことなので、そのように解きます!

lib.rs
impl TicketStoreClient {
    // ...省略...

    pub fn update(&self, ticket_patch: TicketPatch) -> Result<(), OverloadedError> {
        let (response_sender, response_receiver) = sync_channel(1);
        self.sender
            .try_send(Command::Update {
                patch: ticket_patch,
                response_channel: response_sender,
            })
            .map_err(|_| OverloadedError)?;
        Ok(response_receiver.recv().unwrap())
    }
}

// ...省略...

fn server(receiver: Receiver<Command>) {
    let mut store = TicketStore::new();
    loop {
        match receiver.recv() {
            // ...省略...
            Ok(Command::Update {
                patch,
                response_channel,
            }) => {
                let TicketPatch {
                    id,
                    title,
                    description,
                    status,
                } = patch;

                let Some(ticket) = store.get_mut(id) else {
                    response_channel.send(()).unwrap();
                    continue;
                };

                if let Some(title) = title {
                    ticket.title = title;
                }

                if let Some(description) = description {
                    ticket.description = description;
                }

                if let Some(status) = status {
                    ticket.status = status;
                }

                response_channel.send(()).unwrap();
            }
            Err(_) => {
                // There are no more senders, so we can safely break
                // and shut down the server.
                break;
            }
        }
    }
}

特に難しい部分はないですね。更新処理は完了したことだけクライアントに伝えられれば良いので () (ユニット型)を返送しています。更新後データを返しても良さそうですし、成功失敗を返したい場合は真偽値やResult型もありなんじゃないかと思います。とりあえず今回は問題に合わせています。

サーバー側では get_mut で可変参照を得ても特に問題がないため、後は TicketPatchSome になっていたら可変参照を介してデータを更新しています。

今回内容ですが実は問題が残っていて、このエクササイズは次回への伏線だったみたい...というわけで、さっそく次の問題に行きましょう!

次の記事: 【32】 Send・排他的ロック(Mutex)・非対称排他的ロック(RwLock) ~真打 Arc<Mutex<T>> 登場~

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0