LoginSignup
21
18

More than 3 years have passed since last update.

[Neo4J] ③パフォーマンス確保に向け、Spark+Neo4J

Last updated at Posted at 2015-11-14

TODO graphqlをフロントエンドにしてneo4jを導入してみる ;-)

1) Graph DB provides index-free adjacency

 "グラフ構造のデータを高速検索するグラフ型データベース「Neo4j」"というキャッチフレーズに惹かれて入門を開始。学ぶうちに、この高速とは、"グラフ構造の局所データを検索"という限定があることがわかり、さらに、遅さの秘密?という話も気になっている。

このあたりは、
“A graph database is any storage system that provides index-free adjacency”
という特性に依存するものらしい。
Neo4Jを使うためにはグラフ理論周りのところは、ある程度押さえておいた方が良さそうだ。
#近くに(ad)という接頭辞を持つ"adjacent≒隣り合った"という単語は、ラテン語のadjacere(近くに横たわっている)が起源。

とりあえず、インデックス抜きのadjacencyを持つストレージシステムが、(純粋な)グラフデータベースと定義されていることは覚えておこう。組込用途のグラフデータベースとしてNeo4Jのバージョン1.0が公開されたころは、この純粋なグラフデータベースであったものと思われる。

参考:
http://www.slideshare.net/doryokujin/graphdbgraphdb

2) Neo4Jは内部でLuceneインデックスを使いまくっている

検索といえばまずは全文検索が思い浮かぶ人も多いだろう。全文検索のキモであるインデックス抜きで"高速検索"ができるものだろうか?

Neo4Jの現在の実装では、ノード名の検索という現実的な用途で、インデックスを使っている。
neo4jとLucene :

neo4jでは、Luceneをインデックスに使ってますが、
何のためにあるんでしょうか?
今は、ノードを作ったり、Traverseしたり、削除したりできているんだけど。。
よくよく考えていると分かりました!
作ったノードを取り出す際に必要なのです。
同じコンソールで作っていると、作ったノードを変数に入れておいたり、
登録したノードIDを用いて探しだすことができますが、
何十何百と登録していくうちに、ノードを探せなくなります。
そんな時にノードを作成した際に、インデックスにノード名等を登録しておくと、
そのノード名を元に、ノードを返してくれるのです。

Neo4J内部でのバッチ処理としてLuceneにデータを投入するLuceneBatchInserterIndexProviderNewImplクラスの
用途は以下のような感じだ。

src/test/scala/org/neo4j/cypher/performance/DataImportTest.scala
package org.neo4j.cypher.performance

import java.io.File
import org.neo4j.cypher.internal.frontend.v3_0.test_helpers.CypherFunSuite
import org.neo4j.graphdb.DynamicRelationshipType
import org.neo4j.index.impl.lucene.LuceneBatchInserterIndexProviderNewImpl
import org.neo4j.unsafe.batchinsert.{BatchInserter, BatchInserterIndex, BatchInserters}

()
def createNodeIdx(indexProvider: LuceneBatchInserterIndexProviderNewImpl, name: String, typ: String, column: String): BatchInserterIndex = {
    val moviesId = indexProvider.nodeIndex(name, Map("type" -> typ).asJava)
    moviesId.setCacheCapacity(column, 10000)
    moviesId
  }

  private def createInserters(targetDir: String) = {
    val inserter = BatchInserters.inserter(targetDir)
    val indexProvider = new LuceneBatchInserterIndexProviderNewImpl(inserter)

    val moviesTitles = createNodeIdx(indexProvider, "movieTitles", "fulltext", "title")
    val moviesId = createNodeIdx(indexProvider, "movieIds", "exact", "id")
    val typeIdx = createNodeIdx(indexProvider, "type", "exact", "type")

    (inserter, moviesId, moviesTitles, indexProvider, typeIdx)
  }

これはただのテストコードだが、inserterのところは、巨大なCSVデータを一気にNeo4J内のLuceneインデックスに記憶させる実装に応用できるだろう。

内部的に、Luceneインデックスを貼るバッチ処理を受け入れるからには、、バッチ処理する何かと相性が良いはず。...となると、scala実装つながりで、sparkなのかも。

2) Sparkとの組み合わせ

