EMRでembulkのmapreduce executorを動かすまでに割と苦労したのでメモっておく。
そもそもあんまHadoopエコシステムに詳しくないし、Javaにも詳しくない。
そして、余り情報源が無いので、本当に合ってるのかどうかはちょっと怪しい。
とりあえず、EMRでHadoopクラスタを作る。
使ったのは、emr-4.2.0で、Hadoopのバージョンは2.6.0。
最新じゃないのだが、この理由は後で書く。
HadoopとYARNさえあれば動くので、他のコンポーネントは要らない。
構築時にカスタムブートストラップを使って、embulk、その他必要なjar等をインストールする。
適当な例は以下。
set -e
sudo wget http://dl.embulk.org/embulk-latest.jar -O /usr/bin/embulk
sudo chmod 755 /usr/bin/embulk
sudo mkdir -p /opt/embulk-bundle
sudo chmod 777 /opt/embulk-bundle
aws s3 cp s3://config-store/emr/Gemfile /opt/embulk-bundle/Gemfile
cd /opt/embulk-bundle && embulk bundle install
cd ~
aws s3 cp s3://config-store/emr/logback-1.1.3.tar.gz ./
tar xvzf logback-1.1.3.tar.gz
aws s3 cp s3://config-store/emr/stax-1.2.0.jar ./
embulkが内部のロガーにlogback-classicを利用しているが、emr上のHadoopは利用しないので別途DLしておく。
バージョンが最新でないのは、1.1.4以降だとYARNのクラスパスに入っているslf4j-apiのバージョンと相性が悪いらしい。
Hadoop側のjarを弄るのはハマりを助長しそうだったので、embulk側で調整する。
後、staxは正直良く分からないのだが、何故かプラグインで使っているXMLパーサの実装が見つからない問題が発生したので、適当にググって突っ込んだ。
embulkやらjar等は実行主体の場所にあれば良いので、ResourceManagerノードに入っていればOKだが、大したスクリプトではないので、全ノードで実行しても問題無いと思う。
後はconfigを書いて実行する。configは以下の様な感じで動いた。
---
exec:
type: mapreduce
config_files:
- /etc/hadoop/conf/core-site.xml
- /etc/hadoop/conf/hdfs-site.xml
- /etc/hadoop/conf/mapred-site.xml
- /etc/hadoop/conf/yarn-site.xml
config:
mapreduce.task.timeout: 72000000
mapreduce.map.memory.mb: 3072
mapreduce.map.java.opts: -Xmx2458m
mapreduce.map.speculative: false
libjars:
- /home/hadoop/logback-1.1.3/logback-core-1.1.3.jar
- /home/hadoop/logback-1.1.3/logback-classic-1.1.3.jar
- /home/hadoop/stax-1.2.0.jar
exclude_jars: [log4j-over-slf4j.jar, log4j-core-*, slf4j-log4j12*]
in:
type: gcs
auth_method: json_key
json_keyfile: "google-api-key.json"
bucket: gcs_bucket_name
path_prefix: bigquery_exports/day/20160810/hoge_
parser:
type: jsonl
columns:
- name: id
type: string
- name: user_ids
type: json
decoders:
- type: gzip
# ...
config_filesで、EMRが用意したconfigファイルを読み、YARN ResourceManagerの接続先や、HDFSの接続先を得る。
その他、embulkで上書きしたい設定をconfig以下に書く。
mapreduce.task.timeout
を指定しておかないと、デフォルトが10分なので、embulkみたいな用途だと結構タイムアウトする。
後、適宜利用するメモリを調整しておく。越えるような状態になるとYARNコンテナが殺されるらしい。
特に重要なのが投機的な実行の有効・無効を設定するmapreduce.map.speculative
をfalseにして無効化しておくこと。
現時点でmapreduce executorがこの挙動に対応していないように見える。(まだ調査中)
投機的な実行とは、特定のノードがえらく遅い場合にジョブ全体が引きづられて遅くなるのを防ぐために、同一のジョブを複数実行する仕組み。
そもそもembulkのoutputが冗長な書き込みされると困る場合もあるし、embulk向きではない。
この投機的な実行で、先行している同一ジョブが成功したことを検知すると、実行中のコンテナをkillして捨てるようになっているが、embulkはそのkillをエラーとして検知していると思われる。
libjars
では、logbackのjarを指定するのが重要だった。
logbackのjarが無いとembulkがLoggerContext
のインスタンスを取得できなくて、エラーになる。
後、exclude_jars
で、log4j
周りの読み込みをフィルタリングしておかないと、依存性解決で整合性取れなくてエラーになったり、ならなかったりと大分辛い目にあった。
この辺りも、とりあえず動くようにはなったけど対処方法として正しいかというと割と怪しさがある。
また、現在EMRで普通に使える最新のHadoop-2.7.2を使うと、ApplicationMasterのログを書き込む段階で、ログの出力ディレクトリの設定がおかしくなってエラーになる問題があり、どうにも解決できそうになかったので、Hadoopのバージョンを戻したため、2.6.0を利用している。
とにかくログ周りでめっちゃ苦労した。
毎回毎回Javaに触れる度にロガーについてググっては、なるほど分からんという辛みを味わっている……。
一つ、未解決の謎があって、embulkのmapタスクが終了した後、YARNがコンテナの後始末をするのだが、何故かkilled By ApplicationMasterというログとともに143コードで終了する警告が出る。
一応、ジョブは動いてるし、embulkも成功と見做しているのだが、何となく不安感がある。
こういうものなのだろうか……。