2
1

More than 3 years have passed since last update.

JanusGraphのトランザクション

Posted at

データベースを扱う上では、どうしてもデータの整合性を保つためにトランザクションを意識しなくてはならない。JanusGraphでもトランザクションをサポートしているので、どのように扱うか確認しておく。

トランザクションサポートの有無を確認

TinkerPopの仕様により、グラフがサポートしている機能の一覧はGraph.features()から取得することができる。Gremlinコンソールから打ち込んで試してみる。

JanusGraphFactoryを使う場合
gremlin> graph = JanusGraphFactory.open("conf/janusgraph-berkeleyje.properties")
==>standardjanusgraph[berkeleyje:C:\******\janusgraph-0.5.1\conf\../db/berkeley]
==>FEATURES
> GraphFeatures
>-- IoRead: true
>-- Computer: true
>-- IoWrite: true
>-- Transactions: true  // ココ!
>-- Persistence: true
>-- ConcurrentAccess: true
>-- ThreadedTransactions: true  // あとココ!
()

Transactionsの項目がtrueであれば、トランザクションが使用できる。例えばTinkerGraph.open()で生成したグラフはトランザクションをサポートしない。

TinkerGraphを使う場合
gremlin> graph = TinkerGraph.open()
==>tinkergraph[vertices:0 edges:0]
gremlin> graph.features()
==>FEATURES
> GraphFeatures
>-- IoRead: true
>-- Computer: true
>-- IoWrite: true
>-- Transactions: false  // ダメ!
>-- Persistence: true
>-- ConcurrentAccess: false
>-- ThreadedTransactions: false  // ダメ!

ThreadedTransactionstrueの場合は複数のスレッド間で1つのトランザクションを共有する機能が使用できる(何に使うのか具体的に思いつかないが、重い処理を分散させるときに使うことがあるかもしれない)。

features()だとログが流れて見づらいと思う場合は以下のコードで個別に確認できる。

JanusGraphFactoryを使う場合
gremlin> graph.features().graph().supportsTransactions()
==>true
gremlin> graph.features().graph().supportsThreadedTransactions()
==>true

サーバーにアクセスする場合のトランザクション制限

以前の記事「JanusGraphの2つの使い方」でも記載したが、
グラフDBサーバーにアクセスしてクエリを行う場合(つまりRemoteの場合)はトランザクションの使用が制限される。

JanusGraph-article01(1).png

送ったクエリが自動的に1つのトランザクションでラッピングされて実行される。なので、2つのクエリを1つのトランザクションにまとめるような処理は出来ない。しかしGremlinでは複数の要素(頂点/辺/プロパティ)を1度に取得・追加・編集・削除する複雑なクエリが書けるようになっているので、そこまで不都合はないはずだ。上の図の2つのクエリはg.addV("person").V().count()のようにまとめられる。

どうしても完全にトランザクションを制御したい場合は上記の記事中のClusterClientを使った接続方法を試して欲しい。あまりおススメはしないが。

TinkerPopトランザクションの基礎

ここから以下ではEmbeddedなDBに接続している場合(Remoteではない場合)という前提で記述していく。

トランザクションのためのインターフェース

トランザクションを扱う場合はgraph.tx()あるいはg.tx()を使用する。g.tx()graph.tx()のプロキシだと明示されているので、この2つは全く同じ処理をする。好きな方を使ってよい。結果としてTransactionを返してくるので、目的に応じて操作する。

Transactionのメソッド一覧(一部)

メソッド 説明
open 明示的にトランザクションを開始する
isOpen トランザクションが開始されているかどうかを取得する
close 明示的にトランザクションを閉じる。commitされるかrollbackされるか、あるいはエラーになるかはonCloseの設定による
rollback トランザクションの内容をロールバックする
commit トランザクションの内容をコミットする
onReadWrite グラフのデータに対して読み書きが行われた場合、自動的にトランザクションを開始するかどうか設定する
onClose closeが呼ばれた際の挙動を設定する
createThreadedTx マルチスレッドに対応したトランザクションを開始し、専用のGraphインターフェースを返す

