0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

AirflowのXComの記述方法あれこれ

Posted at

Apache Airflow の XComsについて、いざコードで記述してみる時に手が止まらないようにするためのメモ。
具体的には、以下の4パターンでコードの記述方法が変わってくるので、その違いについて書いていく。

  1. @taskデコレータを使って、明示的にxcom_pushしたものをpullする
  2. @taskデコレータを使って、PythonOperatorのreturn値をpullする
  3. @taskデコレータを使わずに、明示的にxcom_pushしたものをpullする
  4. @taskデコレータを使わずに、PythonOperatorのreturn値をpullする

参考にさせていただいた記事

関連情報

XComsとは

タスク間でデータを受け渡す仕組み。

以下公式ドキュメントの引用。

XComs (short for “cross-communications”) are a mechanism that let Tasks talk to each other, as by default Tasks are entirely isolated and may be running on entirely different machines.

ざっくり言うと、データを渡す側で xcom_push、データを受け取る側で xcom_pull をすることでデータの受け渡しが可能。

@task デコレータ(@dag デコレータ)について

Apache Airflow 2.0以降で導入されたもので、DAGの定義やタスクの作成をより直感的で簡潔に行うことができる。
詳しくは こちらを参照。
具体的な書き方については以下の公式の使用例がわかりやすい。

1. @taskデコレータを使って、明示的にxcom_pushしたものをpullする

from airflow.decorators import dag, task
from datetime import datetime

@dag(schedule_interval='@daily', start_date=datetime(2023, 1, 1))
def my_dag_1():
    @task
    def push_task():
        from airflow.models import TaskInstance
        ti = TaskInstance.current()
        ti.xcom_push(key='explicit_key', value='Explicit Value with @task decorator')

    @task
    def pull_task():
        from airflow.models import TaskInstance
        ti = TaskInstance.current()
        value = ti.xcom_pull(task_ids='push_task', key='explicit_key')
        print(value)

    push_task()
    pull_task()

2. @taskデコレータを使って、PythonOperatorのreturn値をpullする

ポイントとしては以下の2点:

  • PythonOperator 含め、ほとんどのオペレータではreturn値をXComsの return_value というキー自動的にプッシュする
  • xcom_pullは、キーが渡されない場合デフォルトで return_value キーの値を取得する

つまり、return値をpullする場合は明示的にpushする必要はないし、pull時にキーを指定しなくても良い。

@dag(schedule_interval='@daily', start_date=datetime(2023, 1, 1))
def my_dag_2():
    @task
    def push_task():
        return 'Value returned with @task decorator'

    @task
    def pull_task(data_from_push_task):
        print(data_from_push_task)

    data = push_task()
    pull_task(data)

3. @taskデコレータを使わずに、明示的にxcom_pushしたものをpullする

from airflow.models import DAG, TaskInstance
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

def push_function(**kwargs):
    ti = kwargs['ti']
    ti.xcom_push(key='explicit_key', value='Explicit Value without @task decorator')

def pull_function(**kwargs):
    ti = kwargs['ti']
    value = ti.xcom_pull(task_ids='push_function', key='explicit_key')
    print(value)

with DAG('my_dag_3', schedule_interval='@daily', start_date=datetime(2023, 1, 1)) as dag:
    push_task = PythonOperator(
        task_id='push_function',
        python_callable=push_function,
        provide_context=True
    )

    pull_task = PythonOperator(
        task_id='pull_function',
        python_callable=pull_function,
        provide_context=True
    )

    push_task >> pull_task

4. @taskデコレータを使わずに、PythonOperatorのreturn値をpullする

ポイントは2と同じ。

def push_value_function():
    return 'Value returned without @task decorator'

def pull_value_function(**kwargs):
    ti = kwargs['ti']
    value = ti.xcom_pull(task_ids='push_value_function')
    print(value)

with DAG('my_dag_4', schedule_interval='@daily', start_date=datetime(2023, 1, 1)) as dag:
    push_value_task = PythonOperator(
        task_id='push_value_function',
        python_callable=push_value_function
    )

    pull_value_task = PythonOperator(
        task_id='pull_value_function',
        python_callable=pull_value_function,
        provide_context=True
    )

    push_value_task >> pull_value_task

まとめ

現在では Task Flow を使った方法などあるため、旧式?の方法を記載している点には注意。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?