Edited at

統合フレームワーク「Apache Camel」のトランザクション管理


トランザクション

本エントリーではApache Camelフレームワークでのトランザクション管理について説明していきます。

そもそもトランザクションとは何かということについて少しだけ説明すると、(説明するまでもないと思いますが)複数の処理を1つにまとめたものと言えます。この複数の処理は分離できず、処理されたか処理されないかのいずれかの結果になります。これが有名なトランザクションのACID特性のAtomicity(原子性)になります。


  • Atomicity(原子性):全ての処理が完全に実行されるか一つも実行されないこと。

  • Consistency(一貫性):トランザクションの終了状態にかかわらず、データに矛盾が生じないこと(整合性がとれていること)。

  • Isolation(隔離性):複数のトランザクションが実行されていた場合、他のトランザクションに干渉しないこと。

  • Durability(耐久性):正常に完了したトランザクション処理の結果は失われないこと。

複数の処理を実行している中でエラーがあればトランザクション実行前の状態に戻します。これをロールバックといいます。複数の処理が全てエラーなく成功すればコミットすることで処理結果が反映されます。


Apache Camelでのトランザクション

Apache Camelで扱うトランザクションはJDBCかJMSなどですが、ここではJDBCを対象に説明していきます。CamelはSpringのトランザクション管理をベースとしていますので、Springでトランザクションを扱ったことがある方は、Camelのトランザクションも容易に理解できるかと思います。

SpringのトランザクションはTransaction Manager(トランザクションマネージャ)と呼ばれるもので管理されます。

このトランザクションマネージャはいくつか用意されており、JDBCの場合は「org.springframework.jdbc.datasource.DataSourceTransactionManager」を用います。

他にもJTA、JMSやHibername用のトランザクションマネージャがあります(使ったことないけど)

Camelでも同様にSpringのトランザクションマネージャを用いてトランザクションが管理されます。

それではJDBCを対象にCamelのトランザクション管理を説明していこうと思います。


Camelでのトランザクションのサンプル

Camelでトランザクションを扱ったシンプルなサンプルプログラムを用いて説明します。

サンプルプログラムではDBにアクセスするために、CamelのSQLコンポーネントを使ってみます。

まずはトランザクションマネージャで管理するデータソースを定義します。

対象のDBはPostgreSQLで、コネクションプーリングにHikariCPを使用したデータソースになっています。

    <bean id="hikariConfig" class="com.zaxxer.hikari.HikariConfig">

<property name="jdbcUrl" value="jdbc:postgresql://192.168.20.71:5432/testdb" />
<property name="driverClassName" value="org.postgresql.Driver" />
<property name="username" value="postgres" />
<property name="password" value="postgres" />
<property name="autoCommit" value="false" />
</bean>

<bean id="dataSource" class="com.zaxxer.hikari.HikariDataSource"
destroy-method="close">
<constructor-arg ref="hikariConfig" />
</bean>

定義したデータソースを管理するトランザクションマネージャを定義します。

(1)で先ほど作成したデータソースをプロパティで指定しています。

    <bean id="transactionManager"

class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="dataSource" /><!-- (1) -->
</bean>


テスト用のテーブルを作成

まず、テーブルがないと試すこともできないので、シンプルなテーブル(sample_test)を一つ作成します。

create table sample_test (

id varchar(10) not null primary key,
name varchar(255) not null,
count integer not null,
insert_date timestamp not null);


SQLコンポーネントの定義

データベースへのSQLの実行にはSQLコンポーネントを使用します。

以下(1)のようにSQLコンポーネントを使用するデータソースを指定して定義します。

    <bean id="sqlComponent"

class="org.apache.camel.component.sql.SqlComponent">
<property name="dataSource" ref="dataSource" /><!-- (1) -->
</bean>

また、使用するデータソースはエンドポイントのURIに指定することもできますが、このようにあらかじめ指定しておいたほうがURIがシンプルになります。

