36
27

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Cloud Dataflow + Cloud Pub/Sub + Fluentdを使ってDoS検知する仕組みを作ってみた

Posted at

Cloud Dataflow + Cloud Pub/Sub + Fluentdを使ってDoS検知する仕組みを作ってみた

以前、横田慎介さんが、Norikra+FluentdでDoS検知・ブロックする仕組みを作っていたのに触発されて、
GCPのCloud Dataflowで真似事をやってみました。

Cloud Dataflowとは?(現時点での理解)

GCPで提供される分散処理フレームワーク(?)です。
バッチモードとストリームモードがあり、処理フローを書いてあげると、Google側のアルゴリズムによってよしなに分散されて実行してくれます。

メリット

  • バッチもストリームも、同じお作法・同じプラットフォームで作れる。LambdaArchitectureでいうところのバッチレイヤーとスピードレイヤーをそれぞれ別のOSSなりサービスを使わなくても出来る。(とは言え、Dataflowでバッチ処理とストリーム処理をそれぞれ動かすことは必要)
  • 処理が実行されるインフラ(インスタンスの数とかインスタンス死活監視とか諸々)は気にしなくてもいい。

裏側(勝手な推測)

  1. コンパイルされたjarファイルが、GCSに配置される
  2. 必要台数のVMが立ち上がってクラスタリングされる
  3. クラスタはGCS上のjarファイルを取得して、実際の処理が開始される
  4. 処理の負荷に応じて、クラスタのスケーリングが実行される(試してみた感じ、最低3台構成)

やってみた

やったこと

  1. apacheのログをfluentdで取得して、Pub/Subに送信
  2. DataflowがPub/Subからログを取得して集計

タイムウインドウを10秒として、IPごとにアクセス数を集計し、アクセスの多いホスト上位3つのうち、アクセスが20回以上あったものを表示するようにしてみました。
(20回以上でいいの?とか、なんで上位3つ?というところは、スルーしてください。特に意味はありません)
3. 集計結果をPub/Subに送信
集計結果に基づいてFirewallルールを変更したり、結果をSlackに飛ばしたり、色々と出来ることはあるかと思いますが、今回は検知するところのみをやってみました。(サボりました)

スクリーンショット 2015-09-09 23.31.05.png

1. apacheのログをfluentdで取得して、Pub/Subに送信

fluentdのoutput-pluginではfluent-plugin-gcloud-pubsubを使います。

<source>
  type tail
  path /var/log/apache2/access.log
  tag apache.access
  format apache2
</source>

<match apache.access>
  type gcloud_pubsub
  project xxxxxxxxxx
  topic projects/xxxxxxxxxx/topics/input_topic
  flush_interval 10
  key /home/ubuntu/key
</match>

2. DataflowがPub/Subからログを取得して集計 3. 集計結果をPub/Subに送信

Dataflowのsampleを参考にして作りました。

Dos.java
package com.google.cloud.dataflow.examples;

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.PipelineResult;
import com.google.cloud.dataflow.sdk.io.TextIO;
import com.google.cloud.dataflow.sdk.io.PubsubIO;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.Default;
import com.google.cloud.dataflow.sdk.options.DefaultValueFactory;
import com.google.cloud.dataflow.sdk.options.Description;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.transforms.Aggregator;
import com.google.cloud.dataflow.sdk.transforms.Count;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.Combine;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.Sum;
import com.google.cloud.dataflow.sdk.transforms.Top;
import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
import com.google.cloud.dataflow.sdk.transforms.windowing.SlidingWindows;
import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows;
import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.KV.OrderByValue;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.values.PCollectionList;
import com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner;
import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;

import java.util.List;
import org.joda.time.Duration;
import org.joda.time.Instant;

import com.google.gson.*;

public class Dos {

  public class ApacheLog {
    String host;
    String user;
    String method;
    String path;
    String code;
    String size;
    String referer;
    String agent;
  }

