データベースを扱う上では、どうしてもデータの整合性を保つためにトランザクションを意識しなくてはならない。JanusGraphでもトランザクションをサポートしているので、どのように扱うか確認しておく。
トランザクションサポートの有無を確認
TinkerPopの仕様により、グラフがサポートしている機能の一覧はGraph.features()から取得することができる。Gremlinコンソールから打ち込んで試してみる。
gremlin> graph = JanusGraphFactory.open("conf/janusgraph-berkeleyje.properties")
==>standardjanusgraph[berkeleyje:C:\******\janusgraph-0.5.1\conf\../db/berkeley]
==>FEATURES
> GraphFeatures
>-- IoRead: true
>-- Computer: true
>-- IoWrite: true
>-- Transactions: true  // ココ!
>-- Persistence: true
>-- ConcurrentAccess: true
>-- ThreadedTransactions: true  // あとココ!
(略)
Transactionsの項目がtrueであれば、トランザクションが使用できる。例えばTinkerGraph.open()で生成したグラフはトランザクションをサポートしない。
gremlin> graph = TinkerGraph.open()
==>tinkergraph[vertices:0 edges:0]
gremlin> graph.features()
==>FEATURES
> GraphFeatures
>-- IoRead: true
>-- Computer: true
>-- IoWrite: true
>-- Transactions: false  // ダメ!
>-- Persistence: true
>-- ConcurrentAccess: false
>-- ThreadedTransactions: false  // ダメ!
ThreadedTransactionsがtrueの場合は複数のスレッド間で1つのトランザクションを共有する機能が使用できる(何に使うのか具体的に思いつかないが、重い処理を分散させるときに使うことがあるかもしれない)。
features()だとログが流れて見づらいと思う場合は以下のコードで個別に確認できる。
gremlin> graph.features().graph().supportsTransactions()
==>true
gremlin> graph.features().graph().supportsThreadedTransactions()
==>true
サーバーにアクセスする場合のトランザクション制限
以前の記事「JanusGraphの2つの使い方」でも記載したが、
グラフDBサーバーにアクセスしてクエリを行う場合(つまりRemoteの場合)はトランザクションの使用が制限される。
送ったクエリが自動的に1つのトランザクションでラッピングされて実行される。なので、2つのクエリを1つのトランザクションにまとめるような処理は出来ない。しかしGremlinでは複数の要素(頂点/辺/プロパティ)を1度に取得・追加・編集・削除する複雑なクエリが書けるようになっているので、そこまで不都合はないはずだ。上の図の2つのクエリはg.addV("person").V().count()のようにまとめられる。
どうしても完全にトランザクションを制御したい場合は上記の記事中のClusterとClientを使った接続方法を試して欲しい。あまりおススメはしないが。
TinkerPopトランザクションの基礎
ここから以下ではEmbeddedなDBに接続している場合(Remoteではない場合)という前提で記述していく。
トランザクションのためのインターフェース
トランザクションを扱う場合はgraph.tx()あるいはg.tx()を使用する。g.tx()はgraph.tx()のプロキシだと明示されているので、この2つは全く同じ処理をする。好きな方を使ってよい。結果としてTransactionを返してくるので、目的に応じて操作する。
Transactionのメソッド一覧(一部)
| メソッド | 説明 | 
|---|---|
| open | 明示的にトランザクションを開始する | 
| isOpen | トランザクションが開始されているかどうかを取得する | 
| close | 明示的にトランザクションを閉じる。commitされるかrollbackされるか、あるいはエラーになるかはonCloseの設定による | 
| rollback | トランザクションの内容をロールバックする | 
| commit | トランザクションの内容をコミットする | 
| onReadWrite | グラフのデータに対して読み書きが行われた場合、自動的にトランザクションを開始するかどうか設定する | 
| onClose | closeが呼ばれた際の挙動を設定する | 
| createThreadedTx | マルチスレッドに対応したトランザクションを開始し、専用のGraphインターフェースを返す | 
トランザクションの開始
デフォルトではトランザクションは自動的に開始される。そのタイミングは最初にグラフに対する操作(クエリ)を行った瞬間となる。例えばg.V()やg.E()やgraph.addVertex(),graph.addEdge(),graph.vertices(),graph.edges()などの関数を実行した時点である。
トランザクションが開始されているかどうかはgraph.tx().isOpenで確認できるので、試してみる。
gremlin> graph = JanusGraphFactory.open("conf/janusgraph-berkeleyje.properties")
==>standardjanusgraph[berkeleyje:C:\******\janusgraph-0.5.1\conf\../db/berkeley]
gremlin> graph.tx().isOpen()
==>false  // まだ開始されていない
gremlin> graph.vertices()  // ここで開始される
gremlin> graph.tx().isOpen()
==>true
確かに開始されている。
コミットとロールバック
読み取りのみのクエリ発行であれば特に気にする必要もないが、グラフに対する変更(書き込み)を行った場合、その変更はまだ確定されてはいない。確定するにはコミットを行う必要がある。あるいはトランザクションの途中でエラー等の不都合が発生した場合、行った全ての変更を取り消すためにロールバックを行う必要がある。
ロールバックはgraph.tx().rollback()により行う。
gremlin> g.V().count()
==>0
gremlin> g.addV("person").iterate()  // 頂点を追加した
gremlin> g.V().count()
==>1  // 現時点では反映されている(が、確定ではない)
gremlin> g.tx().rollback()  // 取り消す
==>null
gremlin> g.V().count()
==>0  // ロールバックされた
コミットはgraph.tx().commit()により行う。
gremlin> g.V().count()
==>0
gremlin> g.addV("person").iterate()  // 頂点を追加した
gremlin> g.V().count()
==>1 // 現時点では反映されている(が、確定ではない)
gremlin> g.tx().commit()  // 確定させる
==>null
gremlin> g.V().count()
==>1  // コミットされた
実際にプログラムを書く際は、エラーが発生した場合は最後にrollbackを、しない場合は最後にcommitを行うようにすればよい。
コミットまたはロールバックを行うとトランザクションは閉じられる。
トランザクションの開始を明示的に行う
トランザクションが自動的に開始されるのはなんか気持ち悪いと思う人もいるかもしれない。そのような場合はonReadWriteを使うことで、明示的に開始を行うように強制させることができる。
gremlin> graph.tx().onReadWrite(Transaction.READ_WRITE_BEHAVIOR.MANUAL)
==>org.janusgraph.graphdb.tinkerpop.JanusGraphBlueprintsGraph$GraphTransaction@e886caf
gremlin> g.addV("person").iterate()
Open a transaction before attempting to read/write the transaction
Type ':help' or ':h' for help.
Display stack trace? [yN]
gremlin> graph.tx().open()
==>null
gremlin> g.addV("person").iterate()
onReadWriteにMANUALを指定することで自動的に開始しないようにする。デフォルトではAUTOが指定されている。
マルチスレッドでのトランザクション
複数のスレッドで1つのトランザクションを共有する
createThreadedTx()を使う。メインスレッドでトランザクションを生成し、各スレッドにその参照を渡す。全てのスレッドの処理が終わったらcommitあるいはrollbackを実行する。
Javaのプログラムで例を示す。
class AddVertexThread extends Thread
{
    private Graph gx;
    private String name;
    
