32
30

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Docker 上で入門する Apache Hadoop

Last updated at Posted at 2020-04-02

はじめに

現在は Amazon EMR や Cloud Dataflow 等のクラウドサービスや Apache Beam のような、より入門しやすいフレームワークの登場によって、大量のデータを処理するための並列分散処理プログラムをより簡単に実行できる環境が整っています。しかし、この辺りの技術を利用するにあたっては MapReduce をはじめとした従来からの技術も前提知識として必要になってくるため、Apache Hadoop についてまとめてみました。今回は、環境をできるだけ楽に構築できるように Docker 上に疑似分散モード(後述)で Hadoop クラスタを構築します。

1600px-Hadoop_logo_new.svg.png

今回利用するサンプルコード:https://github.com/esakik/hadoop-playground

Hadoop 概要

Hadoop は、大容量のデータを処理するための分散処理のフレームワークです。通常は Linux 上で動作させます。スケールアウトに優れているため、処理するデータ量が増えたとしても、サーバーの台数を追加することで性能を向上させることができます。

Hadoop は、主に次の2つのシステムで構成されます。

  • **HDFS (Hadoop Distributed File System):**分散ファイルシステム。大容量のデータを複数のサーバーに分割して格納します。ユーザーはこれらの複数のサーバーをひとつの大きなファイルシステムのように扱うことができます。
  • **MapReduce:**分散処理を実現するフレームワーク。ひとつの大きな処理(ジョブ)を複数の単位(タスク)に分割し、複数のサーバーで並列に実行できるように作られています。

Hadoop は、ひとつのコンポーネントで動作するのではなく、HDFS や MapReduce フレームワークなど**複数のコンポーネントが連携することで動作します。**このような Hadoop の主要なコンポーネントを Hadoop エコシステムと呼ぶことがあります。

HDFS (Hadoop Distributed File System)

HDFS 上では、大容量のデータは細かい単位(ブロック)に分割され、複数のサーバーのファイルシステムに配置されます。たとえば、データサイズが 1 GB (1024 MB) で、ブロックサイズが 64 MB だった場合、データは 16 個のブロックに分割され、複数のサーバーに配置されます。

このように複数のサーバーにデータを分配し、並列に処理することでスループットの向上が見込めます。ストレージとサーバー間の通信はコストが高いので、各サーバーで読み込んだデータはできる限りそのサーバーで処理するように動作します。しかし、最終的には各サーバーで処理した結果をネットワーク経由で転送してひとつの結果としてまとめる必要があります。

また、1 台のサーバーが故障しても、データを失ったり、処理が失敗したりしないように、分割されたブロックは複数のサーバーに格納されます。

スクリーンショット 2020-12-29 0.11.54.png

ユーザーは、HDFS を利用するにあたって、裏側で複数のサーバーが動いていることやファイルがどうブロックに分割されたかなどを考慮する必要はありません。

MapReduce

MapReduce は、ひとつのジョブを複数のタスクに分割し、並列に処理を実行します。MapReduce の処理は、大きく Map, Shuffle, Reduce と呼ばれる 3 つの処理で構成されます。 このうち、Shuffle は、自動的に実行されるものなのでユーザーが処理を定義する必要はありません。また、場合によっては、Shuffle の処理コストを軽減するために、Combine と呼ばれる処理を挟むこともあります。

それぞれの処理の内容については次の通りです。ここでは、例として、文書内の単語の出現回数を MapReduce の処理で数えてみます。

**Map:**HDFS 上のデータを分割し、タスクに割り当てます。タスクは、割り当てられた入力データから1行ずつキーバリューのペアとなるデータを取り出し、ユーザーが定義した処理を行った後、処理結果としてキーバリューのペアを出力します。

入力ファイル
We are the world, we are the children
We are the ones who make a brighter day
ユーザー定義の処理:単語をスペース区切りで取得して1行ごとに「単語,1」の形で出力する
# 入力ファイルがテキストの場合は単純に Value しか存在しない
入力: (None, "we are the world, we are the children")

出力:

"we", 1
"are", 1
"the", 1
"world", 1
"we", 1
"are", 1
"the", 1
"children", 1

Combine:Shuffle の前にサーバー内のデータのみで集計を済ませておくことでネットワーク転送量を減らす処理を挟むこともできます。つまり、中間集計を行う役割を持ちます。

ユーザー定義の処理:1行ごとに同じ単語(Key)でまとめる
入力: ("we", [1, 1])

