[GCP] Implementing GCS-to-BigQuery Data Integration with Composer (Airflow)

Requirements

GCS holds data files that are updated every day and master data files that can be partially updated; every day at 12:00, load them into BigQuery, creating one table per file.
As shown in the diagram below, this article implements the Step 1 to Step 2 portion.
(Figure: overall pipeline diagram covering Step 1 to Step 2)

Runtime

Composer: 2.0.21
Airflow: 2.2.5
Python: 3.8
(GCP and Composer environment setup is omitted here.)

Implementation

The DAG base class
(Reused as-is from code a senior colleague had written, after removing personal and company-specific information.)

import os
from abc import ABCMeta, abstractmethod
from datetime import datetime
import airflow
from airflow import DAG

DEFAULT_OWNER = 'airflow'

class DagBase(metaclass=ABCMeta):
    def __init__(self,
                 file_name: str,
                 doc_md: str = None,
                 schedule_interval=None,
                 owner=DEFAULT_OWNER,
                 catchup=False,
                 # evaluated once, when the DAG file is parsed by the scheduler
                 start_date=datetime(datetime.today().year, datetime.today().month, datetime.today().day)):
        # the DAG ID is derived from the DAG file name
        self._dag_id = os.path.basename(file_name).replace(".pyc", "").replace(".py", "")
        self._env_id = airflow.settings.conf.get("core", "environment_id")
        self._schedule_interval = schedule_interval
        self._owner = owner
        self._is_catchup = catchup
        self._start_date = start_date
        self._doc_md = doc_md
    
    @abstractmethod
    def generate_tasks(self):
        """

        :return:
        """
        pass

    def generate_dag(self) -> DAG:
        """

        :return: DAG
        """
        with DAG(
            dag_id=self._dag_id,
            start_date=self._start_date,
            schedule_interval=self._schedule_interval,
            catchup=self._is_catchup,
            doc_md=self._doc_md,
            default_args={
                "owner": self._owner,
                "schedule_interval": self._schedule_interval,
                "start_date": self._start_date
            }
        ) as dag:
            self.generate_tasks()
        return dag
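
Concrete DAGs subclass DagBase, implement generate_tasks(), and expose the object returned by generate_dag() at module level so the scheduler discovers it. A minimal sketch of that usage pattern (the file name, schedule, and task_ids below are placeholders, not from the original article):

# minimal_example_dag.py -- hypothetical minimal DAG built on DagBase
from airflow.operators.dummy_operator import DummyOperator
from dag_base.dag_base import DagBase


class MinimalDag(DagBase):
    def __init__(self):
        # placeholder schedule: every day at 12:00
        super().__init__(file_name=__file__,
                         doc_md=__doc__,
                         schedule_interval="0 12 * * *")

    def generate_tasks(self):
        # tasks created here run inside generate_dag()'s `with DAG(...)` block,
        # so they are attached to that DAG automatically
        start = DummyOperator(task_id="start")
        end = DummyOperator(task_id="end")
        start >> end


# module-level DAG object so the Airflow scheduler picks it up
dag = MinimalDag().generate_dag()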

The main DAG

from datetime import date, timedelta
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from tool import dag_tools
from dag_base.dag_base import DagBase
from airflow.operators.dummy_operator import DummyOperator
from airflow.models import Variable
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator


class Dag(DagBase):
    def __init__(self):
        super().__init__(file_name=__file__,
                         doc_md=__doc__,
                         schedule_interval="0 12 * * *")

    # task_sequence
    def generate_tasks(self):
        start = DummyOperator(task_id="start")
        sleep = DummyOperator(task_id="sleep")
        wait = DummyOperator(task_id="wait",
                             trigger_rule='all_done')
        end = DummyOperator(task_id="end")

        master_table_list = []
        access_table_list = []
        for i in table_list:
            if "accesses" in i:
                access_table_list.append(i)
            else:
                master_table_list.append(i)
        masterToBigqueryList = self.task_list_master(master_table_list)
        accessToBigqueryList = self.task_list_access(access_table_list)
        bigqueryJob = self.bigquery_query_execute()
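        # Note: accessToBigqueryList / masterToBigqueryList are plain Python lists
        # of tasks; Airflow's >> accepts a list on either side, so each task in a
        # list runs in parallel and is wired to the single task next to it.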

        start >> accessToBigqueryList >> sleep >> masterToBigqueryList >> wait >> bigqueryJob >> end

    def task_list_master(self, master_table_list):
        masterToBigqueryList = []
        # tasks that load the master tables
        dealMasterTable = self.deal_master_table(master_table_list=master_table_list)
        for i in dealMasterTable:
            task_id = i + "_Master"
            table_name = i.split(".")[0].split(f"_{yesterday}")[0]
            masterToBigquery = GCSToBigQueryOperator(
                task_id=task_id,
                bucket=BUCKET,
                source_objects=[PATH_PREFIX + i],
                source_format="NEWLINE_DELIMITED_JSON",
                compression="GZIP",
                destination_project_dataset_table=f"{DATASET}.{table_name}",
                create_disposition="CREATE_IF_NEEDED",
                write_disposition="WRITE_TRUNCATE",
                autodetect=True,
                retries=0
            )
            masterToBigqueryList.append(masterToBigquery)

        return masterToBigqueryList
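    # Design note: master files rebuild their tables on every run
    # (write_disposition="WRITE_TRUNCATE"), while the access files below are
    # appended to one accumulating table (write_disposition="WRITE_APPEND").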

    def task_list_access(self, access_table_list):
        accessToBigqueryList = []
        # tasks that load the access tables
        dealAccessTable = self.deal_access_table(access_table_list=access_table_list)
        for i in dealAccessTable:
            task_id = i + "_Accesses"
            output_table = "adebis-share_adebis_baitoru_com_121_accesses"
            accessToBigquery = GCSToBigQueryOperator(
                task_id=task_id,
                bucket=BUCKET,
                source_objects=[PATH_PREFIX + i],
                source_format="NEWLINE_DELIMITED_JSON",
                compression="GZIP",
                destination_project_dataset_table=f"{DATASET}.{output_table}",
                create_disposition="CREATE_IF_NEEDED",
                write_disposition="WRITE_APPEND",
                autodetect=True
            )
            accessToBigqueryList.append(accessToBigquery)

        return accessToBigqueryList

    def deal_access_table(self, access_table_list):
        """
        Return only the latest access data files from access_table_list.

        :return: List
        """
        access_deal_table_list = []
        for i in access_table_list:
            if i.split("121_")[1].split("_")[1].split(".")[0] == yesterday:
                access_deal_table_list.append(i)

        return access_deal_table_list

    def deal_master_table(self, master_table_list):
        """
        Return only the latest master data files from master_table_list.

        :return: List
        """
        master_deal_table_list = []
        for i in master_table_list:
            if i.split("121_")[1].split("_")[1].split(".")[0] == yesterday:
                master_deal_table_list.append(i)

        return master_deal_table_list

    def bigquery_query_execute(self):
        """
        Execute a BigQuery query job.

        :return: BigQueryInsertJobOperator
        """
        table_for_join = "table-name"
        Create_Table_Query = (
            f"{QUERY}"
        )
        Bigquery_job = BigQueryInsertJobOperator(
            task_id="Create_Join_Table",
            configuration={
                "query": {
                    "query": Create_Table_Query,
                    "useLegacySql": False,
                }
            },
            location=LOCATION
        )
        return Bigquery_job

"""
Fetch the Variables configured in Composer (Airflow)
"""
# settings configured on GCP (Airflow Variables)
gcp_config = Variable.get("config", deserialize_json=True)
PROJECT_ID = gcp_config['project_id']
DATASET = gcp_config['dataset']
TABLE = gcp_config["table"]
LOCATION = gcp_config["location"]
BUCKET = gcp_config["bucket"]
PATH_PREFIX = gcp_config["path_prefix"]
# list of table files to process
table_list = dag_tools.get_all_tables_from_gcs(project_id=PROJECT_ID,
                                               bucket_name=BUCKET)
# yesterday's date
yesterday = (date.today() - timedelta(days=1)).strftime("%Y%m%d")
# query
QUERY = "(omitted)"
# build and register the DAG
dag = Dag()
do_Dag = dag.generate_dag()
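
For reference, Variable.get("config", deserialize_json=True) expects the Airflow Variable named config to hold a single JSON object with the keys read above. A sketch of such a value with placeholder entries only (set it via the Airflow UI under Admin > Variables, or with the airflow variables set CLI):

# build_config_variable.py -- hypothetical helper; all values are placeholders
import json

config_value = json.dumps({
    "project_id": "my-gcp-project",
    "dataset": "my_dataset",
    "table": "my_table",
    "location": "asia-northeast1",
    "bucket": "my-bucket",
    "path_prefix": "data/",
})
# paste the printed JSON as the value of the "config" Variable
# (Airflow UI: Admin > Variables, key "config")
print(config_value)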

How to get the file list from GCS
(the part imported above with "from tool import dag_tools")

from google.cloud import storage

def get_all_tables_from_gcs(project_id, bucket_name):
    client = storage.Client(project=project_id)
    blobs = client.list_blobs(bucket_name)
    file_list = []
    for file in blobs:
        # adjust according to the bucket's directory structure
        # (assumes objects sit one level below the bucket root, e.g. "prefix/file.json.gz")
        parts = file.name.split("/")
        if len(parts) > 1 and parts[1]:
            file_list.append(parts[1])
    return file_list
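
A quick local check of what the helper returns (the bucket name and object names here are placeholders; the actual output depends on your bucket layout):

if __name__ == "__main__":
    # expects objects one level below the bucket root,
    # e.g. "data/foo_121_accesses_20220814.json.gz"
    files = get_all_tables_from_gcs(project_id="my-gcp-project",
                                    bucket_name="my-bucket")
    print(files)  # e.g. ["foo_121_accesses_20220814.json.gz", ...]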