GunosyDay 3

Spark on EMR(YARN対応)を動かす

More than 3 years have passed since last update.


はじめに

ほとんどのデータをS3に突っ込んでいる場合、アドホックな分析であったり統計モデル作成をしたりする際に、適当にバケットまるっとロードしてSparkSQLでクエリなげたりMlibに突っ込むというのは割りとよくあることかと思います。

EMRでSparkを動かす際に、これまではYARNに対応しているBootstrap Actionが無かったので設定がだるかったのですが、awslabsがYARN対応のbootstrap actionsを出しているのでそれを試してみます。


事前にやること

動かすサーバに、iamロールが付与されていること前提。必要なパーミッションは、EMR、EC2、S3、IAMのフルアクセス。


aws cliの設定

aws cliのconfigで、emrを有効化


~/.aws/config

[preview]

emr=true

ついでにリージョンの設定をしてくおくと、--region ap-northeast-1とかいらなくなる。


EMRのデフォルトロールを作成

aws emr create-default-roles --region ap-northeast-1

いらないパーミッションがいっぱい(DynamoDB full accessとか)付いてくるので適宜消す。


使ってみる


起動コマンド

東京リージョンの適当なホストのec2-userで試す。spotインスタンスで安く使ってみる。とりあえず1時間0.03ドルで試す。EMR料金と合わせて、一台一時間0.07ドル。とりあえずMaster 1台、Worker 10台で起動する。

aws emr create-cluster --region ap-northeast-1 --name SparkCluster --ami-version 3.3.1 --no-auto-terminate --service-role EMR_DefaultRole --instance-groups InstanceCount=1,BidPrice=0.03,Name=sparkmaster,InstanceGroupType=MASTER,InstanceType=m1.large InstanceCount=10,BidPrice=0.03,Name=sparkworker,InstanceGroupType=CORE,InstanceType=m1.large --ec2-attributes InstanceProfile=EMR_EC2_DefaultRole,KeyName={{KeyPairName}} --applications Name=HIVE --bootstrap-actions Path=s3://support.elasticmapreduce/spark/install-spark,Args=[-v,1.2]

起動すると、cluster idが返却される。これを忘れると大変。一応、list-clusterで見えるけど。

{{KeyPairName}}は、AWSに登録しているキーペアの名前を入れる。新しく作っても良い。作る場合はこんな感じ。秘密鍵は*.pemという名前で保存

aws ec2 create-key-pair --region ap-northeast-1 --key-name emr | jq -r '.KeyMaterial' | tee ~/.ssh/emr.pem && chmod 600 ~/.ssh/emr.pem


クラスタの確認とか

aws emr list-clusters --region ap-northeast-1


クラスタへのsshログイン

aws emr ssh --cluster-id クラスタID --key-pair-file ~/.ssh/emr.pem --region ap-northeast-1

key-pair-fileは拡張子が.pemとかじゃないと怒られる。


クラスタの停止

aws emr terminate-clusters --cluster-ids クラスタID --region ap-northeast-1


spark shellを起動する

10台2coreなので、以下のように起動。ワーカー毎にメモリ確保するので2gずつで切る。

./spark/bin/spark-shell --master yarn-client --num-executors 10 --driver-memory 2g --executor-memory 2g --executor-cores 2

とりあえず、fluent-plugin-s3で上げた1日分のnginxログをSparkSQLのtableにしてクエリ投げてみる。一行一jsonならJsonFileを使うだけで良いのだけれど、fluent-plugin-s3の出力はそうではないのでjson部分だけ取り出してJsonRDDでtable化する。

scala> var data = sc.textFile("s3://buket_name/hogehoge/20141122/*/*.gz")

scala> var jsons = data.map(line => line.split("\t")(2))
scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
scala> var table = sqlContext.jsonRDD(jsons)
scala> table.printSchema()
root
|-- agent: string (nullable = true)
|-- code: string (nullable = true)
|-- forwarded_for: string (nullable = true)
|-- host: string (nullable = true)
|-- method: string (nullable = true)
|-- path: string (nullable = true)
|-- referer: string (nullable = true)
|-- response_time: string (nullable = true)
|-- size: string (nullable = true)
|-- user: string (nullable = true)

テーブルとして登録

scala> table.registerTempTable("logs")

scala> sqlContext.cacheTable("logs")

cacheTableで、メモリに載せたままにしてくれる様子。

