はじめに
前回の記事 Asakusa Frameworkで Hello, World! ではAsakusa Frameowkの実装方法について説明しました。この記事はより業務シナリオに近い内容について紹介していきたいと思います。
下記の図は、様々なデータをHadoop(HDFS)上に蓄積し、Asakusa on Sparkで加工したデータをBIツール等で可視化するというデータ分析基盤の一例です。データ規模は数十TB〜数PBといった非常に大きなデータを扱うことを想定したものとなっています。
今回の記事ではこれら要素技術のAsakusa on SparkとSQLクエリーエンジンとしてImpalaを例に連携方法を紹介します。
この記事でやりたいこと
入力ファイルはHadoopのHDFS上に配備済みであるものとします。本来は入力ファイルを加工後に出力するのですが、このサンプルシナリオでは入力ファイルをそのままHDFS上にParquet形式に変換して出力するだけです。出力したデータをImpalaの外部表として定義し、BIツールから参照します。
salesデータモデルは次のようになっています。Asakusaのプロパティのデータ形式とImpalaのデータ形式を変換する機能については、DMDL定義で説明します。
データ項目 | Asakusaデータ型 | Impalaデータ型 |
---|---|---|
売上日付(sales_date) | DATE | TIMESTAMP |
商品コード(item_code) | TEXT | STRING |
販売数量(amount) | INT | INT |
販売金額(selling_price) | INT | INT |
動作環境
以下の環境で動作確認をしています。
- MapR 5.2.1(Spark2.1.0、Impala2.7.0、Hive2.1、Hive Metastore2.1) ※Spark2.1であれば特にMapR以外のHaoopクラスタでも問題ありません。
- JDK 1.8.0_161
- Gradle 4.5
なお、Asakusa Framework 0.10のDirect I/O Hiveは、Hive1.2.2を使用しているため、Hiveのバージョン間によるデータの互換性について、全てを動作検証しているわけではございませんのでご了承ください。
以下は事前作業として完了していること。
- JDK及びGradleのインストール
- Hadoopクラスタ上にasakusaユーザーを作成
- HDFS上にユーザーディレクトリの作成
開発環境の準備
まずはじめにプロジェクトフォルダを作成します。このサンプルでは以下のフォルダ上で作業します。
mkdir asakusa-example-parquet
プロジェクトフォルダ上にGradleスクリプトファイルを作成します。今回はSparkを実行エンジンにするため、Asakusa on Spark Pluginを構成しています。
作成したGradleスクリプトの詳細については、Asakusa Gradle Plugin リファレンスを参照してください。
group 'com.example'
buildscript {
repositories {
maven { url 'http://asakusafw.s3.amazonaws.com/maven/releases' }
maven { url 'http://asakusafw.s3.amazonaws.com/maven/snapshots' }
}
dependencies {
classpath group: 'com.asakusafw.gradle', name: 'asakusa-distribution', version: '0.10.0'
}
}
apply plugin: 'asakusafw-sdk'
apply plugin: 'asakusafw-organizer'
apply plugin: 'asakusafw-spark'
apply plugin: 'eclipse'
asakusafw {
sdk.hive true
}
asakusafwOrganizer {
hive.enabled true
profiles.prod {
assembly.into('.') {
put 'src/dist/prod'
}
}
}
デフォルトのAsakusa設定ファイルから、DirectI/O のファイルシステムパス(com.asakusafw.directio.root.fs.path
)をtarget/testing/directio
からdirectio
に変更しています。
この設定により、このサンプルシナリオではDirectI/Oのルートパスがhdfs:///user/asakusa/directio
になります。
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>com.asakusafw.runtime.core.Report.Delegate</name>
<value>com.asakusafw.runtime.report.CommonsLoggingReport</value>
</property>
<property>
<name>com.asakusafw.directio.root</name>
<value>com.asakusafw.runtime.directio.hadoop.HadoopDataSource</value>
</property>
<property>
<name>com.asakusafw.directio.root.path</name>
<value>/</value>
</property>
<property>
<name>com.asakusafw.directio.root.fs.path</name>
<value>directio</value>
</property>
</configuration>
DMDL
データモデルを定義する DMDL(Data Model Definition Language) スクリプトファイルを作成します。
このモデルは入力がCSV形式、出力がParquet形式を想定しています。
Parquet形式で出力する場合、Direct I/O HiveというAsakusa Frameworkの機能を利用します。Parquet形式の他、ORC形式にも対応しています。
以下のsales_dateプロパティはHive標準のマッピングでは、Asakusaデータ型と同じDATEですが、ImpalaにはHiveのDATE型に相当する型がないため、Direct I/O Hiveのマッピング型変換機能を利用してTIMESTAMP型に変換しています。
@directio.csv
@directio.hive.parquet(
table_name = "sales"
)
sales = {
@directio.hive.timestamp
sales_date : DATE;
item_code : TEXT;
amount : INT;
selling_price : INT;
};
以下のコマンドを実行してDMDLからデータモデルクラスを生成します。
gradle compileDMDL
入出力定義クラス
入力定義クラスは、inputパス上のsales.csvファイルを入力します。
package com.example.jobflow;
import com.example.modelgen.dmdl.csv.AbstractSalesCsvInputDescription;
public class SalesFromCSV extends AbstractSalesCsvInputDescription {
@Override
public String getBasePath() {
return "input";
}
@Override
public String getResourcePattern() {
return "sales.csv";
}
}
出力定義クラスは、result/sales
パス上にsales.parquet.*
というファイル名で出力します。ファイル名にワイルドカード(*)がある場合、分散並列処理の結果ファイルが分割されたまま一意な名前が割り当てられて並列で出力されるため、高速化が望めます。
package com.example.jobflow;
import java.util.Arrays;
import java.util.List;
import com.example.modelgen.dmdl.hive.parquet.AbstractSalesParquetFileOutputDescription;
public class SalesToParquet extends AbstractSalesParquetFileOutputDescription {
@Override
public String getBasePath() {
return "result/sales";
}
@Override
public String getResourcePattern() {
return "sales.parquet.*";
}
@Override
public List<String> getDeletePatterns() {
return Arrays.asList("*");
}
}
ジョブフローとバッチクラス
今回のサンプルシナリオでは演算子の実装はしないため、入力から出力に結線したジョブフローとそのジョブフローを実行するバッチクラスを実装するだけです。
package com.example.jobflow;
import com.asakusafw.vocabulary.flow.Export;
import com.asakusafw.vocabulary.flow.FlowDescription;
import com.asakusafw.vocabulary.flow.Import;
import com.asakusafw.vocabulary.flow.In;
import com.asakusafw.vocabulary.flow.JobFlow;
import com.asakusafw.vocabulary.flow.Out;
import com.example.modelgen.dmdl.model.Sales;
@JobFlow(name = "salesParquetJob")
public class SalesParquetJob extends FlowDescription {
final In<Sales> salesIn;
final Out<Sales> salesOut;
public SalesParquetJob(
@Import(name = "salesIn", description = SalesFromCSV.class)
In<Sales> salesIn,
@Export(name = "salesOut", description = SalesToParquet.class)
Out<Sales> salesOut) {
this.salesIn = salesIn;
this.salesOut = salesOut;
}
@Override
protected void describe() {
salesOut.add(salesIn);
}
}
package com.example.batch;
import com.asakusafw.vocabulary.batch.Batch;
import com.asakusafw.vocabulary.batch.BatchDescription;
import com.example.jobflow.SalesParquetJob;
@Batch(name = "example.salesParquet")
public class SalesParquetBatch extends BatchDescription {
@Override
protected void describe() {
run(SalesParquetJob.class).soon();
}
}
アプリケーションのビルド
以下のコマンドで作成されたデプロイメントアーカイブファイルをHadoopクラスタ(asakusa
ユーザーの$HOME
パス上)へコピーします。
gradle assemble
./build/asakusafw-asakusa-example-parquet.tar.gz
アプリケーションのデプロイ
下記例のように、.bash_profile
などに環境変数ASAKUSA_HOME
とSPARK_CMD
を設定します。
ただし、spark-submit
コマンドがパス上に存在する場合は、SPARK_CMD
環境変数は設定する必要はありません。
export ASAKUSA_HOME=${HOME}/asakusa
export SPARK_CMD=/opt/mapr/spark/spark-2.1.0/bin/spark-submit
ASAKUSA_HOME
環境変数のパス上にデプロイメントアーカイブファイルを展開して、setup.jarコマンドを実行します。
$ rm -r $ASAKUSA_HOME
$ mkdir $ASAKUSA_HOME
$ cd $ASAKUSA_HOME
$ tar xvf ~/asakusafw-asakusa-example-parquet.tar.gz
$ java -jar $ASAKUSA_HOME/tools/bin/setup.jar
入力ファイルの配備
ランダムで生成した下記のCSVファイルをHDFS上に配備します。
2008-05-04,ilCQBVYBWSVOO,46,224
2001-02-28,nTMbJJqLzwYqw,4,208
2003-05-09,MASAMJmjlETfL,18,246
1996-04-18,RCXfHnTwpcqFS,50,249
2004-01-15,RqppmAoSuhamm,1,360
1994-01-02,kjVImLuoLaeQb,9,600
2013-08-22,JPQkeJNzMQtjI,5,250
1991-05-12,aLzNHOcSqcrys,22,785
1981-08-01,sGOCOObwYSbFr,21,813
2010-03-02,PZvFqxThHEnsX,21,198
$ hadoop fs -mkdir -p directio/input
$ hadoop fs -put sales.csv directio/input/
アプリケーションの実行
YAESSでバッチIDを引数に指定してアプリケーションをSpark上で実行します。結果ファイルは、parquet形式のファイルで出力されています。
$ $ASAKUSA_HOME/yaess/bin/yaess-batch.sh spark.example.salesParquet
$ hadoop fs -ls directio/result/sales/
Found 1 items
-rwxr-xr-x 3 asakusa asakusa 25733 2018-02-15 09:01 directio/result/sales/sales.parquet.s0-p0
[a
DDLファイルの生成
Asakusa CLIのgenerateコマンドより、ImpalaのDDLファイルを生成します。
以下では、LOCATIONの追加と外部表として登録する設定を指定してDDLを生成しています。
$ $ASAKUSA_HOME/bin/asakusa generate ddl hive --external --location /=hdfs:///user/asakusa/directio -o sales.sql spark.example.salesParquet
CREATE EXTERNAL TABLE sales (
sales_date TIMESTAMP ,
item_code STRING ,
amount INT ,
selling_price INT
)
STORED AS PARQUET
LOCATION 'hdfs:///user/asakusa/directio/result/sales';
生成したDDLファイル(sales.sql
)を登録します。
$ impala-shell -i localhost -f sales.sql
クエリーの実行
impala-shell
コマンドからSQLクエリーを実行して動作を確認します。
$ impala-shell -i localhost
[localhost:21000] > select * from sales;
+---------------------+---------------+--------+---------------+
| sales_date | item_code | amount | selling_price |
+---------------------+---------------+--------+---------------+
| 2008-05-04 00:00:00 | ilCQBVYBWSVOO | 46 | 224 |
| 2001-02-28 00:00:00 | nTMbJJqLzwYqw | 4 | 208 |
| 2003-05-09 00:00:00 | MASAMJmjlETfL | 18 | 246 |
| 1996-04-18 00:00:00 | RCXfHnTwpcqFS | 50 | 249 |
| 2004-01-15 00:00:00 | RqppmAoSuhamm | 1 | 360 |
| 1994-01-02 00:00:00 | kjVImLuoLaeQb | 9 | 600 |
| 2013-08-22 00:00:00 | JPQkeJNzMQtjI | 5 | 250 |
| 1991-05-12 00:00:00 | aLzNHOcSqcrys | 22 | 785 |
| 1981-08-01 00:00:00 | sGOCOObwYSbFr | 21 | 813 |
| 2010-03-02 00:00:00 | PZvFqxThHEnsX | 21 | 198 |
+---------------------+---------------+--------+---------------+
Fetched 10 row(s) in 0.11s
BIツールから参照する
Cloudera社から提供されているImpala ODBC Driver
で接続することでBIツール(例はTableau)から参照することができます。
さいごに
今回はImpalaを例にAsakusa Frameworkの連携機能をご紹介しました。
Parquet形式やORC形式に対応しているHiveなどのプロダクトであれば連携することができるため、参考になれば幸いです。