3
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Apache Camel Spring Boot starters でDB上のデータをバッチ処理するサンプル

Posted at

Apache Camel Spring Boot starters を使った単純なcamelアプリの作成手順」の続きで、MySQLにためたタスクを処理するバッチを実装する。

本当は複数マシンで並行処理させる際の排他制御を実験するという目的があるのだが、SQLもcamelもJavaも中途半端な理解なのでまだそこまでできない。今回のアプリはそこへ繋がりそうなサンプルという位置付け。


前回の分は説明しないが、必要なファイルは全てこちらの記事に書くので見返さなくても動かせる。

ファイル一覧
path/to/app/
  ├── src/main/
  │     ├── java/org/example/mycamelapp/
  │     │     ├── db/
  │     │     │     ├── TaskSql.java
  │     │     │     └── TaskTable.java
  │     │     ├── processors/
  │     │     │     └── PseudoWorkProcessor.java
  │     │     ├── routes/
  │     │     │     └── TaskExecutionRoute.java
  │     │     └── MyCamelApplication.java
  │     └── resources/
  │           └── application.yml
  └── pom.xml

環境

DBを追加した以外は前回と同じ。

  • Java: 1.8.0_191
  • Maven: 3.6.3
  • Camel: 3.2.0
    • Spring: 5.2.5.RELEASE
    • Spring Boot: 2.2.6.RELEASE
  • MySQL: 5.7.30

DBテーブル定義

task テーブルを作る。「タスク処理」をイメージしたカラムは以下の2つ。

  • status : タスクの処理段階を表す。各バッチ(今回は1つだけ)は自分の担当する段階のタスクを抜き出して、完了したらステータスを進める。
  • executor : タスクを処理する/したホストを記録する。アプリを複数のマシンで並列実行する際に排他制御に使うかもしれない。

いずれ実験したい排他制御の想定は、異なるタスクは順序を気にせず並行して処理してもいいが、同じタスクを二重に実行してはいけないというもの。

CREATE DATABASE IF NOT EXISTS my_camel_app;

CREATE TABLE IF NOT EXISTS my_camel_app.task (
	task_id    int(11)      NOT NULL AUTO_INCREMENT,
	status     int(11)      NOT NULL DEFAULT 0,
	executor   varchar(255)          DEFAULT NULL,
	created_at datetime     NOT NULL DEFAULT CURRENT_TIMESTAMP,
	updated_at datetime     NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
	PRIMARY KEY (task_id)
);

レコードの追加は INSERT INTO my_camel_app.task () VALUES (); という感じで行える。前回のtimerを使って自動で増やしてもいいかもしれない。

pom.xml

全文(クリックして展開)
pom.xml (mycamelapp)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>org.example</groupId>
	<artifactId>mycamelapp</artifactId>
	<version>1.0-SNAPSHOT</version>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.2.6.RELEASE</version>
	</parent>

	<!-- https://camel.apache.org/camel-spring-boot/latest/index.html -->
	<dependencyManagement>
		<dependencies>
			<!-- Camel BOM -->
			<dependency>
				<groupId>org.apache.camel.springboot</groupId>
				<artifactId>camel-spring-boot-dependencies</artifactId>
				<version>3.2.0</version>
				<type>pom</type>
				<scope>import</scope>
			</dependency>
			<!-- ... other BOMs or dependencies ... -->
		</dependencies>
	</dependencyManagement>

	<dependencies>
		<!-- Camel Starter -->
		<dependency>
			<groupId>org.apache.camel.springboot</groupId>
			<artifactId>camel-spring-boot-starter</artifactId>
		</dependency>
		<dependency>
			<groupId>org.apache.camel.springboot</groupId>
			<artifactId>camel-sql-starter</artifactId>
		</dependency>
		<!-- ... other dependencies ... -->

		<!-- JDBC -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-jdbc</artifactId>
		</dependency>
		<dependency>
			<groupId>mysql</groupId>
			<artifactId>mysql-connector-java</artifactId>
		</dependency>

		<!-- Utils -->
		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<scope>provided</scope>
		</dependency>
	</dependencies>
</project>

変更点:

  • 親プロジェクトに spring-boot-starter-parent を指定
    • dependencyManagementに spring-boot-dependencies を追加した場合は、pluginsは継承されず明示しなければならないため
    • 2.2.6.RELEASE はcamelバージョンに合わせたもの
  • dependenciesへの追加
    • camel-sql-starter
    • spring-boot-starter-jdbc
    • mysql-connector-java
    • lombok (getterなどの自動生成用、コンパイル時に利用)
  • その他
    • buildは丸ごと削除(pluginsは spring-boot-dependencies の設定を利用するため)

dependencyManagement(親プロジェクトを含む)の中で書かれているものは改めてバージョンを書く必要が無いのが便利。 mysql-connector-java や lombok もあった。

タスク処理の実装

「DBからタスクを一度に複数取得し、並列で処理してDBを更新する」というrouteを作ってみる。

camel-sample-route.png

処理するタスクのステータスは UNEXECUTEDSUCCEEDED 。(各ステータスには後で適当に数字を割り当てる)

Route

細かい設定や処理は別で定義することにして、まずは絵の通りに構造だけ作成する。(コードをコピペする場合は、他のクラスをimportする必要があるので最後がいい)

src/main/java/org/example/mycamelapp/routes/TaskExecutionRoute.java
@Component
public class TaskExecutionRoute extends RouteBuilder {

