2
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

RustでCPUバウンド・IOバウンドの処理をマルチスレッド(rayon)や非同期(tokio)で処理して速度を比較してみる

Last updated at Posted at 2024-01-05

Rustの処理を高速化したい

Rustはとても高速ですが、特に何も気にせずプログラムするとシングルスレッドで処理が走ります。

単純なプログラムを作る場合にはあまり問題になりませんが、多少重たい処理をする場合はすぐに処理が詰まってしまってパフォーマンスに影響が出ますし、最近のPCはCPUがめちゃくちゃ積んであるので、とても勿体無いです。

パフォーマンス向上のとても簡単な手段として、(Rustに限らずですが)マルチスレッドで処理するライブラリや非同期で処理を行うライブラリを使う手があります。

今回は使うだけならとても簡単に(とはいえ難しいが)使うことができる「rayon」と「tokio」を利用して、処理時間を計測してみました!

具体的には以下の2つの超簡単なタスクを3つの手法で計測してみます。

  • CPUバウンドの処理
    • 10,000,000行のファイルを事前に作成しておく(200MB程度のテキストファイル)
      • 1行ごとにランダムな文字列が記載されている
    • 全ての行に対してハッシュ値を計算する
  • I/Oバウンドの処理
    • 10,000ファイルのテキストを作成する
    • 10,000行のランダムな文字列を書き込む

設定

たいしたものではないですが、こちらに利用したコードを置いておきました。

リポジトリをcloneし、cargo runを実行するだけで動きます。

Cargo.tomlは以下のようになっています。

[package]
name = "app"
version = "0.1.0"
edition = "2021"

[dependencies]
md5 = "0.7.0"
rand = "0.8.5"
rayon = "1.8.0"
tokio = { version = "1.35.1", features = ["full"] }

実装

実装内容を紹介していきます。

まずはsrc/main.rs

use rand::Rng;
use std::fs::File;
use std::io::{BufWriter, Write};

use app::{
    async_process::cpu_bound_process as async_cpu_bound_process,
    async_process::io_bound_process as async_io_bound_process,
    multi_thread::cpu_bound_process as multi_thread_cpu_bound_process,
    multi_thread::io_bound_process as multi_thread_io_bound_process,
    single_thread::cpu_bound_process as single_thread_cpu_bound_process,
    single_thread::io_bound_process as single_thread_io_bound_process,
};

async fn generate_large_file(file_name: &str, lines: usize) -> std::io::Result<()> {
    let mut file = BufWriter::new(File::create(file_name)?);
    let mut rng = rand::thread_rng();

    for _ in 0..lines {
        writeln!(file, "{}", rng.gen::<u64>())?;
    }

    Ok(())
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
    std::fs::create_dir_all("output")?;

    for entry in std::fs::read_dir("output")? {
        let entry = entry?;
        let path = entry.path();
        if path.is_file() && path.extension().unwrap() == "txt" {
            std::fs::remove_file(path)?;
        }
    }

    println!("Generating large file...");
    generate_large_file("output/large_file.txt", 10_000_000).await?;

    println!("Processing cpu bound task...");
    single_thread_cpu_bound_process()?;
    multi_thread_cpu_bound_process()?;
    async_cpu_bound_process().await?;

    println!("Processing io bound task...");
    single_thread_io_bound_process()?;
    multi_thread_io_bound_process()?;
    async_io_bound_process().await?;

    Ok(())
}

非同期で処理を行いたいため、main()関数はマクロを利用してasync関数にしていきます。

最初に不要ファイルを削除するような処理が入っていますが、あまり気にしなくて良いです。

generate_large_file()関数ではランダムな文字列を10,000,000行書き込んだファイルを作成します。

その後はひたすらシングルスレッド・マルチスレッド・非同期で処理を行なっていきます。

シングルスレッド

シングルスレッドでの処理は特に説明もいらないと思います。
とにかく逐次的にハッシュ値の計算やファイル読み込み・書き込みを行います。

  • src/single_thread.rs
use rand::{rngs::StdRng, Rng, SeedableRng};
use std::fs::File;
use std::io::{BufRead, BufReader, BufWriter, Write};

fn process_line(line: &str) -> String {
    format!("{:x}", md5::compute(line))
}

pub fn cpu_bound_process() -> std::io::Result<()> {
    let start = std::time::Instant::now();

    let reader = BufReader::new(File::open("output/large_file.txt")?);
    let mut writer = BufWriter::new(File::create("output/output_single.txt")?);

    for line in reader.lines() {
        let line = line?;
        let result = process_line(&line);
        writeln!(writer, "{}", result)?;
    }

    let duration = start.elapsed();
    println!("Time taken (Single Thread): {:?}", duration);

    Ok(())
}

fn generate_files(file_count: usize, lines_per_file: usize) -> std::io::Result<()> {
    let mut rng = StdRng::from_entropy();

    for i in 0..file_count {
        let mut file = BufWriter::new(File::create(format!("output/file_{}.txt", i))?);

        for _ in 0..lines_per_file {
            writeln!(file, "{}", rng.gen::<u64>())?;
        }
    }

    Ok(())
}

