WindowsでAirflowのCustom XCom Backendを試してみました。
環境構築で参考にした記事はこちら
では本題。流れは以下です。
1.Custom XCom Backendの定義したPythonファイルを作成
2.cfgファイルにてCustom XCom Backendファイルを設定
3.DAGファイルを作成
4.実行
Custom XCom Backendの定義したPythonファイルを作成
以下使用するCustom XCom Backendの定義したPythonファイル
from airflow.models.xcom import BaseXCom
class XmasBuckend(BaseXCom):
@staticmethod
def serialize_value(value):
return BaseXCom.serialize_value(value)
@staticmethod
def deserialize_value(result):
result = BaseXCom.deserialize_value(result)
return f"Received value from Santa Claus: {result}"
cfgファイルにてCustom XCom Backendファイルを設定
上記ファイルを使用できるようにcfgファイルの設定を変更します。
xcom_backend = ファイル名.クラス名を設定すればOKです。
xcom_backend = Custom_XCom_Backned.XmasBuckend
DAGファイルを作成
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime
with DAG(
'Santa_Claus',
schedule=None,
) as Santa_Claus:
push_task = BashOperator(
task_id='push_task',
bash_command='echo presents',
do_xcom_push=True
)
pull_task = BashOperator(
task_id='pull_task',
bash_command='echo {{ ti.xcom_pull(task_ids="push_task") }}',
)
push_task >> pull_task
実行
以下で確認しました。
結果ログ(該当部分抜粋)
[2023-12-24, 15:53:16 UTC] {subprocess.py:93} INFO - Received value from Santa Claus: presents
想定通りの結果になりました、よかった!
メリークリスマス🎄