はじめまして!
リクルートライフスタイルで新規事業の開発を担当している@taikitです。
今回のアドベントカレンダーを機に初めて記事を書いてみます。
はじめに
現在、Elasticsearchを用いた検索システムの構築を行っています。Indexの設定を更新する場合、更新以前のデータには適用されません。更新以前のデータにも適用させるためには、インデックスを作り直す必要があります。様々な運用方法が考えられますが、今回はなるべく楽に運用できる方法を考えてみました。
考えるケース
検索対象のデータはMySQLに溜められたのちに、LogstashでElasticsearchへ同期します。ユーザーの検索リクエストをRailsで受け取、Elasticsearchで検索を行います。Railsの場合、公式のGemでElasticsearchへデータを保存することもできますが、Railsを介さずMySQLへ直接データが保存されることも想定してLogstashを使っています。なお、ElasticsearchはマネージドサービスであるElastic Cloudを利用しています。
無停止インデックス更新の主な方法
調べた範囲では主に大きく分けて3つの方法がありました。
Index Aliasesを利用する方法
公式ブログで紹介されている方法です。アプリケーション側で直接Indexを参照するのではなく、Aliasを参照するように設定しておきます。これにより新しいIndexを参照させたいときは、Aliasの更新だけすれば良いのでアプリケーションのコードを更新する必要はありません。
クラスタごと切り替える方法
リクルートの他部署でも行われている方法1です。運用中のクラスタとは別に新しいインデックスが適用されたクラスタを作り、Blue/Greenデプロイ等で新旧のクラスタを入れ替えます。比較的大掛かりになりますが、Elasticsearchのバージョンアップ等の運用面も考慮すると一度仕組みを構築してしまえば運用は楽だそうです。
アプリケーションレイヤーで切り替える方法
こちらの記事で紹介されている方法です。アプリケーション側で参照するインデックスを切り替えます。アプリケーション側の検索クエリの変更を伴う場合やIndexのFieldを大きく変更する場合は、アプリケーションのコードで切り替える必要があるケースもあるかもしれません。
適用した方法
今回はIndex Aliasesを利用する方法を採用しました。クラスタごと切り替える方法に関しては、今回はマネージドサービスを利用しておりElasticsearch自体の更新は自動的に無停止で行われるため、バージョンアップ時の心配はありません。また検索性能を向上させるためにAnalyzerを頻繁に更新することが予想されるため、毎回アプリケーションを更新するのでは煩雑になってしまいます。
処理の概要としては、Logstashでタイムスタンプが含まれたIndexを新規作成し、定期実行されるスクリプトでIndex名のタイムスタンプを読み取り、新しいIndexがあればAliasの切り替えを行います。
Logstash の設定
Logstashをデプロイするだけで新たな設定のIndexがElasticsearchに追加されデータが同期されるような設定を行います。
input {
jdbc {
jdbc_driver_library => "mysql-connector-java-8.0.11/mysql-connector-java-8.0.11.jar"
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://${MYSQL_HOST}:3306/${DB_NAME}"
jdbc_user => "${MYSQL_USER}"
jdbc_password => "${MYSQL_PASSWORD}"
schedule => "* * * * *"
statement_filepath => "/usr/share/logstash/sql/items.sql"
use_column_value => true
tracking_column => "updated_at"
tracking_column_type => "timestamp"
}
}
output {
elasticsearch {
hosts => ["${ELASTICSEARCH_HOST}"]
user => "${ELASTICSEARCH_USERNAME}"
password => "${ELASTICSEARCH_PASSWORD}"
manage_template => true
template => "/usr/share/logstash/template/items.json"
template_name => "items"
template_overwrite => true
index => "items-${INDEX_TIMESTAMP}"
document_type => "_doc"
document_id => "%{id}"
}
}
デプロイ時に環境変数のINDEX_TIMESTAMPに20181209045112のように現在時刻がセットされるようにしてください。これによりIndex名の末尾にタイムスタンプが入るようにしています。Indexの設定はなるべくLogstashに寄せるためにIndex Templatesを使っています。Templateのindex_patternsのタイムスタンプの部分にはワイルドカードを指定しておきます(上記の例の場合"items-*")。
tracking_columnを設定し、前回同期時点以降に更新されたレコードのみを同期するようにしています。Logstashをデプロイする際に、この最終同期日時をリセットすることで新しいIndexにすべてのレコードが同期されます。Logstashをコンテナで運用する場合などは最終同期日時を記録しているファイルが引き継がれないため、このリセットに関して操作は必要ありません。
Aliasを更新するスクリプト
Index名のタイムスタンプを読み取り最新のIndexにAliasを入れ替え、古くなったIndexを削除します。このスクリプトはクックパッドさんのこちらの記事を大変参考にさせていただきました。
require 'elasticsearch'
require 'uri'
require_relative '../index_manager'
ES_URL = ENV.fetch('ES_URL')
ES_USER = ENV.fetch('ES_USER')
ES_PASSWORD = ENV.fetch('ES_PASSWORD')
TABLE_NAME = ENV.fetch('TABLE_NAME')
uri = URI.parse(ES_URL)
uri.user = ES_USER
uri.password = ES_PASSWORD
client = Elasticsearch::Client.new(url: uri.to_s)
index_manager = IndexManager.new(TABLE_NAME, client)
index_manager.switch_alias_to_latest
index_manager.delete_old_indexes
class IndexManager
def initialize(name, client)
@name = name
@client = client
end
def switch_alias_to_latest
latest_index_cache = latest_index
return if indexes_in_alias == [latest_index_cache]
switch_alias(latest_index_cache)
end
def delete_old_indexes
old_indexes.map do |index|
@client.indices.delete(index: index)
end
end
private
def alias_name
"#{@name}-latest"
end
def indexes
@client.indices.get(index: "#{@name}-*").keys
end
def indexes_in_alias
@client.indices.get_alias(index: alias_name).keys
rescue Elasticsearch::Transport::Transport::Errors::NotFound
[]
end
def latest_index
latest_date = indexes.map { |index| index_timestamp(index) }.max
"#{@name}-#{latest_date}"
end
def old_indexes
latest_timestamp = index_timestamp(latest_index)
indexes.select { |index| index_timestamp(index) < latest_timestamp }
end
def switch_alias(new_index)
actions = []
indexes_in_alias.each do |old_index|
actions << { remove: { index: old_index, aliases: alias_name } }
end
actions << { add: { index: new_index, aliases: alias_name } }
@client.indices.update_aliases(body: { actions: actions })
end
def index_timestamp(index)
index.match(/#{@name}-(\d{14})/)[1]
end
end
このスクリプトをCron等で定期実行させることで、新しいIndexが作成された際に自動的にAlias先のIndexが切り替わり、古いIndexは削除されます。万が一に備えて、切り戻しを想定する場合はIndexの削除は行わない方が良いかもしれません。
同期が完了していない場合もAliasを更新してしまう
上記のスクリプトを定期実行する場合、対象のデータが多いとLogstashの同期が完了する前にAliasを切り替えてしまいタイミングによっては検索結果が不完全になる可能性があります。スクリプト内で新旧のドキュメント数を比較したり、旧インデックスの最後のIDが新インデックスに含まていることを確認したりする処理が必要になります。(できれば追記したいと思います)
Railsの設定
gemのelasticsearch-modelを利用している場合の設定になります。index_nameにスクリプトで設定したAlias名を設定します。
class Item < ApplicationRecord
include Elasticsearch::Model
index_name 'items-latest'
document_type '_doc'
end
終わりに
Logstashと組み合わせて無停止インデックス更新の方法を考えてみました。まだ一部、試行錯誤の途中ではありますが記事にしてみました。こんな運用の仕方もあるよ等ありましたらコメントで教えていただけるとうれしいです。