  static class GetIPFn extends DoFn<String, String> {
    private static final long serialVersionUID = 0;
    private final Aggregator<Long, Long> emptyLines =
        createAggregator("emptyLines", new Sum.SumLongFn());
    @Override
    public void processElement(ProcessContext c) {
      Gson gson = new Gson();
      ApacheLog log = gson.fromJson(c.element() , ApacheLog.class);
      if (!log.host.isEmpty()) {
        c.output(log.host);
      }
    }
  }


  public static class FormatAsTextFnFromList extends DoFn<List<KV<String, Long>>, String> {
    private static final long serialVersionUID = 0;
    @Override
    public void processElement(ProcessContext c) {
      for(KV value : c.element()){
        Integer access_count = new Integer(value.getValue().toString());
        if (access_count > 20)
          c.output(value.getKey() + ": " + access_count);
      }
    }
  }

  public static void main(String[] args) {
    DataflowPipelineOptions options = PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
    options.setProject("xxxxxxxxxxxx");//プロジェクトを指定
    options.setStagingLocation("gs://xxxxxxxxxxxx/staging");//jarファイルが置かれるパスを指定
    options.setJobName("doscheck");//今回のジョブの名前を指定
    options.setRunner(DataflowPipelineRunner.class);//Runnerについてはちゃんと調べていません。。。
    options.setStreaming(true);//ストリーム処理にしたいので、true
    
    Pipeline p = Pipeline.create(options);//パイプラインと呼ばれるトポロジを作る

    //トポロジに肉付けして、処理フローを作成する
    p.apply(PubsubIO.Read.named("ReadFromPubsub").topic("projects/xxxxxxx/topics/input_topic"))//Pubsubから取得
     .apply(Window.<String>into(SlidingWindows.of(Duration.standardSeconds(10)).every(Duration.standardSeconds(5))))//10秒間を5秒ごとにスライドしながら処理する
     .apply(ParDo.of(new GetIPFn()))//ログからIPを取得
     .apply(Count.<String>perElement())//IPごとにカウントする
     .apply(Top.<KV<String, Long>, KV.OrderByValue<String,Long>>of(3,new KV.OrderByValue<String,Long>()).withoutDefaults())//上位3個を抽出
     .apply(ParDo.of(new FormatAsTextFnFromList()))//Pubsubにpublish出来るように、テキストの形式に変換
     .apply(PubsubIO.Write.named("WriteCounts").topic("projects/xxxxxxxxxxx/topics/count_ip"));//Pubsubにpublishする
    
    p.run();//pipelineを実行
  }
}

実行結果

まずは実行