    public AddVertexThread(Graph gx, String name)
    {
        this.gx = gx;
        this.name = name;
    }
    
    @Override
    public void run()
    {
        GraphTraversalSource g = this.gx.traversal();
        g.addV("person").property("name", this.name).iterate();        
    }
}
public class JanusExample {
    public static void main(String args[])
    {
        Graph graph = JanusGraphFactory.open("conf/embedded.properties");
        GraphTraversalSource g = graph.traversal();
        // グラフを空にする
        g.V().drop().iterate();
        // コミット
        g.tx().commit();
        
        // 名前1つに対し、それぞれ別のスレッドで頂点を登録する
        String names[] = {"bob", "alice", "ellie"};
        List<Thread> threads = new ArrayList<Thread>();
        Graph gx = graph.tx().createThreadedTx();  // 共有されるトランザクション
        for(String name : names){
            Thread t = new AddVertexThread(gx, name);
            threads.add(t);
        }
        for(Thread t : threads) t.start();  // 全スレッド開始
        for(Thread t : threads) t.join();  // 全スレッド完了待ち
        gx.tx().commit();  // まとめてコミット
        
        // 結果を確認
        List<Object> result_names = g.V().values("name").toList();
        for(Object obj_name : result_names){
            String name = (String)obj_name;
            System.out.println(name);
        }
       
        graph.close();
    }
}
結果は
bob
alice
ellie
となる(登録の順番は保証されない)。
複数のスレッドで複数のトランザクションを扱う
そのような方法は調べた限り無かった。実際試してみても
Could not commit transaction due to exception during persistence
というエラーが出るのでうまく行かない。ConcurrentLinkedQueueみたいなキューを用いて1つのスレッドで順番にトランザクションを処理するようにするのが無難と思われる。
うまくやる方法があれば教えてください。
読み取りオンリーなトランザクション
うっかりデータを上書きしたり破壊したりしないように、制限したい場合もある。そのような場合にはJanusGraph.buildTransaction()を用いる。
実際にはJanusGraphを直接インスタンス化することは出来ないので、JanusGraphFactory.open()を実行した際に得られるグラフ(StandardJanusGraph)を経由して使う。StandardJanusGraphはJanusGraphのサブクラスなのでbuildTransactionを使える。Javaの場合はopenの戻り値をダウンキャストして使用すること。
gremlin> graph = JanusGraphFactory.open("conf/janusgraph-berkeleyje.properties")
==>standardjanusgraph[berkeleyje:C:\******\janusgraph-0.5.1\conf\../db/berkeley]
gremlin> readonly_graph = graph.buildTransaction().readOnly().start()
==>standardjanusgraphtx[0x1e12a5a6]  // 読み取り専用グラフ
gremlin> readonly_graph.addVertex("person")
Cannot create new entities in read-only transaction
Type ':help' or ':h' for help.
Display stack trace? [yN]
gremlin> ro_g = ro_tx.traversal()  // 読み取り専用トラバーサル
==>graphtraversalsource[standardjanusgraphtx[0x1e12a5a6], standard]
gremlin> ro_g.addV("person")
Cannot create new entities in read-only transaction
Type ':help' or ':h' for help.
Display stack trace? [yN]
現在開いているトランザクションの一覧
getOpenTransactions()を用いる。何かの原因でトランザクションが開いたままになってしまった場合は、closeTransaction()を用いて強制的に閉じることができる。
gremlin> tx = graph.tx().createThreadedTx()
==>standardjanusgraphtx[0x13213f26]
gremlin> graph.getOpenTransactions()
==>standardjanusgraphtx[0x13213f26]
// 1つだけ閉じたい場合は
gremlin> graph.closeTransaction(graph.getOpenTransactions().get(0))  // 0番目を閉じる
// 全部閉じたい場合は
gremlin> graph.getOpenTransactions().forEach { tx -> graph.closeTransaction(tx) }
ACIDについて
トランザクションがACIDにどの程度対応しているかについては、公式のドキュメントによると、
JanusGraphのトランザクションは必ずしもACIDではありません。BerkeleyDBではそう設計されていますが、下層のストレージシステムが直列化可能分離性や複数行のアトミックな書き込みを提供しないCassandraやHBaseにおいてはACIDではありません。
とあり、BerkeleyDBは対応してそう?まあ、実際どうかは分からないので、心配なら各自検証した方がよさそう。
