3
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Cats Effect & circe でちょっとモダンな Elasticsearch プログラミング

Last updated at Posted at 2019-05-10

elastic4s と Cats Effect & circe と併用して、Scala の Elasticsearch コードを少しだけモダンな関数型風味にしてみる記事。

はじめに

elastic4s という、わりと歴史のある Elasticsearch のための Scala クライアントがあるが、core のモジュールだけだと、実行の effectFuture だったり、検索結果が Map ベースの汎用的データ構造だったりする。

幸い付属モジュールで Cats EffectMonix、また circe などとの連携がサポートされているので、この記事ではそれらを使って、「記述と実行の分離」と「を活かしたドキュメント表現」を意識した、モダンな関数型 Scala スタイルでの Elasticsearch プログラミングを考えてみる。

お題と趣向

elastic4s の README.md に簡単なサンプルアプリが載っていて、以下のように Elasticsearch を操作をしている。

  1. Elasticsearch クライアントを生成。
  2. インデクス artists を作成。artists にはマッピング modern が、modern にはフィールド name が含まれる。
  3. artists/modern に、ドキュメント {"name": "L.S. Lowry"} を追加。
  4. artists 内を "lowry" で検索。
  5. 検索結果を表示。
  6. Elasticsearch クライアントをクローズ。

これをベースに、以下のような趣向で少し改善してみる。

  • Elasticsearch クライアントの生成・クローズとその使用は、Cats Effect の Resource として管理。
  • Future に替えて型パラメータ Sync[F] でプログラムを「記述」。「実行」するところで初めて F[_]IO を指定。
  • ドキュメントとケースクラスを circe の自動エンコーダ/デコーダで関連づけ。
    • ドキュメント追加時には、ケースクラスのインスタンスを elastic4s の DSL 構文に与える。
    • 検索結果表示時には、得られたドキュメントをケースクラスに変換してから表示する。

実装

elastic4s のバージョンは 6.5.1 を使った。その他のライブラリを含む依存関係は以下。

libraryDependencies ++= Seq(
  "core", "http", "circe", "cats-effect",
).map(s => "com.sksamuel.elastic4s" %% s"elastic4s-$s" % "6.5.1") ++ Seq(
  "org.typelevel" %% "cats-core" % "1.6.0",
  "org.typelevel" %% "cats-effect" % "1.3.0",
  "ch.qos.logback" % "logback-classic" % "1.2.3"
)

ソースはここにおいた。

Resource としての ElasticClient

elastic4s の Elasticsearch クライアント ElasticClient は、URI を指定した ElasticProperties を与えることで得られるが1、この記事ではクライアント使用後のクローズもまとめて Cats Effect の Resource として扱う。以下のように書いてみた。

def client: Resource[F, ElasticClient] = Resource.make {
  F.delay { ElasticClient(ElasticProperties("http://127.0.0.1:9200")) }
} (c => F.delay { c.close() })

FSync のインスタンスを持つ必要があるが、プログラムの他の部分でも必要なので、トレイト Program で以下のように指定している。

trait Program[F[_]] {
  implicit val F: Sync[F]
  def client: Resource[F, ElasticClient] = ???
  ...

以降、この ProgramSync[F] に依存する他のコードも書いていく。

Elasticsearch リクエストの定義

artists / modern でインデクスされるドキュメントは、以下のようなケースクラスで表すことにする。

case class ModernArtist(name: String)

インデクス生成、ドキュメントの追加/検索リクエストは以下のような DSL で書ける。

val createArtists: CreateIndexRequest = createIndex("artists") mappings {
  mapping("modern") fields textField("name")
}
val indexModern: IndexRequest =
  indexInto("artists" / "modern") source ModernArtist("L.S. Lowry") refreshImmediately

val searchModern: SearchRequest = search("artists") query "lowry"

indexModernsource ModernArtist("L.S. Lowry") の部分で、elastic4s-circe で提供される Indexable が使われるが、その際 circe の Encoder が使われる。

プログラムの記述

以上の ElasticClient リソースと Elasticsearch リクエストは、次のように合成できる。

def program(implicit U: Functor[F], E: Executor[F]): F[Unit] = client use { c =>
  for {
    _ <- c.execute(createArtists)            // インデクス作成
    _ <- c.execute(indexModern)              // ドキュメント追加
    _ <- c.execute(searchModern) >>= report  // ドキュメント検索とレポート
  } yield ()
}

ここで F は、上述の Sync だけではなく Functor2Executor のインスタンスも必要になってくるが、IO についてこれを提供するのが elastic4s-cats-effect モジュールということになる。

検索結果を表示する report は以下のように定義した。

private def puts(s: Any): F[Unit] = F.delay { println(s) }

def report(res: Response[SearchResponse]): F[Unit] = res match {
  case RequestFailure(_, _, _, e) => puts(s"We failed: $e")
  case RequestSuccess(_, _, _, r) => for {
    _ <- r.to[ModernArtist].toList.map(puts(_)).sequence // Cats の traverse 構文
    _ <- puts(s"There were ${r.totalHits} total hits")
  } yield ()
}

r.to[ModernArtist] のところで、elastic4s-circe で提供される HitReader[ModernArtist] が暗黙に参照されるが、HitReader 生成の際に、circe で自動的に導出される Decoder[ModernArtist] が使われる。

プログラムの実行

プログラムを「記述」したここまでのコードでは、あえて具体的な effect を明示せず、必要最小限の制約付きの型パラメータとして抽象化しているが、実行部分では具体的に決める必要がある。ここでは、Cats Effect から IO を採用してみた3

object ArtistIndex extends IOApp with Program[IO] {
  import com.sksamuel.elastic4s.cats.effect.instances._
  val F: Sync[IO] = Sync[IO]

  def run(args: List[String]): IO[ExitCode] = program as ExitCode.Success
}

おわりに

  • elastic4s-cats-effect を使うと、最近の関数型な Scala コードぽく Elasticsearch コードが書ける。
  • elastic4s-circe を使うと、ES ドキュメントとケースクラスの自動的なエンコード/デコードができる。
  • ただし、公式 Github のサンプルが陳腐化してたり、サンプルコードやテストコードが少なかったりして、最新情報を集めるのが意外と難しい。
  1. 本家のサンプルアプリでは LocalNode を使っているが、deprecated になっている。

  2. Cats の Functor ではなく elastic4s 自前のそれ。

  3. elastic4s-monix で使っている Monix は ver2.3.3 で、Cats Effect の型クラスのインスタンスが提供されていないらしく、同じことを Task でやろうとしたら、Sync[Task] を自前で書かなければならないっぽい。

3
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?