pub fn io_bound_process() -> std::io::Result<()> {
    let start = std::time::Instant::now();

    let file_count = 10000;
    let lines_per_file = 10000;

    generate_files(file_count, lines_per_file)?;

    let duration = start.elapsed();
    println!("Time taken (Single Thread): {:?}", duration);

    Ok(())
}

マルチスレッド

マルチスレッドではrayonというほぼデファクトスタンダードのクレートを利用します。
この程度の処理であればとっっっっっても簡単に利用することができ、シングルスレッド時にループさせていたような処理をinto_par_iter()に置き換えるだけでマルチスレッドで動作します。

スレッド間で変数を共有したい場合などは当然考慮することが増えますが、今回は取り扱いません。

  • src/multi_thread.rs
use rand::{rngs::StdRng, Rng, SeedableRng};
use rayon::prelude::*;
use std::fs::File;
use std::io::{BufRead, BufReader, BufWriter, Write};

fn process_line(line: &str) -> String {
    format!("{:x}", md5::compute(line))
}

pub fn cpu_bound_process() -> std::io::Result<()> {
    let start = std::time::Instant::now();

    let reader = BufReader::new(File::open("output/large_file.txt")?);
    let lines: Vec<String> = reader.lines().filter_map(Result::ok).collect();

    let processed_lines: Vec<String> = lines.par_iter().map(|line| process_line(line)).collect();

    let mut writer = BufWriter::new(File::create("output/output_parallel.txt")?);
    for line in processed_lines {
        writeln!(writer, "{}", line)?;
    }

    let duration = start.elapsed();
    println!("Time taken (Parallel): {:?}", duration);

    Ok(())
}

fn generate_file(i: usize, lines_per_file: usize) -> std::io::Result<()> {
    let mut file = BufWriter::new(File::create(format!("output/file_{}.txt", i))?);
    let mut rng = StdRng::from_entropy();

    for _ in 0..lines_per_file {
        writeln!(file, "{}", rng.gen::<u64>())?;
    }

    Ok(())
}

pub fn io_bound_process() -> std::io::Result<()> {
    let start = std::time::Instant::now();

    (0..10000).into_par_iter().for_each(|i| {
        generate_file(i, 10000).unwrap();
    });

    let duration = start.elapsed();
    println!("Time taken (Parallel): {:?}", duration);

    Ok(())
}

非同期

非同期に関しては、ライブラリを利用してても、マルチスレッドと比較すると難易度が高い印象です。

特に、非同期でのCPUバウンドのタスクは、多分そもそも向いていない処理のようですね。
あまり深くは追求しませんでしたが、効率よく処理させるのはなかなか難しそうでした…

I/Oバウンドの処理では、tokio::spawnにasync関数を渡すことでマルチスレッドかつ非同期で処理を行なってくれるようです。
(ただ、それでもawaitすると同期処理になってしまうので、なるべく避ける)

  • src/async_process.rs
use rand::{rngs::StdRng, Rng, SeedableRng};
use tokio::fs::File;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter};

async fn process_line_async(line: String) -> String {
    format!("{:x}", md5::compute(line))
}

pub async fn cpu_bound_process() -> std::io::Result<()> {
    let start = std::time::Instant::now();

    let file = File::open("output/large_file.txt").await?;
    let reader = BufReader::new(file);
    let mut lines = reader.lines();

    let file_output = File::create("output/output_async.txt").await?;
    let mut writer = BufWriter::new(file_output);

    while let Some(line) = lines.next_line().await? {
        let result = process_line_async(line).await;
        writer.write_all(result.as_bytes()).await?;
        writer.write_all(b"\n").await?;
    }

    let duration = start.elapsed();
    println!("Time taken (Async): {:?}", duration);

    Ok(())
}

async fn generate_file_async(i: usize, lines_per_file: usize) -> std::io::Result<()> {
    let mut file = BufWriter::new(File::create(format!("output/file_{}.txt", i)).await?);
    let mut rng = StdRng::from_entropy();

    for _ in 0..lines_per_file {
        file.write_all(format!("{}\n", rng.gen::<u64>()).as_bytes())
            .await?;
    }

    file.flush().await
}

pub async fn io_bound_process() -> std::io::Result<()> {
    let start = std::time::Instant::now();

    let mut handles = Vec::new();
    for i in 0..10000 {
        let handle = tokio::spawn(generate_file_async(i, 10000));
        handles.push(handle);
    }

    for handle in handles {
        let _ = handle.await?;
    }

    let duration = start.elapsed();
    println!("Time taken (Async): {:?}", duration);

    Ok(())
}

結果

結論として、以下のようになりました。

Generating large file...

Processing cpu bound task...
Time taken (Single Thread): 24.25011125s
Time taken (Parallel): 7.651150834s
Time taken (Async): 27.129945625s

