この記事は さくらインターネット Advent Calendar 2016 16日目の記事です。
対象読者
- Elasticsearchにある程度の理解が有る
- インデックス・シャード
- 再インデックスの必要性
- Builk API, Scroll API
- Scalaに抵抗が無い
- sbtを使用できる
使用技術説明
elastic4s
elastic4sはScalaからElasticsearchを扱うためのライブラリです。DSLを用いて非同期な処理を定義できます。
https://github.com/sksamuel/elastic4s
ReactiveStreams
簡単に言うと非同期処理の標準化を目指すプロトコルです。Javaのreactive-streams/reactive-streams-jvmなどでインタフェースを提供し、様々なライブラリから統一した仕組みで制御できるように目指しています。
http://www.reactive-streams.org/
Akka-Streams
Akkaを使用した非同期のストリーミング処理ライブラリです。ReactiveStreamsへの互換性も有ります。データの入力であるSource、変換を行うFlow、出力のSinkの3要素で処理を組み立てます。Backpressureを行い、Source/Sinkの処理能力に応じて流動を変化する仕組みが有ります。
elastic4s, AkkaStreamで行うReIndex
導入方法は以下のような依存関係を記述するだけです。x.x.xの部分は上記のREADMEに書かれている、ターゲットESのバージョンと合わせてください。
libraryDependencies += "com.sksamuel.elastic4s" %% "elastic4s-streams" % "x.x.x"
通常のリクエスト
クライアントの生成から、インデックスの投入までのサンプル公式サイトから引用・修正しました。
import com.sksamuel.elastic4s.{ElasticClient, ElasticsearchClientUri}
import com.sksamuel.elastic4s.ElasticDsl._
val uri = ElasticsearchClientUri("elasticsearch://hostname:9300")
val client = ElasticClient.transport(uri)
client.execute {
index into "anime" / "artists" fields "name" -> "madomagi"
}.await
client.execute {
indexInto("anime" / "artists") source """{"name": "yuyushiki"}"""
}.await
val resp = client.execute {
search in "anime" / "artists" query "yuyushiki"
}.await
- 記述の通り、ポート番号はHTTPではなく、9300番のTCPを使用する
-
index into
indexInto
は同じで、括弧も場合により省略できる - フィールドを指定する方法は複数ある
-
fields
- タプルを
("foo" -> 1, "bar" -> 2)
のように(String, Any)*
で渡す -
Map("foo" -> 1, "bar" -> 2)
のようにMap[String, Any]
で渡す
- タプルを
-
source
JSON文字列を渡す
-
- JSONはダブルクオーテーションを含む文字のため、Scalaの文字リテラルである
"""strings"""
を使用した
スクロール・バルク
Scroll APIやBulk APIはそれぞれ、検索結果の取得を効率化・投入やアップデートなどを効率化するAPIです。elastic4sでは、ReactiveStreams(注釈有り)に基づいたストリーム処理を行えます。今回はAkkaStreamsと共に使用します。
elsatic4sのストリームと、AkkaStreamsを使用するためには、以下の依存関係を追記する必要があります。Akkaのバージョンは、elastic4s-streamsが使用しているバージョン(記述場所の例)と同じバージョンを指定すると良いかと思います。今回はjsonを簡単に処理するためのjson4sも追加しています。
libraryDependencies ++= Seq(
"com.sksamuel.elastic4s" %% "elastic4s-streams" % "x.x.x",
"com.typesafe.akka" %% "akka-actor" % "x.x.x",
"com.typesafe.akka" %% "akka-stream" % "x.x.x",
"org.json4s" %% "json4s-native" % "x.x.x"
)
実際にScroll APIのSourceを作成してみましょう。
import akka.actor.ActorSystem
import akka.stream.scaladsl._
import scala.concurrent.duration._
implicit val system = ActorSystem()
val scrollPublisher = client.publisher(
search in "anime" / "artists" scroll "1m" size 2000 timeout(120 second)
)
val scrollSource = Source.fromPublisher(scrollPublisher)
-
in
の次は"indexname" / "type"
と"indexname"
の形式で指定でき、後者は全タイプを取得する -
scroll "1m"
はelasticsearchのAPIと同じように、スクロールオブジェクトの有効期限を指定する - sizeは一度に取得する最大件数を指定する
次は、Builk APIのSinkを作成してみましょう。
import akka.stream.scaladsl._
import com.sksamuel.elastic4s.streams.RequestBuilder
import com.sksamuel.elastic4s.BulkCompatibleDefinition
import org.json4s.native.Serialization.write
import org.json4s.DefaultFormats
// インデックスを行うための情報を集める型
case class Data(index: String, indexType: String, indexId: String, fields: Map[String, Any]) {
implicit val defaultFormats = DefaultFormats
def toJson = write(fields)
}
// DataオブジェクトをIndexDefinitionに変換するビルダー
class DataRequestBuilder extends RequestBuilder[Data] {
implicit val defaultFormats = DefaultFormats
override def request(data: Data): BulkCompatibleDefinition = {
// dataを元にIndexName/Type, ID, Sourceを指定したindexIntoを作成
indexInto(data.index / data.indexType) source data.toJson id data.indexId
}
}
implicit val builder = new DataRequestBuilder()
def shutdown(): Unit = {
println("Complite")
system.terminate()
}
val bulkSubscriber = client.subscriber[Data](
batchSize = 1000,
concurrentRequests = 5,
completionFn = () => shutdown(),
errorFn = (t: Throwable) => {
System.err.println("Unhandled exception Elasticsearch Subscriber", t.getMessage)
},
failureWait = 120 seconde,
maxAttempts = 5,
flushAfter = Some(10 seconde)
)
val bulkSink = Sink.fromSubscriber(bulkSubscriber)
- SinkでData型を受け取り、Elasticsearchのクライアントへ渡す処理を定義
- client.subscriber引数
- batchSize: 1回のBulkリクエストの最大項目数
- concurrentRequests: 並列して行う最大リクエスト数
- failureWait: リクエストのタイムアウト
- maxAttempts: リトライ回数
- flushAfter: 最後の書き込みから指定時間経過すれば、batchSizeに届いていなくとも書き込みを開始する
Option[Duration]
型を指定する必要があるので、Some()で囲む
- batchSize, concurrentRequests等はbackpressureを用いて自動制御される
次に、SourceとSinkを繋ぐFlowを作成します。
def migrateFlow = Flow[RichSearchHit].map[Data](transformer)
def transformer(hit: RichSearchHit): Data = {
val newIndexName = "anime-2"
Data(
index = newIndexName,
indexType = hit.getType,
indexId = hit.id,
fields = hit.getSource.toMap
)
}
Sourceから流れてくるRichSearchHit
を、先程定義したData
へと変換を行う処理を記述します。この中でフィールドの内容を書き換える事もできます。
最後にこれまでに定義したSource, Flow, Sinkを繋げて実行しようと思います。
import akka.stream._
implicit val materializer = ActorMaterializer(ActorMaterializerSettings(system))
scrollSource
.via(migrateFlow)
.to(bulkSink)
.run()
if (io.StdIn.readLine() != null)
system.terminate()
最後に
ネット上に多く有るElasticsearchの再インデックスを高速に行うコードは、スレッドを定義したり、バッファを用意したり、タイマーでバッファの量を監視したり… といろいろな手続きを記述していると思います。elastic4sとAkkaStreamを使用してそういった面倒な記述なしに動作の宣言を行うだけで、効率よく動いてくれるのは非常に魅力的だと思います。
今回紹介していませんが、AkkaStreamではSourceにthrottleを指定するとメッセージの流量を制限することが簡単にでき、DBに負荷を掛けることが難しい本番環境へも容易に対応できます。
ElasticsearchとScalaは親和性がとても高いので合わせて使ってみてはいかがでしょうか。