0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

今年の言語はRust その62

Posted at

今年の言語はRust その62

Rustを学びます

Rustの日本語ドキュメント 2nd Edition
https://doc.rust-jp.rs/book/second-edition/

オリジナル(英語)
https://doc.rust-lang.org/book/

実行環境

$ cargo -V
cargo 1.33.0 (f099fe94b 2019-02-12)

$ rustup -V
rustup 1.17.0 (069c88ed6 2019-03-05)

$ rustc --version
rustc 1.33.0 (2aa4c46cf 2019-02-28)

$  cat /proc/version
Linux version 4.14.97-74.72.amzn1.x86_64 (mockbuild@gobi-build-64002) 
(gcc version 7.2.1 20170915 (Red Hat 7.2.1-2) (GCC)) 
# 1 SMP Tue Feb 5 20:59:30 UTC 2019

$ uname -a
Linux ip-10-100-0-8 4.14.97-74.72.amzn1.x86_64 
# 1 SMP Tue Feb 5 20:59:30 UTC 2019 x86_64 x86_64 x86_64 GNU/Linux

$ cat /etc/\*release
NAME="Amazon Linux AMI"
VERSION="2018.03"
ID="amzn"
ID_LIKE="rhel fedora"
VERSION_ID="2018.03"
PRETTY_NAME="Amazon Linux AMI 2018.03"
ANSI_COLOR="0;33"
CPE_NAME="cpe:/o:amazon:linux:2018.03:ga"
HOME_URL="http://aws.amazon.com/amazon-linux-ami/"
Amazon Linux AMI release 2018.03

20. マルチスレッドのWebサーバーを構築する

やっとここまで来ました。最後の章です。

実はWebサーバーの仕組みが知りたくて知りたくて60回もやってきました。

以下がレッスン内容になります

  • TCPとHTTPについて少し学ぶ。
  • ソケットでTCP接続をリッスンする。
  • 少量のHTTPリクエストを構文解析する。
  • 適切なHTTPレスポンスを生成する。
  • スレッドプールでサーバのスループットを強化する。

20.2 マルチスレッドイヒ

現在のサーバーで実装で遅いリクエストをシミュレーションする

まずは定番の__重たい処理__ を入れます

extern crate hello;

use std::thread;
use std::time::Duration;

use std::io::prelude::*;
use std::net::TcpListener;
use std::net::TcpStream;
use std::fs::File;

fn main() {

	let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

	for stream in listener.incoming() {

		let stream = stream.unwrap();

		handle_connection(stream);
	}
}

