tokioランタイム上の同期コールバック内で非同期処理を行う方法
はじめに
Rustで非同期プログラミングを行う際、tokioは非常に強力なツールです。しかし、既存のライブラリとの統合時に、同期的なコールバック関数の中で非同期処理を行いたいという状況に遭遇することがあります。この記事では、そのような状況での問題と解決方法を紹介します。
問題の背景
- 同期的なコールバックしか受け付けないライブラリに対して関数を登録する必要がある
- コールバック内でHTTPリクエストなどの非同期処理を行いたい
-
reqwest::blocking
は非同期コンテキスト内では使用できない(パニックする)
なぜ難しいのか
- 非同期ランタイム上で同期的なブロッキング処理を行うとデッドロックのリスクがある
- 単純な
blocking_recv()
の使用はデッドロックを引き起こす可能性が高い - ランタイムの設定(スレッド数など)によって挙動が変わる
解決方法
以下の3つのtokioの機能を組み合わせることで問題を解決できます:
-
tokio::spawn
: 非同期処理を別タスクで実行 -
tokio::sync::oneshot
: タスク間でのデータの受け渡し -
tokio::task::block_in_place
: 安全なブロッキング処理の実現
use std::error::Error;
use tokio::sync::oneshot;
use tokio::task::block_in_place;
// ライブラリが提供する型やトレイトを模倣
trait DataProcessor {
fn process(&self, data: &str) -> String;
}
struct Handler {
processor: Box<dyn DataProcessor>,
}
impl Handler {
fn new<P: DataProcessor + 'static>(processor: P) -> Self {
Self {
processor: Box::new(processor),
}
}
fn handle(&self, data: &str) -> String {
self.processor.process(data)
}
}
// 実際の実装
struct MyProcessor;
impl DataProcessor for MyProcessor {
fn process(&self, data: &str) -> String {
// この中で非同期処理(HTTP リクエストなど)を行いたい
// しかし、これは同期的なコンテキスト
// 非同期処理の結果を受け取るためのチャネルを作成
let (tx, rx) = oneshot::channel();
// 新しいタスクで非同期処理を実行
tokio::spawn(async move {
// 実際のコードでは reqwest::Client を使用してリクエストを行う
// この例では非同期処理をシミュレート
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
let result = format!("Processed: {}", data);
let _ = tx.send(result);
});
// ブロッキング処理を明示的に行う
// これにより、tokio のワーカースレッドがブロックされることを防ぐ
block_in_place(move || {
rx.blocking_recv().unwrap_or_else(|_| "Error".to_string())
})
}
}
#[tokio::main(worker_threads = 2)]
async fn main() -> Result<(), Box<dyn Error>> {
let processor = MyProcessor;
let handler = Handler::new(processor);
// 同期的なコンテキストで呼び出し
let result = handler.handle("test data");
println!("Result: {}", result);
Ok(())
}
実装時の注意点
-
current_thread
ランタイムでは動作しない(パニックする) - ワーカースレッド数は2以上必要
-
block_in_place
を使用することで、明示的にブロッキングポイントを示す
まとめ
この方法を使うことで、以下のメリットがあります:
- 既存の同期APIと非同期処理の統合が可能
- デッドロックを防ぎつつ、非同期処理の結果を同期的に取得できる
- エラーケースが明確になる(
current_thread
でのパニック)