Help us understand the problem. What is going on with this article?

Spark on EMR(YARN対応)を動かす

More than 5 years have passed since last update.

はじめに

ほとんどのデータをS3に突っ込んでいる場合、アドホックな分析であったり統計モデル作成をしたりする際に、適当にバケットまるっとロードしてSparkSQLでクエリなげたりMlibに突っ込むというのは割りとよくあることかと思います。

EMRでSparkを動かす際に、これまではYARNに対応しているBootstrap Actionが無かったので設定がだるかったのですが、awslabsがYARN対応のbootstrap actionsを出しているのでそれを試してみます。

事前にやること

動かすサーバに、iamロールが付与されていること前提。必要なパーミッションは、EMR、EC2、S3、IAMのフルアクセス。

aws cliの設定

aws cliのconfigで、emrを有効化

~/.aws/config
[preview]
emr=true

ついでにリージョンの設定をしてくおくと、--region ap-northeast-1とかいらなくなる。

EMRのデフォルトロールを作成

aws emr create-default-roles --region ap-northeast-1

いらないパーミッションがいっぱい(DynamoDB full accessとか)付いてくるので適宜消す。

使ってみる

起動コマンド

東京リージョンの適当なホストのec2-userで試す。spotインスタンスで安く使ってみる。とりあえず1時間0.03ドルで試す。EMR料金と合わせて、一台一時間0.07ドル。とりあえずMaster 1台、Worker 10台で起動する。

aws emr create-cluster --region ap-northeast-1 --name SparkCluster --ami-version 3.3.1 --no-auto-terminate --service-role EMR_DefaultRole --instance-groups InstanceCount=1,BidPrice=0.03,Name=sparkmaster,InstanceGroupType=MASTER,InstanceType=m1.large InstanceCount=10,BidPrice=0.03,Name=sparkworker,InstanceGroupType=CORE,InstanceType=m1.large --ec2-attributes InstanceProfile=EMR_EC2_DefaultRole,KeyName={{KeyPairName}} --applications Name=HIVE --bootstrap-actions Path=s3://support.elasticmapreduce/spark/install-spark,Args=[-v,1.2]

起動すると、cluster idが返却される。これを忘れると大変。一応、list-clusterで見えるけど。

{{KeyPairName}}は、AWSに登録しているキーペアの名前を入れる。新しく作っても良い。作る場合はこんな感じ。秘密鍵は*.pemという名前で保存

aws ec2 create-key-pair --region ap-northeast-1 --key-name emr | jq -r '.KeyMaterial' | tee ~/.ssh/emr.pem && chmod 600 ~/.ssh/emr.pem

クラスタの確認とか

aws emr list-clusters --region ap-northeast-1

クラスタへのsshログイン

aws emr ssh --cluster-id クラスタID --key-pair-file ~/.ssh/emr.pem --region ap-northeast-1

key-pair-fileは拡張子が.pemとかじゃないと怒られる。

クラスタの停止

aws emr terminate-clusters --cluster-ids クラスタID --region ap-northeast-1

spark shellを起動する

10台2coreなので、以下のように起動。ワーカー毎にメモリ確保するので2gずつで切る。

./spark/bin/spark-shell --master yarn-client --num-executors 10 --driver-memory 2g --executor-memory 2g --executor-cores 2

とりあえず、fluent-plugin-s3で上げた1日分のnginxログをSparkSQLのtableにしてクエリ投げてみる。一行一jsonならJsonFileを使うだけで良いのだけれど、fluent-plugin-s3の出力はそうではないのでjson部分だけ取り出してJsonRDDでtable化する。

scala> var data = sc.textFile("s3://buket_name/hogehoge/20141122/*/*.gz")
scala> var jsons = data.map(line => line.split("\t")(2))
scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
scala> var table = sqlContext.jsonRDD(jsons)
scala> table.printSchema()
root
 |-- agent: string (nullable = true)
 |-- code: string (nullable = true)
 |-- forwarded_for: string (nullable = true)
 |-- host: string (nullable = true)
 |-- method: string (nullable = true)
 |-- path: string (nullable = true)
 |-- referer: string (nullable = true)
 |-- response_time: string (nullable = true)
 |-- size: string (nullable = true)
 |-- user: string (nullable = true)

テーブルとして登録

scala> table.registerTempTable("logs")
scala> sqlContext.cacheTable("logs")

cacheTableで、メモリに載せたままにしてくれる様子。

クエリ投げてみる。数字は割愛

