LoginSignup
3
2

More than 1 year has passed since last update.

Flink初級【コードの概観】

Last updated at Posted at 2021-10-06

はじめに

本記事はFlink初学者向けの記事です。私自身Flinkを勉強中です。

Flinkとは

並列分散ストリーム処理用に開発されたOSS。JVM上(Java8)上で動くよう設計されたjavaAPIとも言えます。
Flinkの記述はシンプルで書きやすく、障害発生時の復旧作業や分散処理の配備などをapi上で設定することができ、とても楽です。

Flinkの動作イメージとソースコードの関係

データ(レコード)の入り口をSourceといい、出口をSinkと言います。Sourceでは例えばKafkaなどのメッセージブローカー,csvファイル,socket通信などからの入力を受け取ります。SinkではまたKafkaに処理を施したデータを流したり、textfileに出力したりできます。
Flink1.png

これらを踏まえたうえでFlinkのソースコードの概観は以下のようになります。

//決まり文句。初期設定。envを使って引数や並列数や時間の定義などを行えます。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

//Source
DataStream<String> recordinput = env.readTextFile("/home/input.txt");

//Operator
DataStream<Tuple<String, Integer>> processedstream = recordinput.map(new MapFunction(){~}).keyBy(0).max(1).filter(new FilterFunction(){~});

//Sink
processedstream.writesaText("home/Output.txt");

//決まり文句。Flinkは最後にこの処理を書かないと実行されないようになっている。
env.execute();

見てわかる通り、Flinkの核はOperatorの部分です。ここに臨む処理を施すよう様々なメソッドを呼び出していきます。
上の例に出てきたメソッドだと、map,keyBy,maxなどです。イメージはまず読み込まれたrecordinputはtxtファイルの1行をStringとして読み込むので、それをTupleの形に変形させるためにmapを使います。その後keyByによってidでデータストリームをグループ分けします(川がが何本かに分かれるイメージ)。そして各ストリームにおいてのvalueの最大値をmaxにより計算し、最後にfilterでidがAのみ通したストリームを生成してます。

あくまで上のコードはイメージですので実際にはこのコードは正しくありませんが、コードの流れはつかめたと思います。

実際のコード

最後に実際に動くサンプルコードを載せます。

package main.java.file;


import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.util.Collector;

public class Streaming {
    public static void main(String[] args) throws Exception {

         //決まり文句。
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         //時間の定義をprocessingtime(CPU側の時間)に設定
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);


         //Source。ソケット通信でlocalhostのポート9090からデータを受け取る。
        DataStream<String> stream1 = env.socketTextStream("localhost",9090);


        //一つ目のoperator。上の例ではoperatorを一つで書いているが、
        //別に複数個に分けて書くこともできる。メソッドの引数には
        //決められた形のクラスを与えることが多い。このmapでは
        //引数内でそのクラスの定義をしている。ローカルクラスと呼ばれる仕組み。
        //mapに渡すクラスはMapFunctionインターフェースを実装する。
        DataStream<Tuple2<String, Integer>> stream2 = stream1.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                String[] words = value.split(",");
                return new Tuple2<>(words[0], Integer.parseInt(words[3]));
            }
        });

        //二つ目のoperator。keyByの引数では入力データの
        //どの位置を基準にグループ分けするかを示す。
        //その後windowメソッドで直近1msに到着したレコードごとに区切り、
        //aggregateメソッドで区切ったwindowごとに引数の
        //Countクラスに記述された演算を行わせる。
        //今回はローカルクラスでなく一番下でクラスを定義している。
        //どちらの方法でもよい。
        DataStream<Tuple2<String,Integer>> stream3 = stream2
                .keyBy(0)
                .window(TumblingProcessingTimeWindows.of(Time.milliseconds(1)))
                .aggregate(new Count());

        //Sink
        stream3.writeAsText("/mnt/c/work/sample1.txt");

        //決まり文句。
        env.execute("sample_temp");

    }



    //上のaggregateメソッドの引数用に定義。aggregateに
    //渡すクラスはAggregateFunctionインターフェースを実装する。
    public static class Count implements AggregateFunction<Tuple2<String,Integer>, Tuple3<String,Integer,Integer>, Tuple2<String,Integer>>{

        @Override
        public Tuple3<String, Integer, Integer> createAccumulator() {
            return new Tuple3<>("",0,0);
        }

        @Override
        public Tuple3<String, Integer, Integer> add(Tuple2<String, Integer> input, Tuple3<String, Integer, Integer> acc) {
            acc.f0=input.f0;
            acc.f1+=input.f1;
            acc.f2+=1;

            return acc;
        }

        @Override
        public Tuple2<String, Integer> getResult(Tuple3<String, Integer, Integer> accumulator) {
            return new Tuple2<>(accumulator.f0, accumulator.f2);
        }

        @Override
        public Tuple3<String, Integer, Integer> merge(Tuple3<String, Integer, Integer> acc1, Tuple3<String, Integer, Integer> acc2) {
            acc1.f1+=acc2.f1;
            acc1.f2+=acc2.f2;

            return acc1;
        }

    }






}

3
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
2