出力:

"we", 2

**Shuffle:**Map (Combine) 処理後のデータを、同じキーを持つデータでまとめます。このとき、複数のサーバー間でデータの転送が行われるため、転送データ量が大きい場合は処理全体のボトルネックとなる可能性があります。

**Reduce:**Shuffle 処理後、キーごとにまとめられたデータに対してユーザーが定義した処理を行います。

ユーザー定義の処理:同じ単語(Key)でまとめる
Input: ("we", [2, 1])  # 1行目に "we" は 2回、2行目に "we" は1回出現する

Output:

"we", 3  # 文書内に "we" は3回出現することがわかった!

次の図は、上記のような MapReduce 処理のアーキテクチャを概要図としてまとめたものです。

スクリーンショット 2020-12-28 10.57.43.png

こういった並列分散処理を MapReduce のようなフレームワークなしで実装しようとすると、ひとつのジョブをどのような単位に分割するか、そのタスクをどのコンピュータで実行するか、各タスクの結果をどのようにひとつにまとめるか、途中でサーバーの故障などでどのようにリカバリするかなど多くのことを考慮する必要があります。

Hadoop アーキテクチャ

Hadoop には、3 つのメインバージョンがあり、それぞれでアーキテクチャが異なります。そして、Hadoop 1 と 2 の主な違いは MapReduce アーキテクチャの変更にあります。Hadoop 1 での MapReduce アーキテクチャのことを MRv1 と呼び、Hadoop 2 では、YARN (Yet-Another-Resource-Negotiator) という技術の上で MapReduce が動作し、これを MRv2 と呼びます。

Hadoop 1 Hadoop 2
HDFS HDFS
MapReduce (MRv1) MapReduce (MRv2) / YARN

Hadoop 2 から Hadoop 3 への大規模なアーキテクチャの変更はないので、ここでは、Hadoop 1 と Hadoop 2 のアーキテクチャについて触れます。

Hadoop 1

Hadoop クラスタは、クラスタ全体を管理するマスターサーバー群と実際のデータ処理を担当するスレーブサーバー群の 2 種類のサーバー群により構成されます。HDFS と MapReduce のそれぞれにマスターサーバーとスレーブサーバーが存在し、基本的にマスターサーバーは 1 台(冗長構成にする場合は複数台にし、Zookeeper と呼ばれるソフトウェアで集中管理する)、スレーブサーバーは複数台(10 台 ~ 数 1000 台規模)で構成されます。

HDFS アーキテクチャ

  • **NameNode:**HDFS のマスターサーバーです。クラスタ全体に渡って、データがどこに配置されているかなどのメタデータを管理します。HDFS のメタデータはメモリ上で管理するため、瞬時に応答を返すことができます。メタデータには、ファイルの格納先に関する情報やファイルサイズなどの情報が含まれます。
  • **DataNode:**HDFS のスレーブサーバーです。割り振られたタスクを実行し、応答を返します。複数台の DataNode のストレージでひとつのストレージを構成するので、ユーザーは HDFS を利用するにあたって、複数のストレージがあることを意識する必要がありません。
  • **HDFS Client:**NameNode から読み出し対象の DataNode 群の情報を取得し、データを DataNode から読み出します。

MapReduce (MRv1) アーキテクチャ

  • **JobTracker(リソースの管理、ジョブスケジューラ、履歴管理):**MapReduce (MRv1) のマスターサーバーです。JobClient にジョブの進捗状況や完了を通知するといったジョブの管理や、1 つのジョブを複数のタスクに分割し、各スレーブサーバーにタスクを割り振るなどのリソースの管理を行います。また、ジョブの履歴管理も行います。JobTracker は単一障害点となりえます。
  • **TaskTracker(スロットのリソース管理):**MapReduce (MRv1) のスレーブサーバーです。割り振られたタスクを実行し、応答を返します。Map 処理や Reduce 処理を実行し、そのリソースをスロットという単位で管理します。
  • **JobClient:**ジョブの依頼や優先度変更、強制終了などを行います。

スクリーンショット 2020-12-29 0.13.37.png

通常、DataNode と TaskTracker は同じマシンに置かれ、TaskTracker はまず同じマシンの DataNode 上のデータに対してジョブを実行します。これにより、ネットワークの通信コストを抑えることができます。

Hadoop 2

