0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Airflow資料抄訳(16):オペレーター(Operators)

Last updated at Posted at 2022-07-25

恥ずかしながら最近になって知ったワークフローエンジン Apache Airflow。日本語の紹介記事もちらほら出てきていますが、公式ドキュメントをちょっとずつ抄訳しながら読んでいこうと思います。

16回目の今回はオペレーター(Operators)。
バージョン2.3.3時点のものです。


オペレーター(Operators)

DAG:オペレーターはあらかじめ定義されたタスクのためのテンプレートです。オペレーターを使用することでDAGの内部で宣言的にタスクを定義することが出来ます:

with DAG("my-dag") as dag:
    ping = SimpleHttpOperator(endpoint="http://example.com/update/")
    email = EmailOperator(to="admin@example.com", subject="Update complete")

    ping >> email

Airflowにはとてもたくさんのオペレーターが用意されています。それらはAirflowのコアが組み込みで提供するものやプリインストールされたプロバイダーが提供するものです。コアにはいくつかの汎用性の高いオペレーターが含まれています:

  • BashOperator - bashでコマンドを実行する
  • PythonOperator - 任意のPython関数を実行する
  • EmailOperator - Eメールを送信する

コア・オペレーターの一覧はこちらをご参照ください: コア・オペレーターとフックのリファレンス

デフォルトでインストールされているものの中に必要なオペレーターがない場合でも、コミュニティが提供する膨大なプロバイダー・パッケージ群の中で見つかるかもしれません:

  • SimpleHttpOperator
  • MySqlOperator
  • PostgresOperator
  • MsSqlOperator
  • OracleOperator
  • JdbcOperator
  • DockerOperator
  • HiveOperator
  • S3FileTransformOperator
  • PrestoToMySqlOperator
  • SlackAPIOperator

こうして示したもの以外にもたくさんのオペレーターが提供されています。コミュニティで管理されているオペレーター、フック、センサー、そしてトランスファー(transfer)がプロバイダー・パッケージ群のドキュメントで参照できます。

Note
Airflowのコード内では、タスクとオペレーターの概念はしばしば互換のものとして用いられています。タスクはDAGを構成する「実行の最小単位」の総称です。オペレーターは再利用可能で、事前定義されたタスク・テンプレートです。ユーザー開発者が作成したロジックを実行します。いくつかの引数(argument)を必要とします。

Jinjaテンプレート

AirflowはJinjaテンプレート・エンジンを利用しており、マクロと組み合わせて用いることでとてもパワフルに機能します。

例えば、データ区間(Data Interval)の開始日時を、環境変数を通じてBashOperatorにより実行されるbashスクリプトに渡したい場合は:

# データ区間の開始日時。書式はYYYY-MM-DD
date = "{{ ds }}"
t = BashOperator(
    task_id="test_env",
    bash_command="/tmp/test.sh ",
    dag=dag,
    env={"DATA_INTERVAL_START": date},
)

ここで、{{ ds }}はテンプレート変数であり、BashOperatorenvパラメーターはJinjaによりテンプレート化されているので、bashスクリプトはDATA_INTERVAL_STARTという名前の環境変数でこのデータ区間の開始日時にアクセスできます。

Jinjaテンプレート・エンジンが利用できるパラメーターはドキュメント上で「テンプレート化されている」(tepmlated)と記述されています。テンプレート・エンジンによる文字列置換はpre_execute関数が実行される直前に行われます。

Jinjaテンプレート・エンジンは入れ子のフィールドにも使えます。そのためには当該の入れ子のフィールドが「テンプレート化されている」ものであることを構造的に示す必要があります。template_fieldsプロパティに登録されているフィールドはテンプレート・エンジンによる文字列置換の対象になります。下記の例ではpathフィールドが該当します:

class MyDataReader:
    template_fields: Sequence[str] = ("path",)

    def __init__(self, my_path):
        self.path = my_path

    # ここにその他のコード


t = PythonOperator(
    task_id="transform_data",
    python_callable=transform_data,
    op_args=[MyDataReader("/tmp/{{ ds }}/my_file")],
    dag=dag,
)

