5
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

『Cloud Dataflow + Cloud Pub/Sub + Fluentdを使って…』をSDK 2.1で実行するための変更

Posted at

Google Cloud Platform (GCP) でCloud Dataflowを使ってストリーム処理をする方法を学ぶため、まずは『Cloud Dataflow + Cloud Pub/Sub + Fluentdを使ってDoS検知する仕組みを作ってみた』を動かすことを試しました。しかしSDKがApache Beamベースに変わったことでそのままでは動かなかったので、コード変更した部分をまとめておきます。

※ GCPだけでなくJavaも初心者のため、説明やコードに変なところがあればぜひご指摘ください。

コード差分

生のdiff出力ではなく、対応する差分が隣り合うよう少し並べ替えています。

--- ../Dos.java.orig	2017-11-15 17:22:31.583396781 +0900
+++ src/main/java/com/google/cloud/dataflow/examples/Dos.java	2017-11-15 17:17:45.067769398 +0900
@@ -1,35 +1,35 @@
 package com.google.cloud.dataflow.examples;

-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.PipelineResult;
-import com.google.cloud.dataflow.sdk.io.TextIO;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.io.PubsubIO;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
-import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
-import com.google.cloud.dataflow.sdk.options.Default;
-import com.google.cloud.dataflow.sdk.options.DefaultValueFactory;
-import com.google.cloud.dataflow.sdk.options.Description;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.DefaultValueFactory;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.transforms.Aggregator;
+// import org.apache.beam.sdk.transforms.Aggregator;
-import com.google.cloud.dataflow.sdk.transforms.Count;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.Combine;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.Sum;
-import com.google.cloud.dataflow.sdk.transforms.Top;
-import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
-import com.google.cloud.dataflow.sdk.transforms.windowing.SlidingWindows;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
-import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows;
-import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.KV.OrderByValue;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PCollectionList;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.Top;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.util.gcsfs.GcsPath;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.KV.OrderByValue;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
-import com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner;
-import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
-import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;
+// import org.apache.beam.sdk.runners.BlockingDataflowRunner;
+import org.apache.beam.runners.dataflow.DataflowRunner;
+import org.apache.beam.runners.direct.DirectRunner;

 import java.util.List;
 import org.joda.time.Duration;
@@ -52,9 +52,9 @@

   static class GetIPFn extends DoFn<String, String> {
     private static final long serialVersionUID = 0;
-    private final Aggregator<Long, Long> emptyLines =
-        createAggregator("emptyLines", new Sum.SumLongFn());
+    // private final Aggregator<Long, Long> emptyLines =
+    //     createAggregator("emptyLines", new Sum.SumLongFn());
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) {
       Gson gson = new Gson();
       ApacheLog log = gson.fromJson(c.element() , ApacheLog.class);
@@ -67,7 +67,7 @@

   public static class FormatAsTextFnFromList extends DoFn<List<KV<String, Long>>, String> {
     private static final long serialVersionUID = 0;
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) {
       for(KV value : c.element()){
         Integer access_count = new Integer(value.getValue().toString());
@@ -79,22 +79,22 @@

   public static void main(String[] args) {
     DataflowPipelineOptions options = PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
     options.setProject("xxxxxxxxxxxx");//プロジェクトを指定
     options.setStagingLocation("gs://xxxxxxxxxxxx/staging");//jarファイルが置かれるパスを指定
     options.setJobName("doscheck");//今回のジョブの名前を指定
-    options.setRunner(DataflowPipelineRunner.class);//Runnerについてはちゃんと調べていません。。。
+    options.setRunner(DataflowRunner.class);//Runnerについてはちゃんと調べていません。。。
     options.setStreaming(true);//ストリーム処理にしたいので、true

     Pipeline p = Pipeline.create(options);//パイプラインと呼ばれるトポロジを作る
     //トポロジに肉付けして、処理フローを作成する
-    p.apply(PubsubIO.Read.named("ReadFromPubsub").topic("projects/xxxxxxx/topics/input_topic"))//Pubsubから取得
+    p.apply("ReadFromPubsub", PubsubIO.readStrings().fromTopic("projects/xxxxxxx/topics/input_topic"))//Pubsubから取得
      .apply(Window.<String>into(SlidingWindows.of(Duration.standardSeconds(10)).every(Duration.standardSeconds(5))))//10秒間を5秒ごとにスライドしながら処理する
      .apply(ParDo.of(new GetIPFn()))//ログからIPを取得
      .apply(Count.<String>perElement())//IPごとにカウントする
      .apply(Top.<KV<String, Long>, KV.OrderByValue<String,Long>>of(3,new KV.OrderByValue<String,Long>()).withoutDefaults())//上位3個を抽出
      .apply(ParDo.of(new FormatAsTextFnFromList()))//Pubsubにpublish出来るように、テキストの形式に変換
-     .apply(PubsubIO.Write.named("WriteCounts").topic("projects/xxxxxxxxxxx/topics/count_ip"));//Pubsubにpublishする
+     .apply("WriteCounts", PubsubIO.writeStrings().to("projects/xxxxxxxxxxx/topics/count_ip"));//Pubsubにpublishする
     p.run();//pipelineを実行
   }

変更点は主にGCPの リリースノート: Dataflow SDK 2.x for Java を参照しました。

  • importするパッケージ
    • com.google.cloud.dataflow から org.apache.beam に変更
    • Runner類と PubsubIO がサブパッケージに移動
    • Aggregator はわからなかったので削除した)
  • Runner
    • クラス名から Pipeline を削除
    • BlockingDataflowPipelineRunner は削除され、DataflowRunner に統合
  • Aggregator
    • (わからなかったので削除した)
  • DoFn
    • @Override から @ProcessElement に変更
  • PTransform
    • 名前は named() で付けるのではなく、PCollection.apply() の第1引数に指定
  • PubsubIO

付録

Mavenの操作

Mavenでプロジェクトを作成する方法もわからなかったので、クイックスタートを基にコマンドをメモしておきます。GCPコンソールのターミナルで実行しました。

プロジェクト作成
$ mvn archetype:generate \
      -DarchetypeArtifactId=google-cloud-dataflow-java-archetypes-examples \
      -DarchetypeGroupId=com.google.cloud.dataflow \
      -DarchetypeVersion=2.1.0 \
      -DgroupId=com.google.cloud.dataflow.examples \
      -DartifactId=dos \
      -Dversion="0.1" \
      -DinteractiveMode=false \
      -Dpackage=com.google.cloud.dataflow.examples
$ cd dos/
$ rm -rf src/main/java/com/google/cloud/dataflow/examples/*
$ vim src/main/java/com/google/cloud/dataflow/examples/Dos.java
コンパイルと実行
$ mvn compile exec:java \
      -Dexec.mainClass=com.google.cloud.dataflow.examples.Dos
  1. ドキュメントでは <T>read() でインスタンス化するよう書いてありましたが、privateになっていて呼び出せませんでした。readStrings() 等が中でこれを呼び出しています。

5
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?