はじめに
この内容は、個人的な投稿であり、会社や組織を代表するものではございません。
また、こちらの内容は素人がググりながら試した結果をまとめたものなので、節々に間違いがあるかもしれませんが、ご容赦くださいませ。
(本内容により発生した損害については一切の責任をおいませんのでご容赦下さいませ)
この記事を見て、SQLはかけるけど、Hadoop,Hive,EMRとかよくわからないから大きいデータの集計に困ってる、なんて方のお役に立てれば幸いです。
今回の検証の諸条件
- 利用データは、AWS パブリックデータセットの「Wikipedia Traffic Statistics V2」を利用しています。https://aws.amazon.com/datasets/wikipedia-traffic-statistics-v2/?_encoding=UTF8&jiveRedirect=1
- EC2の起動終了などの単純操作や、IAMロールの意味・操作などは理解している前提です。
- その他 2016/9/8時点の状況に基づきます。
事前準備
テストデータの準備
Wikipedia Traffic Statistics V2は、EBSスナップショット(snapshot-id snap-0c155c67)として、**US East (N. Virginia)**で公開されてるため、US East (N. Virginia)でEC2を立てて、当該スナップショットからEBSを作成してマウントして、S3に転送します。
今回は/wikistats/pagecounts/配下の20081001,20081002のデータのみを利用するので、1台のEC2で大丈夫だと思いますが、全件のデータを送信する場合は、複数台のEC2インスタンスを立てて、並列でS3に転送することをおすすめします!(AWSなら、台数を倍にしても、時間が半分で済めばコストは増えませんので!)
サンプルデータをマウントしたEC2の起動
マネジメントコンソールでEC2を起動します。画面1個1個はちょっと大変だったので、簡単にマネジメントコンソールのEC2起動のGUIのSTEPごとにポイント書いてます。
- Amazon Linuxの選択
- インスタンスタイプは適当に・・・(今回のサンプルデータの20081001, 20081002位を転送するならt2.micro 1台で十分)
- Auto-assign Public IPはYes、IAM RoleでS3にフルアクセス出来るロールを選択(無ければ隣のCreate new IAM roleから作成。[AWS Service Roles] -> [AmazonS3FullAccess]を選ぶのが楽)
- Add New Volumeを押してEBSを追加し、Snapshot-IDにsnap-0c155c67を選んで、容量は1024、タイプはgp2でよいかと
- Tagはお好きに・・・
- Security Groupはsshだけとおるようにしておいて下さい。
サンプルデータのアップロード
作成したEC2にSSHして、以下の操作でS3に必要なファイルをアップロードしましょう!
# パブリックデータのEBSが/dev/sdbに有る想定で、マウント
$ sudo mount /dev/sdb /mnt/
$ cd /mnt/wikistats/pagecounts
# テスト用S3バケットを作成。
# テストデータはUS Eastに有るのですが、検証でいろいろ操作するときの操作感が良いほうが良いので、
# S3バケットは安くて日本からも近いUS West 2(Oregon)がいいかなと思います。
$ aws s3 mb s3://テスト用バケット名 --region us-west-2
# テストデータのアップロード。
# aws s3 cpはファイルパスに*が使えないので、
# --exclude "*"で全件対象外にしといて、
# --include "pagecounts-20081001*"で必要なファイルのみ指定して、
# --recursive
# つけるのがコツ
# aws s3 コマンドがエラーになる場合はIAMロールが適切じゃないと思うので、見直しを!
$ sudo aws s3 cp . s3://テスト用バケット名/work20081001 --exclude "*" --include "pagecounts-20081001*" --recursive
$ sudo aws s3 cp . s3://テスト用バケット名/work20081002 --exclude "*" --include "pagecounts-20081002*" --recursive
# アップロードできたか確認。
$ aws s3 ls s3://テスト用バケット名/work20081001/ |wc -l
$ aws s3 ls s3://テスト用バケット名/work20081002/ |wc -l
# ファイルが24個ずつあればOK。下のコマンドで24って出てくればOK
上記が終わったら、当該EC2インスタンスはターミネートしましょう!今回のEBSボリュームが残っていたら、それも削除を忘れずに!
EMR起動
EMRをマネジメントコンソールから起動してみましょう。もちろんCLIでもOKです!
以下にマネジメントコンソールでEMRの画面から、Create Clusterを選択した後の画面ごとの注意点を記載します。
- Go to Advanced optionsを選択し、とりあえず今回は下の画面のようなアプリを選択し、Nextを選択
- インスタンスタイプと数の選択画面はとりあえず、今回くらいのデータならデフォルトのMaster m3.xlarge x 1, Core m3.xlarge x 2, Task 0でよいかと。ちなみにこの構成だと資料記載当時で1時間1.05ドルくらい。
- Step3はデフォルトでも良いかと・・・
- Security OptionsのEC2 key pairは必ず設定。これがないとSSHトンネルが貼れないので、ターミナル接続も、GUI接続もできない・・・
- Create Clusterを選択してクラスタ起動!
アドホックにクエリ実行
SSHで接続
EMRのデフォルトのセキュリティグループだと、マスターノードに対してSSHのみが開いている状態かと思います。そのため、EMR起動時に設定したKey Pairを使ってSSHします。
SSH接続するコマンドは、マネジメントコンソールからクラスタを選択すると、SSHというリンクを選択します。
するとポップアップで、Windows用、Mac用それぞれでSSH接続のための参考情報が表示されます。
このリンクで表示されるSSHコマンドの鍵名に.pemが2重に付いているのは私だけでしょうか??
# 鍵名に二重で.pemが付いている場合は、一つ.pemを削除
ssh -i ~/鍵名 hadoop@マスターノードのホスト名
これで無事ターミナル接続が完了です。
ターミナルからhiveクエリ実行
とりあえずS3にあるテキストファイルからテーブルを作ってみる
とりあえず、事前に準備したpagecount-hogehoge.gzファイルを元にしたテーブルを作ってみます。
まずは、hiveのプロンプトに・・・
# これだけw
$ hive
# もしhiveコマンドがないよ、みたいになったら、
# hiveインストールが終わるまでちょっと時間を置くか、(Master, Coreがブートストラッピングかと)
# クラスタ作成時のApplication選択に漏れがないか確認を!
S3においてあるテキストファイルを元にして、hiveのテーブルを作ってみます。
元ファイルはpjcode,pagename,pv,sizeがスペース区切りで、改行コードがLFです。
hive> CREATE EXTERNAL TABLE IF NOT EXISTS
wikistat20081001
(pjcode string, pagename string, pv int, size int)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ' '
LINES TERMINATED BY '\n'
stored as textfile
LOCATION 's3://テスト用バケット名/work20081001/';
hive> SELECT COUNT(*) FROM wikistat20081001;
-- 多分105192439行あると思います。
hive> CREATE EXTERNAL TABLE IF NOT EXISTS
wikistat20081002
(pjcode string, pagename string, pv int, size int)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ' '
LINES TERMINATED BY '\n'
stored as textfile
LOCATION 's3://テスト用バケット名/work20081002/';
hive> SELECT COUNT(*) FROM wikistat20081002;
-- 多分97725342行あると思います。
hive> CREATE EXTERNAL TABLE IF NOT EXISTS
wikistattest
(pjcode string, pagename string, pv int, size int)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ' '
LINES TERMINATED BY '\n'
stored as textfile
LOCATION 's3://テスト用バケット名/worktest/';
hive> SELECT COUNT(*) FROM wikistattest;
-- 0行のはずです。まだ作ったばかりなので。。。
非常にシンプルですね!
FIELDS TERMINATED BY ' '
は区切り文字を、LINES TERMINATED BY '\n'
は改行コードを、stored as textfile
はファイル形式を、LOCATION 's3://テスト用バケット名/work20081002/'
は実体ファイルのあるS3のパスを指定しています。
この時S3のパスは最後/
で終わらせることを忘れずに!(そうすると当該ディレクトリの中のファイルがすべて対象となります)
ここで作成されたテーブルは、ファイルシステムはEMRFS(つまり実体のファイルはS3に存在)、ストレージタイプはText(つまりカラムナ型じゃなくて行指向)です。
いろんな形式のテーブルを作ってみる
前述で作ったテーブルを使って、今度はストレージタイプが異なるテーブルを作っていきます。
hive> CREATE EXTERNAL TABLE IF NOT EXISTS
wikistat20081001_emrfs_orc
(pjcode string, pagename string, pv int, size int)
stored as ORC
LOCATION 's3://テスト用バケット名/work20081001_emrfs_orc/'
tblproperties("orc.compress"="ZLIB");
# もとのテーブルからデータをコピー
hive>INSERT INTO wikistat20081001_emrfs_orc
SELECT * FROM wikistat20081001;
hive> CREATE EXTERNAL TABLE IF NOT EXISTS
wikistat20081001_hdfs_text
(pjcode string, pagename string, pv int, size int)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ' '
LINES TERMINATED BY '\n'
stored as textfile
LOCATION '/work20081001_hdfs_text/';
# もとのテーブルからデータをコピー
hive> INSERT INTO wikistat20081001_hdfs_text
SELECT * FROM wikistat20081001;
hive> CREATE EXTERNAL TABLE IF NOT EXISTS
wikistat20081001_hdfs_orc
(pjcode string, pagename string, pv int, size int)
stored as ORC
LOCATION '/work20081001_hdfs_orc/'
tblproperties("orc.compress"="ZLIB");
# もとのテーブルからデータをコピー
hive> INSERT INTO wikistat20081001_hdfs_orc
SELECT * FROM wikistat20081001;
今回、ストレージタイプとファイルシステムが異なる4つのテーブルを作りました。
(ORCについてはこちらが分かりやすかったです。http://www.slideshare.net/imaifactory/apache-hive-2016/23)
それぞれクエリの種類や、全体のワークフロー、求める時間などにより正解は異なるはずなので、
それぞれどんな特性があるかは、どうぞ皆様のクエリで試してみてください。
(タスクノード追加したり、インスタンスタイプ変えてみてとかでも良いかもですね!)
Hueからhiveクエリ実行
HueのUIにアクセスするには、EMRのマスターノードの8888ポートにアクセスする必要があります。ただ、デフォルトのSecurity GroupではSSHしか空いておらず、またSecurity GroupでUI用のポートも開けてしまうのはセキュリティ的に・・・なので、ローカル端末からSSHでトンネリングをしてマスターノードに接続します。
# -Nはポート転送のみ実行
# -Lでローカルでポートフォワーディングします。
# ローカルのポート:リモートホスト名:リモートホストのポート
# つまり下の例だとローカルの8157にアクセスすると,EMRのマスターノードの8888にアクセスしたことになります。
$ ssh -i ローカル端末に保存した鍵 -N -L 8157:EMRのマスターノードのホスト名:8888 hadoop@EMRのマスターノードのホスト名
これで、ローカル端末のブラウザからhttp://localhost:8157
にアクセスしてみると、HUEの初期画面が出るはずです。
ユーザ名とパスワードは適当に決めてCreate accountを押してください。
HUEにも色んな機能があると思いますが、Webブラウザからクエリを実行するだけなら、以下のような感じです。
Query Editors を選択してから、Hiveを選択。
クエリを書いて、再生マークのボタンを押すとクエリを実行できます。
ZeppelinからSparkSQL実行
次にSparkSQLもWeb UIから実行してみたいと思います。HUEと同じく、Zeppelinというツール経由で実行します。HUEと同じくまずはZeppelinが動作している8890ポートにSSHトンネルを貼ります。
# -Nはポート転送のみ実行
# -Lでローカルでポートフォワーディングします。
# ローカルのポート:リモートホスト名:リモートホストのポート
# つまり下の例だとローカルの8158にアクセスすると,EMRのマスターノードの8890にアクセスしたことになります。
$ ssh -i ローカル端末に保存した鍵 -N -L 8158:EMRのマスターノードのホスト名:8890 hadoop@EMRのマスターノードのホスト名
これで、ローカル端末のブラウザからhttp://localhost:8158
にアクセスしてみると、Zeppelinの初期画面が出るはずです。
上部のNotebookを選択して、Create new noteを選択。
適当にNoteに名前をつけて、Create Noteを選択。
冒頭に**%sqlと入力して、その次の行にクエリを記載します。ただ、クエリ最後の;は不要です。これで再生マーク**を選択すれば、クエリを実行することができます。
Zeppelinの詳しい使い方には触れません(触れられない?w)が、クエリをWeb UIから実行するだけなら以下のような感じです。
STEP追加でのクエリ実行
EMRの機能の一つにSTEPというものがあります。STEPを使うとEMRクラスタにタスクをキューに詰め込んで外部から様々なアクションを行わせることができます。STEP追加はマネジメントコンソール、CLI、SDKで行うことができます。
今回はマネジメントコンソールとCLIから追加する方法を試してみます。(SDKから追加するものさほど難しくないはずです。LambdaからSTEP追加するときは、お好きな言語でSDK経由かなと)
STEPの事前準備
STEPは、クエリをS3にファイルとして保存しておく必要があります。
今回は以下のようなクエリを保存しておきます。
INSERT INTO wikistattest
SELECT * FROM wikistat${DATE};
ここではサンプルで、wikistattestというテーブルにwikistat${DATE}というテーブルの中身をそのままコピーするだけのクエリです。
ここで登場する${DATA}が変数として扱える部分で、STEP追加時に${DATE}に相当する部分の値を指定できます。これはバッチ処理とかするときに非常に流用な機能ですね!
それでは、上記のファイルを適当にS3に保存しましょう。(CLIでローカルのカレントディレクトリのadd-step-test.hqlをs3://テスト用バケット/step-hql/に保存する方法は以下の通りです)
$ aws s3 cp add-step-test.hql s3://テスト用バケット/step-hql/
マネジメントコンソールからSTEP追加
マネジメントコンソールから、GUIでSTEPを追加することができます。
以下、簡単にEMRのマネジメントコンソール画面からの操作をご案内します。
STEPタイプはHive プログラム、名前は適当につけて、スクリプトS3場所はクエリを記載したテキストファイルが格納されている場所を入力します。
引数の部分は引数には-d 変数名=値というように指定します。今回は**-d DATE=20081001**を入力します。
後は、マネジメントコンソールのSTEP実行状況を確認して気長に待ちます。(マネジメントコンソールからだと、ちょっと遅いかな・・・)
完了したら、Hueやターミナル接続して、以下のクエリでwikistattestの行数をチェックしてみましょう。
hive> SELECT COUNT(*) FROM wikistattest;
テーブルを作ったときには0行だったはずですが、STEPの実行でwikistat20081001のデータがコピーされたことにより件数が増えていると思います。
この後、STEP追加で、変数の部分を20081002に変更して実行して終了するのを待ち、その後、同じクエリを実行してみましょう。
hive> SELECT COUNT(*) FROM wikistattest;
先程よりも行数が増えているはずです。無事20081002のデータもコピーされたことが確認できたはずです。
CLIからSTEP追加
マネジメントコンソールから実行したSTEP追加は、CLIのaws emr add-steps
コマンドでも実行できます。
以下にサンプルを記載しますので、EMRのAPIが叩けるロールやクレデンシャルがある環境から実行してみてください。
なお、CLIで実行するほうが、マネジメントコンソールから実行するより、素早くSTEPが実行されてるような・・・
$ aws emr add-steps --cluster-id クラスタID --steps Type=HIVE,Name='ステップの名前',Args=[-f,s3://テスト用バケット名/ステップのファイルが格納されたディレクトリ/add-step-test.hql,-d,DATE=20081001] --region us-west-2
CLIで追加したSTEPも、マネジメントコンソールから結果を確認できます。また、CLIでももちろんSTEPの実行状況は確認できます。
$ aws emr describe-step --cluster-id クラスタID --step-id STEP_ID(STEP追加時に出力されたはず) --region us-west-2
STEPが完了後に、再度同じテーブルの行数を調べてみましょう。きっとさっきより増えているはずです。
LambdaからSTEP追加
後日また・・・
最後に
後処理忘れずに!!特にEMRクラスタ、テストデータアップロード用EC2,EBS、そしてテストデータを保存したS3の消し忘れには要注意!!
今回は単に主にEMRを通してhiveに触れて見るだけでしたが、非常に簡単に触れることができたと思います。この後は、クエリチューニングやクラスタの台数とタイプのチューニング、STEPの作成などがありますが、基本的な操作で躓くことはないと思うので、思う存分検証いただければと思います!