概要
RustのHTTPクライアントであるreqwestでSpotify Web APIに対して非同期I/Oでリクエストをし、その結果を返したいと思いいろいろ調べました。
この記事は自分が試したものを紹介し、皆さんの意見を聞きながら(コメントください!!)情報を追加していきたいと思います。
今回は以下の2つの方法で非同期I/Oを実装する紹介をします:
- 非同期タスク間で通信をするためにチャンネルを作成して結果を返す方法
- 並行処理するタスクをバッファリングして完了した順に結果を返す方法
コードはSpotify Web APIに対してリクエストを行うクライアントであるspotify_apiにあるので参考にしてください。
環境
- Rust 1.35
- reqwest 0.9.14
- tokio 0.1.21
- futures 0.1.27
参考
今回実装する上で以下のウェブサイトを参考にしました。
- [Rust] reqwest + futures + tokioで非同期I/Oリクエスト入門
- How can I perform parallel asynchronous HTTP GET requests with reqwest? - Stack Overflow
検証
それでは本題の非同期I/Oリクエストを行う実装を解説したいと思います。
以下のオブジェクトはプレイリストの情報を表していて、Spotify Web APIから返ってくるデータをこのオブジェクトにデシリアライズします。
#[derive(Deserialize, Serialize, Clone, Debug, Default)]
pub struct Playlist {
pub collaborative: bool,
pub description: Option<String>,
pub external_urls: ExternalURL,
pub followers: Option<Follower>,
pub href: String,
pub id: String,
pub images: Vec<Image>,
pub name: String,
pub owner: User,
pub public: Option<bool>,
pub snapshot_id: String,
#[serde(rename = "type")]
pub object_type: String,
pub uri: String,
}
非同期タスク間で通信をするためにチャンネルを作成して結果を返す方法
tokio::sync::mpsc::channel
でチャンネルを作成し、SenderとReceiverを使ってプレイリストのデータを保持する仕組みを作ります。
use spotify_api::object::Playlist;
use futures::prelude::*;
use tokio;
pub fn get_playlists(
playlist_ids: Vec<String>,
access_token: String,
) -> Option<Vec<Playlist>> {
let size = playlist_ids.len();
let client = reqwest::r#async::Client::new(); // 非同期I/Oリクエストを行うクライアントを作成
let (tx, rx) = tokio::sync::mpsc::channel(size);
let mut rt = tokio::runtime::Runtime::new().unwrap(); // ランタイム作成
let playlists = playlist_ids.into_iter().map(move |id| {
let tx = tx.clone();
let url = format!("https://api.spotify.com/v1/playlists/{}", id);
client
.get(&url)
.bearer_auth(&access_token)
.send()
.and_then(move |mut res| res.json::<Playlist>())
.then(move |playlist| tx.send(playlist.unwrap()))
.map(|_| ())
.map_err(|e| println!("Error: {}", e))
});
rt.spawn(futures::future::join_all(playlists).map(|_| ()));
rx.take(size as u64).collect().wait().ok()
}
コード解説
チャンネルの作成
let (tx, rx) = tokio::sync::mpsc::channel(size);
非同期タスク間で通信を行うためにチャンネルを作成してデータのやり取りを行います。
tokio::sync::mpsc::channel
はtokio::sync::mpsc::Sender
とtokio::sync::mpsc::Receiver
のタプルを返します。
Sender
で送信されたデータはReceiver
で送信された順に受け取ります。
ここではプレイリストIDのリストのサイズだけチャンネルを作成してデータのやり取りをします。
並行処理するタスクをバッファリングして完了した順に結果を返す方法
futures::stream::BufferUnordered
を利用
こっちのほうがスッキリしているがパフォーマンスの面などではどうなのかわからない。
正直これは非同期I/Oと言っていいのかわからない。単純に並行処理なので非同期I/Oでリクエストを行っているわけではない。
use crate::object::Playlist;
use crate::CountryCode;
use futures::{stream, Future, Stream};
use tokio;
pub fn get_playlists(
playlist_ids: Vec<String>,
access_token: String,
) -> Option<Vec<Playlist>> {
let size = playlist_ids.len();
let client = reqwest::r#async::Client::new();
let mut rt = tokio::runtime::Runtime::new().unwrap();
let playlists = stream::iter_ok(playlist_ids)
.map(move |id| {
let url = format!("https://api.spotify.com/v1/playlists/{}", id);
client
.get(&url)
.bearer_auth(&access_token)
.send()
.and_then(move |mut res| res.json::<Playlist>())
.then(move |playlist| Ok::<_, std::io::Error>(playlist.unwrap()))
})
.buffer_unordered(size);
rt.block_on(playlists.collect()).ok()
}
課題
ドキュメントを読んでもいろんな関数があり正直どれを採用したら良いのかよくわからなかった。
今回は自分がフォローしているプレイリスト88個を非同期でリクエストした結果、通常のリクエストのときで40秒以上かかっていたものが9秒に短縮できた。正直もっと早くできると思っていたのでもう少しいろいろやって見たいと思う。