Apache Beam with Google Cloud Dataflow(over 2.0.x系)入門~基本的なGroupByKey編~
Apache Beamの5つのCore Transformの内の1つ、GroupByKeyの基本的な使い方について記す。
CoGroupByKeyなどについては別の機会に書けたらなと思う。
Apache Beam や Cloud Dataflowの基本についてはこちら
公式のBeam Programming Guideを参考に書かせていただいている。
GroupByKeyとは
並列なreduction操作。
Map/Shuffle/Reduce-styleでいうところのShuffle。
GroupByKeyは、簡単に言うとその名の通り「KeyによってCollectionをGroup化する」Core Transform。
Keyは同じだが、valueが異なるペアが複数存在するKey-ValueなCollectionを結合して新しいCollectionを生成する。
共通なKeyを持っているデータを集約するのに役に立つ。
multimapとuni-map
multimap
例えば、Java, Python, GoというKeyがあったとする。
その複数の各Keyに各々Valueが数字で割り当てられている。
変換前のこのMapをmultimapという。
Java,1
Python,5
Go,1
Java,3
Java,2
Go,5
Python,2
Go,2
Go,9
Python,6
uni-map
上記のKey-ValueなmultimapのCollectionに対してGroupByKeyを適用すると以下のような結果が得られる。
Java [1, 6, 8]
Python [2, 7]
Go[7, 8]
変換後このMapをuni-mapと呼ぶ。
一意のJava, Python, GoというKeyに対して、数字のCollectionのMapが割り当てられている。
Beam SDK for Java特有のKey-Valueの表し方
Beam SDK for Javaでは、通常のJavaとは異なるKey-Valueの表し方をする。
KVという型でkey-valueのオブジェクトを表す。
実際にコードを書いてみた
読み込むファイル
Java,1
Python,5
Go,1
Java,3
Java,2
Go,5
Python,2
Go,2
Go,9
Python,6
実際のJavaのコード
各処理は、コードにコメントとして記載している。
理解を優先するため、メソッドチェーンを極力使用していない。
そのため、冗長なコードになっている。
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
/**
* メイン
* Created by sekiguchikai on 2017/07/12.
*/
public class Main {
/**
* 関数オブジェクト
* 与えられたString str, String numを","で分割し、
* numをInteger型に変更して、KV<String, Integer>型にする
*/
static class SplitWordsAndMakeKVFn extends DoFn<String, KV<String, Integer>> {
@ProcessElement
// ProcessContextは、inputを表すobject
// 自分で定義しなくてもBeam SDKが勝手に取ってきてくれる
public void processElement(ProcessContext c) {
// ","で分割
String[] words = c.element().split(",");
// 分割したword[0]をKに、words[1]をIntegerに変換してVにする
c.output(KV.of(words[0], Integer.parseInt(words[1])));
}
}
/**
* 関数オブジェクト
* KV<String, Iterable<Integer>型をString型に変更する
*/
static class TransTypeFromKVAndMakeStringFn extends DoFn<KV<String, Iterable<Integer>>, String> {
@ProcessElement
public void processElement(ProcessContext c) {
// inputをString型に変換する
c.output(String.valueOf(c.element()));
}
}
/**
* インプットデータのパス
*/
private static final String INPUT_FILE_PATH = "./sample.txt";
/**
* アウトデータのパス
*/
private static final String OUTPUT_FILE_PATH = "./result.csv";
/**
* メイン
* 理解のため、メソッドチェーンを極力使用していない
* そのため、冗長なコードになっている
*
* @param args 引数
*/
public static void main(String[] args) {
// まずPipelineに設定するOptionを作成する
// 今回は、ローカルで起動するため、DirectRunnerを指定する
// ローカルモードでは、DirectRunnerがすでにデフォルトになっているため、ランナーを設定する必要はない
PipelineOptions options = PipelineOptionsFactory.create();
// Optionを元にPipelineを生成する
Pipeline pipeline = Pipeline.create(options);
// inout dataを読み込んで、そこからPCollection(パイプライン内の一連のデータ)を作成する
PCollection<String> lines = pipeline.apply(TextIO.read().from(INPUT_FILE_PATH));
// 与えられたString str, String numを","で分割し、numをInteger型に変更して、KV<String, Integer>型にする
PCollection<KV<String, Integer>> kvCounter = lines.apply(ParDo.of(new SplitWordsAndMakeKVFn()));
// GroupByKeyで、{Go, [2, 9, 1, 5]}のような形にする
// GroupByKey.<K, V>create())でGroupByKey<K, V>を生成している
PCollection<KV<String, Iterable<Integer>>> groupedWords = kvCounter.apply(
GroupByKey.<String, Integer>create());
// 出力のため、<KV<String, Iterable<Integer>>>型からString型に変換している
PCollection<String> output = groupedWords.apply(ParDo.of(new TransTypeFromKVAndMakeStringFn()));
// 書き込む
output.apply(TextIO.write().to(OUTPUT_FILE_PATH));
// run : PipeLine optionで指定したRunnerで実行
// waitUntilFinish : PipeLineが終了するまで待って、最終的な状態を返す
pipeline.run().waitUntilFinish();
}
}
ちなみにメソッドチェーンを使うとこんな感じ。
だいぶすっきりした。
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
/**
* メイン
* Created by sekiguchikai on 2017/07/12.
*/
public class Main {
/**
* 関数オブジェクト
* 与えられたString str, String numを","で分割し、
* numをInteger型に変更して、KV<String, Integer>型にする
*/
static class SplitWordsAndMakeKVFn extends DoFn<String, KV<String, Integer>> {
@ProcessElement
// ProcessContextは、inputを表すobject
// 自分で定義しなくてもBeam SDKが勝手に取ってきてくれる
public void processElement(ProcessContext c) {
// ","で分割
String[] words = c.element().split(",");
// 分割したword[0]をKに、words[1]をIntegerに変換してVにする
c.output(KV.of(words[0], Integer.parseInt(words[1])));
}
}
/**
* 関数オブジェクト
* KV<String, Iterable<Integer>型をString型に変更する
*/
static class TransTypeFromKVAndMakeStringFn extends DoFn<KV<String, Iterable<Integer>>, String> {
@ProcessElement
public void processElement(ProcessContext c) {
// inputをString型に変換する
c.output(String.valueOf(c.element()));
}
}
/**
* インプットデータのパス
*/
private static final String INPUT_FILE_PATH = "./sample.txt";
/**
* アウトデータのパス
*/
private static final String OUTPUT_FILE_PATH = "./result.csv";
/**
* メイン
* 理解のため、メソッドチェーンを極力使用していない
* そのため、冗長なコードになっている
*
* @param args 引数
*/
public static void main(String[] args) {
Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());
// メソッドチェーンを使った書き方
pipeline.apply(TextIO.read().from(INPUT_FILE_PATH))
.apply(ParDo.of(new SplitWordsAndMakeKVFn()))
.apply(GroupByKey.<String, Integer>create())
.apply(ParDo.of(new TransTypeFromKVAndMakeStringFn()))
.apply(TextIO.write().to(OUTPUT_FILE_PATH));
// run : PipeLine optionで指定したRunnerで実行
// waitUntilFinish : PipeLineが終了するまで待って、最終的な状態を返す
pipeline.run().waitUntilFinish();
}
}
実行結果
以下の3つのファイルが生成される。
result.csv-00000-of-00003
result.csv-00001-of-00003
result.csv-00002-of-00003
それぞれのファイルの中身は、以下。
分散並列処理で処理が行われているので、中身が空白のファイルや、中身が1つ、2つのものがあったりとバラバラである。
また、どの内容がどのファイルに出力されるかは毎回ランダムである。
result.csv-00000-of-00003
中身なし
result.csv-00001-of-00003
KV{Java, [1, 3, 2]}
result.csv-00002-of-00003
KV{Go, [5, 2, 9, 1]}
KV{Python, [5, 2, 6]}
関連記事
Apache Beam with Cloud Dataflow(over 2.0.0系)入門~基本部分~ParDoまで~ - Qiita
IntelliJとGradleで始めるApache Beam 2.0.x with Google Cloud Dataflow - Qiita
参考にさせていただいたサイト
GroupByKey と結合 | Cloud Dataflow のドキュメント | Google Cloud Platform
※ ブログでも同一の投稿を行っている