Google Cloud Platform (GCP) でCloud Dataflowを使ってストリーム処理をする方法を学ぶため、まずは『Cloud Dataflow + Cloud Pub/Sub + Fluentdを使ってDoS検知する仕組みを作ってみた』を動かすことを試しました。しかしSDKがApache Beamベースに変わったことでそのままでは動かなかったので、コード変更した部分をまとめておきます。
※ GCPだけでなくJavaも初心者のため、説明やコードに変なところがあればぜひご指摘ください。
コード差分
生のdiff出力ではなく、対応する差分が隣り合うよう少し並べ替えています。
--- ../Dos.java.orig 2017-11-15 17:22:31.583396781 +0900
+++ src/main/java/com/google/cloud/dataflow/examples/Dos.java 2017-11-15 17:17:45.067769398 +0900
@@ -1,35 +1,35 @@
package com.google.cloud.dataflow.examples;
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.PipelineResult;
-import com.google.cloud.dataflow.sdk.io.TextIO;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.io.PubsubIO;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
-import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
-import com.google.cloud.dataflow.sdk.options.Default;
-import com.google.cloud.dataflow.sdk.options.DefaultValueFactory;
-import com.google.cloud.dataflow.sdk.options.Description;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.DefaultValueFactory;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.transforms.Aggregator;
+// import org.apache.beam.sdk.transforms.Aggregator;
-import com.google.cloud.dataflow.sdk.transforms.Count;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.Combine;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.Sum;
-import com.google.cloud.dataflow.sdk.transforms.Top;
-import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
-import com.google.cloud.dataflow.sdk.transforms.windowing.SlidingWindows;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
-import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows;
-import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.KV.OrderByValue;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PCollectionList;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.Top;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.util.gcsfs.GcsPath;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.KV.OrderByValue;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
-import com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner;
-import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
-import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;
+// import org.apache.beam.sdk.runners.BlockingDataflowRunner;
+import org.apache.beam.runners.dataflow.DataflowRunner;
+import org.apache.beam.runners.direct.DirectRunner;
import java.util.List;
import org.joda.time.Duration;
@@ -52,9 +52,9 @@
static class GetIPFn extends DoFn<String, String> {
private static final long serialVersionUID = 0;
- private final Aggregator<Long, Long> emptyLines =
- createAggregator("emptyLines", new Sum.SumLongFn());
+ // private final Aggregator<Long, Long> emptyLines =
+ // createAggregator("emptyLines", new Sum.SumLongFn());
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) {
Gson gson = new Gson();
ApacheLog log = gson.fromJson(c.element() , ApacheLog.class);
@@ -67,7 +67,7 @@
public static class FormatAsTextFnFromList extends DoFn<List<KV<String, Long>>, String> {
private static final long serialVersionUID = 0;
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) {
for(KV value : c.element()){
Integer access_count = new Integer(value.getValue().toString());
@@ -79,22 +79,22 @@
public static void main(String[] args) {
DataflowPipelineOptions options = PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
options.setProject("xxxxxxxxxxxx");//プロジェクトを指定
options.setStagingLocation("gs://xxxxxxxxxxxx/staging");//jarファイルが置かれるパスを指定
options.setJobName("doscheck");//今回のジョブの名前を指定
- options.setRunner(DataflowPipelineRunner.class);//Runnerについてはちゃんと調べていません。。。
+ options.setRunner(DataflowRunner.class);//Runnerについてはちゃんと調べていません。。。
options.setStreaming(true);//ストリーム処理にしたいので、true
Pipeline p = Pipeline.create(options);//パイプラインと呼ばれるトポロジを作る
//トポロジに肉付けして、処理フローを作成する
- p.apply(PubsubIO.Read.named("ReadFromPubsub").topic("projects/xxxxxxx/topics/input_topic"))//Pubsubから取得
+ p.apply("ReadFromPubsub", PubsubIO.readStrings().fromTopic("projects/xxxxxxx/topics/input_topic"))//Pubsubから取得
.apply(Window.<String>into(SlidingWindows.of(Duration.standardSeconds(10)).every(Duration.standardSeconds(5))))//10秒間を5秒ごとにスライドしながら処理する
.apply(ParDo.of(new GetIPFn()))//ログからIPを取得
.apply(Count.<String>perElement())//IPごとにカウントする
.apply(Top.<KV<String, Long>, KV.OrderByValue<String,Long>>of(3,new KV.OrderByValue<String,Long>()).withoutDefaults())//上位3個を抽出
.apply(ParDo.of(new FormatAsTextFnFromList()))//Pubsubにpublish出来るように、テキストの形式に変換
- .apply(PubsubIO.Write.named("WriteCounts").topic("projects/xxxxxxxxxxx/topics/count_ip"));//Pubsubにpublishする
+ .apply("WriteCounts", PubsubIO.writeStrings().to("projects/xxxxxxxxxxx/topics/count_ip"));//Pubsubにpublishする
p.run();//pipelineを実行
}
変更点は主にGCPの リリースノート: Dataflow SDK 2.x for Java を参照しました。
- importするパッケージ
-
com.google.cloud.dataflow
からorg.apache.beam
に変更 - Runner類と
PubsubIO
がサブパッケージに移動 - (
Aggregator
はわからなかったので削除した)
-
- Runner
- クラス名から
Pipeline
を削除 -
BlockingDataflowPipelineRunner
は削除され、DataflowRunner
に統合
- クラス名から
- Aggregator
- (わからなかったので削除した)
- DoFn
-
@Override
から@ProcessElement
に変更
-
- PTransform
- 名前は
named()
で付けるのではなく、PCollection.apply()
の第1引数に指定
- 名前は
- PubsubIO
-
Read
ではなくreadStrings()
1 でインスタンス化(writeも同様) -
topic()
はfromTopic()
/to()
に変更
-
付録
Mavenの操作
Mavenでプロジェクトを作成する方法もわからなかったので、クイックスタートを基にコマンドをメモしておきます。GCPコンソールのターミナルで実行しました。
プロジェクト作成
$ mvn archetype:generate \
-DarchetypeArtifactId=google-cloud-dataflow-java-archetypes-examples \
-DarchetypeGroupId=com.google.cloud.dataflow \
-DarchetypeVersion=2.1.0 \
-DgroupId=com.google.cloud.dataflow.examples \
-DartifactId=dos \
-Dversion="0.1" \
-DinteractiveMode=false \
-Dpackage=com.google.cloud.dataflow.examples
$ cd dos/
$ rm -rf src/main/java/com/google/cloud/dataflow/examples/*
$ vim src/main/java/com/google/cloud/dataflow/examples/Dos.java
コンパイルと実行
$ mvn compile exec:java \
-Dexec.mainClass=com.google.cloud.dataflow.examples.Dos
-
ドキュメントでは
<T>read()
でインスタンス化するよう書いてありましたが、privateになっていて呼び出せませんでした。readStrings()
等が中でこれを呼び出しています。 ↩