20
9

More than 1 year has passed since last update.

sqlxでPoolとTransactionのどちらも受け取れるようにする

Last updated at Posted at 2021-12-08

この記事はRust Advent Calendar 2021 カレンダー2の9日目の記事です。

追記(2022/05/01)

0.5.11以降では当記事で紹介した方法が公式ドキュメントに例が記載されていますのでご参考ください。

はじめに

近頃はsqlxを使用して開発しています。

sqlxは生クエリを書いてDBに問い合わせるため、クエリビルダーを使用するよりも簡単に扱うことができます。
query!などのマクロを使用することで文字列リテラルで書かれたSQL文を静的解析するので型セーフにクエリを書くこともできます。
しかしsqlxではクエリの再利用が難しいです。

例えばユーザーを挿入するクエリを単体で使用したい場合とトランザクション内で実行したい場合があります。
以下のように書いてしまうと単体では使用できますがトランザクション内では使用することができません。

async fn insert_user(name: &str, pool: &sqlx::MySqlPool) -> anyhow::Result<()> {
    sqlx::query!("INSERT INTO users (name) VALUES (?)", name).execute(pool).await?;
    Ok(())
}
// これはOK
let pool = sqlx::MySqlPool::connect("uri").await.unwrap();
insert_user("Alice", &pool).await.unwrap();
// これはNG
let mut tx = pool.begin().await.unwrap();
insert_user("Bob", &mut tx).await.unwrap();
tx.commit().await.unwrap();

これを実現したいと思います。

sqlxは0.5系を使用します。
DBはMySQLを使用しますがPostgresでも動作するはずです。

Cargo.toml
sqlx = { version = "0.5", features = [ "mysql", "all-types", "runtime-actix-native-tls" ] }

実装

ExecutorトレイトはPoolTransactionのどちらも継承しているのでこちらを使用します。

pub trait Executor<'c>: Send + Debug + Sized {
    type Database: Database;
    ...
}

async fn insert_user(name: &str, executor: impl sqlx::MySqlExecutor<'_>) -> anyhow::Result<()> {
    sqlx::query!("INSERT INTO users (name) VALUES (?)", name).execute(executor).await?;
    Ok(())
}

問題なさそうです。

クエリを複数回発行したい場合はどうでしょう。

async fn insert_user(name: &str, executor: impl sqlx::MySqlExecutor<'_>) -> anyhow::Result<()> {
    sqlx::query!("INSERT INTO users (name) VALUES (?)", name).execute(executor).await?;
    sqlx::query!("INSERT INTO users (name) VALUES (?)", name).execute(executor).await?;
    Ok(())
}
128 | async fn insert_user(name: &str, executor: impl MySqlExecutor<'_>) -> anyhow::Result<()> {
    |                                  -------- move occurs because `executor` has type `impl MySqlExecutor<'_>`, which does not implement the `Copy` trait
129 |     sqlx::query!("INSERT INTO users (name) VALUES (?)", name).execute(executor).await?;
    |                                                                -------- value moved here
130 |     sqlx::query!("INSERT INTO users (name) VALUES (?)", name).execute(executor).await?;
    |                                                                ^^^^^^^^ value used here after move

怒られました。
ExecutorSend + Debug + Sizedしかトレイト境界が指定されていないのでこれだけでは難しそうです。
関数にジェネリクスとトレイト境界を指定していくのも記述量が増えて避けたいところです。

調べてみるとこんなやり方があったので実装してみます

use sqlx::{Acquire, MySql};

async fn insert_user(name: &str, executor: impl Acquire<'_, Database = MySql>) -> anyhow::Result<()> {
    let mut conn = executor.acquire().await?;
    sqlx::query!("INSERT INTO users (name) VALUES (?)", name).execute(&mut *conn).await?;
    Ok(())
}

insert_user("Alice", &pool).await.unwrap();
insert_user("Bob", &mut tx).await.unwrap();

意外とあっさりできてしまいました。

抽象化してみる

目的は達成しましたがもう少し実践的に書いてみます。

use async_trait::async_trait;
use sqlx::{Acquire, MySql};

/// `Acquire<'_, Database = MySql>`のエイリアス
pub trait MySqlAcquire<'c>: Acquire<'c, Database = MySql> + Send {}
impl<'c, T> MySqlAcquire<'c> for T
where
    T: Acquire<'c, Database = MySql> + Send,
{}

#[derive(Debug)]
struct User {
    id: i32,
    name: String,
}

#[async_trait]
trait UserRepo {
    async fn insert_user(&self, name: &str, executor: impl MySqlAcquire<'_> + 'async_trait) -> anyhow::Result<()>;
    async fn select_user(&self, name: &str, executor: impl MySqlAcquire<'_> + 'async_trait) -> anyhow::Result<Option<User>>;
}

struct UserRepoImpl;
#[async_trait]
impl UserRepo for UserRepoImpl {
    async fn insert_user(&self, name: &str, executor: impl MySqlAcquire<'_> + 'async_trait) -> anyhow::Result<()> {
        let mut conn = executor.acquire().await?;
        sqlx::query!("INSERT INTO users (name) VALUES (?)", name).execute(&mut *conn).await?;
        Ok(())
    }

    async fn select_user(&self, name: &str, executor: impl MySqlAcquire<'_> + 'async_trait) -> anyhow::Result<Option<User>> {
        let mut conn = executor.acquire().await?;
        let user = sqlx::query_as!(User, "SELECT id, name FROM users WHERE name = ?", name).fetch_optional(&mut *conn).await?;
        Ok(user)
    }
}

#[async_trait]
trait UserService {
    async fn save_user(&self, name: &str) -> anyhow::Result<()>;
    async fn find_user(&self, name: &str) -> anyhow::Result<User>;
}

#[async_trait]
impl UserService for MySqlPool {
    async fn save_user(&self, name: &str) -> anyhow::Result<()> {
        let repo = UserRepoImpl;
        let mut tx = self.begin().await?;
        // トランザクションでUserRepoの関数を実行できる
        let user = repo.select_user(name, &mut tx).await?;
        if user.is_some() {
            tx.rollback().await?;
            anyhow::bail!("既に使用されている名前です。")
        }
        repo.insert_user(name, &mut tx).await?;
        tx.commit().await?;
        Ok(())
    }

    async fn find_user(&self, name: &str) -> anyhow::Result<User> {
        let repo = UserRepoImpl;
        // トランザクションを開始せずに実行できる
        let user = repo.select_user(name, self).await?;
        user.ok_or_else(|| anyhow::anyhow!("ユーザ「{}」は存在しません。", name))
    }
}

fn handler(service: impl UserService) {
    assert!(service.find_user("Alice").await.is_err());
    assert!(service.save_user("Alice").await.is_ok());
    assert_eq!(service.find_user("Alice").await.unwrap().name, "Alice");
    assert!(service.save_user("Alice").await.is_err());
}

handler(MockMySqlPool);
handler(MySqlPool);

SQLの再利用性が高まってちょっといい感じです。

おわりに

時期メジャーバージョンの0.6系では当初の書き方ができるみたいです。

async fn some_function(mut executor: impl PgExecutor) {
  // [...]
     .fetch_one(&mut executor)
}

またquery_file!というマクロを使用してSQL文は別ファイルに切り出すというのもひとつの手段です。

sqlxのユーザ増えるといいな。

参考

20
9
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
20
9