LoginSignup
1
0

More than 1 year has passed since last update.

Cloud ComposerでBigQueryにデータを取り込んでみる

Posted at

 1つ前の投稿で、Cloud Functionsで天気情報を取得、蓄積する方法を紹介しました。小規模な処理であればCloud Functionsで十分なのですが、処理が複雑になると以下のような問題点があります。

  • 処理の進捗状況が把握しずらい
  • 再実行オペレーションが行き当たりばったりになりがち(そもそもやりにくい)

 これらを解決してくれるのがワークフローであり、Google CloudではCloud Composerが提供されています。Cloud Composerは、Apache Airflowで構築された、フルマネージドのワークフローサービスでなので、ユーザはワークフロー開発に専念することができます。

この投稿の実装範囲

 今回はCloud Composerを利用して、以下のことを実装したいと思います。

  • GCSバケットに特定のファイルが存在することを検知
  • BigQueryへのデータ読込

前回は、Cloud Functionsで提供されているGCS上でファイル作成のイベントを検知する仕組みを利用しましたが、ここら辺もCloud Composerで実装していきます。下の処理概要の図の赤枠部分をComposerで実行制御します。
OpenWeather情報取得1.jpg

Cloud Composerの環境設定

 この投稿では、Cloud Composerの環境構築の詳細部分については説明を割愛しますが、Googleが環境構築のドキュメントを作成してくれているので、それに従えばOKです。私は以下のような感じで環境を作成しました。

  • ロケーション:asia-east2
  • ノード数:3
  • マシンタイプ:n1-standard-1
  • イメージバージョン:composer-1.18.6-airflow-2.2.3
  • Python:3

作成完了までおおよそ20分程度かかります。

Apache Airflowについて

 Apache Airflowは、PythonでDAG(有向非巡回グラフ)を定義します。ざっくりですが、以下の順序でプログラムを書き起こしていきます。

  • DAGの設定
  • 各タスクの設定
  • タスク間の前後関係の設定

Airflowのチュートリアル見てもらえばなんとなく分かるのですが、作成したコードをみながら順番に確認していきましょう。

ソースコード

 私が作成したPythonのソースコードは、以下の通りです。

importBigQuery.py
from datetime import datetime, timedelta
import json
import pendulum

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
from airflow.contrib.sensors.gcs_sensor import GoogleCloudStoragePrefixSensor
from airflow.operators.dummy import DummyOperator

bucket = "対象のバケット名をここに記載してください"

with DAG(
    'importBigQuery_WeatherData',
    # These args will get passed on to each operator
    # You can override them on a per-task basis during operator initialization
    default_args={
        'depends_on_past': False,
        'email': ['airflow@example.com'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=10),
        # 'queue': 'bash_queue',
        # 'pool': 'backfill',
        # 'priority_weight': 10,
        # 'end_date': datetime(2016, 1, 1),
        # 'wait_for_downstream': False,
        # 'sla': timedelta(hours=2),
        # 'execution_timeout': timedelta(seconds=300),
        # 'on_failure_callback': some_function,
        # 'on_success_callback': some_other_function,
        # 'on_retry_callback': another_function,
        # 'sla_miss_callback': yet_another_function,
        # 'trigger_rule': 'all_success'
    },
    description='import Weather Data to BigQuery',
    start_date=datetime(2021, 4, 1, tzinfo=pendulum.timezone('Asia/Tokyo')),
    schedule_interval="30 9 * * *",
    catchup=False,
    tags=['WeatherData'],
) as dag:

    with open('/home/airflow/gcs/dags/target_city.json', "r") as f:
        f_json = json.load(f)
        end_tasks=dict()
        for city_json in f_json:
            city_name = city_json["city_name"]
            current_sensor = GoogleCloudStoragePrefixSensor(
                task_id = "sensor-current-" + city_name,
                bucket=bucket,
                prefix="current_" + city_name + "_{{ next_execution_date | ds_nodash }}"
            )

            current_import = GoogleCloudStorageToBigQueryOperator(
                task_id = "import-current-" + city_name,
                bucket=bucket,
                source_objects=["current_" + city_name + "_{{ next_execution_date | ds_nodash }}.json"],
                source_format="NEWLINE_DELIMITED_JSON",
                destination_project_dataset_table='sandbox.current_' + city_name + "_{{ next_execution_date | ds_nodash }}",
                write_disposition='WRITE_TRUNCATE',
            )

            end_tasks["import-"+city_name] = current_import

            current_sensor >> current_import
        
        all_success = DummyOperator(
            task_id = "All-Success"
        )

    for end_tasks_key in end_tasks.keys():
        end_tasks[end_tasks_key] >> all_success

DAGの設定

 DAGの設定部分は以下の箇所です。

with DAG(
    'importBigQuery_WeatherData',
    # These args will get passed on to each operator
    # You can override them on a per-task basis during operator initialization
    default_args={
        'depends_on_past': False,
        'email': ['airflow@example.com'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=10),
        # 'queue': 'bash_queue',
        # 'pool': 'backfill',
        # 'priority_weight': 10,
        # 'end_date': datetime(2016, 1, 1),
        # 'wait_for_downstream': False,
        # 'sla': timedelta(hours=2),
        # 'execution_timeout': timedelta(seconds=300),
        # 'on_failure_callback': some_function,
        # 'on_success_callback': some_other_function,
        # 'on_retry_callback': another_function,
        # 'sla_miss_callback': yet_another_function,
        # 'trigger_rule': 'all_success'
    },
    description='import Weather Data to BigQuery',
    start_date=datetime(2021, 4, 1, tzinfo=pendulum.timezone('Asia/Tokyo')),
    schedule_interval="30 9 * * *",
    catchup=False,
    tags=['WeatherData'],
) as dag:

 ここには実行するDAGの以下のような設定を定義します。

  • DAGの名称
  • 処理失敗時の挙動
  • リトライ実行回数
  • リトライまでの時間間隔
  • タイムアウト時間
  • 実行開始日
  • 実行スケジュール

