5
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

PG::UnableToSend: no connection to the server

Posted at

環境

Rails 5.1.4
PostgreSQL 10.5

TL;DR

DBに問い合わせる時はConnectionPoolwith_connectionを(忘れずに)きちんと使おう。

PG::UnableToSend: no connection to the server

以下はこのエラーを追っていった歴史です。クエリの実行の仕組みを概観しており、誰かの役に立てば嬉しいですね。結論から言うと、with_connectionを使えばなおりました。

ある日、DBに対して、クエリを投げ続けて動くデーモンプロセスでPG::UnableToSend: no connection to the serverというエラーが発生しました。このデーモンプロセスはメインスレッドではDBから仕事を取ってきて、RedisのQueueに詰めて、そのQueueから仕事を引っこ抜き、ワーカースレッドがタスクを処理する、いわゆるConsumer-Producerモデルで実装されています。そして、メインスレッドで上記のエラーが出ました。ただこのエラーはデーモンプロセスが起動して、初めの方は出ません。正常にDBに繋げます。しかし、時間が経過すると(どの程度経過すればいいかはまちまち)、出てくることがある、という非常に困るタイプのバグです。

コネクションプールの情報を見てみる

コネクションプールがまずどういう状況になっているのかを見てみました。ActiveRecordのConnectionPoolstatメソッドでコネクションプールの情報を出力してみました。このstatメソッドは以下のような定義です。

def stat
  synchronize do
    {
      size: size,
      connections: @connections.size,
      busy: @connections.count { |c| c.in_use? && c.owner.alive? },
      dead: @connections.count { |c| c.in_use? && !c.owner.alive? },
      idle: @connections.count { |c| !c.in_use? },
      waiting: num_waiting_in_queue,
      checkout_timeout: checkout_timeout
    }
  end
end

上記の実装を見れば分かるように、コネクションプールがどういう状態にあるのかがだいたい分かります。これを出力するようにして、上記のエラーが出るのを待ちました。上記のエラーが出て、statの結果を見ると以下のようになりました。

{:size=>16, :connections=>1, :busy=>1, :dead=>0, :idle=>0, :waiting=>0, :checkout_timeout=>5}

コネクションプール的にはコネクションはある扱いですね。つまり、in_use?owner.alive?は共にtrueです。実はこの二つの情報は直接的にコネクションが生きているかどうかを判定できていません。つまり、in_use?の定義は以下のようになっています。

alias :in_use? :owner

ただのエイリアスです。実際にはownerが存在するかどうかしか見ていません。そして、c.owner.alive?ownerが生きているかどうかしか判定できません。そのコネクション自体の情報は反映されていないのです。そこで、statを以下のように改造してみました。

def stat
  synchronize do
    {
        size: size,
        connections: @connections.size,
        busy: @connections.count { |c| c.in_use? && c.owner.alive? },
        dead: @connections.count { |c| c.in_use? && !c.owner.alive? },
        first_connection_is_active?: @connections.first.blank? ? 'no connection' : @connections.first.send(:active?),
        idle: @connections.count { |c| !c.in_use? },
        waiting: num_waiting_in_queue,
        checkout_timeout: checkout_timeout
    }
  end
end

コネクション自体が使えるのかどうかの情報も合わせて出力するようにしました。active?の定義は以下の通りです。

# Is this connection alive and ready for queries?
def active?
  @lock.synchronize do
    @connection.query "SELECT 1"
  end
  true
rescue PG::Error
  false
end

実際にDBにSELECT 1というクエリを実行しています。コネクション自体の情報も分かります。この状態でエラーが出るのを待って出力を待ちました。出力は以下のようになりました。

{:size=>16, :connections=>1, :busy=>1, :dead=>0, :first_connection_is_active?=>false, :idle=>0, :waiting=>0, :checkout_timeout=>5}

コネクションが生きていないことが分かりました。なるほど、そうだったのか!

クエリの実行の実装を追う

実はこのデーモンプロセスのメインスレッドが実行するクエリは非常に重いものになっていました。なので、タイムアウトでもしたんじゃね?と思いながら、実装を追っていきました。ActiveRecordはpgというgemを使って、DBに問い合わせています。adapterが持っている@connectionPG::Connectionのインスタンスです。つまり、adapterはPG::Connectionのラッパーです。ActiveRecordで出てくるactive?の中では@connectionに対して、queryというメソッドを実行しています。この実装を少し見てみます。結論から言うと、このpgと言うgemはメソッドや関数をC言語のマクロやrubyのalias_method()を使って、動的に生成している部分が多く、追っていくのにとても苦労しました。簡潔に流れを書きます。