Hadoop 1 と Hadoop 2 のアーキテクチャの違いは主に MapReduce にあります。そのため、ここでは HDFS のアーキテクチャについては省略します。

MapReduce / YARN (MRv2) アーキテクチャ

MapReduce (MRv1) では、タスクの数が数千〜数万規模になると、JobTracker への負荷が集中し、ボトルネックになる可能性があります。また、クラスタ内で単一の JobTracker を使用するため、**負荷を分散させようと思うと、クラスタを新たに用意する必要があります。**この方法で負荷を分散させた場合、クラスタごとにリソースを管理することになるため、リソースの利用効率が下がったり、単一障害点である JobTracker 数の増加による監視対象の増加が起きたりといった問題が生じます。

こういった問題に対処するために導入されたのが YARN です。YARN では、JobTracker、TaskTracker の機能を次のように変更します。

MapReduce (MRv1) MapReduce (MRv2) / YARN
JobTracker ResourceManager、ApplicationMaster、JobHistoryServer
TaskTracker NodeManager
  • **ResourceManager(リソースの一元管理):**JobTracker からリソース管理を切り離します。リソース管理を ResourceManager が一元管理することで、リソースの使用効率が高まります。
  • **ApplicationMaster(ジョブスケジューラ):**JobTracker からジョブ管理を切り離します。ユーザーの実装によって、独自のジョブ管理を行うことができます。MapReduce 以外にも、Apache Spark や Apache Tez など他の分散処理フレームワークにも対応しています。また、ApplicationMaster をジョブごとに立ち上げることで、タスク数が増えた場合のボトルネックを回避することができます。
  • **JobHistoryServer(履歴管理):**JobTracker の履歴管理を切り離します。
  • **NodeManager(コンテナのリソース管理):**TaskTracker が行っていたリソース管理を切り離します。コンテナ(リソースをメモリ基準に分割したもの)という単位でリソースを管理します。

スクリーンショット 2020-12-29 0.19.35.png

このように YARN では機能を分割しており、次のようなフローをたどってジョブを実行します。

  1. JobClient は ResourceManager にリクエストを行う
  2. ResouceManager は NodeManager に問い合わせて ApplicationMaster のためのコンテナのリソースを割り当てる
  3. ResouceManager は ApplicationMaster をコンテナ上で実行する
  4. ApplicationMaster が ResouceManager に問い合わせて、ResouceManager は NodeManager に問い合わせてジョブ実行のためのコンテナのリソースを割り当てる
  5. ApplicationMaster はコンテナ上のジョブのスケジューリングを行う

Docker で Hadoop の環境構築

Hadoop (MRv2) の動作環境を構築していきます。前述の通り、Hadoop は、複数のコンポーネントが連携することで動作します。そのため、各種ソフトウェアをまとめたディストリビューションが提供されています。ディストリビューションを用いることで、分散処理を実行する環境を容易に構築できます。今回は、ディストリビューションとして CDH を Docker 上でインストールします。

また、Hadoop では次の 3 つの中から動作モードを選択できます。今回は、動作確認を手軽に行える擬似分散モードを選択します。

  • **ローカルモード:**1 台のサーバー上で、HDFS は使用せずに MapReduce の動作環境を構築する
  • **擬似分散モード:**1 台のサーバー上で、HDFS を使用した MapReduce の動作環境を構築する
  • **完全分散モード:**複数台のサーバー上で、HDFS を使用した MapReduce の動作環境を構築する

次のようなディレクトリ構成になります。今回利用するコードは こちら にもあります(ほぼ同じコードですが、ディレクトリ構成を変えたりしているので、利用する際は README を参照してください :pray:)。

ディレクトリ構成
.
├── Dockerfile
├── main
    ├── WordCount.java  # Hadoop ジョブ(Java)
    ├── scripts  # Hadoop 起動スクリプトなど
    │   ├── create-input-text.sh
    │   ├── execute-wordcount-python.sh
    │   ├── execute-wordcount.sh
    │   ├── make-jar.sh
    │   └── start-hadoop.sh
    └── streaming  # Hadoop Streaming ジョブ(Python)
        └── python
            ├── map.py
            └── reduce.py

使用する Dockerfile はこちらです。Hadoop は Java のアプリケーションなので JDK をインストールします。CDH のインストールは こちら を参考にしました。

Dockerfile
FROM centos:centos7

RUN yum -y update
RUN yum -y install sudo

# インストール:JDK
RUN yum -y install java-1.8.0-openjdk java-1.8.0-openjdk-devel

