LoginSignup
17
7

More than 5 years have passed since last update.

Logstashを使ったElasticsearchの無停止インデックス更新の運用を考える

Last updated at Posted at 2018-12-09

はじめまして!

リクルートライフスタイルで新規事業の開発を担当している@taikitです。
今回のアドベントカレンダーを機に初めて記事を書いてみます。

はじめに

現在、Elasticsearchを用いた検索システムの構築を行っています。Indexの設定を更新する場合、更新以前のデータには適用されません。更新以前のデータにも適用させるためには、インデックスを作り直す必要があります。様々な運用方法が考えられますが、今回はなるべく楽に運用できる方法を考えてみました。

考えるケース

前提.png
検索対象のデータはMySQLに溜められたのちに、LogstashでElasticsearchへ同期します。ユーザーの検索リクエストをRailsで受け取、Elasticsearchで検索を行います。Railsの場合、公式のGemでElasticsearchへデータを保存することもできますが、Railsを介さずMySQLへ直接データが保存されることも想定してLogstashを使っています。なお、ElasticsearchはマネージドサービスであるElastic Cloudを利用しています。

無停止インデックス更新の主な方法

調べた範囲では主に大きく分けて3つの方法がありました。

Index Aliasesを利用する方法

公式ブログで紹介されている方法です。アプリケーション側で直接Indexを参照するのではなく、Aliasを参照するように設定しておきます。これにより新しいIndexを参照させたいときは、Aliasの更新だけすれば良いのでアプリケーションのコードを更新する必要はありません。

クラスタごと切り替える方法

リクルートの他部署でも行われている方法1です。運用中のクラスタとは別に新しいインデックスが適用されたクラスタを作り、Blue/Greenデプロイ等で新旧のクラスタを入れ替えます。比較的大掛かりになりますが、Elasticsearchのバージョンアップ等の運用面も考慮すると一度仕組みを構築してしまえば運用は楽だそうです。

アプリケーションレイヤーで切り替える方法

こちらの記事で紹介されている方法です。アプリケーション側で参照するインデックスを切り替えます。アプリケーション側の検索クエリの変更を伴う場合やIndexのFieldを大きく変更する場合は、アプリケーションのコードで切り替える必要があるケースもあるかもしれません。

適用した方法

今回はIndex Aliasesを利用する方法を採用しました。クラスタごと切り替える方法に関しては、今回はマネージドサービスを利用しておりElasticsearch自体の更新は自動的に無停止で行われるため、バージョンアップ時の心配はありません。また検索性能を向上させるためにAnalyzerを頻繁に更新することが予想されるため、毎回アプリケーションを更新するのでは煩雑になってしまいます。

処理の概要としては、Logstashでタイムスタンプが含まれたIndexを新規作成し、定期実行されるスクリプトでIndex名のタイムスタンプを読み取り、新しいIndexがあればAliasの切り替えを行います。

__ (2).png

Logstash の設定

Logstashをデプロイするだけで新たな設定のIndexがElasticsearchに追加されデータが同期されるような設定を行います。

input {
  jdbc {
    jdbc_driver_library => "mysql-connector-java-8.0.11/mysql-connector-java-8.0.11.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://${MYSQL_HOST}:3306/${DB_NAME}"
    jdbc_user => "${MYSQL_USER}"
    jdbc_password => "${MYSQL_PASSWORD}"
    schedule => "* * * * *"
    statement_filepath => "/usr/share/logstash/sql/items.sql"
    use_column_value => true
    tracking_column => "updated_at"
    tracking_column_type => "timestamp"
  }
}

output {
  elasticsearch {
    hosts => ["${ELASTICSEARCH_HOST}"]
    user => "${ELASTICSEARCH_USERNAME}"
    password => "${ELASTICSEARCH_PASSWORD}"
    manage_template => true
    template => "/usr/share/logstash/template/items.json"
    template_name => "items"
    template_overwrite => true
    index => "items-${INDEX_TIMESTAMP}"
    document_type => "_doc"
    document_id => "%{id}"
  }
}

