はじめに
PDEの資格を取得したは良いものの、0からコードを記述してパイプラインを作成することができなかったため、コーディングの練習を始めました。
Apache SparkはPandasライクな構文で理解しやすかったのですが、Apache Beamの独特な文法に苦戦し、特にLeft JoinをするためのメソッドがBeamには実装されていなかった(調べられていないだけかもですが)ので記事に残すことにしました。
環境
[tool.poetry]
name = "beam"
version = "0.1.0"
description = ""
authors = ["Kamegrueon"]
[tool.poetry.dependencies]
python = "^3.9"
jupyter = "^1.0.0"
apache-beam = "^2.46.0"
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
実装したコード
Gitにコード上げてます。
解説
LeftJoin クラス
# Apache BeamのPTransformを継承して、LeftJoinという新しいPTransformを作成する
class LeftJoin(beam.PTransform):
def __init__(self, right_pcoll: PCollection, join_key: str, join_cols: Union[list[str], None]=None):
self.right_pcoll = right_pcoll
self.join_key = join_key
self.join_cols = join_cols
# PTransformを拡張するためのメソッド
def expand(self, pcoll: PCollection):
return ( pcoll | beam.FlatMap(
fn=left_join, # 左テーブルの各行と右テーブルのデータを結合するためにleft_join関数を使用する
right_element=AsList(self.right_pcoll), # 右テーブルのデータをAsListでリスト化する
join_key=self.join_key, # 結合に使用するキー
join_cols=self.join_cols # 右テーブルから結合したいカラムのリスト、Noneの場合は全てのカラムを使用する
))
ParDo Transformは使用せずに、PTransformを継承してクラスを生成している。
理由は参考にしたこちらのコードでそのように実装していたからなのだが、通常のParDo Transformと同じようにクラスを生成しただけではbeam.AsListメソッドがエラーとなってしまった。
left_join 関数
# 左側のテーブルと右側のテーブルを結合する関数
def left_join(left_element: dict, right_element: list[dict], join_key: str, join_cols: Union[list[str], None]):
# 指定されたカラムを抽出
right_elem: list[dict] = extract_join_cols(right_element, join_key, join_cols)
for right in right_elem:
# join_keyが一致する場合は結合し、yieldで返す
if left_element[join_key] == right[join_key]:
yield {**left_element, **right}
return
# join_keyが一致しない場合、空のデータを追加して結合する
empty_keys: list[str] = list(filter(lambda x: x != join_key, right_elem[0].keys()))
yield {**left_element, **{ key: '' for key in empty_keys }}
右テーブルの任意の列のみを結合できるようにextract_join_cols関数も実装(後述)
左テーブルと右テーブルのキーが一致したら左テーブルに右テーブルをマージし、一致しない場合は空の値をマージする。
右テーブルに複数一致する行があった場合は最初に一致した行のみマージする仕様となっている。
extract_join_cols 関数
def extract_join_cols(right_element: list[dict], join_key: str, join_cols: Union[list[str], None]):
if join_cols is not None:
# リスト内包表記を用いて指定されたカラムだけを取得し、新しいリストを作成
extract_right_element: list[dict] = [
{ k: v for k, v in dic.items() if k in [ *join_cols, join_key ]} for dic in right_element
]
return extract_right_element
# 指定されたカラムがなければそのまま返す
return right_element
リスト内包表記で、右テーブルのリストから取り出した辞書オブジェクトからキーとバリューを取り出し、join_colsに指定したカラムとjoin_keyのみを取り出し、辞書を内包したリストを生成する。
join_colsは指定しなかった場合はNoneが入るため、指定しなければ特に何も処理をせずにreturnする。
出力結果
IN
left_pcoll = p | "Create Left PCollection" >> beam.Create([
{'join_key': 1, 'left_value_1': 'a', 'left_value_2': 'b'},
{'join_key': 2, 'left_value_1': 'c', 'left_value_2': 'd'},
{'join_key': 2, 'left_value_1': 'e', 'left_value_2': 'f'},
{'join_key': 3, 'left_value_1': 'g', 'left_value_2': 'h'}
])
# 右側のテーブルを作成する
right_pcoll = p | "Create Right PCollection" >> beam.Create([
{'join_key': 1, 'right_value': 'A', 'not_join_value': "D"},
{'join_key': 2, 'right_value': 'B', 'not_join_value': "E"},
{'join_key': 4, 'right_value': 'C', 'not_join_value': "F"}
])
OUT
{'join_key': 1, 'left_value_1': 'a', 'left_value_2': 'b', 'right_value': 'A'}
{'join_key': 2, 'left_value_1': 'c', 'left_value_2': 'd', 'right_value': 'B'}
{'join_key': 2, 'left_value_1': 'e', 'left_value_2': 'f', 'right_value': 'B'}
{'join_key': 3, 'left_value_1': 'g', 'left_value_2': 'h', 'right_value': ''}
参考文献
最後に
割と苦労して情報を見つけ実装をしましたが、Apache Beamで結合するのではなく、一度Bigquery等に取り込んでから変換する方が楽なのではと感じました。(処理速度的にも)
ですが、実装する過程でPCollectionやPTransformに関しての知識がかなりついたと感じたので、取り組んだ価値はあったかと思います。
次はローカルではなくDataflow上でちゃんと動くか試していきます。
もし、BeamでLeftJoinをしたい方がいれば参考にしていただけると幸いです。
PS: TypeScriptに触れてから型がないと気持ち悪く感じるようになりました笑。
最後まで読んでいただきありがとうございます!