34
27

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Amazon EMRについて

Posted at

Amazon EMRとは?

AWSの内部で Hadoop を動かせる環境を提供してくれるサービス

Hadoopとは?

大規模データの分散処理を支えるJavaソフトウェアフレームワーク
http://ja.wikipedia.org/wiki/Apache_Hadoop

そのHadoopが行う分散処理のことをMapReduceと呼ぶ。
実際はMap関数とReduce関数を、あるルールに従って書くだけで良い
後はよしなに複数台のサーバーで分散処理をしてくれる。

Amazon EMRの構成

20140503163126.png

Amazon S3を入出力とし、複数台起動させたEC2インスタンスがMapReduce処理を行う。
経過はAWSコンソール上から監視可能。

Hadoop Streaming

MapとReduceのフェーズは標準入出力を介してデータのやり取りが行われるようになっている。
Hadoop自体はJavaにより構築されているが、Java以外の(標準入出力に対応している)言語でMapとReduceのフェーズを記述する方法(Hadoop Streaming)という仕組みがある。
これにより利用者が自由にプログラムを選択して、分散処理が可能となる。

TH400_002.jpg

使いどころ

・大規模なデータを元にして、ある条件に該当するデータを抽出or集計したいが、処理リソース&時間が膨大になってしまう場合
・DBに直接的な負荷を掛けたくない場合
・抽出するデータの細かな加工を行う必要がある場合
・実行される処理の頻度が比較的少ない場合

MapとReduce

Map: 標準入力から得られたデータを元に、keyとvalueのセットになるようにデータをマッピングする。
Reduce: マッピングされたkey-valueを元にデータを集約し、所望していたデータを出力する。

MapReduceの特徴は、MapとReduceの各ステップで並列処理が可能なことである。それぞれのMap処理は、他のMap処理と完全独立であり、理論的に全て並列実行することができる(実際には、データソースやCPUの数により制限がかかる)。続くReduceステップでは、Mapステップでの処理結果がキー毎にまとめられて各ノードのReduce処理に送られることになるため、こちらも分散処理が可能である。

Bootstrap

EMR起動時に任意のソフトウェアを追加するためにAWSが提供している仕組み。
任意のスクリプトファイルを用意し、EMRの起動時に実行されるように設定することができる。

例: Ruby2.1系のパッケージをインストールした後、必要なgemをインストールする

#!/bin/bash

sudo yum -y install ruby21 ruby21-devel rubygems21 rubygem21-rake
sudo alternatives --set ruby /usr/bin/ruby2.1

sudo gem install bundler --no-rdoc --no-ri

上記は2014.09リリースのAMIにてruby2.1をインストールするためのスクリプトである。
https://aws.amazon.com/jp/amazon-linux-ami/2014.09-release-notes/
パッケージがAmazonから提供されているのでそれを利用し、デフォルトとして設定する。

また、その後にgemをインストールしている。
Map, Reduce処理に必要なgemはここでインストールすることができる。

具体的な使い方(ruby)

AWS SDK for rubyを用いる。
また、Map, Reduce処理に用いる実行ファイルは予め用意しておき、S3にアップロードする必要がある。

例: rubyによるMap処理スクリプト

#!/usr/bin/ruby
  
require 'rubygems'
require 'json'
  
module Mapper
  def self.map_stream(input, output, error_output)
    input.each_line do |line|
      begin
        blob = JSON.parse(line)
        output.puts "#{blob["user_id"]}\t#{blob["quantity"]}"
      rescue
        error_output.puts "Unable to parse line: #{line}"
      end
    end
  end
end
  
if __FILE__ == $0
  Mapper.map_stream(ARGF, STDOUT, STDERR)
end

 

例: rubyによるReduce処理スクリプト

#!/usr/bin/ruby
  
require 'rubygems'
require 'time'
  
module Reducer
  def self.reduce_stream(input, output, error_output)
    item_count = 0
    stored_key = nil
  
    input.each_line do |line|
      (key, quantity) = line.split("\t")
      stored_key = key if stored_key.nil?
      if stored_key != key
        output.puts "#{stored_key}\t#{item_count}"
        item_count = quantity.to_i
        stored_key = key
      else
        item_count += quantity.to_i
      end
    end
  
    output.puts "#{stored_key}\t#{item_count}"
  end
end
  
if __FILE__ == $0
  Reducer.reduce_stream(ARGF, STDOUT, STDERR)
