概要
Spring Batchを利用して、バッチ処理を記述する。
調べたこと、検証したこと等を備忘録として記録。
環境
開発環境は以下
- Windows10(64bit)
- Java 1.8.0_171
- Spring Boot 2.0.2
- Gradle 4.3
想定する利用方法
今回作成するバッチは、コマンドラインから起動される想定。
利用するモデル
チャンクモデルで実装。
Jobに設定するStepは以下のようになる。
read(ItemReader) → processor(ItemProcessor) → write(ItemWriter)
処理の流れ
- DBから処理データの取得(reade)
- 取得したデータを元に外部サービスの呼び出し(processor)
- 「2」の結果を元に、固定長ファイルを出力(writer)
実装
ItemReaderの作成
JdbcCursorItemReaderBuilderを利用して、DBにアクセスする。
@Configuration
public class Step1 implements ItemReader<ClassificationDomain> {
@Bean
public ItemReader<ClassificationDomain> jdbcReader(DataSource dataSource) {
return new JdbcCursorItemReaderBuilder<PersonDomain>()
.dataSource(dataSource)
.name("jdbc-reader")
.sql("select id,name,age from person")
.rowMapper(new PersonRowmapper())
.build();
}
@Override
public ClassificationDomain read() throws Exception,
UnexpectedInputException, ParseException, NonTransientResourceException {
// TODO Auto-generated method stub
return null;
}
Domain,RowMapperは以下の通り。
(lombok利用)
@Data
public class PersonDomain {
String id;
String name;
String status;
int age;
}
@Component
public class PersonRowmapper implements RowMapper<PersonDomain> {
@Override
public PersonDomain mapRow(ResultSet rs, int rowNum) throws SQLException {
PersonDomain person = new PersonDomain();
person.setId(rs.getString("id"));
person.setName(rs.getString("name"));
person.setAge(rs.getInt("age"));
return person;
}
}
ItemProcessorの作成
ItemProcessorは、以下のようになっています。
(以下の例ではInput、Outputともに同じクラスを利用するので念の為・・・)
public interface ItemProcessor<I, O> {
O process(I item) throws Exception;
}
ProcessorではRESTサービスを呼び出しています。
RESTの呼び出しは、RestTemplateを利用。
public class Step2 implements ItemProcessor<PersonDomain, PersonDomain> {
@Override
public PersonDomain process(PersonDomain item) throws Exception {
PersonStatusEntity entity = new PersonStatusEntity();
entity.setId(item.getCode());
URI uri = UriComponentsBuilder.newInstance().scheme("http").host("localhost").port(8080).pathSegment("api").pathSegment("rest").pathSegment("personstatus").build().toUri();
RequestEntity<PersonStatusEntity> requestEntity = RequestEntity
.post(uri)
.contentType(MediaType.APPLICATION_JSON)
.accept(MediaType.APPLICATION_JSON)
.body(entity);
RestOperations restOperations = new RestTemplateBuilder()
.defaultMessageConverters()
.detectRequestFactory(false)
.build();
ResponseEntity<String> responseEntity = restOperations.exchange(requestEntity, String.class);
item.setStatus(responseEntity.getStatusCode())
return item;
}
@Data
public class PersonStatusEntity {
@JsonProperty("id")
private String id = null;
}
ItemWriterの作成
@Configuration
public class Step3 implements ItemWriter<PersonDomain> {
@Override
public void write(List<? extends PersonDomain> items) throws Exception {
// 固定長ファイルを扱うので「FormatterLineAggregator」を利用
FormatterLineAggregator<PersonDomain> aggregator = new FormatterLineAggregator<PersonDomain>();
// フィールドの設定をしたいので、「BeanWrapperFieldExtractor」で実装
BeanWrapperFieldExtractor<PersonDomain> extractor = new BeanWrapperFieldExtractor<PersonDomain>();
extractor.setNames(new String[] { "id", "name","age","status"});
aggregator.setFieldExtractor(extractor);
//String.formatの形式で固定長を取り扱う
aggregator.setFormat("%6s%-10s%05d%5s");
FlatFileItemWriter<PersonDomain> writer = new FlatFileItemWriterBuilder<PersonDomain>()
.name("fixedlengthfile-make")
.resource(new FileSystemResource("createfixedlengthfile"))
.lineAggregator(aggregator)
.build();
//ヘッダーが必要な場合は、CallBackを利用
writer.setHeaderCallback(new HeaderCopyCallback("Header"));
writer.setShouldDeleteIfExists(true);
writer.afterPropertiesSet();
//openが必要か?は調査する必要あり。(ないとInitializeされてないというエラーが発生した。)
writer.open(new ExecutionContext());
writer.write(items);
}
}
HeaderCopyCallbackは以下のようにしています。
public class HeaderCopyCallback implements FlatFileHeaderCallback{
private String header;
public HeaderCopyCallback(String header) {
this.header=header;
}
@Override
public void writeHeader(Writer writer) throws IOException {
writer.write(this.header);
}
}
Jobの定義と実行
Configurationクラスは以下のようにしました。
@Configuration
@EnableBatchProcessing
public class BatchConfigration {
@Autowired
public JobBuilderFactory jobBuilderFactory;
@Autowired
public StepBuilderFactory stepBuilderFactory;
@Bean
public Step newstep(Step1 step1, Step2 step2, Step3 step3) {
return stepBuilderFactory.get("db-read")
.<PersonDomain, PersonDomain>chunk(10)
.reader(step1.jdbcReader(dataSourceConfig.dataSource(datasource //データソースを渡す)))
.processor(step2)
.writer(step3)
.build();
}
@Bean
Job job1(Step step) throws Exception {
//コマンドラインからは、「job1」で実行する.
return jobBuilderFactory.get("job1")
.incrementer(new RunIdIncrementer())
.start(step)
.build();
}
@Bean
public Step1 step1 () {
return new Step1();
}
@Bean
public Step2 step2 () {
return new Step2();
}
@Bean
public Step3 step3 () {
return new Step3();
}
}
mainのクラスは以下のようにしました。
@SpringBootApplication
public class Appliciation {
public static void main(String[] args) throws Exception {
SpringApplication.run(Appliciation.class, args);
}
}
jarを作成して、コマンドラインから実行
java -jar batch.jar -job job1