Amazon EMRとは?
AWSの内部で Hadoop を動かせる環境を提供してくれるサービス
Hadoopとは?
大規模データの分散処理を支えるJavaソフトウェアフレームワーク
http://ja.wikipedia.org/wiki/Apache_Hadoop
そのHadoopが行う分散処理のことをMapReduceと呼ぶ。
実際はMap関数とReduce関数を、あるルールに従って書くだけで良い
後はよしなに複数台のサーバーで分散処理をしてくれる。
Amazon EMRの構成
Amazon S3を入出力とし、複数台起動させたEC2インスタンスがMapReduce処理を行う。
経過はAWSコンソール上から監視可能。
Hadoop Streaming
MapとReduceのフェーズは標準入出力を介してデータのやり取りが行われるようになっている。
Hadoop自体はJavaにより構築されているが、Java以外の(標準入出力に対応している)言語でMapとReduceのフェーズを記述する方法(Hadoop Streaming)という仕組みがある。
これにより利用者が自由にプログラムを選択して、分散処理が可能となる。
使いどころ
・大規模なデータを元にして、ある条件に該当するデータを抽出or集計したいが、処理リソース&時間が膨大になってしまう場合
・DBに直接的な負荷を掛けたくない場合
・抽出するデータの細かな加工を行う必要がある場合
・実行される処理の頻度が比較的少ない場合
MapとReduce
Map: 標準入力から得られたデータを元に、keyとvalueのセットになるようにデータをマッピングする。
Reduce: マッピングされたkey-valueを元にデータを集約し、所望していたデータを出力する。
MapReduceの特徴は、MapとReduceの各ステップで並列処理が可能なことである。それぞれのMap処理は、他のMap処理と完全独立であり、理論的に全て並列実行することができる(実際には、データソースやCPUの数により制限がかかる)。続くReduceステップでは、Mapステップでの処理結果がキー毎にまとめられて各ノードのReduce処理に送られることになるため、こちらも分散処理が可能である。
Bootstrap
EMR起動時に任意のソフトウェアを追加するためにAWSが提供している仕組み。
任意のスクリプトファイルを用意し、EMRの起動時に実行されるように設定することができる。
例: Ruby2.1系のパッケージをインストールした後、必要なgemをインストールする
#!/bin/bash
sudo yum -y install ruby21 ruby21-devel rubygems21 rubygem21-rake
sudo alternatives --set ruby /usr/bin/ruby2.1
sudo gem install bundler --no-rdoc --no-ri
上記は2014.09リリースのAMIにてruby2.1をインストールするためのスクリプトである。
https://aws.amazon.com/jp/amazon-linux-ami/2014.09-release-notes/
パッケージがAmazonから提供されているのでそれを利用し、デフォルトとして設定する。
また、その後にgemをインストールしている。
Map, Reduce処理に必要なgemはここでインストールすることができる。
具体的な使い方(ruby)
AWS SDK for rubyを用いる。
また、Map, Reduce処理に用いる実行ファイルは予め用意しておき、S3にアップロードする必要がある。
例: rubyによるMap処理スクリプト
#!/usr/bin/ruby
require 'rubygems'
require 'json'
module Mapper
def self.map_stream(input, output, error_output)
input.each_line do |line|
begin
blob = JSON.parse(line)
output.puts "#{blob["user_id"]}\t#{blob["quantity"]}"
rescue
error_output.puts "Unable to parse line: #{line}"
end
end
end
end
if __FILE__ == $0
Mapper.map_stream(ARGF, STDOUT, STDERR)
end
例: rubyによるReduce処理スクリプト
#!/usr/bin/ruby
require 'rubygems'
require 'time'
module Reducer
def self.reduce_stream(input, output, error_output)
item_count = 0
stored_key = nil
input.each_line do |line|
(key, quantity) = line.split("\t")
stored_key = key if stored_key.nil?
if stored_key != key
output.puts "#{stored_key}\t#{item_count}"
item_count = quantity.to_i
stored_key = key
else
item_count += quantity.to_i
end
end
output.puts "#{stored_key}\t#{item_count}"
end
end
if __FILE__ == $0
Reducer.reduce_stream(ARGF, STDOUT, STDERR)
end
現在、AmazonEMRにて使用可能なAMIバージョンはここを参照のこと。
AMIバージョンによってインストールされているRubyバージョンが異なり、デフォルトのHadoop1.x系を使用するAMIはruby1.8.7となっている。
このため、ruby2.0を使用する場合は、Hadoopバージョンを指定する必要が有る。
また、ruby2.1以上をインストールする場合は、Bootstrapを使用してインストールを行う必要が有る。
例: S3にmap, reduce用のスクリプトをアップロードし、EMRのジョブを起動(aws-sdk-v2を使用)、Hadoopバージョン、Bootstrap指定あり。
require 'aws-sdk'
# 最新のMapReduceをs3にアップロードする
mapper = File.open(MAPPER_FILE_NAME)
reducer = File.open(REDUCER_FILE_NAME)
s3 = Aws::S3::Client.new({
access_key_id: AWS_ACCESS_KEY,
secret_access_key: AWS_SECRET,
region: AWS_REGION,
})
s3.put_object(bucket: S3BUCKET, body: mapper, key: MAPPER_S3_OBJECT_KEY)
s3.put_object(bucket: S3BUCKET, body: reducer, key: REDUCER_S3_OBJECT_KEY)
# emrのジョブを開始する
emr = Aws::EMR::Client.new({
access_key_id: AWS_ACCESS_KEY,
secret_access_key: AWS_SECRET,
region: AWS_REGION,
})
emr_result = emr.run_job_flow(
name: "#{BATCHNAME}",
log_uri: "s3://#{S3BUCKET}/batch_logs",
ami_version: 'latest',
instances: {
instance_count: INSTANCE_COUNT,
master_instance_type: INSTANCE_TYPE,
slave_instance_type: INSTANCE_TYPE,
termination_protected: false,
hadoop_version: "2.4.0",
ec2_key_name: EC2_KEY_NAME,
},
bootstrap_actions:
[{
name: "#{BATCHNAME}",
script_bootstrap_action: {
path: "s3://#{S3BUCKET}/#{BOOTSTRAP_S3_OBJECT_KEY}",
}
}],
steps:
[{
name: "#{BATCHNAME}",
action_on_failure: 'CONTINUE',
hadoop_jar_step: {
jar: '/home/hadoop/contrib/streaming/hadoop-streaming.jar',
args: [
"-files", "s3n://#{S3BUCKET}/#{MAPPER_S3_OBJECT_KEY},s3n://#{S3BUCKET}/#{REDUCER_S3_OBJECT_KEY}",
"-mapper", MAPPER_FILE_NAME,
"-reducer", REDUCER_FILE_NAME,
"-input","s3n://#{S3BUCKET}/#{BATCHNAME}/input",
"-output","s3n://#{S3BUCKET}/#{BATCHNAME}/output_#{Time.now.strftime('%Y%m%d%H%M')}",
]}}],
)
料金の節約Tips
通常の設定ではEMRの処理が終了すると、自動的に起動された各ノードのEC2インスタンスはTerminateされてしまう。
動作テストの段階ではテストデータを入力して一本処理フローを通すことが多くなると思うが、途中でエラーが発生したり、データが小規模であったりで処理時間が非常に短くなってしまってTerminateされてしまうと、EC2インスタンス的にはきっちり1時間分の料金を取られてしまう。
これを回避するには、EMRのジョブがなくなってしまった場合でもTerminateされないようにインスタンスの設定を行う必要がある。
emr_result = emr.run_job_flow(
# 中略
instances: {
# 中略
keep_job_flow_alive_when_no_steps: KEEP_ALIVE,
この場合、再度処理を実行したい場合は、
Aws::EMR::Client#add_job_flow_steps
を用いる。
また、EMRを終了し、Terminateしたい場合は、
Aws::EMR::Client#terminate_job_flows
を用いる。
参考リンク
http://gihyo.jp/dev/serial/01/ruby/0028
http://gihyo.jp/dev/serial/01/ruby/0029
http://mgi.hatenablog.com/entry/2014/05/04/085148
http://www.localytics.jp/guide/tips/mapreduce.html
http://d.hatena.ne.jp/lettas0726/20111013/1318497203