LoginSignup
11
3

More than 1 year has passed since last update.

Rustのコネクションプールはテスト間で共有できないという話

Posted at

この記事ではRustでDB接続テストを書くときにハマった話について書きます。

Running Example

例として以下のような関数のテストを行うことを考えましょう。

use sqlx::{Poxtgres, Row, Executor};
use chrono::{DateTime, Utc};

pub async fn select_now<'c, E>(executor: E) -> DateTime<Utc>
where
    E: Executor<'c, Database = Postgres>,
{
    let row = sqlx::query("SELECT NOW()")
        .fetch_one(executor)
        .await
        .unwrap();
    let now: DateTime<Utc> = row.try_get("now").unwrap();
    now
}

Naive Test

まずは一番素朴なテストを書いてみましょう。

use super::*;

use std::ops::{Add, Sub};
use sqlx::{Connection, PgConnection}
use chrono::Duration;

#[tokio::test]
async fn test_naive() {
    let db_url = std::env::var("DATABASE_URL").expect("No DATABASE_URL is specified");
    let mut conn = PgConnection::connect(&db_url).await.unwrap();
    let now = select_now(&mut conn).await;
    let expected = Utc::now();
    let range = expected.sub(Duration::seconds(1))..expected.add(Duration::seconds(1));
    assert!(range.contains(&now));
}

テストが成功することを確認します。

$ cargo test --lib --tests tests::test_naive

running 1 test
test tests::test_naive ... ok

test result: ok. 1 passed; 0 failed; 0 ignored; 0 measured; 4 filtered out; finished in 0.12s

コネクションプールでコネクションを再利用する

しかし、このテストではテストのたびに新しいコネクションを貼り直してしまいます。コネクションプールを共有してみましょう。
テスト間でデータを共有するためにasync_once_cellクレートを利用してコネクションプールを初期化します。

use super::*;

use std::ops::{Add, Sub};
use async_once_cell::OnceCell;
use sqlx::{Connection, PgConnection, PgPool}
use chrono::Duration;

async fn get_pool() -> &'static PgPool {
    static POOL: OnceCell<PgPool> = OnceCell::new();
    POOL.get_or_init(async {
        let db_url = std::env::var("DATABASE_URL").expect("No DATABASE_URL is specified");
        PgPool::connect(&db_url).await.unwrap()
    })
    .await
}

#[tokio::test]
async fn test_pool() {
    let pool = get_pool().await;
    let now = select_now(pool).await;
    let expected = Utc::now();
    let range = expected.sub(Duration::seconds(1))..expected.add(Duration::seconds(1));
    assert!(range.contains(&now));
}

テストを実行して成功することを確認しましょう。

$ cargo test --lib --tests tests::test_pool
running 1 test
test tests::test_pool ... ok

test result: ok. 1 passed; 0 failed; 0 ignored; 0 measured; 5 filtered out; finished in 0.13s

これで問題ないように見えますが、実はこの書き方でテストを増やしていくとIO driver has terminatedという謎のエラーで確率的にテストが失敗するようになってしまいます。

#[tokio::test]
async fn test_pool2() {
    let pool = get_pool().await;
    let now = select_now(pool).await;
    for _ in 1..10 { // 複雑なSQLのシミュレーションとしてfor文で何回も実行する。
        select_now(pool).await;
    }
    let expected = Utc::now();
    let range = expected.sub(Duration::seconds(1))..expected.add(Duration::seconds(1));
    assert!(range.contains(&now));
}

#[tokio::test]
async fn test_pool3() {
    let pool = get_pool().await;
    let now = select_now(pool).await;
    for _ in 1..10 { // 複雑なSQLのシミュレーションとしてfor文で何回も実行する。
        select_now(pool).await;
    }
    let expected = Utc::now();
    let range = expected.sub(Duration::seconds(1))..expected.add(Duration::seconds(1));
    assert!(range.contains(&now));
}
$ cargo test --lib --tests tests::test_pool
running 3 tests
test tests::test_pool ... ok
test tests::test_pool3 ... ok
test tests::test_pool2 ... FAILED

failures:

---- tests::test_pool2 stdout ----
thread 'tests::test_pool2' panicked at 'called `Result::unwrap()` on an `Err` value: Io(Custom { kind: Other, error: "IO driver has terminated" })', examples/async-share/src/lib.rs:12:10
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace


failures:
    tests::test_pool2

test result: FAILED. 2 passed; 1 failed; 0 ignored; 0 measured; 5 filtered out; finished in 0.37s

エラーの原因調査

さて、この謎のエラーのデバッグをしましょう。まずはログを出力してみます。

+use env_logger;

 async fn get_pool() -> &'static PgPool {
     static POOL: OnceCell<PgPool> = OnceCell::new();
     POOL.get_or_init(async {
+        std::env::set_var("RUST_LOG", "info");
+        env_logger::init();
         let db_url = std::env::var("DATABASE_URL").expect("No DATABASE_URL is specified");
         PgPool::connect(&db_url).await.unwrap()
     })
     .await
}

