LoginSignup
4
0

More than 3 years have passed since last update.

FluentdのログをAsakusaで集計する

Last updated at Posted at 2019-05-09

はじめに

本稿では、Fluentdから逐次送信されるJSON Lines形式のログをAsakusaで入力して集計するサンプルアプリケーションを説明します。
前回の記事 Asakusa on M3BPとDBを連携するで紹介したAsakusaアプリケーションと同じ仕様ですが、前回はDatabaseから売上明細を連携していたのをFluentdから出力されたログを入力します。データフロー図で表現すると以下のようになります。
fluentd_m3bp_dfd.png

動作環境

以下の環境で動作確認をしています。

  • MacOS 10.13 (High Sierra)
  • Docker Engine Version 18.09.2

ソースコードはGitHubにあります。

Dockerコンテナの設定

docker上にfluentdとm3bpという2つのコンテナを設定します。
Host OS上のディレクトリ./directioを2つのコンテナにmountしています。
./directioディレクトリ以下には入力となるデータとバッチの結果ファイルが配置されます。
./workspaceはAsakusaのソースコードが配置されていて、m3bpコンテナ上でAsakusa on M3BP用にビルドしてデプロイを行います。

docker.png

docker-compose.yml
version: '3'
services:
  fluentd:
    build: ./fluentd
    ports:
      - "24224:24224"
    volumes:
      - ./fluentd/etc:/fluentd/etc
      - ./directio:/directio
    environment:
      - FLUENTD_CONF=fluent.conf
  m3bp:
    build: .
    volumes:
      - ./workspace:/workspace
      - ./directio:/directio

m3bpコンテナのDockerfileは次のようになっています。Asakusa on M3BPのビルドに必要なパッケージをインストールしています。

Dockerfile
FROM centos:centos7

ENV ASAKUSA_HOME /workspace/asakusa
ENV JAVA_HOME /usr/lib/jvm/java
ENV PATH ${PATH}:${JAVA_HOME}/bin:${ASAKUSA_HOME}/bin
ADD ./workspace /workspace
ADD ./directio /directio
WORKDIR /workspace

RUN yum install -y cmake make gcc-c++ hwloc java-1.8.0-openjdk java-1.8.0-openjdk-devel

CMD ["bash", "/workspace/build.sh"]

次のコマンドでDockerコンテナを開始します。

docker-compose up -d

Fluentdの設定

JSON Lines形式の売上明細ログを入力して30秒毎に./directio/salesディレクトリ以下に出力しています。
本サンプルではdummyプラグインで固定の売上明細を生成しています。

fluentd/etc/fluent.conf
<source>
  @type dummy
  tag sales.log
  dummy {"sales_date_time":"2019-04-01 10:30:00","item_code":"4922010001000","amount":2,"unit_selling_price":120,"selling_price":240}
</source>

<match sales.**>
  @type file
  path /directio/sales/sales
  time_slice_format %Y-%m-%d-%H%M
  <format>
    @type json
  </format>
  <buffer>
    @type file
    path /fluentd/buffer/
    flush_mode interval
    flush_interval 30s
    timekey 5m
  </buffer>
</match>

fluentdコンテナが開始されてしばらく待つと以下のようなファイル名で30秒毎にログが出力されます。ファイル名の日付と時間は5分毎に変更されます。

ll directio/sales/
total 32
-rw-r--r--  1 suga  staff  4158  4 24 18:36 sales.2019-04-24-1835_0.log
-rw-r--r--  1 suga  staff  4032  4 24 18:37 sales.2019-04-24-1835_1.log
-rw-r--r--  1 suga  staff  3780  4 24 18:37 sales.2019-04-24-1835_2.log

DMDLの設定

Direct I/O JSON ※ の機能を利用してJSON Lines形式の売上明細を定義しています。
(※0.10.3から追加された機能です)

workspace/example-m3bp/src/main/dmdl/models.dmdl
"売上明細"
@directio.json(
    format = jsonl,
    datetime_format = "yyyy-MM-dd HH:mm:ss"
)
sales_detail = {
    sales_date_time : DATETIME;
    item_code : TEXT;
    amount : INT;
    unit_selling_price : INT;
    selling_price : INT;
};

ファイル入力定義情報の設定

入力するファイル名のパターンはgetResourcePattaernに定義します。${date}はバッチ引数dateの値で置換されます。

workspace/example-m3bp/src/main/java/com/example/jobflow/SalesDetailFromJson.java
package com.example.jobflow;

import com.example.modelgen.dmdl.json.AbstractSalesDetailJsonInputDescription;

/**
 * 売上明細をDirect I/Oで入力する。
 * 入力ファイルは {@code sales} 上のすべてのファイル。
 */
public class SalesDetailFromJson extends AbstractSalesDetailJsonInputDescription {

    @Override
    public String getBasePath() {
        return "sales";
    }

    @Override
    public String getResourcePattern() {
        return "**/sales.${date}*.log";
    }

    @Override
    public DataSize getDataSize() {
        return DataSize.LARGE;
    }
}

Asakusaバッチアプリケーションのビルドとデプロイ

以下のコマンドでDockerfileで設定した/workspace/build.shコマンドが実行されAsakusaバッチアプリケーションがm3bpコンテナ内でビルドされてASAKUSA_HOMEへデプロイされます。

docker-compose run m3bp
workspace/build.sh
#!/bin/bash
PRJ_HOME=/workspace/example-m3bp

cd ${PRJ_HOME}
./gradlew -g /workspace/lib assemble
rm -rf $ASAKUSA_HOME
mkdir $ASAKUSA_HOME
cd $ASAKUSA_HOME
tar xvf ${PRJ_HOME}/build/asakusafw-example-m3bp.tar.gz
java -jar tools/bin/setup.jar

バッチ実行

バッチ引数に集計対象のJSONファイルを指定するため日付と時間を指定しています。
下記の場合、2019-04-24を含むJSONファイルが入力され、指定した日付のカテゴリ別の売上合計が計算されます。

docker-compose run m3bp asakusa run m3bp.example.summarizeSales -A date=2019-04-24

なお、5分毎の集計を求めたい場合には次のように実行します。

docker-compose run m3bp asakusa run m3bp.example.summarizeSales -A date=2019-04-24-1835

結果ファイルの確認

結果ファイルは、ホストOS上の以下のディレクトリに出力されます。

cat directio/result/category/result.csv 
カテゴリコード,販売数量,売上合計
1600,850,102000

最後に

本稿では、JSON Lines形式のファイルを入力する簡単なサンプルをご確認いただきました。
IoTのセンサーデータなどをfluentdなどで収集し、M3BPやHadoop上で加工するといった用途など色々と応用ができるかと思いますので、参考になれば幸いです。

4
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
0