Using Amazon S3 with Hadoop

Posted at 2014-09-23


This post describes how to use Amazon S3 from Hadoop.


Environment:
  • CentOS 6.5
  • CDH5


Cluster layout:
Hostname IP-address ResourceManager Namenode NodeManager Datanode JobHistoryServer
hadoop-master - -
hadoop-master2 - - -
hadoop-slave - - -
hadoop-slave2 - - -
hadoop-slave3 - - -
hadoop-client - - - - -



  • Add the settings needed to access S3.
  <!-- Added: start -->
  <!-- Added: end -->
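Here is a minimal sketch of what could go between the two marker comments. It makes two assumptions. First, the settings live in core-site.xml (on a CDH5 package install this is usually /etc/hadoop/conf/core-site.xml). Second, the s3n:// connector used throughout this post reads its credentials from the fs.s3n.awsAccessKeyId and fs.s3n.awsSecretAccessKey properties. The script only prints the property entries. The ACCESS_KEY/SECRET_KEY variables and the s3n_properties function are hypothetical names, and their default values are placeholders.

```shell
#!/bin/sh
# Sketch: print core-site.xml <property> entries for the s3n:// connector.
# Assumption: credentials are read from fs.s3n.awsAccessKeyId and
# fs.s3n.awsSecretAccessKey. ACCESS_KEY / SECRET_KEY are hypothetical
# variable names; the defaults are placeholders, not real credentials.
ACCESS_KEY=${ACCESS_KEY:-YOUR_AWS_ACCESS_KEY_ID}
SECRET_KEY=${SECRET_KEY:-YOUR_AWS_SECRET_ACCESS_KEY}

# Hypothetical helper: writes the two <property> blocks to stdout.
# The heredoc delimiter is unquoted so that the variables are expanded.
s3n_properties() {
  cat <<EOF
  <property>
    <name>fs.s3n.awsAccessKeyId</name>
    <value>${ACCESS_KEY}</value>
  </property>
  <property>
    <name>fs.s3n.awsSecretAccessKey</name>
    <value>${SECRET_KEY}</value>
  </property>
EOF
}

# Paste this output between the marker comments inside <configuration>.
s3n_properties
```

Because core-site.xml then holds the secret key, it is worth keeping the file readable only by the accounts that need it.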


  • Install s3cmd, which is used below to work with S3 directly.
$ sudo sh -c "curl -kL https://raw.github.com/pypa/pip/master/contrib/get-pip.py | python"
$ sudo pip install python-dateutil
$ git clone https://github.com/s3tools/s3cmd.git
$ cd s3cmd
$ sudo python setup.py install
$ s3cmd --configure
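s3cmd --configure prompts for the AWS access key and secret key. By default it saves them in ~/.s3cfg as "access_key = ..." and "secret_key = ..." lines. The check below confirms that both entries were written, without printing the secret. check_s3cfg is a hypothetical helper and is not part of s3cmd.

```shell
#!/bin/sh
# Hypothetical helper (not an s3cmd command): returns 0 only if the given
# s3cmd config file has non-empty access_key and secret_key entries.
check_s3cfg() {
  cfg=${1:-"$HOME/.s3cfg"}   # s3cmd's default config location
  if [ ! -r "$cfg" ]; then
    echo "config not readable: $cfg" >&2
    return 1
  fi
  status=0
  for key in access_key secret_key; do
    # Match lines such as "access_key = AKIA..." and require a value.
    if ! grep -q "^${key} *= *[^ ]" "$cfg"; then
      echo "missing or empty: $key" >&2
      status=1
    fi
  done
  return "$status"
}
```

For example, run check_s3cfg right after s3cmd --configure. It prints nothing and returns 0 when both keys are set.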
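  • Create a bucket named hadoop-s3 for testing. Empty readme files are uploaded so that input/ and output/ exist as prefixes, since S3 has no real directories.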
$ s3cmd mb s3://hadoop-s3
$ mkdir input output
$ touch input/readme
$ touch output/readme
$ s3cmd put -r input s3://hadoop-s3/
$ s3cmd put -r output s3://hadoop-s3/
$ s3cmd ls s3://hadoop-s3/
                       DIR   s3://hadoop-s3/input/
                       DIR   s3://hadoop-s3/output/
  • Put a file to S3 with the hadoop command. The local file sample.txt contains this single line:
Java Ruby Python Java Ruby Python Java
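One way to create the same sample.txt in the current directory is shown below. printf adds the trailing newline, which brings the file to the 39 bytes that hadoop fs -ls reports for it.

```shell
#!/bin/sh
# Write the single test line; the "\n" in the format adds the trailing newline.
printf '%s\n' 'Java Ruby Python Java Ruby Python Java' > sample.txt

# 38 characters + 1 newline = 39 bytes.
wc -c < sample.txt
```

Because the put runs as the hdfs user through sudo, the file (and the directory it is in) must be readable by hdfs.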
$ sudo -u hdfs hadoop fs -put sample.txt s3n://hadoop-s3/input/
14/09/23 02:27:14 INFO s3native.NativeS3FileSystem: OutputStream for key 'input/sample.txt._COPYING_' writing to tempfile '/tmp/hadoop-hdfs/s3/output-6707563359729154691.tmp'
14/09/23 02:27:14 INFO s3native.NativeS3FileSystem: OutputStream for key 'input/sample.txt._COPYING_' closed. Now beginning upload
  • Check that the uploaded file can be listed and read back.
$ sudo -u hdfs hadoop fs -ls s3n://hadoop-s3/input/
Found 2 items
-rw-rw-rw-   1          0 2014-09-22 14:54 s3n://hadoop-s3/input/readme
-rw-rw-rw-   1         39 2014-09-23 02:27 s3n://hadoop-s3/input/sample.txt
$ sudo -u hdfs hadoop fs -cat s3n://hadoop-s3/input/sample.txt
14/09/23 02:32:17 INFO s3native.NativeS3FileSystem: Opening 's3n://hadoop-s3/input/sample.txt' for reading
Java Ruby Python Java Ruby Python Java
  • Check with s3cmd as well.
$ s3cmd ls s3://hadoop-s3/input/
2014-09-22 14:54         0   s3://hadoop-s3/input/readme
2014-09-23 02:27        39   s3://hadoop-s3/input/sample.txt
  • Use S3 as the input and output of a Hadoop Streaming job.


For details, see the article "Hadoop StreamingでMapReduceをシェルで実装する" (Implementing MapReduce in shell with Hadoop Streaming).
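The actual sample_mapper.sh and sample_reducer.sh are covered in that article. The sketch below is a hypothetical stand-in that assumes a plain word count: the mapper emits "word<TAB>1" and the reducer sums the counts per word. It writes both scripts to a temporary directory and runs them through a local pipe, with a byte-order sort standing in for the shuffle. On sample.txt it prints the same three lines as the part-00000 shown at the end of this post. Those lines total 23 bytes, which matches Bytes Written=23 in the job counters.

```shell
#!/bin/sh
# Hypothetical word-count scripts; the author's real sample_mapper.sh and
# sample_reducer.sh (see the referenced article) may differ.
dir=$(mktemp -d)

# Mapper: emit "word<TAB>1" for every whitespace-separated word on stdin.
cat > "$dir/sample_mapper.sh" <<'EOF'
#!/bin/sh
awk '{ for (i = 1; i <= NF; i++) printf "%s\t1\n", $i }'
EOF

# Reducer: input arrives sorted by key, so add up each run of identical
# words and print "word<TAB>total" whenever the word changes.
cat > "$dir/sample_reducer.sh" <<'EOF'
#!/bin/sh
awk 'BEGIN { FS = "\t" }
     NR > 1 && $1 != prev { printf "%s\t%d\n", prev, sum; sum = 0 }
     { prev = $1; sum += $2 }
     END { if (NR > 0) printf "%s\t%d\n", prev, sum }'
EOF

# Streaming executes the scripts directly, so make them executable.
chmod +x "$dir/sample_mapper.sh" "$dir/sample_reducer.sh"

# Local dry run: LC_ALL=C sort approximates the shuffle's byte-order sort.
printf '%s\n' 'Java Ruby Python Java Ruby Python Java' \
  | sh "$dir/sample_mapper.sh" \
  | LC_ALL=C sort \
  | sh "$dir/sample_reducer.sh"

# To match the -files paths in the job command:
#   cp "$dir/sample_mapper.sh" "$dir/sample_reducer.sh" /tmp/
```

Testing with a local pipe first catches script errors before a job is submitted against S3.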

$ sudo -u hdfs hadoop                                   \
  jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar    \
  -files   /tmp/sample_mapper.sh,/tmp/sample_reducer.sh \
  -input   s3n://hadoop-s3/input/sample.txt             \
  -output  s3n://hadoop-s3/output/sample                \
  -mapper  sample_mapper.sh                             \
  -reducer sample_reducer.sh
14/09/23 02:38:17 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
14/09/23 02:38:17 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
14/09/23 02:38:17 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
14/09/23 02:38:18 INFO mapred.FileInputFormat: Total input paths to process : 1
14/09/23 02:38:18 INFO mapreduce.JobSubmitter: number of splits:1
14/09/23 02:38:18 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local1656003931_0001
14/09/23 02:38:18 WARN conf.Configuration: file:/tmp/hadoop-hdfs/mapred/staging/hdfs1656003931/.staging/job_local1656003931_0001/job.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.retry.interval;  Ignoring.
14/09/23 02:38:18 WARN conf.Configuration: file:/tmp/hadoop-hdfs/mapred/staging/hdfs1656003931/.staging/job_local1656003931_0001/job.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.attempts;  Ignoring.
14/09/23 02:38:19 INFO mapred.LocalDistributedCacheManager: Localized file:/tmp/sample_mapper.sh as file:/tmp/hadoop-hdfs/mapred/local/1411439898863/sample_mapper.sh
14/09/23 02:38:19 INFO mapred.LocalDistributedCacheManager: Localized file:/tmp/sample_reducer.sh as file:/tmp/hadoop-hdfs/mapred/local/1411439898864/sample_reducer.sh
14/09/23 02:38:19 WARN conf.Configuration: file:/tmp/hadoop-hdfs/mapred/local/localRunner/hdfs/job_local1656003931_0001/job_local1656003931_0001.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.retry.interval;  Ignoring.
14/09/23 02:38:19 WARN conf.Configuration: file:/tmp/hadoop-hdfs/mapred/local/localRunner/hdfs/job_local1656003931_0001/job_local1656003931_0001.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.attempts;  Ignoring.
14/09/23 02:38:19 INFO mapreduce.Job: The url to track the job: http://localhost:8080/
14/09/23 02:38:19 INFO mapreduce.Job: Running job: job_local1656003931_0001
14/09/23 02:38:19 INFO mapred.LocalJobRunner: OutputCommitter set in config null
14/09/23 02:38:19 INFO mapred.LocalJobRunner: OutputCommitter is org.apache.hadoop.mapred.FileOutputCommitter
14/09/23 02:38:20 INFO mapred.LocalJobRunner: Waiting for map tasks
14/09/23 02:38:20 INFO mapred.LocalJobRunner: Starting task: attempt_local1656003931_0001_m_000000_0
14/09/23 02:38:20 INFO mapred.Task:  Using ResourceCalculatorProcessTree : [ ]
14/09/23 02:38:20 INFO mapred.MapTask: Processing split: s3n://hadoop-s3/input/sample.txt:0+39
14/09/23 02:38:20 INFO s3native.NativeS3FileSystem: Opening 's3n://hadoop-s3/input/sample.txt' for reading
14/09/23 02:38:20 INFO s3native.NativeS3FileSystem: Opening key 'input/sample.txt' for reading at position '0'
14/09/23 02:38:20 INFO mapred.MapTask: numReduceTasks: 1
14/09/23 02:38:20 INFO mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
14/09/23 02:38:20 INFO mapred.MapTask: (EQUATOR) 0 kvi 26214396(104857584)
14/09/23 02:38:20 INFO mapred.MapTask: mapreduce.task.io.sort.mb: 100
14/09/23 02:38:20 INFO mapred.MapTask: soft limit at 83886080
14/09/23 02:38:20 INFO mapred.MapTask: bufstart = 0; bufvoid = 104857600
14/09/23 02:38:20 INFO mapred.MapTask: kvstart = 26214396; length = 6553600
14/09/23 02:38:20 INFO streaming.PipeMapRed: PipeMapRed exec [/tmp/./sample_mapper.sh]
14/09/23 02:38:20 INFO streaming.PipeMapRed: R/W/S=1/0/0 in:NA [rec/s] out:NA [rec/s]
14/09/23 02:38:20 INFO streaming.PipeMapRed: MRErrorThread done
14/09/23 02:38:20 INFO streaming.PipeMapRed: Records R/W=1/1
14/09/23 02:38:20 INFO streaming.PipeMapRed: mapRedFinished
14/09/23 02:38:20 INFO mapred.LocalJobRunner:
14/09/23 02:38:20 INFO mapred.MapTask: Starting flush of map output
14/09/23 02:38:20 INFO mapred.MapTask: Spilling map output
14/09/23 02:38:20 INFO mapred.MapTask: bufstart = 0; bufend = 53; bufvoid = 104857600
14/09/23 02:38:20 INFO mapred.MapTask: kvstart = 26214396(104857584); kvend = 26214372(104857488); length = 25/6553600
14/09/23 02:38:20 INFO mapred.MapTask: Finished spill 0
14/09/23 02:38:20 INFO mapred.Task: Task:attempt_local1656003931_0001_m_000000_0 is done. And is in the process of committing
14/09/23 02:38:20 INFO mapred.LocalJobRunner: Records R/W=1/1
14/09/23 02:38:20 INFO mapred.Task: Task 'attempt_local1656003931_0001_m_000000_0' done.
14/09/23 02:38:20 INFO mapred.LocalJobRunner: Finishing task: attempt_local1656003931_0001_m_000000_0
14/09/23 02:38:20 INFO mapred.LocalJobRunner: map task executor complete.
14/09/23 02:38:20 INFO mapreduce.Job: Job job_local1656003931_0001 running in uber mode : false
14/09/23 02:38:20 INFO mapreduce.Job:  map 100% reduce 0%
14/09/23 02:38:20 INFO mapred.LocalJobRunner: Waiting for reduce tasks
14/09/23 02:38:20 INFO mapred.LocalJobRunner: Starting task: attempt_local1656003931_0001_r_000000_0
14/09/23 02:38:20 INFO mapred.Task:  Using ResourceCalculatorProcessTree : [ ]
14/09/23 02:38:20 INFO mapred.ReduceTask: Using ShuffleConsumerPlugin: org.apache.hadoop.mapreduce.task.reduce.Shuffle@3ad7313f
14/09/23 02:38:20 INFO reduce.MergeManagerImpl: MergerManager: memoryLimit=652528832, maxSingleShuffleLimit=163132208, mergeThreshold=430669056, ioSortFactor=10, memToMemMergeOutputsThreshold=10
14/09/23 02:38:20 INFO reduce.EventFetcher: attempt_local1656003931_0001_r_000000_0 Thread started: EventFetcher for fetching Map Completion Events
14/09/23 02:38:20 INFO reduce.LocalFetcher: localfetcher#1 about to shuffle output of map attempt_local1656003931_0001_m_000000_0 decomp: 69 len: 73 to MEMORY
14/09/23 02:38:20 INFO reduce.InMemoryMapOutput: Read 69 bytes from map-output for attempt_local1656003931_0001_m_000000_0
14/09/23 02:38:20 INFO reduce.MergeManagerImpl: closeInMemoryFile -> map-output of size: 69, inMemoryMapOutputs.size() -> 1, commitMemory -> 0, usedMemory ->69
14/09/23 02:38:20 INFO reduce.EventFetcher: EventFetcher is interrupted.. Returning
14/09/23 02:38:20 INFO mapred.LocalJobRunner: 1 / 1 copied.
14/09/23 02:38:20 INFO reduce.MergeManagerImpl: finalMerge called with 1 in-memory map-outputs and 0 on-disk map-outputs
14/09/23 02:38:20 INFO mapred.Merger: Merging 1 sorted segments
14/09/23 02:38:20 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 62 bytes
14/09/23 02:38:20 INFO reduce.MergeManagerImpl: Merged 1 segments, 69 bytes to disk to satisfy reduce memory limit
14/09/23 02:38:20 INFO reduce.MergeManagerImpl: Merging 1 files, 73 bytes from disk
14/09/23 02:38:20 INFO reduce.MergeManagerImpl: Merging 0 segments, 0 bytes from memory into reduce
14/09/23 02:38:20 INFO mapred.Merger: Merging 1 sorted segments
14/09/23 02:38:20 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 62 bytes
14/09/23 02:38:20 INFO mapred.LocalJobRunner: 1 / 1 copied.
14/09/23 02:38:20 INFO streaming.PipeMapRed: PipeMapRed exec [/tmp/./sample_reducer.sh]
14/09/23 02:38:20 INFO s3native.NativeS3FileSystem: OutputStream for key 'output/sample/_temporary/0/_temporary/attempt_local1656003931_0001_r_000000_0/part-00000' writing to tempfile '/tmp/hadoop-hdfs/s3/output-4242374696574962917.tmp'
14/09/23 02:38:20 INFO streaming.PipeMapRed: R/W/S=1/0/0 in:NA [rec/s] out:NA [rec/s]
14/09/23 02:38:20 INFO streaming.PipeMapRed: Records R/W=7/1
14/09/23 02:38:20 INFO streaming.PipeMapRed: MRErrorThread done
14/09/23 02:38:20 INFO streaming.PipeMapRed: mapRedFinished
14/09/23 02:38:20 INFO s3native.NativeS3FileSystem: OutputStream for key 'output/sample/_temporary/0/_temporary/attempt_local1656003931_0001_r_000000_0/part-00000' closed. Now beginning upload
14/09/23 02:38:20 INFO s3native.NativeS3FileSystem: OutputStream for key 'output/sample/_temporary/0/_temporary/attempt_local1656003931_0001_r_000000_0/part-00000' upload complete
14/09/23 02:38:20 INFO mapred.Task: Task:attempt_local1656003931_0001_r_000000_0 is done. And is in the process of committing
14/09/23 02:38:20 INFO mapred.LocalJobRunner: 1 / 1 copied.
14/09/23 02:38:20 INFO mapred.Task: Task attempt_local1656003931_0001_r_000000_0 is allowed to commit now
14/09/23 02:38:21 INFO output.FileOutputCommitter: Saved output of task 'attempt_local1656003931_0001_r_000000_0' to s3n://hadoop-s3/output/sample/_temporary/0/task_local1656003931_0001_r_000000
14/09/23 02:38:21 INFO mapred.LocalJobRunner: Records R/W=7/1 > reduce
14/09/23 02:38:21 INFO mapred.Task: Task 'attempt_local1656003931_0001_r_000000_0' done.
14/09/23 02:38:21 INFO mapred.LocalJobRunner: Finishing task: attempt_local1656003931_0001_r_000000_0
14/09/23 02:38:21 INFO mapred.LocalJobRunner: reduce task executor complete.
14/09/23 02:38:21 INFO mapreduce.Job:  map 100% reduce 100%
14/09/23 02:38:21 INFO s3native.NativeS3FileSystem: OutputStream for key 'output/sample/_SUCCESS' writing to tempfile '/tmp/hadoop-hdfs/s3/output-6220050609694301661.tmp'
14/09/23 02:38:21 INFO s3native.NativeS3FileSystem: OutputStream for key 'output/sample/_SUCCESS' closed. Now beginning upload
14/09/23 02:38:21 INFO s3native.NativeS3FileSystem: OutputStream for key 'output/sample/_SUCCESS' upload complete
14/09/23 02:38:22 INFO mapreduce.Job: Job job_local1656003931_0001 completed successfully
14/09/23 02:38:22 INFO mapreduce.Job: Counters: 43
        File System Counters
                FILE: Number of bytes read=220386
                FILE: Number of bytes written=667381
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=0
                HDFS: Number of bytes written=0
                HDFS: Number of read operations=0
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=0
                S3N: Number of bytes read=78
                S3N: Number of bytes written=23
                S3N: Number of read operations=0
                S3N: Number of large read operations=0
                S3N: Number of write operations=0
        Map-Reduce Framework
                Map input records=1
                Map output records=7
                Map output bytes=53
                Map output materialized bytes=73
                Input split bytes=92
                Combine input records=0
                Combine output records=0
                Reduce input groups=3
                Reduce shuffle bytes=73
                Reduce input records=7
                Reduce output records=3
                Spilled Records=14
                Shuffled Maps =1
                Failed Shuffles=0
                Merged Map outputs=1
                GC time elapsed (ms)=0
                CPU time spent (ms)=0
                Physical memory (bytes) snapshot=0
                Virtual memory (bytes) snapshot=0
                Total committed heap usage (bytes)=634388480
        Shuffle Errors
        File Input Format Counters
                Bytes Read=39
        File Output Format Counters
                Bytes Written=23
14/09/23 02:38:22 INFO streaming.StreamJob: Output directory: s3n://hadoop-s3/output/sample
  • Check the MapReduce results.
$ sudo -u hdfs hadoop fs -ls s3n://hadoop-s3/output/sample/
Found 2 items
-rw-rw-rw-   1          0 2014-09-23 02:38 s3n://hadoop-s3/output/sample/_SUCCESS
-rw-rw-rw-   1         23 2014-09-23 02:38 s3n://hadoop-s3/output/sample/part-00000
$ sudo -u hdfs hadoop fs -cat 's3n://hadoop-s3/output/sample/part-*'
Java    3
Python  2
Ruby    2
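As a quick cross-check against the job counters, the output can be summarized. It should contain 3 lines, matching Reduce output records=3, and the counts should add up to 7, matching Map output records=7 (the seven words of sample.txt). The sketch reads "word<TAB>count" lines on stdin. summarize_counts is a hypothetical helper name, and the example input is the part-00000 content listed above.

```shell
#!/bin/sh
# Hypothetical helper: reads "word<TAB>count" lines from stdin and prints
# "<distinct words> <sum of counts>".
summarize_counts() {
  awk 'BEGIN { FS = "\t" }
       { words++; total += $2 }
       END { printf "%d %d\n", words, total }'
}

# Example input: the part-00000 contents shown above.
printf 'Java\t3\nPython\t2\nRuby\t2\n' | summarize_counts
```

Against the real output, the same function can be fed directly with sudo -u hdfs hadoop fs -cat 's3n://hadoop-s3/output/sample/part-*' | summarize_counts.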



