この記事では、分散ストリーム処理の基本をレビューし、FlinkとDataStream APIの開発を例として探ります。
本ブログは英語版からの翻訳です。オリジナルはこちらからご確認いただけます。一部機械翻訳を使用しております。翻訳の間違いがありましたら、ご指摘いただけると幸いです。
ストリーム処理の基本概念
ストリーム処理の定義は異なる場合があります。概念的には、ストリーム処理とバッチ処理は同じコインの表裏一体です。それらの関係は、ArrayList, Javaの要素が直接限定されたデータセットとみなされ、添え字(subscripts)でアクセスされるか、イテレータ(iterator)でアクセスされるかによって異なります。
図1. 左側がコイン分類器
コイン分類器をストリーム処理システムと表現することができます。事前に、コインの分類に使用されるすべてのコンポーネントは直列に接続されています。コインは連続的にシステムに入ってきて、将来の使用のために別のキューに出力されます。右の写真も同様です。
ストリーム処理システムには多くの特徴があります。一般的にストリーム処理システムは、無限のデータセットの処理をサポートするために、データ駆動型の処理方式を採用しています。あらかじめ演算子を設定しておき、データを処理します。複雑な計算ロジックを表現するために、Flinkを含む分散型ストリーム処理エンジンでは、一般的にDAGグラフを使用して計算ロジック全体を表現しています。
DAGの各点は、基本的な論理単位である先ほどの演算子を表しています。計算ロジックを有向グラフに整理して、エッジから特別なソースノードからシステムにデータが流れ込むようにします。データは、ネットワーク伝送やローカル伝送などの異なるデータ伝送方法を介して演算子間で伝送され、処理されます。最後に、データの結果は、他の特殊なシンクノードを介して外部システムやデータベースに送信されます。
図2. DAGコンピューティングのロジックグラフと実際のランタイム物理モデル。
論理グラフ上の各演算子は、物理グラフ上に複数の同時スレッドを持ちます。分散ストリーム処理エンジンの場合、各演算子が複数のインスタンスを持つ可能性があるため、実際の実行時物理モデルはより複雑になります。図2に示すように、ソース演算子Aは2つのインスタンスを持ち、中間演算子Cも2つのインスタンスを持っています。
論理モデルでは A と B が C の上流ノードであり、それに対応する物理モデルでは、C,A,B のすべてのインスタンス間でデータ交換が存在する可能性があります。
演算子インスタンスを異なるプロセスに分散させる際には、ネットワークを介してデータを伝送します。同一プロセス内の複数のインスタンス間でのデータ転送は、通常、ネットワークを経由する必要はありません。
表1 Apache Stormを使ってDAGの計算グラフを構築したもの。Apache StormのAPI定義は「オペレーション指向」なので低レベルです。
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new RandomSentenceSpout(), 5);
builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));
表2 Apache Flinkを用いてDAG計算グラフを構築したもの。Apache FlinkのAPI定義はより「データ指向」なので、より高レベルなものになっています。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = env.readTextFile ("input");
DataStream<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).keyBy(0).sum(1);
counts.writeAsText("output");
DAGグラフはストリーム処理の計算ロジックを表しているので、APIのほとんどはこの計算ロジックグラフの構築を中心に設計されています。表1に数年前に流行したApache StormのWordCountの例を示します。
Apache Stormでは、グラフにSpout演算子やBolt演算子を追加し、演算子間の接続方法を指定します。このようにして、グラフ全体を構築した後、リモートまたはローカルのクラスタで実行するためにサブミットします。
対照的に、Apache Flink API も計算論理グラフを構築していますが、Flink の API 定義はよりデータ処理ロジックを指向しています。Flink はデータストリームを無限の集合に抽象化し、その集合に対する操作のグループを定義し、最下層で対応する DAG グラフを自動的に構築します。
そのため、Flink APIはより高レベルなものとなっています。多くの研究者は、期待されるグラフ構造をより簡単に確保できるため、実験にはStormの高い柔軟性を好むかもしれません。しかし、業界全体では、Flink APIのような高度なAPIの方が使いやすいという理由で優先されています。
Flink DataStream APIの概要
これまでのストリーム処理の基本的な考え方をもとに、Flink DataStream APIの使い方を詳しく説明します。まずは簡単な例から説明します。表3は、ストリーミングのWordCountの例です。わずか5行のコードしかありませんが、Flink DataStream APIをベースにしたプログラムを開発するための基本的な構造を提供しています。
表3 Flink DataStream APIをベースにしたWordCountの例
// 1. Set the runtime environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2. Configure the data source to read data
DataStream<String> text = env.readTextFile ("input");
// 3. Perform a series of transformations
DataStream<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).keyBy(0).sum(1);
// 4. Configure the Sink to write data out
counts.writeAsText("output");
// 5. Submit for execution
env.execute("Streaming WordCount");
ストリーミングWordCountを実装するには、まずStreamExecutionEnvironmentオブジェクトを取得します。これは、グラフを構築するコンテキストオブジェクトです。このオブジェクトに基づいて演算子を追加します。ストリーム処理レベルの場合は、データにアクセスするためのデータソースを作成します。この例では、Environmentオブジェクトにあるファイルを読み込むための組み込みのデータソースを使用します。
次に、無限のデータセットであるDataStreamオブジェクトを取得します。このデータセットに対して一連の操作を行います。例えば、WordCountの例では、各レコード(つまり、ファイル内の1行)が最初に単語に分離され、FlatMap操作によって実装されます。
FlatMapを呼び出すと、演算子が基礎となるDAGグラフに追加されます。次に、単語のストリームを得るために、ストリーム内の単語をグループ化し(KeyBy)、各単語のデータを累積的に計算します(sum(1))。計算された単語データは新しいストリームを形成し、出力ファイルに書き込まれます。
最後に、env#executeメソッドを呼び出してプログラムの実行を開始します。先ほど呼び出したメソッドがどれもデータを処理していないことを確認して、計算ロジックを表現するためのDAGグラフを構築してください。
グラフ全体を構築し、明示的にExecuteメソッドを呼び出してからにしてください。フレームワークは計算グラフをクラスタに提供し、データにアクセスしてロジックを実行します。
ストリーミングWordCountに基づく例では、Flink DataStream APIに基づくストリーム処理プログラムのコンパイルには、一般的に、データへのアクセス、処理、書き出しの3つのステップが必要であることが示されています。
最後に、Execute メソッドを呼び出します。
図 3. Flink DataStreamの動作概要。
前の例からわかるように、Flink DataStream APIのコアはストリーミングデータを表すDataStreamオブジェクトです。全体の計算ロジックグラフは、DataStreamオブジェクト上で異なる操作を呼び出して新しいDataStreamオブジェクトを生成することに基づいて構築されています。
一般的に、DataStreamに対する操作には4つのタイプがあります。最初のタイプは単一レコード操作で、望ましくないレコードをフィルタリングしたり(Filter操作)、各レコードを変換したり(Map操作)します。第二のタイプは、複数レコード操作である。例えば、1時間以内の注文の総量をカウントするには、1時間以内の全ての注文レコードを追加する。このタイプの操作をサポートするためには、処理のためのウィンドウを介して必要なレコードを結合する必要があります。
3つ目のタイプは、複数のストリームを操作して1つのストリームに変換することです。例えば、複数のストリームをUnion、Join、Connectなどの操作によってマージすることができます。これらの操作は、異なるロジックを使用してストリームをマージしますが、最終的には新しい統一されたストリームを生成するため、いくつかのクロスストリーム操作が可能になります。
4 つ目のタイプは「分割操作」で、DataStream でサポートされており、Merge 操作とは対照的です。これらの操作は、ルールによってストリームを複数のストリームに分割し、各分割ストリームは前のストリームのサブセットとなります。
図4. 異なるタイプのDataStreamサブタイプ。異なるサブタイプは、異なる操作のセットをサポートします。
異なるストリーム操作をサポートするために、Flinkは中間ストリームデータセットのタイプを示すために、異なるストリームタイプのセットを導入しています。図4は、変換関係の完全なタイプを示しています。
Mapのような単一レコード操作の場合、結果はDataStream型です。Split操作では、SplitStreamが生成されます。SplitStreamに基づいて、Selectメソッドを使用して、望ましいレコードをフィルタリングし、基本ストリームを得ます。
同様に、Connect操作では、StreamA.connect(StreamB)を呼び出した後、専用のConnectedStreamを取得します。ConnectedStreamでサポートされる操作は、共通のDataStreamでサポートされる操作とは異なります。
これは2つの異なるストリームをマージした結果であり、2つのストリームのレコードの異なる処理ロジックを指定することができ、処理された結果が新しいDataStreamストリームを形成します。処理中に状態情報を共有するように、同じ演算子で異なるレコードを処理します。上層のJoin操作の中には、下層のConnect操作を介して実装する必要があるものもあります。
また、ウィンドウ操作により、時間や数でストリームを分割することもできます。具体的な分割ロジックを選択します。グループ内のすべてのレコードが到着したら、すべてのレコードを取得し、トラバース演算やサム演算を行います。したがって、各グループを処理することで出力データのセットを取得し、すべての出力データが新しい基本ストリームを形成します。
共通のDataStreamに対しては、ストリーム全体の統一されたWindow処理を表すallWindow演算を使用します。そのため、複数の演算子インスタンスを使用して同時計算することはできません。この問題を解決するには、KeyByメソッドを使用して、まずKeyでレコードをグループ化します。その後、異なるキーに対応するレコードに対して別々のWindow処理を並行して実行します。
KeyBy操作は、最も重要で一般的に使用される操作の1つです。以下に詳しく説明します。
図5. 基本ストリームのWindow演算とKeyedStreamの比較
KeyedStream上でのWindow操作により、複数のインスタンスを使用した同時処理が可能になります。図5に基本DataStreamオブジェクトに対するallWindow操作とKeyedStreamオブジェクトに対するWindow操作の比較を示します。複数の並行インスタンスで同時にデータを処理するには、KeyBy演算でデータをグループ化します。
KeyBy操作とWindow操作の両方でデータをグループ化しますが、KeyBy操作ではストリームを横方向に、Window操作ではストリームを縦方向に分割します。
KeyByでデータを分割した後、後続の各オペレータインスタンスは、特定のKeyセットに対応するデータを処理することができます。また、Flinkでは、演算子は特定の状態を維持することができます。KeyedStream上の演算子の状態は、分散して保存されます。
KeyByは確定的なデータの割り当て方法です(次項では他の割り当て方法を紹介します)。失敗したジョブを再起動して並列性が変更された場合、FlinkはKeyグループを再割り当てし、特定のKeyを処理するグループにそのKeyの状態が含まれていなければならないことを保証し、整合性を確保します。
最後に、KeyBy演算は、Keyの数が演算子の同時インスタンス数を超えた場合にのみ動作することに注意してください。同じKeyに対応するデータはすべて同じインスタンスに送信されるため、Keyの数がインスタンスの数よりも少ない場合、一部のインスタンスがデータを受信できず、計算能力が十分に活用されていないことになります。
その他の問題
Flinkでは、オペレータ間でデータをやり取りする際に、KeyBy以外にも物理的なグルーピング方法をサポートしています。図1に示すように、Flink DataStreamにおける物理的なグルーピング方法には次のようなものがあります。
- Global: 上流のオペレータが、下流のオペレータの最初のインスタンスにすべてのレコードを送信します。
- Broadcast:上流のオペレータは、各レコードを下流のオペレータのすべてのインスタンスに送信します。
Forward: 上流のオペレータは、下流のオペレータのすべてのインスタンスにレコードを送信します。各アップストリーム オペレータのインスタンスは、ダウンストリーム オペレータの対応するインスタンスにレコードを送信します。この方法は、アップストリームオペレータのインスタンスの数がダウンストリームオペレータのインスタンスの数と同じ場合にのみ適用されます。
Shuffle: 上流側のオペレータが各レコードに対して下流側のオペレータをランダムに選択します。
Rebalance:上流のオペレータがラウンドロビンベースでデータを送信します。
Rescale: 上流と下流の演算子のインスタンス数がそれぞれ 'n' と 'm' のとき、'n' < 'm' の場合、各上流のインスタンスは、ラウンドロビンベースで ceil(m/n) または floor(m/n) の下流のインスタンスにデータを送信します。n' > 'm' の場合、floor(n/m)またはceil(n/m)のアップストリームインスタンスはラウンドロビンベースでダウンストリームインスタンスにデータを送信します。
PartitionCustomer:ビルトインの割り当て方法がニーズに合わない場合は、グループ化方法をカスタマイズすることを選択します。
図6. KeyBy以外の物理的なグループ化メソッド
グルーピングメソッドに加えて、Flink DataStream APIにおけるもう一つの重要な概念がシステムタイプです。
図7に示すように、Flink DataStreamオブジェクトはシステムタイプが強く設定されています。各 DataStream オブジェクトについて、要素の型を指定する必要があります。Flinkの基礎となるシリアル化メカニズムは、シリアル化を最適化するためにこの情報に依存しています。具体的には、Flinkの最下層では、TypeInformationオブジェクトを使用して型を記述します。TypeInformationオブジェクトは、シリアル化フレームワークが使用するタイプ関連情報の文字列を定義します。
図 7. Flink DataStream APIの型システム
Flinkには、一般的に使用される組み込みの基本型がいくつかあります。これらについては、Flinkはその型情報も提供しており、追加の宣言なしに直接使用することができます。Flinkは型推論の仕組みを使って対応する型を識別することができます。ただし、例外もあります。
例えば、Flink DataStream APIはJavaとScalaの両方をサポートしています。多くのScala APIは暗黙のパラメータを通して型情報を渡すため、Javaを通してScala APIを呼び出す必要がある場合は、暗黙のパラメータを通して型情報を渡す必要があります。もう一つの例として、Javaによる汎用型の消去があります。ストリーム型が汎用型の場合、消去後の情報の型を推論する必要がない場合があります。この場合、情報の型も明示的に指定する必要があります。
Flinkでは、Java APIでは複数のフィールドを結合する際にTuple型を使用するのが一般的ですが、Scala APIではRow型やCase Class型を使用することが多くなっています。タプル型は、Row型に比べて、フィールド数が25を超えることができないことと、すべてのフィールドでNULL値を使用することができないという2つの制限があります。
最後に、Flinkでは、新しい型、TypeInformationをカスタマイズしたり、Kryoを使ってシリアライズしたりすることができます。しかし、これは移行の問題を引き起こす可能性があります。そのため、カスタム型は避けることをお勧めします。
例
もう少し複雑な例を見てみましょう。システム内に注文を監視するデータソースがあるとします。それは、新規発注を行う際に、Tuple2を使用して、発注された商品の種類と取引量を出力します。そして、すべての種類のアイテムの取引量をリアルタイムでカウントします。
表4 リアルタイム注文統計の一例です。
public class GroupedProcessingTimeWindowSample {
private static class DataSource extends RichParallelSourceFunction<Tuple2<String, Integer>> {
private volatile boolean isRunning = true;
@Override
public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
Random random = new Random();
while (isRunning) {
Thread.sleep((getRuntimeContext().getIndexOfThisSubtask() + 1) * 1000 * 5);
String key = "类别" + (char) ('A' + random.nextInt(3));
int value = random.nextInt(10) + 1;
System.out.println(String.format("Emits\t(%s, %d)", key, value));
ctx.collect(new Tuple2<>(key, value));
}
}
@Override
public void cancel() {
isRunning = false;
}
}
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
DataStream<Tuple2<String, Integer>> ds = env.addSource(new DataSource());
KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = ds.keyBy(0);
keyedStream.sum(1).keyBy(new KeySelector<Tuple2<String, Integer>, Object>() {
@Override
public Object getKey(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
return "";
}
}).fold(new HashMap<String, Integer>(), new FoldFunction<Tuple2<String, Integer>, HashMap<String, Integer>>() {
@Override
public HashMap<String, Integer> fold(HashMap<String, Integer> accumulator, Tuple2<String, Integer> value) throws Exception {
accumulator.put(value.f0, value.f1);
return accumulator;
}
}).addSink(new SinkFunction<HashMap<String, Integer>>() {
@Override
public void invoke(HashMap<String, Integer> value, Context context) throws Exception {
// 每个类型的商品成交量
System.out.println(value);
// 商品成交总量
System.out.println(value.values().stream().mapToInt(v -> v).sum());
}
});
env.execute();
}
}
表4に本実施例の実装を示しています。ここでは、RichParallelSourceFunctionを継承した模擬データソースを実装します。RichParallelSourceFunctionは、複数のインスタンスを持つSourceFunctionのAPIです。
RunメソッドとCancelメソッドの2つのメソッドを実装します。Flinkは実行時にRunメソッドを直接ソースに呼び出します。初期ストリームを形成するためにデータを連続的に出力する必要があります。Runメソッドを実装する際には、アイテムタイプとトランザクション量のレコードをランダムに生成し、ctx#collect
メソッドを使用して送信します。Flinkが実行状態をマークして制御するためにVolatile変数で使用されるソースタスクをキャンセルする必要がある場合は、Cancelメソッドを使用します。
次に、Mainメソッドでグラフの構築を開始します。まず、StreamExecutionEnviroment オブジェクトを作成します。オブジェクトを作成するために呼び出されるgetExecutionEnvironment
メソッドが自動的に環境を決定するので、適切なオブジェクトが作成されます。例えば、IDEで右クリックしてメソッドを実行すると、LocalStreamExecutionEnvironmentオブジェクトが作成されます。
実際の環境で実行すると、RemoteStreamExecutionEnvironmentオブジェクトを作成します。Environmentオブジェクトを基に、初期ストリームを取得するためのソースを作成します。そして、アイテムタイプごとのトランザクション量をカウントするために、KeyByを使用してTupleの第1フィールド(アイテムタイプ)を介して入力ストリームをグループ化し、各Keyに対応するレコードの第2フィールド(トランザクション量)の値を合計します。
最下層では、Sum演算子がStateメソッドを使用して、各Key(アイテムタイプ)に対応するトランザクションボリュームの合計値を保持します。新しいレコードが到着すると、Sum演算子は、維持されている取引量合計を更新して、.NETのレコードを出力します。
タイプのボリュームのみをカウントする場合は、ここでプログラムは終了します。Sum演算子の直後にSink演算子を追加して、各アイテムタイプの継続的に更新されたトランザクションボリュームを出力します。ただし,すべてのタイプのトランザクション量をカウントするには,同じ計算ノードのすべてのレコードを出力してください.
KeyByを使用して、全てのレコードに同じKeyを返し、グループ化して、全てのレコードを同じインスタンスに送るようにしています。
次に、Foldメソッドを使用して、演算子内の各アイテムタイプのボリュームを維持します。FoldメソッドはDeprecatedとマークされていますが、今日ではDataStream APIの他の操作で置き換えることができないことに注意してください。したがって、このメソッドは初期値を受け取ります。
次に、後続のストリームの各レコードが到着すると、オペレータは初期値を更新するために渡されたFoldFunctionを呼び出し、更新された値を送信します。
HashMap を使用して、各項目タイプの現在のトランザクション量を保持します。新しいものが到着したら、HashMapを更新します。このように、Sinkを通じて最新のアイテムタイプとトランザクションボリュームのHashMapを受け取り、この値を頼りに各アイテムのトランザクションボリュームとトランザクションボリュームの合計を出力します。
この例では、DataStream APIの使い方を実演しています。より効率的に書くことができます。また、上位のTableやSQLでは、この状況をより良く処理するリトラクト機構もサポートしています。
図8 APIの概略図。
最後に、DataStream APIの原理を見てみましょう。DataStream#mapアルゴリズムを呼び出すと、Flinkは最下層にTransformationオブジェクトを作成します。このオブジェクトは、計算ロジックグラフのノードを表します。これは、ユーザー定義関数(UDF)であるMapFunctionを記録します。
より多くのメソッドを使用することで、より多くの DataStream オブジェクトを作成します。各オブジェクトにはTransformationオブジェクトがあり、これらのオブジェクトは計算依存関係に基づいてグラフ構造を形成します。
これが計算グラフです。その後、Flinkはさらにグラフ構造を変換し、最終的にジョブの提出に必要なJobGraphを生成します。
概要
この記事では、Flinkの下位レベルのAPIであるFlink DataStream APIを紹介します。実際の開発では、StateやTimeなど、APIをベースにして自分でいくつかの概念を使う必要があり、手間がかかります。その後の講座では、より上位レベルのTableやSQL APIについても紹介していきます。将来的にはTableやSQLがFlinkのAPIの主流になるかもしれません。
しかし、下位レベルのAPIは、より強力な表現能力を生み出します。細かい操作が必要な場合は、DataStream APIが必要になるケースもあります。
アリババクラウドは日本に2つのデータセンターを有し、世界で60を超えるアベラビリティーゾーンを有するアジア太平洋地域No.1(2019ガートナー)のクラウドインフラ事業者です。
アリババクラウドの詳細は、こちらからご覧ください。
アリババクラウドジャパン公式ページ