など色々あります。ここら辺は別の機会に個々の項目について、丁寧にまとめて行こうと思います。

各タスクの設定

 各タスクの定義は2箇所で行っています。1つ目は、「GoogleCloudStoragePrefixSensor」と「GoogleCloudStorageToBiQueryOperator」を利用して、GCP上のファイルの存在確認処理、JSONファイルのインポート処理を定義しています。

    with open('/home/airflow/gcs/dags/target_city.json', "r") as f:
        f_json = json.load(f)
        end_tasks=dict()
        for city_json in f_json:
            city_name = city_json["city_name"]
            current_sensor = GoogleCloudStoragePrefixSensor(
                task_id = "sensor-current-" + city_name,
                bucket=bucket,
                prefix="current_" + city_name + "_{{ next_execution_date | ds_nodash }}"
            )

            current_import = GoogleCloudStorageToBigQueryOperator(
                task_id = "import-current-" + city_name,
                bucket=bucket,
                source_objects=["current_" + city_name + "_{{ next_execution_date | ds_nodash }}.json"],
                source_format="NEWLINE_DELIMITED_JSON",
                destination_project_dataset_table='sandbox.current_' + city_name + "_{{ next_execution_date | ds_nodash }}",
                write_disposition='WRITE_TRUNCATE',
            )

            end_tasks["import-"+city_name] = current_import

 2つ目は、「DummyOpererator」を利用して、すべてのインポート処理が終わった後にDAG内のタスクがすべて終了したことを確認するためのダミータスクを定義しました。これくらいの小規模DAGであればよいですが、大規模になってくると確認が面倒になってくるので。
 それと、プログラムの中で登場する{{ }}で囲まれた部分は、airflow上で特別な意味を持ちます。実行日を制御するはexecution_dateなどよく利用するのですが、結構仕様がややこしいのでこちらも別途DAGの設定と合わせて詳細なお話をまとめようと思います。ここら辺の話はAirflowのドキュメントにもまとまっているので、参照してみるといいかもです。

タスク間の前後関係の設定

 最後にタスクの前後関係を定義を見ていきます。これも2箇所で行っています。タスクの依存関係は「>>」を使います。1つ目は、ファイルの存在検知とインポート処理の前後関係を定義しています。

current_sensor >> current_import

2つ目はインポート処理とダミータスクの前後関係を定義しています。

for end_tasks_key in end_tasks.keys():
    end_tasks[end_tasks_key] >> all_success

都市名の定義ファイル

target_city.json
[
    {
        "city_name":"fukuoka"
    },
    {
        "city_name":"tokyo"
    }
]

Web管理画面について

 Composer環境の作成が完了すると、Cloud Consoleの画面は以下のようになっています。
Composer.JPG

 赤枠部分のリンクをクリックすると、以下のようなApache Airflowの管理画面が表示されます。
Composer1.JPG
airflow_monitoringというDAGすでに登録されていますが、これは状態監視を行うために稼働しているものとなります。このままにしているとずっと動き続けるので、邪魔だなと思ったらスケジュールを止めちゃうこともできます。止め方は簡単です。上の赤枠部分をクリックするとスケジュールが停止します。
Composer1.JPG
クリックして色が変わればOKです。
Composer2.JPG
これでスケジュールを停止させることができました!

プログラムのリリースについて

 同じくCloud ConsoleのComposer画面からDAGフォルダのリンクをクリックします。
Composer.JPG

 するとGCSの画面が新しいタブで開かれます。
Composer3.JPG
 ここに必要なファイルをアップロードすれば、リリース完了です。ということで、先ほどの「importBigQuery.py」と「target_city.json」をアップロードします。
Composer4.JPG

 AirflowのWeb管理画面に戻ると・・・
Composer5.JPG
 importBigQuery_WeatherDataというDAGが以下されています。表示されない場合は、ブラウザの更新をやってみてください。また、DAG名をクリックすると詳細情報を見ることができます。
最初に表示されるのが、Tree画面です。これは、DAGの階層構造および、過去の実行結果を参照することができます。
Composer6.JPG
 Graphタブを押すと以下のような画面が表示されます。
Composer7.JPG
 いかにもワークフローのような画面が表示されました。この例ではsensor-current-fukuokaとsensor-current-tokyoという2タスクが、GCS上のファイル存在有無を確認行っている状態で、現在ファイルが存在しないため実行中(running)のステータスとなっています。それでは、ファイルを配置して処理が動くか確かめていきます。
Composer10.JPG
対象ファイルを配置すると・・・
Composer8.JPG
動き出しました!濃い緑色が正常終了です。もう少し待つと・・・
Composer9.JPG
無事すべて正常終了しました。

BigQueryの確認

 念のためデータがインポートされているか確認します。
Composer11.JPG
インポートされてました!

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0