LoginSignup
7
6

More than 5 years have passed since last update.

[GAE/Java] 非同期APIでQueueにAddしたTaskが消える問題(未解決)

Last updated at Posted at 2015-09-22

Google App Engine(GAE)/Javaで、TaskQueueの非同期APIをトランザクション指定して呼び出したのにTaskのAddが消失する(Addされない)、という現象に遭遇しました。

GAEではDatastoreの弱いトランザクション機能をカバーする為に無限リトライするTaskQueueを併用して結果整合性を担保する、みたいなことをよくやるのですが、これでは結果整合性を保つ事ができません( ・᷄ὢ・᷅ )

仕様なのか、それともコードに問題があるのかもまだ分かっていません。識者が目をとめてくれることを期待して記事にしておきます。

コード

そのままデプロイできるプロジェクトをgithubに置いてます。
https://github.com/knightso/asynctqtest

TestServlet.java
AsyncDatastoreService datastore = DatastoreServiceFactory
        .getAsyncDatastoreService();

List<Future<?>> futures = new ArrayList<Future<?>>();
for (int i = 0; i < 10; i++) {
    Transaction tx = null;
    try {
        tx = datastore.beginTransaction().get();

        String keyName = UUID.randomUUID().toString();
        Entity entity = new Entity("Task", keyName);
        entity.setProperty("status", "pending");
        entity.setProperty("createdAt", new Date());
        datastore.put(tx, entity);

        Queue queue = QueueFactory.getDefaultQueue();
        queue.addAsync(
                tx,
                TaskOptions.Builder.withUrl("/task")
                        .param("key", keyName).method(Method.GET));

        futures.add(tx.commitAsync());
    } catch (Throwable t) {
        logger.severe("error: " + t.getMessage());
        t.printStackTrace();
        if (tx != null && tx.isActive()) {
            tx.rollback();
        }
        if (t instanceof RuntimeException) {
            throw (RuntimeException) t;
        }
        throw new IllegalStateException(t);
    }
}

for (Future<?> f : futures) {
    try {
        f.get();
    } catch (InterruptedException | ExecutionException e) {
        throw new IllegalStateException(e);
    }
}

問題の箇所↓

TestServlet.java
        Queue queue = QueueFactory.getDefaultQueue();
        queue.addAsync(
                tx,
                TaskOptions.Builder.withUrl("/task")
                        .param("key", keyName).method(Method.GET));

ループ内でそれぞれDatastoreに"Task"というKindのエンティティを一件保存、さらにTaskQueueにTaskを一件保存しています。

Datastore、TaskQueueどちらのAPIも非同期版を使用していますが、Futureを処理しておらず、トランザクションのcommitAsyncの結果Futureをあとでまとめて同期とってます。

期待する挙動

各ループ内でトランザクション指定してDatastoreへのput、TaskQueueへのAddをしているので、どちらかが失敗した場合はロールバックかかることを期待しています。

実際

Datastoreのエンティティだけputされ、TaskがAddされないケースが何度か(10回ループ中3〜5回)発生します。
エラーもおきません。

asynctq_problem_tqlist.png

Run in Last Minuteが10にならないといけないのに5になっています。ログを見てもTQ経由で呼び出されるはずのServletは5回しか呼び出されていません。

疑問

TaskQueueの非同期APIはFuture#getして同期を取って上げないと結果が保証されないのでしょうか。

ドキュメント
https://cloud.google.com/appengine/docs/java/datastore/async#Working_with_Async_Transactions
を見ると、"When you are using a transaction, calling Transaction.commit() blocks on the result of all async calls made since the transaction started before committing it" と書かれていて、何度かテストした限りでは確かにDatastoreの非同期APIは同期を取らなくてもちゃんと保存されていました。この記述はTaskQueue APIには当てはまらないのでしょうか。

(とりあえずの)対策

TaskQueue APIだけ同期版に変更したところ、無事に全てのタスクがAddされるようになりました。
でもその部分で平行処理が失われてしまうのでとても残念( ・᷄ὢ・᷅ )

ちなみに蛇足ですが・・

GAE/Goではgoroutine+channelを使ってトランザクションごとまるっと並行処理できるので、上記の問題はおきません(`・ω・´)ドヤ

あ、GAE/JavaでもThread使えるか。。でもThread自体が重すぎてあまり魅力を感じない。。

7
6
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
7
6