hive
EMR
sparksql

hiveやSparkSQLをEMRで試してみた。

More than 1 year has passed since last update.

はじめに

この内容は、個人的な投稿であり、会社や組織を代表するものではございません。
また、こちらの内容は素人がググりながら試した結果をまとめたものなので、節々に間違いがあるかもしれませんが、ご容赦くださいませ。
(本内容により発生した損害については一切の責任をおいませんのでご容赦下さいませ)

この記事を見て、SQLはかけるけど、Hadoop,Hive,EMRとかよくわからないから大きいデータの集計に困ってる、なんて方のお役に立てれば幸いです。

今回の検証の諸条件

事前準備

テストデータの準備

Wikipedia Traffic Statistics V2は、EBSスナップショット(snapshot-id snap-0c155c67)として、US East (N. Virginia)で公開されてるため、US East (N. Virginia)でEC2を立てて、当該スナップショットからEBSを作成してマウントして、S3に転送します。
今回は/wikistats/pagecounts/配下の20081001,20081002のデータのみを利用するので、1台のEC2で大丈夫だと思いますが、全件のデータを送信する場合は、複数台のEC2インスタンスを立てて、並列でS3に転送することをおすすめします!(AWSなら、台数を倍にしても、時間が半分で済めばコストは増えませんので!)

サンプルデータをマウントしたEC2の起動

マネジメントコンソールでEC2を起動します。画面1個1個はちょっと大変だったので、簡単にマネジメントコンソールのEC2起動のGUIのSTEPごとにポイント書いてます。
1. Amazon Linuxの選択
2. インスタンスタイプは適当に・・・(今回のサンプルデータの20081001, 20081002位を転送するならt2.micro 1台で十分)
3. Auto-assign Public IPはYesIAM RoleでS3にフルアクセス出来るロールを選択(無ければ隣のCreate new IAM roleから作成。[AWS Service Roles] -> [AmazonS3FullAccess]を選ぶのが楽)
4. Add New Volumeを押してEBSを追加し、Snapshot-IDにsnap-0c155c67を選んで、容量は1024、タイプはgp2でよいかと
5. Tagはお好きに・・・
6. Security Groupはsshだけとおるようにしておいて下さい。

サンプルデータのアップロード

作成したEC2にSSHして、以下の操作でS3に必要なファイルをアップロードしましょう!

サンプルデータのアップロード
# パブリックデータのEBSが/dev/sdbに有る想定で、マウント
 $ sudo mount /dev/sdb /mnt/
 $ cd /mnt/wikistats/pagecounts

# テスト用S3バケットを作成。
#  テストデータはUS Eastに有るのですが、検証でいろいろ操作するときの操作感が良いほうが良いので、
#  S3バケットは安くて日本からも近いUS West 2(Oregon)がいいかなと思います。
 $ aws s3 mb s3://テスト用バケット名 --region us-west-2

# テストデータのアップロード。
#  aws s3 cpはファイルパスに*が使えないので、
#   --exclude "*"で全件対象外にしといて、 
#   --include "pagecounts-20081001*"で必要なファイルのみ指定して、
#   --recursive
#  つけるのがコツ
# aws s3 コマンドがエラーになる場合はIAMロールが適切じゃないと思うので、見直しを!
 $ sudo aws s3 cp . s3://テスト用バケット名/work20081001 --exclude "*" --include "pagecounts-20081001*" --recursive
 $ sudo aws s3 cp . s3://テスト用バケット名/work20081002 --exclude "*" --include "pagecounts-20081002*" --recursive

# アップロードできたか確認。
 $ aws s3 ls s3://テスト用バケット名/work20081001/ |wc -l
 $ aws s3 ls s3://テスト用バケット名/work20081002/ |wc -l
# ファイルが24個ずつあればOK。下のコマンドで24って出てくればOK

上記が終わったら、当該EC2インスタンスはターミネートしましょう!今回のEBSボリュームが残っていたら、それも削除を忘れずに!

EMR起動

EMRをマネジメントコンソールから起動してみましょう。もちろんCLIでもOKです!
以下にマネジメントコンソールでEMRの画面から、Create Clusterを選択した後の画面ごとの注意点を記載します。
1. Go to Advanced optionsを選択し、とりあえず今回は下の画面のようなアプリを選択し、Nextを選択スクリーンショット 2016-09-13 17.47.15.png
2. インスタンスタイプと数の選択画面はとりあえず、今回くらいのデータならデフォルトのMaster m3.xlarge x 1, Core m3.xlarge x 2, Task 0でよいかと。ちなみにこの構成だと資料記載当時で1時間1.05ドルくらい。
3. Step3はデフォルトでも良いかと・・・
4. Security OptionsのEC2 key pairは必ず設定。これがないとSSHトンネルが貼れないので、ターミナル接続も、GUI接続もできない・・・
5. Create Clusterを選択してクラスタ起動!