PG::Connectionはrubyのクラスで以下のような定数が宣言されています(以下ではqueryのみを見ていきます。本当は他のメソッドもあります)。

REDIRECT_METHODS = {
    :query => [:async_exec, :sync_exec],
  }

そして、動的にメソッドを定義する以下のメソッドが定義されています。

def self.async_api=(enable)
  REDIRECT_METHODS.each do |ali, (async, sync)|
    remove_method(ali) if method_defined?(ali)
    alias_method( ali, enable ? async : sync )
  end
end

これをself.async_api = trueで呼び出して、メソッドを動的に生成しています。手元のirbで実験してみると、クラスが定義された時点で各メソッドは定義されているようです。つまり、queryメソッドを呼び出すと、async_execメソッドが呼ばれるようです。async_execメソッドはpg_connection.cで以下のように定義されます。

rb_define_method(rb_cPGconn, "async_exec", pgconn_async_exec, -1);

rb_define_methodはrubyの組み込みメソッドで、これによりasync_exec の実装はpgconn_async_execとなります。pgconn_async_execの定義は以下の通りです。

static VALUE
pgconn_async_exec(int argc, VALUE *argv, VALUE self)
{
  VALUE rb_pgresult = Qnil;

  pgconn_discard_results( self );
  /* クエリの実行 */
  pgconn_send_query( argc, argv, self );
  /* 結果のチェック */
  pgconn_block( 0, NULL, self ); /* wait for input (without blocking) before reading the last result */
  rb_pgresult = pgconn_get_last_result( self );

  if ( rb_block_given_p() ) {
    return rb_ensure( rb_yield, rb_pgresult, pg_result_clear, rb_pgresult );
  }
  return rb_pgresult;
}

###クエリの非同期な実行

pgconn_send_query()でクエリを実行し、pgconn_block()でビジーループで結果が返却されるのを待ちます。pgconn_send_query()の実装は以下。

static VALUE
pgconn_send_query(int argc, VALUE *argv, VALUE self)
{
  PGconn *conn = pg_get_pgconn(self);
  VALUE error;

  /* If called with no or nil parameters, use PQexec for compatibility */
  /* rb_define_method()の第四引数がargcのはずなので、このif文の中は通らないはず */
  if ( argc == 1 || (argc >= 2 && argc <= 4 && NIL_P(argv[1]) )) {
    if(gvl_PQsendQuery(conn, pg_cstr_enc(argv[0], ENCODING_GET(self))) == 0) {
      error = rb_exc_new2(rb_eUnableToSend, PQerrorMessage(conn));
      rb_iv_set(error, "@connection", self);
      rb_exc_raise(error);
    }
    return Qnil;
  }

  pg_deprecated(2, ("forwarding async_exec to async_exec_params and send_query to send_query_params is deprecated"));

  /* If called with parameters, and optionally result_format,
   * use PQsendQueryParams
   */
  /* 実質pgconn_send_query()はこのメソッドのラッパー関数 */
  return pgconn_send_query_params( argc, argv, self);
}

pgconn_send_query_params()の実装は以下。

static VALUE
pgconn_send_query_params(int argc, VALUE *argv, VALUE self)
{
  ()

  result = gvl_PQsendQueryParams(conn, pg_cstr_enc(command, paramsData.enc_idx), nParams, paramsData.types,
    (const char * const *)paramsData.values, paramsData.lengths, paramsData.formats, resultFormat);

  free_query_params( &paramsData );

  if(result == 0) {
    error = rb_exc_new2(rb_eUnableToSend, PQerrorMessage(conn));
    rb_iv_set(error, "@connection", self);
    rb_exc_raise(error);
  }
  return Qnil;
}

gvl_PQsendQueryParams()pgのgvl_wrappers.hとgvl_wrappers.cを読むと、どのような実装になっているか分かります。マクロを使って、凄まじく動的に関数を生成しているため、grepができず、実装は非常に追いにくいです。結論から言うと、gvl_PQsendQueryParams()PQsendQueryParams()のラッパーです。PQsendQueryParams()非同期コマンドの処理を見れば、載っていますが、非同期にクエリを実行できます。そして、結果はPQgetResult()を呼んで取得します。gvl_PQsendQueryParams()はだいたい以下のようなコードを実行することになります。

rb_thread_call_without_gvl(gvl_PQsendQueryParams_skeleton, &params, RUBY_UBF_IO, 0);

