LoginSignup
15
11

More than 5 years have passed since last update.

Apache Beam with Google Cloud Dataflow(over 2.0.x系)入門~基本的なGroupByKey編~

Last updated at Posted at 2017-07-12

Apache Beam with Google Cloud Dataflow(over 2.0.x系)入門~基本的なGroupByKey編~

Apache Beamの5つのCore Transformの内の1つ、GroupByKeyの基本的な使い方について記す。
CoGroupByKeyなどについては別の機会に書けたらなと思う。

Apache Beam や Cloud Dataflowの基本についてはこちら

公式のBeam Programming Guideを参考に書かせていただいている。

GroupByKeyとは

並列なreduction操作。
Map/Shuffle/Reduce-styleでいうところのShuffle。
GroupByKeyは、簡単に言うとその名の通り「KeyによってCollectionをGroup化する」Core Transform。
Keyは同じだが、valueが異なるペアが複数存在するKey-ValueなCollectionを結合して新しいCollectionを生成する。
共通なKeyを持っているデータを集約するのに役に立つ。

multimapとuni-map

multimap

例えば、Java, Python, GoというKeyがあったとする。
その複数の各Keyに各々Valueが数字で割り当てられている。
変換前のこのMapをmultimapという。

Java,1
Python,5
Go,1
Java,3
Java,2
Go,5
Python,2
Go,2
Go,9
Python,6

uni-map

上記のKey-ValueなmultimapのCollectionに対してGroupByKeyを適用すると以下のような結果が得られる。

Java [1, 6, 8]
Python [2, 7]
Go[7, 8]

変換後このMapをuni-mapと呼ぶ。
一意のJava, Python, GoというKeyに対して、数字のCollectionのMapが割り当てられている。

Beam SDK for Java特有のKey-Valueの表し方

Beam SDK for Javaでは、通常のJavaとは異なるKey-Valueの表し方をする。
KVという型でkey-valueのオブジェクトを表す。

実際にコードを書いてみた

読み込むファイル

sample.txt
Java,1
Python,5
Go,1
Java,3
Java,2
Go,5
Python,2
Go,2
Go,9
Python,6

実際のJavaのコード

各処理は、コードにコメントとして記載している。
理解を優先するため、メソッドチェーンを極力使用していない。
そのため、冗長なコードになっている。

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;


/**
 * メイン
 * Created by sekiguchikai on 2017/07/12.
 */
public class Main {
    /**
     * 関数オブジェクト
     * 与えられたString str, String numを","で分割し、
     * numをInteger型に変更して、KV<String, Integer>型にする
     */
    static class SplitWordsAndMakeKVFn extends DoFn<String, KV<String, Integer>> {
        @ProcessElement
        // ProcessContextは、inputを表すobject
        // 自分で定義しなくてもBeam SDKが勝手に取ってきてくれる
        public void processElement(ProcessContext c) {
            // ","で分割
            String[] words = c.element().split(",");
            // 分割したword[0]をKに、words[1]をIntegerに変換してVにする
            c.output(KV.of(words[0], Integer.parseInt(words[1])));
        }
    }

    /**
     * 関数オブジェクト
     * KV<String, Iterable<Integer>型をString型に変更する
     */
    static class TransTypeFromKVAndMakeStringFn extends DoFn<KV<String, Iterable<Integer>>, String> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            // inputをString型に変換する
            c.output(String.valueOf(c.element()));

        }

    }


    /**
     * インプットデータのパス
     */
    private static final String INPUT_FILE_PATH = "./sample.txt";
    /**
     * アウトデータのパス
     */
    private static final String OUTPUT_FILE_PATH = "./result.csv";

    /**
     * メイン
     * 理解のため、メソッドチェーンを極力使用していない
     * そのため、冗長なコードになっている
     *
     * @param args 引数
     */
    public static void main(String[] args) {
        // まずPipelineに設定するOptionを作成する
        // 今回は、ローカルで起動するため、DirectRunnerを指定する
        // ローカルモードでは、DirectRunnerがすでにデフォルトになっているため、ランナーを設定する必要はない
        PipelineOptions options = PipelineOptionsFactory.create();

        // Optionを元にPipelineを生成する
        Pipeline pipeline = Pipeline.create(options);

        // inout dataを読み込んで、そこからPCollection(パイプライン内の一連のデータ)を作成する
        PCollection<String> lines = pipeline.apply(TextIO.read().from(INPUT_FILE_PATH));

        // 与えられたString str, String numを","で分割し、numをInteger型に変更して、KV<String, Integer>型にする
        PCollection<KV<String, Integer>> kvCounter = lines.apply(ParDo.of(new SplitWordsAndMakeKVFn()));

        // GroupByKeyで、{Go, [2, 9, 1, 5]}のような形にする
               // GroupByKey.<K, V>create())でGroupByKey<K, V>を生成している
        PCollection<KV<String, Iterable<Integer>>> groupedWords = kvCounter.apply(
                GroupByKey.<String, Integer>create());

        // 出力のため、<KV<String, Iterable<Integer>>>型からString型に変換している
        PCollection<String> output = groupedWords.apply(ParDo.of(new TransTypeFromKVAndMakeStringFn()));

        // 書き込む
        output.apply(TextIO.write().to(OUTPUT_FILE_PATH));

        // run : PipeLine optionで指定したRunnerで実行
        // waitUntilFinish : PipeLineが終了するまで待って、最終的な状態を返す
        pipeline.run().waitUntilFinish();
    }
}