# 環境変数の設定
ENV JAVA_HOME /usr/lib/jvm/java-1.8.0-openjdk
ENV PATH $PATH:$JAVA_HOME/bin
# hadoop コマンドを実行できるようにするため(また、tools.jar は javac コンパイラを含む)
ENV HADOOP_CLASSPATH $JAVA_HOME/lib/tools.jar

# インストール:CDH 5 パッケージ
RUN rpm --import http://archive.cloudera.com/cdh5/redhat/6/x86_64/cdh/RPM-GPG-KEY-cloudera # GPG キーのインストール
RUN rpm -ivh http://archive.cloudera.com/cdh5/one-click-install/redhat/6/x86_64/cloudera-cdh-5-0.x86_64.rpm # yum リポジトリへ登録
RUN yum -y install hadoop-conf-pseudo # 疑似分散モードの設定と YARN や HDFS などを提供するパッケージをインストール

ADD main main
RUN chmod +x -R main

WORKDIR main

# コマンド実行後もコンテナを起動させ続ける
CMD ["tail", "-f", "/dev/null"]

では、こちらの Dockerfile から Docker イメージを作成します。

docker image build -t {名前空間/イメージ名:タグ名} .

ビルドが成功したらコンテナを起動します。Hadoop 起動後は http://localhost:50070 で Web インターフェースにアクセスできるようになるので、ポートフォワーディングしておきます。

docker container run --name {コンテナ名} -d -p 50070:50070 {名前空間/イメージ名:タグ名}

コンテナの起動が成功したら、コマンド操作を行えるようにコンテナに入ります。

docker exec -it {コンテナ名} /bin/bash

コンテナに入ることができたら、scripts/start-hadoop.sh を実行して Hadoop を起動します。

scripts/start-hadoop.sh
#!/usr/bin/env bash

# NameNode が管理するメタデータ領域をフォーマット
# 初めて HDFS を利用する際には必要なコマンド
sudo -u hdfs hdfs namenode -format

# HDFS の起動
for x in `cd /etc/init.d ; ls hadoop-hdfs-*` ; do sudo service $x start ; done

# 必要なディレクトリを作成
sudo /usr/lib/hadoop/libexec/init-hdfs.sh

# HDFS のファイル構造を把握
# hadoop fs は HDFS を操作するコマンド(ここでは HDFS 上で ls -R / を実行しています)
sudo -u hdfs hadoop fs -ls -R /

# YARN の起動
sudo service hadoop-yarn-resourcemanager start
sudo service hadoop-yarn-nodemanager start
sudo service hadoop-mapreduce-historyserver start
[root@xxxxxxxxx main]# ./scripts/start-hadoop.sh

Hadoop の起動が完了したら、http://localhost:50070 で Web インターフェースにアクセスでき、GUI からクラスタの状態や Job 実行の経過、結果を見ることができます。

スクリーンショット 2020-04-03 3.17.15.png

HDFS の設定ファイル

設定ファイルについても少し見ていきます。次に記述するファイルは特に重要な設定を含むファイルになります。今回は、疑似分散モードの設定を含む hadoop-conf-pseudo をインストールしているため、あらかじめ疑似分散モードの設定が記述されています。

core-site.xml は、共通の設定を含むファイルです。

/etc/hadoop/conf/core-site.xml
<configuration>
  ...
  <!-- HDFS (NameNode) の接続先の設定 -->
  <property>
    <name>fs.defaultFS</name>
    <!-- 疑似分散モードの場合:hdfs スキームを用いることで、ローカルファイルシステムではなく HDFS を利用するように設定できる -->
    <value>hdfs://localhost:8020</value>
    <!-- 完全分散モードの場合:NameNode と DataNode のサーバーが違うため、名前解決を行う必要がある -->
    <!-- <value>hdfs://nn.example.com:8020</value> -->
  </property>
  ...
</configuration>

また、hdfs-site.xml は、HDFS の設定を含むファイルです。

/etc/hadoop/conf/hdfs-site.xml
<!-- HDFS の設定 -->
<configuration>
  ...
  <!-- レプリカ数の設定 -->
  <property>
    <name>dfs.replication</name>
    <!-- 疑似分散モードの場合:サーバーが 1 台なのでこれ以上にはできない -->
    <value>1</value>
    <!-- 完全分散モードの場合:デフォルトは 3 -->
    <!-- <value>3</value> -->
  </property>
  ...
