17
11

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Cloud Composerに入門する

Last updated at Posted at 2021-12-26

はじめに

  • Cloud Composerに入門した時に確認したことのメモです。(QA方式)

確認したこと

Cloud Composerとは?

  • Apache Airflow で構築された、フルマネージドのワークフローオーケストレーションサービス
    • ハイブリッドおよびマルチクラウド環境にまたがるパイプラインを作成、スケジューリング、モニタリング
    • Apache Airflow のオープンソースのプロジェクト上に構築され、Python を使用して運用
    • 特定のベンダーに依存する必要がなくなり、使用も簡単

仕組みは?

スクリーンショット 2021-12-26 10.13.38.png
Cloud Composer 環境のアーキテクチャ から引用

  • 利用者としては、「DAGファイル」と呼ばれる.pyのファイルをGCSの指定のバケット下に置くだけで、ワークフローをどんどん追加して、やりたいタスクを動かしていくことができる
  • Airflow Worker(s): 各タスクを実行するワーカー。node_countの設定で増やせる
  • Cloud Storage
    • DAGs: ワークフローを記述するファイルを配置する
    • Logs: 各ワーカー上で出力されたログのファイルが配置される
    • plugin: DAGのなかで読み込みたい外部ファイルやモジュールなどを配置する

シンプルな構築・利用手順は?

1. Cloud Composer APIの有効化

  • コンソールからCloud Composer APIのENABLEを押下

2. Cloud Composer 環境を作成する

  • コンソールからComposer
  • -> 環境の作成
Location: us-central1など
Image Version: composerとairflowのバージョン
Maintenance windows: 週に12h必要
Service account: This service account runs the pods of your environment and performs environment operations such as upgrading your environment to a new version
Environment resources: Scheduler, Web server, WorkerのCPU・Memory・Storageなど
Network configuration: GKEのネットワーク設定(IPレンジなど)
  • -> GKEやGCSが作成される(GKEのコンソール画面からclustersやServicesが確認できる)

3. DAGファイルの作成

  • v2の場合の例
example_v2_dag.py
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago

@dag(default_args={'owner': 'airflow'}, schedule_interval=None, start_date=days_ago(2))
def tutorial_taskflow_api_etl():
   @task
   def extract():
       return {"1001": 301.27, "1002": 433.21, "1003": 502.22}

   @task
   def transform(order_data_dict: dict) -> dict:
       total_order_value = 0

       for value in order_data_dict.values():
           total_order_value += value

       return {"total_order_value": total_order_value}

   @task()
   def load(total_order_value: float):

       print("Total order value is: %.2f" % total_order_value)

   order_data = extract()
   order_summary = transform(order_data)
   load(order_summary["total_order_value"])

tutorial_etl_dag = tutorial_taskflow_api_etl()
  • v1の場合の例
tutorial.py
from datetime import datetime, timedelta
from textwrap import dedent

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.operators.bash import BashOperator
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'sla_miss_callback': yet_another_function,
    # 'trigger_rule': 'all_success'
}
with DAG(
    'tutorial',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=['example'],
) as dag:

    # t1, t2 and t3 are examples of tasks created by instantiating operators
    t1 = BashOperator(
        task_id='print_date',
        bash_command='date',
    )

    t2 = BashOperator(
        task_id='sleep',
        depends_on_past=False,
        bash_command='sleep 5',
        retries=3,
    )
    t1.doc_md = dedent(
        """\
    #### Task Documentation
    You can document your task using the attributes `doc_md` (markdown),
    `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
    rendered in the UI's Task Instance Details page.
    ![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)

    """
    )

    dag.doc_md = __doc__  # providing that you have a docstring at the beginning of the DAG
    dag.doc_md = """
    This is a documentation placed anywhere
    """  # otherwise, type it like this
    templated_command = dedent(
        """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
        echo "{{ params.my_param }}"
    {% endfor %}
    """
    )

    t3 = BashOperator(
        task_id='templated',
        depends_on_past=False,
        bash_command=templated_command,
        params={'my_param': 'Parameter I passed in'},
    )

    t1 >> [t2, t3]