tetsuyam-MBP:DataflowJavaSDK-examples tetsuyam$ mvn compile exec:java -Dexec.mainClass=com.google.cloud.dataflow.examples.Dos
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building Google Cloud Dataflow Java Examples - All manual_build
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ google-cloud-dataflow-java-examples-all ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] skip non existing resourceDirectory /Applications/home_directory/Dev/GCP/DataflowJavaSDK-examples/src/main/resources
[INFO]
[INFO] --- maven-compiler-plugin:3.1:compile (default-compile) @ google-cloud-dataflow-java-examples-all ---
[INFO] Changes detected - recompiling the module!
[INFO] Compiling 29 source files to /Applications/home_directory/Dev/GCP/DataflowJavaSDK-examples/target/classes
[INFO]
[INFO] >>> exec-maven-plugin:1.1:java (default-cli) > validate @ google-cloud-dataflow-java-examples-all >>>
[INFO]
[INFO] <<< exec-maven-plugin:1.1:java (default-cli) < validate @ google-cloud-dataflow-java-examples-all <<<
[INFO]
[INFO] --- exec-maven-plugin:1.1:java (default-cli) @ google-cloud-dataflow-java-examples-all ---
9 09, 2015 10:56:51 午後 com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner fromOptions
情報: PipelineOptions.filesToStage was not specified. Defaulting to files from the classpath: will stage 49 files. Enable logging at DEBUG level to see which files will be staged.
9 09, 2015 10:56:52 午後 com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner run
情報: Executing pipeline on the Dataflow Service, which will have billing implications related to Google Compute Engine usage and other Google Cloud Services.
9 09, 2015 10:56:52 午後 com.google.cloud.dataflow.sdk.util.PackageUtil stageClasspathElements
情報: Uploading 49 files from PipelineOptions.filesToStage to staging location to prepare for execution.
9 09, 2015 10:57:35 午後 com.google.cloud.dataflow.sdk.util.PackageUtil stageClasspathElements
情報: Uploading PipelineOptions.filesToStage complete: 1 files newly uploaded, 48 files cached
Dataflow SDK version: 1.0.0
9 09, 2015 10:57:37 午後 com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner run
情報: To access the Dataflow monitoring console, please navigate to https://console.developers.google.com/project/xxxxxx/dataflow/job/2015-09-09_06_57_37-11485599623285447562
Submitted job: 2015-09-09_06_57_37-11485599623285447562
9 09, 2015 10:57:37 午後 com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner run
情報: To cancel the job using the 'gcloud' tool, run:
> gcloud alpha dataflow jobs --project=xxxxxx cancel 2015-09-09_06_57_37-11485599623285447562
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 52.967 s
[INFO] Finished at: 2015-09-09T22:57:37+09:00
[INFO] Final Memory: 31M/344M
[INFO] ------------------------------------------------------------------------

Dataflowの様子

デプロイするとジョブが実行されます
スクリーンショット 2015-09-09 22.59.15.png

ジョブを見ると、先ほど作ったパイプラインの詳細が見れます
スクリーンショット 2015-09-09 22.59.39.png

インスタンスが上がっていることもわかります(今回のケースだと3台)
スクリーンショット 2015-09-09 23.01.44.png

Dataflow出力先のPubsub topic

tetsuyam-MBP:pubsub tetsuyam$ python subscribe.py
192.168.38.16: 429
192.168.38.18: 625
192.168.38.9: 625
192.168.38.16: 625
192.168.38.18: 625
192.168.38.9: 625

こんな感じで5秒おきに3つの集計結果が取得できます。
(テスト用のDoSスクリプトのクオリティが低いので、同じ値になってしまっていますが。。)

感想

  • 処理フローを書くだけで、裏のインフラを気にせずに実行できるのは、StormやSparkと比べると良いかなーと思った
  • とは言うものの、本気でスケールアウトしてデータ量増えてもさばけるの?とか、逆に落ち着いてきたらスケールインして料金節約してくれるの?と、疑問に思った(誰か検証してほしい)
  • デプロイしてから最初の処理が始まるまでに若干時間がかかるように感じた。大体1分ぐらいしないと始まらない。クラスタ組むのにかかってる時間なのかもしれない。
  • ストリームモードの場合、最低3ノードは上がるし、データが流れない時間帯も上がりっぱなしなので、たまに流れる程度の流量であったり、流れ続けないような場合だと無駄にお金がかかるような気がする。(その辺、AWS Lambdaとかだと処理時間だけの課金なので良さ気)
  • バッチモードの場合だと、動かしてみるまでVM数も処理時間も分からないので一体いくらかかっていつ終わるかが読めない怖さもある
  • 同一DCで動いているように見えるけど、DC断みたいなケースは想定されていないのか!?

まぁまだ新しいサービスだし、事例的なものも全然出てきていないのですが、きっと使いこなせば幸せになれるんだろうなーという予感はしてます。
Couchbase、Redisなど、キャッシュとして使えそうなDBと組み合わせてストリーム処理させてみたり、同じく新しく出てるBigQueryと組み合わせたりして、これからも色々試してみたいなーというところです。

早くアジアリージョンでも使えるようになってほしい。

36
27
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
36
27

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?