2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

WindowsでAirflowのCustom XCom Backendを試した

Posted at

WindowsでAirflowのCustom XCom Backendを試してみました。

環境構築で参考にした記事はこちら

では本題。流れは以下です。
1.Custom XCom Backendの定義したPythonファイルを作成
2.cfgファイルにてCustom XCom Backendファイルを設定
3.DAGファイルを作成
4.実行

Custom XCom Backendの定義したPythonファイルを作成

以下使用するCustom XCom Backendの定義したPythonファイル

from airflow.models.xcom import BaseXCom
class XmasBuckend(BaseXCom):

    @staticmethod
    def serialize_value(value):
        return BaseXCom.serialize_value(value)

    @staticmethod
    def deserialize_value(result):
        result = BaseXCom.deserialize_value(result)
        return f"Received value from Santa Claus: {result}" 

cfgファイルにてCustom XCom Backendファイルを設定

上記ファイルを使用できるようにcfgファイルの設定を変更します。
xcom_backend = ファイル名.クラス名を設定すればOKです。

xcom_backend = Custom_XCom_Backned.XmasBuckend

DAGファイルを作成

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime


with DAG(
    'Santa_Claus',
    schedule=None,
    ) as Santa_Claus:

    push_task = BashOperator(
        task_id='push_task',
        bash_command='echo presents',
        do_xcom_push=True
    )

    pull_task = BashOperator(
        task_id='pull_task',
        bash_command='echo {{ ti.xcom_pull(task_ids="push_task") }}',
    )

push_task >> pull_task

実行

以下で確認しました。

結果ログ(該当部分抜粋)

[2023-12-24, 15:53:16 UTC] {subprocess.py:93} INFO - Received value from Santa Claus: presents

想定通りの結果になりました、よかった!
メリークリスマス🎄

2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?