経緯
Spring-Bootで作られたWEBアプリケーションにおいて、
用意されたAPI実行ログファイルの内容を集計して、その結果をDBに格納する、そんなバッチ処理の実装を行いたい。
ただその要件の実装の参考になりそうなサイトがなく、とんでもなく詰まったので備忘録として残しておく。(これがベストかは不明)
※当方投稿時点でバッチ歴7日くらい
主要件
- データ中の特定の項目毎に集計を行いたい
- 集計はJava側で行う(SQLでは行わない)
- データの一部をDB格納前にマッピングした文字列に変換する
前提データ(例)
Before:集計対象のログ形式
アクセス時のURL/HTTPメソッド/HTTPステータスコード/実行時間/実行日時
がTSV形式で羅列されている
例:
/api/・・・ GET 200 48 2020/08/14 11:05:42 701 /api/・・・ GET 200 27 2020/08/14 11:05:43 352 /api/・・・・/41 DELETE 401 10 2020/08/14 11:05:46 780 /api/・・・/42 PUT 200 108 2020/08/14 11:06:16 824 /api/・・・ POST 500 806 2020/08/14 11:06:30 252 ・・・
After:DB格納時の形式
-
URL
を論理API名に加工 -
URL(API名)/HTTPメソッド/HTTPステータスコード
をキーにアクセス回数
と実行時間の平均
を集計 - レコード形式:
API名/HTTPメソッド/HTTPステータスコード/アクセス回数/実行時間平均/集計日時
- 例:
○○API GET 200 10 240
実装
方針
-
チャンクモデル
で実装- Reader:1行ずつ読み込む(1行のデータ=1Item)
- Processor:Readerから送られてきたItemの加工
- Writer:Processorから送られてきた加工済みItemのListを集計→別のWriterインスタンスを用意しそれに集計結果を渡す
ポイント
- 集計処理を
Stream API
で行う - ProcessorからItem郡を受け取るWriterと、実際に書き込み処理を行うWriterの2つを用意する
コード
※バッチの設定周りの処理やEntityクラスの内容は割愛
Dtoクラス
@Data
public class LogCollectedDto {
// API名
private String apiName;
// HTTPメソッド
private String httpMethod;
// HTTPステータスコード
private String httpCode;
// 実行時間(ms)
private String executionTime;
// 集計日時
private String collectedDate;
}
Reader
Beanで定義
@Bean
public FlatFileItemReader<LogCollectedDto> reader() {
final String READ_FILE_PATH = <読み込むログファイル名>;
FlatFileItemReader<LogCollectedDto> reader = new FlatFileItemReader<>();
reader.setResource(new FileSystemResource(READ_FILE_PATH));
reader.setEncoding(StandardCharsets.UTF_8.name());
reader.setLinesToSkip(0);
reader.setLineMapper(
new DefaultLineMapper() {
{
setLineTokenizer(
new DelimitedLineTokenizer(DelimitedLineTokenizer.DELIMITER_TAB) {
{
setNames(
new String[] {
"apiUrl", "httpMethod", "httpCode", "executionTime", "collectedDate"
});
}
});
setFieldSetMapper(
new BeanWrapperFieldSetMapper<LogCollectedDto>() {
{
setTargetType(LogCollectedDto.class);
}
});
}
});
return reader;
}
Processor
別クラスに切り出し
public class CustomItemProcessor implements ItemProcessor<LogCollectedDto, LogCollectedDto> {
@Override
public LogCollectedDto process(LogCollectedDto item) throws Exception {
// 取得したItemのバリデーション、falseならその行はスキップ
if (!validateItem(item)) {
return null;
}
// 後に加工するため取得したItemを一度別変数に保持
// (引数に直接手を加えると中断して再開した時に取得したItemが加工後のデータになる可能性がある)
LogCollectedDto afterItem = item;
// データ内容加工(別メソッド割愛)
afterItem.setApiName(getApiName(・・・));
return afterItem;
}
//(略)
}
Writer
別クラスに切り出し
@RequiredArgsConstructor
public class CustomItemWriter extends JpaItemWriter<LogCollectedDto> {
private final JpaItemWriter<Log> jpaItemWriter;
@Override
public void write(List<? extends LogCollectedDto> items) {
// Processorから受け取ったItemを集計して、別口のWriterインスタンスに渡す
Map<String, List<LogCollectedDto>> groupingMap = groupingItems(items);
jpaItemWriter.write(collectItems(groupingMap));
}
/**
* Processorから受け取ったItemをグルーピング
* API名、HTTPステータスを複合キーとする
*
* @param list
* @return
*/
private Map<String, List<LogCollectedDto>> groupingItems(List<? extends LogCollectedDto> list) {
// 複合キーを作成
Function<LogCollectedDto, String> compositeKey =
logDto -> {
StringBuffer sb = new StringBuffer();
sb.append(logDto.getApiName()).append("-").append(logDto.getHttpMethod());
return sb.toString();
};
Map<String, List<LogCollectedDto>> grpByComplexKeys =
list.stream().collect(Collectors.groupingBy(compositeKey));
return grpByComplexKeys;
}
/**
* グルーピングしたItemを集計してEntityのリストを生成
*
* @param groupingMap
* @return
*/
private List<Log> collectItems(Map<String, List<LogCollectedDto>> groupingMap) {
List<Log> recordList = new ArrayList<>();
for (List<LogCollectedDto> dtoList : groupingMap.values()) {
// Entityクラスのインスタンス生成
Log record = new Log();
// 集計処理
record.setApiName(dtoList.stream().findFirst().get().getApiName());
record.setHttpCode(dtoList.stream().findFirst().get().getHttpCode());
record.setHttpMethod(dtoList.stream().findFirst().get().getHttpMethod());
record.setAccesses(dtoList.size());
record.setAverageTime(
dtoList.stream()
.collect(
Collectors.averagingDouble(dto -> Double.parseDouble(dto.getExecutionTime()))));
record.setCollectedTime(LocalDateTime.now());
recordList.add(record);
}
return recordList;
}
}
懸念点
- 上記の実装のみだと、一度に集計できる行数は設定したチャンクサイズまでとなってしまう
- →上記を1つのステップとし、次のステップでDBに格納した内容に同一キーが存在すれば再集計する実装にすればいけそう(DB負荷高そうだけど・・・)
- Writerの責務が散らかっている感が否めない
まとめ
懸念はありますが、個人的には一番スッキリした書き方だと思っています。
ご意見随時募集しています。
結論:Stream API最強!!!