はじめに
この記事は CrowdWorks Advent Calendar 2016 12日目の記事です。
昨日のエントリーは @nasum さんによる「nasneの容量をシェルスクリプトでSlackに通知する」でした。
先日起きたことを、ありのままに1話します。
MySQL にあるデータを Elasticsearch (以下 ES と略) にインデックスしようとしていたと思ったら、いつの間にか Ruby の C 拡張をデバッグしていた。な、何を言ってるのか (ry
おまえは何を言っているんだ
三行でまとめると、
- Ruby を使って ES にインデックスを作ろうとしたら予期しない現象に遭遇して、
- 原因を探ろうとしたら C 拡張で実装されている箇所だったので、
- しょうがないからデバッグする方法を調べた
という話です。
発端
事の起こりは、自社サービスのデータベースに溜まっているデータをより効率良く検索できるように、データを ES にインデックスしようと試みたところに始まります。
自社サービスはデータベースは MySQL を利用、Rails で web アプリケーションを構成しています。ES にインデックスしようとしたのは、MySQL に入っている百万レコード程度のボリュームのデータでした。
ひとまず ES にインデックスする方針は決まったものの、ES の知識はお世辞にも充分とは言えない程度だったこともあり、まずは検証環境を作って試行錯誤しようということにしました。
ちなみに検証環境には AWS の EC2 でインスタンスを立てることにしました。処理を行うプロセスはそこで動かします。ES サーバーは同じインスタンスのローカルに立てる形に。MySQL サーバーは RDS を利用。
ということで、後半の方でネイティブコードの話が出てきますが、環境は Linux です。EC2 方面に行けばどこにでも転がっている普通の Amazon Linux 上で検証しています。
ruby スクリプトで片付ける作戦
どのツールを使うか少し検討して、慣れた Ruby 使うことに。MySQL から ES へデータを投げ込むだけの簡単なお仕事です2。幸いにして MySQL, ES 共にスタンダードになっているクライアントの gem があるので、そんなに難しいことにはならないんじゃないかと予想。
まずはこんな感じの Gemfile を作って準備を整えます。
gem "elasticsearch"
gem "mysql2"
ナイーブに実装
まずは愚直に、いちばん簡単な方法で処理してみます。
require 'elasticsearch'
require 'mysql2'
# mysql_config と es_config は別のところで定義されているハッシュ
mysql = Mysql2::Client.new(mysql_config)
es = Elasticsearch::Client.new(es_config)
# query は別のところで定義されている SQL 文字列
results = mysql.query(query)
results.each do |record|
es.index(index: "foo", type: "bar", body: build_index_hash(record))
end
この方法の問題は、レコードが百万件ほど返ってくるとメモリを食い潰してしまうことです。実際に実行してみたところ、メモリ使用量が急上昇し、ついには実行中のターミナルで Ctrl-C を連打しても無反応という事態に陥りました。
メモリが枯渇するわけと回避方法
原因は Mysql2::Result
がデフォルトの挙動だと、フェッチした行を全てメモリにキャッシュしているためです。回避するには #query
メソッドのオプションに stream
を指定してやれば良さそうだということが分かりました3。試してみます。
results = mysql.query(query, stream: true) # ←ここを変えた
results.each do |record|
es.index(index: "foo", type: "bar", body: build_index_hash(record))
end
今度は上手く行きました。mysql2 の方の取扱いはこの方向性で良さそうです。
ES のインデックス生成を効率化
これでできたということにしても良いのですが、実は ES へのデータの渡し方に問題があって、このままでは効率が悪いです。ES に用意されている Bulk API を使えば、1 リクエストで複数のドキュメントをまとめてインデックスすることができます。これを利用して、例えば 1,000 件ずつ小分けにして API を呼び出すようにすれば、もっと早く処理を片付けられるようになりそうです。
というわけで、結果セット全体を決まった長さのサブセットに分割する方法がないかと考えてみます。
Mysql2::Result
は Enumerable
であるので、#each_slice
を持っているはずです。これが使えないでしょうか。
ちなみに Enumerable#each_slice
とは、こういう動作をするものです。
# 1〜10 の数列を 3 つずつに分けた配列にする
(1..10).each_slice(3).map(&:itself)
# => [[1, 2, 3], [4, 5, 6], [7, 8, 9], [10]]
Enumerable#each_slice を使った解決策
先程のコードを少し改変して、こんな感じにします。
results = mysql.query(query, stream: true)
results.each_slice(1000) do |records|
hashes = build_bulk_insert_hashes(records)
es.bulk(index: "foo", type: "bar", body: hashes)
end
・・・・・・。
ターミナルから反応がなくなりました (。◉ᆺ◉)
またもやメモリを食い潰してしまったようです。
#each
を使っていたときは問題なかったので、この挙動は不思議です。気になるので、どうなってるのか調べることにしました。
#each_slice で期待通りにならない原因を探る
最小限の再現コード
いろいろ試行錯誤してみた結果、問題を再現できる最小限のコードはこんな感じに。ES への操作は関係ないのでバッサリ削除して mysql2 を取り扱うコードだけにします。
mysql = Mysql2::Client.new(mysql_config)
results = mysql.query(query, stream: true)
results.each_slice(1234).first
#each_slice
は Enumerator インスタンスを返すだけの挙動で、この段階ではまだ問題は起きていません。最後の #first
が呼ばれたタイミングで問題が発動します。
発動するタイミングは #first
ではあるものの、これは #each_slice
が返している enumerator から値を取得しようとして処理が走ったことによるものではないかと推測、調べるターゲットを #each_slice
に決めます。
pry を使って先程のコードを手動で順に実行していきます。
[1] pry(main)> mysql = Mysql2::Client.new(mysql_config)
[2] pry(main)> results = mysql.query(query, stream: true)
調べたいのは results
が持っている #each_slice
なので、
[3] pry(main)> show-source results.each_slice
From: enum.c (C Method):
Owner: Enumerable
Visibility: public
Number of lines: 20
static VALUE
enum_each_slice(VALUE obj, VALUE n)
{
long size = NUM2LONG(n);
VALUE ary;
struct MEMO *memo;
int arity;
if (size <= 0) rb_raise(rb_eArgError, "invalid slice size");
RETURN_SIZED_ENUMERATOR(obj, 1, &n, enum_each_slice_size);
size = limit_by_enum_size(obj, size);
ary = rb_ary_new2(size);
arity = rb_block_arity();
memo = MEMO_NEW(ary, dont_recycle_block_arg(arity), size);
rb_block_call(obj, id_each, 0, 0, each_slice_i, (VALUE)memo);
ary = memo->v1;
if (RARRAY_LEN(ary) > 0) rb_yield(ary);
return Qnil;
}
実装箇所が特定できました。C で書かれているようです。
手元に ruby のソース一式を展開し、enum_each_slice
を起点にしてコードを追ってみますが・・・
・・・なるほど、わからん。
問題を起こしている時の変数の実際の値などが分からないと、なかなか追いづらいものがあります。
よろしい、ならば gdb だ
というわけで、先ほどの再現コードを実行しつつ、デバッガで追跡する作戦にします。モノは Ruby ではなくて C で書かれたコードなので gdb を使う方針で。
・・・ところで、いまインストールして使っている ruby 処理系って gdb でデバッグできたっけ?
デバッグ用の ruby を作る
よく分からなかったのでググる。見つかったのがこちら。
gdbを使ったrubyのデバッグ
要約すると、
-
-O0
で最適化しないようにして、 -
-g3
でデバッグシンボルが含まれるようにする
というというオプションを渡してコンパイルするところがポイント。
これらを参考に、検証環境の rbenv にデバッグ用の ruby をビルド&インストールします。
$ mkdir ~/src && cd ~/src
$ wget https://cache.ruby-lang.org/pub/ruby/2.3/ruby-2.3.3.tar.gz
$ tar xvf ruby-2.3.3.tar.gz
# いま見返したら、ここで autoreconf しないといけないのを忘れてた :p
# 2.3.3-debug という名前で rbenv 配下にインストールされるように
$ ./configure optflags="-O0" debugflags="-g3" --prefix="${HOME}/.rbenv/versions/2.3.3-debug"
$ make
$ make install
特に問題なく完了。
検証をやっているディレクトリ (= Gemfile を置いている場所) へ戻って、
$ rbenv local 2.3.3-debug
これで、この場所で ruby を使うと 2.3.3-debug が使われるようになりました。
gdb で bundle exec する
検証用のスクリプトの実行は、
$ bundle exec ruby test.rb
みたいな感じでやっていたので、これを gdb 経由で動かすようにするには・・・(しばし試行錯誤)
$ ~/.rbenv/versions/2.3.3-debug/bin/bundle exec gdb \
--args ~/.rbenv/versions/2.3.3-debug/bin/ruby test.rb
こんな感じで行けた。たぶんもっとスマートなやり方がありそうな気もしますが、ここはとりあえず動かせれば良しとします。
each_slice の実装に潜る
まずは、先ほど show-source
で見つけた enum_each_slice
関数にブレークポイントを置いて実行すると該当箇所で停まります。
(gdb) break enum_each_slice
Breakpoint 1 at 0x1cc629: file enum.c, line 2227.
(gdb) run
【中略】
Breakpoint 1, enum_each_slice (obj=93825010224720, n=5) at enum.c:2227
2227 long size = NUM2LONG(n);
ここまで辿り着いた経路を調べるには rb_backtrace
という関数が提供されているので、これを利用すると
(gdb) call rb_backtrace()
from test.rb:7:in `<main>'
from ~/.rbenv/versions/2.3.3-debug/lib/ruby/2.3.0/psych.rb:474:in `load_file'
from ~/.rbenv/versions/2.3.3-debug/lib/ruby/2.3.0/psych.rb:474:in `open'
from ~/.rbenv/versions/2.3.3-debug/lib/ruby/2.3.0/psych.rb:475:in `block in load_file'
【以下略】
Ruby のバックトレースを表示できます。
上の例だと、ファイル名に psych.rb とあるあたりから推測するに、どうやら探しているものはこれではないようです。どこか別の場所で #each_slice
が使われたのでしょう。
一度 continue
してみると、今度は
(gdb) continue
Continuing.
Breakpoint 1, enum_each_slice (obj=93825010217640, n=2469) at enum.c:2227
2227 long size = NUM2LONG(n);
(gdb) call rb_backtrace()
from test.rb:12:in `<main>'
from test.rb:12:in `each_slice'
test.rb の each_slice
から始まるバックトレースが出ました。ビンゴのようです。
ちなみに、このタイミングで enum_each_slice
の第一引数に渡される obj
の正体は rb_p
関数で調べることができて、
(gdb) call rb_p(obj)
#<Mysql2::Result:0x0055555667b398 【詳細略】>
Mysql2::Result
であることが分かります。
狙ったところを捕捉できたと見て間違いなさそうです。
処理の流れ
ソースコードを再掲します。
C で書かれた enum_each_slice
のコードが
static VALUE
enum_each_slice(VALUE obj, VALUE n)
{
long size = NUM2LONG(n);
VALUE ary;
struct MEMO *memo;
int arity;
if (size <= 0) rb_raise(rb_eArgError, "invalid slice size");
RETURN_SIZED_ENUMERATOR(obj, 1, &n, enum_each_slice_size);
size = limit_by_enum_size(obj, size);
ary = rb_ary_new2(size);
arity = rb_block_arity();
memo = MEMO_NEW(ary, dont_recycle_block_arg(arity), size);
rb_block_call(obj, id_each, 0, 0, each_slice_i, (VALUE)memo);
ary = memo->v1;
if (RARRAY_LEN(ary) > 0) rb_yield(ary);
return Qnil;
}
デバッグ用の ruby のスクリプトが
results.each_slice(1234).first
と each_slice
にブロックを渡していない関係で、最初に enum_each_slice
が呼ばれた時には RETURN_SIZED_ENUMERATOR
のところで enumerator インスタンスを作ってそのまま return します4。
その後、今度はブロック付きの形でもう一度この関数が呼ばれるという流れになっています。二度めに呼ばれたときは次の行へ続きます。
/* さっきはこの行で return した */
RETURN_SIZED_ENUMERATOR(obj, 1, &n, enum_each_slice_size);
/* 今度はブロック付きなので return せず、↓ が実行される */
size = limit_by_enum_size(obj, size);
size がゼロ?
引続き、出てくる変数の値を確認しながら進めてみます。
2233 RETURN_SIZED_ENUMERATOR(obj, 1, &n, enum_each_slice_size);
(gdb) n
2234 size = limit_by_enum_size(obj, size);
(gdb) p size
$1 = 1234
limit_by_enum_size
を呼出す前は size
の値は 1234 でした。
呼び出した後の値を見てみると、
(gdb) n
2235 ary = rb_ary_new2(size);
(gdb) p size
$2 = 0
・・・ゼロ?
なんだか妖しい香りがしてきました。
この続きのコードの流れは
ary = rb_ary_new2(size);
arity = rb_block_arity();
memo = MEMO_NEW(ary, dont_recycle_block_arg(arity), size);
rb_block_call(obj, id_each, 0, 0, each_slice_i, (VALUE)memo);
- サイズ 0 の配列を作り
- その配列を memo オブジェクトに格納しつつ、
-
rb_block_call
を呼出す
さらに、rb_block_call
の先から each_slice_i
という関数が呼び出されることになります。
コードはこんな感じ。
static VALUE
each_slice_i(RB_BLOCK_CALL_FUNC_ARGLIST(i, m))
{
struct MEMO *memo = MEMO_CAST(m);
VALUE ary = memo->v1;
VALUE v = Qnil;
long size = memo->u3.cnt;
ENUM_WANT_SVALUE();
rb_ary_push(ary, i);
if (RARRAY_LEN(ary) == size) {
v = rb_yield(ary);
if (memo->v2) {
MEMO_V1_SET(memo, rb_ary_new2(size));
}
else {
rb_ary_clear(ary);
}
}
return v;
}
変数 memo
と ary
はそれぞれ先ほど enum_each_slice
で作った MEMO
オブジェクトと配列、size
はやはりゼロです。
(gdb) break each_slice_i
Breakpoint 2 at 0x555555720475: file enum.c, line 2172.
(gdb) continue
Continuing.
test.rb:12: warning: :cache_rows is ignored if :stream is true
Breakpoint 2, each_slice_i (i=93825010216640, m=93825010217280, argc=1, argv=0x7fffffffb768, blockarg=8) at enum.c:2172
2172 struct MEMO *memo = MEMO_CAST(m);
(gdb) n
2173 VALUE ary = memo->v1;
(gdb) n
2174 VALUE v = Qnil;
(gdb) n
2175 long size = memo->u3.cnt;
(gdb) n
2176 ENUM_WANT_SVALUE();
(gdb) p size
$3 = 0
あと、関数の引数に渡されている i
というものがあって、 call rb_p(i)
すると MySQL から取ってきたレコードの最初の行のハッシュが入っているのが分かります。
つまり、この each_slice_i
は ruby っぽい疑似コードで表すと
results.each do |hash|
call :each_slice_i, hash, ...
end
のような感じで1レコードごとに呼び出されていると捉えられます。処理の中身は
rb_ary_push(ary, i);
if (RARRAY_LEN(ary) == size) {
v = rb_yield(ary);
if (memo->v2) {
MEMO_V1_SET(memo, rb_ary_new2(size));
}
else {
rb_ary_clear(ary);
}
}
return v;
- 配列に要素を push し、
- その結果として配列のサイズが size とイコールになったらブロックを
yield
する - そうでなければ、それ以上何もせずに return
という処理を繰り返す感じになります。
さて、最初にここに辿り着いたときは
- 配列は空 (= 配列のサイズは 0)
- 最初の要素を配列に入れるので、配列の長さは +1 されて 1
- 変数
size
の値は 0 - 配列の長さと
size
は異なるので、何もしない
2つめの要素で呼び出されたときは
- 配列のサイズは 1
- 2つめの要素を配列に入れるので、配列の長さは +1 されて 2
- 変数
size
の値は 0 - 配列の長さと
size
は異なるので、何もしない
というわけで、配列のサイズは size
から離れる一方で一致することはなく、結果として全要素をこの配列に詰め込もうとする動作になることが分かりました。メモリを食い潰すのは、ここが原因のようです。
size = 0 を返すのは誰?
そもそも、size
がゼロになっているのがおかしい気がします。ゼロを返したのは誰かというと enum_each_slice
で呼び出されていた limit_by_enum_size
という関数でした。実装はこんな感じ。
static long
limit_by_enum_size(VALUE obj, long n)
{
unsigned long limit;
VALUE size = rb_check_funcall(obj, id_size, 0, 0);
if (!FIXNUM_P(size)) return n;
limit = FIX2ULONG(size);
return ((unsigned long)n > limit) ? (long)limit : n;
}
要約すると、
-
obj
のsize
というメソッドを呼び出して、 -
size
メソッドが返した値を元に計算を行い、 - その値を返す
という動作です。つまり ruby のコードだと
results.size
の返す値が鍵になりそうです。
というわけで、いったん gdb を離れて Ruby のコードで実験してみます。
results = mysql.query(query, stream: true)
results.size # => 0
ゼロを返していることが確認できました。
stream
が false
だと
# query は末尾に "limit 123" をつけて大量に返ってこないように細工した
results = mysql.query(query, stream: false)
results.size # => 123
今度は正しい値が返ってきました。
stream: true
を指定すると結果の件数が 0 になるようです。
どうも ruby 側 Enumerable#each_slice
の期待と mysql2 側 Mysql2::Result#size
の挙動との間でミスマッチがあるようですね。
件数 = 0 が返るのはバグ?
mysql2 が件数 = 0 を返すのはバグでしょうか?
あるいは意図した通りの挙動?
さらに調べる必要がありそうです。
Mysql2::Result#size の実装
pry で #size
の実装箇所を求めて show-source
すると、
pry(main)> show-source results.size
From: ..../vendor/bundle/ruby/2.3.0/gems/mysql2-0.4.5/ext/mysql2/result.c (C Method):
Owner: Mysql2::Result
Visibility: public
Number of lines: 20
static VALUE rb_mysql_result_count(VALUE self) {
GET_RESULT(self);
if (wrapper->is_streaming) {
/* This is an unsigned long per result.h */
return ULONG2NUM(wrapper->numberOfRows);
}
if (wrapper->resultFreed) {
/* Ruby arrays have platform signed long length */
return LONG2NUM(RARRAY_LEN(wrapper->rows));
} else {
/* MySQL returns an unsigned 64-bit long here */
if (wrapper->stmt_wrapper) {
return ULL2NUM(mysql_stmt_num_rows(wrapper->stmt_wrapper->stmt));
} else {
return ULL2NUM(mysql_num_rows(wrapper->result));
}
}
}
こちらも C で書かれたコードでした。
ざっと眺めた感じ、関数冒頭の if 文に出てくる wrapper->is_streaming
の値は、ruby 側で stream: true
を指定している場合は真になりそうに思えます。そうなると wrapper->numberOfRows
がどうなっているかがポイントになります。
あと、ストリーミングかどうかで件数を求めるロジックは違っているようだということも伺えます。
これも実行時の変数の値を覗いてみたいので rb_mysql_result_count
にブレークポイントを置いて続行・・・と考えたところで疑問が。
mysql2 の C 拡張は果たしてデバッグ用にコンパイルされてるのだろうか?
gem 内の C 拡張もデバッグ用にするには
ちゃんと追い切れていないのですが、デバッグ用にビルドした ruby で gem をインストールすれば大丈夫そうです。
通常ビルドの ruby とデバッグ用に -O0 -g3
などを指定してビルドした ruby でそれぞれ bundle install
して比較してみたところ、デバッグ用 ruby でインストールされた mysql2 の Makefile には該当オプションが現れていました。
Makefile から該当箇所を抜粋したものがこんな感じです。
optflags = -O0 -fno-fast-math
debugflags = -g3
デバッグ用ではない通常ビルドの ruby の Makefile だと、
optflags = -O3 -fno-fast-math
debugflags = -ggdb3
通常ビルドでも -ggdb3
が指定されているので、デバッグシンボルについては特に問題にならなそうですが、最適化オプションが -O3
になっているので、場合によっては gdb の動作に影響がありそうに思えます。
というわけで、gem の C 拡張を gdb で追いたい場合は、デバッグ用にビルドした ruby でインストールし直すのが良さそうです。
Mysql2::Result#size に潜る
gem の C 拡張をデバッグ用にする方法が分かったところで rb_mysql_result_count
関数の追跡に戻ります。
ブレークポイントを設定して実行を進めたところ、
Breakpoint 2, rb_mysql_result_count (self=93825010212000) at result.c:951
951 GET_RESULT(self);
(gdb) n
953 if (wrapper->is_streaming) {
(gdb) p wrapper->is_streaming
$1 = 1 '\001'
(gdb) n
955 return ULONG2NUM(wrapper->numberOfRows);
(gdb) p wrapper->numberOfRows
$2 = 0
wrapper->is_streaming
の値が 1 で真になるのは予想通り、問題の numberOfRows
は、やはり 0 でした。
そうなると、この numberOfRows
に値を入れたのは誰なのかを追跡すれば良さそうです。
というわけで、手元の mysql2 レポジトリを検索。
$ cd ~/git/github.com/brianmario/mysql2/ext/mysql2
$ ag numberOfRows
result.h
14: my_ulonglong numberOfRows;
result.c
792: wrapper->numberOfRows++;
812: if (args->cacheRows && wrapper->lastRowProcessed == wrapper->numberOfRows) {
815: for (i = 0; i < wrapper->numberOfRows; i++) {
823: for (i = 0; i < wrapper->numberOfRows; i++) {
847: if (wrapper->lastRowProcessed == wrapper->numberOfRows && args->cacheRows) {
920: wrapper->numberOfRows = wrapper->stmt_wrapper ? mysql_stmt_num_rows(wrapper->stmt_wrapper->stmt) : mysql_num_rows(wrapper->result);
921: wrapper->rows = rb_ary_new2(wrapper->numberOfRows);
928: wrapper->rows = rb_ary_new2(wrapper->numberOfRows);
955: return ULONG2NUM(wrapper->numberOfRows);
978: wrapper->numberOfRows = 0;
numberOfRows
に値を入れていると思われるのは result.c の
792: wrapper->numberOfRows++;
920: wrapper->numberOfRows = wrapper->stmt_wrapper ? mysql_stmt_num_rows(wrapper->stmt_wrapper->stmt) : mysql_num_rows(wrapper->result);
978: wrapper->numberOfRows = 0;
この3つと見て良いでしょう。
978 行目の wrapper->numberOfRows = 0
は構造体の初期化でした。これ以外のフィールドも同様に 0 がセットされています。探しているものではなさそうなので除外。
920 行目の方は rb_mysql_result_each
関数の中にありました。ruby の世界では #each
の実装です。よく見ると該当箇所のコードは
if (wrapper->rows == Qnil && !wrapper->is_streaming) {
wrapper->numberOfRows = 【以下略】
「is_streaming
でなければ」という条件がついているので、ここは通らないはず。これも除外。
残るは 792 行目のみになりました。この箇所は rb_mysql_result_each_
関数にあります。先ほどの、末尾に _
が付いていない rb_mysql_result_each
関数から呼び出されます。
この関数の実装は is_streaming
かどうかで挙動が変わり、792 行目は is_streaming
である場合の処理です。コードを追ってみると「結果を1行取ってきて numberOfRows++
する」ようです。
いったん ruby の世界に戻ってきて確認してみます。
results = mysql.query(query, stream: true)
results.size # => 0
results.first # => 1件目のハッシュ
results.size # => 1
results.first # => 2件目のハッシュ
results.size # => 2
確かに、1件拾うたびに size
がインクリメントされています。
この挙動と、ここまでに見てきたコードの記述から推測するに、ストリーミングの場合にサイズが 0 になっているのはバグではなく、意図してこのように実装されているらしいということが見えてきました。
おそらくは MySQL 自体が、ストリーミング的な使い方をされた時には件数を返さないような挙動になっている事情があって、mysql2 はその挙動に合うように「最初は 0 件にしておいて、レコードを読むのに合わせて自前でカウントしていく」ようになっているのではないかなーと推測します。
MySQL の挙動は?
その答えは MySQL の公式ドキュメントにありました。
mysql_use_result() は結果セットの取得を開始しますが、mysql_store_result() のように、実際に結果セットをクライアントに読み込みません。代わりに、mysql_fetch_row() への呼び出しを行うことによって、各行を個別に取得する必要があります。これは、クエリーの結果を一時テーブルやローカルバッファーに保存することなく、サーバーから直接読み取ります。これは、mysql_store_result() よりいくぶん高速で、使用するメモリーがはるかに少なくなります。
【中略】
mysql_data_seek()、mysql_row_seek()、mysql_row_tell()、mysql_num_rows()、または mysql_affected_rows() を mysql_use_result() から返される結果と一緒に使用できません。また、mysql_use_result() が終了するまで、ほかのクエリーを発行することもできません。(ただし、すべての行をフェッチしたあとに、mysql_num_rows() はフェッチした行数を正確に返します。)
つまり
- 結果セットを取るには
mysql_use_result
かmysql_store_result
どちらかを使うが、 -
mysql_use_result
を使った場合はmysql_num_rows
を呼び出せず、正確な行数を取得することはできない
ということでした。
先ほどの推測と一致しますが、「is_streaming
であったときには mysql_use_result
の方が使われている」のかどうかについて、まだ裏が取れていません。
mysql2 の方に戻って、mysql_use_result
を使っている箇所を探してみます。
client.c の nogvl_do_result
関数にこういう箇所があり、
if (use_result) {
result = mysql_use_result(wrapper->client);
} else {
result = mysql_store_result(wrapper->client);
}
use_result
はどこから来たのかを呼び出し元に遡って探してみると、同じく client.c の rb_mysql_client_async_result
にこういう箇所があり、
if (is_streaming == Qtrue) {
result = (MYSQL_RES *)rb_thread_call_without_gvl(nogvl_use_result, wrapper, RUBY_UBF_IO, 0);
} else {
result = (MYSQL_RES *)rb_thread_call_without_gvl(nogvl_store_result, wrapper, RUBY_UBF_IO, 0);
}
ここがスタート地点でした。
-
is_streaming
であればmysql_use_result
が使われ、 - そうでなければ
mysql_store_result
が使われる
という実装になっています。
stream: true
を指定すると、MySQL の応答からは正しい件数が取れないという推測の裏付けが取れました。
結論
- mysql2 で
stream: true
を指定すると、返ってくる件数を予め知ることはできない。これは MySQL の挙動による。 - mysql2 はこの挙動のため、「すでに読み込んだ件数」を返すようにしている。全件を読み終わったら、結果として正しい件数と一致するようになる。
- この mysql2 の挙動は
Enumerable#each_slice
と相性が悪い。#each_slice
は#size
が常に正しい件数を返すことを前提にしているからである。
というわけで、mysql2, Enumerable#each_slice
共にバグがあるとは言えず、根元にある MySQL の挙動から生じる制限によって正しく協調することができない、相性の問題かな、という印象です。
もしかすると、Mysql2::Result
に独自の #each_slice
を実装すれば上手く動かせるようになるかもしれませんが、今回はひとまず調査までということで、ここで一旦終わりにします。
まとめ
Ruby には pry などの便利ツールが揃っていてデバッグする敷居がだいぶ低くなっている一方で、今回の例のように C で書かれた箇所で問題が起きる場合もあります。そういう場合にも gdb などのツールが使えると自分で実装まで潜れるので助かります。
もし興味を持たれたら、お手元の環境で gdb などを試してみるのも一興かと思います。いつか何かの役に立つかもしれません。
あと、末尾に参考文献を挙げておきますので、良ければ覗いてみてください。
オチ
さて、さんざん遠回りしておいて顛末をまとめたりとかしてきたわけですが、ここで落ち着いてもう一度試してみたところ、
results = mysql.query(query, cache_rows: false) # ← ここを変えた
results.each_slice(1000) do |records|
hashes = build_bulk_insert_hashes(records)
es.bulk(index: "foo", type: "bar", body: hashes)
end
実はこうすれば何の問題もなかった5 (。◉ᆺ◉)
お後がよろしいようで。
明日は @koichiro さんが何か書く予定です。ご期待ください。
参考文献
-
gdbを使ったrubyのデバッグ - クックパッド開発者ブログ
Cookpad の k0kubun さんのエントリー。ruby を gdb でデバッグする方法について参考になりました。 -
GDBでデバッグするなら-g3オプション - ククログ
クリアコードの、たぶん須藤さんでしょうか? 今回使ったコンパイルオプションの意味を確認しようとして辿り着きました。 -
Debug Hacks - デバッグを極めるテクニック&ツール
少し前の本ですが、gdb の基本的な使い方などがまとまっていて参考になりました。
補足
-
「ありのまま」はちょっと嘘で、実際にはこのエントリーに書いたよりもっとカオスでした。 ↩
-
そういうのは embulk 使えば良くね? というツッコミはあろうかと思います。いちおう検討したのですが、embulk-output-elasticsearchをざっと眺めた感じ、内側にネストした項目があるドキュメントを扱う方法が分からず、今回は見送った次第です。 ↩
-
マクロが使われていて分かりづらいですが、条件次第ではこの場所で return するようになっています。 ↩
-
いちおう言い訳をしておくとですねー、途中でよく分からなくなって、いろいろ試行錯誤したわけですよ。
cache_rows
とstream
オプションをそれぞれオン・オフして4パターンで動かしてみたり、Enumerable::Lazy
を持ち出してみたり、#each_slice
でブロックが呼ばれる度にGC.start
してみたり。いろいろやり過ぎて、ちゃんと動いていたのを見落としてたとかかなーと思いますが、今となってはもうよく分かりません。まぁ、おかげでアドベントカレンダーのネタができたので良しとするということで、ひとつ。 ↩