なにこれ
RUDP(Reliable User Datagram Protocol)を使ってみます。
とりあえずここでの定義は、UDPよりも高い確率で相手に届くUDPを総称してRUDPと呼称します。
(確実に相手に届くプロトコルは今のところないので「高い確率」と言ってます。断線等の影響とかそういう話)
RUDPの実装にはKCPが有名所っぽい。なんの略かはわからんですね。
(HTTP/3で使われているQUICもRUDPの一種ですが。ここでは触れません)
KCPの概略はここに書いてあるので読んでみると良いかもしれません。
ライブラリはこれを使っていきます。
紆余曲折あり、上記のライブラリを改造したのが以下です。
今回はこの紆余曲折を説明しつつ、実際にKCPで通信をしてみます。
やりたいこと一覧
半二重通信
デフォルトでサポートしてくれてます。
KcpStreamという型がいい感じにやってくれているので良さげです。
L4通信セッションの管理もこいつがやってます。
でも、ソケットをWriterとReaderに分割できないのが難点。
というかググって出てくるRustのKCPライブラリは総じてこれ。
ソケットをReadとWriteで分割してそれぞれの状態に依存せずに読み書きしたい。
つまり全二重通信したい。
全二重通信
ソケットの分割をしたい。
こんな感じでやっていく。
送信スレッドを用意する(準備A)
- 標準入力を受け付けるスレッドを用意する
- サーバへソケットを介しデータを送信するスレッドを用意する
標準入力から受け付けた<データ>
を、送信用スレッドに投げると
transmitter.send(<データ>)
みたいな感じで投げるようにする。
準備A-1のスレッドが管理する標準入力から、mpscチャンネルを通じて
準備A-2のスレッドが管理しているソケットへ投げてサーバに送信される。
受信スレッドを用意する。(準備B)
- ソケットに受信したデータをひたすら読むスレッドを用意する
- 受信データを処理するスレッドを用意する
準備B-1のスレッドが管理する受信用ソケットから、mpscチャンネルを通じて
準備B-2のスレッドが管理している処理ロジックへ投げて受信を完了する。
デフォルトのKCPライブラリでやるとどうなるの?
ただし、この段階では前述した通り、ソケットのReadとWriteで分割ができないです。
一旦、1つのKcpStream
オブジェクトでやろうとする事を考えます。
(Arc<Mutex<KcpStream>>
型を2つのスレッドで共有するという方法です。)
- 受信スレッドでブロッキングする(準備B-1のスレッド)
- 任意のタイミングで送信したいけど、1でMutexを取得されているのでロックされる
- サーバがなんかデータを投げるまではひたすらにブロックする(準備A-2のスレッドが動かない)
悲しい。
というわけで、TokioのTcpStreamを見習って、splitメソッドを生やす。
全二重通信の実装へ
use async_std::{
io::{stdin, stdout},
prelude::*,
task::spawn,
};
use tokio_kcp::*;
// とにかくソケットに書き込むだけのスレッド(準備A-2)
async fn sender(mut rx: mpsc::Receiver<Communicate>, mut sock_sender: OwnedWriteHalf) {
loop {
match rx.recv().await {
Some(Communicate::Send(data)) => {
// ソケットに送信する
sock_sender.send(&data).await;
}
None => { /* とりあえず何もしない */ }
_ => {}
}
}
}
// とにかくソケットを監視し続けて、サーバから送信されるデータを受信するスレッド(準備B-1)
async fn reader(tx: mpsc::Sender<Communicate>, mut sock_reader: OwnedReadHalf) {
let mut buf = [0u8; 8192];
loop {
match sock_reader.recv(&mut buf).await {
Ok(size) => {
// ソケットに来たデータを取得する。
// 読み取ったsizeが0の場合、ソケットがcloseされている(EOF)ので終わる。
if size == 0 {
break;
}
tx.send(Communicate::Recv(buf[..size].to_vec())).await;
}
Err(e) => { /* とりあえず何もしない */ }
}
}
println!("session closed.");
}
#[tokio::main]
async fn main() {
let mut task = Vec::new();
let mut config = KcpConfig::default();
config.stream = true;
let addr = "127.0.0.1:12345".parse().unwrap();
let conn = KcpStream::connect(&config, addr).await.unwrap();
let (transmitter, receiver) = conn.split_owned();
{
// 準備A-2を起動する
let (tx, rx) = mpsc::channel(100);
task.push(Task {
task: async_std::task::spawn(sender(rx, transmitter)),
ch: Channel::Sender(tx),
});
}
{
// 準備B-1を起動する
let (tx, rx) = mpsc::channel(100);
task.push(Task {
task: async_std::task::spawn(reader(tx, receiver)),
ch: Channel::Receiver(rx),
});
}
// 準備A-1を起動する
let _ = if let Channel::Sender(s) = &task[0].ch {
spawn(send_msg(s.clone()))
} else {
return;
};
// 準備B-2:受信したデータをいい感じに処理するロジック
loop {
let buf = task[1].recv().await;
if buf.len() > 0 {
print!("server > ");
let s = String::from_utf8(buf.to_vec()).unwrap();
print!("{}", s);
} else {
println!("err");
}
}
}
ざっくりこんなことをしたい。
でも、今のライブラリの状態だと無理なので、ライブラリを改造してみる。
というわけで本編へ。
改造してみる
参考にするコードについて
tokioのtokio::net::TcpStream#into_split
を参考にしてみます。
でも、完全に移植なんてできるはずがないので、とりあえず移植の参考になりそうなコードを自前で書いてみようと思います。
実装までRead/Writeだと、何に書き込むのかが若干曖昧かと思いました。
なので、ストリームに紐づく送受信の実装は「Transmitter(送信機)」と「Receiver(受信機)」という名前にしました。
struct TestTransmitter{
inner:Vec<u8>
}
struct TestReceiver{
inner:Vec<u8>
}
struct StreamSession;
struct TestStream{
transmitter:Arc<Mutex<TestTransmitter>>,
receiver:Arc<Mutex<TestReceiver>>,
session:StreamSession
}
// とりあえずこういう構造にすれば、TestStreamのインスタンスから
// TestTransmitter/TestReceiverへ分割するということはできそうです。
impl TestStream{
fn split(&mut self)->(Arc<Mutex<TestTransmitter>>,Arc<Mutex<TestReceiver>>){/*...*/}
}
なんとなく型名が不満。
このまま実装していくのもいいですが、Arc<Mutex<TestTransmitter>>
みたいな長ったらしい型名のまま持ち回るのはすごく嫌です。
ですので、更にtokioのソースコードを参考にします。
tokioのsplitではArcを使用していますが、それをOwnedReadHalfやOwnedWriteHalfという型名に再度変換して短くしています。
更に、今から作るコードはtokioのものに比べると、更にMutexのロックを行うコードを書かなければなりません。
こういったコードもOwnedReadHalfやOwnedWriteHalfに実装したいです。
OwnedReadHalfやOwnedWriteHalfを実装する。
KCPライブラリに存在した元々の関数シグネチャ(インタフェース)は変更せずに、いい感じに実装するとこうなります。
macro_rules! async_run {
($block:expr) => {{
futures::executor::block_on($block)
}};
}
#[derive(Debug)]
pub struct OwnedWriteHalf {
inner: Arc<Mutex<Transmitter>>,
}
impl OwnedWriteHalf {
pub fn poll_send(&mut self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<KcpResult<usize>> {
async_run!(async { self.inner.lock().await.poll_send(cx, buf) })
}
pub async fn send(&mut self, buf: &[u8]) -> KcpResult<usize> {
self.inner.lock().await.send(buf).await
}
}
#[derive(Debug)]
pub struct OwnedReadHalf {
inner: Arc<Mutex<Receiver>>,
}
impl OwnedReadHalf {
pub fn poll_recv(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<KcpResult<usize>> {
async_run!(async { self.inner.lock().await.poll_recv(cx, buf) })
}
pub async fn recv(&mut self, buf: &mut [u8]) -> KcpResult<usize> {
self.inner.lock().await.recv(buf).await
}
}
トランスミッタとレシーバを作ってみる
まずはレシーバ
元々有った処理を切り分けてコピペしただけなので、省略しておきます。
長いし。
#[derive(Debug)]
pub struct Receiver {
session: Arc<KcpSession>,
buffer: Vec<u8>,
buffer_pos: usize,
buffer_cap: usize,
}
impl Receiver {
fn new(session: Arc<KcpSession>) -> Self {
Receiver {
session,
buffer: Vec::new(),
buffer_pos: 0,
buffer_cap: 0,
}
}
/// `recv` data into `buf`
pub fn poll_recv(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<KcpResult<usize>> {
loop {
/*...(省略)...*/
}
}
}
/// `recv` data into `buf`
pub async fn recv(&mut self, buf: &mut [u8]) -> KcpResult<usize> {
future::poll_fn(|cx| self.poll_recv(cx, buf)).await
}
}
次はトランスミッタ
#[derive(Debug)]
pub struct Transmitter {
session: Arc<KcpSession>,
}
impl Transmitter {
fn new(session: Arc<KcpSession>) -> Self {
Transmitter { session }
}
/// `send` data in `buf`
pub fn poll_send(&mut self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<KcpResult<usize>> {
// Mutex doesn't have poll_lock, spinning on it.
let mut kcp = self.session.kcp_socket().lock();
let result = ready!(kcp.poll_send(cx, buf));
self.session.notify();
result.into()
}
/// `send` data in `buf`
pub async fn send(&mut self, buf: &[u8]) -> KcpResult<usize> {
future::poll_fn(|cx| self.poll_send(cx, buf)).await
}
}
いい感じに分割できたので、最後はKcpStreamを整理し、split_owned
メソッドを追加します。
impl KcpStream {
/*...(省略)...*/
// 元々の使用方法ができるように、ブロッキング処理はブロッキング処理のまま実装しておく。
// fork元のテストが使えなくなるのは割りと不便だし。
// そもそもやりたいことはAPIの変更じゃなくてsplit_ownedメソッドの追加なのでこれでOK
// 他のメソッドも同じように整理した。
pub fn poll_send(&mut self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<KcpResult<usize>> {
async_run!(async {self.transmitter.lock().await.poll_send(cx, buf)})
}
pub async fn send(&mut self, buf: &[u8]) -> KcpResult<usize> {
self.transmitter.lock().await.send(buf).await
}
pub async fn recv(&mut self, buf: &mut [u8]) -> KcpResult<usize> {
self.receiver.lock().await.recv(buf).await
}
pub fn poll_recv(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<KcpResult<usize>> {
async_run!(async {self.receiver.lock().await.poll_recv(cx, buf)})
}
// これを追加したかったがために、めっちゃコードが追加されるの巻
pub fn split_owned(&self) -> (OwnedWriteHalf, OwnedReadHalf) {
(
OwnedWriteHalf {
inner: self.transmitter.clone(),
},
OwnedReadHalf {
inner: self.receiver.clone(),
},
)
}
}
これでめでたく、全二重通信ができるようになりました。
use async_std::{
io::{stdin, stdout},
prelude::*,
task::spawn,
};
use tokio_kcp::*;
// とにかくソケットに書き込むだけのスレッド
async fn sender(mut rx: mpsc::Receiver<Communicate>, mut sock_sender: OwnedWriteHalf) {
loop {
match rx.recv().await {
Some(Communicate::Send(data)) => {
// ソケットに送信する
sock_sender.send(&data).await;
}
None => { /* とりあえず何もしない */ }
_ => {}
}
}
}
// とにかくソケットを監視し続けて、サーバから送信されるデータを受信するスレッド
async fn reader(tx: mpsc::Sender<Communicate>, mut sock_reader: OwnedReadHalf) {
let mut buf = [0u8; 8192];
loop {
match sock_reader.recv(&mut buf).await {
Ok(size) => {
// ソケットに来たデータを取得する。
if size == 0 {
break;
}
tx.send(Communicate::Recv(buf[..size].to_vec())).await;
}
Err(e) => { /* とりあえず何もしない */ }
}
}
println!("session closed.");
}
use tokio::sync::mpsc;
#[derive(Debug)]
enum Channel {
Sender(mpsc::Sender<Communicate>),
Receiver(mpsc::Receiver<Communicate>),
}
#[derive(Debug)]
struct Task {
ch: Channel,
task: async_std::task::JoinHandle<()>,
}
impl Task {
async fn recv(&mut self) -> Vec<u8> {
if let Channel::Receiver(recv) = &mut self.ch {
print!("input > ");
stdout().flush().await;
let t = recv.recv();
if let Some(Communicate::Recv(data)) = t.await {
data
} else {
println!("recv error2");
Vec::new()
}
} else {
panic!(
"このpanicはソフトウェアロジックの不具合です(設計上、絶対に通過しないパスです)"
);
}
}
}
#[derive(Clone)]
enum Communicate {
Send(Vec<u8>),
Recv(Vec<u8>),
}
#[tokio::main]
async fn main() {
let mut task = Vec::new();
let mut config = KcpConfig::default();
config.stream = true;
let addr = "127.0.0.1:12345".parse().unwrap();
let conn = KcpStream::connect(&config, addr).await.unwrap();
let (transmitter, receiver) = conn.split_owned();
{
let (tx, rx) = mpsc::channel(100);
task.push(Task {
task: async_std::task::spawn(sender(rx, transmitter)),
ch: Channel::Sender(tx),
});
}
{
let (tx, rx) = mpsc::channel(100);
task.push(Task {
task: async_std::task::spawn(reader(tx, receiver)),
ch: Channel::Receiver(rx),
});
}
let _ = if let Channel::Sender(s) = &task[0].ch {
spawn(send_msg(s.clone()))
} else {
return;
};
loop {
let buf = task[1].recv().await;
if buf.len() > 0 {
print!("server > ");
let s = String::from_utf8(buf.to_vec()).unwrap();
print!("{}", s);
} else {
println!("err");
}
}
}
async fn send_msg(tx: mpsc::Sender<Communicate>) {
loop {
let mut buf = String::new();
let stdin = stdin();
stdin.read_line(&mut buf).await;
tx.send(Communicate::Send(buf.as_bytes().to_vec())).await;
}
}
サーバは半二重の公式のテストサーバを改造してみる。
use std::{net::SocketAddr, time::Duration};
use byte_string::ByteStr;
use log::{debug, error, info};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
time,
};
use tokio_kcp::{KcpConfig, KcpListener};
#[tokio::main]
async fn main() {
env_logger::init();
let config = KcpConfig::default();
let server_addr = "127.0.0.1:12345".parse::<SocketAddr>().unwrap();
let mut listener = KcpListener::bind(config, server_addr).await.unwrap();
loop {
let (mut stream, peer_addr) = match listener.accept().await {
Ok(s) => s,
Err(err) => {
println!("accept failed, error: {}", err);
time::sleep(Duration::from_secs(1)).await;
continue;
}
};
println!("accepted {}", peer_addr);
tokio::spawn(async move {
let mut buffer = [0u8; 8192];
while let Ok(n) = stream.read(&mut buffer).await {
println!("recv {:?}", ByteStr::new(&buffer[..n]));
if n == 0 {
break;
}
stream.write_all(&buffer[..n]).await.unwrap();
tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
stream.write_all("サーバからの追加メッセージです\n".as_bytes()).await.unwrap();
println!("echo {:?}", ByteStr::new(&buffer[..n]));
}
println!("client {} closed", peer_addr);
});
}
}
2回に分けて受信をするので、半二重クライアントの場合は2回recvをしないといけないはずだけど
この実装で全二重がうまいこと実装できていればサーバに合わせなくてもいい感じにやってくれるはず。
というわけで、受信スレッドと送信スレッドを分割できました。
めでたきめでたき。
多分、blockする系の処理って、spawnじゃないほうがいいんですよね。
このあたりは手を抜いているのでちゃんと実装したい場合は非同期プログラミングの入門書みたいなのを読んだほうが良さそう。
ではこんなところで。