これは、Google Cloud PlatformのAdvent Calendar の20日目のエントリーです。
Cloud Dataflowとは?
概念はこちらを参照してください。
要は、分散処理フレームワーク+実行環境(onGCP)みたいなものです。
イマイチ事例も無いし情報も少ないのですが、個人的には可能性を感じていてワクワクしてます。
Dataproc (Hadoop)より優れている(と思う)ところ
分散処理フレームワークといえば、Hadoopが代表的です。
GCPでもHadoopサービスであるDataprocを出しているので、簡単に違いを考えてみました。
そもそもDataprocとは?
Hadoopクラスタを自動構築するサービス。AWSで言うところのEMR。
Click to Deployで作ることもできますが、これを使えばクラスタとして管理ができるので、ノード数の指定や増減、プリエンプティブVMを含むマシンタイプの設定なども細かくできます。
また、Hadoopであるため既存のHadoop資産(アプリとかエコシス連携とか)がそのまま使えます。Hadoopに馴染みがあれば、比較的導入コストは低いと思います。
何故Dataflowを使うのか
- ワークフローベースでコードが書けるので、MapReduceを書きまくるよりもシンプル
- バッチとストリームを同じように管理できる
- HadoopとStormでLambdaアーキテクチャ!みたいなことをしなくとも、Dataflowのみを使って完結させられる
- ジョブとクラスタが1対1対応
- 1ジョブ1Dataflowクラスタのような対応になるので、ジョブが終わればクラスタも終了する。クラスタを上げたり下げたりしなくてもOK
- オートスケール出来る
Dataflowのオートスケール
ストリームモードには対応していませんが(2015.12.20現在)、バッチモードではオートスケールが効きます。
いい感じに分散して、いい感じに処理をしてくれるDataflowは、いい感じにスケールまでしてくれるらしいです。
ということで試してみました。
オートスケール条件
スケール上限
setMaxNumWorkers
で最大スケール台数を指定します。
minimumは指定できないらしい。そもそもスケールインするのかは不明。
スケールトリガー
現状、スループットベースのみ対応しているようです。
スループットベースの詳細は分からず。。
どこのスループットがどうなれば、どうスケールするのかは分からないけど、きっといい感じにやってくれるのでしょう。
やってみた
負荷をかけてスケールアウトさせてみる
方針
-
ワードカウントのサンプル
を少しイジって、ワードカウント - スケールアウトせずに処理されると困るので、最小スペックマシンで1台のみからスタート
- スケールアウトしてくれることを祈りながら待機
実行コード
package com.google.cloud.dataflow.examples;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.TextIO;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineWorkerPoolOptions;
import com.google.cloud.dataflow.sdk.options.Default;
import com.google.cloud.dataflow.sdk.options.DefaultValueFactory;
import com.google.cloud.dataflow.sdk.options.Description;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.transforms.Aggregator;
import com.google.cloud.dataflow.sdk.transforms.Count;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.Sum;
import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
import com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner;
import java.util.Date;
import java.text.SimpleDateFormat;
public class Advent {
static class ExtractWordsFn extends DoFn<String, String> {
private static final long serialVersionUID = 0;
private final Aggregator<Long, Long> emptyLines =
createAggregator("emptyLines", new Sum.SumLongFn());
@Override
public void processElement(ProcessContext c) {
int i;
if (c.element().trim().isEmpty()) {
emptyLines.addValue(1L);
}
String[] words = c.element().split("[^a-zA-Z']+");
for (String word : words) {
if (!word.isEmpty()) {
for (i = 0; i < 5000; i++) {//負荷をかけるために水増し
c.output(word);
}
c.output(word);
}
}
}
}
public static class FormatAsTextFn extends DoFn<KV<String, Long>, String> {
private static final long serialVersionUID = 0;
@Override
public void processElement(ProcessContext c) {
c.output(c.element().getKey() + ": " + c.element().getValue());
}
}
public static class CountWords extends PTransform<PCollection<String>,
PCollection<KV<String, Long>>> {
private static final long serialVersionUID = 0;
@Override
public PCollection<KV<String, Long>> apply(PCollection<String> lines) {
PCollection<String> words = lines.apply(
ParDo.of(new ExtractWordsFn()));
PCollection<KV<String, Long>> wordCounts =
words.apply(Count.<String>perElement());
return wordCounts;
}
}
public static void main(String[] args) {
Date date = new Date();
SimpleDateFormat sdf = new SimpleDateFormat("HHmmss");
System.out.println(sdf.format(date));
DataflowPipelineOptions options = PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
options.setProject("xxxxxxxxxxxx");
options.setStagingLocation("gs://xxxxxxxxxxxxxxxx");
options.setJobName("adventTest" + sdf.format(date));
options.setWorkerMachineType("f1-micro");//最弱マシンタイプを指定
options.setNumWorkers(1);//最初は1台でスタート
options.setMaxNumWorkers(10);//最大10台までスケール
options.setRunner(BlockingDataflowPipelineRunner.class);
options.setAutoscalingAlgorithm(DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType.THROUGHPUT_BASED);
Pipeline p = Pipeline.create(options);
p.apply(TextIO.Read.named("ReadLines").from("gs://xxxxxxx/src/*.txt"))
.apply(new CountWords())
.apply(ParDo.of(new FormatAsTextFn()))
.apply(TextIO.Write.named("WriteCounts").to("gs://xxxxxxx/result/count"));
p.run();
}
}
結果
CA0848:DataflowJavaSDK-examples 01012025$ mvn compile exec:java -Dexec.mainClass=com.google.cloud.dataflow.examples.Advent -Dexec.args=""
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building Google Cloud Dataflow Java Examples - All manual_build
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ google-cloud-dataflow-java-examples-all ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] skip non existing resourceDirectory /Applications/home/Dev/GCP/DataflowJavaSDK-examples/src/main/resources
[INFO]
[INFO] --- maven-compiler-plugin:3.1:compile (default-compile) @ google-cloud-dataflow-java-examples-all ---
[INFO] Changes detected - recompiling the module!
[INFO] Compiling 31 source files to /Applications/home/Dev/GCP/DataflowJavaSDK-examples/target/classes
[INFO]
[INFO] >>> exec-maven-plugin:1.1:java (default-cli) > validate @ google-cloud-dataflow-java-examples-all >>>
[INFO]
[INFO] <<< exec-maven-plugin:1.1:java (default-cli) < validate @ google-cloud-dataflow-java-examples-all <<<
[INFO]
[INFO] --- exec-maven-plugin:1.1:java (default-cli) @ google-cloud-dataflow-java-examples-all ---
(省略)
> gcloud alpha dataflow jobs --project=xxxxxxx cancel 2015-12-19_21_41_55-11977783783909628972
2015-12-20T05:41:56.099Z: Basic: (6d2cc001d4e2fe2): Autoscaling: Enabled for job 2015-12-19_21_41_55-11977783783909628972 between 1 and 10 worker processes.
2015-12-20T05:41:57.672Z: Detail: (638797b7bcd39109): Checking required Cloud APIs are enabled.
2015-12-20T05:41:58.065Z: Detail: (638797b7bcd397c3): Expanding GroupByKey operations into optimizable parts.
2015-12-20T05:41:58.069Z: Detail: (638797b7bcd398b9): Lifting ValueCombiningMappingFns into MergeBucketsMappingFns
2015-12-20T05:41:58.074Z: Detail: (638797b7bcd39aa5): Annotating graph with Autotuner information.
2015-12-20T05:41:58.156Z: Detail: (638797b7bcd39e02): Fusing adjacent ParDo, Read, Write, and Flatten operations
2015-12-20T05:41:58.158Z: Detail: (638797b7bcd39ef8): Fusing consumer Advent.CountWords/ParDo(ExtractWords) into ReadLines
2015-12-20T05:41:58.161Z: Detail: (638797b7bcd39fee): Fusing consumer Advent.CountWords/Count.PerElement/Count.PerKey/GroupByKey+Advent.CountWords/Count.PerElement/Count.PerKey/Combine.GroupedValues/Partial into Advent.CountWords/Count.PerElement/Init
2015-12-20T05:41:58.164Z: Detail: (638797b7bcd390e4): Fusing consumer Advent.CountWords/Count.PerElement/Count.PerKey/Combine.GroupedValues/Extract into Advent.CountWords/Count.PerElement/Count.PerKey/Combine.GroupedValues
2015-12-20T05:41:58.166Z: Detail: (638797b7bcd391da): Fusing consumer WriteCounts into ParDo(FormatAsText)
2015-12-20T05:41:58.169Z: Detail: (638797b7bcd392d0): Fusing consumer Advent.CountWords/Count.PerElement/Count.PerKey/Combine.GroupedValues into Advent.CountWords/Count.PerElement/Count.PerKey/GroupByKey/Read
2015-12-20T05:41:58.171Z: Detail: (638797b7bcd393c6): Fusing consumer ParDo(FormatAsText) into Advent.CountWords/Count.PerElement/Count.PerKey/Combine.GroupedValues/Extract
2015-12-20T05:41:58.173Z: Detail: (638797b7bcd394bc): Fusing consumer Advent.CountWords/Count.PerElement/Count.PerKey/GroupByKey/Write into Advent.CountWords/Count.PerElement/Count.PerKey/GroupByKey/Reify
2015-12-20T05:41:58.176Z: Detail: (638797b7bcd395b2): Fusing consumer Advent.CountWords/Count.PerElement/Init into Advent.CountWords/ParDo(ExtractWords)
2015-12-20T05:41:58.178Z: Detail: (638797b7bcd396a8): Fusing consumer Advent.CountWords/Count.PerElement/Count.PerKey/GroupByKey/Reify into Advent.CountWords/Count.PerElement/Count.PerKey/GroupByKey+Advent.CountWords/Count.PerElement/Count.PerKey/Combine.GroupedValues/Partial
2015-12-20T05:41:58.206Z: Basic: (638797b7bcd39230): Worker configuration: f1-micro in us-central1-f.
2015-12-20T05:41:58.405Z: Detail: (638797b7bcd39608): Adding StepResource setup and teardown to workflow graph.
2015-12-20T05:41:58.634Z: Basic: (5ea7f798d3119825): Starting 1 workers...
2015-12-20T05:41:58.650Z: Basic: S01: (73e1c7e795370654): Executing operation Advent.CountWords/Count.PerElement/Count.PerKey/GroupByKey/Create
2015-12-20T05:41:58.689Z: Basic: S02: (df5816a1cf52454d): Executing operation ReadLines+Advent.CountWords/ParDo(ExtractWords)+Advent.CountWords/Count.PerElement/Init+Advent.CountWords/Count.PerElement/Count.PerKey/GroupByKey+Advent.CountWords/Count.PerElement/Count.PerKey/Combine.GroupedValues/Partial+Advent.CountWords/Count.PerElement/Count.PerKey/GroupByKey/Reify+Advent.CountWords/Count.PerElement/Count.PerKey/GroupByKey/Write
2015-12-20T05:43:33.106Z: Detail: (8c621adec7b2b07e): Workers have started successfully.
2015-12-20T05:44:05.682Z: Basic: (b7fb9e5ac7ffe8ac): Autoscaling: Resizing worker pool from 1 to 10.
ついにスケールアウト!ただ、一気に10台まで増えてしまった。
2015-12-20T05:56:33.323Z: Basic: S03: (278ed3ea31d29821): Executing operation Advent.CountWords/Count.PerElement/Count.PerKey/GroupByKey/Close
2015-12-20T05:56:33.366Z: Basic: S04: (d6193d68e3e362aa): Executing operation Advent.CountWords/Count.PerElement/Count.PerKey/GroupByKey/Read+Advent.CountWords/Count.PerElement/Count.PerKey/Combine.GroupedValues+Advent.CountWords/Count.PerElement/Count.PerKey/Combine.GroupedValues/Extract+ParDo(FormatAsText)+WriteCounts
2015-12-20T05:56:36.559Z: Basic: (5f41d4340ae9c953): Stopping worker pool...
2015-12-20T05:57:35.626Z: Basic: (5f41d4340ae9cc7e): Worker pool stopped.
2015-12-20T05:57:35.793Z: Detail: (d6193d68e3e36dc8): Cleaning up.
12 20, 2015 2:57:45 午後 com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner run
情報: Job finished with status DONE
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 16:23 min
[INFO] Finished at: 2015-12-20T14:57:45+09:00
[INFO] Final Memory: 38M/547M
[INFO] ------------------------------------------------------------------------
と、非常に乱暴な検証ですが、無事にスケールアウトすることだけは確認できました。
ちなみに
options.setMaxNumWorkers(100)
とかでやると、
2015-12-20T08:05:31.546Z: Basic: (235fde4114c30699): Worker configuration: f1-micro in us-central1-f.
2015-12-20T08:05:31.704Z: Detail: (ae1e4e6f50dce230): Cleaning up.
2015-12-20T08:05:31.715Z: Error: (ae1e4e6f50dce651): Workflow failed. Causes: (235fde4114c30e35): Project xxxxxxxxx has insufficient quota(s) to execute this workflow with 100 instances in region us-central1. Quota summary (available/required): 99/100 CPUs, 7640/25000 disk GB, 2048/0 SSD disk GB, 12/100 IP addresses.
Please see https://cloud.google.com/compute/docs/resource-quotas about requesting more quota.
12 20, 2015 5:05:43 午後 com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner run
情報: Job finished with status FAILED
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
となり、デプロイに失敗します。
スケールすると思いきやquotaに引っかかってスケールしていない的なことは起こらず、最初から無理な設定は入らないようです。
感想
無事にオートスケール出来たわけですが、実際バッチでオートスケールってどんな状況なのかユースケースがいまいちピンと来ませんでした。
ストリームなら、データがスパイクした場合なんかにスケールさせる必要はありそうですが、バッチで自動的にスケールするってどんな状況なのでしょうか。
複雑なフローを組んでいた場合、並列度のあげられる処理とそうでない処理が混じっていた場合に、前者な処理の時だけスケールアウトして処理して、後者なときはスケールインして、これまたいい感じに遊んでるノードを減らしてくれたりするのかなーなんて思いました。
そんな自在に伸縮してくれるようなことができるのか試してみたいのと、早くストリーム処理に対応してもらいたいです。
なんにせよ、まだまだ情報も少ないし事例も少ないDataflowですが、個人的には注目しているので、早く実戦投入も目指してみようと思います。
今後の期待
- オートスケールについてもう少しドキュメントを詳しく書いていただきたい
- プリエンプティブVMを混ぜたスケールアウトとか
- ストリーム処理にも対応してほしい