はじめに
本稿では、Fluentdから逐次送信されるJSON Lines形式のログをAsakusaで入力して集計するサンプルアプリケーションを説明します。
前回の記事 Asakusa on M3BPとDBを連携するで紹介したAsakusaアプリケーションと同じ仕様ですが、前回はDatabaseから売上明細を連携していたのをFluentdから出力されたログを入力します。データフロー図で表現すると以下のようになります。
動作環境
以下の環境で動作確認をしています。
- MacOS 10.13 (High Sierra)
- Docker Engine Version 18.09.2
ソースコードはGitHubにあります。
Dockerコンテナの設定
docker上にfluentdとm3bpという2つのコンテナを設定します。
Host OS上のディレクトリ./directio
を2つのコンテナにmountしています。
./directio
ディレクトリ以下には入力となるデータとバッチの結果ファイルが配置されます。
./workspace
はAsakusaのソースコードが配置されていて、m3bpコンテナ上でAsakusa on M3BP用にビルドしてデプロイを行います。
version: '3'
services:
fluentd:
build: ./fluentd
ports:
- "24224:24224"
volumes:
- ./fluentd/etc:/fluentd/etc
- ./directio:/directio
environment:
- FLUENTD_CONF=fluent.conf
m3bp:
build: .
volumes:
- ./workspace:/workspace
- ./directio:/directio
m3bpコンテナのDockerfileは次のようになっています。Asakusa on M3BPのビルドに必要なパッケージをインストールしています。
FROM centos:centos7
ENV ASAKUSA_HOME /workspace/asakusa
ENV JAVA_HOME /usr/lib/jvm/java
ENV PATH ${PATH}:${JAVA_HOME}/bin:${ASAKUSA_HOME}/bin
ADD ./workspace /workspace
ADD ./directio /directio
WORKDIR /workspace
RUN yum install -y cmake make gcc-c++ hwloc java-1.8.0-openjdk java-1.8.0-openjdk-devel
CMD ["bash", "/workspace/build.sh"]
次のコマンドでDockerコンテナを開始します。
docker-compose up -d
Fluentdの設定
JSON Lines形式の売上明細ログを入力して30秒毎に./directio/sales
ディレクトリ以下に出力しています。
本サンプルではdummyプラグインで固定の売上明細を生成しています。
<source>
@type dummy
tag sales.log
dummy {"sales_date_time":"2019-04-01 10:30:00","item_code":"4922010001000","amount":2,"unit_selling_price":120,"selling_price":240}
</source>
<match sales.**>
@type file
path /directio/sales/sales
time_slice_format %Y-%m-%d-%H%M
<format>
@type json
</format>
<buffer>
@type file
path /fluentd/buffer/
flush_mode interval
flush_interval 30s
timekey 5m
</buffer>
</match>
fluentdコンテナが開始されてしばらく待つと以下のようなファイル名で30秒毎にログが出力されます。ファイル名の日付と時間は5分毎に変更されます。
ll directio/sales/
total 32
-rw-r--r-- 1 suga staff 4158 4 24 18:36 sales.2019-04-24-1835_0.log
-rw-r--r-- 1 suga staff 4032 4 24 18:37 sales.2019-04-24-1835_1.log
-rw-r--r-- 1 suga staff 3780 4 24 18:37 sales.2019-04-24-1835_2.log
DMDLの設定
Direct I/O JSON ※ の機能を利用してJSON Lines形式の売上明細を定義しています。
(※0.10.3から追加された機能です)
"売上明細"
@directio.json(
format = jsonl,
datetime_format = "yyyy-MM-dd HH:mm:ss"
)
sales_detail = {
sales_date_time : DATETIME;
item_code : TEXT;
amount : INT;
unit_selling_price : INT;
selling_price : INT;
};
ファイル入力定義情報の設定
入力するファイル名のパターンはgetResourcePattaern
に定義します。${date}
はバッチ引数date
の値で置換されます。
package com.example.jobflow;
import com.example.modelgen.dmdl.json.AbstractSalesDetailJsonInputDescription;
/**
* 売上明細をDirect I/Oで入力する。
* 入力ファイルは {@code sales} 上のすべてのファイル。
*/
public class SalesDetailFromJson extends AbstractSalesDetailJsonInputDescription {
@Override
public String getBasePath() {
return "sales";
}
@Override
public String getResourcePattern() {
return "**/sales.${date}*.log";
}
@Override
public DataSize getDataSize() {
return DataSize.LARGE;
}
}
Asakusaバッチアプリケーションのビルドとデプロイ
以下のコマンドでDockerfileで設定した/workspace/build.sh
コマンドが実行されAsakusaバッチアプリケーションがm3bpコンテナ内でビルドされてASAKUSA_HOME
へデプロイされます。
docker-compose run m3bp
# !/bin/bash
PRJ_HOME=/workspace/example-m3bp
cd ${PRJ_HOME}
./gradlew -g /workspace/lib assemble
rm -rf $ASAKUSA_HOME
mkdir $ASAKUSA_HOME
cd $ASAKUSA_HOME
tar xvf ${PRJ_HOME}/build/asakusafw-example-m3bp.tar.gz
java -jar tools/bin/setup.jar
バッチ実行
バッチ引数に集計対象のJSONファイルを指定するため日付と時間を指定しています。
下記の場合、2019-04-24
を含むJSONファイルが入力され、指定した日付のカテゴリ別の売上合計が計算されます。
docker-compose run m3bp asakusa run m3bp.example.summarizeSales -A date=2019-04-24
なお、5分毎の集計を求めたい場合には次のように実行します。
docker-compose run m3bp asakusa run m3bp.example.summarizeSales -A date=2019-04-24-1835
結果ファイルの確認
結果ファイルは、ホストOS上の以下のディレクトリに出力されます。
cat directio/result/category/result.csv
カテゴリコード,販売数量,売上合計
1600,850,102000
最後に
本稿では、JSON Lines形式のファイルを入力する簡単なサンプルをご確認いただきました。
IoTのセンサーデータなどをfluentdなどで収集し、M3BPやHadoop上で加工するといった用途など色々と応用ができるかと思いますので、参考になれば幸いです。