0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

CAP JavaでTransactional Outboxを使ってみる

Last updated at Posted at 2025-06-19

Cloud Foundryでサービスが実行中に落ちたらどうなるのか?

この記事を書くきっかけになったのが上記の疑問です。

Cloud Foundryにデプロイされたアプリケーションは、インフラのメンテナンスなどでVMの再起動が必要になると、Evacuationと呼ばれるプロセスが実行されます。Evacuationでは、対象VM上のアプリケーションインスタンスを別のVMに再配置し、新しいインスタンスが正常に起動するのを待ってから元のインスタンスを終了させます。Evacuation開始から古いインスタンスがシャットダウンされるまでには数十秒の間隔があるため、オンライン処理ではほとんど問題になりません。一方、長時間実行されるバッチ処理では処理実行中にEavcuationが発生すると処理が中断されてしまいます。

Evacuationについて

途中で落ちても大丈夫なケース

CAPでデータベースを更新するとき、リクエストの開始から終了までは一つのトランザクションにラップされます。この中でエラーが発生したり処理が中断した場合、更新は行われません。たとえば複数レコードを1行ずつインサートするような処理を書いていた場合、「全件書き込まれるか」「一件も書き込まれないか」どちらかの結果となります。
つまり「CAPが管理するDBへの書き込み」のみの場合、途中で落ちても同じ処理を再実行すればよいということになります。

途中で落ちると困るケース

処理の中でリモートサービスを呼んでデータを更新するケースがあります。途中で処理が落ちるとリモートは更新されているがDBは更新されていない状態になる可能性があります。

CAPのTransactional Outbox

CAP JavaにはTransactional Outboxという機能があります。サービスへのリクエストをいったん"Outbox"に格納しておいて、処理が正常に完了してからリクエストを実行するという機能です。
※Transactional OutboxはCAP Javaのみでサポートされていますが、Node.jsでは似たようなコンセプトのTask Queueがあります。

Transactional Outboxの種類

Transactional Outboxには2種類あります。

  • In-Memory Outbox: デフォルトのOutboxです。メッセージはインメモリのOutboxに格納され、現在のトランザクションが正常終了したあとでリモートに送られます
  • Persistent Outbox: メッセージはDB上のOutboxに格納されます。リモート呼び出しが成功した場合はメッセージがOutboxから削除され、失敗した場合は指定された回数(デフォルトは10回)だけリトライされます

Persistent Outboxを使用した場合の処理の流れ

Transactional Outbox.drawio.png

①リモート呼び出しを実行
②メッセージはTransactional Outboxに格納される
③処理が正常に終了する
④Transactional Outboxからリモートサービスにメッセージが送られる

Outboxへの書き込みは現在のトランザクションの中で行われるので、処理が中断した場合はOutboxは更新されず、リモート呼び出しも行われません(トランザクション整合性が担保される)。

Transactional Outboxを使ってみる

シナリオ

指定された件数だけSales Orderを作成するアクションを作ります。DBにSales Orderデータを格納してからリモートサービスを呼び出し、発注を登録します。1件ごとに10秒のwaitを入れ、処理の途中でサービスを再起動した場合に何が起こるかを観察します。

実装

ソースコードは以下のリポジトリにあります。

1. スキーマ定義

SalesOrdersのエンティティを定義します。

db/schema.cds
namespace outbox;
using { managed } from '@sap/cds/common'; 

entity SalesOrders: managed {
    key ID: UUID;
    customerId: Integer;
    orderDate: Date;
    amount: Integer;
}

2. サービス定義

複数のSalesOrderを作成するアクション、createBulkOrdersを定義します。

srv/sales-service.cds
using { outbox as db } from '../db/schema';
using { PurchaseService } from './external/PurchaseService';
using from '@sap/cds/srv/outbox';

service SalesService {
    entity SalesOrders as projection on db.SalesOrders;
    action createBulkOrders(count: Integer) returns String;
}

service OutboxService {
    @readonly
    entity OutboxMessages as projection on cds.outbox.Messages;
}
解説

./external/PurchaseServiceは呼び出すリモートサービスの定義です。PurchaseService自体は公開していませんが、ここに指定しないとPurchaseServiceを利用するためのインターフェースが生成されないので追加しています。

using { PurchaseService } from './external/PurchaseService';

以下でOutboxの中身を確認するためにエンティティをサービスに公開しています。

using from '@sap/cds/srv/outbox';

