AkkaStreamを使ってElasticsearchにストリームを流そうと思った時にフラットなJsonフィールド23個以上がエラーになったので、その時の対処方法を記録しています。
kebs
このライブラリを使うと簡単にできました。
[kebs]
https://github.com/theiterators/kebs
READMEを見てみると、play-jsonにも使えるみたいです。
kebsを使ってみる。
build.sbt
lazy val akkaVersion = "2.5.18"
libraryDependencies ++= Seq(
// https://github.com/theiterators/kebs
"pl.iterators" %% "kebs-spray-json" % "1.6.2",
// ここら辺はAkka Streamの設定
"com.typesafe.akka" %% "akka-actor" % akkaVersion,
"com.typesafe.akka" %% "akka-testkit" % akkaVersion,
"com.typesafe.akka" %% "akka-http" % "10.1.5",
"com.typesafe.akka" %% "akka-stream" % akkaVersion,
"com.lightbend.akka" %% "akka-stream-alpakka-slick" % "1.0-M2",
"com.lightbend.akka" %% "akka-stream-alpakka-csv" % "0.8",
"com.lightbend.akka" %% "akka-stream-alpakka-elasticsearch" % "1.0-M1",
"com.typesafe.scala-logging" %% "scala-logging" % "3.9.0",
"com.typesafe" % "config" % "1.3.1",
"ch.qos.logback" % "logback-classic" % "1.1.3",
"mysql" % "mysql-connector-java" % "5.1.44"
)
使い方は簡単でした。
こんな感じのobject
を定義して
object KebsProtocol extends DefaultJsonProtocol with KebsSpray
こんな感じでJsonFormat
を定義できます。
val jsonFormat = implicitly[JsonFormat[Over22FieldsRecord]]
全体のコードはこんな感じになっています。
Akkaのコードです。
package com.example
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.alpakka.elasticsearch.scaladsl.ElasticsearchFlow
import akka.stream.alpakka.elasticsearch.{
ElasticsearchWriteSettings,
RetryNever,
WriteMessage
}
import akka.stream.alpakka.slick.scaladsl.{Slick, SlickSession}
import akka.stream.scaladsl.{Flow, Sink}
import com.example.models.Over22FieldsRecord
import com.typesafe.config.ConfigFactory
import com.typesafe.scalalogging.LazyLogging
import org.apache.http.HttpHost
import org.elasticsearch.client.RestClient
import pl.iterators.kebs.json.KebsSpray
import slick.jdbc.GetResult
import spray.json.DefaultJsonProtocol
import scala.util.{Failure, Success}
// タプル22個の制限を超えるために利用する
// https://github.com/theiterators/kebs
object KebsProtocol extends DefaultJsonProtocol with KebsSpray
/**
* RDBからElasticsearchにストリーミング
*/
object RdbToElasticStream extends App with LazyLogging {
implicit val system = ActorSystem("RdbToElasticStreamSystem")
implicit val dispatcher = system.dispatcher
implicit val mat = ActorMaterializer()
val config = ConfigFactory.load()
lazy val esHost = config.getString("elasticsearch.host")
lazy val esPort = config.getInt("elasticsearch.port")
// Database Session
implicit val session = SlickSession.forConfig("slick-mysql")
system.registerOnTermination(session.close())
// Elasticsearch Client
implicit val esClient =
RestClient
.builder(new HttpHost(esHost, esPort))
.build()
// define Graph
val dbToEsGraph = dbSource via elasticFlow runWith Sink.seq
dbToEsGraph.onComplete {
case Success(_) =>
logger.info("Write to Elasticsearch completed.")
esClient.close()
sys.exit(0)
case Failure(e) =>
logger.error("Write to Elasticsearch failed.", e)
esClient.close()
sys.exit(1)
}
private def dbSource() = {
import session.profile.api._
implicit val getOver22FieldsRecord = GetResult { r =>
Over22FieldsRecord(
r.nextString, // 1
r.nextString, // 2
r.nextString, // 3
r.nextString, // 4
r.nextString, // 5
r.nextString, // 6
r.nextString, // 7
r.nextString, // 8
r.nextString, // 9
r.nextString, // 10
r.nextString, // 11
r.nextString, // 12
r.nextString, // 13
r.nextString, // 14
r.nextString, // 15
r.nextString, // 16
r.nextString, // 17
r.nextString, // 18
r.nextString, // 19
r.nextString, // 20
r.nextString, // 21
r.nextString, // 22
r.nextString // 23
)
}
Slick.source(sql"SELECT * FROM dev_db.over22fields".as[Over22FieldsRecord])
}
private def elasticFlow() = {
import spray.json._
import KebsProtocol._
val jsonFormat = implicitly[JsonFormat[Over22FieldsRecord]]
val sinkSettings =
ElasticsearchWriteSettings()
.withBufferSize(1000)
.withRetryLogic(RetryNever)
Flow[Over22FieldsRecord].map { r =>
WriteMessage.createUpsertMessage(r.id.toString, r)
}.via(
ElasticsearchFlow.create[Over22FieldsRecord](
"over22",
"_doc",
sinkSettings
)
)
}
}
case class Over22FieldsRecord(
id: String,
c1: String,
c2: String,
c3: String,
c4: String,
c5: String,
c6: String,
c7: String,
c8: String,
c9: String,
c10: String,
c11: String,
c12: String,
c13: String,
c14: String,
c15: String,
c16: String,
c17: String,
c18: String,
c19: String,
c20: String,
c21: String,
c22: String
)
DBのテーブル定義はこんな感じで作ってました。
create table dev_db.over22fields (
id varchar(6),
c1 varchar(3),
c2 varchar(3),
c3 varchar(3),
c4 varchar(3),
c5 varchar(3),
c6 varchar(3),
c7 varchar(3),
c8 varchar(3),
c9 varchar(3),
c10 varchar(3),
c11 varchar(3),
c12 varchar(3),
c13 varchar(3),
c14 varchar(3),
c15 varchar(3),
c16 varchar(3),
c17 varchar(3),
c18 varchar(3),
c19 varchar(3),
c20 varchar(3),
c21 varchar(3),
c22 varchar(3)
);
PlayFrameworkのplay-jsonといい、spray.jsonといい、タプル22個の制約に引っ掛かることが多いような。
22個を超えるフラットなJSONは定義するなということなのかな。