Java
hadoop
batch
Impala
AsakusaFramework

Asakusa FrameworkとImpalaを連携してBIツールで可視化する

More than 1 year has passed since last update.


はじめに

前回の記事 Asakusa Frameworkで Hello, World! ではAsakusa Frameowkの実装方法について説明しました。この記事はより業務シナリオに近い内容について紹介していきたいと思います。

下記の図は、様々なデータをHadoop(HDFS)上に蓄積し、Asakusa on Sparkで加工したデータをBIツール等で可視化するというデータ分析基盤の一例です。データ規模は数十TB〜数PBといった非常に大きなデータを扱うことを想定したものとなっています。

今回の記事ではこれら要素技術のAsakusa on SparkとSQLクエリーエンジンとしてImpalaを例に連携方法を紹介します。

architecture.png


この記事でやりたいこと

入力ファイルはHadoopのHDFS上に配備済みであるものとします。本来は入力ファイルを加工後に出力するのですが、このサンプルシナリオでは入力ファイルをそのままHDFS上にParquet形式に変換して出力するだけです。出力したデータをImpalaの外部表として定義し、BIツールから参照します。

asakusa_to_impala.png

salesデータモデルは次のようになっています。Asakusaのプロパティのデータ形式とImpalaのデータ形式を変換する機能については、DMDL定義で説明します。

データ項目
Asakusaデータ型
Impalaデータ型

売上日付(sales_date)
DATE
TIMESTAMP

商品コード(item_code)
TEXT
STRING

販売数量(amount)
INT
INT

販売金額(selling_price)
INT
INT


動作環境

以下の環境で動作確認をしています。


  • MapR 5.2.1(Spark2.1.0、Impala2.7.0、Hive2.1、Hive Metastore2.1) ※Spark2.1であれば特にMapR以外のHaoopクラスタでも問題ありません。

  • JDK 1.8.0_161

  • Gradle 4.5

なお、Asakusa Framework 0.10のDirect I/O Hiveは、Hive1.2.2を使用しているため、Hiveのバージョン間によるデータの互換性について、全てを動作検証しているわけではございませんのでご了承ください。

以下は事前作業として完了していること。


  • JDK及びGradleのインストール

  • Hadoopクラスタ上にasakusaユーザーを作成

  • HDFS上にユーザーディレクトリの作成


開発環境の準備

まずはじめにプロジェクトフォルダを作成します。このサンプルでは以下のフォルダ上で作業します。

mkdir asakusa-example-parquet

プロジェクトフォルダ上にGradleスクリプトファイルを作成します。今回はSparkを実行エンジンにするため、Asakusa on Spark Pluginを構成しています。

作成したGradleスクリプトの詳細については、Asakusa Gradle Plugin リファレンスを参照してください。


build.gradle

group 'com.example'

buildscript {
repositories {
maven { url 'http://asakusafw.s3.amazonaws.com/maven/releases' }
maven { url 'http://asakusafw.s3.amazonaws.com/maven/snapshots' }
}
dependencies {
classpath group: 'com.asakusafw.gradle', name: 'asakusa-distribution', version: '0.10.0'
}
}

apply plugin: 'asakusafw-sdk'
apply plugin: 'asakusafw-organizer'
apply plugin: 'asakusafw-spark'
apply plugin: 'eclipse'

asakusafw {
sdk.hive true
}

asakusafwOrganizer {
hive.enabled true
profiles.prod {
assembly.into('.') {
put 'src/dist/prod'
}
}
}


デフォルトのAsakusa設定ファイルから、DirectI/O のファイルシステムパス(com.asakusafw.directio.root.fs.path)をtarget/testing/directioからdirectioに変更しています。

この設定により、このサンプルシナリオではDirectI/Oのルートパスがhdfs:///user/asakusa/directioになります。


src/dist/prod/core/conf/asakusa-resources.xml

<?xml version="1.0" encoding="UTF-8"?>

<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>com.asakusafw.runtime.core.Report.Delegate</name>
<value>com.asakusafw.runtime.report.CommonsLoggingReport</value>
</property>

<property>
<name>com.asakusafw.directio.root</name>
<value>com.asakusafw.runtime.directio.hadoop.HadoopDataSource</value>
</property>
<property>
<name>com.asakusafw.directio.root.path</name>
<value>/</value>
</property>
<property>
<name>com.asakusafw.directio.root.fs.path</name>
<value>directio</value>
</property>
</configuration>



