この記事はRust Advent Calendar 2024 シリーズ3の24日目の記事です。
目次
- Rustの非同期処理についてざっくり
- Streamについて
- wakeの話
- poll_nextを実装
- デコーダの実装
- テスト
- Example オレオレプロトコルサーバプログラム
- サーバプログラム起動
6. まとめと感想
7. 最後に
8. おまけ
9. ライセンス情報
1. イントロダクション
今回、Rustで非同期処理を学ぶ中で、簡単な独自ネットワークプロトコルを作成しました、そこで本記事では、Streamを利用したプロトコルデコーダとサーバプログラムの実装例を紹介します。本記事を読むことで、Rustの非同期プログラミングにおけるStreamの仕組みとその応用例を学び、自作のプロトコルで非同期処理を体験してもらえると嬉しいです。
Rustは読み書き始めて1年くらいで仕事には使用していませんが、個人的にRustでMQTTのサーバプログラムを書いています。そこでまずはMQTTで実装するまえにオレオレネットワークプロトコルでStreamについて理解したいと思ったのがモチベーションです。プロトコルの内容はMQTTを意識したものとなっていますが本編ではMQTTに関することはほぼ出てきません。
2. Goal
- 簡単なプロトコルのデコーダを書いて、RustでStreamを処理するライブラリを作成する
- 作成したライブラリでオレオレプロトコルサーバプログラムを作成して動作させる
3. 技術背景
Rustの非同期についてざっくり
ざっくりですが、
RustはGoとは異なり、標準で非同期ランタイムを含んでいません。(ランタイムとはプログラムを動作させるプログラム)
非同期ランタイムを含んでいませんが、言語自体に非同期を実現する仕組みであるFuture
があり、それとtokioやasync-stdなどの非同期ランタイムライブラリを組み合わせることで非同期の処理を実現している。ということです。
ここではRustのasyncに深く立ち入らない代わりに、先人の方々のRustの非同期を理解するのに役立つ記事にリンクを記事の末尾に貼りましたが、Future trait/awakeについて1つページを紹介するとしたら Asynchronous Programming in RustのFutureトレイトの章 が参考になります。
Streamについて
CPUよりも遅いI/Oについて、I/Oを待つ間にタスクを手放すことができるので、非同期のIOであるネットワークやファイルと相性が良く、リソースを効率利用できます。Streamについて理解するための下記の記事が参考になりました。
https://synamon.hatenablog.com/
上記でも充分ですが、せっかくですのでここでfutures::stream::StreamのDocsを読んでみます。
If Future is an asynchronous version of T, then Stream is an asynchronous version of Iterator. A stream represents a sequence of value-producing events that occur asynchronously to the caller.
The trait is modeled after Future, but allows poll_next to be called even after a value has been produced, yielding None once the stream has been fully exhausted. (https://docs.rs/futures/latest/futures/stream/trait.Stream.html
)
Future<Output = T>
がTの非同期版だとしたら``Stream`は`Iterator`の非同期版ということのようです。
poll_nextは何度も呼ばれます。その返り値は3種類
- Poll::Pending ・・・まだ準備ができていない
- Poll::Ready(Some(val)) ・・・ストリームが正しく値を生成する。まだ続くかも
- Poll::Ready(None)・・・終了でこれ以上poll_nextが呼び出されることはない
size_hintは任意の実装で、今はネットワークのStreamを想定しているので、ここでは省略します。
Futureとの対比の通りtraitの定義を見るとStreamとFutureは似ています。std::future::FutureのDocsにも目を通してみます。
pub trait Future {
type Output;
// Required method
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
なのに対して
pub trait Stream {
type Item;
// Required method
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>>;
// Provided method
fn size_hint(&self) -> (usize, Option<usize>) { ... }
}
です。
Poll::PendingとPoll::Ready(val)の説明もほぼ同じですが、Streamには終了の定義が追加されています。
また、Streamには説明のなかったWakeの話がありましたので、次節で確認します。
wakeの話
Wakeは非同期ランタイムがタスクの進行を管理する方法として重要な仕組みです。Docsには下記のようにあります。
When a future is not ready yet, poll returns Poll::Pending and stores a clone of the Waker copied from the current Context. This Waker is then woken once the future can make progress. For example, a future waiting for a socket to become readable would call .clone() on the Waker and store it.
When a signal arrives elsewhere indicating that the socket is readable, Waker::wake is called and the socket future’s task is awoken. Once a task has been woken up, it should attempt to poll the future again, which may or may not produce a final value. (https://doc.rust-lang.org/std/future/trait.Future.html)
Poll::Pending状態から起動するためにWakerで非同期ランタイムに動かしてもらうようにお願いする。そのタイミングはI/Oが到着した際など、ランタイム外の実装の誰かがcontextを通じて行う。逆に言えば、Poll::Pendingを返した後、誰かがWakerを呼び出すまでpollを試みない、という理解です。
4. 今回作成するプロトコル
武器がすべて手に入ったように思えますので今回実装する簡単なプロトコルについて説明します。
headerとして2bytes(uint16)のみでpayloadの長さを定義し、その後にその長さ分だけDataが続くようなプロトコルです。2bytesのヘッダーを採用した理由はシンプルさを重視したためで、今回の目標であるStreamの仕組みの理解に集中できるように、かつ、IoTデバイス間通信の基礎を再現するイメージで設計しました。
方向はクライアントからサーバへの一方通行のみです。
Field | Size | Spec |
---|---|---|
length | 2bytes | Payload Length |
payload | variable length | Data |
もちろん、このプロトコルが実用に耐えるものではなく、意味はありませんが、
イントロダクションで述べた通り、MQTTを意識したプロトコルになっています。
動作としては下記のようになります。bufferは構造体に用意します。「データが到着」はストリームからデータを取得できる場合に動作します。
。図が複雑に見えますが、やりたいことは単純で、headerを解読しているフェーズとpayloadを解読しているフェーズが存在し、データが不足している場合は追加のデータを待つといった挙動を実現したいというだけです。
5. 実装
フル版は
にありますのでここでは大雑把に説明します。
コードのコアの部分は下記になります。Stream部分です。この部分のキモは「もう一度呼び出してほしい場合(状態遷移図の★の箇所)にはcx.waker().wake_by_ref();
を呼ぶこと」です。
poll_nextを実装
impl<S> Stream for OreStream<S>
where
S: Stream<Item = ByteStream> + Unpin,
{
type Item = OreProtocolStreamResult;
// poll_nextが呼び出されるたびに、現在の状態に応じて処理を進める
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let this = self.as_mut().project();
// バッファにデータが残っている場合
if !this.buffer.is_empty() {
match this.protocol.state {
decoder::ProtocolState::WaitHeader => {
// 現在ヘッダーを読み取っているフェーズ
if let Err(_e) = this.protocol.decode_fixed_header(this.buffer) {
// ヘッダーが完全に読み取れない場合、データの到着を待つ
} else {
cx.waker().wake_by_ref();// 処理を再開させるためにWakerを呼び出す
return Poll::Pending;// 再試行を要求
}
}
decoder::ProtocolState::WaitPayload => {
// 現在ペイロードを読み取っているフェーズ
if let Err(_e) = this.protocol.decode_payload(this.buffer) {
// ヘッダーが完全に読み取れない場合、データの到着を待つ
} else {
// 完了したプロトコルを返し、次の処理へ進む
let completed_protocol = std::mem::take(this.protocol);
*this.protocol = decoder::OreProtocol::new();
return Poll::Ready(Some(Ok(completed_protocol)));
// ここでひとつ正常終了!!
}
}
}
}
// ネットワークストリームから新しいデータを取得
match ready!(this.stream.poll_next(cx)) {
Some(Ok(bytes)) => {
this.buffer.extend_from_slice(&bytes);
cx.waker().wake_by_ref();
return Poll::Pending;
}
Some(Err(e)) => return Poll::Ready(Some(Err(e))),
None => {
return Poll::Ready(None);
}
};
}
}
デコーダを実装
pub fn decode_fixed_header(&mut self, buf: &mut BytesMut) -> Result<()> {
if buf.len() < 2 {
return Err(OreErrorInsufficient);
}
self.payload_size = ((buf[0] as u16) << 8) + buf[1] as u16;
self.state = ProtocolState::WaitPayload;
// 前にすすめる
buf.advance(2);
Ok(())
}
pub fn decode_payload(&mut self, buf: &mut BytesMut) -> Result<()> {
if buf.len() < self.payload_size as usize {
return Err(OreErrorInsufficient);
}
// buf.split_toで消費するのでadvanceする必要がない
self.payload = Some(buf.split_to(self.payload_size.into()));
Ok(())
}
テスト
Streamにしてくれるfutures::stream::iterを使ってByteのstreamを生成し、実装したライブラリでプロトコルのStreamに変換して期待値と比較します。
Streamが分割されていたとしても成立するはずなので何パターンか分割したバージョンのテストを作成します。
// Vecをiterにする関数を用意しておく
fn mock_stream(
testdata: Vec<Result<Bytes, io::Error>>,
) -> impl futures::Stream<Item = ByteStream> {
stream::iter(testdata)
}
#[tokio::test]
async fn test_ore_stream_single() {
let mock_stream = mock_stream(vec![
Ok(Bytes::from(vec![0x00, 0x0b])), //ここ
Ok(Bytes::from("hello world")), // ここ
]);
/*
// このようにしても成り立つ
let mock_stream = mock_stream(vec![
Ok(Bytes::from(vec![0x00])),
Ok(Bytes::from(vec![0x0b])),
Ok(Bytes::from("hello")),
Ok(Bytes::from(" world")),
Ok(Bytes::from(vec![0x00])),
Ok(Bytes::from(vec![0x0c])),
Ok(Bytes::from("hello wo")),
Ok(Bytes::from("rld2")),
]);
*/
let mut ore_stream = OreStream::new(mock_stream);
let mut results = vec![];
while let Some(result) = ore_stream.next().await {
match result {
Ok(protocol) => results.push(protocol),
Err(e) => {
eprintln!("Error: {:?}", e);
break;
}
}
}
assert_eq!(
results[0].clone().payload.unwrap()[..],
Bytes::from_static(b"hello world")
);
}
Example オレオレプロトコルサーバプログラム
tokioを利用してTCPサーバを構築します。
use bytes::Bytes;
use futures::{StreamExt, TryStreamExt};
use tokio_util::codec::{BytesCodec, FramedRead};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
use tokio::net::TcpListener;
let listener = TcpListener::bind("127.0.0.1:8080").await?;
println!("Server listening on 127.0.0.1:8080");
loop {
let (socket, _addr) = listener.accept().await?;
tokio::spawn(async move {
let framed = FramedRead::new(socket, BytesCodec::new())
.map_ok(|bytes_mut| Bytes::from(bytes_mut));
let mut ore_stream = ore_protocol::OreStream::new(framed);
while let Some(item) = ore_stream.next().await {
match item {
Ok(protocol) => {
println!("Decoded protocol: {:?}", protocol);
}
Err(e) => {
eprintln!("Error: {:?}", e);
break;
}
}
}
});
}
}
サーバプログラム起動
[~/ore_protocol]$ cargo run --example tokio_tcp_server
Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.12s
Running `target/debug/examples/tokio_tcp_server`
Server listening on 127.0.0.1:8080
nc(netcat)を使ってTCPパケットをサーバ側にオレオレプロトコルで送信します。
$ echo -ne '\x00\x0bhello worl2' | nc 127.0.0.1 8080
サーバ側に受けたdecodeされた結果が出力されます。
Decoded protocol: OreProtocol { payload_size: 11, state: WaitPayload, payload: Some(b"hello worl2") }
6. まとめと感想
- RustはGoとは異なりランタイムの種類が複数あり、所有権の縛りもあることから非同期のハードルが高く感じることもありますが、一度理解すると少ないパーツで構成されているので簡単に進めることができました。
- オレオレネットワークプロトコルのサーバを実装しました。状態遷移図も改めて書き起こすことでコードとは異なる側面で動作を記述されることがわかり設計書の効用を改めて感じました。
- 今後は非同期ランタイムの側面から複数のランタイムを比較したり自作してみたりして理解を深めていけるといいなと思いました。
7. 最後に
今年の年始頃にRustを始めたのですが、書いていて楽しいプログラミング言語だと思っています。最近はChatGPTなどのLLMを活用しながらラバーダッキングの如く進められるのでより捗ると感じております。
記事に関して何かございましたらコメントいただければ幸いです。
最後まで読んでいただきありがとうございました。
8. おまけ
学習の過程で得た知識や他に参考になった記事を並べておきます。
mqttについて
MQTTにもヘッダーにRemaining Lengthがあり、残りのバイト数が示されます。
今回のプロトコルよりもはるかに複雑ですが、エッセンスは今回のオレオレプロトコルと共通の部分があると思っています。OASISのspecです。
Rustの非同期に関連する資料
ready!
futures_core::readyはマクロready!で下記のように定義されて、readyの意味の通り、readyになるまでpendingが帰ってきます。ここではStreamのデータを待つために使っています。cargo expand
でマクロを見てみましょう。
// まずは定義
#[macro_export]
macro_rules! ready {
($e:expr $(,)?) => {
match $e {
$crate::task::Poll::Ready(t) => t,
$crate::task::Poll::Pending => return $crate::task::Poll::Pending,
}
};
}
cargo expandすると
// 展開前
match ready!(this.stream.poll_next(cx)) {
Some(Ok(bytes)) => {
this.buffer.extend_from_slice(&bytes);
cx.waker().wake_by_ref();
return Poll::Pending;
}
Some(Err(e)) => return Poll::Ready(Some(Err(e))),
None => {
return Poll::Ready(None);
}
};
// 展開後
match match this.stream.poll_next(cx) {
::futures_core::task::Poll::Ready(t) => t,
::futures_core::task::Poll::Pending => {
return ::futures_core::task::Poll::Pending;
}
} {
Some(Ok(bytes)) => {
this.buffer.extend_from_slice(&bytes);
cx.waker().wake_by_ref();
return Poll::Pending;
}
Some(Err(e)) => return Poll::Ready(Some(Err(e))),
None => {
return Poll::Ready(None);
}
};
examplesフォルダ
このようにexamplesフォルダの配下にmain関数があるソースを置くとcargo run --example [ファイル名(拡張子無し)]
で実行できるので便利です。
.
├── Cargo.lock
├── Cargo.toml
├── Readme.md
├── examples
│ └── tokio_tcp_server.rs
├── src
│ ├── decoder.rs
│ └── lib.rs
└── target
cargo run --example tokio_tcp_server
asyncとPinについて
async周りでPin/Unpinが関わってくる理由を理解するのに助かった記事です。
所有権
Streamで値を返すときにSelfから所有権を奪うためにstd::mem::takeを利用しています。
Rust非同期についての参考資料
Asynchronous Programming in Rust
言わずとしれたasyncの公式的なチュートリアルです。
tokio tutorial
非同期ランタイム tokioのチュートリアルです。実装するにあたって非常に参考になります。
tokio tutorialの日本語訳です
ライブラリを使わない非同期処理(前編)
ランタイムの気持ちがわかる解説記事です
9. license情報
記事中で引用したDocs/trait定義のコードのライセンス情報です。
futures = "0.3.31"
Copyright (c) 2016 Alex Crichton
Copyright (c) 2017 The Tokio Authors
Permission is hereby granted, free of charge, to any
person obtaining a copy of this software and associated
documentation files (the "Software"), to deal in the
Software without restriction, including without
limitation the rights to use, copy, modify, merge,
publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software
is furnished to do so, subject to the following
conditions:
The above copyright notice and this permission notice
shall be included in all copies or substantial portions
of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.
--
Rust
Rust is primarily distributed under the terms of both the MIT license and the Apache License (Version 2.0), with portions covered by various BSD-like licenses.
See LICENSE-APACHE, LICENSE-MIT, and COPYRIGHT for details.