6
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

非同期処理のモヤモヤ...

プログラミングをする際に、使用するパッケージによってはasyncawait等のキーワードが必要となる場合があります。私がこれまで使ってきたパッケージだと、RustではActix Web、TypeScriptだとPlaywrightがその例ですが、async,awaitが必要となるパッケージは他にも沢山あるはずです。

初めてこの状況に直面した時、Webで簡単に調べて「asyncとかawaitが『非同期処理』で使われるものであり、非同期処理はマルチスレッドとは異なる」ということを少なくとも理解しました。ただそれ以上には理解できていなかったので、「ビルドが通ればそれでOKで、ビルドに失敗したら一応awaitを式の最後に追加してビルドが通ったらOKとする」のように、だいぶ場当たり的な対応でこれまで乗り切ってきました。

非同期処理はasyncawaitを使わずに実装することもできますが、本記事ではこれらを使う前提で説明します。

この浅い理解の状態ではいつか壁にぶち当たりそうなので、「非同期処理のモヤモヤ」をいい加減解消したくなってきました。

そこで今回は非同期処理の理解を一歩進めるために、シングルスレッド、マルチスレッド、非同期処理についてRustで順番に実装してみて、その挙動の違いを調べてみようと思い立ちました。

TL;DR

非同期処理の実装によりわかったことを、先にまとめておきます。

  • 非同期処理で実装すると、一つのスレッドで並列に処理できるようになる
  • 非同期処理を行う関数では、関数名にasyncキーワードをつける
  • 非同期処理が必要な箇所にはawaitをつける必要がある。awaitをつけ忘れると、該当タスク内でその処理が終わるのを待たずして次の処理に進んでしまうため、意図しない順序でコードが実行される原因となる
  • 非同期処理の中に「非同期でない処理(つまり同期処理)」が紛れ込むと、その同期処理が実行されている間、スレッドは他の非同期タスクを実行することができない。その結果、非同期処理の並行性と効率性が損なわれ、全体のパフォーマンスが低下する可能性がある

非同期処理とマルチスレッドの違いは?

自分の理解も兼ねて、非同期処理とマルチスレッドの違いについて簡単に書いてみます。両者は「プログラムを並列に処理する」という点は共通していますが、並列処理のやり方が微妙に異なります。

マルチスレッドはその名の通り、「プログラムを複数のスレッドに分割して並列処理する」ものです。例えば、複数のCPUコアを搭載したマシンでマルチスレッドを使用する場合、各スレッドが異なるCPUコアを同時に利用できます。そのため、CPUを多く使う処理では高速化が期待できます。

一方、非同期処理では「単一のスレッド」で複数のタスクを処理できます。単一スレッドで順次タスクを処理する場合、マルチスレッドのように複数CPUコアを使った高速化はできません。しかしながら、ネットワーク通信、ファイルI/O、データベースアクセスなど、待機時間が多く、CPU処理が少ないタスクには非同期処理が向いています。

実装例

それではRustで、「1秒のスリープを10回実行する処理」を、シングルスレッド、マルチスレッド、非同期処理で実装してみることにします。

それぞれタスクの違いを比較できるように、以下を標準出力します。

  • 各タスクの開始、終了時の出力
  • 各タスクのスレッドID
  • 全タスクが終了するのにかかった時間

シングルスレッド

あまり面白みがないですが、次のように1秒スリープを順番に10回行います。

main.rs
use std::thread;
use std::time::{Duration, Instant};

fn main() {
  let tid_main = thread::current().id();
  println!("Main thread id = {:?}", tid_main);

  let start = Instant::now();

  for i in 0..10 {
    println!("Iteration {} of {:?} is starting to sleep", i, tid_main);
    thread::sleep(Duration::from_secs(1));
    println!("Iteration {} of {:?} has finished sleeping", i, tid_main);
  }

  let duration = start.elapsed();
  println!("All iterations have finished in {:.3} seconds.", duration.as_secs_f64());
}

