12
6

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Spring Batch + MyBatisでテーブルパーティション毎に並列処理を行う方法

Last updated at Posted at 2017-12-16

はじめに

ラクス Advent Calendar 2017の16日目です。

本当は実際のソースコードも交えながら「バッチ開発を通して学ぶオブジェクト指向プログラミング」というテーマでオブジェクト指向プログラミング初心者向けに、オブジェクト指向で実装すると何が嬉しいのか? インターフェースや抽象クラスを使うとどんな風に楽できるの? 的なことを書きたかったのですがソースコードの準備も記事の執筆も間に合わなそうだったのでかなり規模を縮小します。

現在私の現場ではSpring Batchを使ってバッチ開発を行っているのでそれ関連で役に立ちそうな記事を書こうかと。
プラスSpring Batch歴がまだ2ヶ月程度なので私自身の知識の整理も兼ねます。
なので、読者としてはSpring Batchで開発を行う予定or行っている方が対象となります。
Spring Batch何ぞや? という点は他にわかりやすいサイトが多数あるのでそちらに任せます。こちらとかこっちとか。

この記事でのテーマは__「パーティショニングされたテーブルのパーティション毎に並列処理を行うバッチを作る」__です。
複数ファイルを読みこんでDBに取り込むようなバッチを作る場合、「ファイルごとにスレッドを作って複数ファイルを同時にDBに書き込む」というような処理はググるとMultiResourcePartitionerクラスを使った例がたくさん出てくるのですが、「テーブルのパーティションごとにスレッドを作って複数パーティションを同時に処理する」みたいな例は探しても見つからなかったので今後の自分用メモも兼ねて整理しておこうかなと思いこのテーマを選びました。

使うIDEやフレームワーク類

今回の例では極力手抜きをしたいので以下のIDE・フレームワーク類を使います。
記事の最後にGitHubの私のレポジトリのリンク張るのでpom.xmlやlaunch-context.xml等の設定ファイルの詳細はその中を見てください。

1. Eclipse

 言わずと知れたJavaIDEとして超有名なツール。今回はSTSプラグインを入れてSpringでの開発を容易にして使います。

2. MySQL

 現場がMySQLなので特に深い理由もなく採用。

3. MyBatis

 MySQLを使うならセットで使うことも多いと思われるORマッパー。
 EclipseにMyBatis Generatorプラグインも入れて徹底的に手抜きします。

4. Maven

 依存性解決とビルドに。Gradleは使ったことないのでとりあえずパス。

5. Spring Batch

 バッチ開発なのでコレが無いと始まらない。今回作成するプロジェクトは「New -> Project -> Spring Legacy Project -> Simple Spring Batch Project」から作ることが出来るスケルトンを元にして作って行きます。

6. MyBatis-Spring

 SpringでMyBatisを使うならついでにこれも使いましょう。今回の処理で「パーティション毎の読み込み」はこのライブラリの機能を使うので。

※注意※
今回の記事ではMavenビルドして作ったjarをJavaコマンドで実行するところは範囲外とします。
本来はpom.xmlにMavenビルドの設定に関する記載が必要です。必要な場合は各々ググってください。

実際に作っていく

1. パーティション名のリストを設定ファイルに作る

Javaに読み込むためのパーティション名のリストを予め設定ファイルに作っておきます。
__の箇所には元々書かれていた設定を書いておいてください。
インフォメーションスキーマからパーティション名を取得した方が汎用性のある実装になると思うのですがやり方が分からなかったので設定ファイルにパーティション名ベタ書きしちゃいます。

launch-context.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:batch="http://www.springframework.org/schema/batch"
	xmlns:context="http://www.springframework.org/schema/context"
	xmlns:jdbc="http://www.springframework.org/schema/jdbc" xmlns:aop="http://www.springframework.org/schema/aop"
	xmlns:javaee="http://xmlns.jcp.org/xml/ns/javaee" xmlns:jee="http://www.springframework.org/schema/jee"
	xmlns:mybatis="http://mybatis.org/schema/mybatis-spring"
	xmlns:mybatis-spring="http://mybatis.org/schema/mybatis-spring"
	xmlns:util="http://www.springframework.org/schema/util" xmlns:tx="http://www.springframework.org/schema/tx"
	xsi:schemaLocation="http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-3.0.xsd
		http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc-4.3.xsd
		http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-4.3.xsd
		http://mybatis.org/schema/mybatis-spring http://mybatis.org/schema/mybatis-spring-1.2.xsd
		http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
		http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.3.xsd
		http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.3.xsd
		http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.3.xsd
		http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.3.xsd
		http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/jobXML_1_0.xsd">

