24
31

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

徹底解説! SpringBatchでバッチ処理を行う

Last updated at Posted at 2020-01-14

はじめに

本記事はゼロからSpring Batchを説明し、活用して環境構築を行ってバッチ処理を実装してみます。
処理流れの全体としては、バッチプロセス
が動いたら、あるjSONファイルからデータを読み込んでMySQLのデータベースに書き出す。

SpringBatchとは

Spring BatchはSpringの子どものプロジェクトである。Spring BatchはSpringを基に、バッチアプリケーションフレームワークの一つである。

SpringBatchの構成

Spring Batchの主な構成要素と処理全体の流れについて説明をする。 また、ジョブの実行状況などのメタデータがどのように管理されているかについても説明する。
Spring Batchの主な構成要素と処理全体の流れ(チャンクモデル)を下図に示す。

springbatch.png

構成要素 役割
Job Spring Batchにおけるバッチアプリケーションの一連の処理をまとめた1実行単位。
Step Jobを構成する処理の単位。1つのJobに1~N個のStepをもたせることが可能。
JobLauncher Jobを起動するためのインターフェース。
ItemReader チャンクモデルを実装する際に、データの入力/加工/出力の3つに分割するためのインターフェース。
ItemProcessor チャンクモデルを実装する際に、データの入力/加工/出力の3つに分割するためのインターフェース。
ItemWriter チャンクモデルを実装する際に、データの入力/加工/出力の3つに分割するためのインターフェース。
JobRepository JobやStepの状況を管理する機構。これらの管理情報は、Spring Batchが規定するテーブルスキーマを元にデータベース上に永続化される。

環境構築

  • SpringBatch
  • SpringBoot
  • Java
  • Grade

本記事では、Spring Initializrを活用してプロジェクトを作りました。以下の通りにやりましょう。
grade.png

ソース実装

こらから、プロジェクトを作って説明します。

ライブラリ導入

このファイルがライブラリ依頼となります。

Build.gralde
buildscript {
   ext {
      springBootVersion = '2.0.4.RELEASE'
   }
   repositories {
      mavenCentral()
   }
   dependencies {
      classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
   }
}

apply plugin: 'java'
apply plugin: 'idea'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'

group = 'com.demo'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = 1.8

repositories {
   mavenCentral()
}

dependencies {
   compile('org.springframework.boot:spring-boot-starter-batch')
   compile('org.springframework.boot:spring-boot-starter-jdbc')
   compile("org.springframework.boot:spring-boot-starter-data-jpa")
   compile group: 'com.fasterxml.jackson.datatype', name: 'jackson-datatype-joda', version: '2.9.4'
   compile group: 'org.jadira.usertype', name: 'usertype.core', version: '6.0.1.GA'
   compile group: 'mysql', name: 'mysql-connector-java', version: '6.0.6',
   testCompile('org.springframework.boot:spring-boot-starter-test')
   testCompile('org.springframework.batch:spring-batch-test')
}

SpringBatchの配置

まずは、グローバル設定を行います。後はJOBの配置をします。

SpringBatchConfiguration.java
@Configuration
@EnableAutoConfiguration
@EnableBatchProcessing(modular = true)
public class SpringBatchConfiguration {
    @Bean
    public ApplicationContextFactory firstJobContext() {
        return new GenericApplicationContextFactory(FirstJobConfiguration.class);
    }
    
    @Bean
    public ApplicationContextFactory secondJobContext() {
        return new GenericApplicationContextFactory(SecondJobConfiguration.class);
    }

}

エンティティクラスを作成

Json Dataのフォーマットの通りです。

Message.java
@Entity
@Table(name = "message")
public class Message {
    @Id
    @Column(name = "object_id", nullable = false)
    private String objectId;

    @Column(name = "content")
    private String content;

    @Column(name = "last_modified_time")
    private LocalDateTime lastModifiedTime;

    @Column(name = "created_time")
    private LocalDateTime createdTime;
}

Jobを作成

MessageMigrationJobConfiguration.java
public class MessageMigrationJobConfiguration {
  @Autowired
  private JobBuilderFactory jobBuilderFactory;

  @Autowired
  private StepBuilderFactory stepBuilderFactory

  @Autowired
  private EntityManagerFactory entityManager;

  // Jobを作成
  @Bean
  public Job messageMigrationJob(@Qualifier("messageMigrationStep") Step    messageMigrationStep) {
    return jobBuilderFactory.get("messageMigrationJob")
            .start(messageMigrationStep)
            .build();
  }

  // Stepを作成
  @Bean
  public Step messageMigrationStep(@Qualifier("jsonMessageReader") FlatFileItemReader<Message> jsonMessageReader,
                                 @Qualifier("messageItemWriter") JpaItemWriter<Message> messageItemWriter,
                                 @Qualifier("errorWriter") Writer errorWriter) {
    return stepBuilderFactory.get("messageMigrationStep")
            .<Message, Message>chunk(CHUNK_SIZE)
            .reader(jsonMessageReader).faultTolerant().skip(JsonParseException.class).skipLimit(SKIP_LIMIT)
            .listener(new MessageItemReadListener(errorWriter))
            .writer(messageItemWriter).faultTolerant().skip(Exception.class).skipLimit(SKIP_LIMIT)
            .listener(new MessageWriteListener())
            .build();
  }