DMDL

データモデルを定義する DMDL(Data Model Definition Language) スクリプトファイルを作成します。

このモデルは入力がCSV形式、出力がParquet形式を想定しています。

Parquet形式で出力する場合、Direct I/O HiveというAsakusa Frameworkの機能を利用します。Parquet形式の他、ORC形式にも対応しています。

以下のsales_dateプロパティはHive標準のマッピングでは、Asakusaデータ型と同じDATEですが、ImpalaにはHiveのDATE型に相当する型がないため、Direct I/O Hiveのマッピング型変換機能を利用してTIMESTAMP型に変換しています。


src/main/dmdl/sales.dmdl

@directio.csv

@directio.hive.parquet(
table_name = "sales"
)
sales = {
@directio.hive.timestamp
sales_date : DATE;

item_code : TEXT;

amount : INT;

selling_price : INT;
};


以下のコマンドを実行してDMDLからデータモデルクラスを生成します。

gradle compileDMDL


入出力定義クラス

入力定義クラスは、inputパス上のsales.csvファイルを入力します。


src/main/java/com/example/jobflow/SalesFromCSV.java

package com.example.jobflow;

import com.example.modelgen.dmdl.csv.AbstractSalesCsvInputDescription;

public class SalesFromCSV extends AbstractSalesCsvInputDescription {

@Override
public String getBasePath() {
return "input";
}

@Override
public String getResourcePattern() {
return "sales.csv";
}
}


出力定義クラスは、result/salesパス上にsales.parquet.*というファイル名で出力します。ファイル名にワイルドカード(*)がある場合、分散並列処理の結果ファイルが分割されたまま一意な名前が割り当てられて並列で出力されるため、高速化が望めます。


src/main/java/com/example/jobflow/SalesToParquet.java

package com.example.jobflow;

import java.util.Arrays;
import java.util.List;

import com.example.modelgen.dmdl.hive.parquet.AbstractSalesParquetFileOutputDescription;

public class SalesToParquet extends AbstractSalesParquetFileOutputDescription {

@Override
public String getBasePath() {
return "result/sales";
}

@Override
public String getResourcePattern() {
return "sales.parquet.*";
}

@Override
public List<String> getDeletePatterns() {
return Arrays.asList("*");
}
}



ジョブフローとバッチクラス

今回のサンプルシナリオでは演算子の実装はしないため、入力から出力に結線したジョブフローとそのジョブフローを実行するバッチクラスを実装するだけです。


src/main/java/com/example/jobflow/SalesParquetJob.java

package com.example.jobflow;

import com.asakusafw.vocabulary.flow.Export;
import com.asakusafw.vocabulary.flow.FlowDescription;
import com.asakusafw.vocabulary.flow.Import;
import com.asakusafw.vocabulary.flow.In;
import com.asakusafw.vocabulary.flow.JobFlow;
import com.asakusafw.vocabulary.flow.Out;
import com.example.modelgen.dmdl.model.Sales;

@JobFlow(name = "salesParquetJob")
public class SalesParquetJob extends FlowDescription {
final In<Sales> salesIn;
final Out<Sales> salesOut;
public SalesParquetJob(
@Import(name = "salesIn", description = SalesFromCSV.class)
In<Sales> salesIn,
@Export(name = "salesOut", description = SalesToParquet.class)
Out<Sales> salesOut) {
this.salesIn = salesIn;
this.salesOut = salesOut;
}

@Override
protected void describe() {
salesOut.add(salesIn);
}
}



src/main/java/com/example/batch/SalesParquetBatch.java

package com.example.batch;

import com.asakusafw.vocabulary.batch.Batch;
import com.asakusafw.vocabulary.batch.BatchDescription;
import com.example.jobflow.SalesParquetJob;

@Batch(name = "example.salesParquet")
public class SalesParquetBatch extends BatchDescription {

@Override
protected void describe() {
run(SalesParquetJob.class).soon();
}
}



アプリケーションのビルド

以下のコマンドで作成されたデプロイメントアーカイブファイルをHadoopクラスタ(asakusaユーザーの$HOMEパス上)へコピーします。

gradle assemble

./build/asakusafw-asakusa-example-parquet.tar.gz


アプリケーションのデプロイ

下記例のように、.bash_profileなどに環境変数ASAKUSA_HOMESPARK_CMDを設定します。

ただし、spark-submitコマンドがパス上に存在する場合は、SPARK_CMD環境変数は設定する必要はありません。


