9
9

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

【Rails】 Elasticsearch に bulk update させよう

Last updated at Posted at 2020-04-28

2020/04/29 公開

やること (やりたいこと)

  • rails, elasticsearch-ruby関連の gem を使用
  • XxxxSearchable module 周りをできるだけキレイに作る
  • Elasticsearch にインデックスを作成する
  • 複数ドキュメントの更新を bulk update で高速に処理させる

やらないこと ↓

  • Elasticsearch での検索に関して(今回は全く触れていません)

環境

  • ruby 2.7.0
  • rails 6.0.2
  • elasticsearch-model 7.1.0
  • elasticsaerch-rails 7.1.0

Rails アプリケーション、データベース、Elasticsearch が動作する環境がローカル環境に既に構築されているものとします。

HogeSearchableモジュール周りをできるだけキレイに作る

扱うモデルの関係図

(ER図間違ってたらごめんなさい)

er_test.png

Memberモデル

Memberモデルにelasticsearch-modelの機能を使うための設定を追加します。

app/models/member.rb
class Member < ActiveRecord::Base
  include Es::MemberSearchable
end

MemberSearchableモジュール

MemberSearchable module を作成します。

以下が個人的なポイント。

  • app/models/concerns/以下に有象無象にモデルが溜まるのがあまり好きではないので、XxxxSearchable用にapp/models/es/というディレクトリを新たに作成している
  • XxxxSearchableで使い回せるようなクラスメソッドを定義するための Es::SearchableBase module を include する(後で作る)
  • as_indexed_json メソッドの中身を定義するための Es::MemberFormer module を include する(後で作る)
app/models/es/member_searchable.rb
module Es::MemberSearchable
  extend ActiveSupport::Concern

  included do
    include Elasticsearch::Model
    include Es::SearchableBase
    include Es::Formers::MemberFormer

    # index名
    index_name 'members'

    settings do
      # フィールドの型を静的に定義する
      mappings dynamic: 'false' do
        indexes :name, analyzer: 'kuromoji', type: 'text' # 氏名
        indexes :watched_movie_ids, type: 'integer' # 見たことがある映画のID
        indexes :watched_movie_genre_ids, type: 'integer' # 見たことがある映画のジャンルのID
      end
    end

    # DBからElasticsearchへのデータインポート時に渡す値の設定
    def as_indexed_json(_option = {})
      create_data_hash.as_json
    end
  end
end

Es::SearchableBaseモジュール

使い回しができそうなメソッドを SearchableBase として切り分けておきます。
使い方はクラスメソッドなので Member.create_es_index! とか Member.get_es_mapping のような感じ。

app/models/es/searchable_base.rb
module Es::SearchableBase
  extend ActiveSupport::Concern

  class_methods do

    # index作成メソッド
    def create_es_index
      __elasticsearch__.client.indices.create(
        index: self.index_name,
        body: {
          settings: self.settings.to_hash,
          mappings: self.mappings.to_hash,
        }
      )
    end

    # indexを削除し、作成し直すメソッド(ドキュメント消えるので注意)
    def create_es_index!
      begin
        __elasticsearch__.client.indices.delete(index: self.index_name)
      rescue StandardError
        nil
      end

      self.create_es_index
    end

    # mappingを確認するメソッド
    def get_es_mapping
      __elasticsearch__.client.indices.get_mapping(index: index_name)
    end

    # mappingの再定義をするメソッド(新たなmappingを追加するときのみ使う)
    def put_es_mapping
      __elasticsearch__.client.indices.put_mapping(
        index: index_name,
        body:  self.mappings.to_hash,
      )
    end

    # documentの更新をするメソッド
    def update_es_documents
      transform = lambda do |target|
        { update: { _id: target.id, data: { doc: target.__elasticsearch__.as_indexed_json } } }
      end

      __elasticsearch__.import(transform: transform)
    end
  end
end

Es::MemberFormerモジュール

よく見かける XxxxSearchabel の設計では as_indexed_json メソッドの中にベタッとハッシュを書いていたりします。
しかし、index するフィールドの個数が20も30も、となってくると大変です。(今回は3件だけですが)
データ取得のための private メソッドも増えるはずなので行数が膨らみ、コードを追うのがつらくなります。

そうならないために分けます。
役割はできるだけ分け、ひとつひとつをできるだけシンプルに保ちたい。

app/models/es/formers/member_former.rb
module Es::Formers::MemberFormer
  extend ActiveSupport::Concern

  def create_data_hash
    {
      name: self.name,
      watched_movie_ids: self.member_watched_movies.ids,
      watched_movie_genre_ids: watched_movie_genre_ids_array,
    }
  end

  private

  def watched_movie_genre_ids_array
    self.member_watched_movies.map { |movie| movie.genre.id }.uniq.sort
  end
end

ここまでのディレクトリ構造

app
├ controllers
├ ...
└ models
  ├ concerns
  ├ es
  │ ├ formers
  │ │ └ member_former.rb
  │ ├ member_searchable.rb
  │ └ searchable_base.rb
  ├ member.rb
  ├ movie.rb
  └ ...

Elasticsearch にインデックスを作成する

インデックス作成用とデータインポート用の rake タスクを作ります。

lib/tasks/es.rake
namespace :es do
  namespace :members do
    # インデックス作成用
    task create_index: :environment do
      Member.create_es_index!
    end

    # データインポート用
    # いきなり全件いれたくなければ、allじゃなくてwhereで絞ってください
    task import: :environment do
      Member.all.__elasticsearch__.import
    end
  end
end

インデックス作成 rake タスクを実行します。↓

$ bundle exec rake es:members:create_index

インデックスが作成されたことを確認してみます。↓