4. DAG の Cloud Storage へのアップロード

  • DAGファイルをCloud Storageのdagsフォルダにアップロードすると、ファイルがCloud Composerによって解析される
gsutil cp ./example_v2_dag.py <DAGs_folder_path>
  • -> (参考)コンソールからDAGを確認

スクリーンショット 2021-12-26 13.57.53.png

DAGの書き方は?

基本編

上記 example_v2_dag.py 参照(Airflow 2.0対応の書き方)

  • ①ライブラリのインポート

    • airflow自体のライブラリなどのインポートなどをする
  • ②デフォルト引数の定義

    • タスクを実行するために必要となる、DAGのデフォルト引数を定義する
    • DAG内の全オペレータに共通して設定するパラメータを定義する
    • airflow.models.BaseOperator
  • ③DAGの作成

    • DAG自体を作成する。DAG本体の名前や説明、どの間隔でタスクを実行するかを定義する
    • airflow.DAGのコンストラクタで設定できる引数の一覧と説明は、airflow.models.dag.DAGのドキュメント参照
  • ④タスクの宣言

    • 実際に実行するタスク (ジョブ) を宣言する。実行するすべてのタスクを宣言して、前の手順で作成したDAGの一部とする
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag,
)
  • ⑤依存関係の記載
v1
# ワークフロー t2タスクの上流にt1をセット
t2.set_upstream(t1)
# ワークフロー t3タスクの上流にt1をセット
t3.set_upstream(t1)

# ワークフローは下記のような記述もできる
#t1.set_downstream([t1, t3])
#t1 >> [t2, t3]
#[t2, t3] << t1

応用編

  • 外から変数を渡したい時はどうするか

    • ユースケース

      • バッチによってはパラメータ付きで手動実行したい
      • 一時的なデータ投入・更新バッチのために毎回DAG定義を書くのではなく、汎用的なDAGを用意してパラメータで実行するコマンドを動的に切り替えできると便利(日付など)
    • 渡す側

        1. CLIからパラメータ付き実行
        • airflow trigger_dag -c '{"key1":1, "key2":2}' dag_id
        1. GUIからパラメータ付き実行
        • ヘッダのBrowse -> DAG Runs -> + アイコンクリックで新規DAG Run作成画面に遷移する
        1. REST API
        • $ curl -X POST \ http://localhost:8080/api/experimental/dags/<DAG_ID>/dag_runs \ -H 'Cache-Control: no-cache' \ -H 'Content-Type: application/json' \ -d '{"conf":"{\"key\":\"value\"}"}'
    • -> 渡される側(オペレータ) ①template文字列内で受け取る

      bash_task = BashOperator(
          task_id="bash_task",
          bash_command='echo "Here is the message: '
                       '{{ dag_run.conf["json_key"] }}" ',
          dag=dag,
      )
      
    • -> 渡される側(オペレータ) ②context経由で受け取る

      • Operator内部のcontextという変数にdag_runが入っている
      def print_value(**context):
          print(context['dag_run'].conf['json_key'])
      
      PythonOperator(
          python_callable=print_value,
      )
      
  • スクリプト(bash, SQLなど)ファイルを各オペレータから読み込みたい時はどうするか

    • 実行するQueryを記述したSQLファイルは、 GCSのdags/sql配下にまとめて管理
transform_1 = bigquery_operator.BigQueryOperator(
   task_id="transform_1",
   sql="sql/query_1.sql",
   destination_dataset_table="{{params.sample_table}}",
   write_disposition="WRITE_APPEND",
)

実行結果確認方法は?

実行結果ステータス

スクリーンショット 2021-12-26 13.59.28.png

スクリーンショット 2021-12-26 13.58.45.png

実行結果ログ

スクリーンショット 2021-12-26 14.00.39.png

アラート通知方法は?

エラー

  • メール通知は default_argsの通り
  • Slack通知はAirflowのDAGの実行状況をSlackに通知して監視する参照
    • Operatorのパラメータに on_success_callback または on_failure_callback を設定しておくとタスクの成功/失敗時に指定した関数を実行することができるので、ここにタスクの成功/失敗時にそれをSlackに通知する関数を指定しておく。