<!-- 中略 -->

	<util:list id="partitionNameList" value-type="java.lang.String">
		<value>p0</value>
		<value>p1</value>
		<value>p2</value>
		<value>p3</value>
		<value>p4</value>
	</util:list>
</beans>

2. パーティション名を受け取ってReaderに渡すためのPartitionerクラスを作る

Spring Batchで並列処理を実装する際、予め用意されているPartitionerとしてorg.springframework.batch.core.partition.support.MultiResourcePartitionerクラスがありますが、このクラスは複数ファイルで並列処理を行うこと前提に作られたクラスのためこのままではDB読み込み用のPartitionerにはできません。
なので、今回はこのクラスを参考にして自作のPartitionerクラスを作成します。
出来たのが↓のクラスです。

MultiTablePartitionPartitioner.java
package jp.co.example.Batch.TableToOtherTable;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;

public class MultiTablePartitionPartitioner implements Partitioner {

	private static final String DEFAULT_KEY_NAME = "partitionName";

	private static final String PARTITION_KEY = "partition";

	private List<String> partitions;

	private String keyName = DEFAULT_KEY_NAME;

	public void setPartitions(List<String> resources) {
		this.partitions = resources;
	}

	public void setKeyName(String keyName) {
		this.keyName = keyName;
	}

	/**
	 * Assign the filename of each of the injected resources to an
	 * {@link ExecutionContext}.
	 *
	 * @see Partitioner#partition(int)
	 */
	@Override
	public Map<String, ExecutionContext> partition(int gridSize) {
		Map<String, ExecutionContext> map = new HashMap<String, ExecutionContext>(gridSize);
		int i = 0;
		for (String partition : partitions) {
			ExecutionContext context = new ExecutionContext();
			context.putString(keyName, partition);
			map.put(PARTITION_KEY + i, context);
			i++;
		}
		return map;
	}
}

別に何ということはありません。
org.springframework.batch.core.partition.support.MultiResourcePartitionerクラスのResourceを受け取っていた箇所をStringに書き換えただけです。
このクラスが設定ファイルからパーティション名を受け取り、パーティション毎にDBのReaderクラスを生成します。

3. 並列処理の1スレッドにあたるReaderを作る

これはMyBatis-Springのクラスに必要なプロパティを渡してやることで実現させます。
MyBatis-Springのクラスはorg.mybatis.spring.batch.MyBatisCursorItemReaderです。
このクラスのparameterValuesというプロパティに値を渡すことで、queryIdで指定されたSQLのプレースホルダ部分に値をはめ込んでSQLを実行してくれます。
プロパティへの値受け渡しはlaunch-context.xmlで行います。
手順1の続きに書いてください。

launch-context.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:batch="http://www.springframework.org/schema/batch"
	xmlns:context="http://www.springframework.org/schema/context"
	xmlns:jdbc="http://www.springframework.org/schema/jdbc" xmlns:aop="http://www.springframework.org/schema/aop"
	xmlns:javaee="http://xmlns.jcp.org/xml/ns/javaee" xmlns:jee="http://www.springframework.org/schema/jee"
	xmlns:mybatis="http://mybatis.org/schema/mybatis-spring"
	xmlns:mybatis-spring="http://mybatis.org/schema/mybatis-spring"
	xmlns:util="http://www.springframework.org/schema/util" xmlns:tx="http://www.springframework.org/schema/tx"
	xsi:schemaLocation="http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-3.0.xsd
		http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc-4.3.xsd
		http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-4.3.xsd
		http://mybatis.org/schema/mybatis-spring http://mybatis.org/schema/mybatis-spring-1.2.xsd
		http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
		http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.3.xsd
		http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.3.xsd
		http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.3.xsd
		http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.3.xsd
		http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/jobXML_1_0.xsd">

