バックエンドのマイクロサービスを叩きまくるような API を作る場合、サーブレットベースのサーバーよりも Vert.x のようなミドルウェアを使ったほうが気楽に作れて良いですよね。勤務先でもちょこちょこ Vert.x が活躍しています。
Vert.x は非同期前提なので、使うときはだいたい RxJava を使うことになると思うのですが、うっかりコネクションがリークしていたことがあったので、最近の JDBC 接続設定をメモしておきます。Observable.using
が肝です。
public interface JDBCMixin {
@Nonnull
default Observable<List<JsonObject>> query(@Nonnull String sql) {
return query(sql, null);
}
@Nonnull
default Observable<List<JsonObject>> query(@Nonnull String sql, @Nullable Object... args) {
return query(sql, identity(), args);
}
@Nonnull
default Observable<Optional<JsonObject>> queryOne(@Nonnull String sql, @Nonnull Object... args) {
return query(sql, rows -> rows.isEmpty() ? Optional.empty() : Optional.of(rows.iterator().next()), args);
}
@Nonnull
default <T> Observable<T> query(@Nonnull String sql, @Nonnull Func1<List<JsonObject>, T> mapper, @Nullable Object... args) {
return connect(
conn -> Optional
.ofNullable(args)
.map(params -> conn.queryWithParamsObservable(sql, new JsonArray(Arrays.asList(params))))
.orElse(conn.queryObservable(sql))
.map(ResultSet::getRows)
.map(mapper));
}
@Nonnull
default <T> Observable<T> connect(Func1<SQLConnection, Observable<T>> fn) {
return client().getConnectionObservable().flatMap(connection -> Observable.using(() -> connection, fn, SQLConnection::close));
}
@Nonnull
default <T> Func1<T, T> identity() {
return v -> v;
}
@Nonnull
JDBCClient client();
}
こんな感じで使います。
public class CountryRepositoryImpl implements CountryRepository, JDBCMixin {
@Override
@Nonnull
public Observable<Optional<Country>> get(@Nonnull CountryId countryId) {
return queryOne("select * from countries where id = ?", countryId.getValue()).map(row -> row.map(this::toCountry));
}
}
ちなみにコネクションリークの検知には Hikari CP の leakDetectionThreshold
が便利でした(Vert.x でも io.vertx.ext.jdbc.spi.impl.HikariCPDataSourceProvider
を使えば Hikari CP を使うことが出来ます)。