IBMが担いだことなどにより今年だいぶバズっている気がするSpark。
Neo4J界隈では、Spark+Neo4Jの組み合わせがすでに話題となっており
(eg.Big Graph Analytics on Neo4j with Apache Spark)、試験的な実装も公開されている。

全体図はこんな感じ:
SP2NeO4J.png

CSVをインポートするバッチ処理であり、右下の緑&青&白のアイコンのところがNeo4J。
上の図の出典である以下に、詳しく解説してくれている。
http://www.markhneedham.com/blog/2015/04/14/spark-generating-csv-files-to-import-into-neo4j/

CSVを生成してSparkに叩きこむやっつけのバッチ処理はこんな感じで始まる。

github.com/mneedham/neo4j-spark-chicago/blob/master/src/main/scala/GenerateCSVFiles.scala
import java.io.File

import au.com.bytecode.opencsv.CSVParser
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}

import org.apache.spark.sql.{SQLContext, Row, DataFrame}

import scala.RuntimeException

case class CrimeType(primaryType: String)
case class Beat(id: String, label:String)


object GenerateCSVFiles {

  def merge(srcPath: String, dstPath: String, header: String): Unit =  {
    val hadoopConfig = new Configuration()
    val hdfs = FileSystem.get(hadoopConfig)
    MyFileUtil.copyMergeWithHeader(hdfs, new Path(srcPath), hdfs, new Path(dstPath), false, hadoopConfig, header)
  }

  def main(args: Array[String]) {
    var crimeFile = args(0)
    val conf = new SparkConf().setAppName("Chicago Crime Dataset")
    val sc = new SparkContext(conf)
()

3) Spark GraphX

そうだ、Sparkといえば、GraphXだ。
参考 私が GraphX を推す理由

GraphX とは、Apache Spark のコンポーネントのひとつです。 グラフ構造の大容量データを並列分散環境上で処理するためのフレームワークです。
グラフ構造での分析技術は、R言語の igraph パッケージなどがよく使われますが、大容量のデータを扱う技術はこれまでなかなかありませんでした。グラフ構造データは頂点が辺で接続しているので、分散格納されたグラフ構造データに対する処理を行うと、分散環境間で情報の共有が必要で、並列分散処理自体に適さないためです。これを Spark の技術で解決しているのが GprahX です。

並列分散処理自体に適さないグラフ構造データを対する処理をSparkがどういう技術で解決しているのか、よくわからないが、グラフつながりで、興味深い。

実装例、Spark GraphXのページランク アルゴリズムを使用しラブライブ!などの人物関係を解析してみるにあるように、多量のハイパーリンク文書ページランクが典型的な用途らしい。

4) neo4j-mazerunner = Spark GraphX+Neo4J

ということで、グラフ兄弟のSpark GraphXとNeo4J。既に、両者の組み合わせた取り組みはNeo4Jコミュニティ主導で既にはじまっており、Dockerイメージも提供されている。
https://github.com/neo4j-contrib/neo4j-mazerunner
実装の方はjavaとscalaで行われているが、外からpythonやrubyで叩くことも可能だろう。

5)まとめ:近々、neo4j-mazerunnerと格闘したほうが良さそうだ。

sparkが出できてだいぶ発散したので、Neo4J入門的にまとめておこう。

α Neo4Jはluceneインデックスのことも忘れていない、ハイブリッド型(?)グラフデータベースである。

忘れちゃいそうな人には:

テメェら、ずっと待ってたんだろ?
インデックスの記憶を奪わなくても済む、インデックスの敵に回らなくても済む、
そんな誰もが笑って誰もが望む最っ高に最っ高な幸福な結末(ハッピーエンド)ってヤツを!
(上条当麻の名台詞集)

β 最っ高な幸福とはいえなくても、パフォーマンス面からみてそこそこハッピーな解を作ろうとするならば

"ハイブリッド型グラフデータベースNeo4J+ グラフ構造の処理をうまく取り込んだビックデータ処理基盤Spark"という組み合わせでの実装は検討に値する。

γ 日本でグラフ構造に取り組む人は、"人物相関図"に関心がある

日本人による人物相関図実装例 :
「ラブライブ!」「やはり俺の青春ラブコメはまちがっている。」、プリキュア...

Neo4Jが来る前にsparkが来てしまった日本では、インデックス抜きでグラフデータベースを語れないということで、この界隈を意識しておこう:
http://j.mp/toaruindex

21
18
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
21
18