Note
template_fieldsプロパティはクラス変数でもインスタンス変数でも同様に機能します。

深く入れ子になったフィールドもまた文字列置換の対象にできます。この場合入れ子関係上中間に位置するフィールドもまたテンプレート化されている必要があります:

class MyDataTransformer:
    template_fields: Sequence[str] = ("reader",)

    def __init__(self, my_reader):
        self.reader = my_reader

    # ここにその他のコード


class MyDataReader:
    template_fields: Sequence[str] = ("path",)

    def __init__(self, my_path):
        self.path = my_path

    # ここにその他のコード


t = PythonOperator(
    task_id="transform_data",
    python_callable=transform_data,
    op_args=[MyDataTransformer(MyDataReader("/tmp/{{ ds }}/my_file"))],
    dag=dag,
)

DAGの作成時にJinjaのEnvironmentに対してオプションを渡すことが出来ます。よくある使用例の1つは、テンプレート文字列の終端の改行文字を削除しないようJinjaに指示するというものです:

my_dag = DAG(
    dag_id="my-dag",
    jinja_environment_kwargs={
        "keep_trailing_newline": True,
        # ここにその他のjinja2 Environmentオプション
    },
)

指定可能なオプションについて詳しくはJinjaのドキュメントを参照してください。

ネイティブのPythonオブジェクトとしてフィールドをレンダリングする

デフォルトでは、template_fieldsに列挙されたフィールドの値はすべて文字列としてレンダリングされます。

例えばextractという名前のタスクが辞書(例:{"1001": 301.27, "1002": 433.21, "1003": 502.22})をXComに登録したとします。後続タスクが実行されるとき、その値を取り出し文字列としてレンダリングされた結果をorder_data引数に渡します(例:'{"1001": 301.27, "1002": 433.21, "1003": 502.22}')。

transform = PythonOperator(
    task_id="transform",
    op_kwargs={"order_data": "{{ti.xcom_pull('extract')}}"},
    python_callable=transform,
)

レンダリング結果としてネイティブのPythonオブジェクト(この例ではdictオブジェクト)を得たい場合、DAGのパラメーターとしてrender_template_as_native_obj=Trueを指定します:

dag = DAG(
    dag_id="example_template_as_python_object",
    schedule_interval=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    render_template_as_native_obj=True,
)


def extract():
    data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
    return json.loads(data_string)


def transform(order_data):
    print(type(order_data))
    for value in order_data.values():
        total_order_value += value
    return {"total_order_value": total_order_value}


extract_task = PythonOperator(task_id="extract", python_callable=extract)

transform_task = PythonOperator(
    task_id="transform",
    op_kwargs={"order_data": "{{ti.xcom_pull('extract')}}"},
    python_callable=transform,
)

extract_task >> transform_task

この例では、order_data引数に{"1001": 301.27, "1002": 433.21, "1003": 502.22}が渡されます。

render_template_as_native_objTrueが指定された場合、AirflowはJinjaのNativeEnvironmentを利用します。NativeEnvironmentにより、テンプレートのレンダリング結果としてネイティブのPythonデータ型のインスタンスが生成されます。

予約語params

Apache Airflow v2.2.0において、paramsという変数名はDAGシリアライゼーション過程で利用されます。この変数名をサードパーティのオペレーター内で利用しないでください。以前のバージョンのAirflowからアップグレードをした場合、次のようなエラーに遭遇するかもしれません:

AttributeError: 'str' object has no attribute '__module__'

オペレーター内で使用しているparams変数の名前を変更してください。


レディメイドのオペレーターでは不足で、あるシステムに固有の、しかし当該システム内ではそれなりに汎用性のあるロジックを独自のオペレーターとして定義する場合、Jinjaを使ったパラメータ化が行えるのは便利そうだなと感じました。

一方で、「ネイティブのPythonオブジェクトとしてフィールドをレンダリングする」(Rendering Fields as Native Python Objects)のセクションで説明されている機能については、正直何がうれしいのだろう?と感じてしまいました。テンプレートのレンダリング結果として文字列以外のオブジェクトを想定するようなケースとは・・・と。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?