ちなみにメソッドチェーンを使うとこんな感じ。
だいぶすっきりした。


import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;


/**
 * メイン
 * Created by sekiguchikai on 2017/07/12.
 */
public class Main {
    /**
     * 関数オブジェクト
     * 与えられたString str, String numを","で分割し、
     * numをInteger型に変更して、KV<String, Integer>型にする
     */
    static class SplitWordsAndMakeKVFn extends DoFn<String, KV<String, Integer>> {
        @ProcessElement
        // ProcessContextは、inputを表すobject
        // 自分で定義しなくてもBeam SDKが勝手に取ってきてくれる
        public void processElement(ProcessContext c) {
            // ","で分割
            String[] words = c.element().split(",");
            // 分割したword[0]をKに、words[1]をIntegerに変換してVにする
            c.output(KV.of(words[0], Integer.parseInt(words[1])));
        }
    }

    /**
     * 関数オブジェクト
     * KV<String, Iterable<Integer>型をString型に変更する
     */
    static class TransTypeFromKVAndMakeStringFn extends DoFn<KV<String, Iterable<Integer>>, String> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            // inputをString型に変換する
            c.output(String.valueOf(c.element()));

        }

    }


    /**
     * インプットデータのパス
     */
    private static final String INPUT_FILE_PATH = "./sample.txt";
    /**
     * アウトデータのパス
     */
    private static final String OUTPUT_FILE_PATH = "./result.csv";

    /**
     * メイン
     * 理解のため、メソッドチェーンを極力使用していない
     * そのため、冗長なコードになっている
     *
     * @param args 引数
     */
    public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());

        // メソッドチェーンを使った書き方
        pipeline.apply(TextIO.read().from(INPUT_FILE_PATH))
                .apply(ParDo.of(new SplitWordsAndMakeKVFn()))
                .apply(GroupByKey.<String, Integer>create())
                .apply(ParDo.of(new TransTypeFromKVAndMakeStringFn()))
                .apply(TextIO.write().to(OUTPUT_FILE_PATH));

        // run : PipeLine optionで指定したRunnerで実行
        // waitUntilFinish : PipeLineが終了するまで待って、最終的な状態を返す
        pipeline.run().waitUntilFinish();
    }
}

実行結果

以下の3つのファイルが生成される。
result.csv-00000-of-00003
result.csv-00001-of-00003
result.csv-00002-of-00003

それぞれのファイルの中身は、以下。
分散並列処理で処理が行われているので、中身が空白のファイルや、中身が1つ、2つのものがあったりとバラバラである。
また、どの内容がどのファイルに出力されるかは毎回ランダムである。

result.csv-00000-of-00003
中身なし

result.csv-00001-of-00003

KV{Java, [1, 3, 2]}

result.csv-00002-of-00003

KV{Go, [5, 2, 9, 1]}
KV{Python, [5, 2, 6]}

関連記事

Apache Beam with Cloud Dataflow(over 2.0.0系)入門~基本部分~ParDoまで~ - Qiita

IntelliJとGradleで始めるApache Beam 2.0.x with Google Cloud Dataflow - Qiita

参考にさせていただいたサイト

Beam Programming Guide

GroupByKey と結合  |  Cloud Dataflow のドキュメント  |  Google Cloud Platform

※ ブログでも同一の投稿を行っている

15
11
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
15
11