end

現在、AmazonEMRにて使用可能なAMIバージョンはここを参照のこと。
AMIバージョンによってインストールされているRubyバージョンが異なり、デフォルトのHadoop1.x系を使用するAMIはruby1.8.7となっている。
このため、ruby2.0を使用する場合は、Hadoopバージョンを指定する必要が有る。
また、ruby2.1以上をインストールする場合は、Bootstrapを使用してインストールを行う必要が有る。

例: S3にmap, reduce用のスクリプトをアップロードし、EMRのジョブを起動(aws-sdk-v2を使用)、Hadoopバージョン、Bootstrap指定あり。


require 'aws-sdk'

# 最新のMapReduceをs3にアップロードする
mapper = File.open(MAPPER_FILE_NAME)
reducer = File.open(REDUCER_FILE_NAME)

s3 = Aws::S3::Client.new({
      access_key_id:      AWS_ACCESS_KEY,
      secret_access_key:  AWS_SECRET,
      region:             AWS_REGION,
    })
    
s3.put_object(bucket: S3BUCKET, body: mapper, key: MAPPER_S3_OBJECT_KEY)
s3.put_object(bucket: S3BUCKET, body: reducer, key: REDUCER_S3_OBJECT_KEY)    


# emrのジョブを開始する
emr = Aws::EMR::Client.new({
      access_key_id:      AWS_ACCESS_KEY,
      secret_access_key:  AWS_SECRET,
      region:             AWS_REGION,
    })

emr_result = emr.run_job_flow(
  name: "#{BATCHNAME}",
  log_uri: "s3://#{S3BUCKET}/batch_logs",
  ami_version: 'latest',
  instances: {
    instance_count: INSTANCE_COUNT,
    master_instance_type: INSTANCE_TYPE,
    slave_instance_type: INSTANCE_TYPE,
    termination_protected:  false,
    hadoop_version: "2.4.0",
    ec2_key_name: EC2_KEY_NAME,
  },
  bootstrap_actions:
    [{
      name: "#{BATCHNAME}",
      script_bootstrap_action: {
        path: "s3://#{S3BUCKET}/#{BOOTSTRAP_S3_OBJECT_KEY}",
      }
    }],
  steps:
    [{
      name: "#{BATCHNAME}",
      action_on_failure: 'CONTINUE',
      hadoop_jar_step: {
        jar: '/home/hadoop/contrib/streaming/hadoop-streaming.jar',
        args: [
              "-files", "s3n://#{S3BUCKET}/#{MAPPER_S3_OBJECT_KEY},s3n://#{S3BUCKET}/#{REDUCER_S3_OBJECT_KEY}",
              "-mapper", MAPPER_FILE_NAME,
              "-reducer", REDUCER_FILE_NAME,
              "-input","s3n://#{S3BUCKET}/#{BATCHNAME}/input",
              "-output","s3n://#{S3BUCKET}/#{BATCHNAME}/output_#{Time.now.strftime('%Y%m%d%H%M')}",
    ]}}],
)

料金の節約Tips

通常の設定ではEMRの処理が終了すると、自動的に起動された各ノードのEC2インスタンスはTerminateされてしまう。
動作テストの段階ではテストデータを入力して一本処理フローを通すことが多くなると思うが、途中でエラーが発生したり、データが小規模であったりで処理時間が非常に短くなってしまってTerminateされてしまうと、EC2インスタンス的にはきっちり1時間分の料金を取られてしまう。

これを回避するには、EMRのジョブがなくなってしまった場合でもTerminateされないようにインスタンスの設定を行う必要がある。


emr_result = emr.run_job_flow(
  # 中略
  instances: {
    # 中略
    keep_job_flow_alive_when_no_steps: KEEP_ALIVE,

この場合、再度処理を実行したい場合は、

Aws::EMR::Client#add_job_flow_steps

を用いる。

また、EMRを終了し、Terminateしたい場合は、

Aws::EMR::Client#terminate_job_flows

を用いる。

参考リンク

http://gihyo.jp/dev/serial/01/ruby/0028
http://gihyo.jp/dev/serial/01/ruby/0029
http://mgi.hatenablog.com/entry/2014/05/04/085148
http://www.localytics.jp/guide/tips/mapreduce.html
http://d.hatena.ne.jp/lettas0726/20111013/1318497203

34
27
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
34
27

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?