はじめに
本記事はゼロからSpring Batchを説明し、活用して環境構築を行ってバッチ処理を実装してみます。
処理流れの全体としては、バッチプロセス
が動いたら、あるjSONファイルからデータを読み込んでMySQLのデータベースに書き出す。
SpringBatchとは
Spring BatchはSpringの子どものプロジェクトである。Spring BatchはSpringを基に、バッチアプリケーションフレームワークの一つである。
SpringBatchの構成
Spring Batchの主な構成要素と処理全体の流れについて説明をする。 また、ジョブの実行状況などのメタデータがどのように管理されているかについても説明する。
Spring Batchの主な構成要素と処理全体の流れ(チャンクモデル)を下図に示す。
構成要素 | 役割 |
---|---|
Job | Spring Batchにおけるバッチアプリケーションの一連の処理をまとめた1実行単位。 |
Step | Jobを構成する処理の単位。1つのJobに1~N個のStepをもたせることが可能。 |
JobLauncher | Jobを起動するためのインターフェース。 |
ItemReader | チャンクモデルを実装する際に、データの入力/加工/出力の3つに分割するためのインターフェース。 |
ItemProcessor | チャンクモデルを実装する際に、データの入力/加工/出力の3つに分割するためのインターフェース。 |
ItemWriter | チャンクモデルを実装する際に、データの入力/加工/出力の3つに分割するためのインターフェース。 |
JobRepository | JobやStepの状況を管理する機構。これらの管理情報は、Spring Batchが規定するテーブルスキーマを元にデータベース上に永続化される。 |
環境構築
- SpringBatch
- SpringBoot
- Java
- Grade
本記事では、Spring Initializrを活用してプロジェクトを作りました。以下の通りにやりましょう。
ソース実装
こらから、プロジェクトを作って説明します。
ライブラリ導入
このファイルがライブラリ依頼となります。
Build.gralde
buildscript {
ext {
springBootVersion = '2.0.4.RELEASE'
}
repositories {
mavenCentral()
}
dependencies {
classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
}
}
apply plugin: 'java'
apply plugin: 'idea'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'
group = 'com.demo'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = 1.8
repositories {
mavenCentral()
}
dependencies {
compile('org.springframework.boot:spring-boot-starter-batch')
compile('org.springframework.boot:spring-boot-starter-jdbc')
compile("org.springframework.boot:spring-boot-starter-data-jpa")
compile group: 'com.fasterxml.jackson.datatype', name: 'jackson-datatype-joda', version: '2.9.4'
compile group: 'org.jadira.usertype', name: 'usertype.core', version: '6.0.1.GA'
compile group: 'mysql', name: 'mysql-connector-java', version: '6.0.6',
testCompile('org.springframework.boot:spring-boot-starter-test')
testCompile('org.springframework.batch:spring-batch-test')
}
SpringBatchの配置
まずは、グローバル設定を行います。後はJOBの配置をします。
SpringBatchConfiguration.java
@Configuration
@EnableAutoConfiguration
@EnableBatchProcessing(modular = true)
public class SpringBatchConfiguration {
@Bean
public ApplicationContextFactory firstJobContext() {
return new GenericApplicationContextFactory(FirstJobConfiguration.class);
}
@Bean
public ApplicationContextFactory secondJobContext() {
return new GenericApplicationContextFactory(SecondJobConfiguration.class);
}
}
エンティティクラスを作成
Json Dataのフォーマットの通りです。
Message.java
@Entity
@Table(name = "message")
public class Message {
@Id
@Column(name = "object_id", nullable = false)
private String objectId;
@Column(name = "content")
private String content;
@Column(name = "last_modified_time")
private LocalDateTime lastModifiedTime;
@Column(name = "created_time")
private LocalDateTime createdTime;
}
Jobを作成
MessageMigrationJobConfiguration.java
public class MessageMigrationJobConfiguration {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory
@Autowired
private EntityManagerFactory entityManager;
// Jobを作成
@Bean
public Job messageMigrationJob(@Qualifier("messageMigrationStep") Step messageMigrationStep) {
return jobBuilderFactory.get("messageMigrationJob")
.start(messageMigrationStep)
.build();
}
// Stepを作成
@Bean
public Step messageMigrationStep(@Qualifier("jsonMessageReader") FlatFileItemReader<Message> jsonMessageReader,
@Qualifier("messageItemWriter") JpaItemWriter<Message> messageItemWriter,
@Qualifier("errorWriter") Writer errorWriter) {
return stepBuilderFactory.get("messageMigrationStep")
.<Message, Message>chunk(CHUNK_SIZE)
.reader(jsonMessageReader).faultTolerant().skip(JsonParseException.class).skipLimit(SKIP_LIMIT)
.listener(new MessageItemReadListener(errorWriter))
.writer(messageItemWriter).faultTolerant().skip(Exception.class).skipLimit(SKIP_LIMIT)
.listener(new MessageWriteListener())
.build();
}
// Readerを作成
@Bean
public FlatFileItemReader<Message> jsonMessageReader() {
FlatFileItemReader<Message> reader = new FlatFileItemReader<>();
// 実際のJson Dataファイルパスを入れる
reader.setResource(new FileSystemResource(new File(MESSAGE_FILE)));
reader.setLineMapper(new MessageLineMapper());
return reader;
}
// Writerを作成
@Bean
public JpaItemWriter<Message> messageItemWriter() {
JpaItemWriter<Message> writer = new JpaItemWriter<>();
writer.setEntityManagerFactory(entityManager);
return writer;
}
}
データプロセスメソッドを実装
ここでLineMapperを使ってtxtソースファイルから1行ずつデータを読み込んでMessageオブジェクトを転換します。
MessageLineMapper.java
public class MessageLineMapper implements LineMapper<Message> {
private MappingJsonFactory factory = new MappingJsonFactory();
@Override
public Message mapLine(String line, int lineNumber) throws Exception {
JsonParser parser = factory.createParser(line);
Map<String, Object> map = (Map) parser.readValueAs(Map.class);
Message message = new Message();
... // TODO データ転換ロジック
return message;
}
}
```
#### propertiesファイルを作成
```java:application.properties
spring.datasource.url=jdbc:mysql://database
spring.datasource.username=username
spring.datasource.password=password
spring.datasource.driverClassName=com.mysql.cj.jdbc.Driver
spring.jpa.database-platform=org.hibernate.dialect.MySQLDialect
spring.jpa.show-sql=true
spring.jpa.properties.jadira.usertype.autoRegisterUserTypes=true
spring.jackson.serialization.write-dates-as-timestamps=false
spring.batch.initialize-schema=ALWAYS
spring.jpa.hibernate.ddl-auto=update
```
#### Listenerを実装
エラーが発生する時に、リスナーがエラーメッセージをファイルに出力する。
1.ReadListener
```java:MessageItemReadListener.java
public class MessageItemReadListener implements ItemReadListener<Message> {
private Writer errorWriter;
public MessageItemReadListener(Writer errorWriter) {
this.errorWriter = errorWriter;
}
@Override
public void beforeRead() {
}
@Override
public void afterRead(Message item) {
}
@Override
public void onReadError(Exception ex) {
errorWriter.write(format("%s%n", ex.getMessage()));
}
}
```
2.WriteListener
```java:MessageWriteListener.java
public class MessageWriteListener implements ItemWriteListener<Message> {
@Autowired
private Writer errorWriter;
@Override
public void beforeWrite(List<? extends Message> items) {
}
@Override
public void afterWrite(List<? extends Message> items) {
}
@Override
public void onWriteError(Exception exception, List<? extends Message> items) {
errorWriter.write(format("%s%n", exception.getMessage()));
for (Message message : items) {
errorWriter.write(format("Failed writing message id: %s", message.getObjectId()));
}
}
}
```
#### Jobを実行
まずはテストメソッドを作ってます。
```java:SpringBatchTest.java
public static void main(String[] args) {
String jobName = args[0];
try {
ConfigurableApplicationContext context = SpringApplication.run(ZuociBatchApplication.class, args);
JobRegistry jobRegistry = context.getBean(JobRegistry.class);
Job job = jobRegistry.getJob(jobName);
JobLauncher jobLauncher = context.getBean(JobLauncher.class);
JobExecution jobExecution = jobLauncher.run(job, createJobParams());
if (!jobExecution.getExitStatus().equals(ExitStatus.COMPLETED)) {
throw new RuntimeException(format("%s Job execution failed.", jobName));
}
} catch (Exception e) {
throw new RuntimeException(format("%s Job execution failed.", jobName));
}
}
private static JobParameters createJobParams() {
return new JobParametersBuilder().addDate("date", new Date()).toJobParameters();
}
```
プロジェクトをjarファイルにして以下のcommandで実行してみよう。
> java -jar YOUR_BATCH_NAME.jar YOUR_JOB_NAME
#### 実行結果
実行してうまくいけば、以下のようなテーブルとデータを作成するべきだ。
![data.png](https://qiita-image-store.s3.ap-northeast-1.amazonaws.com/0/225740/66b3b917-ded2-71cf-8e5a-2f2ed7948f43.png)
また、詳しい情報(job名、状態、エラーメッセージなど)は以下の通りです。
![job.png](https://qiita-image-store.s3.ap-northeast-1.amazonaws.com/0/225740/7c5ee6aa-5f1f-fc07-90e2-249d7f996184.png)
![erro.png](https://qiita-image-store.s3.ap-northeast-1.amazonaws.com/0/225740/ccb18ee1-0b6e-0196-a071-a64b5f0ecc64.png)
### 最後に
最後まで読んでいただき、ありがとうございます。
おかしいと思う部分は遠慮なくご指摘いただければと思います。
よろしくお願いします。
参考資料:
1.https://terasoluna-batch.github.io/guideline/5.0.0.RELEASE/ja/Ch02_SpringBatchArchitecture.html
2.https://spring.io/projects/spring-batch