こちらのJoin Operationsの翻訳です。
本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
コードサンプルはPythonのみを記載しています。
構造化ストリーミングでは、ストリーミングデータセット/データフレームと静的なデータセット/データフレームと、あるいは、別のストリーミングデータセット/データフレームとのJoinをサポートしています。ストリーミングのJoinの結果は、上述のセクションにおけるストリーミング集計の結果と同様に、インクリメンタルに生成されます。このセクションでは、上述のケースでサポートされているJoinのタイプ(inner/outer/semiなど)を探索します。サポートされているJoinの全てのタイプにおいては、ストリーミングデータセット/データフレームとのJoinの結果は、ストリームに同じデータを含む静的なデータセット/データフレームであるかのように全く同じ結果を生成します。
ストリーム・スタティックのJoin
Spark 2.0での導入以降、構造化ストリーミングではストリーミングと静的データフレーム/データセット間のJoin(inner join/いくつかのouter join)をサポートしています。シンプルな例を以下に示します。
staticDf = spark.read. ...
streamingDf = spark.readStream. ...
streamingDf.join(staticDf, "type") # inner equi-join with a static DF
streamingDf.join(staticDf, "type", "left_outer") # left outer join with a static DF
ストリーム・スタティックのJoinはステートフルではないので、ステート管理が不要であることに注意してください。しかし、いくつかのタイプのストリーム・スタティックのouter joinは未サポートです。これらは、このJoinセクションの最後に一覧されています。
ストリーム・ストリームのJoin
Spark 2.3では、ストリーム・ストリームJoinのサポートを追加しており、2つのストリーミングデータセット/データフレームをJoinすることができます。2つのデータストリームからJoinの結果を生成する際の課題は、任意のタイミングにおいてJoinの両側のデータセットのビューは不完全なものであり、入力間でのマッチを発見することがより困難であるというものです。一方の入力ストリームから受け取る任意の行は、もう一方の入力ストリームから受信はしていませんが、将来においてマッチする可能性があります。このため、過去の入力とそれぞれの将来的な入力をマッチできるように、入力ストリームの両方に対してストリーミングのステートとして過去の入力をバッファし、それに応じてJoinの結果を生成します。さらに、ストリーミングの集計と同様に、ウォーターマークを用いてステートを限定し、遅延した、あるいは順序を守らないデータを自動でハンドリングします。それでは、サポートされているストリーム・ストリームJoinのタイプと、これらをどのように活用するのかを議論しましょう。
ウォーターマークのオプションを伴うInner Join
任意の種類のJoin条件を伴う任意の種類のカラムに対するInner Joinをサポートしています。しかし、ストリームが実行されると、いかなる新規の入力は過去のいかなる入力とマッチし得るので、全ての過去の入力を保存しなくてはならないとすると、ストリームのステートのサイズは無限に増加します。際限のないステートを避けるために、限りなく古い入力は将来の入力とマッチしないので、それらがステートからクリアーされるように追加のJoin条件を定義しなくてはなりません。言い換えると、Joinにおいては以下の追加のステップを行わなくてはなりません。
- (ストリーミング集計と同様に)入力がどの程度遅延するのかをエンジンが理解できるように、両方の入力に対してウォーターマークの遅延を定義します。
- いつ一方の古い入力行が別の入力とのマッチングに不要になる(すなわち、時間の制約を満足しない)のかをエンジンが理解できるように、両方の入力におけるイベント時間の制約を定義します。この制約は2つの方法のいずれかで定義することができます。
- 時間レンジのJoin条件(例:
JOIN ON leftTime BETWEEN rightTime AND rightTime + INTERVAL 1 HOUR
) - イベント時間のウィンドウに対するJoin(例:
JOIN ON leftTimeWindow = rightTimeWindow
)
- 時間レンジのJoin条件(例:
サンプルを通じてこれを理解していきましょう。
いつインプレッションがマネタイズされたクリックにつながったのかの相関を見るために、広告のインプレッションのストリームと広告に対するクリックの別のストリームをJoinしたいものとします。このストリーム・ストリームJoinのステートのクリーンアップを可能にするために、以下のように遅延のウォーターマークと時間の制約を指定なくてはなりません。
- 遅延のウォーターマーク: インプレッションと対応するクリックはイベント時間においては、それぞれ最大2時間、3時間の遅延を許容します。
- イベント時間レンジの条件: クリックは対応するインプレッションの後、0秒から1時間の間に発生するものとします。
コードは以下のようになります。
from pyspark.sql.functions import expr
impressions = spark.readStream. ...
clicks = spark.readStream. ...
# Apply watermarks on event-time columns
impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours")
clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours")
# Join with event-time constraints
impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
""")
)
ウォーターマークのオプションを伴うInner Joinにおけるセマンティックの保証
集計におけるウォーターマーキングによって提供される保証と同様の話です。2 hours
のウォーターマークの遅延は、遅延が2時間以下のデータはエンジンによって削除されないということが保証されます。しかし、2時間以上遅延したデータは処理されない可能性があります。
ウォーターマークを伴うOuter Join
Inner Joinにおいてはウォーターマークとイベント時間の制約はオプションですが、Outer Joinでは指定する必要があります。これは、Outer JoinでNULLの結果を生成するためには、入力行が将来にわたってマッチしないのはいつなのかをエンジンが理解する必要があるためです。このため、適切な結果を生成するためにはウォーターマークとイベント時間の制約を指定しなくてはなりません。このため、Outer Joinを伴うクエリーは、上述した広告・マネタイズのサンプルと同様なものになりますが、Outer Joinであることを指定するための追加パラメーターが存在する点が異なります。
impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
"""),
"leftOuter" # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)
ウォーターマークのオプションを伴うOuter Joinにおけるセマンティックの保証
遅延のウォーターマークに関して、データが削除されるかどうかに関しては、Outer JoinにおいてもInner Joinと同じ保証が行われます。
注意
どのようにOuterの結果が生成されるのかに関しては、いくつか注意すべき特性があります。
- OuterのNULLの結果は、指定された遅延のウォーターマークと時間レンジの条件に依存する遅延によって生成されます。これは、エンジンがデータがもはやマッチせず、将来にわたってもマッチしないということを保証するために十分な期間を待たなくてはならないためです。
- マイクロバッチエンジンの現在の実装においては、ウォーターマークはマイクロバッチの最後にまで進行し、次のマイクロバッチではステートをクリーアンプし、Outerの結果を出力するためにアップデートされたウォーターマークを使用します。新規データが処理された際にのみマイクロバッチを起動するので、ストリームに新規データが到着しない場合には、Outerの結果の生成が遅延する場合があります。簡単に言うと、Joinされる2つの入力ストリームが長い間データを受信しない場合、Outer(left、rigthの両方のケースで)の出力が遅延する場合があります。
ウォーターマークを伴うSemi Join
Semi Joinは右側とマッチするリレーションの左側から値を返却します。また、これはLeft Semi Joinとも呼ばれます。Outer Joinと同様、Semi Joinにおいてもウォーターマークとイベント時間の制約を指定しなくてはなりません。これは、左側でマッチしない行を除外するためのものであり、エンジンは左側の入力行がいつ将来にわたって右側とマッチしなくなるのかを知る必要があります。
ウォーターマークを伴うストリーム・ストリームのSemi Join
遅延のウォーターマークに関して、データが削除されるかどうかに関しては、Semi JoinにおいてもInner Joinと同じ保証が行われます。
ストリーミングクエリーにおけるJoinのサポートマトリクス
左の入力 | 右の入力 | Joinのタイプ | |
---|---|---|---|
スタティック | スタティック | 全てのタイプ | サポートされています。ストリーミングクエリーで表現できるとしてもこれはストリーミングデータでありません。 |
ストリーム | スタティック | ||
Inner | サポートされています。ステートフルではありません。 | ||
Left Outer | サポートされています。ステートフルではありません。 | ||
Right Outer | サポートされていません。 | ||
Full Outer | サポートされていません。 | ||
Left Semi | サポートされていません。 | ||
スタティック | ストリーム | ||
Inner | サポートされています。ステートフルではありません。 | ||
Left Outer | サポートされていません。 | ||
Right Outer | サポートされています。ステートフルではありません。 | ||
Full Outer | サポートされていません。 | ||
Left Semi | サポートされていません。 | ||
ストリーム | ストリーム | ||
Inner | サポートされています。ステートをクリーンアップするためにオプションとして、両サイドにウォーターマークと時間の制約を指定できます。 | ||
Left Outer | 条件付きでサポートされており、適切な結果を得るためには右側にウォーターマークと時間の制約を指定する必要があります。オプションとして全てのステートをクリーンアップするために左側にウォーターマークを指定することができます。 | ||
Right Outer | 条件付きでサポートされており、適切な結果を得るためには左側にウォーターマークと時間の制約を指定する必要があります。オプションとして全てのステートをクリーンアップするために右川にウォーターマークを指定することができます。 | ||
Full Outer | 条件付きでサポートされており、適切な結果を得るためには一方にウォーターマークと時間の制約を指定する必要があります。オプションとして全てのステートをクリーンアップするためにもう一方にウォーターマークを指定することができます。 | ||
Left Semi | 条件付きでサポートされており、適切な結果を得るためには右側にウォーターマークと時間の制約を指定する必要があります。オプションとして全てのステートをクリーンアップするために左側にウォーターマークを指定することができます。 |
サポートされるJoinに関する追加情報です。
- Joinをカスケードすることが可能です。すなわち、
df1.join(df2, ...).join(df3, ...).join(df4, ....)
は可能です。 - Spark 2.4時点では、クエリーがAppend出力モードの場合にのみJoinを使用できます。他の出力モードはサポートされていません。
- Spark 2.4時点では、Joinの前にnon-map-likeオペレーションを使用することはできません。使用できない例を以下に示します。
- Joinの前にストリーミング集計を行うことはできません。
- Joinの前にUpdateモードでmapGroupsWithStateやflatMapGroupsWithStateを使用することはできません。