トランザクションの開始

デフォルトではトランザクションは自動的に開始される。そのタイミングは最初にグラフに対する操作(クエリ)を行った瞬間となる。例えばg.V()g.E()graph.addVertex(),graph.addEdge(),graph.vertices(),graph.edges()などの関数を実行した時点である。

トランザクションが開始されているかどうかはgraph.tx().isOpenで確認できるので、試してみる。

gremlin> graph = JanusGraphFactory.open("conf/janusgraph-berkeleyje.properties")
==>standardjanusgraph[berkeleyje:C:\******\janusgraph-0.5.1\conf\../db/berkeley]
gremlin> graph.tx().isOpen()
==>false  // まだ開始されていない
gremlin> graph.vertices()  // ここで開始される
gremlin> graph.tx().isOpen()
==>true

確かに開始されている。

コミットとロールバック

読み取りのみのクエリ発行であれば特に気にする必要もないが、グラフに対する変更(書き込み)を行った場合、その変更はまだ確定されてはいない。確定するにはコミットを行う必要がある。あるいはトランザクションの途中でエラー等の不都合が発生した場合、行った全ての変更を取り消すためにロールバックを行う必要がある。

ロールバックはgraph.tx().rollback()により行う。

gremlin> g.V().count()
==>0
gremlin> g.addV("person").iterate()  // 頂点を追加した
gremlin> g.V().count()
==>1  // 現時点では反映されている(が、確定ではない)
gremlin> g.tx().rollback()  // 取り消す
==>null
gremlin> g.V().count()
==>0  // ロールバックされた

コミットはgraph.tx().commit()により行う。

gremlin> g.V().count()
==>0
gremlin> g.addV("person").iterate()  // 頂点を追加した
gremlin> g.V().count()
==>1 // 現時点では反映されている(が、確定ではない)
gremlin> g.tx().commit()  // 確定させる
==>null
gremlin> g.V().count()
==>1  // コミットされた

実際にプログラムを書く際は、エラーが発生した場合は最後にrollbackを、しない場合は最後にcommitを行うようにすればよい。

コミットまたはロールバックを行うとトランザクションは閉じられる。

トランザクションの開始を明示的に行う

トランザクションが自動的に開始されるのはなんか気持ち悪いと思う人もいるかもしれない。そのような場合はonReadWriteを使うことで、明示的に開始を行うように強制させることができる。

gremlin> graph.tx().onReadWrite(Transaction.READ_WRITE_BEHAVIOR.MANUAL)
==>org.janusgraph.graphdb.tinkerpop.JanusGraphBlueprintsGraph$GraphTransaction@e886caf
gremlin> g.addV("person").iterate()
Open a transaction before attempting to read/write the transaction
Type ':help' or ':h' for help.
Display stack trace? [yN]
gremlin> graph.tx().open()
==>null
gremlin> g.addV("person").iterate()

onReadWriteMANUALを指定することで自動的に開始しないようにする。デフォルトではAUTOが指定されている。

マルチスレッドでのトランザクション

複数のスレッドで1つのトランザクションを共有する

createThreadedTx()を使う。メインスレッドでトランザクションを生成し、各スレッドにその参照を渡す。全てのスレッドの処理が終わったらcommitあるいはrollbackを実行する。

Javaのプログラムで例を示す。

JanusExample.java
class AddVertexThread extends Thread
{
    private Graph gx;
    private String name;

    public AddVertexThread(Graph gx, String name)
    {
        this.gx = gx;
        this.name = name;
    }

    @Override
    public void run()
    {
        GraphTraversalSource g = this.gx.traversal();
        g.addV("person").property("name", this.name).iterate();        
    }
}