.bash_profile

export ASAKUSA_HOME=${HOME}/asakusa

export SPARK_CMD=/opt/mapr/spark/spark-2.1.0/bin/spark-submit

ASAKUSA_HOME環境変数のパス上にデプロイメントアーカイブファイルを展開して、setup.jarコマンドを実行します。

$ rm -r $ASAKUSA_HOME

$ mkdir $ASAKUSA_HOME
$ cd $ASAKUSA_HOME
$ tar xvf ~/asakusafw-asakusa-example-parquet.tar.gz
$ java -jar $ASAKUSA_HOME/tools/bin/setup.jar


入力ファイルの配備

ランダムで生成した下記のCSVファイルをHDFS上に配備します。


sales.csv

2008-05-04,ilCQBVYBWSVOO,46,224

2001-02-28,nTMbJJqLzwYqw,4,208
2003-05-09,MASAMJmjlETfL,18,246
1996-04-18,RCXfHnTwpcqFS,50,249
2004-01-15,RqppmAoSuhamm,1,360
1994-01-02,kjVImLuoLaeQb,9,600
2013-08-22,JPQkeJNzMQtjI,5,250
1991-05-12,aLzNHOcSqcrys,22,785
1981-08-01,sGOCOObwYSbFr,21,813
2010-03-02,PZvFqxThHEnsX,21,198

$ hadoop fs -mkdir -p directio/input

$ hadoop fs -put sales.csv directio/input/


アプリケーションの実行

YAESSでバッチIDを引数に指定してアプリケーションをSpark上で実行します。結果ファイルは、parquet形式のファイルで出力されています。

$ $ASAKUSA_HOME/yaess/bin/yaess-batch.sh spark.example.salesParquet

$ hadoop fs -ls directio/result/sales/

Found 1 items
-rwxr-xr-x 3 asakusa asakusa 25733 2018-02-15 09:01 directio/result/sales/sales.parquet.s0-p0
[a


DDLファイルの生成

Asakusa CLIのgenerateコマンドより、ImpalaのDDLファイルを生成します。

以下では、LOCATIONの追加と外部表として登録する設定を指定してDDLを生成しています。

$ $ASAKUSA_HOME/bin/asakusa generate ddl hive --external --location /=hdfs:///user/asakusa/directio -o sales.sql spark.example.salesParquet


sales.sql

CREATE EXTERNAL TABLE sales (

sales_date TIMESTAMP ,
item_code STRING ,
amount INT ,
selling_price INT
)
STORED AS PARQUET
LOCATION 'hdfs:///user/asakusa/directio/result/sales';

生成したDDLファイル(sales.sql)を登録します。

$ impala-shell -i localhost -f sales.sql


クエリーの実行

impala-shellコマンドからSQLクエリーを実行して動作を確認します。

$ impala-shell -i localhost

[localhost:21000] > select * from sales;
+---------------------+---------------+--------+---------------+
| sales_date | item_code | amount | selling_price |
+---------------------+---------------+--------+---------------+
| 2008-05-04 00:00:00 | ilCQBVYBWSVOO | 46 | 224 |
| 2001-02-28 00:00:00 | nTMbJJqLzwYqw | 4 | 208 |
| 2003-05-09 00:00:00 | MASAMJmjlETfL | 18 | 246 |
| 1996-04-18 00:00:00 | RCXfHnTwpcqFS | 50 | 249 |
| 2004-01-15 00:00:00 | RqppmAoSuhamm | 1 | 360 |
| 1994-01-02 00:00:00 | kjVImLuoLaeQb | 9 | 600 |
| 2013-08-22 00:00:00 | JPQkeJNzMQtjI | 5 | 250 |
| 1991-05-12 00:00:00 | aLzNHOcSqcrys | 22 | 785 |
| 1981-08-01 00:00:00 | sGOCOObwYSbFr | 21 | 813 |
| 2010-03-02 00:00:00 | PZvFqxThHEnsX | 21 | 198 |
+---------------------+---------------+--------+---------------+
Fetched 10 row(s) in 0.11s


BIツールから参照する

Cloudera社から提供されているImpala ODBC Driverで接続することでBIツール(例はTableau)から参照することができます。

bi_demo.png


さいごに

今回はImpalaを例にAsakusa Frameworkの連携機能をご紹介しました。

Parquet形式やORC形式に対応しているHiveなどのプロダクトであれば連携することができるため、参考になれば幸いです。