fn handle_connection(mut stream: TcpStream) {

	// Read Request

	let mut buffer = [0; 512];
	stream.read(&mut buffer).unwrap();
	println!("{:?}", String::from_utf8_lossy(&buffer[..]));

	// Routing

	let get = b"GET / HTTP/1.1\r\n";
	let sleep = b"GET /sleep HTTP/1.1\r\n";

	let (status_line, filename) = if buffer.starts_with(get) {
		("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
	}else if buffer.starts_with(sleep){
		thread::sleep(Duration::from_secs(5));
		("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
	}else{
		("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
	};

	// Return Contents
	
	let mut file = File::open(filename).unwrap();
	let mut contents = String::new();
	file.read_to_string(&mut contents).unwrap();

	let response = format!("{}{}", status_line, contents);

	stream.write(response.as_bytes()).unwrap();
	stream.flush().unwrap();
}

各リクエストに対してスレッドを立ち上げられる場合のコードの構造

簡単ですね。

thread::spawn を使用して、リクエストが来るたびにスレッドを生成しています。

(注意) このやり方だとDoS攻撃で1000万件のリクエストが来た場合に、スレッドを作りすぎておかしくなっちゃいます

fn main() {

	let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

	for stream in listener.incoming() {

		let stream = stream.unwrap();

		// スレッドを生成
		thread::spawn(|| {
			handle_connection(stream);
		});

	}
}

有限数のスレッド用に似たインターフェイスを作成する

// main.rs

extern crate hello;
use hello::ThreadPool;

fn main() {

	let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
	let pool = ThreadPool::new(4);

	for stream in listener.incoming() {

		let stream = stream.unwrap();

		// スレッドを生成
		pool.execute(|| {
			handle_connection(stream);
		});

	}
}
// lib.rs

pub struct ThreadPool;

impl ThreadPool {

	/// 新しいThreadPoolを生成する
	///
	/// sizeがプールのスレッド数です
	///
	/// # panic
	/// 
	/// sizeが0なら *new* 関数はパニックします
	pub fn new(size: usize) -> ThreadPool {
		assert!( size > 0 );

		ThreadPool
	}

	pub fn execute<F>(&self, f: F) 
		where F: FnOnce() + Send + 'static
	{

	}

}

スレッドを生成するスペースを生成する

use std::thread;

pub struct ThreadPool{
	threads: Vec<thread::JoinHandle<()>>,
}


impl ThreadPool {

	/// 新しいThreadPoolを生成する
	///
	/// sizeがプールのスレッド数です
	///
	/// # panic
	/// 
	/// sizeが0なら *new* 関数はパニックします
	pub fn new(size: usize) -> ThreadPool {
		assert!( size > 0 );

		let mut threads = Vec::with_capacity(size);

		for _ in 0..size {
			// スレッドを生成してベクタに格納する
		}

		ThreadPool{
			threads
		}
	}

	pub fn execute<F>(&self, _f: F) 
		where F: FnOnce() + Send + 'static
	{

	}

}

ThreadPoolからスレッドにコードを送信する責任を負うWorker構造体

use std::thread;

pub struct ThreadPool{
	workers: Vec<Worker>,
}


impl ThreadPool {

	pub fn new(size: usize) -> ThreadPool {
		assert!( size > 0 );

		let mut workers = Vec::with_capacity(size);

		for id in 0..size {
			workers.push(Worker::new(id));
		}

		ThreadPool{
			workers
		}
	}

	pub fn execute<F>(&self, _f: F) 
		where F: FnOnce() + Send + 'static
	{

	}

}


struct Worker {
	id: usize,
	thread: thread::JoinHandle<()>,
}

impl Worker {
	fn new(id: usize) -> Worker {
		let thread = thread::spawn(|| {});

		Worker {
			id,
			thread,
		}
	}
}

チャンネル軽油でスレッドにリクエストを送信する

複数のWorkerreceiverを渡していますが、これは所有権を渡すことになり、コンパイルエラーになります

複数のスレッド に所有権を共有しつつ

use std::thread;
use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;

pub struct ThreadPool{
	workers: Vec<Worker>,
	sender: mpsc::Sender<Job>
}


trait FnBox {
	fn call_box(self: Box<Self>);
}

impl<F: FnOnce()> FnBox for F {
	fn call_box(self: Box<F> ){
		(*self)()
	}
}

type Job = Box<FnBox + Send + 'static>;

impl ThreadPool {

	/// 新しいThreadPoolを生成する
	///
	/// sizeがプールのスレッド数です
	///
	/// # panic
	/// 
	/// sizeが0なら *new* 関数はパニックします
	pub fn new(size: usize) -> ThreadPool {
		assert!( size > 0 );

		let (sender, receiver) = mpsc::channel();

		let receiver = Arc::new(Mutex::new(receiver));

		let mut workers = Vec::with_capacity(size);

		for id in 0..size {
			workers.push( Worker::new(id, Arc::clone(&receiver)) );
		}

		ThreadPool{
			workers,
			sender,
		}
	}

	pub fn execute<F>(&self, f: F) 
		where F: FnOnce() + Send + 'static
	{
		let job = Box::new(f);

		self.sender.send(job).unwrap();
	}

}


struct Worker {
	id: usize,
	thread: thread::JoinHandle<()>,
}

impl Worker {
	fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>> ) -> Worker {

		let thread = thread::spawn(move || {

			loop {
				let job = receiver.lock().unwrap().recv().unwrap();

				println!("Worker {} got a job; executing.", id);

				// (*job)();
				job.call_box();
			}

		});

		Worker {
			id,
			thread,
		}
	}
}



# [cfg(test)]
mod tests {

    #[test]
    fn it_works() {
        assert_eq!(2 + 2, 4);
    }

}

最終的に上記のようになった。

正直理解できなかった。。

Rustむつかしいかもしれない。。

所有権 Mutex スレッド 関数型...

そもそもjob がどういうことかわかりません。。



trait FnBox {
	fn call_box(self: Box<Self>);
}

impl<F: FnOnce()> FnBox for F {
	fn call_box(self: Box<F> ){
		(*self)()
	}
}

type Job = Box<FnBox + Send + 'static>;

FnOnce()トレイトを実装する任意の型Fに対してFnBoxトレイトを実装します。
実質的にこれは、あらゆるFnOnce()クロージャがcall_boxメソッドを使用できることを意味します.
call_boxの実装は、*(*self)()を使用してBox*からクロージャをムーブし、クロージャを呼び出します。

fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>> ) -> Worker 
pub fn execute<F>(&self, f: F)  where F: FnOnce() + Send + 'static
{
	let job = Box::new(f);

	self.sender.send(job).unwrap();
}

だめだ理解できない。。。

自転車本を並行読みして理解を深めるしかないか。。 ( <=Amazon待ち . 現在 )

悔しい!

あと、5/8にRustの最新本がでるみたいです! 気が付いてラッキー!購入しました。 (現在5/9)

実践Rust入門[言語仕様から開発手法まで] 単行本(ソフトカバー) – 2019/5/8

※アフェっときます

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?