</configuration>

MapReduce の実装と実行

環境構築が終了したので、実際に MapReduce アプリケーションを作成してみます。MapReduce アプリケーションは Java はもちろん Pig Latin や HiveQL と呼ばれる言語でも作成することができます。

※ MapReduce アプリケーションは、Hive や Spark などのような、より手軽だったり、高速だったり、多言語の SDK が存在したりする技術で代替可能なため、MapReduce × Java や Hadoop Streaming で作成する機会はあまりないかもしれないと個人的には思いますが、ご参考までに :pray:

Java

WordCount.java は、Java での MapReduce アプリケーション の実装例です。入力のテキストファイルから単語を抜き出し、単語の数をカウントするアプリケーションです。

WordCount.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.util.StringTokenizer;

public class WordCount extends Configured implements Tool {
    /**
     * Mapper<入力キーの型, 入力バリューの型, 出力キーの型, 出力バリューの型> を継承したクラス.
     */
    public static class TokenizerMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        @Override
        public void setup(Context context) throws IOException, InterruptedException {
            // 初期化処理
        }

        /**
         * Map 処理を記述する.
         *
         * @param key その行が先頭から何バイト目の位置にあるかというバイトオフセット値(通常は利用しない)
         * @param value 1行分のデータ
         * @param context Context を通してジョブの設定や入出力データにアクセス可能
         */
        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }

        @Override
        public void cleanup(Context context) throws IOException, InterruptedException {
            // クリーンナップ処理
        }
    }

    /**
     * Reducer<入力キーの型, 入力バリューの型, 出力キーの型, 出力バリューの型> を継承したクラス.
     */
    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        @Override
        public void setup(Context context) throws IOException, InterruptedException {
            // 初期化処理
        }

        /**
         * Reduce 処理を記述する.
         *
         * @param key Map 処理の出力(キー)
         * @param values Map 処理の出力(バリューのイテラブル)
         * @param context Context
         */
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }

        @Override
        public void cleanup(Context context) throws IOException, InterruptedException {
            // クリーンナップ処理
        }
    }

    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }

        // JobTracker に対してジョブを投入する
        Job job = Job.getInstance(getConf(), "WordCount");

        // 入力データに応じて自動的に数が決まるMapタスクとは異なり、Reduceタスクの数は自分で指定する必要がある
        job.setNumReduceTasks(2);

        // jarファイルに格納されたクラスのうちの1つを指定する
        job.setJarByClass(WordCount.class);

        // Mapper、Combiner、Reducer としてどのクラスを利用するか指定する
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // テキストファイルからデータの入出力を行う
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // 入出力用のディレクトリのパス
        TextInputFormat.addInputPath(job, new Path(args[0]));
        TextOutputFormat.setOutputPath(job, new Path(args[1]));

        // ジョブの完了を待つ
        return (job.waitForCompletion(true) ? 0 : 1);
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new WordCount(), args);
        System.exit(res);
    }
}

Map 処理を記述するには org.apache.hadoop.mapreduce.Mapper を継承したクラスを作成し、同様に、Reduce 処理を記述するには org.apache.hadoop.mapreduce.Reducer を継承したクラスを作成します。

また、org.apache.hadoop.io.Text は String 型、org.apache.hadoop.io.IntWritable は int 型を Hadoop では意味します。

Java で実装された MapReduce アプリケーションを実行するためには、コンパイルして jar ファイルを作成する必要があります。

scripts/make-jar.sh
#!/usr/bin/env bash

# コンパイル
hadoop com.sun.tools.javac.Main WordCount.java

# jar の作成
jar cf wc.jar WordCount*.class
[root@xxxxxxxxx main]# ./scripts/make-jar.sh

次に入力のテキストファイルを用意します。MapReduce は、HDFS にデータが格納されていることを前提としているため、ローカルシステムからファイルを HDFS に配置します。

./scripts/create-input-text.sh
#!/usr/bin/env bash

# 入力のテキストファイルを作成
echo "apple lemon apple lemon lemon grape" > input.txt

# 入力のテキストファイルを HDFS に配置
sudo -u hdfs hadoop fs -mkdir -p /user/hdfs/input
sudo -u hdfs hadoop fs -put input.txt /user/hdfs/input
[root@xxxxxxxxx main]# ./scripts/create-input-text.sh

それでは、準備が整ったので実行します。

scripts/execute-wordcount.sh
#!/usr/bin/env bash

