LoginSignup
0
0

Apache BeamでLeft Joinを実装する

Last updated at Posted at 2023-03-29

はじめに

PDEの資格を取得したは良いものの、0からコードを記述してパイプラインを作成することができなかったため、コーディングの練習を始めました。
Apache SparkはPandasライクな構文で理解しやすかったのですが、Apache Beamの独特な文法に苦戦し、特にLeft JoinをするためのメソッドがBeamには実装されていなかった(調べられていないだけかもですが)ので記事に残すことにしました。

環境

.toml
[tool.poetry]
name = "beam"
version = "0.1.0"
description = ""
authors = ["Kamegrueon"]

[tool.poetry.dependencies]
python = "^3.9"
jupyter = "^1.0.0"
apache-beam = "^2.46.0"

[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

実装したコード

Gitにコード上げてます。

解説

LeftJoin クラス

# Apache BeamのPTransformを継承して、LeftJoinという新しいPTransformを作成する
class LeftJoin(beam.PTransform):
  def __init__(self, right_pcoll: PCollection, join_key: str, join_cols: Union[list[str], None]=None):
    self.right_pcoll = right_pcoll
    self.join_key = join_key
    self.join_cols = join_cols
  
   # PTransformを拡張するためのメソッド
  def expand(self, pcoll: PCollection):
    return ( pcoll | beam.FlatMap(
              fn=left_join, # 左テーブルの各行と右テーブルのデータを結合するためにleft_join関数を使用する
              right_element=AsList(self.right_pcoll), # 右テーブルのデータをAsListでリスト化する
              join_key=self.join_key, # 結合に使用するキー
              join_cols=self.join_cols # 右テーブルから結合したいカラムのリスト、Noneの場合は全てのカラムを使用する
  ))

ParDo Transformは使用せずに、PTransformを継承してクラスを生成している。
理由は参考にしたこちらのコードでそのように実装していたからなのだが、通常のParDo Transformと同じようにクラスを生成しただけではbeam.AsListメソッドがエラーとなってしまった。

left_join 関数

# 左側のテーブルと右側のテーブルを結合する関数
def left_join(left_element: dict, right_element: list[dict], join_key: str, join_cols: Union[list[str], None]):
    # 指定されたカラムを抽出
    right_elem: list[dict] = extract_join_cols(right_element, join_key, join_cols)
    for right in right_elem:
      # join_keyが一致する場合は結合し、yieldで返す
      if left_element[join_key] == right[join_key]:
        yield {**left_element, **right}
        return
      
    # join_keyが一致しない場合、空のデータを追加して結合する
    empty_keys: list[str] = list(filter(lambda x: x != join_key, right_elem[0].keys()))
    yield {**left_element, **{ key: '' for key in empty_keys }}

右テーブルの任意の列のみを結合できるようにextract_join_cols関数も実装(後述)
左テーブルと右テーブルのキーが一致したら左テーブルに右テーブルをマージし、一致しない場合は空の値をマージする。
右テーブルに複数一致する行があった場合は最初に一致した行のみマージする仕様となっている。

extract_join_cols 関数

def extract_join_cols(right_element: list[dict], join_key: str, join_cols: Union[list[str], None]):
    if join_cols is not None:
      # リスト内包表記を用いて指定されたカラムだけを取得し、新しいリストを作成
      extract_right_element: list[dict] = [
        { k: v for k, v in dic.items() if k in [ *join_cols, join_key ]} for dic in right_element
      ]
      return extract_right_element
    # 指定されたカラムがなければそのまま返す
    return right_element

リスト内包表記で、右テーブルのリストから取り出した辞書オブジェクトからキーとバリューを取り出し、join_colsに指定したカラムとjoin_keyのみを取り出し、辞書を内包したリストを生成する。
join_colsは指定しなかった場合はNoneが入るため、指定しなければ特に何も処理をせずにreturnする。

出力結果

IN

left_pcoll = p | "Create Left PCollection" >> beam.Create([
    {'join_key': 1, 'left_value_1': 'a', 'left_value_2': 'b'},
    {'join_key': 2, 'left_value_1': 'c', 'left_value_2': 'd'},
    {'join_key': 2, 'left_value_1': 'e', 'left_value_2': 'f'},
    {'join_key': 3, 'left_value_1': 'g', 'left_value_2': 'h'}
])
    
# 右側のテーブルを作成する
right_pcoll = p | "Create Right PCollection" >> beam.Create([
    {'join_key': 1, 'right_value': 'A', 'not_join_value': "D"},
    {'join_key': 2, 'right_value': 'B', 'not_join_value': "E"},
    {'join_key': 4, 'right_value': 'C', 'not_join_value': "F"}
])

OUT

{'join_key': 1, 'left_value_1': 'a', 'left_value_2': 'b', 'right_value': 'A'}
{'join_key': 2, 'left_value_1': 'c', 'left_value_2': 'd', 'right_value': 'B'}
{'join_key': 2, 'left_value_1': 'e', 'left_value_2': 'f', 'right_value': 'B'}
{'join_key': 3, 'left_value_1': 'g', 'left_value_2': 'h', 'right_value': ''}

参考文献

最後に

割と苦労して情報を見つけ実装をしましたが、Apache Beamで結合するのではなく、一度Bigquery等に取り込んでから変換する方が楽なのではと感じました。(処理速度的にも)
ですが、実装する過程でPCollectionやPTransformに関しての知識がかなりついたと感じたので、取り組んだ価値はあったかと思います。
次はローカルではなくDataflow上でちゃんと動くか試していきます。
もし、BeamでLeftJoinをしたい方がいれば参考にしていただけると幸いです。

PS: TypeScriptに触れてから型がないと気持ち悪く感じるようになりました笑。

最後まで読んでいただきありがとうございます!

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0