SQLの実行時はエンドポイントのURIに先ほど指定したSQLコンポーネントのIDと実行するDMLを指定します。

            <to uri="sqlComponent:insert into sample_test(id, name, count, insert_date) values (:#id, :#name, :#count, current_timestamp)" />

SQL中の:#id, :#name, :#countはパラメータで、メッセージのヘッダに同じ名前のプロパティを設定しておくと自動でバインドされます。

なお、SQLはエンドポイントに直接書いていますが、プロパティファイルから読み込むようにしたほうが良いです。


簡単なサンプルプログラム

SQLコンポーネントの準備も整ったので、簡単なサンプルプログラムを作成します。

サンプルプログラムでは、sample_testテーブルに対して1レコード挿入し、そのレコードを更新しています。

トランザクションは「<transacted />」(1)と書くことでルートがトランザクションマネージャの管理下に入ります。ルートが開始によりトランザクションが開始され、ルートがエラーなく完了すれば、トランザクションマネージャにより自動的にコミットされます。

また、SQL中の:#id, :#name, :#countといったパラメータの値はその前のsetHeader(2)で値を指定していることが分かるかと思います。

        <route id="main_route">

<from uri="timer:trigger?repeatCount=1" />
<transacted /><!-- (1) -->
<setHeader headerName="id"><constant>id001</constant></setHeader><!-- (2) -->
<setHeader headerName="name"><constant>"testuser"</constant></setHeader><!-- (2) -->
<setHeader headerName="count"><simple resultType="java.lang.Integer">0</simple></setHeader><!-- (2) -->
<to uri="sqlComponent:insert into sample_test(id, name, count, insert_date) values (:#id, :#name, :#count, current_timestamp)" />
<to uri="log:insertLog?showHeaders=true" />
<setHeader headerName="count"><simple resultType="java.lang.Integer">1</simple></setHeader>
<to uri="sqlComponent:update sample_test set count = :#count where id = :#id" />
<to uri="log:insertLog?showHeaders=true" />
</route>

さきほどの例ではトランザクションマネージャがあってもなくても結果が同じですので、1レコード挿入後に例外を発生させ、ロールバックされる例を試してみます。

        <route id="main_route">

<from uri="timer:trigger?repeatCount=1" />
<transacted />
<setHeader headerName="id"><constant>id001</constant></setHeader>
<setHeader headerName="name"><constant>"testuser"</constant></setHeader>
<setHeader headerName="count"><simple resultType="java.lang.Integer">0</simple></setHeader>
<to uri="sqlComponent:insert into sample_test(id, name, count, insert_date) values (:#id, :#name, :#count, current_timestamp)" />
<to uri="log:insertLog?showHeaders=true" />
<throwException exceptionType="java.lang.Exception" message="throw new Exception" /><!-- (1) -->
<setHeader headerName="count"><simple resultType="java.lang.Integer">1</simple></setHeader>
<to uri="sqlComponent:update sample_test set count = :#count where id = :#id" />
<to uri="log:insertLog?showHeaders=true" />
</route>

throwException(1)で、強制的に例外を発生させています。これによりその前で実行されたINSERT文がロールバックされます。また、例外が発生しているので以降のUPDATE文は実行されずにルートが終わります。

実行時のログは以下のように出力されます。

[2019-03-08 08:38:20.599], [ERROR], o.a.c.p.DefaultErrorHandler, Camel (camel-1) thread #1 - timer://trigger, org.apache.camel.processor.DefaultErrorHandler, Failed delivery for (MessageId: ID-mky-PC-1552001898669-0-2 on ExchangeId: ID-mky-PC-1552001898669-0-1). Exhausted after delivery attempt: 1 caught: java.lang.Exception: throw new Exception

