この記事はWanoグループ Advent Calendar 2017の21日目の記事です。
今回はshio-rsとmysql_asyncを使ってAPIサーバーを構築してみたいと思います。
shioとは
tokioという非同期IOによるネットワーク通信処理を扱う為のライブラリをベースにしたマイクロフレームワークです。
mysql_asyncとは
shioと同様にtokioをベースにしたMySQLクライアントライブラリです。
個人的に期待しているライブラリなのですが、ググってもあまり話が出てきません。
雛形のHello Worldを作成
まずはDBへの通信関係の処理は置いといてHello Worldを作ってみましょう
まずはcargo new hello_shio --bin等として空のcrateを作成します。
それから依存crate(=パッケージ)をCargo.tomlに次の様に追記します。
[dependenceis]
shio = "*"
main.rsは次の様になります。
extern crate shio;
use shio::prelude::*;
fn hello_world_handler(ctx: Context) -> Response {
Response::with("Hello World")
}
fn main() {
shio::default()
.route((Method::Get, "/", hello_world_handler))
.run("0.0.0.0:8080")
.unwrap();
}
コードも短くて簡単ですね
JSONの返し方を確認
次はJSONの返し方を確認しましょう。
まずはJSONを扱えるようにする為にCargo.tomlに次の依存crateを追加します。
serde = "1.0"
serde_derive = "1.0"
serde_json = "1.0"
error-chain = "*"
serdeはシリアライズ・デシリアライズ処理の抽象化を行うライブラリ、
serde_deriveはボイラープレートを削減する為のマクロを提供するライブラリ、
serde_jsonはその上に構築されたJSONを扱うライブラリになります。
error-chainは各種ライブラリの返すエラー型を束ねて一つのエラー型にまとめるのに使用します。
main.rsは次の様になります。
extern crate shio;
extern crate serde;
# [macro_use]
extern crate serde_derive;
extern crate serde_json;
# [macro_use]
extern crate error_chain;
mod errors {
error_chain! {
foreign_links {
Json(::serde_json::Error);
}
}
}
use shio::prelude::*;
use shio::header::ContentType;
# [derive(Debug, Serialize, Deserialize)]
struct HelloWorld {
message: String,
}
fn hello_world_handler(ctx: Context) -> Result<Response, errors::Error> {
let hw = HelloWorld {
message: "Hello World!".to_string(),
};
Ok(Response::build().header(ContentType::json()).body(serde_json::to_string(&hw)?))
}
fn main() {
Shio::default()
.route((Method::Get, "/", hello_world_handler))
.run("0.0.0.0:8080")
.unwrap();
}
変更した個所を順に見ていきます。
extern crate serde;
# [macro_use]
extern crate serde_derive;
extern crate serde_json;
# [macro_use]
extern crate error_chain;
まずCargo.tomlに追記したcrateをextern crateで読み込みます。
#[macro_use]というのはcrate内で定義されているマクロを読み込む為の呪文です。
mod errors {
error_chain! {
foreign_links {
Json(::serde_json::Error);
}
}
}
serde_jsonの関数が返すエラー型をそのまま扱うのではなく、このプログラム用のエラー型(errors::Error型)を定義する為にまたもや呪文を書きます。
この呪文によってserde_jsonが返すエラー型をこのプログラム用のエラー型に変換する事が出来るようになります(From traitの自動実装が行われています)。
ここではerror-chainを使っていますが、quick-errorを使ったり、面倒くさいですが手で直接定義してもいいです。
use shio::prelude::*;
use shio::header::ContentType;
後の行でContentType型を使用するのでここでuseして名前空間にインポートしています。
JSONのレスポンスを生成する部分を関数にして、その中でだけuseしてもいいかもしれません。
# [derive(Debug, Serialize, Deserialize)]
struct HelloWorld {
message: String,
}
レスポンスとして返すJSON用の型を定義しています。
#[derive(Debug, Serialize, Deserialize)]はマクロで、ここではDebug, Serialize, Deserialize traitの自動実装を行っています。
また、ここでは話を簡単にする為にmessage: Stringという様に&'a strではなくString型を用いています。
fn hello_world_handler(ctx: Context) -> Result<Response, errors::Error> {
let hw = HelloWorld {
message: "Hello World!".to_string(),
};
Ok(Response::build().header(ContentType::json()).body(serde_json::to_string(&hw)?))
}
レスポンスとしてJSONを返す為にhello_world_handlerの中がガラッと変わりました。
JSONを返す為、Response::build()でBuilderを取得してheader()でContent-Typeをセットして、最後にbody()でJSON文字列をセットしてResponseを作ります。
serde_json::to_string()がResultを返しますが、ここではErrであれば即returnするだけなので?オペレータを使ってmatch等をするコードを省略します。この時に上の方でerror_chain!を使って定義したエラー型に自動で変換してくれています。
これで後から拡張出来そうな感じにレスポンスをJSONで返すことが出来るようになりました。
POSTでJSONやGETクエリを受け取る
POSTされたJSONやGETクエリを受け取れるようにしてみましょう
まずは全体像はこちらです。
Cargo.toml
[package]
name = "my-api-server"
version = "0.1.0"
authors = ["you <your@mailaddress>"]
[dependencies]
shio = "*"
serde = "1.0"
serde_derive = "1.0"
serde_json = "1.0"
error-chain = "*"
queryst = "1.0"
futures = "*"
hyper = "0.11"
main.rs
extern crate shio;
extern crate serde;
# [macro_use]
extern crate serde_derive;
extern crate serde_json;
# [macro_use]
extern crate error_chain;
extern crate queryst;
extern crate futures;
extern crate hyper;
pub mod errors {
error_chain! {
foreign_links {
Json(::serde_json::Error);
Hyper(::hyper::Error);
}
errors {
Queryst(e: ::queryst::ParseError) {
description("queryst parse error")
display("queryst parse error: {:?}", e)
}
}
}
}
use futures::prelude::*;
use shio::prelude::*;
use shio::header::ContentType;
# [derive(Debug, Serialize, Deserialize)]
struct HelloWorld {
message: String,
}
fn hello_world_handler(ctx: Context) -> Result<Response, errors::Error> {
let hw = HelloWorld {
message: "Hello World!".to_string(),
};
Ok(Response::build().header(ContentType::json()).body(serde_json::to_string(&hw)?))
}
fn query_string_handler(ctx: Context) -> Result<Response, errors::Error> {
if let Some(query_string) = ctx.uri().query() {
let value = queryst::parse(query_string).map_err(|e| errors::ErrorKind::Queryst(e))?;
let msg = value.find("message").and_then(|v| v.as_str()).unwrap_or("No Message");
return Ok(Response::with(msg));
}
Ok(Response::with("No Query"))
}
fn post_json_handler(ctx: Context) -> Box<Future<Item = Response, Error = errors::Error>> {
let future = ctx.body()
.from_err::<errors::Error>()
.concat2()
.and_then(|body| {
let v: serde_json::Value = serde_json::from_slice(&body)?;
let op_first_name = v.pointer("/profile/first_name").and_then(|v| v.as_str());
let op_last_name = v.pointer("/profile/last_name").and_then(|v| v.as_str());
if let (Some(first_name), Some(last_name)) = (op_first_name, op_last_name) {
Ok(Response::with(format!("Hello {} {}", first_name, last_name)))
} else {
Ok(Response::with("Please include 'profile.first_name' and 'profile.last_name' key in json"))
}
});
Box::new(future)
}
fn main() {
Shio::default()
.route((Method::Get, "/", hello_world_handler))
.route((Method::Get, "/query_string", query_string_handler))
.route((Method::Post, "/json", post_json_handler))
.run("0.0.0.0:8080")
.unwrap();
}
JSONを返すだけのプログラムから変わった個所を見ていきます。
queryst = "1.0"
futures = "*"
hyper = "0.11"
まずはCargo.tomlですが、3つ依存crateが増えています。
querystはGETクエリをパースするのに使用します。
futuresは非同期処理の為のTraitを提供するパッケージです。
hyperはshioのベースになっているライブラリで、今回使用するAPIの一部がhyperで定義されているエラー型を返すので使用します。
pub mod errors {
error_chain! {
foreign_links {
Json(::serde_json::Error);
Hyper(::hyper::Error);
}
errors {
Queryst(e: ::queryst::ParseError) {
description("queryst parse error")
display("queryst parse error: {:?}", e)
}
}
}
}
次にmain.rsですが、まずerrorsの中が変わっています。
foreign_linksの中にHyper(::hyper::Error);が追加されています。
これにより::hyper::Error型のエラーをerrors::Errorへ変換する事が出来るようになります。
さらにerrorsという項目も増えています。
error_chain!マクロはErrorKindというenumを定義するのですが、ここに記述した物はそのErrorKindの値になります。
ErrorKindは?オペレータやFromやerrors::Error::from_kindでerrors::Error型への変換が出来ます。
querystのエラー型であるParseErrorがstd::error::Errorを実装しておらずforeign_linksに書くことが出来ない為、こちらで定義して必要に応じて手動で変換していきます。
use futures::prelude::*;
use shio::prelude::*;
use shio::header::ContentType;
use futures::prelude::*;が増えました。
これによりfuturesで定義されている様々なtraitを簡単に利用できるようになります。
fn query_string_handler(ctx: Context) -> Result<Response, errors::Error> {
if let Some(query_string) = ctx.uri().query() {
let value = queryst::parse(query_string).map_err(|e| errors::ErrorKind::Queryst(e))?;
let msg = value.find("message").and_then(|v| v.as_str()).unwrap_or("No Message");
return Ok(Response::with(msg));
}
Ok(Response::with("No Query"))
}
GETクエリのパースのサンプル用にquery_string_handlerが追加されました。
ここではctx.uri().query()でクエリストリングを取得し、queryst::parse(query_string)でパースします。
queryst::parse()はResult<Value, ParseError>を返すので、map_errでParseErrorをErrorKindに変換します。
Valueはserde_jsonで提供されている型です。
ここではfind()を使って適当なキーの値を取得して、それをそのままレスポンスとして返しています。
fn post_json_handler(ctx: Context) -> Box<Future<Item = Response, Error = errors::Error>> {
let future = ctx.body()
.from_err::<errors::Error>()
.concat2()
.and_then(|body| {
let v: serde_json::Value = serde_json::from_slice(&body)?;
let op_first_name = v.pointer("/profile/first_name").and_then(|v| v.as_str());
let op_last_name = v.pointer("/profile/last_name").and_then(|v| v.as_str());
if let (Some(first_name), Some(last_name)) = (op_first_name, op_last_name) {
Ok(Response::with(format!("Hello {} {}", first_name, last_name)))
} else {
Ok(Response::with("Please include 'profile.first_name' and 'profile.last_name' key in json"))
}
});
Box::new(future)
}
POSTでJSONを受け取るサンプルとしてpost_json_handler()が追加されました。
リクエストボディはctx.body()で取得する事が出来ます。
そしてその直後にfrom_err::<errors::Error>()とする事でエラーの型をerrors::Errorへと変更しています。
ctx.body()はBody型を返しますが、このBody型にはリクエストボディを簡単に取れるようなメソッドがありません。
しかし代わりにfuturesで定義されているStream traitを実装していますので、こちらのメソッドを使ってリクエストボディを取得します。
JSONは文字列全体を取得しないとパースが出来ないので、全体がまとまった状態で取得できるconcat2()メソッドでリクエストボディ全体を取得します。
次にconcat2()によって取得できたリクエストボディをserde_json::from_slice()によってパースします。
ここまで出来たら後はquery_string_handler()と同様にserde_jsonから提供されている関数をやりたい処理に応じて使用していき、最後に型が合うようにResponseを返せばOKです。
こうやって構築した大きなFutureは型が複雑になってしまっています。
これを簡単に取り扱えるようにする為に最後にBox::new(future)でボックス化してTraitオブジェクトにしています。
fn main() {
Shio::default()
.route((Method::Get, "/", hello_world_handler))
.route((Method::Get, "/query_string", query_string_handler))
.route((Method::Post, "/json", post_json_handler))
.run("0.0.0.0:8080")
.unwrap();
}
query_string_handler()とpost_json_handler()に対応するルーティングの設定が追加されました。
DBからデータを取得
最後にDBとデータのやり取りをする部分を追加してそれっぽい感じにします。
全体は次の様になります。
Cargo.toml:
[package]
name = "my-api-server"
version = "0.1.0"
authors = ["you <your@mailaddress>"]
[dependencies]
shio = "*"
serde = "1.0"
serde_derive = "1.0"
serde_json = "1.0"
error-chain = "*"
queryst = "1.0"
futures = "*"
hyper = "0.11"
chrono = { version = "0.4", features = ["serde"] }
mysql_async = "*"
tokio-core = "*"
main.rs:
extern crate shio;
extern crate serde;
# [macro_use]
extern crate serde_derive;
extern crate serde_json;
# [macro_use]
extern crate error_chain;
extern crate queryst;
extern crate futures;
extern crate hyper;
extern crate chrono;
# [macro_use]
extern crate mysql_async;
extern crate tokio_core;
pub mod errors {
error_chain! {
links {
MySQL(::mysql_async::errors::Error, ::mysql_async::errors::ErrorKind);
}
foreign_links {
Json(::serde_json::Error);
Hyper(::hyper::Error);
FromRow(::mysql_async::FromRowError);
}
errors {
Queryst(e: ::queryst::ParseError) {
description("queryst parse error")
display("queryst parse error: {:?}", e)
}
}
}
}
use futures::prelude::*;
use shio::prelude::*;
use shio::header::ContentType;
use tokio_core::reactor::Handle;
use chrono::prelude::*;
use mysql_async::prelude::*;
use mysql_async::{self as my, Pool, OptsBuilder, FromRowError};
use std::sync::RwLock;
thread_local!(static MYSQL_CONN_POOL: RwLock<Option<Pool>> = {
RwLock::new(None)
});
fn get_pool(handle: &Handle) -> Pool {
if let Some(pool) = MYSQL_CONN_POOL.with(|lock| lock.read().unwrap().clone()) {
return pool;
}
MYSQL_CONN_POOL.with(|lock| {
let mut pool = lock.write().unwrap();
let mut builder = OptsBuilder::from_opts("mysql://root:piyo@172.17.0.2/employees");
builder.pool_max(Some(20usize));
let new_pool = Pool::new(builder, handle);
*pool = Some(new_pool.clone());
return new_pool;
})
}
# [derive(Debug, Serialize, Deserialize)]
struct HelloWorld {
message: String,
}
fn hello_world_handler(ctx: Context) -> errors::Result<Response> {
let hw = HelloWorld {
message: "Hello World!".to_string(),
};
Ok(Response::build().header(ContentType::json()).body(serde_json::to_string(&hw)?))
}
fn query_string_handler(ctx: Context) -> errors::Result<Response> {
if let Some(query_string) = ctx.uri().query() {
let value = queryst::parse(query_string).map_err(|e| errors::ErrorKind::Queryst(e))?;
let msg = value.find("message").and_then(|v| v.as_str()).unwrap_or("No Message");
return Ok(Response::with(msg));
}
Ok(Response::with("No Query"))
}
fn post_json_handler(ctx: Context) -> Box<Future<Item=Response, Error=errors::Error>> {
let future = ctx.body()
.from_err()
.concat2()
.and_then(|body| {
let v: serde_json::Value = serde_json::from_slice(&body)?;
let op_first_name = v.pointer("/profile/first_name").and_then(|v| v.as_str());
let op_last_name = v.pointer("/profile/last_name").and_then(|v| v.as_str());
if let (Some(first_name), Some(last_name)) = (op_first_name, op_last_name) {
Ok(Response::with(format!("Hello {} {}", first_name, last_name)))
} else {
Ok(Response::with("Please include 'profile.first_name' and 'profile.last_name' key in json"))
}
});
Box::new(future)
}
# [derive(Debug, Serialize, Deserialize)]
struct Employee {
emp_no: i32,
birth_date: NaiveDate,
first_name: String,
last_name: String,
gender: String,
hire_date: NaiveDate,
}
impl FromRow for Employee {
fn from_row(row: my::Row) -> Employee {
match my::from_row_opt(row) {
Ok(v) => v,
Err(err) => panic!("Convert row error: {:?} to the Employee", err),
}
}
fn from_row_opt(row: my::Row) -> Result<Employee, FromRowError> {
my::from_row_opt(row).map(|(emp_no, birth_date, first_name, last_name, gender, hire_date)| {
Employee { emp_no, birth_date, first_name, last_name, gender, hire_date }
})
}
}
fn get_from_and_to_emp_no(req: &Request) -> errors::Result<(i64, i64)> {
let default_from = 10000;
let default_to = 11000;
if let Some(query_string) = req.uri().query() {
let value = queryst::parse(query_string).map_err(|e| errors::ErrorKind::Queryst(e))?;
let from = value.find("from").and_then(|v| v.as_i64()).unwrap_or(default_from);
let to = value.find("to").and_then(|v| v.as_i64()).unwrap_or(default_to);
return Ok((from, to));
} else {
return Ok((default_from, default_to));
}
}
fn employee_list_handler(ctx: Context) -> Box<Future<Item=Response, Error=errors::Error>> {
let future = get_from_and_to_emp_no(&ctx)
.into_future()
.from_err()
.and_then(move |(from_emp_no, to_emp_no)| {
let pool = get_pool(ctx.handle());
pool.get_conn()
.from_err()
.and_then(move |conn| {
conn.prep_exec(
r#"
SELECT
emp_no,
birth_date,
first_name,
last_name,
gender,
hire_date
FROM employees
WHERE emp_no >= :from AND emp_no <= :to;
"#,
params! {
"from" => from_emp_no,
"to" => to_emp_no,
},
)
.and_then(|result| {
result.collect_and_drop()
})
.from_err()
})
.and_then(|(_, employees): (_, Vec<Employee>)| {
let res = Response::build()
.header(ContentType::json())
.body(serde_json::to_string(&employees)?);
Ok(res)
})
});
Box::new(future)
}
fn main() {
Shio::default()
.route((Method::Get, "/", hello_world_handler))
.route((Method::Get, "/query_string", query_string_handler))
.route((Method::Post, "/json", post_json_handler))
.route((Method::Get, "/employees", employee_list_handler))
.run("0.0.0.0:8080")
.unwrap();
}
変わった部分を見ていきます。
chrono = { version = "0.4", features = ["serde"] }
mysql_async = "*"
tokio-core = "*"
chronoは時刻を扱うライブラリです。
featuresに["serde"]を設定してserdeによるシリアライズ・デシリアライズが出来るようにします。
mysql_asyncはMySQLに非同期IOで接続するライブラリです。
tokioをベースに構築されています。
tokio-coreは非同期IO処理を抽象化したライブラリです。
ここではtokio-coreで定義されているHandle型が必要なので依存crateに追加します。
extern crate chrono;
# [macro_use]
extern crate mysql_async;
extern crate tokio_core;
依存crateの追加に合わせてextern crateする行が増えました。
pub mod errors {
error_chain! {
links {
MySQL(::mysql_async::errors::Error, ::mysql_async::errors::ErrorKind);
}
foreign_links {
Json(::serde_json::Error);
Hyper(::hyper::Error);
FromRow(::mysql_async::FromRowError);
}
errors {
Queryst(e: ::queryst::ParseError) {
description("queryst parse error")
display("queryst parse error: {:?}", e)
}
}
}
}
mysql_asyncのエラー型を取り扱う為の記述が増えています。
mysql_async::errors::Errorはerror-chainによって定義されているので、linksという項目に連携の為の設定を記述しています。
linksはerror-chainによって定義された他のError型と連携させたいときに使用します。
use tokio_core::reactor::Handle;
use chrono::prelude::*;
use mysql_async::prelude::*;
use mysql_async::{self as my, Pool, OptsBuilder, FromRowError};
use std::sync::RwLock;
それぞれ後の処理で使用する型を名前空間にインポートしています。
thread_local!(static MYSQL_CONN_POOL: RwLock<Option<Pool>> = {
RwLock::new(None)
});
fn get_pool(handle: &Handle) -> Pool {
if let Some(pool) = MYSQL_CONN_POOL.with(|lock| lock.read().unwrap().clone()) {
return pool;
}
MYSQL_CONN_POOL.with(|lock| {
let mut pool = lock.write().unwrap();
let mut builder = OptsBuilder::from_opts("mysql://root:piyo@172.17.0.2/employees");
builder.pool_max(Some(20usize));
let new_pool = Pool::new(builder, handle);
*pool = Some(new_pool.clone());
return new_pool;
})
}
ここではスレッドローカルな変数としてmysql_asyncのコネクションプールであるPoolを生成しています。
thread_local!を使う事でスレッドローカルな変数を定義できます。
アクセスする時はwithメソッド等を使います。
(詳しくはこちら https://doc.rust-lang.org/std/thread/struct.LocalKey.html )
しかしPool型は初期化に&Handleを要求してくるのですが、shioがスレッド毎に保持しているtokio_core::reactor::CoreのHandleは、Contextを通してしか取得する事が出来ません。
その為、Poolの取得時にまだ初期化されていない場合は初期化するという処理を行う必要があります。
この時データレースを防ぐためにRwLockを用いて初期化処理時にだけ排他ロックを掛けられるようにしています。
Poolは接続先をnewに直接文字列を渡して設定する事もできるのですが、コネクション保持数の上限などをカスタマイズしたい場合はOptsBuilderを使用して設定します。
# [derive(Debug, Serialize, Deserialize)]
struct Employee {
emp_no: i32,
birth_date: NaiveDate,
first_name: String,
last_name: String,
gender: String,
hire_date: NaiveDate,
}
impl FromRow for Employee {
fn from_row(row: my::Row) -> Employee {
match my::from_row_opt(row) {
Ok(v) => v,
Err(err) => panic!("Convert row error: {:?} to the Employee", err),
}
}
fn from_row_opt(row: my::Row) -> Result<Employee, FromRowError> {
my::from_row_opt(row).map(|(emp_no, birth_date, first_name, last_name, gender, hire_date)| {
Employee { emp_no, birth_date, first_name, last_name, gender, hire_date }
})
}
}
今回サンプルデータとしてMySQLのEmployeesデータセットを使用します。
その中のemployeesテーブルの行に対応する構造体としてEmployee型を定義しています。
またクエリの結果からEmployee型への変換方法をFromRow型を実装する事で定義しています。
from_row関数やfrom_row_opt関数を使ってタプルに変換するようにすると簡単に取り扱える為、ここではそのようにしてからEmployeeを生成しています。
Employee型は後でJSONとしてクライアントに返すので、serdeの機能を使ってSerializeとDeserialize traitの自動実装を行っています。
この辺りの実装は面倒なのでもっと簡単な方法は無いのでしょうか?
dieselがtokioに対応するまで待つしかないのかなぁ
fn employee_list_handler(ctx: Context) -> Box<Future<Item=Response, Error=errors::Error>> {
let future = get_from_and_to_emp_no(&ctx)
.into_future()
.from_err()
.and_then(move |(from_emp_no, to_emp_no)| {
let pool = get_pool(ctx.handle());
pool.get_conn()
.from_err()
.and_then(move |conn| {
conn.prep_exec(
r#"
SELECT
emp_no,
birth_date,
first_name,
last_name,
gender,
hire_date
FROM employees
WHERE emp_no >= :from AND emp_no <= :to;
"#,
params! {
"from" => from_emp_no,
"to" => to_emp_no,
},
)
.and_then(|result| {
result.collect_and_drop()
})
.from_err()
})
.and_then(|(_, employees): (_, Vec<Employee>)| {
let res = Response::build()
.header(ContentType::json())
.body(serde_json::to_string(&employees)?);
Ok(res)
})
});
Box::new(future)
}
GETクエリで指定された範囲の従業員番号を持つ人のリストをJSONとして返すハンドラです。
GETクエリの処理はget_from_and_to_emp_no関数で行っています。
この関数の実装は前の方でGETクエリを処理したハンドラの実装とそれほど変わらないので説明は省略します。
get_from_and_to_emp_no関数はResultを返しますが、その後にFutureによる処理を続けたいのでinto_futureメソッドでFutureResultに変換します。
そしてfrom_errでエラー型を変換し、MySQLにクエリを投げる処理へと続きます。
少し前に定義したget_pool関数を使ってコネクションプールをスレッドローカルな変数から取得します。
このアクセスが初回である場合、コネクションプールの初期化がget_pool内で行われます。
後はmysql_asyncのドキュメントにあるコード例ほぼそのままのコードを書いて従業員のリストを取得し、その後それをJSONに変換してレスポンスとして返します。
こうやって構築された巨大なFutureは型が複雑で、そのまま戻り値の型を記述するととても長くなってしまって扱いにくいのでBox化してTraitオブジェクト化する事で扱いやすくします。
終わりに
以上でshioとmysql_asyncを用いた簡易APIサーバーが構築できました。
後はこれに認証処理を追加したり等いろいろ拡張していくとそれっぽい物が出来ていくと思います。
ただ、まだまだFuture周りはインデントが深くなりがちになってしまったり、変数の所有権の移動などでボローチェッカーから怒られやすかったりと、決して書きやすいとは言えない状況だと思います。
この辺りの問題についてはasync/awaitをマクロで実装したライブラリ( https://github.com/alexcrichton/futures-await )を使う事で軽減できるように見えますが、まだ使ったことがないので私からは何とも言えません。
tokioベースのライブラリはまだまだこれから発展していくところですので、今後に期待をして今回の記事を終わりたいと思います。