1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Asakusa FrameworkとImpalaを連携してBIツールで可視化する

Last updated at Posted at 2018-02-16

はじめに

前回の記事 Asakusa Frameworkで Hello, World! ではAsakusa Frameowkの実装方法について説明しました。この記事はより業務シナリオに近い内容について紹介していきたいと思います。
下記の図は、様々なデータをHadoop(HDFS)上に蓄積し、Asakusa on Sparkで加工したデータをBIツール等で可視化するというデータ分析基盤の一例です。データ規模は数十TB〜数PBといった非常に大きなデータを扱うことを想定したものとなっています。
今回の記事ではこれら要素技術のAsakusa on SparkとSQLクエリーエンジンとしてImpalaを例に連携方法を紹介します。

architecture.png

この記事でやりたいこと

入力ファイルはHadoopのHDFS上に配備済みであるものとします。本来は入力ファイルを加工後に出力するのですが、このサンプルシナリオでは入力ファイルをそのままHDFS上にParquet形式に変換して出力するだけです。出力したデータをImpalaの外部表として定義し、BIツールから参照します。

asakusa_to_impala.png

salesデータモデルは次のようになっています。Asakusaのプロパティのデータ形式とImpalaのデータ形式を変換する機能については、DMDL定義で説明します。

データ項目 Asakusaデータ型 Impalaデータ型
売上日付(sales_date) DATE TIMESTAMP
商品コード(item_code) TEXT STRING
販売数量(amount) INT INT
販売金額(selling_price) INT INT

動作環境

以下の環境で動作確認をしています。

  • MapR 5.2.1(Spark2.1.0、Impala2.7.0、Hive2.1、Hive Metastore2.1) ※Spark2.1であれば特にMapR以外のHaoopクラスタでも問題ありません。
  • JDK 1.8.0_161
  • Gradle 4.5

なお、Asakusa Framework 0.10のDirect I/O Hiveは、Hive1.2.2を使用しているため、Hiveのバージョン間によるデータの互換性について、全てを動作検証しているわけではございませんのでご了承ください。

以下は事前作業として完了していること。

  • JDK及びGradleのインストール
  • Hadoopクラスタ上にasakusaユーザーを作成
  • HDFS上にユーザーディレクトリの作成

開発環境の準備

まずはじめにプロジェクトフォルダを作成します。このサンプルでは以下のフォルダ上で作業します。

mkdir asakusa-example-parquet

プロジェクトフォルダ上にGradleスクリプトファイルを作成します。今回はSparkを実行エンジンにするため、Asakusa on Spark Pluginを構成しています。
作成したGradleスクリプトの詳細については、Asakusa Gradle Plugin リファレンスを参照してください。

build.gradle
group 'com.example'

buildscript {
    repositories {
        maven { url 'http://asakusafw.s3.amazonaws.com/maven/releases' }
        maven { url 'http://asakusafw.s3.amazonaws.com/maven/snapshots' }
    }
    dependencies {
        classpath group: 'com.asakusafw.gradle', name: 'asakusa-distribution', version: '0.10.0'
    }
}

apply plugin: 'asakusafw-sdk'
apply plugin: 'asakusafw-organizer'
apply plugin: 'asakusafw-spark'
apply plugin: 'eclipse'

asakusafw {
    sdk.hive true
}

asakusafwOrganizer {
    hive.enabled true
    profiles.prod {
        assembly.into('.') {
            put 'src/dist/prod'
        }
    }
}

デフォルトのAsakusa設定ファイルから、DirectI/O のファイルシステムパス(com.asakusafw.directio.root.fs.path)をtarget/testing/directioからdirectioに変更しています。
この設定により、このサンプルシナリオではDirectI/Oのルートパスがhdfs:///user/asakusa/directioになります。

src/dist/prod/core/conf/asakusa-resources.xml
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
	<property>
		<name>com.asakusafw.runtime.core.Report.Delegate</name>
		<value>com.asakusafw.runtime.report.CommonsLoggingReport</value>
	</property>

	<property>
		<name>com.asakusafw.directio.root</name>
		<value>com.asakusafw.runtime.directio.hadoop.HadoopDataSource</value>
	</property>
	<property>
		<name>com.asakusafw.directio.root.path</name>
		<value>/</value>
	</property>
	<property>
		<name>com.asakusafw.directio.root.fs.path</name>
		<value>directio</value>
	</property>
</configuration>

DMDL