で、gvl_PQsendQueryParams_skeletonPQsendQueryParams()を呼びます。この記事によると、gvlを取得せずに、gvl_PQsendQueryParams_skeletonを実行することになりそうです。クエリの実行はだいたいこんな感じです。

###クエリの結果の取得
続いて、クエリの結果の取得を見ていきます。pgconn_block()です。

static VALUE
pgconn_block( int argc, VALUE *argv, VALUE self ) {
  PGconn *conn = pg_get_pgconn( self );

  struct timeval timeout;
  struct timeval *ptimeout = NULL;
  VALUE timeout_in;
  double timeout_sec;
  void *ret;

  if ( rb_scan_args(argc, argv, "01", &timeout_in) == 1 ) {
    timeout_sec = NUM2DBL( timeout_in );
    timeout.tv_sec = (time_t)timeout_sec;
    timeout.tv_usec = (suseconds_t)((timeout_sec - (long)timeout_sec) * 1e6);
    ptimeout = &timeout;
  }

  ret = wait_socket_readable( conn, ptimeout, get_result_readable);

  if( !ret )
    return Qfalse;

  return Qtrue;
}

wait_socket_readable()という超怪しいやつがいますね。関数が長いので、重要な部分だけ抜粋します。

while ( !(retval=is_readable(conn)) ) {
  if ( WSAEventSelect(sd, hEvent, FD_READ|FD_CLOSE) == SOCKET_ERROR ) {
    WSACloseEvent( hEvent );
    rb_raise( rb_eConnectionBad, "WSAEventSelect socket error: %d", WSAGetLastError() );
  }

  if ( ptimeout ) {
    gettimeofday(&currtime, NULL);
    timersub(&aborttime, &currtime, &waittime);
    timeout_milisec = (DWORD)( waittime.tv_sec * 1e3 + waittime.tv_usec / 1e3 );
  }

  /* Is the given timeout valid? */
  if( !ptimeout || (waittime.tv_sec >= 0 && waittime.tv_usec >= 0) ){
    /* Wait for the socket to become readable before checking again */
    wait_ret = rb_w32_wait_events( &hEvent, 1, timeout_milisec );
  } else {
    wait_ret = WAIT_TIMEOUT;
  }

  if ( wait_ret == WAIT_TIMEOUT ) {
    WSACloseEvent( hEvent );
    return NULL;
  } else if ( wait_ret == WAIT_OBJECT_0 ) {
    /* The event we were waiting for. */
  } else if ( wait_ret == WAIT_OBJECT_0 + 1) {
    /* This indicates interruption from timer thread, GC, exception
     * from other threads etc... */
    rb_thread_check_ints();
  } else if ( wait_ret == WAIT_FAILED ) {
    WSACloseEvent( hEvent );
    rb_raise( rb_eConnectionBad, "Wait on socket error (WaitForMultipleObjects): %lu", GetLastError() );
  } else {
    WSACloseEvent( hEvent );
    rb_raise( rb_eConnectionBad, "Wait on socket abandoned (WaitForMultipleObjects)" );
  }

  /* Check for connection errors (PQisBusy is true on connection errors) */
  if ( PQconsumeInput(conn) == 0 ) {
    WSACloseEvent( hEvent );
    rb_raise( rb_eConnectionBad, "PQconsumeInput() %s", PQerrorMessage(conn) );
  }
}

まあ、ビジーループですね。最初はタイムアウトが怪しいと思っていましたが、上記のコードを見ればしっかりハンドリングされています。予想は外れたましたが、実はUnableToSendっぽいやつが既に出てきています。pgconn_send_query_params()の実装でエラーを出す時があります。

if(result == 0) {
  error = rb_exc_new2(rb_eUnableToSend, PQerrorMessage(conn));
  rb_iv_set(error, "@connection", self);
  rb_exc_raise(error);
}

PQsendQueryParams()はコマンドの登録に成功した場合1が、失敗した場合0が返されます。サーバへのコマンドの登録を失敗した可能性があります。と、デバッグは続いていくわけですが、実は当該のデーモンプロセスのメインスレッドでSQLを実行するわけですが、コネクションプールにコネクションをcheckinできてない部分を発見してしまいました。その部分をActiveRecordのwith_connectionメソッド使うように変更したところ、UnableToSendが出なくなりました。コストが異常に高いSQLを実行していたので、それでPQsendQueryParams()が失敗したのかな。でも、ちょっと考えにくいですよね。時間があったら、さらにデバッグして見たいと思います。

まとめ

きちんと、with_connectionとかでコネクションを適切に取り扱うことの重要性を認識しました。

5
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?