LoginSignup
1
1

More than 5 years have passed since last update.

spray-jsonでフラットなJSONフィールド22個の制約を超える

Last updated at Posted at 2019-02-07

AkkaStreamを使ってElasticsearchにストリームを流そうと思った時にフラットなJsonフィールド23個以上がエラーになったので、その時の対処方法を記録しています。

kebs

このライブラリを使うと簡単にできました。

[kebs]
https://github.com/theiterators/kebs

READMEを見てみると、play-jsonにも使えるみたいです。

kebsを使ってみる。

build.sbt
lazy val akkaVersion = "2.5.18"

libraryDependencies ++= Seq(

  // https://github.com/theiterators/kebs
  "pl.iterators" %% "kebs-spray-json" % "1.6.2",

  // ここら辺はAkka Streamの設定
  "com.typesafe.akka" %% "akka-actor" % akkaVersion,
  "com.typesafe.akka" %% "akka-testkit" % akkaVersion,
  "com.typesafe.akka" %% "akka-http" % "10.1.5",
  "com.typesafe.akka" %% "akka-stream" % akkaVersion,
  "com.lightbend.akka" %% "akka-stream-alpakka-slick" % "1.0-M2",
  "com.lightbend.akka" %% "akka-stream-alpakka-csv" % "0.8",
  "com.lightbend.akka" %% "akka-stream-alpakka-elasticsearch" % "1.0-M1",
  "com.typesafe.scala-logging" %% "scala-logging" % "3.9.0",
  "com.typesafe" % "config" % "1.3.1",
  "ch.qos.logback" % "logback-classic" % "1.1.3",

  "mysql" % "mysql-connector-java" % "5.1.44"
)

使い方は簡単でした。

こんな感じのobjectを定義して

object KebsProtocol extends DefaultJsonProtocol with KebsSpray

こんな感じでJsonFormatを定義できます。

val jsonFormat = implicitly[JsonFormat[Over22FieldsRecord]]

全体のコードはこんな感じになっています。
Akkaのコードです。

package com.example

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.alpakka.elasticsearch.scaladsl.ElasticsearchFlow
import akka.stream.alpakka.elasticsearch.{
  ElasticsearchWriteSettings,
  RetryNever,
  WriteMessage
}
import akka.stream.alpakka.slick.scaladsl.{Slick, SlickSession}
import akka.stream.scaladsl.{Flow, Sink}
import com.example.models.Over22FieldsRecord
import com.typesafe.config.ConfigFactory
import com.typesafe.scalalogging.LazyLogging
import org.apache.http.HttpHost
import org.elasticsearch.client.RestClient
import pl.iterators.kebs.json.KebsSpray
import slick.jdbc.GetResult
import spray.json.DefaultJsonProtocol

import scala.util.{Failure, Success}

// タプル22個の制限を超えるために利用する
// https://github.com/theiterators/kebs
object KebsProtocol extends DefaultJsonProtocol with KebsSpray

/**
  * RDBからElasticsearchにストリーミング
  */
object RdbToElasticStream extends App with LazyLogging {
  implicit val system = ActorSystem("RdbToElasticStreamSystem")
  implicit val dispatcher = system.dispatcher
  implicit val mat = ActorMaterializer()
  val config = ConfigFactory.load()

  lazy val esHost = config.getString("elasticsearch.host")
  lazy val esPort = config.getInt("elasticsearch.port")

  // Database Session
  implicit val session = SlickSession.forConfig("slick-mysql")
  system.registerOnTermination(session.close())

  // Elasticsearch Client
  implicit val esClient =
    RestClient
      .builder(new HttpHost(esHost, esPort))
      .build()

  // define Graph
  val dbToEsGraph = dbSource via elasticFlow runWith Sink.seq

  dbToEsGraph.onComplete {
    case Success(_) =>
      logger.info("Write to Elasticsearch completed.")
      esClient.close()
      sys.exit(0)

    case Failure(e) =>
      logger.error("Write to Elasticsearch failed.", e)
      esClient.close()
      sys.exit(1)
  }

  private def dbSource() = {
    import session.profile.api._

    implicit val getOver22FieldsRecord = GetResult { r =>
      Over22FieldsRecord(
        r.nextString, // 1
        r.nextString, // 2
        r.nextString, // 3
        r.nextString, // 4
        r.nextString, // 5
        r.nextString, // 6
        r.nextString, // 7
        r.nextString, // 8
        r.nextString, // 9
        r.nextString, // 10
        r.nextString, // 11
        r.nextString, // 12
        r.nextString, // 13
        r.nextString, // 14
        r.nextString, // 15
        r.nextString, // 16
        r.nextString, // 17
        r.nextString, // 18
        r.nextString, // 19
        r.nextString, // 20
        r.nextString, // 21
        r.nextString, // 22
        r.nextString // 23
      )
    }
    Slick.source(sql"SELECT * FROM dev_db.over22fields".as[Over22FieldsRecord])
  }

  private def elasticFlow() = {
    import spray.json._
    import KebsProtocol._

    val jsonFormat = implicitly[JsonFormat[Over22FieldsRecord]]

    val sinkSettings =
      ElasticsearchWriteSettings()
        .withBufferSize(1000)
        .withRetryLogic(RetryNever)

    Flow[Over22FieldsRecord].map { r =>
      WriteMessage.createUpsertMessage(r.id.toString, r)
    }.via(
      ElasticsearchFlow.create[Over22FieldsRecord](
        "over22",
        "_doc",
        sinkSettings
      )
    )
  }
}


case class Over22FieldsRecord(
    id: String,
    c1: String,
    c2: String,
    c3: String,
    c4: String,
    c5: String,
    c6: String,
    c7: String,
    c8: String,
    c9: String,
    c10: String,
    c11: String,
    c12: String,
    c13: String,
    c14: String,
    c15: String,
    c16: String,
    c17: String,
    c18: String,
    c19: String,
    c20: String,
    c21: String,
    c22: String
)

DBのテーブル定義はこんな感じで作ってました。

create table dev_db.over22fields (
  id varchar(6),
  c1 varchar(3),
  c2 varchar(3),
  c3 varchar(3),
  c4 varchar(3),
  c5 varchar(3),
  c6 varchar(3),
  c7 varchar(3),
  c8 varchar(3),
  c9 varchar(3),
  c10 varchar(3),
  c11 varchar(3),
  c12 varchar(3),
  c13 varchar(3),
  c14 varchar(3),
  c15 varchar(3),
  c16 varchar(3),
  c17 varchar(3),
  c18 varchar(3),
  c19 varchar(3),
  c20 varchar(3),
  c21 varchar(3),
  c22 varchar(3)
);

PlayFrameworkのplay-jsonといい、spray.jsonといい、タプル22個の制約に引っ掛かることが多いような。
22個を超えるフラットなJSONは定義するなということなのかな。

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1