デプロイ時に環境変数のINDEX_TIMESTAMPに20181209045112のように現在時刻がセットされるようにしてください。これによりIndex名の末尾にタイムスタンプが入るようにしています。Indexの設定はなるべくLogstashに寄せるためにIndex Templatesを使っています。Templateのindex_patternsのタイムスタンプの部分にはワイルドカードを指定しておきます(上記の例の場合"items-*")。

tracking_columnを設定し、前回同期時点以降に更新されたレコードのみを同期するようにしています。Logstashをデプロイする際に、この最終同期日時をリセットすることで新しいIndexにすべてのレコードが同期されます。Logstashをコンテナで運用する場合などは最終同期日時を記録しているファイルが引き継がれないため、このリセットに関して操作は必要ありません。

Aliasを更新するスクリプト

Index名のタイムスタンプを読み取り最新のIndexにAliasを入れ替え、古くなったIndexを削除します。このスクリプトはクックパッドさんのこちらの記事を大変参考にさせていただきました。

main.rb
require 'elasticsearch'
require 'uri'
require_relative '../index_manager'

ES_URL = ENV.fetch('ES_URL')
ES_USER = ENV.fetch('ES_USER')
ES_PASSWORD = ENV.fetch('ES_PASSWORD')
TABLE_NAME = ENV.fetch('TABLE_NAME')

uri = URI.parse(ES_URL)
uri.user = ES_USER
uri.password = ES_PASSWORD

client = Elasticsearch::Client.new(url: uri.to_s)

index_manager = IndexManager.new(TABLE_NAME, client)
index_manager.switch_alias_to_latest
index_manager.delete_old_indexes
index_manager.rb
class IndexManager
  def initialize(name, client)
    @name = name
    @client = client
  end

  def switch_alias_to_latest
    latest_index_cache = latest_index
    return if indexes_in_alias == [latest_index_cache]

    switch_alias(latest_index_cache)
  end

  def delete_old_indexes
    old_indexes.map do |index|
      @client.indices.delete(index: index)
    end
  end

  private

  def alias_name
    "#{@name}-latest"
  end

  def indexes
    @client.indices.get(index: "#{@name}-*").keys
  end

  def indexes_in_alias
    @client.indices.get_alias(index: alias_name).keys
  rescue Elasticsearch::Transport::Transport::Errors::NotFound
    []
  end

  def latest_index
    latest_date = indexes.map { |index| index_timestamp(index) }.max
    "#{@name}-#{latest_date}"
  end

  def old_indexes
    latest_timestamp = index_timestamp(latest_index)
    indexes.select { |index| index_timestamp(index) < latest_timestamp }
  end

  def switch_alias(new_index)
    actions = []
    indexes_in_alias.each do |old_index|
      actions << { remove: { index: old_index, aliases: alias_name } }
    end
    actions << { add: { index: new_index, aliases: alias_name } }
    @client.indices.update_aliases(body: { actions: actions })
  end

  def index_timestamp(index)
    index.match(/#{@name}-(\d{14})/)[1]
  end
end

このスクリプトをCron等で定期実行させることで、新しいIndexが作成された際に自動的にAlias先のIndexが切り替わり、古いIndexは削除されます。万が一に備えて、切り戻しを想定する場合はIndexの削除は行わない方が良いかもしれません。

同期が完了していない場合もAliasを更新してしまう

上記のスクリプトを定期実行する場合、対象のデータが多いとLogstashの同期が完了する前にAliasを切り替えてしまいタイミングによっては検索結果が不完全になる可能性があります。スクリプト内で新旧のドキュメント数を比較したり、旧インデックスの最後のIDが新インデックスに含まていることを確認したりする処理が必要になります。(できれば追記したいと思います)

Railsの設定

gemのelasticsearch-modelを利用している場合の設定になります。index_nameにスクリプトで設定したAlias名を設定します。

class Item < ApplicationRecord
    include Elasticsearch::Model
    index_name 'items-latest'
    document_type '_doc'
end

終わりに

Logstashと組み合わせて無停止インデックス更新の方法を考えてみました。まだ一部、試行錯誤の途中ではありますが記事にしてみました。こんな運用の仕方もあるよ等ありましたらコメントで教えていただけるとうれしいです。

17
7
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
17
7