クエリ投げてみる。数字は割愛

scala> var code_counts = sqlContext.sql("SELECT code,count(*) FROM logs GROUP BY code").collect()

scala> code_counts.foreach(println)

[400,26]
[404,...]
[405,...]
[416,...]
[304,...]
[200,...]
[206,...]
[499,...]

うむ。

hiveContext使うほうが高機能。sqlContextだとregexp_extract等のhiveのコマンドが使えない。とりあえず、paramsというgetパラメータの値毎にstatus codeを集計してみるとこんな感じ。

scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

scala> var htable = hiveContext.jsonRDD(jsons)
scala> htable.registerTempTable("logs")
scala> hiveContext.cacheTable("logs")
scala> var media_code = hiveContext.sql("SELECT a.param, a.code, count(*) FROM (SELECT regexp_extract(path, 'params=([0-9]+)', 1) AS param, code FROM logs WHERE path LIKE '%_show%') a GROUP BY a.param, a.code ORDER BY a.media, a.code")
scala> media_code.collect()
res3: Array[org.apache.spark.sql.Row] = Array([,200,...], ...

うむ。index効かせてゴリゴリjoinしないただのフルスキャンならload時間含めて4台クラスタ程度でもredshiftよりだいぶ早い感じ。一日分しかメモリに載せないし。

subqueryにくくり出さないとparamをGROUP BYに使えなかった。


バッチにする

--jar s3://elasticmapreduce/libs/script-runner/script-runner.jarをわたして、実行するバッチの内容が記載されたshell scriptを渡せば一番簡単っぽい。こちらを以下を参考にする。

わりかしダルいしscalaマンでは無いのでspark-shellにスクリプト食わしてやっていきたい。


pysparkを使ってみる

YARNの上でpysparkを動かすには環境変数"SPARK_YARN_USER_ENV"にpyspark用のコードを含んだPYTHONPATHを指定して置く必要がある。SPARK_YARN_USER_ENVは、YARNで動かしているslave系にも伝播して渡すことが出来る様子。

以下のコードで環境変数を設定可能

import os, sys, glob

SPARK_HOME = "/home/hadoop/spark/"
sys.path.insert(0, os.path.join(SPARK_HOME, 'python'))
for lib in glob.glob(os.path.join(SPARK_HOME, 'python/lib/py4j-*-src.zip')):
sys.path.insert(0, lib)

SPARK_YARN_USER_ENV = "PYTHONPATH=" + ":".join(sys.path)
if os.getenv('SPARK_YARN_USER_ENV', None):
os.environ['SPARK_YARN_USER_ENV'] = os.environ['SPARK_YARN_USER_ENV'] + ",PYTHONPATH=" + ":".join(sys.path)
else:
os.environ['SPARK_YARN_USER_ENV'] = "PYTHONPATH=" + ":".join(sys.path)

sshでクラスタにログインしたら、以下を実行してpysparkのシェルを起動。ipythonで使えないと不便なのでpipでipython入れる。

sudo pip install ipython[all]

MASTER=yarn-client IPYTHON=1 spark/bin/pyspark --master yarn-client --num-executors 10 --driver-memory 2g --executor-memory 2g --executor-cores 2

並列度の指定はspark-shellの起動と同じように指定すれば良い。

ipython notebookを使うにはまずprofileを設定する

ipython profile create spark

プロファイルには以下のように設定


~/.ipython/profile_spark/ipython_notebook_config.py

c = get_config()

c.IPKernelApp.pylab = 'inline'
c.NotebookApp.ip = '*'
c.NotebookApp.open_browser = False
c.NotebookApp.port = 8888


パスワードもきちんと設定したほうが良さそう。その上で、SecurityGroupのElasticMapReduce-masterの8888番を適切に開けると外部から接続できるようになる。

ipython notebookの起動は以下のコマンドで可能。作成したクラスタのmaster serverのglobal ipを持ってきて、8888番を開けばそのままpysparkが利用できる。

MASTER=yarn-client IPYTHON_OPTS='notebook --profile=spark' spark/bin/pyspark --master yarn-client --num-executors 10 --driver-memory 2g --executor-memory 2g --executor-cores 2

GraphXのPython APIが無いとはいえ、mllibも全部使えるようになっているし、1.2からmllib.featureが使えるようになって更に嬉しい。

このあたりの起動設定やらipythonのインストールやらはbootstrap actionsにまとめておくと良い。