Cloud Dataflow + Cloud Pub/Sub + Fluentdを使ってDoS検知する仕組みを作ってみた
以前、横田慎介さんが、Norikra+FluentdでDoS検知・ブロックする仕組みを作っていたのに触発されて、
GCPのCloud Dataflowで真似事をやってみました。
Cloud Dataflowとは?(現時点での理解)
GCPで提供される分散処理フレームワーク(?)です。
バッチモードとストリームモードがあり、処理フローを書いてあげると、Google側のアルゴリズムによってよしなに分散されて実行してくれます。
メリット
- バッチもストリームも、同じお作法・同じプラットフォームで作れる。LambdaArchitectureでいうところのバッチレイヤーとスピードレイヤーをそれぞれ別のOSSなりサービスを使わなくても出来る。(とは言え、Dataflowでバッチ処理とストリーム処理をそれぞれ動かすことは必要)
- 処理が実行されるインフラ(インスタンスの数とかインスタンス死活監視とか諸々)は気にしなくてもいい。
裏側(勝手な推測)
- コンパイルされたjarファイルが、GCSに配置される
- 必要台数のVMが立ち上がってクラスタリングされる
- クラスタはGCS上のjarファイルを取得して、実際の処理が開始される
- 処理の負荷に応じて、クラスタのスケーリングが実行される(試してみた感じ、最低3台構成)
やってみた
やったこと
- apacheのログをfluentdで取得して、Pub/Subに送信
- DataflowがPub/Subからログを取得して集計
タイムウインドウを10秒として、IPごとにアクセス数を集計し、アクセスの多いホスト上位3つのうち、アクセスが20回以上あったものを表示するようにしてみました。
(20回以上でいいの?とか、なんで上位3つ?というところは、スルーしてください。特に意味はありません)
3. 集計結果をPub/Subに送信
集計結果に基づいてFirewallルールを変更したり、結果をSlackに飛ばしたり、色々と出来ることはあるかと思いますが、今回は検知するところのみをやってみました。(サボりました)
1. apacheのログをfluentdで取得して、Pub/Subに送信
fluentdのoutput-pluginではfluent-plugin-gcloud-pubsubを使います。
<source>
type tail
path /var/log/apache2/access.log
tag apache.access
format apache2
</source>
<match apache.access>
type gcloud_pubsub
project xxxxxxxxxx
topic projects/xxxxxxxxxx/topics/input_topic
flush_interval 10
key /home/ubuntu/key
</match>
2. DataflowがPub/Subからログを取得して集計 3. 集計結果をPub/Subに送信
Dataflowのsampleを参考にして作りました。
package com.google.cloud.dataflow.examples;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.PipelineResult;
import com.google.cloud.dataflow.sdk.io.TextIO;
import com.google.cloud.dataflow.sdk.io.PubsubIO;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.Default;
import com.google.cloud.dataflow.sdk.options.DefaultValueFactory;
import com.google.cloud.dataflow.sdk.options.Description;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.transforms.Aggregator;
import com.google.cloud.dataflow.sdk.transforms.Count;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.Combine;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.Sum;
import com.google.cloud.dataflow.sdk.transforms.Top;
import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
import com.google.cloud.dataflow.sdk.transforms.windowing.SlidingWindows;
import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows;
import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.KV.OrderByValue;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.values.PCollectionList;
import com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner;
import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;
import java.util.List;
import org.joda.time.Duration;
import org.joda.time.Instant;
import com.google.gson.*;
public class Dos {
public class ApacheLog {
String host;
String user;
String method;
String path;
String code;
String size;
String referer;
String agent;
}
static class GetIPFn extends DoFn<String, String> {
private static final long serialVersionUID = 0;
private final Aggregator<Long, Long> emptyLines =
createAggregator("emptyLines", new Sum.SumLongFn());
@Override
public void processElement(ProcessContext c) {
Gson gson = new Gson();
ApacheLog log = gson.fromJson(c.element() , ApacheLog.class);
if (!log.host.isEmpty()) {
c.output(log.host);
}
}
}
public static class FormatAsTextFnFromList extends DoFn<List<KV<String, Long>>, String> {
private static final long serialVersionUID = 0;
@Override
public void processElement(ProcessContext c) {
for(KV value : c.element()){
Integer access_count = new Integer(value.getValue().toString());
if (access_count > 20)
c.output(value.getKey() + ": " + access_count);
}
}
}
public static void main(String[] args) {
DataflowPipelineOptions options = PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
options.setProject("xxxxxxxxxxxx");//プロジェクトを指定
options.setStagingLocation("gs://xxxxxxxxxxxx/staging");//jarファイルが置かれるパスを指定
options.setJobName("doscheck");//今回のジョブの名前を指定
options.setRunner(DataflowPipelineRunner.class);//Runnerについてはちゃんと調べていません。。。
options.setStreaming(true);//ストリーム処理にしたいので、true
Pipeline p = Pipeline.create(options);//パイプラインと呼ばれるトポロジを作る
//トポロジに肉付けして、処理フローを作成する
p.apply(PubsubIO.Read.named("ReadFromPubsub").topic("projects/xxxxxxx/topics/input_topic"))//Pubsubから取得
.apply(Window.<String>into(SlidingWindows.of(Duration.standardSeconds(10)).every(Duration.standardSeconds(5))))//10秒間を5秒ごとにスライドしながら処理する
.apply(ParDo.of(new GetIPFn()))//ログからIPを取得
.apply(Count.<String>perElement())//IPごとにカウントする
.apply(Top.<KV<String, Long>, KV.OrderByValue<String,Long>>of(3,new KV.OrderByValue<String,Long>()).withoutDefaults())//上位3個を抽出
.apply(ParDo.of(new FormatAsTextFnFromList()))//Pubsubにpublish出来るように、テキストの形式に変換
.apply(PubsubIO.Write.named("WriteCounts").topic("projects/xxxxxxxxxxx/topics/count_ip"));//Pubsubにpublishする
p.run();//pipelineを実行
}
}
実行結果
まずは実行
tetsuyam-MBP:DataflowJavaSDK-examples tetsuyam$ mvn compile exec:java -Dexec.mainClass=com.google.cloud.dataflow.examples.Dos
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building Google Cloud Dataflow Java Examples - All manual_build
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ google-cloud-dataflow-java-examples-all ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] skip non existing resourceDirectory /Applications/home_directory/Dev/GCP/DataflowJavaSDK-examples/src/main/resources
[INFO]
[INFO] --- maven-compiler-plugin:3.1:compile (default-compile) @ google-cloud-dataflow-java-examples-all ---
[INFO] Changes detected - recompiling the module!
[INFO] Compiling 29 source files to /Applications/home_directory/Dev/GCP/DataflowJavaSDK-examples/target/classes
[INFO]
[INFO] >>> exec-maven-plugin:1.1:java (default-cli) > validate @ google-cloud-dataflow-java-examples-all >>>
[INFO]
[INFO] <<< exec-maven-plugin:1.1:java (default-cli) < validate @ google-cloud-dataflow-java-examples-all <<<
[INFO]
[INFO] --- exec-maven-plugin:1.1:java (default-cli) @ google-cloud-dataflow-java-examples-all ---
9 09, 2015 10:56:51 午後 com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner fromOptions
情報: PipelineOptions.filesToStage was not specified. Defaulting to files from the classpath: will stage 49 files. Enable logging at DEBUG level to see which files will be staged.
9 09, 2015 10:56:52 午後 com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner run
情報: Executing pipeline on the Dataflow Service, which will have billing implications related to Google Compute Engine usage and other Google Cloud Services.
9 09, 2015 10:56:52 午後 com.google.cloud.dataflow.sdk.util.PackageUtil stageClasspathElements
情報: Uploading 49 files from PipelineOptions.filesToStage to staging location to prepare for execution.
9 09, 2015 10:57:35 午後 com.google.cloud.dataflow.sdk.util.PackageUtil stageClasspathElements
情報: Uploading PipelineOptions.filesToStage complete: 1 files newly uploaded, 48 files cached
Dataflow SDK version: 1.0.0
9 09, 2015 10:57:37 午後 com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner run
情報: To access the Dataflow monitoring console, please navigate to https://console.developers.google.com/project/xxxxxx/dataflow/job/2015-09-09_06_57_37-11485599623285447562
Submitted job: 2015-09-09_06_57_37-11485599623285447562
9 09, 2015 10:57:37 午後 com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner run
情報: To cancel the job using the 'gcloud' tool, run:
> gcloud alpha dataflow jobs --project=xxxxxx cancel 2015-09-09_06_57_37-11485599623285447562
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 52.967 s
[INFO] Finished at: 2015-09-09T22:57:37+09:00
[INFO] Final Memory: 31M/344M
[INFO] ------------------------------------------------------------------------
Dataflowの様子
インスタンスが上がっていることもわかります(今回のケースだと3台)
Dataflow出力先のPubsub topic
tetsuyam-MBP:pubsub tetsuyam$ python subscribe.py
192.168.38.16: 429
192.168.38.18: 625
192.168.38.9: 625
192.168.38.16: 625
192.168.38.18: 625
192.168.38.9: 625
こんな感じで5秒おきに3つの集計結果が取得できます。
(テスト用のDoSスクリプトのクオリティが低いので、同じ値になってしまっていますが。。)
感想
- 処理フローを書くだけで、裏のインフラを気にせずに実行できるのは、StormやSparkと比べると良いかなーと思った
- とは言うものの、本気でスケールアウトしてデータ量増えてもさばけるの?とか、逆に落ち着いてきたらスケールインして料金節約してくれるの?と、疑問に思った(誰か検証してほしい)
- デプロイしてから最初の処理が始まるまでに若干時間がかかるように感じた。大体1分ぐらいしないと始まらない。クラスタ組むのにかかってる時間なのかもしれない。
- ストリームモードの場合、最低3ノードは上がるし、データが流れない時間帯も上がりっぱなしなので、たまに流れる程度の流量であったり、流れ続けないような場合だと無駄にお金がかかるような気がする。(その辺、AWS Lambdaとかだと処理時間だけの課金なので良さ気)
- バッチモードの場合だと、動かしてみるまでVM数も処理時間も分からないので一体いくらかかっていつ終わるかが読めない怖さもある
- 同一DCで動いているように見えるけど、DC断みたいなケースは想定されていないのか!?
まぁまだ新しいサービスだし、事例的なものも全然出てきていないのですが、きっと使いこなせば幸せになれるんだろうなーという予感はしてます。
Couchbase、Redisなど、キャッシュとして使えそうなDBと組み合わせてストリーム処理させてみたり、同じく新しく出てるBigQueryと組み合わせたりして、これからも色々試してみたいなーというところです。
早くアジアリージョンでも使えるようになってほしい。