  // Readerを作成
  @Bean
  public FlatFileItemReader<Message> jsonMessageReader() {
    FlatFileItemReader<Message> reader = new FlatFileItemReader<>();
    // 実際のJson Dataファイルパスを入れる
    reader.setResource(new FileSystemResource(new File(MESSAGE_FILE)));
    reader.setLineMapper(new MessageLineMapper());
    return reader;
 }

  // Writerを作成
  @Bean
  public JpaItemWriter<Message> messageItemWriter() {
    JpaItemWriter<Message> writer = new JpaItemWriter<>();
    writer.setEntityManagerFactory(entityManager);
    return writer;
  }
}

データプロセスメソッドを実装

ここでLineMapperを使ってtxtソースファイルから1行ずつデータを読み込んでMessageオブジェクトを転換します。

MessageLineMapper.java
public class MessageLineMapper implements LineMapper<Message> {
    private MappingJsonFactory factory = new MappingJsonFactory();

    @Override
    public Message mapLine(String line, int lineNumber) throws Exception {   
        JsonParser parser = factory.createParser(line);
        Map<String, Object> map = (Map) parser.readValueAs(Map.class);
        Message message = new Message();
        ... // TODO データ転換ロジック
        return message;
    }
}
```

#### propertiesファイルを作成
```java:application.properties
spring.datasource.url=jdbc:mysql://database
spring.datasource.username=username
spring.datasource.password=password
spring.datasource.driverClassName=com.mysql.cj.jdbc.Driver
spring.jpa.database-platform=org.hibernate.dialect.MySQLDialect
spring.jpa.show-sql=true
spring.jpa.properties.jadira.usertype.autoRegisterUserTypes=true
spring.jackson.serialization.write-dates-as-timestamps=false
spring.batch.initialize-schema=ALWAYS
spring.jpa.hibernate.ddl-auto=update
```

#### Listenerを実装
エラーが発生する時にリスナーがエラーメッセージをファイルに出力する

1.ReadListener

```java:MessageItemReadListener.java
public class MessageItemReadListener implements ItemReadListener<Message> {
    private Writer errorWriter;

    public MessageItemReadListener(Writer errorWriter) {
        this.errorWriter = errorWriter;
    }

    @Override
    public void beforeRead() {
    }

    @Override
    public void afterRead(Message item) {
    }

    @Override
    public void onReadError(Exception ex) {
         errorWriter.write(format("%s%n", ex.getMessage()));
    }
}
```

2.WriteListener

```java:MessageWriteListener.java
public class MessageWriteListener implements ItemWriteListener<Message> {

    @Autowired
    private Writer errorWriter;

    @Override
    public void beforeWrite(List<? extends Message> items) {
    }

    @Override
    public void afterWrite(List<? extends Message> items) {
    }

    @Override
    public void onWriteError(Exception exception, List<? extends Message> items) {
        errorWriter.write(format("%s%n", exception.getMessage()));
        for (Message message : items) {
            errorWriter.write(format("Failed writing message id: %s", message.getObjectId()));
        }
    }
}
```

#### Jobを実行
まずはテストメソッドを作ってます

```java:SpringBatchTest.java
public static void main(String[] args) {
    String jobName = args[0];

    try {
        ConfigurableApplicationContext context = SpringApplication.run(ZuociBatchApplication.class, args);
        JobRegistry jobRegistry = context.getBean(JobRegistry.class);
        Job job = jobRegistry.getJob(jobName);
        JobLauncher jobLauncher = context.getBean(JobLauncher.class);
        JobExecution jobExecution = jobLauncher.run(job, createJobParams());
        if (!jobExecution.getExitStatus().equals(ExitStatus.COMPLETED)) {
            throw new RuntimeException(format("%s Job execution failed.", jobName));
        }
    } catch (Exception e) {
        throw new RuntimeException(format("%s Job execution failed.", jobName));
    }
}

private static JobParameters createJobParams() {
    return new JobParametersBuilder().addDate("date", new Date()).toJobParameters();
}
```

プロジェクトをjarファイルにして以下のcommandで実行してみよう
> java -jar YOUR_BATCH_NAME.jar YOUR_JOB_NAME

#### 実行結果
実行してうまくいけば以下のようなテーブルとデータを作成するべきだ
![data.png](https://qiita-image-store.s3.ap-northeast-1.amazonaws.com/0/225740/66b3b917-ded2-71cf-8e5a-2f2ed7948f43.png)

また詳しい情報job名状態エラーメッセージなどは以下の通りです
![job.png](https://qiita-image-store.s3.ap-northeast-1.amazonaws.com/0/225740/7c5ee6aa-5f1f-fc07-90e2-249d7f996184.png)
![erro.png](https://qiita-image-store.s3.ap-northeast-1.amazonaws.com/0/225740/ccb18ee1-0b6e-0196-a071-a64b5f0ecc64.png)

### 最後に
最後まで読んでいただきありがとうございます
おかしいと思う部分は遠慮なくご指摘いただければと思います 
よろしくお願いします

参考資料
1.https://terasoluna-batch.github.io/guideline/5.0.0.RELEASE/ja/Ch02_SpringBatchArchitecture.html

2.https://spring.io/projects/spring-batch

24
31
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
24
31

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?