Ruby で ReactiveX が使える RxRuby というgemがあるので、それを使って Ruby でも ReactiveX を試してみたいと思います。
目的
仕事での実案件で、MySQLのデータを ElasticSearch にインデックスさせる必要がありました。通常なら ActiveRecord の after_save で searchkick を叩くだけの簡単なお仕事なのですが、以下の理由からもう少し安心できる実装方法を求めていました。
- after_save などの callback は rails console からデータメンテをしたときなど、予期しないタイミングで走ってしまって問題になりやすいので極力使いたくない
- mysql-client などから直接SQLで更新したデータが elasticsearch に反映されない。それを簡単に検知する方法も無い。
- モデルが複数テーブルにまたがるような複雑な構造の場合、小テーブル孫テーブルだけが変更された場合も変更検知しなければならず、ActiveRecord ではキレイに実装しづらい。孫テーブルが更新されたときに親テーブルのタイムスタンプを更新するような実装だと大量にデータを更新するようなバッチ処理に大きな影響が出る。
そこで、MySQL を直接監視することでテーブルの変更イベントのストリームを作り、そのストリームに対して RxRuby を使って非同期に応答させる仕組みを作ることにしました。また、MySQL の変更を監視する場合、短時間の間に大量の更新が発生することもありえます。そのような場合に重たい処理をを繰り返し実行するということが無いように RxRuby の buffer などの機能を使います。
MySQL の変更検知
最初の問題はどうやって MySQL の変更を検知するかですが、以下の2つの方法を思いつきました。
- ibd ファイルを inotify で検知
- mysqlbinlog を使う
1 は MySQL のデータファイルである ibd ファイルを直接 inotify で監視して、監視対象のテーブルが変更されたときに OS から通知を受けるというものです。RDSなどネットワーク越しでは使えないという大きな問題があるものの負荷が最小に抑えられるので試してみましたが上手く行きませんでした。理由は MySQL が ibd の操作に mmap システムコールを使っていて、inotify は mmap をフックしない (write システムコールからはフックされる)ため、検知が全く届かず使えなかったためです。
2 は mysqlbinlog コマンドを使って全ての更新系クエリをストリームで読むというものです。全てのクエリが流れてくるため1に比べるとほんの少しだけ負荷が高くなります。この方法であればリモートからの接続も可能です。
リモートからmysqlbinlogを読むためには以下のようなコマンドを使うことができます。
$ mysqlbinlog --read-from-remote-server --host=localhost --user=user --password=PASSWORD -v --stop-never -j 37103974 mysql-bin.000009
最後の引数は binlog のファイル名で、その前の数字はファイルの中の読み込み開始位置です。いずれも以下のようにして調べることができます。
mysql> show master status;
+------------------+----------+--------------+------------------+-------------------+
| File | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set |
+------------------+----------+--------------+------------------+-------------------+
| mysql-bin.000009 | 37103974 | | | |
+------------------+----------+--------------+------------------+-------------------+
1 row in set (0.00 sec)
binlog raw モード
最後に、全ての変更をもれなく検知するためには binlog のフォーマットを row モードにする必要があります。これは binlog の肥大化を招くので binlog の有効期限を短めに設定するなどの対策が必要になります。
RxRuby
簡単に RxRuby を説明します。Rxについてはあまりうまく説明できる自信が無いので詳細は公式のサイトやこちらのドキュメントを参照してもらうのが良いと思います。簡単にいうと、イベントストリームに対して間引きや集約、集計などの処理を簡単に書くためのライブラリです。
残念ながらあまり開発は活発では無いようです。半年ほど更新されておらず、RxJS にはあるAPIが RxRuby には実装されていないということがありました。後述のようにトリビアルなバグも放置されています。
まずはサンプルコードを見てください。ストリーム (source
)を作り、そこに0.1秒ごとに値を流し込みます。出口側では値をいきなり出力せずにバッファにためておいて、0.5秒ごとに配列として吐き出しています。
require 'rx'
subject = Rx::BehaviorSubject.new(nil) # nil は初回の値
# 0.5秒ごとに値を配列にためて一度に渡してくれるバッファ
source = subject.as_observable.buffer_with_time(0.5)
subscription = source.subscribe(
lambda {|values|
# 0.5秒ごとにこのブロックが実行される。valuesは直近0.5秒間にストリームから送られてきた値の配列
return if values.empty?
puts 'Next: ' + values.inspect
},
lambda {|err|
puts 'Error: ' + err.to_s
},
lambda {
puts 'Completed'
})
30.times do |n|
sleep 0.1
subject.on_next n # 0.1秒ごとに値をストリームに突っ込む
end
subject.on_next 100
しかしこれをこのまま実行してもエラーがでます(以下参照)
RxRuby 0.0.3 のバグ
残念ながら RxRuby 0.0.3 には小さなバグがあります。check_unsubscribed
というメソッドがprivateとして定義されているのに self.check_unsubscribed
という呼び出し方をされているために例外が発生するというトホホな感じです。以下のパッチで public メソッドにすることでとりあえず今回は回避しました。
require "rx"
class Rx::BehaviorSubject
public :check_unsubscribed
end
サンプルコード実行結果
Next: [nil, 0, 1, 2, 3]
Next: [4, 5, 6, 7, 8]
Next: [9, 10, 11, 12, 13]
Next: [14, 15, 16, 17, 18]
Next: [19, 20, 21, 22, 23]
Next: [24, 25, 26, 27, 28]
Next: [29, 100]
このように、0.5秒ごとにストリームから吐き出された配列に対してブロックが評価されています。このようにしてイベントを一定時間ごとに集約して重複を省くことで更新チェックというやや重ための処理を高頻度で実行することを防ぐことができます。
注意点として、RxRubyの実装は Thread ベースです。subscribe ごとに内部で Thread が作成され、subscribe に渡したブロックはいずれもそのスレッドの中で実行されるようになります。共有変数を変更したりすると予期せぬバグを生む可能性があるので注意しましょう。
MySQL と組み合わせる
まずは binlog を読み込んで、監視対象テーブルに対する更新系のクエリのみをフィルタして吐き出すメソッドを作ります。
def read_binlog
command = %Q(mysqlbinlog --read-from-remote-server --host=localhost --user=xxxx --password=xxxx -v --stop-never -j #{@position.position} #{@position.filename})
IO.popen(command, "r+") do |binlog|
while line = binlog.gets
next unless match = /\#\#\# (DELETE FROM|UPDATE|INSERT INTO) `([^`]+)`.`([^`]+)`/.match(l)
type, database, table = match[1], match[2], match[3]
next unless TARGET_TABLES.include?(table)
yield { type: type, database: database, table: table }
end
end
end
mysqlbinlog
に -v
オプションを付けると SQL も出力してくれますので、それを正規表現でフィルタすることで目的のクエリを抽出しています。
次に、このメソッドの出力をストリームに突っ込んでやりましょう。
subject = Rx::BehaviorSubject.new("")
source = subject.as_observable.buffer_with_time(1)
source.subscribe(
lambda do |tables|
synchronize( tables.uniq )
end,
lambda { |err| puts 'Error: ' + err.to_s },
lambda { puts 'Completed' }
)
read_binlog do |change|
source.on_next change[:table]
end
ここではストリームを作り、read_binlog が検知したテーブルをストリームに突っ込み、1秒ごとにバッファされたものを synchronize
というメソッドに渡しています。
synchronize
残る synchronize
メソッドですが、今回の記事の範疇を越えているので省略させてもらいます。簡単に説明すると、前回の synchronize 実行から後に変更されたモデルの id をどうにかこうにか洗い出して searchkick で連動するということをしています。それを効率的に実行するために弊社謹製の trigger ベースの履歴テーブルを使っていますが、それについても別の機会に紹介できればと思います。
さいごに
1秒ごとに処理を行う、という目的はキレイに達成できたかなと思いますが、最終的に更新対象の id を洗い出す箇所に何度か select を伴うので負荷的にはあまり良くなってないかもしれません。更新がそれほど頻繁で無い場合は随分改善されるはずですが、対象テーブルの更新が頻繁に起きる運用下ではややオーバーヘッドが大きくなりそうです。
初めて Rx を触りましたが、非同期プログラミングはどう動くのか(スレッドかプロセスか)を理解してイメージできるようにしておかないとコーディング時にすこし戸惑ってしまうと感じました。