Trying Apache Ignite from Slick

This article is the day-19 entry of the ただの集団 Advent Calendar 2019.

Introduction

While looking into fast distributed processing of large-scale data, I tested using Apache Ignite from Slick.

What is Apache Ignite?

Apache Ignite is an in-memory computing platform for transactional, analytical, and streaming workloads that delivers in-memory speed at petabyte scale.
Reference: IGNITE FACTS

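It can be used as a distributed database

Apache Ignite can be used as an all-in-one distributed database that supports SQL, key-value, compute, machine learning, and other data processing APIs.

SQL is available

SQL ANSI-99 standard

Ignite complies with the SQL ANSI-99 standard and supports all SQL and DML commands, including SELECT, UPDATE, INSERT, MERGE, and DELETE statements as well as distributed joins. It also supports a subset of the DDL commands relevant to distributed SQL databases.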
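
As a rough illustration of the SQL support described above, the following Scala sketch sends DDL, MERGE, and SELECT statements through Ignite's JDBC thin driver. It assumes that a node is reachable at 127.0.0.1:10800 and that ignite-core is on the classpath. The `city` table and its columns are hypothetical and do not appear in the original article.

```scala
import java.sql.DriverManager
import scala.util.Using

object IgniteSqlSketch {
  def main(args: Array[String]): Unit = {
    // Assumption: an Ignite node accepts thin JDBC connections on port 10800.
    val url = "jdbc:ignite:thin://127.0.0.1:10800"

    Using.resource(DriverManager.getConnection(url)) { conn =>
      Using.resource(conn.createStatement()) { st =>
        // Hypothetical table, used only for this illustration.
        st.executeUpdate("CREATE TABLE IF NOT EXISTS city (id BIGINT PRIMARY KEY, name VARCHAR)")
      }
      // MERGE inserts the row or overwrites it if the key already exists.
      Using.resource(conn.prepareStatement("MERGE INTO city (id, name) VALUES (?, ?)")) { ps =>
        ps.setLong(1, 1L)
        ps.setString(2, "Tokyo")
        ps.executeUpdate()
      }
      Using.resource(conn.prepareStatement("SELECT name FROM city WHERE id = ?")) { ps =>
        ps.setLong(1, 1L)
        Using.resource(ps.executeQuery()) { rs =>
          // Print the first column of every returned row.
          while (rs.next()) println(rs.getString(1))
        }
      }
    }
  }
}
```

The sketch needs a running node, so it is meant to be read alongside the Docker setup in this article rather than run in isolation.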

It can also be used as a key-value store

JCache (JSR 107) compliant

  • In-memory key-value store
  • Basic cache operations
  • ConcurrentMap API
  • Collocated Processing (EntryProcessor)
  • Events and Metrics
  • Pluggable persistence
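
For a feel of the key-value side, here is a minimal sketch that uses Ignite's Java thin client from Scala. Note that the thin client's `ClientCache` offers put/get-style operations but is not the JCache `javax.cache.Cache` interface itself. The address and the cache name `sketch-cache` are assumptions made for this example.

```scala
import org.apache.ignite.Ignition
import org.apache.ignite.configuration.ClientConfiguration

object IgniteKvSketch {
  def main(args: Array[String]): Unit = {
    // Assumption: a node is reachable on the thin-client port 10800.
    val cfg = new ClientConfiguration().setAddresses("127.0.0.1:10800")
    val client = Ignition.startClient(cfg)
    try {
      // "sketch-cache" is a hypothetical cache name chosen for this example.
      val cache = client.getOrCreateCache[java.lang.Long, String]("sketch-cache")
      cache.put(1L, "value-1")   // store a value under key 1
      println(cache.get(1L))     // read it back by key
    } finally client.close()
  }
}
```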

Articles comparing Ignite with Cassandra

Apache Cassandra vs. Apache Ignite: Affinity Collocation and Distributed SQL
Apache® Ignite™ and Apache® Cassandra™ Benchmarks: The Power of In-Memory Computing

For other use cases and further details, see the following.
Apache Ignite use cases

Using it from Slick

Environment

Ignite configuration file

Prepare the server-side configuration file.

ignite.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd">
    <bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">

        <!-- Configure internal thread pool. -->
        <property name="publicThreadPoolSize" value="64"/>

        <!-- Configure system thread pool. -->
        <property name="systemThreadPoolSize" value="32"/>

        <property name="binaryConfiguration">
            <bean class="org.apache.ignite.configuration.BinaryConfiguration">
                <property name="compactFooter" value="false"/>
            </bean>
        </property>
        <property name="cacheConfiguration">
            <list>
                <bean class="org.apache.ignite.configuration.CacheConfiguration">
                    <property name="name" value="default"/>
                    <property name="atomicityMode" value="ATOMIC"/>
                    <property name="backups" value="1"/>
                    <property name="cacheMode" value="PARTITIONED"/>
                    <property name="statisticsEnabled" value="true"/>
                    <property name="nearConfiguration">
                        <bean class="org.apache.ignite.configuration.NearCacheConfiguration">
                            <property name="nearEvictionPolicy">
                                <bean class="org.apache.ignite.cache.eviction.lru.LruEvictionPolicy">
                                    <property name="maxSize" value="10000"/>
                                </bean>
                            </property>
                        </bean>
                    </property>
                </bean>
            </list>
        </property>
        <!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. -->
        <property name="discoverySpi">
            <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
                <property name="ipFinder">
                    <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
                        <property name="addresses">
                            <list>
                                <value>127.0.0.1:47500..47509</value>
                            </list>
                        </property>
                    </bean>
                </property>
            </bean>
        </property>
    </bean>
</beans>

docker-compose

Mount the configuration file created above as a volume so that the container loads it.

docker-compose.yml
version: "3"

services:

  ignite:
    image: apacheignite/ignite:2.7.6
    environment:
      - CONFIG_URI=/opt/ignite/apache-ignite/config/ignite.xml
    volumes:
      - ./ignite.xml:/opt/ignite/apache-ignite/config/ignite.xml
    ports:
      - "47100:47100"
      - "47500-47509:47500-47509"
      - "10800:10800"

Startup

Start it with docker-compose up.

docker-compose up
Starting ignite-slick_ignite_1 ... done
Attaching to ignite-slick_ignite_1
ignite_1  | [02:39:33]    __________  ________________ 
ignite_1  | [02:39:33]   /  _/ ___/ |/ /  _/_  __/ __/ 
ignite_1  | [02:39:33]  _/ // (7 7    // /  / / / _/   
ignite_1  | [02:39:33] /___/\___/_/|_/___/ /_/ /___/  
ignite_1  | [02:39:33] 
ignite_1  | [02:39:33] ver. 2.7.6#20190911-sha1:21f7ca41
ignite_1  | [02:39:33] 2019 Copyright(C) Apache Software Foundation
ignite_1  | [02:39:33] 
ignite_1  | [02:39:33] Ignite documentation: http://ignite.apache.org
ignite_1  | [02:39:33] 
ignite_1  | [02:39:33] Quiet mode.
ignite_1  | [02:39:33]   ^-- Logging to file '/opt/ignite/apache-ignite/work/log/ignite-72f112a2.0.log'
ignite_1  | [02:39:33]   ^-- Logging by 'JavaLogger [quiet=true, config=null]'
ignite_1  | [02:39:33]   ^-- To see **FULL** console log here add -DIGNITE_QUIET=false or "-v" to ignite.{sh|bat}
ignite_1  | [02:39:33] 
ignite_1  | [02:39:33] OS: Linux 4.9.184-linuxkit amd64
ignite_1  | [02:39:33] VM information: OpenJDK Runtime Environment 1.8.0_212-b04 IcedTea OpenJDK 64-Bit Server VM 25.212-b04
ignite_1  | [02:39:33] Please set system property '-Djava.net.preferIPv4Stack=true' to avoid possible problems in mixed environments.
ignite_1  | [02:39:33] Initial heap size is 32MB (should be no less than 512MB, use -Xms512m -Xmx512m).
ignite_1  | [02:39:33] Configured plugins:
ignite_1  | [02:39:33]   ^-- None
ignite_1  | [02:39:33] 
ignite_1  | [02:39:33] Configured failure handler: [hnd=StopNodeOrHaltFailureHandler [tryStop=false, timeout=0, super=AbstractFailureHandler [ignoredFailureTypes=[SYSTEM_WORKER_BLOCKED, SYSTEM_CRITICAL_OPERATION_TIMEOUT]]]]
ignite_1  | [02:39:33] Message queue limit is set to 0 which may lead to potential OOMEs when running cache operations in FULL_ASYNC or PRIMARY_SYNC modes due to message queues growth on sender and receiver sides.
ignite_1  | [02:39:34] Security status [authentication=off, tls/ssl=off]
ignite_1  | [02:39:36] Nodes started on local machine require more than 80% of physical RAM what can lead to significant slowdown due to swapping (please decrease JVM heap size, data region size or checkpoint buffer size) [required=944MB, available=1998MB]
ignite_1  | [02:39:36] Performance suggestions for grid  (fix if possible)
ignite_1  | [02:39:36] To disable, set -DIGNITE_PERFORMANCE_SUGGESTIONS_DISABLED=true
ignite_1  | [02:39:36]   ^-- Enable G1 Garbage Collector (add '-XX:+UseG1GC' to JVM options)
ignite_1  | [02:39:36]   ^-- Specify JVM heap max size (add '-Xmx<size>[g|G|m|M|k|K]' to JVM options)
ignite_1  | [02:39:36]   ^-- Set max direct memory size if getting 'OOME: Direct buffer memory' (add '-XX:MaxDirectMemorySize=<size>[g|G|m|M|k|K]' to JVM options)
ignite_1  | [02:39:36]   ^-- Disable processing of calls to System.gc() (add '-XX:+DisableExplicitGC' to JVM options)
ignite_1  | [02:39:36]   ^-- Disable near cache (set 'nearConfiguration' to null)
ignite_1  | [02:39:36]   ^-- Decrease number of backups (set 'backups' to 0)
ignite_1  | [02:39:36] Refer to this page for more performance suggestions: https://apacheignite.readme.io/docs/jvm-and-system-tuning
ignite_1  | [02:39:36] 
ignite_1  | [02:39:36] To start Console Management & Monitoring run ignitevisorcmd.{sh|bat}
ignite_1  | [02:39:36] Data Regions Configured:
ignite_1  | [02:39:36]   ^-- default [initSize=256.0 MiB, maxSize=399.7 MiB, persistence=false]
ignite_1  | [02:39:36] 
ignite_1  | [02:39:36] Ignite node started OK (id=72f112a2)
ignite_1  | [02:39:36] Topology snapshot [ver=1, locNode=72f112a2, servers=1, clients=0, state=ACTIVE, CPUs=8, offheap=0.39GB, heap=0.43GB]

If the output looks like this, the node is up.

Preparing the sample code

Preparing build.sbt

Write it as follows.

build.sbt
name := "ignite-slick"

version := "0.1"

scalaVersion := "2.13.1"

val igniteVersion = "2.7.6"
val slickVersion  = "3.3.2"
libraryDependencies ++= Seq(
  "org.apache.ignite"  % "ignite-core"     % igniteVersion,
  "org.apache.ignite"  % "ignite-clients"  % igniteVersion,
  "com.typesafe.slick" %% "slick"          % slickVersion,
  "com.typesafe.slick" %% "slick-hikaricp" % slickVersion,
  "org.scalatest"      %% "scalatest"      % "3.2.0-M1" % Test
)

Preparing the model

This time we will store data about people.

Person.scala
package model

import java.time.LocalDate

import model.`enum`.Sex

final case class Person(
                         id: Long,
                         name: String,
                         sex: Option[Sex],
                         birthday: LocalDate,
                         father: Option[Long],
                         mother: Option[Long]
                       )
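
The `Sex` type imported by `Person` is not shown in the article. Judging from how it is used (`Sex.Male`, `Sex.Female`, `Sex.of`, and `code`), it could look roughly like the sketch below. The one-letter codes and the `values` helper are assumptions, and the real definition in the author's repository may differ.

```scala
// Assumed to live in package model.`enum`; the package clause is omitted
// here so that the snippet can be compiled on its own.
sealed abstract class Sex(val code: String)

object Sex {
  // The one-letter codes are assumptions; the article does not show the stored values.
  case object Male   extends Sex("M")
  case object Female extends Sex("F")

  // Hypothetical helper listing every known value.
  val values: Seq[Sex] = Seq(Male, Female)

  /** Looks up a Sex by its stored code; unknown codes yield None. */
  def of(code: String): Option[Sex] = values.find(_.code == code)
}
```

Storing a short string code keeps the database column a plain `Option[String]`, while the mapping to and from `Sex` stays in the Slick projection.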

Preparing the Slick Tables

Tables.scala
package model

import java.time.LocalDate

import model.`enum`.Sex

// AUTO-GENERATED Slick data model
/** Stand-alone Slick data model for immediate use */
object Tables extends {
  // Slick ships no Ignite-specific profile, so the PostgreSQL profile is used here.
  val profile = slick.jdbc.PostgresProfile
} with Tables

/** Slick data model trait for extension, choice of backend or usage in the cake pattern. (Make sure to initialize this late.) */
trait Tables {
  val profile: slick.jdbc.JdbcProfile

  import profile.api._

  /** DDL for all tables. Call .create to execute. */
  lazy val schema: profile.SchemaDescription =
    Array(Persons.schema).reduceLeft(_ ++ _)
  @deprecated("Use .schema instead of .ddl", "3.0")
  def ddl = schema

  // Note: the backing table is named "Campaigns" even though it stores Person rows.
  final class PersonRow(tag: Tag) extends Table[Person](tag, "Campaigns") {

    def id = column[Long]("id", O.PrimaryKey)

    def name = column[String]("name")

    def sex = column[Option[String]]("sex")

    def birthday = column[LocalDate]("birthday")

    def father = column[Option[Long]]("father")

    def mother = column[Option[Long]]("mother")

    def * =
      (
        id,
        name,
        sex,
        birthday,
        father,
        mother
      ).shaped <> ({
        case (id, name, sex, birthday, father, mother) =>
          Person(
            id, name, sex.flatMap(Sex.of), birthday, father, mother
          )
      }, { person: Person =>
        Some(
          person.id, person.name, person.sex.map(_.code), person.birthday, person.father, person.mother
        )
      })
  }

  lazy val Persons = new TableQuery(tag => new PersonRow(tag))

}

Preparing the test code

IgniteSlickSpec.scala
import java.time.LocalDate

import model.Tables.profile.api._
import model.`enum`.Sex
import model.{Person, Tables}
import org.scalatest._
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AsyncWordSpec
import slick.jdbc.PostgresProfile.backend.Database

import scala.concurrent.Await
import scala.concurrent.duration.Duration

class IgniteSlickSpec extends AsyncWordSpec with BeforeAndAfterAll with Matchers {

  lazy val db = Database.forConfig("ignite.db")

  val namihei = Person(id = 1L, name = "磯野波平", sex = Some(Sex.Male), father = None, mother = None, birthday = LocalDate.of(1895, 9, 14))
  val fune = Person(id = 2L, name = "磯野フネ", sex = Some(Sex.Female), father = None, mother = None, birthday = LocalDate.of(1901, 1, 11))
  val sazae = Person(id = 3L, name = "フグ田サザエ", sex = Some(Sex.Female), father = Some(1L), mother = Some(2L), birthday = LocalDate.of(1922, 11, 22))
  val katsuo = Person(id = 4L, name = "磯野カツオ", sex = Some(Sex.Male), father = Some(1L), mother = Some(2L), birthday = LocalDate.of(1938, 3, 11))
  val wakame = Person(id = 5L, name = "磯野ワカメ", sex = Some(Sex.Female), father = Some(1L), mother = Some(2L), birthday = LocalDate.of(1942, 6, 15))
  val masuo = Person(id = 6L, name = "フグ田マスオ", sex = Some(Sex.Male), father = None, mother = None, birthday = LocalDate.of(1917, 4, 3))
  val tara = Person(id = 7L, name = "フグ田タラオ", sex = Some(Sex.Male), father = Some(6L), mother = Some(3L), birthday = LocalDate.of(1948, 3, 18))
  val isonoFamily = Seq(namihei, fune, sazae, katsuo, wakame, masuo, tara)

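  override def beforeAll(): Unit = {
    // Create the schema if needed, then upsert the fixture rows before any test runs.
    Await.result(db.run(Tables.schema.createIfNotExists), Duration.Inf)
    Await.result(db.run(DBIO.sequence(isonoFamily.map(Tables.Persons.insertOrUpdate))), Duration.Inf)
  }

  // Release the database resources once all tests have finished.
  override def afterAll(): Unit = db.close()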

  // "The Isono family"
  "磯野家" should {

    // "Sazae's siblings are Katsuo and Wakame":
    // rows that share a parent with Sazae, excluding Sazae herself.
    "サザエさんの兄弟はカツオとワカメ" in {
      val query = Tables.Persons.filter(x => x.mother === sazae.mother || x.father === sazae.father)
        .filterNot(_.id === sazae.id)
        .sortBy(_.id)
        .map(x => (x.id, x.name)).result
      db.run(query).map(x => assert(x.toSeq === Seq(katsuo, wakame).map(x => (x.id, x.name))))
    }

    // "Sazae's husband is Masuo":
    // the father of any child whose mother is Sazae (a self-join on Persons).
    "サザエさんの夫はマスオさん" in {
      val action = for {
        children <- Tables.Persons.filter(_.mother === sazae.id)
        husband <- Tables.Persons.filter(_.id === children.father)
      } yield (husband.id, husband.name)
      val query = action.result
      db.run(query).map(x => assert(x.toSeq === Seq(masuo).map(x => (x.id, x.name))))
    }

  }

}
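
The test reads its connection settings from an `ignite.db` configuration entry that the article does not show. As a stand-in, the sketch below builds a comparable `Database` in code with Slick's `Database.forURL`. The JDBC thin driver URL, the port, and the absence of credentials are all assumptions, and the author's actual configuration may differ.

```scala
import slick.jdbc.PostgresProfile.api._

object IgniteDb {
  // Assumptions: Ignite's JDBC thin driver, the default thin-client port 10800,
  // and no authentication. The article's own "ignite.db" settings are not shown.
  lazy val db = Database.forURL(
    url = "jdbc:ignite:thin://127.0.0.1:10800",
    driver = "org.apache.ignite.IgniteJdbcThinDriver"
  )
}
```

Unlike `Database.forConfig`, this variant keeps the settings in code, which can be handy for a quick experiment but is less convenient when switching environments.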

Run the tests!

sbt test
[info] Loading project definition from /Users/kanako.ohashi/IdeaProjects/ignite-slick/project
[info] Loading settings for project ignite-slick from build.sbt ...
[info] Set current project to ignite-slick (in build file:/Users/kanako.ohashi/IdeaProjects/ignite-slick/)
[info] Compiling 2 Scala sources to /Users/kanako.ohashi/IdeaProjects/ignite-slick/target/scala-2.13/classes ...
[warn] there was one deprecation warning (since 2.13.0); re-run with -deprecation for details
[warn] one warning found
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.ignite.internal.util.GridUnsafe$2 (file:/Users/kanako.ohashi/Library/Caches/Coursier/v1/https/repo1.maven.org/maven2/org/apache/ignite/ignite-core/2.7.6/ignite-core-2.7.6.jar) to field java.nio.Buffer.address
WARNING: Please consider reporting this to the maintainers of org.apache.ignite.internal.util.GridUnsafe$2
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
[info] IgniteSlickSpec:
[info] 磯野家
[info] - should サザエさんの兄弟はカツオとワカメ
[info] - should サザエさんの夫はマスオさん
[info] Run completed in 1 second, 636 milliseconds.
[info] Total number of tests run: 2
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 2, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
[success] Total time: 6 s, completed 2019/12/19 11:52:43

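It worked.
The data can be queried with SQL.

Summary

In this article, I was able to use Apache Ignite as a distributed database from Slick.
For anyone with Slick experience, this should keep the learning cost low.

Note: the code for this experiment is published here.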
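
To make the expected results of the two queries easier to follow, the same logic can be written against plain Scala collections. This is only an in-memory illustration, not what Slick or Ignite executes. The `P` case class and the `FamilyCheck` object are hypothetical names, and SQL NULL comparison semantics are not reproduced.

```scala
// Trimmed-down stand-in for the article's Person model (hypothetical).
final case class P(id: Long, name: String, father: Option[Long], mother: Option[Long])

object FamilyCheck {
  val family: Seq[P] = Seq(
    P(1L, "磯野波平", None, None),
    P(2L, "磯野フネ", None, None),
    P(3L, "フグ田サザエ", Some(1L), Some(2L)),
    P(4L, "磯野カツオ", Some(1L), Some(2L)),
    P(5L, "磯野ワカメ", Some(1L), Some(2L)),
    P(6L, "フグ田マスオ", None, None),
    P(7L, "フグ田タラオ", Some(6L), Some(3L))
  )
  val sazae: P = family(2)

  // Siblings: share a parent with Sazae, excluding Sazae herself, ordered by id.
  val siblings: Seq[String] = family
    .filter(p => p.mother == sazae.mother || p.father == sazae.father)
    .filterNot(_.id == sazae.id)
    .sortBy(_.id)
    .map(_.name)

  // Husband: the father of any child whose mother is Sazae
  // (the in-memory equivalent of the self-join in the second test).
  val husbands: Seq[String] = for {
    child   <- family if child.mother.contains(sazae.id)
    husband <- family if child.father.contains(husband.id)
  } yield husband.name
}
```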
