背景:
データストアを使うのにJavaのクライアントライブラリ(Google Cloud Client Library for Java)を使っている。マルチスレッド処理をする上で、このトランザクションがどのように動くかを調べた。公式ドキュメントに以下のような仕様説明があったので、コードを動かしながら確かめた。
1つまたは複数の共通エンティティ グループに対し、複数のトランザクションがエンティティを同時に変更しようとした場合は、変更を commit した最初のトランザクションだけが成功し、他のすべてのトランザクションは commit に失敗することになります。
また、Transactionオブジェクト経由の処理と、Datastoreオブジェクト経由の処理が混ざった場合に場合にどうなるかも確認した。
やったこと
あるエンティティを複数スレッドから更新しようとした場合にどうなるか以下の3つのケースで確認
シナリオ1
Thread1からTransaction経由でentityを取得しsleep
-> Thread2からTransaction経由でentityを取得 & 更新
-> Thread1でTransaction経由で更新
シナリオ2
Thread1からTransaction経由でentityを取得しsleep
-> Thread2からDatastore経由でentityを取得 & 更新
-> Thread1でTransaction経由で更新
シナリオ3
Thread1からDatastore経由でentityを取得しsleep
-> Thread2からTransaction経由でentityを取得 & 更新
-> Thread1でDatastore経由で更新
結論
transaction.updateの場合は、取得時からエンティティが変わっているかを確認してからupdateする。entityの状態が変わっている場合は例外となる。updateの処理が、transactionオブジェクト経由か、datastoreオブジェクト経由かどうかは関係ない。
確認コード
シナリオ1
コード
package jp.ne.opt.spinapp.runner;
import com.google.cloud.datastore.*;
public final class DatastoreTest {
final static String NAME_SPACE = "dataflow";
final static String LOCK_KIND = "lock";
final static String LOCK_PROP_VALUE = "value";
final static Datastore DATASTORE = DatastoreOptions.getDefaultInstance().getService();
final static KeyFactory KEY_FACTORY = DATASTORE.newKeyFactory().setNamespace(NAME_SPACE).setKind(LOCK_KIND);
final static Transaction TRANSACTION = DATASTORE.newTransaction();
public static void main(final String[] args) throws InterruptedException {
MultiThread1 mt1 = new MultiThread1();
MultiThread2 mt2 = new MultiThread2();
mt1.start();
Thread.sleep(1000);
mt2.start();
}
}
class MultiThread1 extends Thread {
public void run() {
Entity lock = DatastoreTest.TRANSACTION.get(DatastoreTest.KEY_FACTORY.newKey("test"));
System.out.println("got lock from 1: " + lock.getLong((DatastoreTest.LOCK_PROP_VALUE)));
try {
Thread.sleep(3000);
System.out.println("sleep 1 ended");
Entity entity = Entity.newBuilder(lock).set(DatastoreTest.LOCK_PROP_VALUE, 1).build();
DatastoreTest.TRANSACTION.update(entity);
DatastoreTest.TRANSACTION.commit();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if (DatastoreTest.TRANSACTION.isActive()) {
DatastoreTest.TRANSACTION.rollback();
}
}
System.out.println("thread1 done.");
}
}
class MultiThread2 extends Thread {
public void run() {
Entity lock = DatastoreTest.TRANSACTION.get(DatastoreTest.KEY_FACTORY.newKey("test"));
System.out.println("got lock from 2: " + lock.getLong((DatastoreTest.LOCK_PROP_VALUE)));
try {
Entity entity = Entity.newBuilder(lock).set(DatastoreTest.LOCK_PROP_VALUE, 2).build();
DatastoreTest.TRANSACTION.update(entity);
DatastoreTest.TRANSACTION.commit();
} finally {
if (DatastoreTest.TRANSACTION.isActive()) {
DatastoreTest.TRANSACTION.rollback();
}
}
System.out.println("thread2 done.");
}
}
got lock from 2: 0
got lock from 1: 0
thread2 done.
sleep 1 ended
[WARNING]
com.google.cloud.datastore.DatastoreException: transaction is no longer active
thread2がcommitした後に、thread1でupdateしようとすると transaction is no longer active エラーが出る。
DatastoreはThread2で更新した状態。
シナリオ2
class MultiThread1 extends Thread {
public void run() {
Entity lock = DatastoreTest.TRANSACTION.get(DatastoreTest.KEY_FACTORY.newKey("test"));
System.out.println("got lock from 1: " + lock.getLong((DatastoreTest.LOCK_PROP_VALUE)));
try {
Thread.sleep(10000);
System.out.println("sleep 1 ended");
Entity entity = Entity.newBuilder(lock).set(DatastoreTest.LOCK_PROP_VALUE, 1).build();
DatastoreTest.TRANSACTION.update(entity);
DatastoreTest.TRANSACTION.commit();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if (DatastoreTest.TRANSACTION.isActive()) {
DatastoreTest.TRANSACTION.rollback();
}
}
System.out.println("thread1 done.");
}
}
class MultiThread2 extends Thread {
public void run() {
Entity lock = DatastoreTest.DATASTORE.get(DatastoreTest.KEY_FACTORY.newKey("test"));
System.out.println("got lock from 2: " + lock.getLong((DatastoreTest.LOCK_PROP_VALUE)));
Entity entity = Entity.newBuilder(lock).set(DatastoreTest.LOCK_PROP_VALUE, 2).build();
DatastoreTest.DATASTORE.update(entity);
System.out.println("thread2 done.");
}
}
got lock from 1: 0
got lock from 2: 0
thread2 done.
sleep 1 ended
[WARNING]
com.google.cloud.datastore.DatastoreException: too much contention on these datastore entities. please try again. entity groups: [(app=b~spinapptest-151310!dataflow, lock, "test")]
thread2で更新した後、thread1で更新しようとしてエラー。
DatastoreはThread2で更新した状態。
シナリオ3
class MultiThread1 extends Thread {
public void run() {
try {
Entity lock = DatastoreTest.DATASTORE.get(DatastoreTest.KEY_FACTORY.newKey("test"));
Thread.sleep(10000);
System.out.println("sleep 1 ended");
System.out.println("got lock from 1: " + lock.getLong((DatastoreTest.LOCK_PROP_VALUE)));
Entity entity = Entity.newBuilder(lock).set(DatastoreTest.LOCK_PROP_VALUE, 1).build();
DatastoreTest.DATASTORE.update(entity);
System.out.println("thread1 done.");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class MultiThread2 extends Thread {
public void run() {
Entity lock = DatastoreTest.TRANSACTION.get(DatastoreTest.KEY_FACTORY.newKey("test"));
System.out.println("got lock from 2: " + lock.getLong((DatastoreTest.LOCK_PROP_VALUE)));
try {
Entity entity = Entity.newBuilder(lock).set(DatastoreTest.LOCK_PROP_VALUE, 2).build();
DatastoreTest.TRANSACTION.update(entity);
DatastoreTest.TRANSACTION.commit();
} finally {
if (DatastoreTest.TRANSACTION.isActive()) {
DatastoreTest.TRANSACTION.rollback();
}
}
System.out.println("thread2 done.");
}
}
got lock from 2: 0
thread2 done.
sleep 1 ended
got lock from 1: 0
thread1 done.
両Thread処理は成功する。