アドホックにクエリ実行

SSHで接続

EMRのデフォルトのセキュリティグループだと、マスターノードに対してSSHのみが開いている状態かと思います。そのため、EMR起動時に設定したKey Pairを使ってSSHします。
SSH接続するコマンドは、マネジメントコンソールからクラスタを選択すると、SSHというリンクを選択します。
AWS_Elastic_MapReduce_Management_Console.png

するとポップアップで、Windows用、Mac用それぞれでSSH接続のための参考情報が表示されます。
AWS_Elastic_MapReduce_Management_Console.png
このリンクで表示されるSSHコマンドの鍵名に.pemが2重に付いているのは私だけでしょうか??

SSH接続(ローカルPCなど、KeyPairの秘密鍵を保存している端末から実施)
# 鍵名に二重で.pemが付いている場合は、一つ.pemを削除
ssh -i ~/鍵名 hadoop@マスターノードのホスト名

これで無事ターミナル接続が完了です。

ターミナルからhiveクエリ実行

とりあえずS3にあるテキストファイルからテーブルを作ってみる

とりあえず、事前に準備したpagecount-hogehoge.gzファイルを元にしたテーブルを作ってみます。
まずは、hiveのプロンプトに・・・

hiveプロンプトへ
# これだけw
$ hive
# もしhiveコマンドがないよ、みたいになったら、
# hiveインストールが終わるまでちょっと時間を置くか、(Master, Coreがブートストラッピングかと)
# クラスタ作成時のApplication選択に漏れがないか確認を!

S3においてあるテキストファイルを元にして、hiveのテーブルを作ってみます。
元ファイルはpjcode,pagename,pv,sizeがスペース区切りで、改行コードがLFです。

テーブル作成(2008年10月01日のデータのテーブル)
hive> CREATE EXTERNAL TABLE IF NOT EXISTS
 wikistat20081001
 (pjcode string, pagename string, pv int, size int)
 ROW FORMAT DELIMITED
 FIELDS TERMINATED BY ' '
 LINES TERMINATED BY '\n'
 stored as textfile
 LOCATION 's3://テスト用バケット名/work20081001/';
hive> SELECT COUNT(*) FROM wikistat20081001;
-- 多分105192439行あると思います。
テーブル作成(2008年10月02日のデータのテーブル)
hive> CREATE EXTERNAL TABLE IF NOT EXISTS
 wikistat20081002
 (pjcode string, pagename string, pv int, size int)
 ROW FORMAT DELIMITED
 FIELDS TERMINATED BY ' '
 LINES TERMINATED BY '\n'
 stored as textfile
 LOCATION 's3://テスト用バケット名/work20081002/';
hive> SELECT COUNT(*) FROM wikistat20081002;
-- 多分97725342行あると思います。
テーブル作成(後述のSTEP追加で使うからテーブル)
hive> CREATE EXTERNAL TABLE IF NOT EXISTS
 wikistattest
 (pjcode string, pagename string, pv int, size int)
 ROW FORMAT DELIMITED
 FIELDS TERMINATED BY ' '
 LINES TERMINATED BY '\n'
 stored as textfile
 LOCATION 's3://テスト用バケット名/worktest/';
hive> SELECT COUNT(*) FROM wikistattest;
-- 0行のはずです。まだ作ったばかりなので。。。

非常にシンプルですね!
FIELDS TERMINATED BY ' 'は区切り文字を、LINES TERMINATED BY '\n'は改行コードを、stored as textfileはファイル形式を、LOCATION 's3://テスト用バケット名/work20081002/'は実体ファイルのあるS3のパスを指定しています。
この時S3のパスは最後/で終わらせることを忘れずに!(そうすると当該ディレクトリの中のファイルがすべて対象となります)

ここで作成されたテーブルは、ファイルシステムはEMRFS(つまり実体のファイルはS3に存在)、ストレージタイプはText(つまりカラムナ型じゃなくて行指向)です。

いろんな形式のテーブルを作ってみる

前述で作ったテーブルを使って、今度はストレージタイプが異なるテーブルを作っていきます。

EMRFS+ORC形式なテーブル
hive> CREATE EXTERNAL TABLE IF NOT EXISTS
 wikistat20081001_emrfs_orc
 (pjcode string, pagename string, pv int, size int)
 stored as ORC
 LOCATION 's3://テスト用バケット名/work20081001_emrfs_orc/'
 tblproperties("orc.compress"="ZLIB");

