Apache Airflow の XComsについて、いざコードで記述してみる時に手が止まらないようにするためのメモ。
具体的には、以下の4パターンでコードの記述方法が変わってくるので、その違いについて書いていく。
-
@task
デコレータを使って、明示的にxcom_pushしたものをpullする -
@task
デコレータを使って、PythonOperatorのreturn値をpullする -
@task
デコレータを使わずに、明示的にxcom_pushしたものをpullする -
@task
デコレータを使わずに、PythonOperatorのreturn値をpullする
参考にさせていただいた記事
関連情報
XComsとは
タスク間でデータを受け渡す仕組み。
以下公式ドキュメントの引用。
XComs (short for “cross-communications”) are a mechanism that let Tasks talk to each other, as by default Tasks are entirely isolated and may be running on entirely different machines.
ざっくり言うと、データを渡す側で xcom_push
、データを受け取る側で xcom_pull
をすることでデータの受け渡しが可能。
@task
デコレータ(@dag
デコレータ)について
Apache Airflow 2.0以降で導入されたもので、DAGの定義やタスクの作成をより直感的で簡潔に行うことができる。
詳しくは こちらを参照。
具体的な書き方については以下の公式の使用例がわかりやすい。
1. @task
デコレータを使って、明示的にxcom_pushしたものをpullする
from airflow.decorators import dag, task
from datetime import datetime
@dag(schedule_interval='@daily', start_date=datetime(2023, 1, 1))
def my_dag_1():
@task
def push_task():
from airflow.models import TaskInstance
ti = TaskInstance.current()
ti.xcom_push(key='explicit_key', value='Explicit Value with @task decorator')
@task
def pull_task():
from airflow.models import TaskInstance
ti = TaskInstance.current()
value = ti.xcom_pull(task_ids='push_task', key='explicit_key')
print(value)
push_task()
pull_task()
2. @task
デコレータを使って、PythonOperatorのreturn値をpullする
ポイントとしては以下の2点:
-
PythonOperator
含め、ほとんどのオペレータではreturn値をXComsのreturn_value
というキー自動的にプッシュする - xcom_pullは、キーが渡されない場合デフォルトで
return_value
キーの値を取得する
つまり、return値をpullする場合は明示的にpushする必要はないし、pull時にキーを指定しなくても良い。
@dag(schedule_interval='@daily', start_date=datetime(2023, 1, 1))
def my_dag_2():
@task
def push_task():
return 'Value returned with @task decorator'
@task
def pull_task(data_from_push_task):
print(data_from_push_task)
data = push_task()
pull_task(data)
3. @task
デコレータを使わずに、明示的にxcom_pushしたものをpullする
from airflow.models import DAG, TaskInstance
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
def push_function(**kwargs):
ti = kwargs['ti']
ti.xcom_push(key='explicit_key', value='Explicit Value without @task decorator')
def pull_function(**kwargs):
ti = kwargs['ti']
value = ti.xcom_pull(task_ids='push_function', key='explicit_key')
print(value)
with DAG('my_dag_3', schedule_interval='@daily', start_date=datetime(2023, 1, 1)) as dag:
push_task = PythonOperator(
task_id='push_function',
python_callable=push_function,
provide_context=True
)
pull_task = PythonOperator(
task_id='pull_function',
python_callable=pull_function,
provide_context=True
)
push_task >> pull_task
4. @task
デコレータを使わずに、PythonOperatorのreturn値をpullする
ポイントは2と同じ。
def push_value_function():
return 'Value returned without @task decorator'
def pull_value_function(**kwargs):
ti = kwargs['ti']
value = ti.xcom_pull(task_ids='push_value_function')
print(value)
with DAG('my_dag_4', schedule_interval='@daily', start_date=datetime(2023, 1, 1)) as dag:
push_value_task = PythonOperator(
task_id='push_value_function',
python_callable=push_value_function
)
pull_value_task = PythonOperator(
task_id='pull_value_function',
python_callable=pull_value_function,
provide_context=True
)
push_value_task >> pull_value_task
まとめ
現在では Task Flow を使った方法などあるため、旧式?の方法を記載している点には注意。