public class JanusExample {
    public static void main(String args[])
    {
        Graph graph = JanusGraphFactory.open("conf/embedded.properties");
        GraphTraversalSource g = graph.traversal();
        // グラフを空にする
        g.V().drop().iterate();
        // コミット
        g.tx().commit();

        // 名前1つに対し、それぞれ別のスレッドで頂点を登録する
        String names[] = {"bob", "alice", "ellie"};
        List<Thread> threads = new ArrayList<Thread>();
        Graph gx = graph.tx().createThreadedTx();  // 共有されるトランザクション
        for(String name : names){
            Thread t = new AddVertexThread(gx, name);
            threads.add(t);
        }
        for(Thread t : threads) t.start();  // 全スレッド開始
        for(Thread t : threads) t.join();  // 全スレッド完了待ち
        gx.tx().commit();  // まとめてコミット

        // 結果を確認
        List<Object> result_names = g.V().values("name").toList();
        for(Object obj_name : result_names){
            String name = (String)obj_name;
            System.out.println(name);
        }

        graph.close();
    }
}

結果は

bob
alice
ellie

となる(登録の順番は保証されない)。

複数のスレッドで複数のトランザクションを扱う

そのような方法は調べた限り無かった。実際試してみても

Could not commit transaction due to exception during persistence

というエラーが出るのでうまく行かない。ConcurrentLinkedQueueみたいなキューを用いて1つのスレッドで順番にトランザクションを処理するようにするのが無難と思われる。

うまくやる方法があれば教えてください。

読み取りオンリーなトランザクション

うっかりデータを上書きしたり破壊したりしないように、制限したい場合もある。そのような場合にはJanusGraph.buildTransaction()を用いる。

実際にはJanusGraphを直接インスタンス化することは出来ないので、JanusGraphFactory.open()を実行した際に得られるグラフ(StandardJanusGraph)を経由して使う。StandardJanusGraphJanusGraphのサブクラスなのでbuildTransactionを使える。Javaの場合はopenの戻り値をダウンキャストして使用すること。

Console
gremlin> graph = JanusGraphFactory.open("conf/janusgraph-berkeleyje.properties")
==>standardjanusgraph[berkeleyje:C:\******\janusgraph-0.5.1\conf\../db/berkeley]
gremlin> readonly_graph = graph.buildTransaction().readOnly().start()
==>standardjanusgraphtx[0x1e12a5a6]  // 読み取り専用グラフ
gremlin> readonly_graph.addVertex("person")
Cannot create new entities in read-only transaction
Type ':help' or ':h' for help.
Display stack trace? [yN]
gremlin> ro_g = ro_tx.traversal()  // 読み取り専用トラバーサル
==>graphtraversalsource[standardjanusgraphtx[0x1e12a5a6], standard]
gremlin> ro_g.addV("person")
Cannot create new entities in read-only transaction
Type ':help' or ':h' for help.
Display stack trace? [yN]

現在開いているトランザクションの一覧

getOpenTransactions()を用いる。何かの原因でトランザクションが開いたままになってしまった場合は、closeTransaction()を用いて強制的に閉じることができる。

Console
gremlin> tx = graph.tx().createThreadedTx()
==>standardjanusgraphtx[0x13213f26]
gremlin> graph.getOpenTransactions()
==>standardjanusgraphtx[0x13213f26]

// 1つだけ閉じたい場合は
gremlin> graph.closeTransaction(graph.getOpenTransactions().get(0))  // 0番目を閉じる
// 全部閉じたい場合は
gremlin> graph.getOpenTransactions().forEach { tx -> graph.closeTransaction(tx) }

ACIDについて

トランザクションがACIDにどの程度対応しているかについては、公式のドキュメントによると、

JanusGraphのトランザクションは必ずしもACIDではありません。BerkeleyDBではそう設計されていますが、下層のストレージシステムが直列化可能分離性や複数行のアトミックな書き込みを提供しないCassandraやHBaseにおいてはACIDではありません。

とあり、BerkeleyDBは対応してそう?まあ、実際どうかは分からないので、心配なら各自検証した方がよさそう。

2
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
1