「Apache Camel Spring Boot starters を使った単純なcamelアプリの作成手順」の続きで、MySQLにためたタスクを処理するバッチを実装する。
本当は複数マシンで並行処理させる際の排他制御を実験するという目的があるのだが、SQLもcamelもJavaも中途半端な理解なのでまだそこまでできない。今回のアプリはそこへ繋がりそうなサンプルという位置付け。
前回の分は説明しないが、必要なファイルは全てこちらの記事に書くので見返さなくても動かせる。
path/to/app/
├── src/main/
│ ├── java/org/example/mycamelapp/
│ │ ├── db/
│ │ │ ├── TaskSql.java
│ │ │ └── TaskTable.java
│ │ ├── processors/
│ │ │ └── PseudoWorkProcessor.java
│ │ ├── routes/
│ │ │ └── TaskExecutionRoute.java
│ │ └── MyCamelApplication.java
│ └── resources/
│ └── application.yml
└── pom.xml
環境
DBを追加した以外は前回と同じ。
- Java: 1.8.0_191
- Maven: 3.6.3
- Camel: 3.2.0
- Spring: 5.2.5.RELEASE
- Spring Boot: 2.2.6.RELEASE
- MySQL: 5.7.30
DBテーブル定義
task
テーブルを作る。「タスク処理」をイメージしたカラムは以下の2つ。
-
status
: タスクの処理段階を表す。各バッチ(今回は1つだけ)は自分の担当する段階のタスクを抜き出して、完了したらステータスを進める。 -
executor
: タスクを処理する/したホストを記録する。アプリを複数のマシンで並列実行する際に排他制御に使うかもしれない。
いずれ実験したい排他制御の想定は、異なるタスクは順序を気にせず並行して処理してもいいが、同じタスクを二重に実行してはいけないというもの。
CREATE DATABASE IF NOT EXISTS my_camel_app;
CREATE TABLE IF NOT EXISTS my_camel_app.task (
task_id int(11) NOT NULL AUTO_INCREMENT,
status int(11) NOT NULL DEFAULT 0,
executor varchar(255) DEFAULT NULL,
created_at datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (task_id)
);
レコードの追加は INSERT INTO my_camel_app.task () VALUES ();
という感じで行える。前回のtimerを使って自動で増やしてもいいかもしれない。
pom.xml
全文(クリックして展開)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>mycamelapp</artifactId>
<version>1.0-SNAPSHOT</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.6.RELEASE</version>
</parent>
<!-- https://camel.apache.org/camel-spring-boot/latest/index.html -->
<dependencyManagement>
<dependencies>
<!-- Camel BOM -->
<dependency>
<groupId>org.apache.camel.springboot</groupId>
<artifactId>camel-spring-boot-dependencies</artifactId>
<version>3.2.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<!-- ... other BOMs or dependencies ... -->
</dependencies>
</dependencyManagement>
<dependencies>
<!-- Camel Starter -->
<dependency>
<groupId>org.apache.camel.springboot</groupId>
<artifactId>camel-spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel.springboot</groupId>
<artifactId>camel-sql-starter</artifactId>
</dependency>
<!-- ... other dependencies ... -->
<!-- JDBC -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<!-- Utils -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
変更点:
- 親プロジェクトに spring-boot-starter-parent を指定
- dependencyManagementに spring-boot-dependencies を追加した場合は、pluginsは継承されず明示しなければならないため
-
2.2.6.RELEASE
はcamelバージョンに合わせたもの
- dependenciesへの追加
- camel-sql-starter
- spring-boot-starter-jdbc
- mysql-connector-java
- lombok (getterなどの自動生成用、コンパイル時に利用)
- その他
- buildは丸ごと削除(pluginsは spring-boot-dependencies の設定を利用するため)
dependencyManagement(親プロジェクトを含む)の中で書かれているものは改めてバージョンを書く必要が無いのが便利。 mysql-connector-java や lombok もあった。
タスク処理の実装
「DBからタスクを一度に複数取得し、並列で処理してDBを更新する」というrouteを作ってみる。
処理するタスクのステータスは UNEXECUTED
→ SUCCEEDED
。(各ステータスには後で適当に数字を割り当てる)
Route
細かい設定や処理は別で定義することにして、まずは絵の通りに構造だけ作成する。(コードをコピペする場合は、他のクラスをimportする必要があるので最後がいい)
@Component
public class TaskExecutionRoute extends RouteBuilder {
// dummy hostname
static public final String HOSTNAME = "host-" + Integer.toHexString(new SecureRandom().nextInt());
@Override
public void configure() throws Exception {
from(TaskSql.selectTasks(TaskTable.Status.UNEXECUTED, 5))
// exchange.getIn().getBody() is List<Map<String, Object>> of records
.log("${body.size} rows selected.")
.split(body()).parallelProcessing()
// exchange.getIn().getBody() is Map<String, Object> of a record
.process(new PseudoWorkProcessor())
.to(TaskSql.updateTask(TaskTable.Status.SUCCEEDED, HOSTNAME))
.end()
.log("all tasks finished.")
;
}
}
-
from()
にデータを受け取る設定をURI文字列で書く。 - camelでは文字列の中に
${expr}
という形で式展開を書けることが多い。 -
split()
でデータを分割してそれぞれ処理する。- 分割されたデータは新しいexchangeに詰められる。
- 今回は順序を気にしないので
parallelProcessing()
を指定して並行処理させる。 - メソッドチェーンでもexchangeを色々操作できるが、今回は
process()
で自作クラスによってexchangeを編集する。 -
to()
にデータを渡す設定をURI文字列で書く。
-
end()
で入れ子構造の終わりを示す。
JavaのDSLだと end()
を入れ忘れても滅多に構文エラーにならず、想像と異なるroute構造になってしまう危険性がある。
なお、 RouteBuilder
や configure()
という名前が示す通り、このメソッドはデータが来る度に呼ばれるのではなくアプリ起動時に一度だけ呼ばれる。そのためブレークポイントを置いてもデータ処理のデバッグはできない。
テーブルとクエリ
public interface TaskTable {
String TABLE_NAME = "task";
String TASK_ID = "task_id";
String STATUS = "status";
String EXECUTOR = "executor";
String CREATED_AT = "created_at";
String UPDATED_AT = "updated_at";
@AllArgsConstructor
@Getter
enum Status {
UNEXECUTED(0),
SUCCEEDED(10),
FAILED(-1),
;
private final int code;
}
}
public class TaskSql implements TaskTable {
public static String insertTask() {
return "sql:INSERT INTO " + TABLE_NAME + " () VALUES ()";
}
public static String selectTasks(Status status, int limit) {
return "sql:SELECT * FROM " + TABLE_NAME
+ " WHERE " + STATUS + " = " + status.getCode()
+ " LIMIT " + limit
+ "?useIterator=false" // List<Map<String, Object>>
;
}
public static String updateTask(Status nextStatus, String hostname) {
return "sql:UPDATE " + TABLE_NAME
+ " SET " + STATUS + " = " + nextStatus.getCode()
+ ", " + EXECUTOR + " = " + quote(hostname)
+ " WHERE " + TASK_ID + " = " + ref(TASK_ID)
;
}
private static String quote(String value) {
if (value == null) return "NULL";
return "'" + value + "'";
}
private static String ref(String key) {
return ":#" + key;
}
}
from()
や to()
に指定する文字列はURIであり、 ?
以降にオプションを指定できる。今回使っているのは useIterator=false
で、DBから取得したデータをrouteへ1行ずつ Map<>
で流すのではなく List<Map<>>
で1個に纏めて流す。
クエリの中に :#task_id
(← ref(TASK_ID)
)という文字列が登場する。これはcamelがexchange内のbodyまたはheaderからキーに対応する値を取り出して埋め込む。
Process
「時間のかかる処理をしている」という想定で、ランダムに1〜3秒のスリープを入れる。
@Slf4j
public class PseudoWorkProcessor implements Processor {
@Override
public void process(Exchange exchange) throws Exception {
Map<String, Object> task = exchange.getIn().getBody(Map.class);
int processingTime = ThreadLocalRandom.current().nextInt(1000, 3000);
String infoMsg = "task_id = " + task.get(TaskTable.TASK_ID) + ", time = " + processingTime + "[ms]";
log.info("start working :: " + infoMsg);
Thread.sleep(processingTime);
log.info("finish working :: " + infoMsg);
}
}
こちらはデータが来るたびに呼ばれるので、ブレークポイントを置いてデバッグできる。
起動
設定
MySQLへの接続設定を追加する。ここではyamlに書いているが、環境変数や -D
オプションで与えてもいい(SpringBootで優先順位が決まっている)。長く書くのが面倒だったので、URLにユーザー名やパスワードも入れた。
# to keep the JVM running
camel:
springboot:
main-run-controller: true
spring:
data-source:
url: jdbc:mysql://user:password@localhost:3306/my_camel_app
アプリの開始地点は前回と同じ。
@SpringBootApplication
public class MyCamelApplication {
public static void main(String[] args) {
SpringApplication.run(MyCamelApplication.class, args);
}
}
実行
cd path/to/app
mvn spring-boot:run