Creating Workflows with Apache Airflow (2)

Posted at 2023-06-25

Creating a new DAG

Create a file under "dags/" in the directory where Airflow is installed.

$ touch dags/sample_dag.py

Create a DAG that runs the following ETL process.

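  1. Parse a JSON string and return it as a dictionary
  2. Extract the values from the dictionary, add them up, and return the sum
  3. Print the total to standard output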
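Before wiring these steps into Airflow, it can help to check the logic as plain Python. The following is a minimal sketch with no Airflow dependency: the function names mirror the DAG's tasks, but here they are ordinary functions called directly rather than Airflow tasks, and `sum()` stands in for the explicit loop (it adds the values in the same order).

```python
import json


def extract() -> dict:
    # Same hard-coded order data as the DAG: order ID -> order value
    data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
    return json.loads(data_string)


def transform(order_data_dict: dict) -> dict:
    # Add up every order value; sum() starts at 0 and adds left to right
    return {"total_order_value": sum(order_data_dict.values())}


def load(total_order_value: float) -> None:
    # Print the total formatted with exactly two decimal places
    print(f"Total order value is: {total_order_value:.2f}")


if __name__ == "__main__":
    order_summary = transform(extract())
    load(order_summary["total_order_value"])
```

Running this file directly should print the same line that the load task writes to its log.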
sample_dag.py
import json
import pendulum
from airflow.decorators import dag, task


@dag(
    schedule=None,
    start_date=pendulum.datetime(2023, 6, 25, tz="UTC"),
    catchup=False,
    tags=["ETL"],
)
def sample_dag():

    @task()
    def extract():

        data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'

        order_data_dict = json.loads(data_string)
        return order_data_dict

    @task(multiple_outputs=True)
    def transform(order_data_dict: dict):

        total_order_value = 0

        for value in order_data_dict.values():
            total_order_value += value

        return {"total_order_value": total_order_value}

    @task()
    def load(total_order_value: float):

        print(f"Total order value is: {total_order_value:.2f}")

    order_data = extract()
    order_summary = transform(order_data)
    load(order_summary["total_order_value"])


sample_dag()
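In the DAG above, a task's return value reaches downstream tasks through Airflow's XCom mechanism. Roughly speaking, `multiple_outputs=True` on `transform` also stores each key of the returned dict as its own XCom entry, and `order_summary["total_order_value"]` reads that entry. The sketch below is a hypothetical, simplified in-memory model of that behavior for illustration only; `run_task`, `pull`, and the `(task_id, key)` store layout are assumptions and not Airflow's API.

```python
from typing import Any, Callable, Dict, Tuple

# Hypothetical stand-in for Airflow's XCom storage: (task_id, key) -> value
XComStore = Dict[Tuple[str, str], Any]


def run_task(store: XComStore, task_id: str, func: Callable[..., Any],
             *args: Any, multiple_outputs: bool = False) -> Any:
    """Run func and record its result, loosely modelling a TaskFlow XCom push."""
    result = func(*args)
    # The whole return value is stored under the "return_value" key
    store[(task_id, "return_value")] = result
    if multiple_outputs:
        if not isinstance(result, dict):
            # This sketch only supports dict results when splitting outputs
            raise TypeError("multiple_outputs requires a dict return value")
        # Each top-level key is also stored as a separate entry
        for key, value in result.items():
            store[(task_id, key)] = value
    return result


def pull(store: XComStore, task_id: str, key: str = "return_value") -> Any:
    """Read a stored value back, as a downstream task argument would be resolved."""
    return store[(task_id, key)]


store: XComStore = {}
run_task(store, "extract",
         lambda: {"1001": 301.27, "1002": 433.21, "1003": 502.22})
run_task(store, "transform",
         lambda d: {"total_order_value": sum(d.values())},
         pull(store, "extract"), multiple_outputs=True)
# Counterpart of load(order_summary["total_order_value"]) in the DAG
total = pull(store, "transform", "total_order_value")
print(f"Total order value is: {total:.2f}")
```

In this model, calling `run_task` for `transform` without `multiple_outputs=True` would leave no `("transform", "total_order_value")` entry, so the final `pull` would raise `KeyError`.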

Reload the Airflow UI and the newly created DAG appears.

Run the DAG.

extract log

ac67b37461ee
*** Found local files:
***   * /opt/airflow/logs/dag_id=sample_dag/run_id=manual__2023-06-25T13:38:09.170374+00:00/task_id=extract/attempt=1.log
[2023-06-25, 22:38:10 JST] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: sample_dag.extract manual__2023-06-25T13:38:09.170374+00:00 [queued]>
[2023-06-25, 22:38:10 JST] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: sample_dag.extract manual__2023-06-25T13:38:09.170374+00:00 [queued]>
[2023-06-25, 22:38:10 JST] {taskinstance.py:1308} INFO - Starting attempt 1 of 1
[2023-06-25, 22:38:10 JST] {taskinstance.py:1327} INFO - Executing <Task(_PythonDecoratedOperator): extract> on 2023-06-25 13:38:09.170374+00:00
[2023-06-25, 22:38:10 JST] {standard_task_runner.py:57} INFO - Started process 584 to run task
[2023-06-25, 22:38:10 JST] {standard_task_runner.py:84} INFO - Running: ['***', 'tasks', 'run', 'sample_dag', 'extract', 'manual__2023-06-25T13:38:09.170374+00:00', '--job-id', '419', '--raw', '--subdir', 'DAGS_FOLDER/sample_dag.py', '--cfg-path', '/tmp/tmp2ctqlv0m']
[2023-06-25, 22:38:10 JST] {standard_task_runner.py:85} INFO - Job 419: Subtask extract
[2023-06-25, 22:38:10 JST] {task_command.py:410} INFO - Running <TaskInstance: sample_dag.extract manual__2023-06-25T13:38:09.170374+00:00 [running]> on host ac67b37461ee
[2023-06-25, 22:38:10 JST] {taskinstance.py:1547} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='sample_dag' AIRFLOW_CTX_TASK_ID='extract' AIRFLOW_CTX_EXECUTION_DATE='2023-06-25T13:38:09.170374+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-06-25T13:38:09.170374+00:00'
[2023-06-25, 22:38:10 JST] {python.py:183} INFO - Done. Returned value was: {'1001': 301.27, '1002': 433.21, '1003': 502.22}
[2023-06-25, 22:38:10 JST] {taskinstance.py:1350} INFO - Marking task as SUCCESS. dag_id=sample_dag, task_id=extract, execution_date=20230625T133809, start_date=20230625T133810, end_date=20230625T133810
[2023-06-25, 22:38:10 JST] {local_task_job_runner.py:225} INFO - Task exited with return code 0
[2023-06-25, 22:38:10 JST] {taskinstance.py:2653} INFO - 1 downstream tasks scheduled from follow-on schedule check
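The log timestamps are displayed in JST, while the run ID (manual__2023-06-25T13:38:09.170374+00:00) carries a UTC timestamp, so the two differ by nine hours. The "manual__" prefix indicates a manually triggered run, which fits `schedule=None`. A minimal sketch using only the standard library to split such a run ID and convert its timestamp to JST; `parse_run_id` is a hypothetical helper, and JST is modelled as a fixed UTC+9 offset.

```python
from datetime import datetime, timedelta, timezone
from typing import Tuple

# JST has no daylight saving time, so a fixed UTC+9 offset is sufficient
JST = timezone(timedelta(hours=9), "JST")


def parse_run_id(run_id: str) -> Tuple[str, datetime]:
    """Split a run ID of the form '<run_type>__<ISO 8601 timestamp>'."""
    run_type, _, timestamp = run_id.partition("__")
    return run_type, datetime.fromisoformat(timestamp)


run_type, logical_date = parse_run_id("manual__2023-06-25T13:38:09.170374+00:00")
print(run_type)
print(logical_date.astimezone(JST).strftime("%Y-%m-%d, %H:%M:%S %Z"))
```

The converted time lands about one second before the first extract log line, which is when the task actually started.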

transform log

ac67b37461ee
*** Found local files:
***   * /opt/airflow/logs/dag_id=sample_dag/run_id=manual__2023-06-25T13:38:09.170374+00:00/task_id=transform/attempt=1.log
[2023-06-25, 22:38:11 JST] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: sample_dag.transform manual__2023-06-25T13:38:09.170374+00:00 [queued]>
[2023-06-25, 22:38:11 JST] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: sample_dag.transform manual__2023-06-25T13:38:09.170374+00:00 [queued]>
[2023-06-25, 22:38:11 JST] {taskinstance.py:1308} INFO - Starting attempt 1 of 1
[2023-06-25, 22:38:11 JST] {taskinstance.py:1327} INFO - Executing <Task(_PythonDecoratedOperator): transform> on 2023-06-25 13:38:09.170374+00:00
[2023-06-25, 22:38:11 JST] {standard_task_runner.py:57} INFO - Started process 587 to run task
[2023-06-25, 22:38:11 JST] {standard_task_runner.py:84} INFO - Running: ['***', 'tasks', 'run', 'sample_dag', 'transform', 'manual__2023-06-25T13:38:09.170374+00:00', '--job-id', '420', '--raw', '--subdir', 'DAGS_FOLDER/sample_dag.py', '--cfg-path', '/tmp/tmp8cf_c2w4']
[2023-06-25, 22:38:11 JST] {standard_task_runner.py:85} INFO - Job 420: Subtask transform
[2023-06-25, 22:38:11 JST] {task_command.py:410} INFO - Running <TaskInstance: sample_dag.transform manual__2023-06-25T13:38:09.170374+00:00 [running]> on host ac67b37461ee
[2023-06-25, 22:38:11 JST] {taskinstance.py:1547} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='sample_dag' AIRFLOW_CTX_TASK_ID='transform' AIRFLOW_CTX_EXECUTION_DATE='2023-06-25T13:38:09.170374+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-06-25T13:38:09.170374+00:00'
[2023-06-25, 22:38:11 JST] {python.py:183} INFO - Done. Returned value was: {'total_order_value': 1236.7}
[2023-06-25, 22:38:11 JST] {taskinstance.py:1350} INFO - Marking task as SUCCESS. dag_id=sample_dag, task_id=transform, execution_date=20230625T133809, start_date=20230625T133811, end_date=20230625T133811
[2023-06-25, 22:38:11 JST] {local_task_job_runner.py:225} INFO - Task exited with return code 0
[2023-06-25, 22:38:11 JST] {taskinstance.py:2653} INFO - 1 downstream tasks scheduled from follow-on schedule check

load log

ac67b37461ee
*** Found local files:
***   * /opt/airflow/logs/dag_id=sample_dag/run_id=manual__2023-06-25T13:38:09.170374+00:00/task_id=load/attempt=1.log
[2023-06-25, 22:38:13 JST] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: sample_dag.load manual__2023-06-25T13:38:09.170374+00:00 [queued]>
[2023-06-25, 22:38:13 JST] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: sample_dag.load manual__2023-06-25T13:38:09.170374+00:00 [queued]>
[2023-06-25, 22:38:13 JST] {taskinstance.py:1308} INFO - Starting attempt 1 of 1
[2023-06-25, 22:38:13 JST] {taskinstance.py:1327} INFO - Executing <Task(_PythonDecoratedOperator): load> on 2023-06-25 13:38:09.170374+00:00
[2023-06-25, 22:38:13 JST] {standard_task_runner.py:57} INFO - Started process 590 to run task
[2023-06-25, 22:38:13 JST] {standard_task_runner.py:84} INFO - Running: ['***', 'tasks', 'run', 'sample_dag', 'load', 'manual__2023-06-25T13:38:09.170374+00:00', '--job-id', '421', '--raw', '--subdir', 'DAGS_FOLDER/sample_dag.py', '--cfg-path', '/tmp/tmpfrt4zy13']
[2023-06-25, 22:38:13 JST] {standard_task_runner.py:85} INFO - Job 421: Subtask load
[2023-06-25, 22:38:13 JST] {task_command.py:410} INFO - Running <TaskInstance: sample_dag.load manual__2023-06-25T13:38:09.170374+00:00 [running]> on host ac67b37461ee
[2023-06-25, 22:38:13 JST] {taskinstance.py:1547} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='sample_dag' AIRFLOW_CTX_TASK_ID='load' AIRFLOW_CTX_EXECUTION_DATE='2023-06-25T13:38:09.170374+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-06-25T13:38:09.170374+00:00'
[2023-06-25, 22:38:13 JST] {logging_mixin.py:149} INFO - Total order value is: 1236.70
[2023-06-25, 22:38:13 JST] {python.py:183} INFO - Done. Returned value was: None
[2023-06-25, 22:38:13 JST] {taskinstance.py:1350} INFO - Marking task as SUCCESS. dag_id=sample_dag, task_id=load, execution_date=20230625T133809, start_date=20230625T133813, end_date=20230625T133813
[2023-06-25, 22:38:13 JST] {local_task_job_runner.py:225} INFO - Task exited with return code 0
[2023-06-25, 22:38:13 JST] {taskinstance.py:2653} INFO - 0 downstream tasks scheduled from follow-on schedule check

The log shows that the total, "Total order value is: 1236.70", was printed to standard output.
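Note that the transform log records the returned value as 1236.7, while load prints 1236.70 because of the `:.2f` format spec. The sketch below checks the printed line against a locally computed total. The regular expression is an assumption inferred from the sample lines above (`[timestamp] {source:line} LEVEL - message`), not a documented Airflow log format.

```python
import re

# Pattern inferred from the sample lines: "[<timestamp>] {<file>:<line>} <LEVEL> - <message>"
LOG_LINE = re.compile(
    r"^\[(?P<ts>[^\]]+)\] \{(?P<src>[^}]+)\} (?P<level>[A-Z]+) - (?P<msg>.*)$"
)

line = ("[2023-06-25, 22:38:13 JST] {logging_mixin.py:149} INFO - "
        "Total order value is: 1236.70")

match = LOG_LINE.match(line)
if match is None:
    raise ValueError("line does not match the expected log layout")

# Same values, added in the same order as the transform task
total = 301.27 + 433.21 + 502.22
# ':.2f' always keeps two decimal places, hence "1236.70" rather than "1236.7"
expected = f"Total order value is: {total:.2f}"
print(match.group("src"), match.group("msg") == expected)
```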
