恥ずかしながら最近になって知ったワークフローエンジン Apache Airflow。日本語の紹介記事もちらほら出てきていますが、公式ドキュメントをちょっとずつ抄訳しながら読んでいこうと思います。
18目の今回はクロス・コミュニケーション(XComs)。
バージョン2.3.3時点のものです。
クロス・コミュニケーション(XComs)
XComs("cross-communications" の短縮形)はタスク同士が互いに情報のやり取りをするための仕組みです。デフォルトでは、タスク同士は完全に分離されており、まったく異なるサーバー上で実行される可能性もあります。
XComsでやり取りされる個々の情報はkey
(実質的にそれ自身の名前です)と、データソースとなるtask_id
およびdag_id
により指し示されます。(シリアライズ可能なものなら)どんなものでもやり取りできますが、仕組みとしてはあくまでも小規模なデータをやり取りするためだけに設計されたものです。この仕組を大規模なデータ、例えばデータフレームのようなものの送信に利用しないでください。
XComsにデータを追加するにはxcom_push
メソッドを、取り出すにはxcom_pull
メソッドを使用します。多くのオペレーターはdo_xcom_push
引数にTrue
(デフォルト)が指定された場合、実行結果を return_value
というキーで自動的にプッシュします。@task
デコレーターで修飾された関数も同じように機能します。
xcom_pull
メソッドはキーを指定せずに呼び出すと、このreturn_value
をデフォルト値として利用します。これは次のようなコードを書くことができるということです:
# "pushing_task"タスクが"return_value"というキーで登録した値を取り出す
value = task_instance.xcom_pull(task_ids='pushing_task')
XComsはテンプレートの中でも利用できます:
SELECT * FROM {{ task_instance.xcom_pull(task_ids='foo', key='table_name') }}
XComsは変数(variables)に似ています。大きな違いはXComsがタスクごとに用意されるものであり、DAG Runの内部でのやり取りのために設計されているのに対して、変数はグローバルであり、Airflowの全般的な設定や値の共有のために設計されているという点です。
Note
最初のタスクが成功しなかった場合、当該タスクのリトライの都度XComsはクリアされます。これはタスク実行を冪等なものとするためです。
カスタムXCom バックエンド
XComsのバックエンドは変更できます。設定ファイルのxcom_backend
の項目で指定します。
独自のバックエンド実装を行いたい場合は、BaseXCom
クラスのサブクラスを作成し、serialize_value
メソッドとdeserialize_value
メソッドをオーバーライドします。
この他にorm_deserialize_value
メソッドもありますが、これはXComオブジェクトをWeb UI上で表示するときに呼び出されます。XComsを使って巨大なオブジェクトや、値を取得するのに大きなコストがかかるオブジェクトを扱っている場合、Web UIの応答速度を維持するために、このメソッドをオーバーライドしてデフォルトのコードが呼びされないよういしなくてはなりません(オーバーライドは軽量で、巨大なXComオブジェクトの一部分だけを表示するものになるでしょう)。
clear
メソッドもまたオーバーライドできます。このメソッドを使用して特定のDAGとタスクの結果をクリアすることができます。このメソッドをオーバーライドすることで、カスタムXComバックエンドがデータ・ライフサイクルを処理するのが容易になります。
コンテナ内でカスタムXComバックエンドを利用する
(省略)
Helmで管理されたk8sクラスター内でカスタムXComバックエンドを利用する
(省略)
小規模なデータのやり取りにのみ使うように注意しつつ、大規模なデータをやり取りする場合のカスタムXComについて触れているあたり、、、現実には本来の目的を無視した使われ方がまあまあ横行しているのでしょうか。
このカスタムXComというのがどれくらいニーズがあるものなのか? 「コンテナ内で~」「k8sクラスター内で~」というところはこのカスタムXComが前提で、それなりに複雑なことを述べているようす。ただ今回は読まず。