遅延

  • airflowは各Task に対してSLA 値(時間)を設定できる
    • メール通知の場合は下記の通り。sla_miss_callback設定も可能
t1 = PythonOperator(
    task_id='sla_test',
    python_callable=example_call,
    sla=timedelta(seconds=10),
    email="xxx.yyy@example.com",
    dag=dag
)

リトライ方法は?

  • AirflowのTips 11選 参照
    • コンソール画面からClearする(Clear=リトライなので注意。Clearを押下すると、すぐにリトライが始まる)

スクリーンショット 2021-12-26 14.34.37.png

権限管理方法は?

  • ログインユーザ

    • Userinfo
      • User Name: accounts.google.com:1234XXXXXXXX
      • Role: [Op]
    • Personal Info
      • First Name: tanakataro @ gmail.com
      • Email: tanakataro @ gmail.com
  • Composerの認証

Google Cloud クライアント ライブラリでは、アプリケーションのデフォルト認証情報(ADC)と呼ばれる方式を使用してアプリケーションの認証情報を検出している。クライアント ライブラリが使用される際は以下の順番で認証情報がチェックされます。

  1. 環境変数 GOOGLE_APPLICATION_CREDENTIALSを確認
  2. 環境変数が設定されていない場合、ADC ではサービスで実行されているアプリケーションに応じて、Compute Engine、Google Kubernetes Engine、Cloud Run、 App Engine、Cloud Functions によって提供されているデフォルトのサービス アカウントを使用する。
  • ウェブサーバ

Airflow ウェブサーバーは、Identity-Aware Proxy と統合されています。Cloud Composer は IAP インテグレーションの詳細を隠し、ウェブサーバーへのアクセスに十分な権限を持つユーザー アカウントに、ウェブサーバーへのアクセスを提供します。

スケール方法は?

  • 実質GKEと同じ
  • workerとなっているGKEのノードはauto scalingが有効になっていない。 -> composer作成時に決めたノードで、適切にジョブの計算資源を管理し、他Podの兼ね合いも考慮するなら、kubernetes_pod_operatorを用いてリミットを設定した上でジョブを実行させるようにする必要がある

コストは?

  • ほぼ最小構成でも約月4万円/月
    スクリーンショット 2021-12-26 14.15.26.png

その他、気を付けることは?

  • 実行時刻の考え方
    • start_date + schecule_interval 経過後に初回DAG実行
    • 過去分が実行されるので注意
      • intervalの設定を12時間ごとにしてstart_dateを2日前にしてDAGを実行すると、2日前の12時,24時,1日前の12時,24時の分も実行される
      • 実行したくない場合は、 catchup=False 指定により回避する
dag = DAG(
    'blog_example_execution_date',
    schedule_interval=timedelta(minutes=1),
    start_date=datetime(
        2020, 1, 29, 15, 30, tzinfo=pendulum.timezone('Asia/Tokyo')), # 明示的にタイムゾーンを指定
)

...
# execution_date = 2019/01/29 15:30:00 +09:00
#   => DAG に指定した start_date と同一

# start_date = 2019/01/29 15:31:00 +09:00
#   => DAG に指定した start_date に schedule_interval を足した日時と同一
  • メンテナンスウィンドウに注意する
  • Variable: GUI上からトークンなどを設定できる。key-valueで取得できる。
    • bucket=Variable.get('sample_gcs_bucket'),
  • 接続情報の管理(DBなど)は、Secret Managerを利用する
  • connection: Airflow needs to know how to connect to your environment. Information such as hostname, port, login and passwords to other systems and services is handled in the Admin->Connections section of the UI. The pipeline code you will author will reference the ‘conn_id’ of the Connection objects.
airflow connections add 'my_prod_db' \
    --conn-type 'my-conn-type' \
    --conn-login 'login' \
    --conn-password 'password' \
    --conn-host 'host' \
    --conn-port 'port' \
    --conn-schema 'schema' \
postgres_query = PostgresOperator(
    task_id="query_one",
    postgres_conn_id=<my_postgres_conn_id>,
    sql=<my_sql_statement>,
    autocommit=True,
)

参考

17
11
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
17
11

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?