service OutboxService {
    @readonly
    entity OutboxMessages as projection on cds.outbox.Messages;
}

3. Outboxの設定

Persistent Outboxを使用するため、package.jsonに以下の設定を追加します。

{
  // ...
  "cds": {
    "requires": {
      "outbox": {
        "kind": "persistent-outbox"
      }
    }
  }
}

4. イベントハンドラの定義

アクションのイベントハンドラを以下のように定義します。

@Component
@ServiceName(SalesService_.CDS_NAME)
public class SalesServiceHandler implements EventHandler{
    private PersistenceService db;
    private OutboxService outboxService;

    @Autowired
    @Qualifier(PurchaseService_.CDS_NAME)
    CqnService purchaseService;

    private static final Logger logger = LoggerFactory.getLogger(SalesServiceHandler.class);

    public SalesServiceHandler(
        PersistenceService db, 
        @Qualifier(OutboxService.PERSISTENT_ORDERED_NAME) OutboxService outboxService) {
        this.db = db;
        this.outboxService = outboxService;
    }

    @On(event = CreateBulkOrdersContext.CDS_NAME)
    public void createBulkOrders(CreateBulkOrdersContext context) {
        //1. 既存レコードを削除
        CqnDelete delete = Delete.from(cds.gen.outbox.SalesOrders_.class);
        db.run(delete);

        // Outboxサービスの作成
        AsyncCqnService outboxedPurchaseService = outboxService.outboxed(purchaseService, AsyncCqnService.class);        

        //2. レコード登録
        Random random = new Random();
        Integer max = context.getCount();
        for (int i = 0; i < max; i++) {
            cds.gen.outbox.SalesOrders order = cds.gen.outbox.SalesOrders.create();
            order.setId(UUID.randomUUID().toString());
            order.setCustomerId(random.nextInt(1000));
            order.setOrderDate(LocalDate.now());
            order.setAmount(100 + random.nextInt(900));

            CqnInsert insert = Insert.into(cds.gen.outbox.SalesOrders_.class).entry(order);
            db.run(insert);            
            logger.info("{} 件目の受注を登録しました", i + 1);

            PurchaseOrders purchaseorder = PurchaseOrders.create();
            // purchaseorder.setId(UUID.randomUUID().toString());
            purchaseorder.setSalesOrderId(order.getId());
            purchaseorder.setCustomerId(order.getCustomerId());
            purchaseorder.setAmount(order.getAmount());
            CqnInsert insertPo = Insert.into(PurchaseOrders_.class).entry(purchaseorder);
            outboxedPurchaseService.run(insertPo);
            logger.info("{} 件目の発注をアウトボックスに登録しました", i + 1);

            // 10秒待機
            try {
                Thread.sleep(10 * 1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("Interrupted while waiting", e);
            }
        }

        String message = String.format("%d orders created!", max);
        context.setResult(message);

    }
}
解説

以下でOutboxサービスのインスタンスを DI(依存性注入)により取得し、クラス変数に格納しています。OutboxにはOrderedとUnordered の2種類があり、Unordered は主に AuditLog サービスで利用されます。ここでは、イベントの順序を保証するOrderedなOutboxを使用するため、@Qualifier(OutboxService.PERSISTENT_ORDERED_NAME)を指定しています。

public class SalesServiceHandler implements EventHandler{
    private PersistenceService db;
    private OutboxService outboxService;

    ...

    public SalesServiceHandler(
        PersistenceService db, 
        @Qualifier(OutboxService.PERSISTENT_ORDERED_NAME) OutboxService outboxService) {
        this.db = db;
        this.outboxService = outboxService;
    }

createBulkOrdersメソッドの中ではリモートサービス (purchaseService)への操作をOutbox経由で行うために、outboxService.outboxed(...)を使ってラッパーを作成しています。Insertクエリをこのラッパーに対して実行することで、クエリはOutboxテーブルに格納され、後で非同期に配信されます。

