0
5

More than 3 years have passed since last update.

spring-batchで集計処理をしたい

Last updated at Posted at 2020-09-02

経緯

Spring-Bootで作られたWEBアプリケーションにおいて、
用意されたAPI実行ログファイルの内容を集計して、その結果をDBに格納する、そんなバッチ処理の実装を行いたい。
ただその要件の実装の参考になりそうなサイトがなく、とんでもなく詰まったので備忘録として残しておく。(これがベストかは不明)
※当方投稿時点でバッチ歴7日くらい

主要件

  • データ中の特定の項目毎に集計を行いたい
  • 集計はJava側で行う(SQLでは行わない)
  • データの一部をDB格納前にマッピングした文字列に変換する

前提データ(例)

Before:集計対象のログ形式

アクセス時のURL/HTTPメソッド/HTTPステータスコード/実行時間/実行日時がTSV形式で羅列されている
例:
/api/・・・ GET 200 48 2020/08/14 11:05:42 701
/api/・・・ GET 200 27 2020/08/14 11:05:43 352
/api/・・・・/41 DELETE 401 10 2020/08/14 11:05:46 780
/api/・・・/42 PUT 200 108 2020/08/14 11:06:16 824
/api/・・・ POST 500 806 2020/08/14 11:06:30 252
・・・

After:DB格納時の形式

  • URLを論理API名に加工
  • URL(API名)/HTTPメソッド/HTTPステータスコードをキーにアクセス回数実行時間の平均を集計
  • レコード形式:API名/HTTPメソッド/HTTPステータスコード/アクセス回数/実行時間平均/集計日時
  • 例:○○API GET 200 10 240

実装

方針

  • チャンクモデルで実装
    • Reader:1行ずつ読み込む(1行のデータ=1Item)
    • Processor:Readerから送られてきたItemの加工
    • Writer:Processorから送られてきた加工済みItemのListを集計→別のWriterインスタンスを用意しそれに集計結果を渡す

ポイント

  • 集計処理をStream APIで行う
  • ProcessorからItem郡を受け取るWriterと、実際に書き込み処理を行うWriterの2つを用意する

コード

※バッチの設定周りの処理やEntityクラスの内容は割愛

Dtoクラス

@Data
public class LogCollectedDto {
  // API名
  private String apiName;
  // HTTPメソッド
  private String httpMethod;
  // HTTPステータスコード
  private String httpCode;
  // 実行時間(ms)
  private String executionTime;
  // 集計日時
  private String collectedDate;
}

Reader

Beanで定義

  @Bean
  public FlatFileItemReader<LogCollectedDto> reader() {

    final String READ_FILE_PATH = <読み込むログファイル名>;

    FlatFileItemReader<LogCollectedDto> reader = new FlatFileItemReader<>();
    reader.setResource(new FileSystemResource(READ_FILE_PATH));
    reader.setEncoding(StandardCharsets.UTF_8.name());
    reader.setLinesToSkip(0);
    reader.setLineMapper(
        new DefaultLineMapper() {
          {
            setLineTokenizer(
                new DelimitedLineTokenizer(DelimitedLineTokenizer.DELIMITER_TAB) {
                  {
                    setNames(
                        new String[] {
                          "apiUrl", "httpMethod", "httpCode", "executionTime", "collectedDate"
                        });
                  }
                });
            setFieldSetMapper(
                new BeanWrapperFieldSetMapper<LogCollectedDto>() {
                  {
                    setTargetType(LogCollectedDto.class);
                  }
                });
          }
        });
    return reader;
  }

Processor

別クラスに切り出し

public class CustomItemProcessor implements ItemProcessor<LogCollectedDto, LogCollectedDto> {

  @Override
  public LogCollectedDto process(LogCollectedDto item) throws Exception {

    // 取得したItemのバリデーション、falseならその行はスキップ
    if (!validateItem(item)) {
      return null;
    }

    // 後に加工するため取得したItemを一度別変数に保持
    // (引数に直接手を加えると中断して再開した時に取得したItemが加工後のデータになる可能性がある)
    LogCollectedDto afterItem = item;
    // データ内容加工(別メソッド割愛)
    afterItem.setApiName(getApiName(・・・));

    return afterItem;
  }

//(略)

}

Writer

別クラスに切り出し

@RequiredArgsConstructor
public class CustomItemWriter extends JpaItemWriter<LogCollectedDto> {

  private final JpaItemWriter<Log> jpaItemWriter;

  @Override
  public void write(List<? extends LogCollectedDto> items) {

   // Processorから受け取ったItemを集計して、別口のWriterインスタンスに渡す
    Map<String, List<LogCollectedDto>> groupingMap = groupingItems(items);
    jpaItemWriter.write(collectItems(groupingMap));
  }

  /**
   * Processorから受け取ったItemをグルーピング 
   * API名、HTTPステータスを複合キーとする
   *
   * @param list
   * @return
   */
  private Map<String, List<LogCollectedDto>> groupingItems(List<? extends LogCollectedDto> list) {
    // 複合キーを作成
    Function<LogCollectedDto, String> compositeKey =
        logDto -> {
          StringBuffer sb = new StringBuffer();
          sb.append(logDto.getApiName()).append("-").append(logDto.getHttpMethod());
          return sb.toString();
        };

    Map<String, List<LogCollectedDto>> grpByComplexKeys =
        list.stream().collect(Collectors.groupingBy(compositeKey));

    return grpByComplexKeys;
  }

  /**
   * グルーピングしたItemを集計してEntityのリストを生成
   *
   * @param groupingMap
   * @return
   */
  private List<Log> collectItems(Map<String, List<LogCollectedDto>> groupingMap) {

    List<Log> recordList = new ArrayList<>();

    for (List<LogCollectedDto> dtoList : groupingMap.values()) {
      // Entityクラスのインスタンス生成
      Log record = new Log();
      // 集計処理
      record.setApiName(dtoList.stream().findFirst().get().getApiName());
      record.setHttpCode(dtoList.stream().findFirst().get().getHttpCode());
      record.setHttpMethod(dtoList.stream().findFirst().get().getHttpMethod());
      record.setAccesses(dtoList.size());
      record.setAverageTime(
          dtoList.stream()
              .collect(
                  Collectors.averagingDouble(dto -> Double.parseDouble(dto.getExecutionTime()))));
      record.setCollectedTime(LocalDateTime.now());
      recordList.add(record);
    }

    return recordList;
  }
}

懸念点

  • 上記の実装のみだと、一度に集計できる行数は設定したチャンクサイズまでとなってしまう
    • →上記を1つのステップとし、次のステップでDBに格納した内容に同一キーが存在すれば再集計する実装にすればいけそう(DB負荷高そうだけど・・・)
  • Writerの責務が散らかっている感が否めない

まとめ

懸念はありますが、個人的には一番スッキリした書き方だと思っています。
ご意見随時募集しています。
結論:Stream API最強!!!

0
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
5