実行してみるとテストが成功している場合でも怪しげなログが出ていることに気づきました。

[2022-09-27T05:37:56Z WARN  sqlx_core::pool::connection] error occurred while testing the connection on-release: error communicating with database: IO driver has terminated
[2022-09-27T05:37:56Z INFO  sqlx::query] SELECT NOW(); rows affected: 0, rows returned: 1, elapsed: 428.833µs
[2022-09-27T05:37:56Z INFO  sqlx::query] SELECT NOW(); rows affected: 0, rows returned: 1, elapsed: 139.333µs
test tests::test_pool3 ... ok
[2022-09-27T05:37:56Z INFO  sqlx_core::pool::inner] ping on idle connection returned error: error communicating with database: IO driver has terminated

どうやらプールからコネクションを再取得する際にコネクションが壊れてしまっているようです。

このログの発生元ソース

    if options.test_before_acquire {
        // Check that the connection is still live
        if let Err(e) = conn.ping().await {
            // an error here means the other end has hung up or we lost connectivity
            // either way we're fine to just discard the connection
            // the error itself here isn't necessarily unexpected so WARN is too strong
            log::info!("ping on idle connection returned error: {}", e);
            // connection is broken so don't try to close nicely
            return Err(conn.close_hard().await);
        }
    }

コネクションが壊れたということはプロトコル違反で通信を切断されたという可能性が考えられます。
ただ、パケットキャプチャをwiresharkで分析したりもしましたが、不正な通信で途中で切断されている訳ではなさそうでした。
やはり問題はRust側にありそうです。

デバッガを使ってより深く調査したところ、エラーを発生させている箇所を見つけることができました。
以下のコードのself.handle.inner.is_shutdown()trueを返すことが問題なようです。

    fn poll_ready(
        &self,
        cx: &mut Context<'_>,
        direction: Direction,
    ) -> Poll<io::Result<ReadyEvent>> {
        // Keep track of task budget
        let coop = ready!(crate::coop::poll_proceed(cx));
        let ev = ready!(self.shared.poll_readiness(cx, direction));

        if self.handle.inner.is_shutdown() {
            return Poll::Ready(Err(gone()));
        }

        coop.made_progress();
        Poll::Ready(Ok(ev))
    }

is_shutdownはなぜtrueになってしまったのでしょうか。このフィールドをtrueにしている箇所にブレイクポイントを張ってみると、asyncのruntimeが破棄されるときにis_shutdowntrueになっていることが確認できました。

なぜコネクションを共有できないのか

ここまでくれば、なぜエラーが発生したのか説明できます。鍵はテスト実行時のasync runtimeのライフサイクルにあります。

async関数のテストは#[tokio::test]アトリビュートをつけますが、これはテストコードを以下のように変換します。

// 変換前
#[tokio::test]
async fn test_pool() {
  ...
}

// 変換後
#[test]
fn test_pool() {
  let runtime = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
  runtime.block_on(async {
    ...
  });
}

つまり、以下のような流れで問題が顕在化していました。

  1. 一つ目のテストケースの開始時にRuntimeが生成される。
  2. テスト実行中にコネクションプールがコネクションを新たに張る際には、そのRuntimeのIO Driverにリソースを紐付ける。
  3. テスト終了時にRuntimeが破棄され、IO Driverのis_shutdowntrueにセットされる
  4. 二つ目のテストケースの開始時に新たなRuntimeが生成される
  5. コネクションプールからコネクションを再利用するが、破棄済みのIO Driverが紐づいているため、コネクションが壊れていると判断する。

コネクションプールは取得時にコネクションの接続チェックを行い、それに失敗したらコネクションを張り直すため、コネクションプールを使ってもテストケースごとにコネクションを張り直してしまう、ということになります。

これではコネクションプールを利用する意味が(ほとんど)ありません。

回避策

この問題の本質は、テストケースごとにasync runtimeを生成・破棄していることにあります。したがってruntimeを共有することで問題を回避できます。

fn runtime() -> &'static Runtime {
    static RUNTIME: once_cell::sync::OnceCell<Runtime> = once_cell::sync::OnceCell::new();
    RUNTIME.get_or_init(|| tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap())
}

#[test]
fn test_pool_shared_runtime() {
    runtime().block_on(async {
        let pool = get_pool().await;
        let now = select_now(pool).await;
        let expected = Utc::now();
        let range = expected.sub(Duration::seconds(1))..expected.add(Duration::seconds(1));
        assert!(range.contains(&now));
    });
}

まとめ

  • 非同期のIOリソースはasync runtimeが管理している。
  • #[tokio::test]を用いたテストケースではテストケースごとにasync runtimeが生成・破棄される
  • コネクションプールを共有したい場合は、async runtimeも共有する必要がある。
11
3
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
11
3