はじめに
本記事はFlink初学者向けの記事です。私自身Flinkを勉強中です。
Flinkとは
並列分散ストリーム処理用に開発されたOSS。JVM上(Java8)上で動くよう設計されたjavaAPIとも言えます。
Flinkの記述はシンプルで書きやすく、障害発生時の復旧作業や分散処理の配備などをapi上で設定することができ、とても楽です。
Flinkの動作イメージとソースコードの関係
データ(レコード)の入り口をSourceといい、出口をSinkと言います。Sourceでは例えばKafkaなどのメッセージブローカー,csvファイル,socket通信などからの入力を受け取ります。SinkではまたKafkaに処理を施したデータを流したり、textfileに出力したりできます。
これらを踏まえたうえでFlinkのソースコードの概観は以下のようになります。
//決まり文句。初期設定。envを使って引数や並列数や時間の定義などを行えます。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//Source
DataStream<String> recordinput = env.readTextFile("/home/input.txt");
//Operator
DataStream<Tuple<String, Integer>> processedstream = recordinput.map(new MapFunction(){~}).keyBy(0).max(1).filter(new FilterFunction(){~});
//Sink
processedstream.writesaText("home/Output.txt");
//決まり文句。Flinkは最後にこの処理を書かないと実行されないようになっている。
env.execute();
見てわかる通り、Flinkの核はOperatorの部分です。ここに臨む処理を施すよう様々なメソッドを呼び出していきます。
上の例に出てきたメソッドだと、map,keyBy,maxなどです。イメージはまず読み込まれたrecordinputはtxtファイルの1行をStringとして読み込むので、それをTupleの形に変形させるためにmapを使います。その後keyByによってidでデータストリームをグループ分けします(川がが何本かに分かれるイメージ)。そして各ストリームにおいてのvalueの最大値をmaxにより計算し、最後にfilterでidがAのみ通したストリームを生成してます。
あくまで上のコードはイメージですので実際にはこのコードは正しくありませんが、コードの流れはつかめたと思います。
実際のコード
最後に実際に動くサンプルコードを載せます。
package main.java.file;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.util.Collector;
public class Streaming {
public static void main(String[] args) throws Exception {
//決まり文句。
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//時間の定義をprocessingtime(CPU側の時間)に設定
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
//Source。ソケット通信でlocalhostのポート9090からデータを受け取る。
DataStream<String> stream1 = env.socketTextStream("localhost",9090);
//一つ目のoperator。上の例ではoperatorを一つで書いているが、
//別に複数個に分けて書くこともできる。メソッドの引数には
//決められた形のクラスを与えることが多い。このmapでは
//引数内でそのクラスの定義をしている。ローカルクラスと呼ばれる仕組み。
//mapに渡すクラスはMapFunctionインターフェースを実装する。
DataStream<Tuple2<String, Integer>> stream2 = stream1.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String value) throws Exception {
String[] words = value.split(",");
return new Tuple2<>(words[0], Integer.parseInt(words[3]));
}
});
//二つ目のoperator。keyByの引数では入力データの
//どの位置を基準にグループ分けするかを示す。
//その後windowメソッドで直近1msに到着したレコードごとに区切り、
//aggregateメソッドで区切ったwindowごとに引数の
//Countクラスに記述された演算を行わせる。
//今回はローカルクラスでなく一番下でクラスを定義している。
//どちらの方法でもよい。
DataStream<Tuple2<String,Integer>> stream3 = stream2
.keyBy(0)
.window(TumblingProcessingTimeWindows.of(Time.milliseconds(1)))
.aggregate(new Count());
//Sink
stream3.writeAsText("/mnt/c/work/sample1.txt");
//決まり文句。
env.execute("sample_temp");
}
//上のaggregateメソッドの引数用に定義。aggregateに
//渡すクラスはAggregateFunctionインターフェースを実装する。
public static class Count implements AggregateFunction<Tuple2<String,Integer>, Tuple3<String,Integer,Integer>, Tuple2<String,Integer>>{
@Override
public Tuple3<String, Integer, Integer> createAccumulator() {
return new Tuple3<>("",0,0);
}
@Override
public Tuple3<String, Integer, Integer> add(Tuple2<String, Integer> input, Tuple3<String, Integer, Integer> acc) {
acc.f0=input.f0;
acc.f1+=input.f1;
acc.f2+=1;
return acc;
}
@Override
public Tuple2<String, Integer> getResult(Tuple3<String, Integer, Integer> accumulator) {
return new Tuple2<>(accumulator.f0, accumulator.f2);
}
@Override
public Tuple3<String, Integer, Integer> merge(Tuple3<String, Integer, Integer> acc1, Tuple3<String, Integer, Integer> acc2) {
acc1.f1+=acc2.f1;
acc1.f2+=acc2.f2;
return acc1;
}
}
}