実行すると次のように出力されます。

Main thread id = ThreadId(1)
Iteration 0 of ThreadId(1) is starting to sleep
Iteration 0 of ThreadId(1) has finished sleeping
Iteration 1 of ThreadId(1) is starting to sleep
Iteration 1 of ThreadId(1) has finished sleeping
Iteration 2 of ThreadId(1) is starting to sleep
Iteration 2 of ThreadId(1) has finished sleeping
Iteration 3 of ThreadId(1) is starting to sleep
Iteration 3 of ThreadId(1) has finished sleeping
Iteration 4 of ThreadId(1) is starting to sleep
Iteration 4 of ThreadId(1) has finished sleeping
Iteration 5 of ThreadId(1) is starting to sleep
Iteration 5 of ThreadId(1) has finished sleeping
Iteration 6 of ThreadId(1) is starting to sleep
Iteration 6 of ThreadId(1) has finished sleeping
Iteration 7 of ThreadId(1) is starting to sleep
Iteration 7 of ThreadId(1) has finished sleeping
Iteration 8 of ThreadId(1) is starting to sleep
Iteration 8 of ThreadId(1) has finished sleeping
Iteration 9 of ThreadId(1) is starting to sleep
Iteration 9 of ThreadId(1) has finished sleeping
All iterations have finished in 10.002 seconds.

全ての処理がスレッドID 1 で実行されます。また順番に10回実行されるので、全処理が終わるのに約10秒掛かります。

マルチスレッド

続いて、マルチスレッドを使って1秒スリープする処理を10並列で実行できるようにしてみます。

main.rs
use std::thread;
use std::time::{Duration, Instant};

fn main() {
  let mut handles = vec![];

  let tid_main = thread::current().id();
  println!("Main thread id = {:?}", tid_main);

  let start = Instant::now();

  for i in 0..10 {
    let handle = thread::spawn(move || {
      let tid = thread::current().id();
      println!("Thread {} (ID: {:?}) is sleeping", i, tid);
      thread::sleep(Duration::from_secs(1));
      println!("Thread {} (ID: {:?}) has woken up", i, tid);
    });
    handles.push(handle);
  }

  for handle in handles {
    // 各スレッドが終わるまで待機する
    handle.join().unwrap();
  }

  let duration = start.elapsed();
  println!("All threads have finished in {:.3} seconds.", duration.as_secs_f64());
}

実行結果は以下です。

Main thread id = ThreadId(1)
Thread 0 (ID: ThreadId(2)) is sleeping
Thread 1 (ID: ThreadId(3)) is sleeping
Thread 2 (ID: ThreadId(4)) is sleeping
Thread 4 (ID: ThreadId(6)) is sleeping
Thread 5 (ID: ThreadId(7)) is sleeping
Thread 3 (ID: ThreadId(5)) is sleeping
Thread 7 (ID: ThreadId(9)) is sleeping
Thread 6 (ID: ThreadId(8)) is sleeping
Thread 8 (ID: ThreadId(10)) is sleeping
Thread 9 (ID: ThreadId(11)) is sleeping
Thread 0 (ID: ThreadId(2)) has woken up
Thread 1 (ID: ThreadId(3)) has woken up
Thread 2 (ID: ThreadId(4)) has woken up
Thread 3 (ID: ThreadId(5)) has woken up
Thread 6 (ID: ThreadId(8)) has woken up
Thread 5 (ID: ThreadId(7)) has woken up
Thread 9 (ID: ThreadId(11)) has woken up
Thread 8 (ID: ThreadId(10)) has woken up
Thread 4 (ID: ThreadId(6)) has woken up
Thread 7 (ID: ThreadId(9)) has woken up
All threads have finished in 1.002 seconds.

メインスレッドと、各sleepのタスクが実行されたスレッドIDが全て異なっています。また1秒のスリープが10並列で実行されるため、全体の処理は約1秒で終わっているのもわかります。

非同期処理

