Vert.x と RxJava を使うときの JDBC 接続メモ

  • 4
    Like
  • 0
    Comment
More than 1 year has passed since last update.

バックエンドのマイクロサービスを叩きまくるような API を作る場合、サーブレットベースのサーバーよりも Vert.x のようなミドルウェアを使ったほうが気楽に作れて良いですよね。勤務先でもちょこちょこ Vert.x が活躍しています。

Vert.x は非同期前提なので、使うときはだいたい RxJava を使うことになると思うのですが、うっかりコネクションがリークしていたことがあったので、最近の JDBC 接続設定をメモしておきます。Observable.using が肝です。

public interface JDBCMixin {

    @Nonnull
    default Observable<List<JsonObject>> query(@Nonnull String sql) {
        return query(sql, null);
    }

    @Nonnull
    default Observable<List<JsonObject>> query(@Nonnull String sql, @Nullable Object... args) {
        return query(sql, identity(), args);
    }

    @Nonnull
    default Observable<Optional<JsonObject>> queryOne(@Nonnull String sql, @Nonnull Object... args) {
        return query(sql, rows -> rows.isEmpty() ? Optional.empty() : Optional.of(rows.iterator().next()), args);
    }

    @Nonnull
    default <T> Observable<T> query(@Nonnull String sql, @Nonnull Func1<List<JsonObject>, T> mapper, @Nullable Object... args) {
        return connect(
            conn -> Optional
                .ofNullable(args)
                .map(params -> conn.queryWithParamsObservable(sql, new JsonArray(Arrays.asList(params))))
                .orElse(conn.queryObservable(sql))
                .map(ResultSet::getRows)
                .map(mapper));
    }

    @Nonnull
    default <T> Observable<T> connect(Func1<SQLConnection, Observable<T>> fn) {
        return client().getConnectionObservable().flatMap(connection -> Observable.using(() -> connection, fn, SQLConnection::close));
    }

    @Nonnull
    default <T> Func1<T, T> identity() {
        return v -> v;
    }

    @Nonnull
    JDBCClient client();

}

こんな感じで使います。

public class CountryRepositoryImpl implements CountryRepository, JDBCMixin {

    @Override
    @Nonnull
    public Observable<Optional<Country>> get(@Nonnull CountryId countryId) {
        return queryOne("select * from countries where id = ?", countryId.getValue()).map(row -> row.map(this::toCountry));
    }

}

ちなみにコネクションリークの検知には Hikari CP の leakDetectionThreshold が便利でした(Vert.x でも io.vertx.ext.jdbc.spi.impl.HikariCPDataSourceProvider を使えば Hikari CP を使うことが出来ます)。