<!-- 中略 -->

	<util:list id="partitionNameList" value-type="java.lang.String">
		<value>p0</value>
		<value>p1</value>
		<value>p2</value>
		<value>p3</value>
		<value>p4</value>
	</util:list>

	<!-- テーブルをパーティション毎に読み込むためのReader -->
	<bean id="tableToOtherTableReader" class="org.mybatis.spring.batch.MyBatisCursorItemReader"
		scope="step">
		<property name="sqlSessionFactory" ref="sqlSessionFactory" />
		<property name="queryId"
			value="jp.co.example.DbMapper.PurchaseDetailMapper.selectByPartition" />
		<property name="parameterValues" ref="partitionNameMap" />
	</bean>

	<!-- 手順2で作ったMultiTablePartitionPartitionerにパーティション名リストを渡してやる -->
	<bean id="multiTablePartitionPartitioner"
		class="jp.co.example.Batch.TableToOtherTable.MultiTablePartitionPartitioner">
		<property name="partitions" ref="partitionNameList" />
	</bean>

	<!-- org.mybatis.spring.batch.MyBatisCursorItemReaderに渡すパラメータ。
	MultiTablePartitionPartitionerクラスの中にstepExecutionContextという領域にパーティション名を格納する処理がある -->
	<util:map id="partitionNameMap" scope="step">
		<entry key="partitionName" value="#{stepExecutionContext['partitionName']}" />
	</util:map>

	<!-- タスクExecutor -->
	<bean id="taskExecutor" class="org.springframework.core.task.SyncTaskExecutor" />

</beans>

4. 実際にDB読み込みを行う部分を作る

まずMyBatisGeneratorプラグインでMyBatisを使うためのクラスや設定ファイルを自動生成します。
MyBatisGeneratorプラグインの設定ファイルは↓の通り。詳しい使い方はググってください。

generatorConfig.xml
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE generatorConfiguration PUBLIC "-//mybatis.org//DTD MyBatis Generator Configuration 1.0//EN" "http://mybatis.org/dtd/mybatis-generator-config_1_0.dtd" >
<generatorConfiguration >
  <classPathEntry location="C:\Program Files\MySQL\mysql-connector-java-5.1.44\mysql-connector-java-5.1.44-bin.jar"/>
<!-- ↑ MySQLに接続するConnectorJのjarファイルを指定します  -->
  <context id="context1" >
    <jdbcConnection driverClass="com.mysql.jdbc.Driver" connectionURL="jdbc:mysql://localhost:3306/batch_sample" userId="root" password="root" />
    <!-- ↑ ローカルのMySQLに接続するパスとDB名、それからMySQLのユーザーIDとパスを指定します -->
    <javaModelGenerator targetPackage="jp.co.example.DbMapper" targetProject="spring-batch-simple/src/main/java/" />
    <sqlMapGenerator targetPackage="jp.co.example.DbMapper" targetProject="spring-batch-simple/src/main/java/" />
    <javaClientGenerator targetPackage="jp.co.example.DbMapper" targetProject="spring-batch-simple/src/main/java/" type="XMLMAPPER" />
    <!-- ↑  3行すべてに、MyBatisのクラスを入れたいパッケージ名と、プロジェクトのファイルパスを入れてください -->
<!--     <table schema="batch_sample" tableName="employee" > -->
    <table schema="batch_sample" tableName="purchase_detail" >
    <table schema="batch_sample" tableName="purchase_summary" >
    <!-- ↑ DB名とテーブル名を入れてください -->
    </table>
  </context>
</generatorConfiguration>

次にデータの受け取りと加工を行うクラスを作ります。
バッチのProcessorやWriterクラスにデータ加工や書き込みの処理を書いてしまうとProcessorやWriterの汎用性が著しく下がるためそれらの処理はデータクラスとまとめます。
MyBatisGeneratorで自動生成したデータクラスを継承して作ります。

InputData.java
package jp.co.example.Entity;

public interface InputData {
	public OutputToDB convertToDB();

}
PurchaseDetailEntity.java
package jp.co.example.Entity.Impl;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Configurable;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Repository;

import jp.co.example.DbMapper.PurchaseDetail;
import jp.co.example.Entity.InputData;
import jp.co.example.Entity.OutputToDB;

@Repository("purchaseDetailEntity")
@Configurable
public class PurchaseDetailEntity extends PurchaseDetail implements InputData {

	@Autowired
	@Qualifier("purchaseSummaryEntity")
	private PurchaseSummaryEntity purchaseSummaryEntity;

	@Override
	public OutputToDB convertToDB() {
		purchaseSummaryEntity.setCustomerId(getCustomerId());
		purchaseSummaryEntity.setPurchaseNumber(getPurchaseNumber());
		purchaseSummaryEntity.setTotalPrice(getItemPrice());
		purchaseSummaryEntity.setPurchaseAt(getPurchaseAt());
		return purchaseSummaryEntity;
	}

}

次、DB書き込みを行うクラス

