はじめに
Rustのチュートリアルのメモです⑥。
リンク集
C++入門
プログラミングRust (日本語)
プログラミング言語Rust入門
実践Rust入門
マルチスレッドのWebサーバーを構築する
こちらがWebサーバを構築するプランです:
- TCPとHTTPについて少し学ぶ。
- ソケットでTCP接続をリッスンする。
- 少量のHTTPリクエストを構文解析する。
- 適切なHTTPレスポンスを生成する。
- スレッドプールでサーバのスループットを強化する。
ですが、取り掛かる前に、ある小さな事実に触れなければなりません: わたしたちがこれから行うやり方は、RustでWebサーバを構築する最善の方法ではないだろうということです。 これから構築するよりもより完全なWebサーバとスレッドプールの実装を提供する製品利用可能な多くのクレートが、 https://crates.io/ で利用可能なのです。
しかしながら、この章での意図は、学習を手助けすることであり、簡単なやり方を選ぶことではありません。 Rustはシステムプログラミング言語なので、取りかかる抽象度を選ぶことができ、 他の言語で可能だったり実践的だったりするよりも低レベルまで行くことができます。一般的な考えと将来使う可能性のあるクレートの背後にある技術を学べるように、 手動で基本的なHTTPサーバとスレッドプールを書きます。
シングルスレッドのWebサーバを構築する
TCP: 情報がとあるサーバから別のサーバへどう到達するかの情報を記述するもの
HTTP: リクエストとレスポンスの中身を定義することでTCPの上に成り立っている
技術的にはHTTPを他のプロトコルとともに使用することができるが、過半数の場合、HTTPはTCPの上にデータを送信する
TCP接続をリッスンする
use std::net::TcpListener;
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
for stream in listener.incoming() {
let stream = stream.unwrap(); // unwrapは学習用なので
// 接続が確立した
println!("Connection established!");
}
}
bind関数は、新しいTcpListenerインスタンスを返すという点でnew関数のような働きをする
bindと呼ばれている理由は、ネットワークにおいて、リッスンすべきポートに接続することは、「ポートに束縛する」(binding to a port)として知られている
bind関数はResultを返し、束縛が失敗することもあることを示している
ポート80に接続するには管理者権限が必要なので、管理者以外では束縛は上手くいかない
incomingメソッドは、一連のストリームを与えるイテレータを返す
(具体的には、型TcpStream)
マルチスレッドサーバー
シングルスレッドは、サーバのリクエストを順番に処理する
つまり、最初の接続が終了し終わるまで、2番目の接続は処理しないということ
サーバの受け付けるリクエストの量が増えるほど、この連続的な実行は、最適ではなくなる
サーバが処理するのに長い時間がかかるリクエストを受け付けたら、新しいリクエストは迅速に処理できても、続くリクエストは長いリスエストが完了するまで待たなければならなくなる
シングルスレッドのサーバの実装で遅いリクエストをシミュレーションする
fn handle_connection_add_sleep_request(mut stream: TcpStream) {
let mut buffer = [0; 512]; // 512バイト(0が512個)
stream.read(&mut buffer).unwrap();
let get = b"GET / HTTP/1.1\r\n"; // GETリクエスト文(bでバイトに変換している)
let sleep = b"GET /sleep HTTP/1.1\r\n"; // sleepリクエスト
let (status_line, filename) = if buffer.starts_with(get) {
("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
} else if buffer.starts_with(sleep) {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
} else {
("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
};
let mut file = File::open(filename).unwrap();
let mut contents = String::new();
file.read_to_string(&mut contents).unwrap();
let response = format!("{}{}", status_line, contents);
stream.write(response.as_bytes()).unwrap();
stream.flush().unwrap();
}
スレッドプールでスループットを向上させる
スレッドプールは、タスクを処理する準備のでききた一塊りの大量に生成されたスレッドを待機させている状態のこと
プログラムが新しいタスクを受け取ったら、プールのスレッドのどれかをタスクにあてがい、そのスレッドがそのタスクを処理する
プールの残りのスレッドは、最初のスレッドが処理中にやってくる他のあらゆるタスクを扱うために再利用可能
最初のスレッドがタスクの処理を完了したら、アイドル状態のスレッドプールに戻り、新しいタスクを処理する準備ができる
スレッドプールにより、並行で接続を処理でき、サーバのスループットを向上する
無限にスレッドを大量生成するのではなく、プールに固定された数のスレッドを待機させる
リクエストが来るたびに新しいスレッドを生成させたら、1000万リクエストが来れば、サーバのリソースを使い尽くし、リクエストの処理を停止に追い込むことになる
プールは、やってくるリクエストのキューを管理する
プールの書くスレッドがこのキューからリクエストを取り出し、リクエストを処理し、そして、別のリクエストをキューに要求する
この設計により、Nリクエストを並行して処理できる
各スレッドが実行時に時間のかかるリクエストに応答していたら、続くリクエストはそれでも、キュー内で待機させられてしまうこともあるが、その地点に到達する前に扱える時間のかかるリクエスト数を増加させた
このテクニックはWebサーバのスループットを向上させる多くの方法の1つに過ぎない
各リクエストに対してスレッドを立ち上げられる場合のコードの構造
// 無限にスレッドを起ち上げる
thread::spawn(|| {
// /へのリクエストは、/sleepが完了するのを待機しなくても済む
println!("新しいスレッド生成完了!");
handle_connection_add_sleep_request(stream);
});
有限数のスレッド用に似たインタフェースを作成する
pub struct ThreadPool;
impl ThreadPool {
/// 新しいThreadPoolを生成する。
///
/// sizeがプールのスレッド数です。
///
/// # パニック
///
/// sizeが0なら、`new`関数はパニックします。
///
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
ThreadPool
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
// --snip--
}
ThreadPoolからスレッドにコードを送信する責任を負うWorker構造体
標準ライブラリはスレッドを生成する手段として、thread::spawnを提供し、thread::spawnは、生成されるとすぐにスレッドが実行すべき何らかのコードを得ることを予期する
ところが、我々の場合、スレッドを生成して、後ほど、送信するコードを待機してほしい
標準ライブラリのスレッドの実装は、それをするいかなる方法も含んでいないので、それを手動で実装する
この新しい振る舞いを管理するスレッドとThreadPool間に新しいデータ構造を導入することでこの振る舞いを実装する
このデータ構造をWorkerと呼び、プール実装では一般的な用語
レストランのキッチンで働く人々で例えると
労働者は、お客さんからオーダーが来るまで待機し、それからそれらのオーダーを取り、満たすことに責任を負う
use std::thread;
pub struct ThreadPool {
workers: Vec<Worker>, // Workerインスタンスのベクタを保持
}
impl ThreadPool {
// --snip--
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id));
}
ThreadPool { workers }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
// --snip--
}
// idとJoinhandle<()>を保持する
// id番号を取り、idと空のクロージャで大量生産されるスレッドを
// 保持するWorkerインスタンスを返すWorker::new関数を定義
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize) -> Worker {
let thread = thread::spawn(|| {});
Worker { id, thread }
}
}
チャンネル経由でスレッドにリクエストを送信する
チャンネルをキューの仕事として機能させ、executeはThreadPoolからWorkerインスタンスに仕事を送り、これが仕事をスレッドに送信する
- ThreadPoolはチャンネルを生成し、チャンネルの送信側につく
- 各Workerは、チャンネルの受信側につく
- チャンネルに送信したいクロージャを保持する新しいJob構造体を生成する
- executeメソッドは、実行したい仕事をチャンネルの送信側に送信する
- スレッド内で、Workerはチャンネルの受信側をループし、受け取ったあらゆる仕事のクロージャを実行する
use std::sync::mpsc;
use std::thread;
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
struct Job;
impl Worker {
// チャンネルの受信側をワーカーに渡す
fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
let thread = thread::spawn(|| {
receiver;
});
Worker { id, thread }
}
}
excuteメソッドを実装する
// FnOnceクロージャの型を保持するトレイトオブジェクト
type Job = Box<FnOnce() + Send + 'static>;
impl ThreadPool {
// --snip--
// 新しいJobインスタンスを生成して、その仕事を
// チャンネルの送信側に送信する
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
// 受信側が全スレッド停止させた場合などにエラーになるのを防ぐ
// コンパイラにはわからないので
self.sender.send(job).unwrap();
}}
}
// --snip--
impl Worker {
// チャンネルの受信側をワーカーに渡す
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
loop {
// lockしてミューテックスを獲得し、それからunwrapを呼び出して、エラーの際にはパニックする
let job = receiver.lock().unwrap().recv().unwrap();
// ワーカー{}は仕事を得ました; 実行します
println!("Worker{} got a job; executing.", id);
// これではエラーになる
(*job)();
// Box<T>に格納されたFnOnceクロージャを呼び出すためには(Job型エイリアスが)、
// 呼び出す際にクロージャがselfの所有権を奪うので、
// クロージャは自身をBox<T>からムーブする必要がある
// 原則、Box<T>から値をムーブすることができない
// なぜならコンパイラは、Box<T>の内側の値がどれほどの大きさなのか検討がつかないから
// 型の既知のサイズがなにかを知るためにはSelfを使う
}
});
// --snip--
}
FnBoxトレイトにcall_boxという1つのメソッドがあり、これは、self: Boxを取ってselfの所有権を奪い、Boxから値をムーブする点を除いて、他のFn*トレイトのcallメソッドと類似している
次に、FnOnce()トレイトを実装する任意の型Fに対してFnBoxトレイトを実装する
実質的にこれは、あらゆるFnOnce()クロージャがcall_boxメソッドを使用できることを意味する
trait FnBox {
fn call_box(self: Box<Self>);
}
impl<F: FnOnce()> FnBox for F {
fn call_box(self: Box<F>) {
(*self)()
}
}
type Job = Box<FnBox + Send + 'static>;
// --snip--
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
loop {
let job = receiver.lock().unwrap().recv().unwrap();
println!("Worker {} got a job; executing.", id);
// ジョブを実行できる
// call_boxでselfの所有権をムーブできたので
job.call_box();
}
});
Worker {
id,
thread,
}
}
}
while let Ok(job) = receiver.lock().unwrap().recv() {
println!("Worker {} got a job; executing.", id);
job.call_box();
}
こうすると、望みどおりのスレッドの振る舞いにならない
遅いリクエストが来ても他のリクエストが処理されるのを待機させてしまう
Mutex構造体には公開のunlockメソッドがない
ロック所有権が、lockメソッドが返すLockResult>内のMutexGuradのライフタイムに基づくから
正常なシャットダウンとお片付け
プールの各スレッドに対してjoinを呼び出すDropトレイトを実装する
そして、スレッドに新しいリクエストの受付を停止し、終了するように教える方法を実装する
このコードが動いているのかを確かめるために、サーバを変更して正常にスレッドプールを終了する前に2つしかリクエストを受け付けないようにする
ThreadPoolにDropトレイトを実装する
スレッドがドロップされると、スレッドは全てjoinして、作業を完了するのを確かめる
impl Drop for ThreadPool {
fn drop(&mut self) {
for worker in &mut self.workers {
// ワーカーを終了する
println!("Shutting down worker {}", worker.id);
// スレッドプールがスコープを抜けた時にスレッドをjoinさせる
worker.thread.join().unwrap(); // Error
}
}
}
// --snip--
struct Worker {
// joinメソッドがthreadを所有するWorkerインスタンスからスレッドをムーブする必要がある
// Workerが代わりにOption<thread::JoinHandle<()>> を保持してればOptionに対してtakeメソッドを呼び出し、
// Some列挙師から値をムーブし、その場所にNone列挙しを残すことができる
// 言い換えれば、実行中のWorkerにはthreadにSome列挙士があり、
// 片付けたいときには、ワーカーが実行するスレッドがないようにSomeをNoneに置き換える
id: usize,
thread: Option<thread::JoinHandle<()>>,
}
impl Worker {
// --snip--
// threadはOption値なので
// Workerを作成するときはSomeで包む、ドロップしてワーカーが実行するスレッドがないときはNoneになっている
Worker {
id,
thread: Some(thread),
}
}
}
### スレッドに仕事をリッスンするのを止めるように通知する
Workerインスタンスのスレッドで実行されるクロージャのロジックをスレッドが、実行すべきJobか、リッスンをやめて無限ループを抜ける通知をリッスンするように変更する
```rust:Message値を送受信し、WorkerがMessage::Terminateを受け取ったら、ループを抜ける
// スレッドが実行すべきJobを保持するNewJob
// スレッドをループから抜けさせ、停止させるTerminate
enum Message {
NewJob(Job),
Terminate,
}
impl Worker {
// Receiverの型をMessageに変更
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) -> Worker {
let thread = thread::spawn(move || {
loop {
// jobをMessageに変更
let message = receiver.lock().unwrap().recv().unwrap();
// Message列挙子をみてmatchでワーカーを動作を決定する
match message {
Message::NewJob(job) => {
println!("Worker {} got a job; execting.", id);
job.call_box();
}
Message::Terminate => {
// ワーカー{}は停止するように指示された
println!("Worker {} was told to terminate.", id);
break;
}
}
}
});
// --snip--
}
fn main() {
// --snip--
// take(2)メソッドで2つのリクエストを処理した後にサーバを閉じる
for stream in listener.incoming().take(2) {
// --snip--
}
}