# もとのテーブルからデータをコピー
hive>INSERT INTO wikistat20081001_emrfs_orc 
 SELECT * FROM wikistat20081001;
HDFS+Text形式なテーブル
hive> CREATE EXTERNAL TABLE IF NOT EXISTS
 wikistat20081001_hdfs_text
 (pjcode string, pagename string, pv int, size int)
 ROW FORMAT DELIMITED
 FIELDS TERMINATED BY ' '
 LINES TERMINATED BY '\n'
 stored as textfile
 LOCATION '/work20081001_hdfs_text/';

# もとのテーブルからデータをコピー
hive> INSERT INTO wikistat20081001_hdfs_text
 SELECT * FROM wikistat20081001;
HDFS+ORC形式なテーブル
hive> CREATE EXTERNAL TABLE IF NOT EXISTS
 wikistat20081001_hdfs_orc
 (pjcode string, pagename string, pv int, size int)
 stored as ORC
 LOCATION '/work20081001_hdfs_orc/'
 tblproperties("orc.compress"="ZLIB");

# もとのテーブルからデータをコピー
hive> INSERT INTO wikistat20081001_hdfs_orc 
 SELECT * FROM wikistat20081001;

今回、ストレージタイプとファイルシステムが異なる4つのテーブルを作りました。
(ORCについてはこちらが分かりやすかったです。http://www.slideshare.net/imaifactory/apache-hive-2016/23)
それぞれクエリの種類や、全体のワークフロー、求める時間などにより正解は異なるはずなので、
それぞれどんな特性があるかは、どうぞ皆様のクエリで試してみてください。
(タスクノード追加したり、インスタンスタイプ変えてみてとかでも良いかもですね!)

Hueからhiveクエリ実行

HueのUIにアクセスするには、EMRのマスターノードの8888ポートにアクセスする必要があります。ただ、デフォルトのSecurity GroupではSSHしか空いておらず、またSecurity GroupでUI用のポートも開けてしまうのはセキュリティ的に・・・なので、ローカル端末からSSHでトンネリングをしてマスターノードに接続します。

SSHトンネルの貼り方
# -Nはポート転送のみ実行
# -Lでローカルでポートフォワーディングします。
#   ローカルのポート:リモートホスト名:リモートホストのポート
#   つまり下の例だとローカルの8157にアクセスすると,EMRのマスターノードの8888にアクセスしたことになります。
$ ssh -i ローカル端末に保存した鍵 -N -L 8157:EMRのマスターノードのホスト名:8888 hadoop@EMRのマスターノードのホスト名

これで、ローカル端末のブラウザからhttp://localhost:8157にアクセスしてみると、HUEの初期画面が出るはずです。
貼り付けた画像_2016_09_24_14_40.png
ユーザ名とパスワードは適当に決めてCreate accountを押してください。

HUEにも色んな機能があると思いますが、Webブラウザからクエリを実行するだけなら、以下のような感じです。
貼り付けた画像_2016_09_24_14_41.png
Query Editors を選択してから、Hiveを選択。

貼り付けた画像_2016_09_24_14_41−1.png
クエリを書いて再生マークのボタンを押すとクエリを実行できます。

ZeppelinからSparkSQL実行

次にSparkSQLもWeb UIから実行してみたいと思います。HUEと同じく、Zeppelinというツール経由で実行します。HUEと同じくまずはZeppelinが動作している8890ポートにSSHトンネルを貼ります。

SSHトンネルの貼り方
# -Nはポート転送のみ実行
# -Lでローカルでポートフォワーディングします。
#   ローカルのポート:リモートホスト名:リモートホストのポート
#   つまり下の例だとローカルの8158にアクセスすると,EMRのマスターノードの8890にアクセスしたことになります。
$ ssh -i ローカル端末に保存した鍵 -N -L 8158:EMRのマスターノードのホスト名:8890 hadoop@EMRのマスターノードのホスト名

これで、ローカル端末のブラウザからhttp://localhost:8158にアクセスしてみると、Zeppelinの初期画面が出るはずです。
貼り付けた画像_2016_09_24_14_43.png
上部のNotebookを選択して、Create new noteを選択。

貼り付けた画像_2016_09_24_14_44.png
適当にNoteに名前をつけて、Create Noteを選択。

貼り付けた画像_2016_09_24_14_45.png

冒頭に%sqlと入力して、その次の行にクエリを記載します。ただ、クエリ最後の;は不要です。これで再生マークを選択すれば、クエリを実行することができます。

Zeppelinの詳しい使い方には触れません(触れられない?w)が、クエリをWeb UIから実行するだけなら以下のような感じです。

STEP追加でのクエリ実行

EMRの機能の一つにSTEPというものがあります。STEPを使うとEMRクラスタにタスクをキューに詰め込んで外部から様々なアクションを行わせることができます。STEP追加はマネジメントコンソール、CLI、SDKで行うことができます。
今回はマネジメントコンソールとCLIから追加する方法を試してみます。(SDKから追加するものさほど難しくないはずです。LambdaからSTEP追加するときは、お好きな言語でSDK経由かなと)

STEPの事前準備

STEPは、クエリをS3にファイルとして保存しておく必要があります。
今回は以下のようなクエリを保存しておきます。

STEPで追加するクエリ(add-step-test.hql)
INSERT INTO wikistattest 
 SELECT * FROM wikistat${DATE}; 

ここではサンプルで、wikistattestというテーブルにwikistat\${DATE}というテーブルの中身をそのままコピーするだけのクエリです。
ここで登場する\${DATA}が変数として扱える部分で、STEP追加時に\${DATE}に相当する部分の値を指定できます。これはバッチ処理とかするときに非常に流用な機能ですね!

それでは、上記のファイルを適当にS3に保存しましょう。(CLIでローカルのカレントディレクトリのadd-step-test.hqlをs3://テスト用バケット/step-hql/に保存する方法は以下の通りです)

STEPファイルのS3転送
$ aws s3 cp add-step-test.hql s3://テスト用バケット/step-hql/

マネジメントコンソールからSTEP追加

マネジメントコンソールから、GUIでSTEPを追加することができます。
以下、簡単にEMRのマネジメントコンソール画面からの操作をご案内します。

貼り付けた画像_2016_09_24_14_36.png
STEPを追加したいクラスタのクラスタ名を選択します。

貼り付けた画像_2016_09_24_14_37.png
その後、STEPの追加を選択します。

貼り付けた画像_2016_09_24_14_38.png
STEPタイプはHive プログラム、名前は適当につけて、スクリプトS3場所はクエリを記載したテキストファイルが格納されている場所を入力します。
引数の部分は引数には-d 変数名=値というように指定します。今回は-d DATE=20081001を入力します。

後は、マネジメントコンソールのSTEP実行状況を確認して気長に待ちます。(マネジメントコンソールからだと、ちょっと遅いかな・・・)
AWS_Elastic_MapReduce_Management_Console.png

完了したら、Hueやターミナル接続して、以下のクエリでwikistattestの行数をチェックしてみましょう。

20081001のデータがコピーされたことを確認
hive> SELECT COUNT(*) FROM wikistattest;

テーブルを作ったときには0行だったはずですが、STEPの実行でwikistat20081001のデータがコピーされたことにより件数が増えていると思います。

この後、STEP追加で、変数の部分を20081002に変更して実行して終了するのを待ち、その後、同じクエリを実行してみましょう。

20081002のデータがコピーされたことを確認
hive> SELECT COUNT(*) FROM wikistattest;

先程よりも行数が増えているはずです。無事20081002のデータもコピーされたことが確認できたはずです。

CLIからSTEP追加

マネジメントコンソールから実行したSTEP追加は、CLIのaws emr add-stepsコマンドでも実行できます。
以下にサンプルを記載しますので、EMRのAPIが叩けるロールやクレデンシャルがある環境から実行してみてください。
なお、CLIで実行するほうが、マネジメントコンソールから実行するより、素早くSTEPが実行されてるような・・・

STEP追加(20081001のデータを追加する例)
$ aws emr add-steps --cluster-id クラスタID --steps Type=HIVE,Name='ステップの名前',Args=[-f,s3://テスト用バケット名/ステップのファイルが格納されたディレクトリ/add-step-test.hql,-d,DATE=20081001] --region us-west-2

CLIで追加したSTEPも、マネジメントコンソールから結果を確認できます。また、CLIでももちろんSTEPの実行状況は確認できます。

STEPの実行状況確認
$ aws emr describe-step --cluster-id クラスタID --step-id STEP_ID(STEP追加時に出力されたはず) --region us-west-2

STEPが完了後に、再度同じテーブルの行数を調べてみましょう。きっとさっきより増えているはずです。

LambdaからSTEP追加

後日また・・・

最後に

後処理忘れずに!!特にEMRクラスタ、テストデータアップロード用EC2,EBS、そしてテストデータを保存したS3の消し忘れには要注意!!
今回は単に主にEMRを通してhiveに触れて見るだけでしたが、非常に簡単に触れることができたと思います。この後は、クエリチューニングやクラスタの台数とタイプのチューニング、STEPの作成などがありますが、基本的な操作で躓くことはないと思うので、思う存分検証いただければと思います!