        // Outboxサービスの作成
        AsyncCqnService outboxedPurchaseService = outboxService.outboxed(purchaseService, AsyncCqnService.class);   
        ...
        for (int i = 0; i < max; i++) {
        ...
            CqnInsert insertPo = Insert.into(PurchaseOrders_.class).entry(purchaseorder);
            outboxedPurchaseService.run(insertPo);
        ...
        }

動作確認

作成したアプリケーションをCloud Foundryにデプロイし、動作確認してみます。以下の形式でアクションを実行します。

### Bulk Action
POST {{server}}/odata/v4/SalesService/createBulkOrders
Content-Type: application/json
Authorization: Bearer {{Token.response.body.access_token}}

{ "count": 1 }

処理実行中にサービスを再起動した場合

アクションの実行直後にサービスをcf restartで再起動し、DBにデータが登録されるか、リモートサービスにデータが登録されるかを確認します。

①3件で実行した場合(実行時間:約30秒)
レスポンスが正常に返ってきました。

{
  "@context": "$metadata#Edm.String",
  "@metadataEtag": "W/\"07a3fdbf40ac314c67cdb849122f722a9bc5e5ff9b766013e84dd50f329e0e6e\"",
  "value": "3 orders created!"
}

リモートサービスにも発注が登録されました(画面は省略)。

ログを見ていると、インスタンスの停止(Graceful shutdown)と新しいインスタンスの起動が平行して行われているようでした。旧インスタンスがリクエストを処理し終わったあとに停止したため、レスポンスを返すことができたようです。

2025-06-18T21:11:14.36+0000 [APP/PROC/WEB/0] OUT 2025-06-18T21:11:14.367Z  INFO 7 --- [nio-8080-exec-5] c.c.handlers.SalesServiceHandler         : 1 件目の受注を登録しました
2025-06-18T21:11:14.41+0000 [APP/PROC/WEB/0] OUT 2025-06-18T21:11:14.419Z  INFO 7 --- [nio-8080-exec-5] c.c.handlers.SalesServiceHandler         : 1 件目の発注をアウトボックスに登録しました

<Graceful shutdown が開始>
2025-06-18T21:11:22.19+0000 [APP/PROC/WEB/0] OUT 2025-06-18T21:11:22.199Z  INFO 7 --- [ionShutdownHook] o.s.b.w.e.tomcat.GracefulShutdown        : Commencing graceful shutdown. Waiting for active requests to complete

<アプリケーションが再起動>
2025-06-18T21:11:22.32+0000 [CELL/0] OUT Downloaded droplet (154.4M)
2025-06-18T21:11:24.33+0000 [APP/PROC/WEB/0] OUT 2025-06-18T21:11:24.332Z  INFO 7 --- [           main] c.cap_transactional_outbox.Application   : Starting Application using Java 21.0.7 with PID 7 (/home/vcap/app/BOOT-INF/classes started by vcap in /home/vcap/app)

<残りの処理を実施>
2025-06-18T21:11:24.48+0000 [APP/PROC/WEB/0] OUT 2025-06-18T21:11:24.470Z  INFO 7 --- [nio-8080-exec-5] c.c.handlers.SalesServiceHandler         : 2 件目の発注をアウトボックスに登録しました
2025-06-18T21:11:28.48+0000 [APP/PROC/WEB/0] OUT 2025-06-18T21:11:28.485Z  INFO 7 --- [           main] c.cap_transactional_outbox.Application   : Started Application in 4.895 seconds (process running for 5.217)
2025-06-18T21:11:30.48+0000 [CELL/0] OUT Container became healthy

2025-06-18T21:11:34.49+0000 [APP/PROC/WEB/0] OUT 2025-06-18T21:11:34.497Z  INFO 7 --- [nio-8080-exec-5] c.c.handlers.SalesServiceHandler         : 3 件目の受注を登録しました
2025-06-18T21:11:34.51+0000 [APP/PROC/WEB/0] OUT 2025-06-18T21:11:34.518Z  INFO 7 --- [nio-8080-exec-5] c.c.handlers.SalesServiceHandler         : 3 件目の発注をアウトボックスに登録しました

<旧インスタンスの完全終了>
2025-06-18T21:11:44.55+0000 [APP/PROC/WEB/0] OUT 2025-06-18T21:11:44.557Z  INFO 7 --- [tomcat-shutdown] o.s.b.w.e.tomcat.GracefulShutdown        : Graceful shutdown complete
2025-06-18T21:11:45.11+0000 [APP/PROC/WEB/0] OUT Exit status 143

②4件で実行した場合(実行時間:約40秒)
502 (Bad Gateway)が返ってきました。
image.png

Sales Orderは登録されておらず、Outboxも空でした。もちろんリモートの発注も登録されていません。Outboxを使うことで、リモートへのメッセージの送信が処理終了まで待機されるため、途中で落ちた場合にはメッセージが送信されないのです。

ログを見ると、「4 件目の受注を登録しました」というメッセージは出ているものの、そのあとで例外が発生しています。Shutdown phase 2147482623 ends with 1 bean still running after timeout of 30000msというメッセージから、Spring Bootのgraceful shutdownが30秒経っても完了しなかったために、強制終了されたことがわかります。

2025-06-19T20:40:05.777+0000 [APP/PROC/WEB/0] STDOUT 2025-06-19T20:40:05.777Z  INFO 6 --- [nio-8080-exec-5] c.c.handlers.SalesServiceHandler         : 4 件目の受注を登録しました
2025-06-19T20:40:05.797+0000 [APP/PROC/WEB/0] STDOUT 2025-06-19T20:40:05.797Z  INFO 6 --- [nio-8080-exec-5] c.c.handlers.SalesServiceHandler         : 4 件目の発注をアウトボックスに登録しました
2025-06-19T20:40:13.329+0000 [APP/PROC/WEB/0] STDOUT 2025-06-19T20:40:13.329Z  INFO 6 --- [ionShutdownHook] o.s.c.support.DefaultLifecycleProcessor  : Shutdown phase 2147482623 ends with 1 bean still running after timeout of 30000ms: [webServerGracefulShutdown]
2025-06-19T20:40:13.334+0000 [RTR/4] STDOUT f14949edtrial-dev-cap-transactional-outbox-srv.cfapps.us10-001.hana.ondemand.com - [2025-06-19T20:39:35.193861150Z] "POST /odata/v4/SalesService/createBulkOrders HTTP/1.1" 502 14 67 "-" "vscode-restclient" "10.0.200.1:25906" "10.32.2.6:61069" x_forwarded_for:"34.198.58.151, 10.0.200.1" x_forwarded_proto:"https" vcap_request_id:"efba2f2f-9001-47b2-4588-92a5e74e79c5" response_time:38.140227 gorouter_time:0.000033 app_id:"a3387380-11c5-4ca7-a33c-ab0d120c20a2" app_index:"0" instance_id:"56926b52-8c43-4e10-62f0-69f0" failed_attempts:1 failed_attempts_time:38.140033 dns_time:"-" dial_time:"-" tls_time:"-" backend_time:"-" local_address:"-" x_cf_routererror:"endpoint_failure (EOF)" x_correlationid:"-" tenantid:"-" sap_passport:"-" x_scp_request_id:"a8971e2f-7f13-4514-86e9-92d39180ebb1-68547587-7F5118B" x_cf_app_instance:"-" x_forwarded_host:"-" x_custom_host:"-" x_ssl_client:"-" x_ssl_client_session_id:"-" x_ssl_client_verify:"-" x_ssl_client_subject_dn:"-" x_ssl_client_subject_cn:"-" x_ssl_client_issuer_dn:"-" x_ssl_client_notbefore:"-" x_ssl_client_notafter:"-" x_cf_forwarded_url:"-" traceparent:"-" true_client_ip:"-" x_cf_true_client_ip:"34.198.58.151" x_request_id:"-" x_b3_traceid:"efba2f2f900147b2458892a5e74e79c5" x_b3_spanid:"458892a5e74e79c5" x_b3_parentspanid:"-" b3:"efba2f2f900147b2458892a5e74e79c5-458892a5e74e79c5"
2025-06-19T20:40:13.343+0000 [APP/PROC/WEB/0] STDOUT 2025-06-19T20:40:13.343Z  INFO 6 --- [nio-8080-exec-5] c.s.c.a.o.v4.processors.CdsProcessor     : Exception marked the ChangeSet 9 as cancelled: Interrupted while waiting (service 'SalesService', event 'createBulkOrders', entity '<no entity>')
2025-06-19T20:40:13.369+0000 [APP/PROC/WEB/0] STDOUT 2025-06-19T20:40:13.344Z ERROR 6 --- [nio-8080-exec-5] c.s.c.a.o.v4.processors.CdsProcessor     : Interrupted while waiting (service 'SalesService', event 'createBulkOrders', entity '<no entity>')
2025-06-19T20:40:13.369+0000 [APP/PROC/WEB/0] STDOUT com.sap.cds.services.impl.ContextualizedServiceException: Interrupted while waiting (service 'SalesService', event 'createBulkOrders', entity '<no entity>')

リモートサービスが停止している場合

リモートサービスを停止させた状態でアプリケーションを実行し、結果を確認します。

実行前、Outboxは空です。

{
  "@context": "$metadata#OutboxMessages",
  "@metadataEtag": "W/\"2f1e5e178c7da148ddcdf8f2debe2e351a67f16c0a9b1fc9bdd400b570a8d9c3\"",
  "value": []
}

リモートに登録済みの発注件数は21件でした。

"@count": 21,

1件でリクエストを送信すると、正常にレスポンスが返ってきます。

{
  "@context": "$metadata#Edm.String",
  "@metadataEtag": "W/\"07a3fdbf40ac314c67cdb849122f722a9bc5e5ff9b766013e84dd50f329e0e6e\"",
  "value": "1 orders created!"
}

Outboxを確認すると、未送信のメッセージが格納されていました。attemptsが4になっているので、すでに4回送信を試みたようです。

{
  "@context": "$metadata#OutboxMessages",
  "@metadataEtag": "W/\"2f1e5e178c7da148ddcdf8f2debe2e351a67f16c0a9b1fc9bdd400b570a8d9c3\"",
  "value": [
    {
      "ID": "098b6f40-5908-47c7-9942-1c338edcdb6c",
      "timestamp": "2025-06-19T20:55:35.914780600Z",
      "target": "DefaultOutboxOrdered",
      "msg": "{\"message\":{\"storedRequestContext\":{\"isPrivileged\":false,\"correlationId\":\"3b158f4a-8429-4769-a2ce-4e6dbeb4d804\",\"validFrom\":\"2025-06-19T20:55:35.738162Z\",\"locale\":null,\"tenant\":null,\"validTo\":\"2025-06-19T20:55:35.739162Z\"},\"event\":\"CREATE\",\"params\":{\"cqn\":\"{\\\"INSERT\\\":{\\\"into\\\":{\\\"ref\\\":[\\\"PurchaseService.PurchaseOrders\\\"]},\\\"entries\\\":[{\\\"salesOrderId\\\":\\\"ce5ea3a0-3069-4417-a54c-733998281ef6\\\",\\\"amount\\\":860,\\\"customerId\\\":558}]}}\"},\"entity\":\"PurchaseService.PurchaseOrders\"},\"event\":\"PurchaseService\"}",
      "attempts": 4,
      "partition": 0,
      "lastError": "com.sap.cds.services.impl.ContextualizedServiceException: The remote OData service responded with status code '404'\...",
      "lastAttemptTimestamp": "2025-06-19T20:56:16.112016100Z"
    }
  ]
}

リモートサービスを再起動します。Outboxからの再送は最初は短い間隔でおこなわれますが、徐々に間隔が伸びていきます。5回目以降は結構待たないと動きません。
しばらくするとOutboxが空になりました。

{
  "@context": "$metadata#OutboxMessages",
  "@metadataEtag": "W/\"2f1e5e178c7da148ddcdf8f2debe2e351a67f16c0a9b1fc9bdd400b570a8d9c3\"",
  "value": []
}

リモートの発注件数も増えています。

