This is the entry for day 19 of the ただの集団 Advent Calendar 2019.
Introduction
I wanted a way to process large datasets quickly in a distributed fashion, so I tried using Apache Ignite from Slick.
What is Apache Ignite?
Apache Ignite is an in-memory computing platform for transactional, analytical, and streaming workloads. It delivers in-memory speeds at petabyte scale.
Reference: IGNITE FACTS
Can be used as a distributed database
Apache Ignite can be used as an all-in-one distributed database that supports SQL, key-value, compute, machine learning, and other data-processing APIs.
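Supports SQL
ANSI-99 SQL standard
Ignite complies with the ANSI-99 SQL standard. It supports all SQL and DML commands, including SELECT, UPDATE, INSERT, MERGE, and DELETE statements and distributed joins. It also supports a subset of the DDL commands relevant to a distributed SQL database.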
Can also be used as a KVS (key-value store)
JCache (JSR 107) compliant
- In-memory key-value store
- Basic cache operations
- ConcurrentMap API
- Collocated Processing (EntryProcessor)
- Events and Metrics
- Pluggable persistence
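JCache's `javax.cache.Cache` interface, which `IgniteCache` extends, defines ConcurrentMap-style atomic operations such as `putIfAbsent(K, V)`, `replace(K, V, V)` and `getAndPut(K, V)`. It also defines `invoke` with an `EntryProcessor`. These semantics can be illustrated without a cluster. The sketch below uses the JDK's `ConcurrentHashMap` purely as a local stand-in. It is not Ignite's API, and `KvSketch` is a hypothetical name. On a real `IgniteCache`, an entry processor runs on the node that stores the key rather than in the caller's JVM, which is what "collocated processing" refers to.

```scala
import java.util.concurrent.ConcurrentHashMap

// Local stand-in for a key-value cache; this is NOT the Ignite API.
object KvSketch {
  val cache = new ConcurrentHashMap[String, String]()

  // Mirrors Cache.putIfAbsent: true only if the key was absent and the value was stored.
  def putIfAbsent(key: String, value: String): Boolean =
    cache.putIfAbsent(key, value) == null

  // Mirrors Cache.replace(key, oldValue, newValue): compare-and-set on a single entry.
  def replace(key: String, expected: String, updated: String): Boolean =
    cache.replace(key, expected, updated)

  // Rough analogue of invoke(key, entryProcessor): the update function is applied
  // atomically to the entry instead of as a separate get followed by a put.
  // Returns None when the key is absent.
  def appendSuffix(key: String, suffix: String): Option[String] =
    Option(cache.computeIfPresent(key, (_: String, v: String) => v + suffix))
}
```

With Ignite itself, the same ideas would be expressed through `IgniteCache` methods. The stand-in only shows the per-entry atomicity these operations guarantee.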
Articles comparing Ignite with Cassandra
Apache Cassandra vs. Apache Ignite: Affinity Collocation and Distributed SQL
Apache® Ignite™ and Apache® Cassandra™ Benchmarks: The Power of In-Memory Computing
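For other use cases and further details, see the following.
Apache Ignite use cases
Trying it from Slick
Runtime environment
Ignite configuration file
Prepare the server-side configuration file, ignite.xml.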
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
         http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans.xsd">
    <bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
        <!-- Configure internal thread pool. -->
        <property name="publicThreadPoolSize" value="64"/>
        <!-- Configure system thread pool. -->
        <property name="systemThreadPoolSize" value="32"/>
        <property name="binaryConfiguration">
            <bean class="org.apache.ignite.configuration.BinaryConfiguration">
                <property name="compactFooter" value="false"/>
            </bean>
        </property>
        <property name="cacheConfiguration">
            <list>
                <bean class="org.apache.ignite.configuration.CacheConfiguration">
                    <property name="name" value="default"/>
                    <property name="atomicityMode" value="ATOMIC"/>
                    <property name="backups" value="1"/>
                    <property name="cacheMode" value="PARTITIONED"/>
                    <property name="statisticsEnabled" value="true"/>
                    <property name="nearConfiguration">
                        <bean class="org.apache.ignite.configuration.NearCacheConfiguration">
                            <property name="nearEvictionPolicy">
                                <bean class="org.apache.ignite.cache.eviction.lru.LruEvictionPolicy">
                                    <property name="maxSize" value="10000"/>
                                </bean>
                            </property>
                        </bean>
                    </property>
                </bean>
            </list>
        </property>
        <!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. -->
        <property name="discoverySpi">
            <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
                <property name="ipFinder">
                    <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
                        <property name="addresses">
                            <list>
                                <value>127.0.0.1:47500..47509</value>
                            </list>
                        </property>
                    </bean>
                </property>
            </bean>
        </property>
    </bean>
</beans>
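The discovery address `127.0.0.1:47500..47509` uses the `host:firstPort..lastPort` range notation accepted by Ignite's IP finders. The node therefore probes every port from 47500 through 47509 on that host, which is why docker-compose publishes the same range. The notation can be made concrete with a small helper, `AddressRange`. This helper is hypothetical and not part of Ignite; it only mirrors how the range is read.

```scala
// Hypothetical helper (not part of Ignite) that expands the "host:first..last"
// notation used in the ipFinder address list into individual host:port entries.
object AddressRange {
  private val RangePattern = """(.+):(\d+)\.\.(\d+)""".r

  def expand(address: String): Seq[String] = address match {
    case RangePattern(host, first, last) =>
      (first.toInt to last.toInt).map(port => s"$host:$port")
    case single =>
      Seq(single) // a plain host:port entry stays as-is
  }
}
```

For the configuration above, `AddressRange.expand("127.0.0.1:47500..47509")` yields ten addresses, from `127.0.0.1:47500` to `127.0.0.1:47509`.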
docker-compose
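Load the configuration file created above into the container through a volume mount. The published ports are Ignite's defaults: 47100 for node-to-node communication (TcpCommunicationSpi), 47500-47509 for discovery (TcpDiscoverySpi), and 10800 for thin-client and JDBC connections.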
version: "3"
services:
  ignite:
    image: apacheignite/ignite:2.7.6
    environment:
      - CONFIG_URI=/opt/ignite/apache-ignite/config/ignite.xml
    volumes:
      - ./ignite.xml:/opt/ignite/apache-ignite/config/ignite.xml
    ports:
      - "47100:47100"
      - "47500-47509:47500-47509"
      - "10800:10800"
Startup
Start the node by running docker-compose up.
docker-compose up
Starting ignite-slick_ignite_1 ... done
Attaching to ignite-slick_ignite_1
ignite_1 | [02:39:33] __________ ________________
ignite_1 | [02:39:33] / _/ ___/ |/ / _/_ __/ __/
ignite_1 | [02:39:33] _/ // (7 7 // / / / / _/
ignite_1 | [02:39:33] /___/\___/_/|_/___/ /_/ /___/
ignite_1 | [02:39:33]
ignite_1 | [02:39:33] ver. 2.7.6#20190911-sha1:21f7ca41
ignite_1 | [02:39:33] 2019 Copyright(C) Apache Software Foundation
ignite_1 | [02:39:33]
ignite_1 | [02:39:33] Ignite documentation: http://ignite.apache.org
ignite_1 | [02:39:33]
ignite_1 | [02:39:33] Quiet mode.
ignite_1 | [02:39:33] ^-- Logging to file '/opt/ignite/apache-ignite/work/log/ignite-72f112a2.0.log'
ignite_1 | [02:39:33] ^-- Logging by 'JavaLogger [quiet=true, config=null]'
ignite_1 | [02:39:33] ^-- To see **FULL** console log here add -DIGNITE_QUIET=false or "-v" to ignite.{sh|bat}
ignite_1 | [02:39:33]
ignite_1 | [02:39:33] OS: Linux 4.9.184-linuxkit amd64
ignite_1 | [02:39:33] VM information: OpenJDK Runtime Environment 1.8.0_212-b04 IcedTea OpenJDK 64-Bit Server VM 25.212-b04
ignite_1 | [02:39:33] Please set system property '-Djava.net.preferIPv4Stack=true' to avoid possible problems in mixed environments.
ignite_1 | [02:39:33] Initial heap size is 32MB (should be no less than 512MB, use -Xms512m -Xmx512m).
ignite_1 | [02:39:33] Configured plugins:
ignite_1 | [02:39:33] ^-- None
ignite_1 | [02:39:33]
ignite_1 | [02:39:33] Configured failure handler: [hnd=StopNodeOrHaltFailureHandler [tryStop=false, timeout=0, super=AbstractFailureHandler [ignoredFailureTypes=[SYSTEM_WORKER_BLOCKED, SYSTEM_CRITICAL_OPERATION_TIMEOUT]]]]
ignite_1 | [02:39:33] Message queue limit is set to 0 which may lead to potential OOMEs when running cache operations in FULL_ASYNC or PRIMARY_SYNC modes due to message queues growth on sender and receiver sides.
ignite_1 | [02:39:34] Security status [authentication=off, tls/ssl=off]
ignite_1 | [02:39:36] Nodes started on local machine require more than 80% of physical RAM what can lead to significant slowdown due to swapping (please decrease JVM heap size, data region size or checkpoint buffer size) [required=944MB, available=1998MB]
ignite_1 | [02:39:36] Performance suggestions for grid (fix if possible)
ignite_1 | [02:39:36] To disable, set -DIGNITE_PERFORMANCE_SUGGESTIONS_DISABLED=true
ignite_1 | [02:39:36] ^-- Enable G1 Garbage Collector (add '-XX:+UseG1GC' to JVM options)
ignite_1 | [02:39:36] ^-- Specify JVM heap max size (add '-Xmx<size>[g|G|m|M|k|K]' to JVM options)
ignite_1 | [02:39:36] ^-- Set max direct memory size if getting 'OOME: Direct buffer memory' (add '-XX:MaxDirectMemorySize=<size>[g|G|m|M|k|K]' to JVM options)
ignite_1 | [02:39:36] ^-- Disable processing of calls to System.gc() (add '-XX:+DisableExplicitGC' to JVM options)
ignite_1 | [02:39:36] ^-- Disable near cache (set 'nearConfiguration' to null)
ignite_1 | [02:39:36] ^-- Decrease number of backups (set 'backups' to 0)
ignite_1 | [02:39:36] Refer to this page for more performance suggestions: https://apacheignite.readme.io/docs/jvm-and-system-tuning
ignite_1 | [02:39:36]
ignite_1 | [02:39:36] To start Console Management & Monitoring run ignitevisorcmd.{sh|bat}
ignite_1 | [02:39:36] Data Regions Configured:
ignite_1 | [02:39:36] ^-- default [initSize=256.0 MiB, maxSize=399.7 MiB, persistence=false]
ignite_1 | [02:39:36]
ignite_1 | [02:39:36] Ignite node started OK (id=72f112a2)
ignite_1 | [02:39:36] Topology snapshot [ver=1, locNode=72f112a2, servers=1, clients=0, state=ACTIVE, CPUs=8, offheap=0.39GB, heap=0.43GB]
If the log ends with "Ignite node started OK" followed by a topology snapshot, as above, the node is running.
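You can also check from the host side, not only by reading the log, that a published port accepts TCP connections. A plain socket connect is enough for that. The sketch below uses only the JDK's `java.net.Socket` and `scala.util.Using` (Scala 2.13+). `PortCheck` is a hypothetical helper name. A successful connect only shows that something is listening on the port, not that the Ignite node is fully healthy.

```scala
import java.net.{InetSocketAddress, Socket}
import scala.util.Using

object PortCheck {
  // Returns true if a TCP connection to host:port succeeds within the timeout.
  // Any failure (refused, timeout, unknown host) is reported as false.
  def isOpen(host: String, port: Int, timeoutMillis: Int = 1000): Boolean =
    Using(new Socket()) { socket =>
      socket.connect(new InetSocketAddress(host, port), timeoutMillis)
      true
    }.getOrElse(false)
}
```

With the container running, `PortCheck.isOpen("127.0.0.1", 10800)` would be expected to return true, since 10800 is the thin-client/JDBC port published in docker-compose.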
Preparing the sample code
Setting up build.sbt
Write it as follows.
name := "ignite-slick"
version := "0.1"
scalaVersion := "2.13.1"
val igniteVersion = "2.7.6"
val slickVersion = "3.3.2"
libraryDependencies ++= Seq(
  "org.apache.ignite" % "ignite-core" % igniteVersion,
  "org.apache.ignite" % "ignite-clients" % igniteVersion,
  "com.typesafe.slick" %% "slick" % slickVersion,
  "com.typesafe.slick" %% "slick-hikaricp" % slickVersion,
  "org.scalatest" %% "scalatest" % "3.2.0-M1" % Test
)
Preparing the model
In this example, we store data about people.
package model
import java.time.LocalDate
import model.`enum`.Sex
final case class Person(
    id: Long,
    name: String,
    sex: Option[Sex],
    birthday: LocalDate,
    father: Option[Long],
    mother: Option[Long]
)
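`Person`, and later `Tables`, depends on `model.enum.Sex`, whose source is not shown in this article. `Tables` uses it in three ways: `Sex.of(code)` returns an `Option`, each value has a `code` string, and there are `Sex.Male` and `Sex.Female`. A compatible definition might look like the following sketch. It is a hypothetical reconstruction, and the concrete code strings `"male"` and `"female"` are assumptions. The package clause is omitted so the snippet runs on its own.

```scala
// Hypothetical reconstruction of model.`enum`.Sex; the code strings are assumptions.
sealed abstract class Sex(val code: String)

object Sex {
  case object Male extends Sex("male")
  case object Female extends Sex("female")

  val values: Seq[Sex] = Seq(Male, Female)

  // Looks up a Sex by its stored code; unknown codes give None.
  def of(code: String): Option[Sex] = values.find(_.code == code)
}
```

Returning an `Option` from `of` is what lets `Tables` read the column with `sex.flatMap(Sex.of)` and write it back with `person.sex.map(_.code)`.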
Preparing Slick Tables
package model
import java.time.LocalDate
import model.`enum`.Sex
// AUTO-GENERATED Slick data model
/** Stand-alone Slick data model for immediate use */
object Tables extends {
  val profile = slick.jdbc.PostgresProfile
} with Tables
/** Slick data model trait for extension, choice of backend or usage in the cake pattern. (Make sure to initialize this late.) */
trait Tables {
  val profile: slick.jdbc.JdbcProfile
  import profile.api._
  /** DDL for all tables. Call .create to execute. */
  lazy val schema: profile.SchemaDescription =
    Array(Persons.schema).reduceLeft(_ ++ _)
  @deprecated("Use .schema instead of .ddl", "3.0")
  def ddl = schema
  // Maps rows of the "Persons" table to the Person case class.
  final class PersonRow(tag: Tag) extends Table[Person](tag, "Persons") {
    def id = column[Long]("id", O.PrimaryKey)
    def name = column[String]("name")
    // Stored as Sex#code; converted back with Sex.of when reading.
    def sex = column[Option[String]]("sex")
    def birthday = column[LocalDate]("birthday")
    def father = column[Option[Long]]("father")
    def mother = column[Option[Long]]("mother")
    def * =
      (id, name, sex, birthday, father, mother).shaped <> ({
        case (id, name, sex, birthday, father, mother) =>
          Person(id, name, sex.flatMap(Sex.of), birthday, father, mother)
      }, { person: Person =>
        Some((person.id, person.name, person.sex.map(_.code), person.birthday, person.father, person.mother))
      })
  }
  lazy val Persons = new TableQuery(tag => new PersonRow(tag))
}
Preparing the test code
import java.time.LocalDate
import model.Tables.profile.api._
import model.`enum`.Sex
import model.{Person, Tables}
import org.scalatest._
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AsyncWordSpec
import slick.jdbc.PostgresProfile.backend.Database
import scala.concurrent.Await
import scala.concurrent.duration.Duration
class IgniteSlickSpec extends AsyncWordSpec with BeforeAndAfterAll with Matchers {
  // Connection settings are read from the "ignite.db" block of the application config.
  lazy val db = Database.forConfig("ignite.db")
  val namihei = Person(id = 1L, name = "磯野波平", sex = Some(Sex.Male), father = None, mother = None, birthday = LocalDate.of(1895, 9, 14))
  val fune = Person(id = 2L, name = "磯野フネ", sex = Some(Sex.Female), father = None, mother = None, birthday = LocalDate.of(1901, 1, 11))
  val sazae = Person(id = 3L, name = "フグ田サザエ", sex = Some(Sex.Female), father = Some(1L), mother = Some(2L), birthday = LocalDate.of(1922, 11, 22))
  val katsuo = Person(id = 4L, name = "磯野カツオ", sex = Some(Sex.Male), father = Some(1L), mother = Some(2L), birthday = LocalDate.of(1938, 3, 11))
  val wakame = Person(id = 5L, name = "磯野ワカメ", sex = Some(Sex.Female), father = Some(1L), mother = Some(2L), birthday = LocalDate.of(1942, 6, 15))
  val masuo = Person(id = 6L, name = "フグ田マスオ", sex = Some(Sex.Male), father = None, mother = None, birthday = LocalDate.of(1917, 4, 3))
  val tara = Person(id = 7L, name = "フグ田タラオ", sex = Some(Sex.Male), father = Some(6L), mother = Some(3L), birthday = LocalDate.of(1948, 3, 18))
  val isonoFamily = Seq(namihei, fune, sazae, katsuo, wakame, masuo, tara)
  override def beforeAll(): Unit = {
    // Create the table on Ignite if needed, then upsert the Isono family rows.
    Await.result(db.run(Tables.schema.createIfNotExists), Duration.Inf)
    Await.result(db.run(DBIO.sequence(isonoFamily.map(Tables.Persons.insertOrUpdate))), Duration.Inf)
  }
  // Release the connection pool once all tests have finished.
  override def afterAll(): Unit = db.close()
  // "The Isono family"
  "磯野家" should {
    // "Sazae's siblings are Katsuo and Wakame": people sharing her mother or father, excluding herself.
    "サザエさんの兄弟はカツオとワカメ" in {
      val query = Tables.Persons
        .filter(x => x.mother === sazae.mother || x.father === sazae.father)
        .filterNot(_.id === sazae.id)
        .sortBy(_.id)
        .map(x => (x.id, x.name))
        .result
      db.run(query).map(x => assert(x.toSeq === Seq(katsuo, wakame).map(x => (x.id, x.name))))
    }
    // "Sazae's husband is Masuo": the father of the children whose mother is Sazae.
    "サザエさんの夫はマスオさん" in {
      val action = for {
        children <- Tables.Persons.filter(_.mother === sazae.id)
        husband <- Tables.Persons.filter(_.id === children.father)
      } yield (husband.id, husband.name)
      val query = action.result
      db.run(query).map(x => assert(x.toSeq === Seq(masuo).map(x => (x.id, x.name))))
    }
  }
}
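The intent of the two Slick queries is easier to follow when the same logic is written over an in-memory `Seq`. The sketch below does that with a hypothetical `Row` type in which `sex` and `birthday` are dropped. It is not how the queries actually run: Slick compiles them to SQL, and Ignite executes that SQL. One detail is mirrored on purpose, SQL's NULL handling. Because `mother = NULL` never matches in SQL, a missing parent matches nobody here either.

```scala
// Hypothetical stand-in for the Person rows (sex and birthday omitted).
final case class Row(id: Long, name: String, father: Option[Long], mother: Option[Long])

object FamilyQueries {
  // SQL-style equality on nullable columns: NULL never equals anything.
  private def sqlEq(a: Option[Long], b: Option[Long]): Boolean = a.isDefined && a == b

  // Mirrors filter(mother === target.mother || father === target.father),
  // filterNot(id === target.id), sortBy(id), map((id, name)).
  def siblings(rows: Seq[Row], target: Row): Seq[(Long, String)] =
    rows
      .filter(p => sqlEq(p.mother, target.mother) || sqlEq(p.father, target.father))
      .filterNot(_.id == target.id)
      .sortBy(_.id)
      .map(p => (p.id, p.name))

  // Mirrors the for-comprehension, which is an inner join: one result per child
  // whose mother is `wife`, paired with the person whose id is that child's father.
  def husbands(rows: Seq[Row], wife: Row): Seq[(Long, String)] =
    for {
      child   <- rows.filter(_.mother.contains(wife.id))
      husband <- rows.filter(h => child.father.contains(h.id))
    } yield (husband.id, husband.name)
}
```

Because the join yields one row per child, a wife with several children would return the same husband several times. In the test data Sazae has a single child, so the expected result is just Masuo.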
Running the tests!
sbt test
[info] Loading project definition from /Users/kanako.ohashi/IdeaProjects/ignite-slick/project
[info] Loading settings for project ignite-slick from build.sbt ...
[info] Set current project to ignite-slick (in build file:/Users/kanako.ohashi/IdeaProjects/ignite-slick/)
[info] Compiling 2 Scala sources to /Users/kanako.ohashi/IdeaProjects/ignite-slick/target/scala-2.13/classes ...
[warn] there was one deprecation warning (since 2.13.0); re-run with -deprecation for details
[warn] one warning found
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.ignite.internal.util.GridUnsafe$2 (file:/Users/kanako.ohashi/Library/Caches/Coursier/v1/https/repo1.maven.org/maven2/org/apache/ignite/ignite-core/2.7.6/ignite-core-2.7.6.jar) to field java.nio.Buffer.address
WARNING: Please consider reporting this to the maintainers of org.apache.ignite.internal.util.GridUnsafe$2
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
[info] IgniteSlickSpec:
[info] 磯野家
[info] - should サザエさんの兄弟はカツオとワカメ
[info] - should サザエさんの夫はマスオさん
[info] Run completed in 1 second, 636 milliseconds.
[info] Total number of tests run: 2
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 2, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
[success] Total time: 6 s, completed 2019/12/19 11:52:43
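It worked.
The data can be queried with SQL.
Summary
In this experiment, I was able to use Apache Ignite as a distributed database from Slick.
If you already have experience with Slick, this approach should help keep the learning cost down.
Note: the code used for this experiment is published here.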