	// dummy hostname
	static public final String HOSTNAME = "host-" + Integer.toHexString(new SecureRandom().nextInt());

	@Override
	public void configure() throws Exception {
		from(TaskSql.selectTasks(TaskTable.Status.UNEXECUTED, 5))
				// exchange.getIn().getBody() is List<Map<String, Object>> of records
				.log("${body.size} rows selected.")
				.split(body()).parallelProcessing()
					// exchange.getIn().getBody() is Map<String, Object> of a record
					.process(new PseudoWorkProcessor())
					.to(TaskSql.updateTask(TaskTable.Status.SUCCEEDED, HOSTNAME))
				.end()
				.log("all tasks finished.")
		;
	}
}
  • from() にデータを受け取る設定をURI文字列で書く。
  • camelでは文字列の中に ${expr} という形で式展開を書けることが多い。
  • split() でデータを分割してそれぞれ処理する。
    • 分割されたデータは新しいexchangeに詰められる。
    • 今回は順序を気にしないので parallelProcessing() を指定して並行処理させる。
    • メソッドチェーンでもexchangeを色々操作できるが、今回は process() で自作クラスによってexchangeを編集する。
    • to() にデータを渡す設定をURI文字列で書く。
  • end() で入れ子構造の終わりを示す。

JavaのDSLだと end() を入れ忘れても滅多に構文エラーにならず、想像と異なるroute構造になってしまう危険性がある。

なお、 RouteBuilderconfigure() という名前が示す通り、このメソッドはデータが来る度に呼ばれるのではなくアプリ起動時に一度だけ呼ばれる。そのためブレークポイントを置いてもデータ処理のデバッグはできない。

テーブルとクエリ

src/main/java/org/example/mycamelapp/db/TaskTable.java
public interface TaskTable {

	String TABLE_NAME = "task";

	String TASK_ID = "task_id";
	String STATUS = "status";
	String EXECUTOR = "executor";
	String CREATED_AT = "created_at";
	String UPDATED_AT = "updated_at";

	@AllArgsConstructor
	@Getter
	enum Status {

		UNEXECUTED(0),
		SUCCEEDED(10),
		FAILED(-1),
		;

		private final int code;
	}
}
src/main/java/org/example/mycamelapp/db/TaskSql.java
public class TaskSql implements TaskTable {

	public static String insertTask() {
		return "sql:INSERT INTO " + TABLE_NAME + " () VALUES ()";
	}

	public static String selectTasks(Status status, int limit) {
		return "sql:SELECT * FROM " + TABLE_NAME
				+ " WHERE " + STATUS + " = " + status.getCode()
				+ " LIMIT " + limit
				+ "?useIterator=false" // List<Map<String, Object>>
				;
	}

	public static String updateTask(Status nextStatus, String hostname) {
		return "sql:UPDATE " + TABLE_NAME
				+ " SET " + STATUS + " = " + nextStatus.getCode()
				+ ", " + EXECUTOR + " = " + quote(hostname)
				+ " WHERE " + TASK_ID + " = " + ref(TASK_ID)
				;
	}

	private static String quote(String value) {
		if (value == null) return "NULL";
		return "'" + value + "'";
	}

	private static String ref(String key) {
		return ":#" + key;
	}
}

from()to() に指定する文字列はURIであり、 ? 以降にオプションを指定できる。今回使っているのは useIterator=false で、DBから取得したデータをrouteへ1行ずつ Map<> で流すのではなく List<Map<>> で1個に纏めて流す。

クエリの中に :#task_id (← ref(TASK_ID) )という文字列が登場する。これはcamelがexchange内のbodyまたはheaderからキーに対応する値を取り出して埋め込む。

Process

「時間のかかる処理をしている」という想定で、ランダムに1〜3秒のスリープを入れる。

src/main/java/org/example/mycamelapp/processors/PseudoWorkProcessor.java
@Slf4j
public class PseudoWorkProcessor implements Processor {

	@Override
	public void process(Exchange exchange) throws Exception {
		Map<String, Object> task = exchange.getIn().getBody(Map.class);
		int processingTime = ThreadLocalRandom.current().nextInt(1000, 3000);
		String infoMsg = "task_id = " + task.get(TaskTable.TASK_ID) + ", time = " + processingTime + "[ms]";

		log.info("start  working :: " + infoMsg);
		Thread.sleep(processingTime);
		log.info("finish working :: " + infoMsg);
	}
}

こちらはデータが来るたびに呼ばれるので、ブレークポイントを置いてデバッグできる。

起動

設定

MySQLへの接続設定を追加する。ここではyamlに書いているが、環境変数や -D オプションで与えてもいい(SpringBootで優先順位が決まっている)。長く書くのが面倒だったので、URLにユーザー名やパスワードも入れた。

src/main/resources/application.yml
# to keep the JVM running
camel:
  springboot:
    main-run-controller: true

spring:
  data-source:
    url: jdbc:mysql://user:password@localhost:3306/my_camel_app

アプリの開始地点は前回と同じ。

src/main/java/org/example/mycamelapp/MyCamelApplication.java
@SpringBootApplication
public class MyCamelApplication {

	public static void main(String[] args) {
		SpringApplication.run(MyCamelApplication.class, args);
	}
}

実行

terminal
cd path/to/app

mvn spring-boot:run

参考

3
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?