本題の非同期処理の動作も確認してみましょう。
今回やりたいことを非同期処理で実現するのにtokioとfuturesのクレートが必要になるため、Cargo.tomlのに以下のように記載します。

Cargo.toml
(出力略)

[dependencies]
tokio = { version = "1.38.0", features = ["full"] }
futures = "0.3.30"

設定ファイルの準備はできたので、非同期処理で1秒スリープする処理を10並列で実行するプログラムを書いてみます。

main.rs
use std::thread;
use tokio::time::{sleep, Duration, Instant};
use futures::future::join_all;

// #[tokio::main]                         // デフォルトは、実行マシンのコア数だけスレッドが作られる
#[tokio::main(flavor = "current_thread")] // そこで、1スレッドで非同期処理を行う設定に変更
async fn main() {
  let tid_main = thread::current().id();
  println!("Main thread id = {:?}", tid_main);

  let start = Instant::now();

  let mut tasks = Vec::new();
  for i in 0..10 {
    let task = tokio::spawn(async move {
      let tid = thread::current().id();
      println!("Task {} (TID: {:?}) is sleeping", i, tid);
      // 該当タスクは1秒待機し、それ以外のタスクは並列で実行される
      sleep(Duration::from_secs(1)).await;
      println!("Task {} (TID: {:?}) has woken up", i, tid);
    });
    tasks.push(task);
  }

  // 全タスクが終わるまで待機する
  join_all(tasks).await;

  let duration = start.elapsed();
  println!("All tasks have finished in {:.3} seconds.", duration.as_secs_f64());
}

実行結果は以下です。

Main thread id = ThreadId(1)
Task 0 (TID: ThreadId(1)) is sleeping
Task 1 (TID: ThreadId(1)) is sleeping
Task 2 (TID: ThreadId(1)) is sleeping
Task 3 (TID: ThreadId(1)) is sleeping
Task 4 (TID: ThreadId(1)) is sleeping
Task 5 (TID: ThreadId(1)) is sleeping
Task 6 (TID: ThreadId(1)) is sleeping
Task 7 (TID: ThreadId(1)) is sleeping
Task 8 (TID: ThreadId(1)) is sleeping
Task 9 (TID: ThreadId(1)) is sleeping
Task 0 (TID: ThreadId(1)) has woken up
Task 1 (TID: ThreadId(1)) has woken up
Task 2 (TID: ThreadId(1)) has woken up
Task 3 (TID: ThreadId(1)) has woken up
Task 4 (TID: ThreadId(1)) has woken up
Task 5 (TID: ThreadId(1)) has woken up
Task 6 (TID: ThreadId(1)) has woken up
Task 7 (TID: ThreadId(1)) has woken up
Task 8 (TID: ThreadId(1)) has woken up
Task 9 (TID: ThreadId(1)) has woken up
All tasks have finished in 1.002 seconds.

全てのタスクがメインスレッドと同じID 1 で実行されていますが、全体の処理は約1秒で終わっています。つまり、一つのスレッドで、1秒スリープが10個並列で実行されたということです。
「これぞ非同期処理!!」というのが、何となくおわかりいただけたかなと思います!(?)

awaitについて

10並列でsleepしている部分で、awaitが使われている箇所について簡単に解説します。

  for i in 0..10 {
    let task = tokio::spawn(async move {
      let tid = thread::current().id();
      println!("Task {} (TID: {:?}) is sleeping", i, tid);
      // 該当タスクは1秒待機し、それ以外のタスクは並列で実行される
      sleep(Duration::from_secs(1)).await;
      println!("Task {} (TID: {:?}) has woken up", i, tid);
    });
    tasks.push(task);
  }

コメントにも書いている通り、sleep実行中は該当タスクはここで1秒停止します。しかしながら、それ以外の残り9個のタスクは処理を進めることができるので、全部のタスクが止まって渋滞するようなことはありません。そのため全体の処理は10秒掛からずに、約1秒で終わります。

awaitを適切に使わなかった場合の悪影響

