Posted at

Vert.x と RxJava を使うときの JDBC 接続メモ

More than 1 year has passed since last update.

バックエンドのマイクロサービスを叩きまくるような API を作る場合、サーブレットベースのサーバーよりも Vert.x のようなミドルウェアを使ったほうが気楽に作れて良いですよね。勤務先でもちょこちょこ Vert.x が活躍しています。

Vert.x は非同期前提なので、使うときはだいたい RxJava を使うことになると思うのですが、うっかりコネクションがリークしていたことがあったので、最近の JDBC 接続設定をメモしておきます。Observable.using が肝です。

public interface JDBCMixin {

@Nonnull
default Observable<List<JsonObject>> query(@Nonnull String sql) {
return query(sql, null);
}

@Nonnull
default Observable<List<JsonObject>> query(@Nonnull String sql, @Nullable Object... args) {
return query(sql, identity(), args);
}

@Nonnull
default Observable<Optional<JsonObject>> queryOne(@Nonnull String sql, @Nonnull Object... args) {
return query(sql, rows -> rows.isEmpty() ? Optional.empty() : Optional.of(rows.iterator().next()), args);
}

@Nonnull
default <T> Observable<T> query(@Nonnull String sql, @Nonnull Func1<List<JsonObject>, T> mapper, @Nullable Object... args) {
return connect(
conn -> Optional
.ofNullable(args)
.map(params -> conn.queryWithParamsObservable(sql, new JsonArray(Arrays.asList(params))))
.orElse(conn.queryObservable(sql))
.map(ResultSet::getRows)
.map(mapper));
}

@Nonnull
default <T> Observable<T> connect(Func1<SQLConnection, Observable<T>> fn) {
return client().getConnectionObservable().flatMap(connection -> Observable.using(() -> connection, fn, SQLConnection::close));
}

@Nonnull
default <T> Func1<T, T> identity() {
return v -> v;
}

@Nonnull
JDBCClient client();

}

こんな感じで使います。

public class CountryRepositoryImpl implements CountryRepository, JDBCMixin {

@Override
@Nonnull
public Observable<Optional<Country>> get(@Nonnull CountryId countryId) {
return queryOne("select * from countries where id = ?", countryId.getValue()).map(row -> row.map(this::toCountry));
}

}

ちなみにコネクションリークの検知には Hikari CP の leakDetectionThreshold が便利でした(Vert.x でも io.vertx.ext.jdbc.spi.impl.HikariCPDataSourceProvider を使えば Hikari CP を使うことが出来ます)。