  "@count": 22,

Transactional Outboxの使いどころと注意点

今回のケースでは、CAPの処理がリモートサービスのレスポンスに依存していないため、サービス呼び出しを最後に行うことができました。しかし、リモートサービスに登録した結果を処理で使用したい場合は、リモートサービスは処理中に呼び出す必要があるのでTransactional Outboxは使えません。したがってTransactional Outboxを使える条件は、CAPの処理がリモートサービスの呼び出し結果に依存しないこととなります。

注意点はリモートへの送信が非同期になるので、正常に送れたことを担保する仕組みが必要になるということです。リモートサービス呼び出しで400番台のエラーが出る(呼び出し側に原因がある)ケースでは、Outboxを使っているとエラーに気づくのが遅れます。ドキュメントにOutboxの処理にカスタムロジックを追加し、エラーの種類によって必要な対応(例:アラートを上げる)をした後でOutboxから削除する方法が紹介されています。

さらに、リトライ回数の上限を超えたメッセージについても監視する必要があります。こちらにDeadLetterQueueに相当するエンティティを作り、ロジックを入れてリトライ回数の上限を超えたメッセージを抽出する方法が紹介されています。

Transactional Outboxはエラーが起きない状況では便利ですが、エラーハンドリングを考えると結構扱いが難しいかもしれません。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?