はじめに
HadoopでAmazon S3を利用する方法を記述します。
環境
- CentOS 6.5
- CDH5
構成
ホスト名 | IPアドレス | ResourceManager | Namenode | NodeManager | Datanode | JobHistoryServer |
---|---|---|---|---|---|---|
hadoop-master | 192.168.122.101 | ○ | ○ | - | - | ○ |
hadoop-master2 | 192.168.122.102 | ○ | ○ | - | - | - |
hadoop-slave | 192.168.122.111 | - | - | ○ | ○ | - |
hadoop-slave2 | 192.168.122.112 | - | - | ○ | ○ | - |
hadoop-slave3 | 192.168.122.113 | - | - | ○ | ○ | - |
hadoop-client | 192.168.122.201 | - | - | - | - | - |
クラスタの構築方法は、CDH5でhadoopのクラスタを構築するをご参照ください。
hadoopのクラスタの設定
- s3を利用するための設定を追加します。
/etc/hadoop/conf/core-site.xml
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://hadoop-master:8020</value>
</property>
<!-- 追加 ここから -->
<property>
<name>fs.s3n.awsAccessKeyId</name>
<value>${s3のアクセスキー}</value>
</property>
<property>
<name>fs.s3n.awsSecretAccessKey</name>
<value>${s3のシークレットキー}</value>
</property>
<!-- 追加 ここまで -->
</configuration>
動作確認
- s3の操作をs3cmdで行うため、s3cmdをインストールします。
$ sudo sh -c "curl -kL https://raw.github.com/pypa/pip/master/contrib/get-pip.py | python"
$ sudo pip install python-dateutil
$ git clone https://github.com/s3tools/s3cmd.git
$ cd s3cmd
$ sudo python setup.py intall
$ s3cmd --configure
- 動作確認用にhadoop-s3というバケットを作成します。
$ s3cmd mb s3://hadoop-s3
$ mkdir input output
$ touch input/readme
$ touch output/readme
$ s3cmd put -r input s3://hadoop-s3/
$ s3cmd put -r output s3://hadoop-s3/
$ s3cmd ls s3://hadoop-s3/
DIR s3://hadoop-s3/input/
DIR s3://hadoop-s3/output/
- hadoopコマンドを使用してs3へファイルをputします。
input/sample.txt
Java Ruby Python Java Ruby Python Java
$ sudo -u hdfs hadoop fs -put sample.txt s3n://hadoop-s3/input/
14/09/23 02:27:14 INFO s3native.NativeS3FileSystem: OutputStream for key 'input/sample.txt._COPYING_' writing to tempfile '/tmp/hadoop-hdfs/s3/output-6707563359729154691.tmp'
14/09/23 02:27:14 INFO s3native.NativeS3FileSystem: OutputStream for key 'input/sample.txt._COPYING_' closed. Now beginning upload
- アップロードしたファイルが参照できるか確認します。
$ sudo -u hdfs hadoop fs -ls s3n://hadoop-s3/input/
Found 2 items
-rw-rw-rw- 1 0 2014-09-22 14:54 s3n://hadoop-s3/input/readme
-rw-rw-rw- 1 39 2014-09-23 02:27 s3n://hadoop-s3/input/sample.txt
$ sudo -u hdfs hadoop fs -cat s3n://hadoop-s3/input/sample.txt
14/09/23 02:32:17 INFO s3native.NativeS3FileSystem: Opening 's3n://hadoop-s3/input/sample.txt' for reading
Java Ruby Python Java Ruby Python Java
- s3cmdでも確認しておきます。
$ s3cmd ls s3://hadoop-s3/input/
2014-09-22 14:54 0 s3://hadoop-s3/input/readme
2014-09-23 02:27 39 s3://hadoop-s3/input/sample.txt
- hadoop-streamingで、s3を使用してみます。
mapper及びreducerは、ファイル内に含まれる単語の数をカウントする処理を実装しています。
詳細は、Hadoop StreamingでMapReduceをシェルで実装するをご参照ください。
$ sudo -u hdfs hadoop \
jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
-files /tmp/sample_mapper.sh,/tmp/sample_reducer.sh \
-input s3n://hadoop-s3/input/sample.txt \
-output s3n://hadoop-s3/output/sample \
-mapper sample_mapper.sh \
-reducer sample_reducer.sh
14/09/23 02:38:17 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
14/09/23 02:38:17 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
14/09/23 02:38:17 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
14/09/23 02:38:18 INFO mapred.FileInputFormat: Total input paths to process : 1
14/09/23 02:38:18 INFO mapreduce.JobSubmitter: number of splits:1
14/09/23 02:38:18 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local1656003931_0001
14/09/23 02:38:18 WARN conf.Configuration: file:/tmp/hadoop-hdfs/mapred/staging/hdfs1656003931/.staging/job_local1656003 931_0001/job.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.retry.interval; Ignoring.14/09/23 02:38:18 WARN conf.Configuration: file:/tmp/hadoop-hdfs/mapred/staging/hdfs1656003931/.staging/job_local1656003931_0001/job.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.attempts; Ignoring.
14/09/23 02:38:19 INFO mapred.LocalDistributedCacheManager: Localized file:/tmp/sample_mapper.sh as file:/tmp/hadoop-hdfs/mapred/local/1411439898863/sample_mapper.sh
14/09/23 02:38:19 INFO mapred.LocalDistributedCacheManager: Localized file:/tmp/sample_reducer.sh as file:/tmp/hadoop-hdfs/mapred/local/1411439898864/sample_reducer.sh
14/09/23 02:38:19 WARN conf.Configuration: file:/tmp/hadoop-hdfs/mapred/local/localRunner/hdfs/job_local1656003931_0001/ job_local1656003931_0001.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.retry.interval;Ignoring.
14/09/23 02:38:19 WARN conf.Configuration: file:/tmp/hadoop-hdfs/mapred/local/localRunner/hdfs/job_local1656003931_0001/ job_local1656003931_0001.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.attempts; Ignoring.
14/09/23 02:38:19 INFO mapreduce.Job: The url to track the job: http://localhost:8080/
14/09/23 02:38:19 INFO mapreduce.Job: Running job: job_local1656003931_0001
14/09/23 02:38:19 INFO mapred.LocalJobRunner: OutputCommitter set in config null
14/09/23 02:38:19 INFO mapred.LocalJobRunner: OutputCommitter is org.apache.hadoop.mapred.FileOutputCommitter
14/09/23 02:38:20 INFO mapred.LocalJobRunner: Waiting for map tasks
14/09/23 02:38:20 INFO mapred.LocalJobRunner: Starting task: attempt_local1656003931_0001_m_000000_0
14/09/23 02:38:20 INFO mapred.Task: Using ResourceCalculatorProcessTree : [ ]
14/09/23 02:38:20 INFO mapred.MapTask: Processing split: s3n://hadoop-s3/input/sample.txt:0+39
14/09/23 02:38:20 INFO s3native.NativeS3FileSystem: Opening 's3n://hadoop-s3/input/sample.txt' for reading
14/09/23 02:38:20 INFO s3native.NativeS3FileSystem: Opening key 'input/sample.txt' for reading at position '0'
14/09/23 02:38:20 INFO mapred.MapTask: numReduceTasks: 1
14/09/23 02:38:20 INFO mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
14/09/23 02:38:20 INFO mapred.MapTask: (EQUATOR) 0 kvi 26214396(104857584)
14/09/23 02:38:20 INFO mapred.MapTask: mapreduce.task.io.sort.mb: 100
14/09/23 02:38:20 INFO mapred.MapTask: soft limit at 83886080
14/09/23 02:38:20 INFO mapred.MapTask: bufstart = 0; bufvoid = 104857600
14/09/23 02:38:20 INFO mapred.MapTask: kvstart = 26214396; length = 6553600
14/09/23 02:38:20 INFO streaming.PipeMapRed: PipeMapRed exec [/tmp/./sample_mapper.sh]
14/09/23 02:38:20 INFO streaming.PipeMapRed: R/W/S=1/0/0 in:NA [rec/s] out:NA [rec/s]
14/09/23 02:38:20 INFO streaming.PipeMapRed: MRErrorThread done
14/09/23 02:38:20 INFO streaming.PipeMapRed: Records R/W=1/1
14/09/23 02:38:20 INFO streaming.PipeMapRed: mapRedFinished
14/09/23 02:38:20 INFO mapred.LocalJobRunner:
14/09/23 02:38:20 INFO mapred.MapTask: Starting flush of map output
14/09/23 02:38:20 INFO mapred.MapTask: Spilling map output
14/09/23 02:38:20 INFO mapred.MapTask: bufstart = 0; bufend = 53; bufvoid = 104857600
14/09/23 02:38:20 INFO mapred.MapTask: kvstart = 26214396(104857584); kvend = 26214372(104857488); length = 25/6553600
14/09/23 02:38:20 INFO mapred.MapTask: Finished spill 0
14/09/23 02:38:20 INFO mapred.Task: Task:attempt_local1656003931_0001_m_000000_0 is done. And is in the process of committing
14/09/23 02:38:20 INFO mapred.LocalJobRunner: Records R/W=1/1
14/09/23 02:38:20 INFO mapred.Task: Task 'attempt_local1656003931_0001_m_000000_0' done.
14/09/23 02:38:20 INFO mapred.LocalJobRunner: Finishing task: attempt_local1656003931_0001_m_000000_0
14/09/23 02:38:20 INFO mapred.LocalJobRunner: map task executor complete.
14/09/23 02:38:20 INFO mapreduce.Job: Job job_local1656003931_0001 running in uber mode : false
14/09/23 02:38:20 INFO mapreduce.Job: map 100% reduce 0%
14/09/23 02:38:20 INFO mapred.LocalJobRunner: Waiting for reduce tasks
14/09/23 02:38:20 INFO mapred.LocalJobRunner: Starting task: attempt_local1656003931_0001_r_000000_0
14/09/23 02:38:20 INFO mapred.Task: Using ResourceCalculatorProcessTree : [ ]
14/09/23 02:38:20 INFO mapred.ReduceTask: Using ShuffleConsumerPlugin: org.apache.hadoop.mapreduce.task.reduce.Shuffle@3ad7313f
14/09/23 02:38:20 INFO reduce.MergeManagerImpl: MergerManager: memoryLimit=652528832, maxSingleShuffleLimit=163132208, mergeThreshold=430669056, ioSortFactor=10, memToMemMergeOutputsThreshold=10
14/09/23 02:38:20 INFO reduce.EventFetcher: attempt_local1656003931_0001_r_000000_0 Thread started: EventFetcher for fetching Map Completion Events
14/09/23 02:38:20 INFO reduce.LocalFetcher: localfetcher#1 about to shuffle output of map attempt_local1656003931_0001_m_000000_0 decomp: 69 len: 73 to MEMORY
14/09/23 02:38:20 INFO reduce.InMemoryMapOutput: Read 69 bytes from map-output for attempt_local1656003931_0001_m_000000_0
14/09/23 02:38:20 INFO reduce.MergeManagerImpl: closeInMemoryFile -> map-output of size: 69, inMemoryMapOutputs.size() -> 1, commitMemory -> 0, usedMemory ->69
14/09/23 02:38:20 INFO reduce.EventFetcher: EventFetcher is interrupted.. Returning
14/09/23 02:38:20 INFO mapred.LocalJobRunner: 1 / 1 copied.
14/09/23 02:38:20 INFO reduce.MergeManagerImpl: finalMerge called with 1 in-memory map-outputs and 0 on-disk map-outputs
14/09/23 02:38:20 INFO mapred.Merger: Merging 1 sorted segments
14/09/23 02:38:20 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 62 bytes
14/09/23 02:38:20 INFO reduce.MergeManagerImpl: Merged 1 segments, 69 bytes to disk to satisfy reduce memory limit
14/09/23 02:38:20 INFO reduce.MergeManagerImpl: Merging 1 files, 73 bytes from disk
14/09/23 02:38:20 INFO reduce.MergeManagerImpl: Merging 0 segments, 0 bytes from memory into reduce
14/09/23 02:38:20 INFO mapred.Merger: Merging 1 sorted segments
14/09/23 02:38:20 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 62 bytes
14/09/23 02:38:20 INFO mapred.LocalJobRunner: 1 / 1 copied.
14/09/23 02:38:20 INFO streaming.PipeMapRed: PipeMapRed exec [/tmp/./sample_reducer.sh]
14/09/23 02:38:20 INFO s3native.NativeS3FileSystem: OutputStream for key 'output/sample/_temporary/0/_temporary/attempt_local1656003931_0001_r_000000_0/part-00000' writing to tempfile '/tmp/hadoop-hdfs/s3/output-4242374696574962917.tmp'
14/09/23 02:38:20 INFO streaming.PipeMapRed: R/W/S=1/0/0 in:NA [rec/s] out:NA [rec/s]
14/09/23 02:38:20 INFO streaming.PipeMapRed: Records R/W=7/1
14/09/23 02:38:20 INFO streaming.PipeMapRed: MRErrorThread done
14/09/23 02:38:20 INFO streaming.PipeMapRed: mapRedFinished
14/09/23 02:38:20 INFO s3native.NativeS3FileSystem: OutputStream for key 'output/sample/_temporary/0/_temporary/attempt_local1656003931_0001_r_000000_0/part-00000' closed. Now beginning upload
14/09/23 02:38:20 INFO s3native.NativeS3FileSystem: OutputStream for key 'output/sample/_temporary/0/_temporary/attempt_local1656003931_0001_r_000000_0/part-00000' upload complete
14/09/23 02:38:20 INFO mapred.Task: Task:attempt_local1656003931_0001_r_000000_0 is done. And is in the process of committing
14/09/23 02:38:20 INFO mapred.LocalJobRunner: 1 / 1 copied.
14/09/23 02:38:20 INFO mapred.Task: Task attempt_local1656003931_0001_r_000000_0 is allowed to commit now
14/09/23 02:38:21 INFO output.FileOutputCommitter: Saved output of task 'attempt_local1656003931_0001_r_000000_0' to s3n://hadoop-s3/output/sample/_temporary/0/task_local1656003931_0001_r_000000
14/09/23 02:38:21 INFO mapred.LocalJobRunner: Records R/W=7/1 > reduce
14/09/23 02:38:21 INFO mapred.Task: Task 'attempt_local1656003931_0001_r_000000_0' done.
14/09/23 02:38:21 INFO mapred.LocalJobRunner: Finishing task: attempt_local1656003931_0001_r_000000_0
14/09/23 02:38:21 INFO mapred.LocalJobRunner: reduce task executor complete.
14/09/23 02:38:21 INFO mapreduce.Job: map 100% reduce 100%
14/09/23 02:38:21 INFO s3native.NativeS3FileSystem: OutputStream for key 'output/sample/_SUCCESS' writing to tempfile '/tmp/hadoop-hdfs/s3/output-6220050609694301661.tmp'
14/09/23 02:38:21 INFO s3native.NativeS3FileSystem: OutputStream for key 'output/sample/_SUCCESS' closed. Now beginning upload
14/09/23 02:38:21 INFO s3native.NativeS3FileSystem: OutputStream for key 'output/sample/_SUCCESS' upload complete
14/09/23 02:38:22 INFO mapreduce.Job: Job job_local1656003931_0001 completed successfully
14/09/23 02:38:22 INFO mapreduce.Job: Counters: 43
File System Counters
FILE: Number of bytes read=220386
FILE: Number of bytes written=667381
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=0
HDFS: Number of bytes written=0
HDFS: Number of read operations=0
HDFS: Number of large read operations=0
HDFS: Number of write operations=0
S3N: Number of bytes read=78
S3N: Number of bytes written=23
S3N: Number of read operations=0
S3N: Number of large read operations=0
S3N: Number of write operations=0
Map-Reduce Framework
Map input records=1
Map output records=7
Map output bytes=53
Map output materialized bytes=73
Input split bytes=92
Combine input records=0
Combine output records=0
Reduce input groups=3
Reduce shuffle bytes=73
Reduce input records=7
Reduce output records=3
Spilled Records=14
Shuffled Maps =1
Failed Shuffles=0
Merged Map outputs=1
GC time elapsed (ms)=0
CPU time spent (ms)=0
Physical memory (bytes) snapshot=0
Virtual memory (bytes) snapshot=0
Total committed heap usage (bytes)=634388480
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=39
File Output Format Counters
Bytes Written=23
14/09/23 02:38:22 INFO streaming.StreamJob: Output directory: s3n://hadoop-s3/output/sample
- MapReduceの結果を確認します。
$ sudo -u hdfs hadoop fs -ls s3n://hadoop-s3/output/sample/
Found 2 items
-rw-rw-rw- 1 0 2014-09-23 02:38 s3n://hadoop-s3/output/sample/_SUCCESS
-rw-rw-rw- 1 23 2014-09-23 02:38 s3n://hadoop-s3/output/sample/part-00000
$ sudo -u hdfs hadoop fs -cat 's3n://hadoop-s3/output/sample/part-*'
Java 3
Python 2
Ruby 2