scala> var code_counts = sqlContext.sql("SELECT code,count(*) FROM logs GROUP BY code").collect()
scala> code_counts.foreach(println)

[400,26]
[404,...]
[405,...]
[416,...]
[304,...]
[200,...]
[206,...]
[499,...]

うむ。

hiveContext使うほうが高機能。sqlContextだとregexp_extract等のhiveのコマンドが使えない。とりあえず、paramsというgetパラメータの値毎にstatus codeを集計してみるとこんな感じ。

scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
scala> var htable = hiveContext.jsonRDD(jsons)
scala> htable.registerTempTable("logs")
scala> hiveContext.cacheTable("logs")
scala> var media_code = hiveContext.sql("SELECT a.param, a.code, count(*) FROM (SELECT regexp_extract(path, 'params=([0-9]+)', 1) AS param, code FROM logs WHERE path LIKE '%_show%') a GROUP BY a.param, a.code ORDER BY a.media, a.code")
scala> media_code.collect()
res3: Array[org.apache.spark.sql.Row] = Array([,200,...], ...

うむ。index効かせてゴリゴリjoinしないただのフルスキャンならload時間含めて4台クラスタ程度でもredshiftよりだいぶ早い感じ。一日分しかメモリに載せないし。

subqueryにくくり出さないとparamをGROUP BYに使えなかった。

バッチにする

--jar s3://elasticmapreduce/libs/script-runner/script-runner.jarをわたして、実行するバッチの内容が記載されたshell scriptを渡せば一番簡単っぽい。こちらを以下を参考にする。

わりかしダルいしscalaマンでは無いのでspark-shellにスクリプト食わしてやっていきたい。

pysparkを使ってみる

YARNの上でpysparkを動かすには環境変数"SPARK_YARN_USER_ENV"にpyspark用のコードを含んだPYTHONPATHを指定して置く必要がある。SPARK_YARN_USER_ENVは、YARNで動かしているslave系にも伝播して渡すことが出来る様子。

以下のコードで環境変数を設定可能

import os, sys, glob
SPARK_HOME = "/home/hadoop/spark/"
sys.path.insert(0, os.path.join(SPARK_HOME, 'python'))
for lib in glob.glob(os.path.join(SPARK_HOME, 'python/lib/py4j-*-src.zip')):
    sys.path.insert(0, lib)

SPARK_YARN_USER_ENV =  "PYTHONPATH=" + ":".join(sys.path)
if os.getenv('SPARK_YARN_USER_ENV', None):
    os.environ['SPARK_YARN_USER_ENV'] = os.environ['SPARK_YARN_USER_ENV'] + ",PYTHONPATH=" + ":".join(sys.path)
else:
    os.environ['SPARK_YARN_USER_ENV'] = "PYTHONPATH=" + ":".join(sys.path)

sshでクラスタにログインしたら、以下を実行してpysparkのシェルを起動。ipythonで使えないと不便なのでpipでipython入れる。

sudo pip install ipython[all]
MASTER=yarn-client IPYTHON=1 spark/bin/pyspark --master yarn-client --num-executors 10 --driver-memory 2g --executor-memory 2g --executor-cores 2

並列度の指定はspark-shellの起動と同じように指定すれば良い。

ipython notebookを使うにはまずprofileを設定する

ipython profile create spark

プロファイルには以下のように設定

~/.ipython/profile_spark/ipython_notebook_config.py
c = get_config()

c.IPKernelApp.pylab = 'inline'
c.NotebookApp.ip = '*'
c.NotebookApp.open_browser = False
c.NotebookApp.port = 8888

パスワードもきちんと設定したほうが良さそう。その上で、SecurityGroupのElasticMapReduce-masterの8888番を適切に開けると外部から接続できるようになる。

ipython notebookの起動は以下のコマンドで可能。作成したクラスタのmaster serverのglobal ipを持ってきて、8888番を開けばそのままpysparkが利用できる。

MASTER=yarn-client IPYTHON_OPTS='notebook --profile=spark' spark/bin/pyspark --master yarn-client --num-executors 10 --driver-memory 2g --executor-memory 2g --executor-cores 2

GraphXのPython APIが無いとはいえ、mllibも全部使えるようになっているし、1.2からmllib.featureが使えるようになって更に嬉しい。

このあたりの起動設定やらipythonのインストールやらはbootstrap actionsにまとめておくと良い。

Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
No comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
ユーザーは見つかりませんでした