データモデルを定義する DMDL(Data Model Definition Language) スクリプトファイルを作成します。
このモデルは入力がCSV形式、出力がParquet形式を想定しています。
Parquet形式で出力する場合、Direct I/O HiveというAsakusa Frameworkの機能を利用します。Parquet形式の他、ORC形式にも対応しています。
以下のsales_dateプロパティはHive標準のマッピングでは、Asakusaデータ型と同じDATEですが、ImpalaにはHiveのDATE型に相当する型がないため、Direct I/O Hiveのマッピング型変換機能を利用してTIMESTAMP型に変換しています。

src/main/dmdl/sales.dmdl
@directio.csv
@directio.hive.parquet(
    table_name = "sales"
)
sales = {
    @directio.hive.timestamp
    sales_date : DATE;

    item_code : TEXT;

    amount : INT;

    selling_price : INT;
};

以下のコマンドを実行してDMDLからデータモデルクラスを生成します。

gradle compileDMDL

入出力定義クラス

入力定義クラスは、inputパス上のsales.csvファイルを入力します。

src/main/java/com/example/jobflow/SalesFromCSV.java
package com.example.jobflow;

import com.example.modelgen.dmdl.csv.AbstractSalesCsvInputDescription;

public class SalesFromCSV extends AbstractSalesCsvInputDescription {

    @Override
    public String getBasePath() {
        return "input";
    }

    @Override
    public String getResourcePattern() {
        return "sales.csv";
    }
}

出力定義クラスは、result/salesパス上にsales.parquet.*というファイル名で出力します。ファイル名にワイルドカード(*)がある場合、分散並列処理の結果ファイルが分割されたまま一意な名前が割り当てられて並列で出力されるため、高速化が望めます。

src/main/java/com/example/jobflow/SalesToParquet.java
package com.example.jobflow;

import java.util.Arrays;
import java.util.List;

import com.example.modelgen.dmdl.hive.parquet.AbstractSalesParquetFileOutputDescription;

public class SalesToParquet extends AbstractSalesParquetFileOutputDescription {

    @Override
    public String getBasePath() {
        return "result/sales";
    }

    @Override
    public String getResourcePattern() {
        return "sales.parquet.*";
    }

    @Override
    public List<String> getDeletePatterns() {
        return Arrays.asList("*");
    }
}

ジョブフローとバッチクラス

今回のサンプルシナリオでは演算子の実装はしないため、入力から出力に結線したジョブフローとそのジョブフローを実行するバッチクラスを実装するだけです。

src/main/java/com/example/jobflow/SalesParquetJob.java
package com.example.jobflow;

import com.asakusafw.vocabulary.flow.Export;
import com.asakusafw.vocabulary.flow.FlowDescription;
import com.asakusafw.vocabulary.flow.Import;
import com.asakusafw.vocabulary.flow.In;
import com.asakusafw.vocabulary.flow.JobFlow;
import com.asakusafw.vocabulary.flow.Out;
import com.example.modelgen.dmdl.model.Sales;

@JobFlow(name = "salesParquetJob")
public class SalesParquetJob extends FlowDescription {
    final In<Sales> salesIn;
    final Out<Sales> salesOut;
    public SalesParquetJob(
            @Import(name = "salesIn", description = SalesFromCSV.class)
            In<Sales> salesIn,
            @Export(name = "salesOut", description = SalesToParquet.class)
            Out<Sales> salesOut) {
        this.salesIn = salesIn;
        this.salesOut = salesOut;
    }

    @Override
    protected void describe() {
        salesOut.add(salesIn);
    }
}
src/main/java/com/example/batch/SalesParquetBatch.java
package com.example.batch;

import com.asakusafw.vocabulary.batch.Batch;
import com.asakusafw.vocabulary.batch.BatchDescription;
import com.example.jobflow.SalesParquetJob;

@Batch(name = "example.salesParquet")
public class SalesParquetBatch extends BatchDescription {

    @Override
    protected void describe() {
        run(SalesParquetJob.class).soon();
    }
}

アプリケーションのビルド

以下のコマンドで作成されたデプロイメントアーカイブファイルをHadoopクラスタ(asakusaユーザーの$HOMEパス上)へコピーします。

gradle assemble
./build/asakusafw-asakusa-example-parquet.tar.gz

アプリケーションのデプロイ

下記例のように、.bash_profileなどに環境変数ASAKUSA_HOMESPARK_CMDを設定します。
ただし、spark-submitコマンドがパス上に存在する場合は、SPARK_CMD環境変数は設定する必要はありません。