awaitを書き忘れた場合

ちなみに以下のようにawaitがない状態にすると、

      // awaitを入れ忘れた状態
      sleep(Duration::from_secs(1))/*.await*/;

1秒のsleep処理が終わるのを待たずに処理が進んでしまい、以下のようにプログラムはほぼ0秒で終わってしまいます。

Main thread id = ThreadId(1)
Task 0 (TID: ThreadId(1)) is sleeping
Task 0 (TID: ThreadId(1)) has woken up
Task 1 (TID: ThreadId(1)) is sleeping
Task 1 (TID: ThreadId(1)) has woken up
Task 2 (TID: ThreadId(1)) is sleeping
Task 2 (TID: ThreadId(1)) has woken up
Task 3 (TID: ThreadId(1)) is sleeping
Task 3 (TID: ThreadId(1)) has woken up
Task 4 (TID: ThreadId(1)) is sleeping
Task 4 (TID: ThreadId(1)) has woken up
Task 5 (TID: ThreadId(1)) is sleeping
Task 5 (TID: ThreadId(1)) has woken up
Task 6 (TID: ThreadId(1)) is sleeping
Task 6 (TID: ThreadId(1)) has woken up
Task 7 (TID: ThreadId(1)) is sleeping
Task 7 (TID: ThreadId(1)) has woken up
Task 8 (TID: ThreadId(1)) is sleeping
Task 8 (TID: ThreadId(1)) has woken up
Task 9 (TID: ThreadId(1)) is sleeping
Task 9 (TID: ThreadId(1)) has woken up
All tasks have finished in 0.001 seconds.

一応お伝えしておくと、これは決して「処理が速く終わったからハッピー!」な事例ではなくて、本当はやらなきゃいけないことをすっ飛ばしたという悪い事例です。

非同期処理を実装していない処理を使った場合

また、sleepの処理を非同期処理ではない「非・非同期処理」な関数に置き換えてみると、

      // // 該当タスクは1秒待機し、それ以外のタスクは並列で実行される
      // sleep(Duration::from_secs(1)).await;
      // 非・非同期処理でsleepしてみる
      thread::sleep(Duration::from_secs(1));

並列実行できずに、全処理が終わるのに約10秒掛かってしまいます。

Main thread id = ThreadId(1)
Task 0 (TID: ThreadId(1)) is sleeping
Task 0 (TID: ThreadId(1)) has woken up
Task 1 (TID: ThreadId(1)) is sleeping
Task 1 (TID: ThreadId(1)) has woken up
Task 2 (TID: ThreadId(1)) is sleeping
Task 2 (TID: ThreadId(1)) has woken up
Task 3 (TID: ThreadId(1)) is sleeping
Task 3 (TID: ThreadId(1)) has woken up
Task 4 (TID: ThreadId(1)) is sleeping
Task 4 (TID: ThreadId(1)) has woken up
Task 5 (TID: ThreadId(1)) is sleeping
Task 5 (TID: ThreadId(1)) has woken up
Task 6 (TID: ThreadId(1)) is sleeping
Task 6 (TID: ThreadId(1)) has woken up
Task 7 (TID: ThreadId(1)) is sleeping
Task 7 (TID: ThreadId(1)) has woken up
Task 8 (TID: ThreadId(1)) is sleeping
Task 8 (TID: ThreadId(1)) has woken up
Task 9 (TID: ThreadId(1)) is sleeping
Task 9 (TID: ThreadId(1)) has woken up
All tasks have finished in 10.003 seconds.

これではTokioやFuturesが持っている非同期処理の機能を全く活かせなくなってしまいます。

おわりに

サンプルプログラムを実装してみて、非同期処理を有効に行うためには、asyncawaitを場当たり的にではなくて、正しく理解して使っていく必要があることがわかりました。今後、非同期処理をガチで実装する機会があった時に、本記事で書いた内容を思い出して極力行き詰まらないようにしたいと思います。

6
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
6
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?