環境
Rails 5.1.4
PostgreSQL 10.5
TL;DR
DBに問い合わせる時はConnectionPool
のwith_connection
を(忘れずに)きちんと使おう。
PG::UnableToSend: no connection to the server
以下はこのエラーを追っていった歴史です。クエリの実行の仕組みを概観しており、誰かの役に立てば嬉しいですね。結論から言うと、with_connection
を使えばなおりました。
ある日、DBに対して、クエリを投げ続けて動くデーモンプロセスでPG::UnableToSend: no connection to the server
というエラーが発生しました。このデーモンプロセスはメインスレッドではDBから仕事を取ってきて、RedisのQueueに詰めて、そのQueueから仕事を引っこ抜き、ワーカースレッドがタスクを処理する、いわゆるConsumer-Producerモデルで実装されています。そして、メインスレッドで上記のエラーが出ました。ただこのエラーはデーモンプロセスが起動して、初めの方は出ません。正常にDBに繋げます。しかし、時間が経過すると(どの程度経過すればいいかはまちまち)、出てくることがある、という非常に困るタイプのバグです。
コネクションプールの情報を見てみる
コネクションプールがまずどういう状況になっているのかを見てみました。ActiveRecordのConnectionPool
のstat
メソッドでコネクションプールの情報を出力してみました。このstat
メソッドは以下のような定義です。
def stat
synchronize do
{
size: size,
connections: @connections.size,
busy: @connections.count { |c| c.in_use? && c.owner.alive? },
dead: @connections.count { |c| c.in_use? && !c.owner.alive? },
idle: @connections.count { |c| !c.in_use? },
waiting: num_waiting_in_queue,
checkout_timeout: checkout_timeout
}
end
end
上記の実装を見れば分かるように、コネクションプールがどういう状態にあるのかがだいたい分かります。これを出力するようにして、上記のエラーが出るのを待ちました。上記のエラーが出て、stat
の結果を見ると以下のようになりました。
{:size=>16, :connections=>1, :busy=>1, :dead=>0, :idle=>0, :waiting=>0, :checkout_timeout=>5}
コネクションプール的にはコネクションはある扱いですね。つまり、in_use?
とowner.alive?
は共にtrueです。実はこの二つの情報は直接的にコネクションが生きているかどうかを判定できていません。つまり、in_use?
の定義は以下のようになっています。
alias :in_use? :owner
ただのエイリアスです。実際にはowner
が存在するかどうかしか見ていません。そして、c.owner.alive?
もowner
が生きているかどうかしか判定できません。そのコネクション自体の情報は反映されていないのです。そこで、stat
を以下のように改造してみました。
def stat
synchronize do
{
size: size,
connections: @connections.size,
busy: @connections.count { |c| c.in_use? && c.owner.alive? },
dead: @connections.count { |c| c.in_use? && !c.owner.alive? },
first_connection_is_active?: @connections.first.blank? ? 'no connection' : @connections.first.send(:active?),
idle: @connections.count { |c| !c.in_use? },
waiting: num_waiting_in_queue,
checkout_timeout: checkout_timeout
}
end
end
コネクション自体が使えるのかどうかの情報も合わせて出力するようにしました。active?
の定義は以下の通りです。
# Is this connection alive and ready for queries?
def active?
@lock.synchronize do
@connection.query "SELECT 1"
end
true
rescue PG::Error
false
end
実際にDBにSELECT 1
というクエリを実行しています。コネクション自体の情報も分かります。この状態でエラーが出るのを待って出力を待ちました。出力は以下のようになりました。
{:size=>16, :connections=>1, :busy=>1, :dead=>0, :first_connection_is_active?=>false, :idle=>0, :waiting=>0, :checkout_timeout=>5}
コネクションが生きていないことが分かりました。なるほど、そうだったのか!
クエリの実行の実装を追う
実はこのデーモンプロセスのメインスレッドが実行するクエリは非常に重いものになっていました。なので、タイムアウトでもしたんじゃね?と思いながら、実装を追っていきました。ActiveRecordはpg
というgemを使って、DBに問い合わせています。adapterが持っている@connection
はPG::Connection
のインスタンスです。つまり、adapterはPG::Connection
のラッパーです。ActiveRecordで出てくるactive?
の中では@connection
に対して、query
というメソッドを実行しています。この実装を少し見てみます。結論から言うと、このpg
と言うgemはメソッドや関数をC言語のマクロやrubyのalias_method()
を使って、動的に生成している部分が多く、追っていくのにとても苦労しました。簡潔に流れを書きます。
PG::Connection
はrubyのクラスで以下のような定数が宣言されています(以下ではquery
のみを見ていきます。本当は他のメソッドもあります)。
REDIRECT_METHODS = {
:query => [:async_exec, :sync_exec],
}
そして、動的にメソッドを定義する以下のメソッドが定義されています。
def self.async_api=(enable)
REDIRECT_METHODS.each do |ali, (async, sync)|
remove_method(ali) if method_defined?(ali)
alias_method( ali, enable ? async : sync )
end
end
これをself.async_api = true
で呼び出して、メソッドを動的に生成しています。手元のirbで実験してみると、クラスが定義された時点で各メソッドは定義されているようです。つまり、query
メソッドを呼び出すと、async_exec
メソッドが呼ばれるようです。async_exec
メソッドはpg_connection.c
で以下のように定義されます。
rb_define_method(rb_cPGconn, "async_exec", pgconn_async_exec, -1);
rb_define_method
はrubyの組み込みメソッドで、これによりasync_exec
の実装はpgconn_async_exec
となります。pgconn_async_exec
の定義は以下の通りです。
static VALUE
pgconn_async_exec(int argc, VALUE *argv, VALUE self)
{
VALUE rb_pgresult = Qnil;
pgconn_discard_results( self );
/* クエリの実行 */
pgconn_send_query( argc, argv, self );
/* 結果のチェック */
pgconn_block( 0, NULL, self ); /* wait for input (without blocking) before reading the last result */
rb_pgresult = pgconn_get_last_result( self );
if ( rb_block_given_p() ) {
return rb_ensure( rb_yield, rb_pgresult, pg_result_clear, rb_pgresult );
}
return rb_pgresult;
}
###クエリの非同期な実行
pgconn_send_query()
でクエリを実行し、pgconn_block()
でビジーループで結果が返却されるのを待ちます。pgconn_send_query()
の実装は以下。
static VALUE
pgconn_send_query(int argc, VALUE *argv, VALUE self)
{
PGconn *conn = pg_get_pgconn(self);
VALUE error;
/* If called with no or nil parameters, use PQexec for compatibility */
/* rb_define_method()の第四引数がargcのはずなので、このif文の中は通らないはず */
if ( argc == 1 || (argc >= 2 && argc <= 4 && NIL_P(argv[1]) )) {
if(gvl_PQsendQuery(conn, pg_cstr_enc(argv[0], ENCODING_GET(self))) == 0) {
error = rb_exc_new2(rb_eUnableToSend, PQerrorMessage(conn));
rb_iv_set(error, "@connection", self);
rb_exc_raise(error);
}
return Qnil;
}
pg_deprecated(2, ("forwarding async_exec to async_exec_params and send_query to send_query_params is deprecated"));
/* If called with parameters, and optionally result_format,
* use PQsendQueryParams
*/
/* 実質pgconn_send_query()はこのメソッドのラッパー関数 */
return pgconn_send_query_params( argc, argv, self);
}
pgconn_send_query_params()
の実装は以下。
static VALUE
pgconn_send_query_params(int argc, VALUE *argv, VALUE self)
{
(略)
result = gvl_PQsendQueryParams(conn, pg_cstr_enc(command, paramsData.enc_idx), nParams, paramsData.types,
(const char * const *)paramsData.values, paramsData.lengths, paramsData.formats, resultFormat);
free_query_params( ¶msData );
if(result == 0) {
error = rb_exc_new2(rb_eUnableToSend, PQerrorMessage(conn));
rb_iv_set(error, "@connection", self);
rb_exc_raise(error);
}
return Qnil;
}
gvl_PQsendQueryParams()
はpg
のgvl_wrappers.hとgvl_wrappers.cを読むと、どのような実装になっているか分かります。マクロを使って、凄まじく動的に関数を生成しているため、grepができず、実装は非常に追いにくいです。結論から言うと、gvl_PQsendQueryParams()
はPQsendQueryParams()
のラッパーです。PQsendQueryParams()
は非同期コマンドの処理を見れば、載っていますが、非同期にクエリを実行できます。そして、結果はPQgetResult()
を呼んで取得します。gvl_PQsendQueryParams()
はだいたい以下のようなコードを実行することになります。
rb_thread_call_without_gvl(gvl_PQsendQueryParams_skeleton, ¶ms, RUBY_UBF_IO, 0);
で、gvl_PQsendQueryParams_skeleton
はPQsendQueryParams()
を呼びます。この記事によると、gvlを取得せずに、gvl_PQsendQueryParams_skeleton
を実行することになりそうです。クエリの実行はだいたいこんな感じです。
###クエリの結果の取得
続いて、クエリの結果の取得を見ていきます。pgconn_block()
です。
static VALUE
pgconn_block( int argc, VALUE *argv, VALUE self ) {
PGconn *conn = pg_get_pgconn( self );
struct timeval timeout;
struct timeval *ptimeout = NULL;
VALUE timeout_in;
double timeout_sec;
void *ret;
if ( rb_scan_args(argc, argv, "01", &timeout_in) == 1 ) {
timeout_sec = NUM2DBL( timeout_in );
timeout.tv_sec = (time_t)timeout_sec;
timeout.tv_usec = (suseconds_t)((timeout_sec - (long)timeout_sec) * 1e6);
ptimeout = &timeout;
}
ret = wait_socket_readable( conn, ptimeout, get_result_readable);
if( !ret )
return Qfalse;
return Qtrue;
}
wait_socket_readable()
という超怪しいやつがいますね。関数が長いので、重要な部分だけ抜粋します。
while ( !(retval=is_readable(conn)) ) {
if ( WSAEventSelect(sd, hEvent, FD_READ|FD_CLOSE) == SOCKET_ERROR ) {
WSACloseEvent( hEvent );
rb_raise( rb_eConnectionBad, "WSAEventSelect socket error: %d", WSAGetLastError() );
}
if ( ptimeout ) {
gettimeofday(&currtime, NULL);
timersub(&aborttime, &currtime, &waittime);
timeout_milisec = (DWORD)( waittime.tv_sec * 1e3 + waittime.tv_usec / 1e3 );
}
/* Is the given timeout valid? */
if( !ptimeout || (waittime.tv_sec >= 0 && waittime.tv_usec >= 0) ){
/* Wait for the socket to become readable before checking again */
wait_ret = rb_w32_wait_events( &hEvent, 1, timeout_milisec );
} else {
wait_ret = WAIT_TIMEOUT;
}
if ( wait_ret == WAIT_TIMEOUT ) {
WSACloseEvent( hEvent );
return NULL;
} else if ( wait_ret == WAIT_OBJECT_0 ) {
/* The event we were waiting for. */
} else if ( wait_ret == WAIT_OBJECT_0 + 1) {
/* This indicates interruption from timer thread, GC, exception
* from other threads etc... */
rb_thread_check_ints();
} else if ( wait_ret == WAIT_FAILED ) {
WSACloseEvent( hEvent );
rb_raise( rb_eConnectionBad, "Wait on socket error (WaitForMultipleObjects): %lu", GetLastError() );
} else {
WSACloseEvent( hEvent );
rb_raise( rb_eConnectionBad, "Wait on socket abandoned (WaitForMultipleObjects)" );
}
/* Check for connection errors (PQisBusy is true on connection errors) */
if ( PQconsumeInput(conn) == 0 ) {
WSACloseEvent( hEvent );
rb_raise( rb_eConnectionBad, "PQconsumeInput() %s", PQerrorMessage(conn) );
}
}
まあ、ビジーループですね。最初はタイムアウトが怪しいと思っていましたが、上記のコードを見ればしっかりハンドリングされています。予想は外れたましたが、実はUnableToSend
っぽいやつが既に出てきています。pgconn_send_query_params()
の実装でエラーを出す時があります。
if(result == 0) {
error = rb_exc_new2(rb_eUnableToSend, PQerrorMessage(conn));
rb_iv_set(error, "@connection", self);
rb_exc_raise(error);
}
PQsendQueryParams()
はコマンドの登録に成功した場合1が、失敗した場合0が返されます。サーバへのコマンドの登録を失敗した可能性があります。と、デバッグは続いていくわけですが、実は当該のデーモンプロセスのメインスレッドでSQLを実行するわけですが、コネクションプールにコネクションをcheckinできてない部分を発見してしまいました。その部分をActiveRecordのwith_connection
メソッド使うように変更したところ、UnableToSend
が出なくなりました。コストが異常に高いSQLを実行していたので、それでPQsendQueryParams()
が失敗したのかな。でも、ちょっと考えにくいですよね。時間があったら、さらにデバッグして見たいと思います。
まとめ
きちんと、with_connection
とかでコネクションを適切に取り扱うことの重要性を認識しました。