# WordCount.java の実行
# hadoop jar {jar ファイルのパス} {メインクラス名} {入力ファイルのパス} {出力先のパス}
sudo -u hdfs hadoop jar wc.jar WordCount /user/hdfs/input/input.txt /user/hdfs/output01

# 結果の表示
sudo -u hdfs hadoop fs -ls /user/hdfs/output01
sudo -u hdfs hadoop fs -cat /user/hdfs/output01/part-r-*
[root@xxxxxxxxx main]# ./scripts/execute-wordcount.sh

ジョブが成功した場合は、出力先のパス以下に _SUCCESS というファイルが生成されます。また、part-r-* という形式で出力の結果が1ファイルまたは複数ファイルに格納され、次のような結果を得られることがわかります。

part-r-00000
apple   2
grape   1
lemon   3

Python (Hadoop Streaming)

Hadoop Streaming は、Java 以外の言語で MapReduce アプリケーションを実行するためのインターフェースです。データの受け渡しに標準入出力を用いるので、Java の MapReduce アプリケーション と比較すると不便ですが、慣れ親しんだ言語でも開発することができます。今回は、Python で試してみます。

Hadoop Streaming では、入力先のパスと出力先のパスに加えて、実行する map 処理と reduce 処理が定義されたファイルのパスを指定してあげる必要があります。

scripts/execute-wordcount-python.sh
#!/usr/bin/env bash

# Hadoop Streaming の実行
sudo -u hdfs hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.6.0-mr1-cdh5.16.2.jar \
-input /user/hdfs/input/input.txt -output /user/hdfs/output02 \
-mapper /main/streaming/python/map.py -reducer /main/streaming/python/reduce.py

# 結果の表示
sudo -u hdfs hadoop fs -ls /user/hdfs/output02
sudo -u hdfs hadoop fs -cat /user/hdfs/output02/part-*

map.py では、標準入力から <単語 1> のキーバリューを生成し、標準出力に出力します。

streaming/python/map.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import re
import sys


# 1行を空文字区切りで分割し、(単語, 1) のキーバリューを生成する
def map_fn(line):
    return [(key, 1) for key in re.split(r'\s', line.strip()) if key]

# キーバリューを標準出力に出力する
def output(records):
    for key, value in records:
        print '{0}\t{1}'.format(key, value)

# 標準入力から入力を受け取る
for l in sys.stdin:
    output(map_fn(l))

reduce.py では、実際に単語の出現回数をカウントし、最終的な処理結果を標準出力に出力します。

streaming/python/reduce.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import re
import sys

results = {}


# 単語の出現回数を数える
def reduce_fn(line):
    key, value = re.split(r'\t', line.strip())
    if not key in results:
        results[key] = 0
    results[key] = results[key] + int(value)

# キーバリュー(最終的な処理結果)を標準出力に出力する
def output(records):
    for k, v in records:
        print '{0}\t{1}'.format(k, v)

# 標準入力から map 処理の出力を受け取る
for l in sys.stdin:
    reduce_fn(l)
output(sorted(results.items()))

入力ファイルが Java の時と同じ場合、同様の結果が出力先のパスのファイルから得られるかと思います。

part-00000
apple   2
grape   1
lemon   3

Hadoop エコシステム

Hadoop の主要なコンポーネントは他にも様々ありますが、すべて見ていくのは大変なのでそれぞれについて随時項目を追加して記述していきたいと思います。

コンポーネント 概要
Pig Pig Latin と呼ばれる DSL (Domain Specific Language) で処理を定義することができ、Java より少ないコードで、より簡単に MapReduce アプリケーションを作成できます。
Hive Apache Hive 概要 / HiveQL チートシート
HBase HDFS 上に構築する NoSQL の分散型データベースです。HDFS が苦手な部分を補完するためのシステムになります。

全体感を掴むには こちらの記事 が参考になります。

まとめ

今回は、Hadoop に入門した際のまとめとして記事を書きました。おおよそしか理解できていない部分や、概要だけしか知らない Hadoop の主要なコンポーネントがあるので、引き続き学習してみようと思います。また、AWS や GCP 等のクラウドで提供されているマネージドサービスも様々あるので、その使用感の違い等についても実際に動かしてみて学習したいと思います。

間違っている点があれば修正リクエストをお願いします。また、参考になるサイト等あれば是非教えてください!

参考 URL

32
30
3

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
32
30

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?