39
9

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

クラウドワークスAdvent Calendar 2016

Day 12

Elasticsearch にインデックスを生成しようとしたら Ruby の C 拡張ライブラリをデバッグしていた話

Last updated at Posted at 2016-12-12

はじめに

この記事は CrowdWorks Advent Calendar 2016 12日目の記事です。
昨日のエントリーは @nasum さんによる「nasneの容量をシェルスクリプトでSlackに通知する」でした。

先日起きたことを、ありのままに1話します。

MySQL にあるデータを Elasticsearch (以下 ES と略) にインデックスしようとしていたと思ったら、いつの間にか Ruby の C 拡張をデバッグしていた。な、何を言ってるのか (ry

おまえは何を言っているんだ

三行でまとめると、

  • Ruby を使って ES にインデックスを作ろうとしたら予期しない現象に遭遇して、
  • 原因を探ろうとしたら C 拡張で実装されている箇所だったので、
  • しょうがないからデバッグする方法を調べた

という話です。

発端

事の起こりは、自社サービスのデータベースに溜まっているデータをより効率良く検索できるように、データを ES にインデックスしようと試みたところに始まります。

自社サービスはデータベースは MySQL を利用、Rails で web アプリケーションを構成しています。ES にインデックスしようとしたのは、MySQL に入っている百万レコード程度のボリュームのデータでした。

ひとまず ES にインデックスする方針は決まったものの、ES の知識はお世辞にも充分とは言えない程度だったこともあり、まずは検証環境を作って試行錯誤しようということにしました。

ちなみに検証環境には AWS の EC2 でインスタンスを立てることにしました。処理を行うプロセスはそこで動かします。ES サーバーは同じインスタンスのローカルに立てる形に。MySQL サーバーは RDS を利用。

ということで、後半の方でネイティブコードの話が出てきますが、環境は Linux です。EC2 方面に行けばどこにでも転がっている普通の Amazon Linux 上で検証しています。

ruby スクリプトで片付ける作戦

どのツールを使うか少し検討して、慣れた Ruby 使うことに。MySQL から ES へデータを投げ込むだけの簡単なお仕事です2。幸いにして MySQL, ES 共にスタンダードになっているクライアントの gem があるので、そんなに難しいことにはならないんじゃないかと予想。

まずはこんな感じの Gemfile を作って準備を整えます。

gem "elasticsearch"
gem "mysql2"

ナイーブに実装

まずは愚直に、いちばん簡単な方法で処理してみます。

require 'elasticsearch'
require 'mysql2'

# mysql_config と es_config は別のところで定義されているハッシュ
mysql = Mysql2::Client.new(mysql_config)
es = Elasticsearch::Client.new(es_config)

# query は別のところで定義されている SQL 文字列
results = mysql.query(query)
results.each do |record|
  es.index(index: "foo", type: "bar", body: build_index_hash(record))
end

この方法の問題は、レコードが百万件ほど返ってくるとメモリを食い潰してしまうことです。実際に実行してみたところ、メモリ使用量が急上昇し、ついには実行中のターミナルで Ctrl-C を連打しても無反応という事態に陥りました。

メモリが枯渇するわけと回避方法

原因は Mysql2::Result がデフォルトの挙動だと、フェッチした行を全てメモリにキャッシュしているためです。回避するには #query メソッドのオプションに stream を指定してやれば良さそうだということが分かりました3。試してみます。

results = mysql.query(query, stream: true) # ←ここを変えた
results.each do |record|
  es.index(index: "foo", type: "bar", body: build_index_hash(record))
end

今度は上手く行きました。mysql2 の方の取扱いはこの方向性で良さそうです。

ES のインデックス生成を効率化

これでできたということにしても良いのですが、実は ES へのデータの渡し方に問題があって、このままでは効率が悪いです。ES に用意されている Bulk API を使えば、1 リクエストで複数のドキュメントをまとめてインデックスすることができます。これを利用して、例えば 1,000 件ずつ小分けにして API を呼び出すようにすれば、もっと早く処理を片付けられるようになりそうです。

というわけで、結果セット全体を決まった長さのサブセットに分割する方法がないかと考えてみます。

Mysql2::ResultEnumerable であるので、#each_slice を持っているはずです。これが使えないでしょうか。

ちなみに Enumerable#each_slice とは、こういう動作をするものです。

# 1〜10 の数列を 3 つずつに分けた配列にする
(1..10).each_slice(3).map(&:itself)
# => [[1, 2, 3], [4, 5, 6], [7, 8, 9], [10]]

Enumerable#each_slice を使った解決策

先程のコードを少し改変して、こんな感じにします。

results = mysql.query(query, stream: true)
results.each_slice(1000) do |records|
  hashes = build_bulk_insert_hashes(records)
  es.bulk(index: "foo", type: "bar", body: hashes)
end

・・・・・・。

ターミナルから反応がなくなりました (。◉ᆺ◉)
またもやメモリを食い潰してしまったようです。

#each を使っていたときは問題なかったので、この挙動は不思議です。気になるので、どうなってるのか調べることにしました。

#each_slice で期待通りにならない原因を探る

最小限の再現コード

いろいろ試行錯誤してみた結果、問題を再現できる最小限のコードはこんな感じに。ES への操作は関係ないのでバッサリ削除して mysql2 を取り扱うコードだけにします。

mysql = Mysql2::Client.new(mysql_config)
results = mysql.query(query, stream: true)
results.each_slice(1234).first

#each_slice は Enumerator インスタンスを返すだけの挙動で、この段階ではまだ問題は起きていません。最後の #first が呼ばれたタイミングで問題が発動します。

発動するタイミングは #first ではあるものの、これは #each_slice が返している enumerator から値を取得しようとして処理が走ったことによるものではないかと推測、調べるターゲットを #each_slice に決めます。

pry を使って先程のコードを手動で順に実行していきます。

[1] pry(main)> mysql = Mysql2::Client.new(mysql_config)
[2] pry(main)> results = mysql.query(query, stream: true)

調べたいのは results が持っている #each_slice なので、

[3] pry(main)> show-source results.each_slice

From: enum.c (C Method):
Owner: Enumerable
Visibility: public
Number of lines: 20

static VALUE
enum_each_slice(VALUE obj, VALUE n)
{
    long size = NUM2LONG(n);
    VALUE ary;
    struct MEMO *memo;
    int arity;

    if (size <= 0) rb_raise(rb_eArgError, "invalid slice size");
    RETURN_SIZED_ENUMERATOR(obj, 1, &n, enum_each_slice_size);
    size = limit_by_enum_size(obj, size);
    ary = rb_ary_new2(size);
    arity = rb_block_arity();
    memo = MEMO_NEW(ary, dont_recycle_block_arg(arity), size);
    rb_block_call(obj, id_each, 0, 0, each_slice_i, (VALUE)memo);
    ary = memo->v1;
    if (RARRAY_LEN(ary) > 0) rb_yield(ary);

    return Qnil;
}

実装箇所が特定できました。C で書かれているようです。
手元に ruby のソース一式を展開し、enum_each_slice を起点にしてコードを追ってみますが・・・

・・・なるほど、わからん。

問題を起こしている時の変数の実際の値などが分からないと、なかなか追いづらいものがあります。

よろしい、ならば gdb だ

というわけで、先ほどの再現コードを実行しつつ、デバッガで追跡する作戦にします。モノは Ruby ではなくて C で書かれたコードなので gdb を使う方針で。

・・・ところで、いまインストールして使っている ruby 処理系って gdb でデバッグできたっけ?

デバッグ用の ruby を作る

よく分からなかったのでググる。見つかったのがこちら。
gdbを使ったrubyのデバッグ

要約すると、

  • -O0 で最適化しないようにして、
  • -g3 でデバッグシンボルが含まれるようにする

というというオプションを渡してコンパイルするところがポイント。

これらを参考に、検証環境の rbenv にデバッグ用の ruby をビルド&インストールします。

$ mkdir ~/src && cd ~/src
$ wget https://cache.ruby-lang.org/pub/ruby/2.3/ruby-2.3.3.tar.gz
$ tar xvf ruby-2.3.3.tar.gz

# いま見返したら、ここで autoreconf しないといけないのを忘れてた :p

# 2.3.3-debug という名前で rbenv 配下にインストールされるように
$ ./configure optflags="-O0" debugflags="-g3" --prefix="${HOME}/.rbenv/versions/2.3.3-debug"
$ make
$ make install

特に問題なく完了。
検証をやっているディレクトリ (= Gemfile を置いている場所) へ戻って、

$ rbenv local 2.3.3-debug

これで、この場所で ruby を使うと 2.3.3-debug が使われるようになりました。

gdb で bundle exec する

検証用のスクリプトの実行は、

$ bundle exec ruby test.rb

みたいな感じでやっていたので、これを gdb 経由で動かすようにするには・・・(しばし試行錯誤)

$ ~/.rbenv/versions/2.3.3-debug/bin/bundle exec gdb \
  --args ~/.rbenv/versions/2.3.3-debug/bin/ruby test.rb

こんな感じで行けた。たぶんもっとスマートなやり方がありそうな気もしますが、ここはとりあえず動かせれば良しとします。

each_slice の実装に潜る

まずは、先ほど show-source で見つけた enum_each_slice 関数にブレークポイントを置いて実行すると該当箇所で停まります。

(gdb) break enum_each_slice
Breakpoint 1 at 0x1cc629: file enum.c, line 2227.
(gdb) run
  【中略】
Breakpoint 1, enum_each_slice (obj=93825010224720, n=5) at enum.c:2227
2227	    long size = NUM2LONG(n);

ここまで辿り着いた経路を調べるには rb_backtrace という関数が提供されているので、これを利用すると

(gdb) call rb_backtrace()
	from test.rb:7:in `<main>'
	from ~/.rbenv/versions/2.3.3-debug/lib/ruby/2.3.0/psych.rb:474:in `load_file'
	from ~/.rbenv/versions/2.3.3-debug/lib/ruby/2.3.0/psych.rb:474:in `open'
	from ~/.rbenv/versions/2.3.3-debug/lib/ruby/2.3.0/psych.rb:475:in `block in load_file'
	  【以下略】

Ruby のバックトレースを表示できます。
上の例だと、ファイル名に psych.rb とあるあたりから推測するに、どうやら探しているものはこれではないようです。どこか別の場所で #each_slice が使われたのでしょう。

一度 continue してみると、今度は

(gdb) continue
Continuing.

Breakpoint 1, enum_each_slice (obj=93825010217640, n=2469) at enum.c:2227
2227	    long size = NUM2LONG(n);
(gdb) call rb_backtrace()
	from test.rb:12:in `<main>'
	from test.rb:12:in `each_slice'

test.rb の each_slice から始まるバックトレースが出ました。ビンゴのようです。

ちなみに、このタイミングで enum_each_slice の第一引数に渡される obj の正体は rb_p 関数で調べることができて、

(gdb) call rb_p(obj)
#<Mysql2::Result:0x0055555667b398 【詳細略】>

Mysql2::Result であることが分かります。
狙ったところを捕捉できたと見て間違いなさそうです。

処理の流れ

ソースコードを再掲します。

C で書かれた enum_each_slice のコードが

static VALUE
enum_each_slice(VALUE obj, VALUE n)
{
    long size = NUM2LONG(n);
    VALUE ary;
    struct MEMO *memo;
    int arity;

    if (size <= 0) rb_raise(rb_eArgError, "invalid slice size");
    RETURN_SIZED_ENUMERATOR(obj, 1, &n, enum_each_slice_size);
    size = limit_by_enum_size(obj, size);
    ary = rb_ary_new2(size);
    arity = rb_block_arity();
    memo = MEMO_NEW(ary, dont_recycle_block_arg(arity), size);
    rb_block_call(obj, id_each, 0, 0, each_slice_i, (VALUE)memo);
    ary = memo->v1;
    if (RARRAY_LEN(ary) > 0) rb_yield(ary);

    return Qnil;
}

デバッグ用の ruby のスクリプトが

results.each_slice(1234).first

each_slice にブロックを渡していない関係で、最初に enum_each_slice が呼ばれた時には RETURN_SIZED_ENUMERATOR のところで enumerator インスタンスを作ってそのまま return します4

その後、今度はブロック付きの形でもう一度この関数が呼ばれるという流れになっています。二度めに呼ばれたときは次の行へ続きます。

    /* さっきはこの行で return した */
    RETURN_SIZED_ENUMERATOR(obj, 1, &n, enum_each_slice_size);
    
    /* 今度はブロック付きなので return せず、↓ が実行される */
    size = limit_by_enum_size(obj, size);

size がゼロ?

引続き、出てくる変数の値を確認しながら進めてみます。

2233	    RETURN_SIZED_ENUMERATOR(obj, 1, &n, enum_each_slice_size);
(gdb) n
2234	    size = limit_by_enum_size(obj, size);
(gdb) p size
$1 = 1234

limit_by_enum_size を呼出す前は size の値は 1234 でした。
呼び出した後の値を見てみると、

(gdb) n
2235	    ary = rb_ary_new2(size);
(gdb) p size
$2 = 0

・・・ゼロ?

なんだか妖しい香りがしてきました。

この続きのコードの流れは

    ary = rb_ary_new2(size);
    arity = rb_block_arity();
    memo = MEMO_NEW(ary, dont_recycle_block_arg(arity), size);
    rb_block_call(obj, id_each, 0, 0, each_slice_i, (VALUE)memo);
  • サイズ 0 の配列を作り
  • その配列を memo オブジェクトに格納しつつ、
  • rb_block_call を呼出す

さらに、rb_block_call の先から each_slice_i という関数が呼び出されることになります。
コードはこんな感じ。

static VALUE
each_slice_i(RB_BLOCK_CALL_FUNC_ARGLIST(i, m))
{
    struct MEMO *memo = MEMO_CAST(m);
    VALUE ary = memo->v1;
    VALUE v = Qnil;
    long size = memo->u3.cnt;
    ENUM_WANT_SVALUE();

    rb_ary_push(ary, i);

    if (RARRAY_LEN(ary) == size) {
        v = rb_yield(ary);

        if (memo->v2) {
            MEMO_V1_SET(memo, rb_ary_new2(size));
        }
        else {
            rb_ary_clear(ary);
        }
    }

    return v;
}

変数 memoary はそれぞれ先ほど enum_each_slice で作った MEMO オブジェクトと配列、size はやはりゼロです。

(gdb) break each_slice_i
Breakpoint 2 at 0x555555720475: file enum.c, line 2172.
(gdb) continue
Continuing.
test.rb:12: warning: :cache_rows is ignored if :stream is true

Breakpoint 2, each_slice_i (i=93825010216640, m=93825010217280, argc=1, argv=0x7fffffffb768, blockarg=8) at enum.c:2172
2172	    struct MEMO *memo = MEMO_CAST(m);
(gdb) n
2173	    VALUE ary = memo->v1;
(gdb) n
2174	    VALUE v = Qnil;
(gdb) n
2175	    long size = memo->u3.cnt;
(gdb) n
2176	    ENUM_WANT_SVALUE();
(gdb) p size
$3 = 0

あと、関数の引数に渡されている i というものがあって、 call rb_p(i) すると MySQL から取ってきたレコードの最初の行のハッシュが入っているのが分かります。

つまり、この each_slice_i は ruby っぽい疑似コードで表すと

results.each do |hash|
  call :each_slice_i, hash, ...
end

のような感じで1レコードごとに呼び出されていると捉えられます。処理の中身は

    rb_ary_push(ary, i);

    if (RARRAY_LEN(ary) == size) {
        v = rb_yield(ary);

        if (memo->v2) {
            MEMO_V1_SET(memo, rb_ary_new2(size));
        }
        else {
            rb_ary_clear(ary);
        }
    }

    return v;
  • 配列に要素を push し、
  • その結果として配列のサイズが size とイコールになったらブロックを yield する
  • そうでなければ、それ以上何もせずに return

という処理を繰り返す感じになります。

さて、最初にここに辿り着いたときは

  • 配列は空 (= 配列のサイズは 0)
  • 最初の要素を配列に入れるので、配列の長さは +1 されて 1
  • 変数 size の値は 0
  • 配列の長さと size は異なるので、何もしない

2つめの要素で呼び出されたときは

  • 配列のサイズは 1
  • 2つめの要素を配列に入れるので、配列の長さは +1 されて 2
  • 変数 size の値は 0
  • 配列の長さと size は異なるので、何もしない

というわけで、配列のサイズは size から離れる一方で一致することはなく、結果として全要素をこの配列に詰め込もうとする動作になることが分かりました。メモリを食い潰すのは、ここが原因のようです。

size = 0 を返すのは誰?

そもそも、size がゼロになっているのがおかしい気がします。ゼロを返したのは誰かというと enum_each_slice で呼び出されていた limit_by_enum_size という関数でした。実装はこんな感じ。

static long
limit_by_enum_size(VALUE obj, long n)
{
    unsigned long limit;
    VALUE size = rb_check_funcall(obj, id_size, 0, 0);
    if (!FIXNUM_P(size)) return n;
    limit = FIX2ULONG(size);
    return ((unsigned long)n > limit) ? (long)limit : n;
}

要約すると、

  • objsize というメソッドを呼び出して、
  • size メソッドが返した値を元に計算を行い、
  • その値を返す

という動作です。つまり ruby のコードだと

results.size

の返す値が鍵になりそうです。
というわけで、いったん gdb を離れて Ruby のコードで実験してみます。

results = mysql.query(query, stream: true)
results.size  # => 0

ゼロを返していることが確認できました。
streamfalse だと

# query は末尾に "limit 123" をつけて大量に返ってこないように細工した
results = mysql.query(query, stream: false)
results.size  # => 123

今度は正しい値が返ってきました。

stream: true を指定すると結果の件数が 0 になるようです。

どうも ruby 側 Enumerable#each_slice の期待と mysql2 側 Mysql2::Result#size の挙動との間でミスマッチがあるようですね。

件数 = 0 が返るのはバグ?

mysql2 が件数 = 0 を返すのはバグでしょうか?
あるいは意図した通りの挙動?

さらに調べる必要がありそうです。

Mysql2::Result#size の実装

pry で #size の実装箇所を求めて show-source すると、

pry(main)> show-source results.size

From: ..../vendor/bundle/ruby/2.3.0/gems/mysql2-0.4.5/ext/mysql2/result.c (C Method):
Owner: Mysql2::Result
Visibility: public
Number of lines: 20

static VALUE rb_mysql_result_count(VALUE self) {
  GET_RESULT(self);

  if (wrapper->is_streaming) {
    /* This is an unsigned long per result.h */
    return ULONG2NUM(wrapper->numberOfRows);
  }

  if (wrapper->resultFreed) {
    /* Ruby arrays have platform signed long length */
    return LONG2NUM(RARRAY_LEN(wrapper->rows));
  } else {
    /* MySQL returns an unsigned 64-bit long here */
    if (wrapper->stmt_wrapper) {
      return ULL2NUM(mysql_stmt_num_rows(wrapper->stmt_wrapper->stmt));
    } else {
      return ULL2NUM(mysql_num_rows(wrapper->result));
    }
  }
}

こちらも C で書かれたコードでした。

ざっと眺めた感じ、関数冒頭の if 文に出てくる wrapper->is_streaming の値は、ruby 側で stream: true を指定している場合は真になりそうに思えます。そうなると wrapper->numberOfRows がどうなっているかがポイントになります。

あと、ストリーミングかどうかで件数を求めるロジックは違っているようだということも伺えます。

これも実行時の変数の値を覗いてみたいので rb_mysql_result_count にブレークポイントを置いて続行・・・と考えたところで疑問が。

mysql2 の C 拡張は果たしてデバッグ用にコンパイルされてるのだろうか?

gem 内の C 拡張もデバッグ用にするには

ちゃんと追い切れていないのですが、デバッグ用にビルドした ruby で gem をインストールすれば大丈夫そうです。

通常ビルドの ruby とデバッグ用に -O0 -g3 などを指定してビルドした ruby でそれぞれ bundle install して比較してみたところ、デバッグ用 ruby でインストールされた mysql2 の Makefile には該当オプションが現れていました。

Makefile から該当箇所を抜粋したものがこんな感じです。

optflags = -O0 -fno-fast-math
debugflags = -g3

デバッグ用ではない通常ビルドの ruby の Makefile だと、

optflags = -O3 -fno-fast-math
debugflags = -ggdb3

通常ビルドでも -ggdb3 が指定されているので、デバッグシンボルについては特に問題にならなそうですが、最適化オプションが -O3 になっているので、場合によっては gdb の動作に影響がありそうに思えます。

というわけで、gem の C 拡張を gdb で追いたい場合は、デバッグ用にビルドした ruby でインストールし直すのが良さそうです。

Mysql2::Result#size に潜る

gem の C 拡張をデバッグ用にする方法が分かったところで rb_mysql_result_count 関数の追跡に戻ります。

ブレークポイントを設定して実行を進めたところ、

Breakpoint 2, rb_mysql_result_count (self=93825010212000) at result.c:951
951       GET_RESULT(self);
(gdb) n
953       if (wrapper->is_streaming) {
(gdb) p wrapper->is_streaming
$1 = 1 '\001'
(gdb) n
955         return ULONG2NUM(wrapper->numberOfRows);
(gdb) p wrapper->numberOfRows
$2 = 0

wrapper->is_streaming の値が 1 で真になるのは予想通り、問題の numberOfRows は、やはり 0 でした。

そうなると、この numberOfRows に値を入れたのは誰なのかを追跡すれば良さそうです。

というわけで、手元の mysql2 レポジトリを検索。

$ cd ~/git/github.com/brianmario/mysql2/ext/mysql2
$ ag numberOfRows
result.h
14:  my_ulonglong numberOfRows;

result.c
792:          wrapper->numberOfRows++;
812:    if (args->cacheRows && wrapper->lastRowProcessed == wrapper->numberOfRows) {
815:      for (i = 0; i < wrapper->numberOfRows; i++) {
823:      for (i = 0; i < wrapper->numberOfRows; i++) {
847:      if (wrapper->lastRowProcessed == wrapper->numberOfRows && args->cacheRows) {
920:    wrapper->numberOfRows = wrapper->stmt_wrapper ? mysql_stmt_num_rows(wrapper->stmt_wrapper->stmt) : mysql_num_rows(wrapper->result);
921:    wrapper->rows = rb_ary_new2(wrapper->numberOfRows);
928:    wrapper->rows = rb_ary_new2(wrapper->numberOfRows);
955:    return ULONG2NUM(wrapper->numberOfRows);
978:  wrapper->numberOfRows = 0;

numberOfRows に値を入れていると思われるのは result.c の

792:          wrapper->numberOfRows++;
920:    wrapper->numberOfRows = wrapper->stmt_wrapper ? mysql_stmt_num_rows(wrapper->stmt_wrapper->stmt) : mysql_num_rows(wrapper->result);
978:  wrapper->numberOfRows = 0;

この3つと見て良いでしょう。

978 行目の wrapper->numberOfRows = 0 は構造体の初期化でした。これ以外のフィールドも同様に 0 がセットされています。探しているものではなさそうなので除外。

920 行目の方は rb_mysql_result_each 関数の中にありました。ruby の世界では #each の実装です。よく見ると該当箇所のコードは

  if (wrapper->rows == Qnil && !wrapper->is_streaming) {
    wrapper->numberOfRows = 【以下略】

is_streaming でなければ」という条件がついているので、ここは通らないはず。これも除外。

残るは 792 行目のみになりました。この箇所は rb_mysql_result_each_ 関数にあります。先ほどの、末尾に _ が付いていない rb_mysql_result_each 関数から呼び出されます。

この関数の実装は is_streaming かどうかで挙動が変わり、792 行目は is_streaming である場合の処理です。コードを追ってみると「結果を1行取ってきて numberOfRows++ する」ようです。

いったん ruby の世界に戻ってきて確認してみます。

results = mysql.query(query, stream: true)
results.size # => 0

results.first # => 1件目のハッシュ
results.size # => 1

results.first # => 2件目のハッシュ
results.size # => 2

確かに、1件拾うたびに size がインクリメントされています。

この挙動と、ここまでに見てきたコードの記述から推測するに、ストリーミングの場合にサイズが 0 になっているのはバグではなく、意図してこのように実装されているらしいということが見えてきました。

おそらくは MySQL 自体が、ストリーミング的な使い方をされた時には件数を返さないような挙動になっている事情があって、mysql2 はその挙動に合うように「最初は 0 件にしておいて、レコードを読むのに合わせて自前でカウントしていく」ようになっているのではないかなーと推測します。

MySQL の挙動は?

その答えは MySQL の公式ドキュメントにありました。

23.8.7.72 mysql_use_result()

mysql_use_result() は結果セットの取得を開始しますが、mysql_store_result() のように、実際に結果セットをクライアントに読み込みません。代わりに、mysql_fetch_row() への呼び出しを行うことによって、各行を個別に取得する必要があります。これは、クエリーの結果を一時テーブルやローカルバッファーに保存することなく、サーバーから直接読み取ります。これは、mysql_store_result() よりいくぶん高速で、使用するメモリーがはるかに少なくなります。

【中略】

mysql_data_seek()、mysql_row_seek()、mysql_row_tell()、mysql_num_rows()、または mysql_affected_rows() を mysql_use_result() から返される結果と一緒に使用できません。また、mysql_use_result() が終了するまで、ほかのクエリーを発行することもできません。(ただし、すべての行をフェッチしたあとに、mysql_num_rows() はフェッチした行数を正確に返します。)

つまり

  • 結果セットを取るには mysql_use_resultmysql_store_result どちらかを使うが、
  • mysql_use_result を使った場合は mysql_num_rows を呼び出せず、正確な行数を取得することはできない

ということでした。
先ほどの推測と一致しますが、「is_streaming であったときには mysql_use_result の方が使われている」のかどうかについて、まだ裏が取れていません。

mysql2 の方に戻って、mysql_use_result を使っている箇所を探してみます。

client.c の nogvl_do_result 関数にこういう箇所があり、

  if (use_result) {
    result = mysql_use_result(wrapper->client);
  } else {
    result = mysql_store_result(wrapper->client);
  }

use_result はどこから来たのかを呼び出し元に遡って探してみると、同じく client.c の rb_mysql_client_async_result にこういう箇所があり、

  if (is_streaming == Qtrue) {
    result = (MYSQL_RES *)rb_thread_call_without_gvl(nogvl_use_result, wrapper, RUBY_UBF_IO, 0);
  } else {
    result = (MYSQL_RES *)rb_thread_call_without_gvl(nogvl_store_result, wrapper, RUBY_UBF_IO, 0);
  }

ここがスタート地点でした。

  • is_streaming であれば mysql_use_result が使われ、
  • そうでなければ mysql_store_result が使われる

という実装になっています。

stream: true を指定すると、MySQL の応答からは正しい件数が取れないという推測の裏付けが取れました。

結論

  • mysql2 で stream: true を指定すると、返ってくる件数を予め知ることはできない。これは MySQL の挙動による。
  • mysql2 はこの挙動のため、「すでに読み込んだ件数」を返すようにしている。全件を読み終わったら、結果として正しい件数と一致するようになる。
  • この mysql2 の挙動は Enumerable#each_slice と相性が悪い。#each_slice#size が常に正しい件数を返すことを前提にしているからである。

というわけで、mysql2, Enumerable#each_slice 共にバグがあるとは言えず、根元にある MySQL の挙動から生じる制限によって正しく協調することができない、相性の問題かな、という印象です。

もしかすると、Mysql2::Result に独自の #each_slice を実装すれば上手く動かせるようになるかもしれませんが、今回はひとまず調査までということで、ここで一旦終わりにします。

まとめ

Ruby には pry などの便利ツールが揃っていてデバッグする敷居がだいぶ低くなっている一方で、今回の例のように C で書かれた箇所で問題が起きる場合もあります。そういう場合にも gdb などのツールが使えると自分で実装まで潜れるので助かります。

もし興味を持たれたら、お手元の環境で gdb などを試してみるのも一興かと思います。いつか何かの役に立つかもしれません。

あと、末尾に参考文献を挙げておきますので、良ければ覗いてみてください。

オチ

さて、さんざん遠回りしておいて顛末をまとめたりとかしてきたわけですが、ここで落ち着いてもう一度試してみたところ、

results = mysql.query(query, cache_rows: false) # ← ここを変えた

results.each_slice(1000) do |records|
  hashes = build_bulk_insert_hashes(records)
  es.bulk(index: "foo", type: "bar", body: hashes)
end

実はこうすれば何の問題もなかった5 (。◉ᆺ◉)

お後がよろしいようで。

明日は @koichiro さんが何か書く予定です。ご期待ください。

参考文献

補足

  1. 「ありのまま」はちょっと嘘で、実際にはこのエントリーに書いたよりもっとカオスでした。

  2. そういうのは embulk 使えば良くね? というツッコミはあろうかと思います。いちおう検討したのですが、embulk-output-elasticsearchをざっと眺めた感じ、内側にネストした項目があるドキュメントを扱う方法が分からず、今回は見送った次第です。

  3. mysql2 の README に Streaming の項目で説明されています。

  4. マクロが使われていて分かりづらいですが、条件次第ではこの場所で return するようになっています。

  5. いちおう言い訳をしておくとですねー、途中でよく分からなくなって、いろいろ試行錯誤したわけですよ。cache_rowsstream オプションをそれぞれオン・オフして4パターンで動かしてみたり、Enumerable::Lazy を持ち出してみたり、#each_slice でブロックが呼ばれる度に GC.start してみたり。いろいろやり過ぎて、ちゃんと動いていたのを見落としてたとかかなーと思いますが、今となってはもうよく分かりません。まぁ、おかげでアドベントカレンダーのネタができたので良しとするということで、ひとつ。

39
9
3

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
39
9

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?