OutputToDB.java
package jp.co.example.Entity;

public interface OutputToDB {
	public void insertThisRecord();

}
PurchaseSummaryEntity.java
package jp.co.example.Entity.Impl;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Configurable;
import org.springframework.stereotype.Repository;

import jp.co.example.DbMapper.PurchaseSummary;
import jp.co.example.DbMapper.PurchaseSummaryMapper;
import jp.co.example.Entity.OutputToDB;

@Repository("purchaseSummaryEntity")
@Configurable
public class PurchaseSummaryEntity extends PurchaseSummary implements OutputToDB {

	@Autowired
	private PurchaseSummaryMapper purchaseSummaryMapper;

	@Override
	public void insertThisRecord() {
		purchaseSummaryMapper.insertSelective(this);

	}

}

読み込み処理を行うSQL。
これは自動生成されたxmlに追記します。

PurchaseDetailMapper.xml

<!-- 中略 -->

  <!-- 自作のEntityクラスにデータを渡すための設定 -->
  <resultMap id="BaseResultMapEntity" type="jp.co.example.Entity.Impl.PurchaseDetailEntity">
    <id column="id" jdbcType="INTEGER" property="id" />
    <id column="customer_id" jdbcType="INTEGER" property="customerId" />
    <result column="purchase_number" jdbcType="VARCHAR" property="purchaseNumber" />
    <result column="purchase_number_seq" jdbcType="INTEGER" property="purchaseNumberSeq" />
    <result column="item_name" jdbcType="VARCHAR" property="itemName" />
    <result column="item_price" jdbcType="INTEGER" property="itemPrice" />
    <result column="purchase_at" jdbcType="TIMESTAMP" property="purchaseAt" />
  </resultMap>
  <!-- SQLの実行結果はresultMapで指定したクラスに渡される -->
  <select id="selectByPartition"  resultMap="BaseResultMapEntity">
    select customer_id, purchase_number, SUM(item_price) item_price, purchase_at from purchase_detail
    PARTITION(${partitionName}) group by DATE(purchase_at), purchase_number, customer_id
  </select>

5. 並列実行を定義したJobを作る

module-context.xmlに並列実行ジョブを定義します。

module-context.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:batch="http://www.springframework.org/schema/batch"
	xsi:schemaLocation="
	http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-3.0.xsd
	http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd">

	<batch:job id="tableToOtherTableUsingPartition">
		<batch:step id="tableToOtherTable_step1">
			<batch:partition step="tableToOtherTable_stepImpl" partitioner="multiTablePartitionPartitioner">
			<!-- tableToOtherTable_stepImplがスレッド1つで実際に実行する処理。
multiTablePartitionPartitionerは手順2で作ったクラスで、このクラスがスレッドを作ってくれる -->
				<batch:handler grid-size="5" task-executor="taskExecutor" />
			</batch:partition>
		</batch:step>
	</batch:job>
	<!-- tableToOtherTable_stepImplの中身 -->
	<batch:step id="tableToOtherTable_stepImpl">
		<batch:tasklet transaction-manager="logicTransactionManager"
			start-limit="100">
			<batch:chunk reader="tableToOtherTableReader"
				processor="tableToOtherTableExampleItemProcessor" writer="tableToOtherTableExampleItemWriter"
				commit-interval="1000" />
		</batch:tasklet>
	</batch:step>
</beans>

6. ソースコードはこちら

一応これで完成です。
Run ConfigurationsでMain class『org.springframework.batch.core.launch.support.CommandLineJobRunner』を指定して、引数『classpath:launch-context.xml tableToOtherTableUsingPartition』としてやれば動きます。
ここに書かれていないProcessor、WriterクラスのソースコードやテーブルのDDLは↓のレポジトリに置いてあるので中身を見ながら追って下さい。
https://github.com/Black-Spider007/SpringBatch_sample

本文ソースコード共にあまりにも読みにくいので後から加筆修正不可避

参考

soracane:https://sites.google.com/site/soracane/home/springnitsuite/spring-batch
おもてなしマインド.com:http://www.omotenashi-mind.com/index.php?title=Sping_Batch
mybatis-spring公式:http://www.mybatis.org/spring/ja/index.html
MyBatis Generator の使い方:https://cloudear.jp/blog/?p=708
@Configurableをつけると普通にnewしてるクラスにも依存性を注入できるとな:http://ryu-htn.hatenablog.com/entry/2013/04/25/151240

12
6
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
12
6

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?