この記事ではRustでDB接続テストを書くときにハマった話について書きます。
Running Example
例として以下のような関数のテストを行うことを考えましょう。
use sqlx::{Poxtgres, Row, Executor};
use chrono::{DateTime, Utc};
pub async fn select_now<'c, E>(executor: E) -> DateTime<Utc>
where
E: Executor<'c, Database = Postgres>,
{
let row = sqlx::query("SELECT NOW()")
.fetch_one(executor)
.await
.unwrap();
let now: DateTime<Utc> = row.try_get("now").unwrap();
now
}
Naive Test
まずは一番素朴なテストを書いてみましょう。
use super::*;
use std::ops::{Add, Sub};
use sqlx::{Connection, PgConnection}
use chrono::Duration;
#[tokio::test]
async fn test_naive() {
let db_url = std::env::var("DATABASE_URL").expect("No DATABASE_URL is specified");
let mut conn = PgConnection::connect(&db_url).await.unwrap();
let now = select_now(&mut conn).await;
let expected = Utc::now();
let range = expected.sub(Duration::seconds(1))..expected.add(Duration::seconds(1));
assert!(range.contains(&now));
}
テストが成功することを確認します。
$ cargo test --lib --tests tests::test_naive
running 1 test
test tests::test_naive ... ok
test result: ok. 1 passed; 0 failed; 0 ignored; 0 measured; 4 filtered out; finished in 0.12s
コネクションプールでコネクションを再利用する
しかし、このテストではテストのたびに新しいコネクションを貼り直してしまいます。コネクションプールを共有してみましょう。
テスト間でデータを共有するためにasync_once_cellクレートを利用してコネクションプールを初期化します。
use super::*;
use std::ops::{Add, Sub};
use async_once_cell::OnceCell;
use sqlx::{Connection, PgConnection, PgPool}
use chrono::Duration;
async fn get_pool() -> &'static PgPool {
static POOL: OnceCell<PgPool> = OnceCell::new();
POOL.get_or_init(async {
let db_url = std::env::var("DATABASE_URL").expect("No DATABASE_URL is specified");
PgPool::connect(&db_url).await.unwrap()
})
.await
}
#[tokio::test]
async fn test_pool() {
let pool = get_pool().await;
let now = select_now(pool).await;
let expected = Utc::now();
let range = expected.sub(Duration::seconds(1))..expected.add(Duration::seconds(1));
assert!(range.contains(&now));
}
テストを実行して成功することを確認しましょう。
$ cargo test --lib --tests tests::test_pool
running 1 test
test tests::test_pool ... ok
test result: ok. 1 passed; 0 failed; 0 ignored; 0 measured; 5 filtered out; finished in 0.13s
これで問題ないように見えますが、実はこの書き方でテストを増やしていくとIO driver has terminated
という謎のエラーで確率的にテストが失敗するようになってしまいます。
#[tokio::test]
async fn test_pool2() {
let pool = get_pool().await;
let now = select_now(pool).await;
for _ in 1..10 { // 複雑なSQLのシミュレーションとしてfor文で何回も実行する。
select_now(pool).await;
}
let expected = Utc::now();
let range = expected.sub(Duration::seconds(1))..expected.add(Duration::seconds(1));
assert!(range.contains(&now));
}
#[tokio::test]
async fn test_pool3() {
let pool = get_pool().await;
let now = select_now(pool).await;
for _ in 1..10 { // 複雑なSQLのシミュレーションとしてfor文で何回も実行する。
select_now(pool).await;
}
let expected = Utc::now();
let range = expected.sub(Duration::seconds(1))..expected.add(Duration::seconds(1));
assert!(range.contains(&now));
}
$ cargo test --lib --tests tests::test_pool
running 3 tests
test tests::test_pool ... ok
test tests::test_pool3 ... ok
test tests::test_pool2 ... FAILED
failures:
---- tests::test_pool2 stdout ----
thread 'tests::test_pool2' panicked at 'called `Result::unwrap()` on an `Err` value: Io(Custom { kind: Other, error: "IO driver has terminated" })', examples/async-share/src/lib.rs:12:10
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
failures:
tests::test_pool2
test result: FAILED. 2 passed; 1 failed; 0 ignored; 0 measured; 5 filtered out; finished in 0.37s
エラーの原因調査
さて、この謎のエラーのデバッグをしましょう。まずはログを出力してみます。
+use env_logger;
async fn get_pool() -> &'static PgPool {
static POOL: OnceCell<PgPool> = OnceCell::new();
POOL.get_or_init(async {
+ std::env::set_var("RUST_LOG", "info");
+ env_logger::init();
let db_url = std::env::var("DATABASE_URL").expect("No DATABASE_URL is specified");
PgPool::connect(&db_url).await.unwrap()
})
.await
}
実行してみるとテストが成功している場合でも怪しげなログが出ていることに気づきました。
[2022-09-27T05:37:56Z WARN sqlx_core::pool::connection] error occurred while testing the connection on-release: error communicating with database: IO driver has terminated
[2022-09-27T05:37:56Z INFO sqlx::query] SELECT NOW(); rows affected: 0, rows returned: 1, elapsed: 428.833µs
[2022-09-27T05:37:56Z INFO sqlx::query] SELECT NOW(); rows affected: 0, rows returned: 1, elapsed: 139.333µs
test tests::test_pool3 ... ok
[2022-09-27T05:37:56Z INFO sqlx_core::pool::inner] ping on idle connection returned error: error communicating with database: IO driver has terminated
どうやらプールからコネクションを再取得する際にコネクションが壊れてしまっているようです。
if options.test_before_acquire {
// Check that the connection is still live
if let Err(e) = conn.ping().await {
// an error here means the other end has hung up or we lost connectivity
// either way we're fine to just discard the connection
// the error itself here isn't necessarily unexpected so WARN is too strong
log::info!("ping on idle connection returned error: {}", e);
// connection is broken so don't try to close nicely
return Err(conn.close_hard().await);
}
}
コネクションが壊れたということはプロトコル違反で通信を切断されたという可能性が考えられます。
ただ、パケットキャプチャをwiresharkで分析したりもしましたが、不正な通信で途中で切断されている訳ではなさそうでした。
やはり問題はRust側にありそうです。
デバッガを使ってより深く調査したところ、エラーを発生させている箇所を見つけることができました。
以下のコードのself.handle.inner.is_shutdown()
がtrue
を返すことが問題なようです。
fn poll_ready(
&self,
cx: &mut Context<'_>,
direction: Direction,
) -> Poll<io::Result<ReadyEvent>> {
// Keep track of task budget
let coop = ready!(crate::coop::poll_proceed(cx));
let ev = ready!(self.shared.poll_readiness(cx, direction));
if self.handle.inner.is_shutdown() {
return Poll::Ready(Err(gone()));
}
coop.made_progress();
Poll::Ready(Ok(ev))
}
is_shutdown
はなぜtrue
になってしまったのでしょうか。このフィールドをtrue
にしている箇所にブレイクポイントを張ってみると、asyncのruntimeが破棄されるときにis_shutdown
がtrue
になっていることが確認できました。
なぜコネクションを共有できないのか
ここまでくれば、なぜエラーが発生したのか説明できます。鍵はテスト実行時のasync runtimeのライフサイクルにあります。
async
関数のテストは#[tokio::test]
アトリビュートをつけますが、これはテストコードを以下のように変換します。
// 変換前
#[tokio::test]
async fn test_pool() {
...
}
// 変換後
#[test]
fn test_pool() {
let runtime = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
runtime.block_on(async {
...
});
}
つまり、以下のような流れで問題が顕在化していました。
- 一つ目のテストケースの開始時に
Runtime
が生成される。 - テスト実行中にコネクションプールがコネクションを新たに張る際には、その
Runtime
のIO Driverにリソースを紐付ける。 - テスト終了時に
Runtime
が破棄され、IO Driverのis_shutdown
がtrue
にセットされる - 二つ目のテストケースの開始時に新たな
Runtime
が生成される - コネクションプールからコネクションを再利用するが、破棄済みのIO Driverが紐づいているため、コネクションが壊れていると判断する。
コネクションプールは取得時にコネクションの接続チェックを行い、それに失敗したらコネクションを張り直すため、コネクションプールを使ってもテストケースごとにコネクションを張り直してしまう、ということになります。
これではコネクションプールを利用する意味が(ほとんど)ありません。
回避策
この問題の本質は、テストケースごとにasync runtimeを生成・破棄していることにあります。したがってruntimeを共有することで問題を回避できます。
fn runtime() -> &'static Runtime {
static RUNTIME: once_cell::sync::OnceCell<Runtime> = once_cell::sync::OnceCell::new();
RUNTIME.get_or_init(|| tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap())
}
#[test]
fn test_pool_shared_runtime() {
runtime().block_on(async {
let pool = get_pool().await;
let now = select_now(pool).await;
let expected = Utc::now();
let range = expected.sub(Duration::seconds(1))..expected.add(Duration::seconds(1));
assert!(range.contains(&now));
});
}
まとめ
- 非同期のIOリソースはasync runtimeが管理している。
-
#[tokio::test]
を用いたテストケースではテストケースごとにasync runtimeが生成・破棄される - コネクションプールを共有したい場合は、async runtimeも共有する必要がある。