Tokioでworker
あまり深く考えなくてもいいように簡単にまとめてみようということです。
例えば別のスレッドで音を鳴らしたいけど同時に別の音を鳴らしたくない。
音を鳴らす以外に座標変換を一気にスレッド立ち上げてやってもらいたい等、
async main
で#[tokio::main(flavor="multi_thread",workier__threads=1)]
などと
してしまうと融通が利かなくなってやだなーという場合によいのではないでしょうか。
dependenceis
tokio = { version = "1.35.0", features = ["full"] }
futures = "0.3.30"
tokioとfuturesを利用します。
バージョンは現在使える最新を使えば基本的には大丈夫だと思います。
基本形
worker利用&処理の終了を待つ
main.rs
use tokio::runtime::{Builder, Runtime};
use futures::stream::[FuturesUnordered, StreamEx};
async fn myfunc(index: i32) {
println!("start {}", index);
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
println!("end {}", index);
}
fn main() {
let local_runtime = Runtime::new().unwrap();
let runtime = Builder::new_multi_thread()
.worker_threads(4)
.enable_all()
.build()
.unwrap();
// workerリスト
let mut threads = FuturesUnordered::new();
// workerの起動・登録
for i in 0..5 {
let thread = runtime.spawn(async move {
myfunc(i).await;
});
threads.push(thread);
}
// 起動した全workerの終了を待つ
local_runtime.block_on(async {
while let Some(_result) = threads.next().await {};
});
println!("finished all!");
}
まずは簡単に。
runtime.spawnで処理を呼びます、runtimeを初期化する際のworker_threads
で同時に走るworkerの数を指定できます。
※mainをasyncにしていない点に注意してください。
全てのworkerが処理を終えるとlocal_runtime.block_on(
を抜けてそれ以降のコードが実行されます。
workerから値を返す
i32を返しmainのVecに登録するパターン
ArcとMutexを利用する。
use std::sync::{Arc, Mutex};
myfuncが値を返すように変更 とりえずindex^2
を返すということにしておく。
async fn myfunc(index: i32) -> i32 {
...
let return_value:i32 = index * index;
return_value
}
共有メモリを定義
let array_data = Arc::new(Mutex::new(Vec::new()));
関数をspawnする部分を修正し、値をarray_dataへ格納する。
for i in 0..10 {
let ad = Arc::clone(&array_data);
let thread = runtime.spawn(async move {
let response = myfunc(i).await;
ad.lock().unwrap().push(response);
});
threads.push(thread);
}
finished allの後に結果を表示
println!("{:?}", *array_data.lock().unwrap());
戻り値を利用したい場合は let ad = array_dat.lock().unwrap()
などとする。
HashMapに値を入れたい場合。
Vec利用からHashMap利用に修正する
HashMapを使えるように
use std::collections::HashMap;
myfuncがHashMapを返すように。 {n: n^2}
という値が返るようにしている。
//async fn myfunc(index: i32) -> i32 {
async fn myfunc(index: i32) -> HashMap<i32, i32> {
...
let mut result = HashMap::new();
result.insert(index, index*index);
result
}
共有メモリを定義
//let array_data = Arc::new(Mutex::new(Vec::new()));
let aray_data = Arc::new(Mutex::new(HashMap::new()));
関数をspawnする部分を修正し、値をarray_dataへ格納する。
for i in 0..10 {
let ad = Arc::clone(&array_data);
let thread = runtime.spawn(async move {
let response = myfunc(i).await;
//ad.lock().unwrap().push(response);
let mut data = ad.lock().unwrap();
for (key, value in response {
data.insert(key, value);
}
});
threads.push(thread);
}
まとめ
専門のcrateがあるし簡単にできるだろうと思って実例探したりしたらあまりジャストなものがなくて苦労したのでメモ程度に書いてみました。
crateを作るまでではない、crateにはしづらいようなこういった処理パターンについてスニペット的にドキュメントを作っておくのはアリだと感じました。
今後も何かあればやってみようと思います。