はじめに
ほとんどのデータをS3に突っ込んでいる場合、アドホックな分析であったり統計モデル作成をしたりする際に、適当にバケットまるっとロードしてSparkSQLでクエリなげたりMlibに突っ込むというのは割りとよくあることかと思います。
EMRでSparkを動かす際に、これまではYARNに対応しているBootstrap Actionが無かったので設定がだるかったのですが、awslabsがYARN対応のbootstrap actionsを出しているのでそれを試してみます。
- http://blogs.aws.amazon.com/bigdata/post/TxO6EHTHQALSIB/Getting-Started-with-Amazon-EMR-Bootstrap-Actions
- https://github.com/awslabs/emr-bootstrap-actions/tree/master/spark
事前にやること
動かすサーバに、iamロールが付与されていること前提。必要なパーミッションは、EMR、EC2、S3、IAMのフルアクセス。
aws cliの設定
aws cliのconfigで、emrを有効化
[preview]
emr=true
ついでにリージョンの設定をしてくおくと、--region ap-northeast-1
とかいらなくなる。
EMRのデフォルトロールを作成
aws emr create-default-roles --region ap-northeast-1
いらないパーミッションがいっぱい(DynamoDB full accessとか)付いてくるので適宜消す。
使ってみる
起動コマンド
東京リージョンの適当なホストのec2-userで試す。spotインスタンスで安く使ってみる。とりあえず1時間0.03ドルで試す。EMR料金と合わせて、一台一時間0.07ドル。とりあえずMaster 1台、Worker 10台で起動する。
aws emr create-cluster --region ap-northeast-1 --name SparkCluster --ami-version 3.3.1 --no-auto-terminate --service-role EMR_DefaultRole --instance-groups InstanceCount=1,BidPrice=0.03,Name=sparkmaster,InstanceGroupType=MASTER,InstanceType=m1.large InstanceCount=10,BidPrice=0.03,Name=sparkworker,InstanceGroupType=CORE,InstanceType=m1.large --ec2-attributes InstanceProfile=EMR_EC2_DefaultRole,KeyName={{KeyPairName}} --applications Name=HIVE --bootstrap-actions Path=s3://support.elasticmapreduce/spark/install-spark,Args=[-v,1.2]
起動すると、cluster idが返却される。これを忘れると大変。一応、list-clusterで見えるけど。
{{KeyPairName}}は、AWSに登録しているキーペアの名前を入れる。新しく作っても良い。作る場合はこんな感じ。秘密鍵は*.pemという名前で保存
aws ec2 create-key-pair --region ap-northeast-1 --key-name emr | jq -r '.KeyMaterial' | tee ~/.ssh/emr.pem && chmod 600 ~/.ssh/emr.pem
クラスタの確認とか
aws emr list-clusters --region ap-northeast-1
クラスタへのsshログイン
aws emr ssh --cluster-id クラスタID --key-pair-file ~/.ssh/emr.pem --region ap-northeast-1
key-pair-fileは拡張子が.pemとかじゃないと怒られる。
クラスタの停止
aws emr terminate-clusters --cluster-ids クラスタID --region ap-northeast-1
spark shellを起動する
10台2coreなので、以下のように起動。ワーカー毎にメモリ確保するので2gずつで切る。
./spark/bin/spark-shell --master yarn-client --num-executors 10 --driver-memory 2g --executor-memory 2g --executor-cores 2
とりあえず、fluent-plugin-s3で上げた1日分のnginxログをSparkSQLのtableにしてクエリ投げてみる。一行一jsonならJsonFileを使うだけで良いのだけれど、fluent-plugin-s3の出力はそうではないのでjson部分だけ取り出してJsonRDDでtable化する。
scala> var data = sc.textFile("s3://buket_name/hogehoge/20141122/*/*.gz")
scala> var jsons = data.map(line => line.split("\t")(2))
scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
scala> var table = sqlContext.jsonRDD(jsons)
scala> table.printSchema()
root
|-- agent: string (nullable = true)
|-- code: string (nullable = true)
|-- forwarded_for: string (nullable = true)
|-- host: string (nullable = true)
|-- method: string (nullable = true)
|-- path: string (nullable = true)
|-- referer: string (nullable = true)
|-- response_time: string (nullable = true)
|-- size: string (nullable = true)
|-- user: string (nullable = true)
テーブルとして登録
scala> table.registerTempTable("logs")
scala> sqlContext.cacheTable("logs")
cacheTableで、メモリに載せたままにしてくれる様子。
クエリ投げてみる。数字は割愛
scala> var code_counts = sqlContext.sql("SELECT code,count(*) FROM logs GROUP BY code").collect()
scala> code_counts.foreach(println)
[400,26]
[404,...]
[405,...]
[416,...]
[304,...]
[200,...]
[206,...]
[499,...]
うむ。
hiveContext使うほうが高機能。sqlContextだとregexp_extract等のhiveのコマンドが使えない。とりあえず、paramsというgetパラメータの値毎にstatus codeを集計してみるとこんな感じ。
scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
scala> var htable = hiveContext.jsonRDD(jsons)
scala> htable.registerTempTable("logs")
scala> hiveContext.cacheTable("logs")
scala> var media_code = hiveContext.sql("SELECT a.param, a.code, count(*) FROM (SELECT regexp_extract(path, 'params=([0-9]+)', 1) AS param, code FROM logs WHERE path LIKE '%_show%') a GROUP BY a.param, a.code ORDER BY a.media, a.code")
scala> media_code.collect()
res3: Array[org.apache.spark.sql.Row] = Array([,200,...], ...
うむ。index効かせてゴリゴリjoinしないただのフルスキャンならload時間含めて4台クラスタ程度でもredshiftよりだいぶ早い感じ。一日分しかメモリに載せないし。
subqueryにくくり出さないとparamをGROUP BYに使えなかった。
バッチにする
--jar s3://elasticmapreduce/libs/script-runner/script-runner.jar
をわたして、実行するバッチの内容が記載されたshell scriptを渡せば一番簡単っぽい。こちらを以下を参考にする。
わりかしダルいしscalaマンでは無いのでspark-shellにスクリプト食わしてやっていきたい。
pysparkを使ってみる
YARNの上でpysparkを動かすには環境変数"SPARK_YARN_USER_ENV"にpyspark用のコードを含んだPYTHONPATHを指定して置く必要がある。SPARK_YARN_USER_ENVは、YARNで動かしているslave系にも伝播して渡すことが出来る様子。
以下のコードで環境変数を設定可能
import os, sys, glob
SPARK_HOME = "/home/hadoop/spark/"
sys.path.insert(0, os.path.join(SPARK_HOME, 'python'))
for lib in glob.glob(os.path.join(SPARK_HOME, 'python/lib/py4j-*-src.zip')):
sys.path.insert(0, lib)
SPARK_YARN_USER_ENV = "PYTHONPATH=" + ":".join(sys.path)
if os.getenv('SPARK_YARN_USER_ENV', None):
os.environ['SPARK_YARN_USER_ENV'] = os.environ['SPARK_YARN_USER_ENV'] + ",PYTHONPATH=" + ":".join(sys.path)
else:
os.environ['SPARK_YARN_USER_ENV'] = "PYTHONPATH=" + ":".join(sys.path)
sshでクラスタにログインしたら、以下を実行してpysparkのシェルを起動。ipythonで使えないと不便なのでpipでipython入れる。
sudo pip install ipython[all]
MASTER=yarn-client IPYTHON=1 spark/bin/pyspark --master yarn-client --num-executors 10 --driver-memory 2g --executor-memory 2g --executor-cores 2
並列度の指定はspark-shellの起動と同じように指定すれば良い。
ipython notebookを使うにはまずprofileを設定する
ipython profile create spark
プロファイルには以下のように設定
c = get_config()
c.IPKernelApp.pylab = 'inline'
c.NotebookApp.ip = '*'
c.NotebookApp.open_browser = False
c.NotebookApp.port = 8888
パスワードもきちんと設定したほうが良さそう。その上で、SecurityGroupのElasticMapReduce-masterの8888番を適切に開けると外部から接続できるようになる。
ipython notebookの起動は以下のコマンドで可能。作成したクラスタのmaster serverのglobal ipを持ってきて、8888番を開けばそのままpysparkが利用できる。
MASTER=yarn-client IPYTHON_OPTS='notebook --profile=spark' spark/bin/pyspark --master yarn-client --num-executors 10 --driver-memory 2g --executor-memory 2g --executor-cores 2
GraphXのPython APIが無いとはいえ、mllibも全部使えるようになっているし、1.2からmllib.featureが使えるようになって更に嬉しい。
このあたりの起動設定やらipythonのインストールやらはbootstrap actionsにまとめておくと良い。