Message History
---------------------------------------------------------------------------------------------------------------------------------------
RouteId ProcessorId Processor Elapsed (ms)
[main_route ] [main_route ] [timer://trigger?repeatCount=1 ] [ 80]
[main_route ] [transacted1 ] [transacted ] [ 0]
[main_route ] [setHeader1 ] [setHeader[id] ] [ 0]
[main_route ] [setHeader2 ] [setHeader[name] ] [ 0]
[main_route ] [setHeader3 ] [setHeader[count] ] [ 1]
[main_route ] [to1 ] [sqlComponent:insert into sample_test(id, name, count, insert_date) values (:#i] [ 34]
[main_route ] [to2 ] [log:insertLog?showHeaders=true ] [ 6]
[main_route ] [throwException1 ] [throwException[ref:null] ] [ 0]

Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
java.lang.Exception: throw new Exception
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[?:1.8.0_172]
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[?:1.8.0_172]
~省略~
at java.util.TimerThread.run(Timer.java:505) [?:1.8.0_172]
[2019-03-08 08:38:20.612], [WARN ], o.a.c.s.s.TransactionErrorHandler, Camel (camel-1) thread #1 - timer://trigger, org.apache.camel.spring.spi.TransactionErrorHandler, Transaction rollback (0xb112b13) redelivered(false) for (MessageId: ID-mky-PC-1552001898669-0-2 on ExchangeId: ID-mky-PC-1552001898669-0-1) caught: java.lang.Exception: throw new Exception

最後の行で、「TransactionErrorHandler」のスレッドが「Transaction rollback」というログを出力していて、ロールバックされたことが分かります。実際のテーブル上のデータもロールバックによりレコードは挿入されていません。


トランザクションの伝播(PROPAGATION)

先ほどの例では1ルートに複数の処理(DML)を定義し、ルートが開始によりトランザクションが開始し、ルートの終了によりトランザクションも終了していました。

ルートが複数になった場合は、ルート間で同一のトランザクションを使用する、別のトランザクションを作成するなどの扱いを指定することができます。これはトランザクションの伝播(PROPAGATION)のオプションで設定することができます。

トランザクションの伝播(PROPAGATION)はApache Camel独自の仕様ではなく、Springトランザクションを利用しています。

トランザクションの伝播(PROPAGATION)のオプションでは以下の設定値をとり、トランザクションがある場合とない場合の動作が異なります。

トランザクション伝播属性
トランザクションがある場合
トランザクションがない場合

PROPAGATION_REQUIRED
既存のトランザクション内で実行する。
新規にトランザクションを開始する。

PROPAGATION_REQUIRES_NEW
既存のトランザクションとは別に、新規のトランザクションを開始する。
新規にトランザクションを開始する。

PROPAGATION_MANDATORY
既存のトランザクション内で実行する。
例外をスローする。

PROPAGATION_SUPPORTS
既存のトランザクション内で実行する。
トランザクションなしで実行する。

PROPAGATION_NOT_SUPPORTED
既存のトランザクションを停止し、トランザクションなしで実行する。
トランザクションなしで実行する。

PROPAGATION_NEVER
例外をスローする。
トランザクションなしで実行する。

PROPAGATION_NESTED
既存のトランザクションを利用し、その部分だけネストしたトランザクションのように処理されます。
新規にトランザクションを開始する。

Camelでトランザクションの伝播(PROPAGATION)を使用する場合は、以下のように使用するトランザクションの伝播を定義します。

先ほどまで例では使用していませんでしたが、無指定だとデフォルトでPROPAGATION_REQUIREDが指定されたことになります。

以下ではPROPAGATION_REQUIRED(1)、PROPAGATION_REQUIRES_NEW(2)、PROPAGATION_MANDATORY(3)の3つのトランザクションの伝播を定義しています。

    <bean id="PROPAGATION_REQUIRED"

class="org.apache.camel.spring.spi.SpringTransactionPolicy">
<property name="transactionManager" ref="transactionManager" />
<property name="propagationBehaviorName"
value="PROPAGATION_REQUIRED" /><!-- (1) -->
</bean>

<bean id="PROPAGATION_REQUIRES_NEW"
class="org.apache.camel.spring.spi.SpringTransactionPolicy">
<property name="transactionManager" ref="transactionManager" />
<property name="propagationBehaviorName"
value="PROPAGATION_REQUIRES_NEW" /><!-- (2) -->
</bean>

<bean id="PROPAGATION_MANDATORY"
class="org.apache.camel.spring.spi.SpringTransactionPolicy">
<property name="transactionManager" ref="transactionManager" />
<property name="propagationBehaviorName"
value="PROPAGATION_MANDATORY" /><!-- (3) -->
</bean>

定義したトランザクションの伝播を使用する場合は、transactedに「ref="PROPAGATION_REQUIRED"」(1)のように記述します。

以下の例では、main_route, tran1_route, tran2_routeという3つのルートを定義し、それぞれtransactedに使用するトランザクションの伝播を指定しています。

        <route id="main_route">

<from uri="timer:trigger?repeatCount=1" />
<transacted ref="PROPAGATION_REQUIRED" /><!-- (1) -->
<to uri="direct:tran1" />
<to uri="direct:tran2" />
</route>

<route id="tran1_route">
<from uri="direct:tran1" />
<transacted ref="PROPAGATION_REQUIRES_NEW" /><!-- (1) -->
<setHeader headerName="id"><constant>id001</constant></setHeader>
<setHeader headerName="name"><constant>testuser</constant></setHeader>
<setHeader headerName="count"><simple resultType="java.lang.Integer">0</simple></setHeader>
<to uri="sqlComponent:insert into sample_test(id, name, count, insert_date) values (:#id, :#name, :#count, current_timestamp)" />
<to uri="log:insertLog?showHeaders=true" />
</route>

<route id="tran2_route">
<from uri="direct:tran2" />
<transacted ref="PROPAGATION_MANDATORY" /><!-- (1) -->
<setHeader headerName="id"><constant>id002</constant></setHeader>
<setHeader headerName="name"><constant>testuser2</constant></setHeader>
<setHeader headerName="count"><simple resultType="java.lang.Integer">0</simple></setHeader>
<to uri="sqlComponent:insert into sample_test(id, name, count, insert_date) values (:#id, :#name, :#count, current_timestamp)" />
<to uri="log:insertLog?showHeaders=true" />
</route>

また、Springではデフォルトで非検査例外(RuntimeExceptionとそのサブクラス)が発生した場合はロールバックされが、検査例外が発生した場合はロールバックされずにコミットされます。

一方、Camelでも同様に非検査例外(とException)でロールバックされます。


明示的にロールバックを実行する

Camelでは例外(非検査例外)が発生すると自動でロールバックされますが、明示的にロールバックさせることもできます。

明示的にロールバックするためには、「<rollback markRollbackOnlyLast="true" />」と指定します。

それにより、例外をスローすることなく、現在のトランザクションにロールバックのマークを付けます。以降の処理は実行されないため、ロールバック前にログを出力するなど処理が必要な場合は、rollback要素の前に実行します。

        <route id="main_route">

<from uri="timer:trigger?repeatCount=1" />
<transacted />
<setHeader headerName="id"><constant>id001</constant></setHeader>
<setHeader headerName="name"><constant>testuser</constant></setHeader>
<setHeader headerName="count"><simple resultType="java.lang.Integer">0</simple></setHeader>
<to uri="sqlComponent:insert into sample_test(id, name, count, insert_date) values (:#id, :#name, :#count, current_timestamp)" />
<to uri="log:insertLog?showHeaders=true" />
<rollback markRollbackOnlyLast="true" />
</route>


最後に

今回は説明しませんでしたが、複数のJMSやJDBCのような複数のリソースを対象としたトランザクションも扱うことができます。これはグローバルトランザクションと呼ばれます。今回はJDBCのみを扱っており、これはグローバルトランザクションに対してローカルトランザクションと呼ばれるものです。


参考


自分用TODO


  • Java DSLの例も書く。

  • ロールバックの例が単純なので、もう少し複雑で意味のある例に変える。

  • SQLコンポーネントの説明が少ないので説明を補足する。でもこの記事はトランザクションに関してなので、SQLコンポーネントに絞った別記事にするか考える。

  • FROMがJMSでTOがJDBCのトランザクションも説明に入れる。(JDBCに失敗したら、JMSはコミットされず再送される)