Google App Engine(GAE)/Javaで、TaskQueueの非同期APIをトランザクション指定して呼び出したのにTaskのAddが消失する(Addされない)、という現象に遭遇しました。
GAEではDatastoreの弱いトランザクション機能をカバーする為に無限リトライするTaskQueueを併用して結果整合性を担保する、みたいなことをよくやるのですが、これでは結果整合性を保つ事ができません( ・᷄ὢ・᷅ )
仕様なのか、それともコードに問題があるのかもまだ分かっていません。識者が目をとめてくれることを期待して記事にしておきます。
コード
そのままデプロイできるプロジェクトをgithubに置いてます。
https://github.com/knightso/asynctqtest
AsyncDatastoreService datastore = DatastoreServiceFactory
.getAsyncDatastoreService();
List<Future<?>> futures = new ArrayList<Future<?>>();
for (int i = 0; i < 10; i++) {
Transaction tx = null;
try {
tx = datastore.beginTransaction().get();
String keyName = UUID.randomUUID().toString();
Entity entity = new Entity("Task", keyName);
entity.setProperty("status", "pending");
entity.setProperty("createdAt", new Date());
datastore.put(tx, entity);
Queue queue = QueueFactory.getDefaultQueue();
queue.addAsync(
tx,
TaskOptions.Builder.withUrl("/task")
.param("key", keyName).method(Method.GET));
futures.add(tx.commitAsync());
} catch (Throwable t) {
logger.severe("error: " + t.getMessage());
t.printStackTrace();
if (tx != null && tx.isActive()) {
tx.rollback();
}
if (t instanceof RuntimeException) {
throw (RuntimeException) t;
}
throw new IllegalStateException(t);
}
}
for (Future<?> f : futures) {
try {
f.get();
} catch (InterruptedException | ExecutionException e) {
throw new IllegalStateException(e);
}
}
問題の箇所↓
Queue queue = QueueFactory.getDefaultQueue();
queue.addAsync(
tx,
TaskOptions.Builder.withUrl("/task")
.param("key", keyName).method(Method.GET));
ループ内でそれぞれDatastoreに"Task"というKindのエンティティを一件保存、さらにTaskQueueにTaskを一件保存しています。
Datastore、TaskQueueどちらのAPIも非同期版を使用していますが、Futureを処理しておらず、トランザクションのcommitAsyncの結果Futureをあとでまとめて同期とってます。
期待する挙動
各ループ内でトランザクション指定してDatastoreへのput、TaskQueueへのAddをしているので、どちらかが失敗した場合はロールバックかかることを期待しています。
実際
Datastoreのエンティティだけputされ、TaskがAddされないケースが何度か(10回ループ中3〜5回)発生します。
エラーもおきません。
Run in Last Minuteが10にならないといけないのに5になっています。ログを見てもTQ経由で呼び出されるはずのServletは5回しか呼び出されていません。
疑問
TaskQueueの非同期APIはFuture#getして同期を取って上げないと結果が保証されないのでしょうか。
ドキュメント
https://cloud.google.com/appengine/docs/java/datastore/async#Working_with_Async_Transactions
を見ると、"When you are using a transaction, calling Transaction.commit() blocks on the result of all async calls made since the transaction started before committing it" と書かれていて、何度かテストした限りでは確かにDatastoreの非同期APIは同期を取らなくてもちゃんと保存されていました。この記述はTaskQueue APIには当てはまらないのでしょうか。
(とりあえずの)対策
TaskQueue APIだけ同期版に変更したところ、無事に全てのタスクがAddされるようになりました。
でもその部分で平行処理が失われてしまうのでとても残念( ・᷄ὢ・᷅ )
ちなみに蛇足ですが・・
GAE/Goではgoroutine+channelを使ってトランザクションごとまるっと並行処理できるので、上記の問題はおきません(`・ω・´)ドヤ
あ、GAE/JavaでもThread使えるか。。でもThread自体が重すぎてあまり魅力を感じない。。