.bash_profile
export ASAKUSA_HOME=${HOME}/asakusa
export SPARK_CMD=/opt/mapr/spark/spark-2.1.0/bin/spark-submit

ASAKUSA_HOME環境変数のパス上にデプロイメントアーカイブファイルを展開して、setup.jarコマンドを実行します。

$ rm -r $ASAKUSA_HOME
$ mkdir $ASAKUSA_HOME
$ cd $ASAKUSA_HOME
$ tar xvf ~/asakusafw-asakusa-example-parquet.tar.gz 
$ java -jar $ASAKUSA_HOME/tools/bin/setup.jar

入力ファイルの配備

ランダムで生成した下記のCSVファイルをHDFS上に配備します。

sales.csv
2008-05-04,ilCQBVYBWSVOO,46,224
2001-02-28,nTMbJJqLzwYqw,4,208
2003-05-09,MASAMJmjlETfL,18,246
1996-04-18,RCXfHnTwpcqFS,50,249
2004-01-15,RqppmAoSuhamm,1,360
1994-01-02,kjVImLuoLaeQb,9,600
2013-08-22,JPQkeJNzMQtjI,5,250
1991-05-12,aLzNHOcSqcrys,22,785
1981-08-01,sGOCOObwYSbFr,21,813
2010-03-02,PZvFqxThHEnsX,21,198
$ hadoop fs -mkdir -p directio/input
$ hadoop fs -put sales.csv directio/input/

アプリケーションの実行

YAESSでバッチIDを引数に指定してアプリケーションをSpark上で実行します。結果ファイルは、parquet形式のファイルで出力されています。

$ $ASAKUSA_HOME/yaess/bin/yaess-batch.sh spark.example.salesParquet
$ hadoop fs -ls directio/result/sales/
Found 1 items
-rwxr-xr-x   3 asakusa asakusa      25733 2018-02-15 09:01 directio/result/sales/sales.parquet.s0-p0
[a

DDLファイルの生成

Asakusa CLIのgenerateコマンドより、ImpalaのDDLファイルを生成します。
以下では、LOCATIONの追加と外部表として登録する設定を指定してDDLを生成しています。

$ $ASAKUSA_HOME/bin/asakusa generate ddl hive --external --location /=hdfs:///user/asakusa/directio -o sales.sql spark.example.salesParquet
sales.sql
CREATE EXTERNAL TABLE sales (
    sales_date TIMESTAMP ,
    item_code STRING ,
    amount INT ,
    selling_price INT
)
STORED AS PARQUET
LOCATION 'hdfs:///user/asakusa/directio/result/sales';

生成したDDLファイル(sales.sql)を登録します。

$ impala-shell -i localhost -f sales.sql

クエリーの実行

impala-shellコマンドからSQLクエリーを実行して動作を確認します。

$ impala-shell -i localhost
[localhost:21000] > select * from sales;
+---------------------+---------------+--------+---------------+
| sales_date          | item_code     | amount | selling_price |
+---------------------+---------------+--------+---------------+
| 2008-05-04 00:00:00 | ilCQBVYBWSVOO | 46     | 224           |
| 2001-02-28 00:00:00 | nTMbJJqLzwYqw | 4      | 208           |
| 2003-05-09 00:00:00 | MASAMJmjlETfL | 18     | 246           |
| 1996-04-18 00:00:00 | RCXfHnTwpcqFS | 50     | 249           |
| 2004-01-15 00:00:00 | RqppmAoSuhamm | 1      | 360           |
| 1994-01-02 00:00:00 | kjVImLuoLaeQb | 9      | 600           |
| 2013-08-22 00:00:00 | JPQkeJNzMQtjI | 5      | 250           |
| 1991-05-12 00:00:00 | aLzNHOcSqcrys | 22     | 785           |
| 1981-08-01 00:00:00 | sGOCOObwYSbFr | 21     | 813           |
| 2010-03-02 00:00:00 | PZvFqxThHEnsX | 21     | 198           |
+---------------------+---------------+--------+---------------+
Fetched 10 row(s) in 0.11s

BIツールから参照する

Cloudera社から提供されているImpala ODBC Driverで接続することでBIツール(例はTableau)から参照することができます。

bi_demo.png

さいごに

今回はImpalaを例にAsakusa Frameworkの連携機能をご紹介しました。
Parquet形式やORC形式に対応しているHiveなどのプロダクトであれば連携することができるため、参考になれば幸いです。

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?