恥ずかしながら最近になって知ったワークフローエンジン Apache Airflow。日本語の紹介記事もちらほら出てきていますが、公式ドキュメントをちょっとずつ抄訳しながら読んでいこうと思います。
16回目の今回はオペレーター(Operators)。
バージョン2.3.3時点のものです。
オペレーター(Operators)
DAG:オペレーターはあらかじめ定義されたタスクのためのテンプレートです。オペレーターを使用することでDAGの内部で宣言的にタスクを定義することが出来ます:
with DAG("my-dag") as dag:
ping = SimpleHttpOperator(endpoint="http://example.com/update/")
email = EmailOperator(to="admin@example.com", subject="Update complete")
ping >> email
Airflowにはとてもたくさんのオペレーターが用意されています。それらはAirflowのコアが組み込みで提供するものやプリインストールされたプロバイダーが提供するものです。コアにはいくつかの汎用性の高いオペレーターが含まれています:
-
BashOperator
- bashでコマンドを実行する -
PythonOperator
- 任意のPython関数を実行する -
EmailOperator
- Eメールを送信する
コア・オペレーターの一覧はこちらをご参照ください: コア・オペレーターとフックのリファレンス。
デフォルトでインストールされているものの中に必要なオペレーターがない場合でも、コミュニティが提供する膨大なプロバイダー・パッケージ群の中で見つかるかもしれません:
SimpleHttpOperator
MySqlOperator
PostgresOperator
MsSqlOperator
OracleOperator
JdbcOperator
DockerOperator
HiveOperator
S3FileTransformOperator
PrestoToMySqlOperator
SlackAPIOperator
こうして示したもの以外にもたくさんのオペレーターが提供されています。コミュニティで管理されているオペレーター、フック、センサー、そしてトランスファー(transfer)がプロバイダー・パッケージ群のドキュメントで参照できます。
Note
Airflowのコード内では、タスクとオペレーターの概念はしばしば互換のものとして用いられています。タスクはDAGを構成する「実行の最小単位」の総称です。オペレーターは再利用可能で、事前定義されたタスク・テンプレートです。ユーザー開発者が作成したロジックを実行します。いくつかの引数(argument)を必要とします。
Jinjaテンプレート
AirflowはJinjaテンプレート・エンジンを利用しており、マクロと組み合わせて用いることでとてもパワフルに機能します。
例えば、データ区間(Data Interval)の開始日時を、環境変数を通じてBashOperator
により実行されるbashスクリプトに渡したい場合は:
# データ区間の開始日時。書式はYYYY-MM-DD
date = "{{ ds }}"
t = BashOperator(
task_id="test_env",
bash_command="/tmp/test.sh ",
dag=dag,
env={"DATA_INTERVAL_START": date},
)
ここで、{{ ds }}
はテンプレート変数であり、BashOperator
のenv
パラメーターはJinjaによりテンプレート化されているので、bashスクリプトはDATA_INTERVAL_START
という名前の環境変数でこのデータ区間の開始日時にアクセスできます。
Jinjaテンプレート・エンジンが利用できるパラメーターはドキュメント上で「テンプレート化されている」(tepmlated)と記述されています。テンプレート・エンジンによる文字列置換はpre_execute
関数が実行される直前に行われます。
Jinjaテンプレート・エンジンは入れ子のフィールドにも使えます。そのためには当該の入れ子のフィールドが「テンプレート化されている」ものであることを構造的に示す必要があります。template_fields
プロパティに登録されているフィールドはテンプレート・エンジンによる文字列置換の対象になります。下記の例ではpath
フィールドが該当します:
class MyDataReader:
template_fields: Sequence[str] = ("path",)
def __init__(self, my_path):
self.path = my_path
# ここにその他のコード
t = PythonOperator(
task_id="transform_data",
python_callable=transform_data,
op_args=[MyDataReader("/tmp/{{ ds }}/my_file")],
dag=dag,
)
Note
template_fields
プロパティはクラス変数でもインスタンス変数でも同様に機能します。
深く入れ子になったフィールドもまた文字列置換の対象にできます。この場合入れ子関係上中間に位置するフィールドもまたテンプレート化されている必要があります:
class MyDataTransformer:
template_fields: Sequence[str] = ("reader",)
def __init__(self, my_reader):
self.reader = my_reader
# ここにその他のコード
class MyDataReader:
template_fields: Sequence[str] = ("path",)
def __init__(self, my_path):
self.path = my_path
# ここにその他のコード
t = PythonOperator(
task_id="transform_data",
python_callable=transform_data,
op_args=[MyDataTransformer(MyDataReader("/tmp/{{ ds }}/my_file"))],
dag=dag,
)
DAGの作成時にJinjaのEnvironment
に対してオプションを渡すことが出来ます。よくある使用例の1つは、テンプレート文字列の終端の改行文字を削除しないようJinjaに指示するというものです:
my_dag = DAG(
dag_id="my-dag",
jinja_environment_kwargs={
"keep_trailing_newline": True,
# ここにその他のjinja2 Environmentオプション
},
)
指定可能なオプションについて詳しくはJinjaのドキュメントを参照してください。
ネイティブのPythonオブジェクトとしてフィールドをレンダリングする
デフォルトでは、template_fields
に列挙されたフィールドの値はすべて文字列としてレンダリングされます。
例えばextract
という名前のタスクが辞書(例:{"1001": 301.27, "1002": 433.21, "1003": 502.22}
)をXComに登録したとします。後続タスクが実行されるとき、その値を取り出し文字列としてレンダリングされた結果をorder_data
引数に渡します(例:'{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
)。
transform = PythonOperator(
task_id="transform",
op_kwargs={"order_data": "{{ti.xcom_pull('extract')}}"},
python_callable=transform,
)
レンダリング結果としてネイティブのPythonオブジェクト(この例ではdict
オブジェクト)を得たい場合、DAGのパラメーターとしてrender_template_as_native_obj=True
を指定します:
dag = DAG(
dag_id="example_template_as_python_object",
schedule_interval=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
render_template_as_native_obj=True,
)
def extract():
data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
return json.loads(data_string)
def transform(order_data):
print(type(order_data))
for value in order_data.values():
total_order_value += value
return {"total_order_value": total_order_value}
extract_task = PythonOperator(task_id="extract", python_callable=extract)
transform_task = PythonOperator(
task_id="transform",
op_kwargs={"order_data": "{{ti.xcom_pull('extract')}}"},
python_callable=transform,
)
extract_task >> transform_task
この例では、order_data
引数に{"1001": 301.27, "1002": 433.21, "1003": 502.22}
が渡されます。
render_template_as_native_obj
にTrue
が指定された場合、AirflowはJinjaのNativeEnvironmentを利用します。NativeEnvironment
により、テンプレートのレンダリング結果としてネイティブのPythonデータ型のインスタンスが生成されます。
予約語params
Apache Airflow v2.2.0において、params
という変数名はDAGシリアライゼーション過程で利用されます。この変数名をサードパーティのオペレーター内で利用しないでください。以前のバージョンのAirflowからアップグレードをした場合、次のようなエラーに遭遇するかもしれません:
AttributeError: 'str' object has no attribute '__module__'
オペレーター内で使用しているparams
変数の名前を変更してください。
レディメイドのオペレーターでは不足で、あるシステムに固有の、しかし当該システム内ではそれなりに汎用性のあるロジックを独自のオペレーターとして定義する場合、Jinjaを使ったパラメータ化が行えるのは便利そうだなと感じました。
一方で、「ネイティブのPythonオブジェクトとしてフィールドをレンダリングする」(Rendering Fields as Native Python Objects)のセクションで説明されている機能については、正直何がうれしいのだろう?と感じてしまいました。テンプレートのレンダリング結果として文字列以外のオブジェクトを想定するようなケースとは・・・と。