Processing io bound task...
Time taken (Single Thread): 60.726275625s
Time taken (Parallel): 8.780917875s
Time taken (Async): 46.815321292s

rayonによるマルチスレッドの処理がお手軽かつ速度的にも圧勝ですね!

tokioでの非同期処理の方は、あまりうまく書けている自信もなければ、タスク設定が悪いような気もするので、なんともいえませんが、少なくともさほどお手軽ではなかった上、CPUバウンド(のみ)の処理には、当然ですが効果は薄そうでした。

I/O処理かつCPUバウンドの処理が多数発生するWebAPI作成のようなタスクには向いてると思いますし、実際tokioを利用したWebフレームワークのAxumは効率が良さそうでした。

Axumについてはこんな記事も書いてますので、よければ見てみてください!

ということで結論としてはrayonによるマルチスレッド処理が圧倒的な速度で、尚且つすごい手軽だったのでこれからもたくさん使っていこうと思いました!

追記

と、思っていたのですが、社内のメンバーがすぐに「VS Code上のターミナルではなくiTermで実行したらI/Oバウンドはtokioが圧勝や」という指摘をくれたのでまっさか〜と思いながらiTermで実行してみたらマジで圧勝でした!

結果を貼っておきます!

Generating large file...

Processing cpu bound task...
Time taken (Single Thread): 25.763379459s
Time taken (Parallel): 7.112424708s
Time taken (Async): 27.560267542s

Processing io bound task...
Time taken (Single Thread): 68.961478292s
Time taken (Parallel): 9.130936292s
Time taken (Async): 783.666417ms

早すぎ!!!どんどんtokio使っていきます!

原因はわからないですが、皆さんもVS Codeのターミナルで作業を行うときはお気をつけください!

さらに追記

上のコマンド群、全て--releaseオプションをつけ忘れており、Rust本来の性能を発揮できていませんでした!
オプションをつけることでさらに7~10倍程度高速化しました!

% cargo run --release
Generating large file...

Processing cpu bound task...
Time taken (Single Thread): 5.170225125s
Time taken (Parallel): 1.621366375s
Time taken (Async): 5.621057875s

Processing io bound task...
Time taken (Single Thread): 6.439763458s
Time taken (Parallel): 1.808070334s
Time taken (Async): 481.427ms

おまけ

お試しでPythonで似たような処理(シングルスレッドのみ)をしてみました!

import hashlib
import os
import random
import time


def main():
    os.makedirs("output", exist_ok=True)

    for entry in os.scandir("output"):
        if entry.is_file() and entry.path.endswith(".txt"):
            os.remove(entry.path)

    print("Generating large file...")
    generate_large_file("output/large_file.txt", 10_000_000)

    print("Processing cpu bound task...")
    cpu_bound_process()

    print("Processing io bound task...")
    io_bound_process()


def process_line(line):
    return hashlib.md5(line.encode()).hexdigest()


def cpu_bound_process():
    start = time.time()

    with open("output/large_file.txt") as reader, open("output/output_single.txt", "w") as writer:
        for line in reader:
            result = process_line(line.strip())
            writer.write(result + "\n")

    duration = time.time() - start
    print(f"Time taken (Single Thread): {duration} s")


def generate_large_file(filename, size):
    with open(filename, "w") as f:
        for _ in range(size):
            f.write(str(random.randint(0, 1000000000)) + "\n")


def generate_files(file_count, lines_per_file):
    for i in range(file_count):
        with open(f"output/file_{i}.txt", "w") as file:
            for _ in range(lines_per_file):
                file.write(str(random.randint(0, 1000000000)) + "\n")


def io_bound_process():
    start = time.time()

    file_count = 10000
    lines_per_file = 10000

    generate_files(file_count, lines_per_file)

    duration = time.time() - start
    print(f"Time taken (Single Thread): {duration} s")


if __name__ == "__main__":
    main()

結果は以下のようになりました!

Generating large file...

Processing cpu bound task...
Time taken (Single Thread): 5.728252172470093 s

Processing io bound task...
Time taken (Single Thread): 40.693394899368286 s

この程度の処理であれば全然Pythondでも許容範囲内ですね!
CPUバウンドの処理はさほど変わらず、I/Oバウンドの処理であれば、asyncioなどの非同期ライブラリを利用するだけで解決しそうです!

さらに、Pythonはnumpyなどの強力なライブラリがあるため、CPUバウンドの処理も大抵はC++と同等の速度が出るはずです!

Rustはリアルタイム処理などの(超)高速な動作が求められる場合や、もっとCPUの負荷がかかるような大規模の場合、もしくは逆にサクッとマルチプラットフォームのCLIツールなどを作りたいときに向いており、単なるWebAPI(つまり、I/O処理がメイン)であればPythonでも特に問題なく利用することができそうです!

互いにメリットやデメリットがあるので、変なこだわりを持たず、柔軟な技術選定を行うのが良さそうです!

2
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?