$ curl -X GET 'localhost:9200/_cat/indices?v&pretty'

members index が表示されれば成功です。docs.count は0のはずです。

データインポート rake タスクを実行します。↓

$ budle exec rake es:members:import

データがインポートされたことを確認してみます。↓

$ curl -X GET 'localhost:9200/_cat/indices?v&pretty'

docs.count が 0 ではなくなっていればOKです。

ちなみにインデックス消したければ、以下を叩いてください。↓

$ curl -X DELETE 'localhost:9200/members'

複数ドキュメントの更新を bulk update で高速に処理させる

bulk updateの実装

bulk update 用の rake タスクを作成します。

lib/tasks/es.rake
namespace :es do
  namespace :members do

    # 省略

    # ドキュメントのアップデート用
    # whereの範囲は適当
    task update: :environment do
      Member.where(id: 1..10).update_es_documents
    end
  end
end

この rake タスクを実行すれば、bulk処理なアップデートができます。
変更差分のないドキュメントが更新の対象になっていれば、そのドキュメントの version は上がりありません。
update API ではなく index API が実行される場合は、変更差分のないドキュメントの version は上がってしまいます)

ここで使用している update_es_documents メソッドですが、実は先ほど Es::SearchableBase module にて既に定義したものです。

どのような処理をしているか見ていきます。

__elasticsearch__.import メソッドのオプション指定

(上で記述した Es::SearchableBase module を部分的に再掲 ↓)

app/models/es/searchable_base.rb
module Es::SearchableBase
  extend ActiveSupport::Concern

  class_methods do

    # 省略

    # documentの更新をするメソッド
    def update_es_documents
      transform = lambda do |target|
        { update: { _id: target.id, data: { doc: target.__elasticsearch__.as_indexed_json } } }
      end

      __elasticsearch__.import(transform: transform)
    end
  end
end

呼び出している処理は elasticsearch-model gem の import メソッドです。
(先ほど create したインデックスにデータをインポートする rake タスクでも使いました)

今回はこの import メソッドでは transform というオプションの引き数に proc を渡しています。
(fyi: https://github.com/elastic/elasticsearch-rails/blob/master/elasticsearch-model/lib/elasticsearch/model/importing.rb#L114-L120)

ハッシュの先頭の update というキーの指定が重要で、これが Elasitcsearch の update API を呼び出しているいることになります。

import メソッドは元々 bulk API を呼ぶ仕様になっています。
(fyi: https://github.com/elastic/elasticsearch-rails/blob/master/elasticsearch-model/lib/elasticsearch/model/importing.rb#L170)

したがって、この方法で bulk update を実装できたというわけです。

ちなみに transform をオプションとして特に指定しなければ、デフォルトとして index アクションが指定されているようです。
(fyi: https://github.com/elastic/elasticsearch-rails/blob/master/elasticsearch-model/lib/elasticsearch/model/adapters/active_record.rb#L111-L113)

1件ずつ vs 一括

bulk updateが実装できたものの結局速度はどうなんだ、というのがやはり気になるところですね。
かなり簡易的ではありますが、比較をしてみました。

条件
  • 1000件ドキュメントがインデックスされている
  • testという名前のtext型のフィールドがある
  • 1000件ともtestを同じ内容に変更する
  • 計測には ruby の benchmark を使用
1件ずつの場合

ここで使う update_document メソッドについて少しだけ説明をします。

モデルに Elasticsearch::Model::Callbacks を include すると、モデルの after_createafter_delete のタイミングで elastcsearch-model が用意してくれているメソッドが自動で呼ばれ、データベースと Elasticsearch の同期が行われます。
update_document メソッドは after_update 時に呼ばれます。

実は今回の↓のコードような update_document メソッドの使い方では、内部ロジック的に update API ではなく index API が実行されてしまいます…。
(fyi: https://github.com/elastic/elasticsearch-rails/blob/master/elasticsearch-model/lib/elasticsearch/model/indexing.rb#L425-L444)

比較対象としてどうなんだという点は大いにありますが、多めに見て下さい。
(いい方法あれば教えていただけると助かります)

コード

lib/tasks/es.rake
namespace :es do
  namespace :members do
    task test1: :environment do
      target_ids = 1..1000

      Benchmark.bm 10 do |r|
        r.report 'test' do
          Member.where(id: target_ids).find_each do |member|
            member.__elasticsearch__.update_document
          end
        end
      end
    end
  end
end

実行時間

$ bundle exec rake es:members:test1
                 user     system      total        real
test         4.976838   2.255107   7.231945 ( 63.745111)

一括の場合

コード

lib/tasks/es.rake
namespace :es do
  namespace :members do
    task test2: :environment do
      target_ids = 1..1000

      Benchmark.bm 10 do |r|
        r.report 'test' do
          Member.where(id: target_ids).update_es_documents
        end
      end
    end
  end
end

実行時間

$ bundle exec rake es:members:test2
                 user     system      total        real
test         0.251815   0.063858   0.315673 (  1.568112)
結果

bulk処理が圧倒的に速いですね。

最後に

Elasticsearch 周りのクラス設計をできるだけキレイに行い、インデックス内のデータをバルクアップデートするところまでをまとめてみました。

ここまでやってきたものの、これが正解とは思っていません。
モジュール名やメソッド名なども適当なものが多いですし、ディレクトリ構造ももっと改善の余地はあるかと思います。
もっと良い手法があれば教えていただければ大変嬉しいです。

RuboCop を入れていると、怒られる箇所が多々あると思うので、そのあたりはよしなにお願いします。(上限増やすとか、Excludeするとか)

最後まで見ていただき、ありがとうございました。

9
9
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
9
9

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?