8
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

[GCP]GCPとTerraformでデータ基盤を構築してみた

Last updated at Posted at 2024-05-20

概要

DataPipelineforWeatherInfomation.drawio.png
気象庁の気象情報を取得し、加工した上でBigqueryに登録するデータパイプラインです。
上記構成図の内、赤枠内のサービスをTerraformで定義しています。

また、今回のデータ基盤定義をしたTerraformを上記Gitリポジトリにて公開しています。

実装した背景

現在無職なので、転職用のポートフォリオとして作成しました。
また、GCP・Terraform(あとGit)も触ったことがなかったので、技術スキルを高めることも目的の一つです。

各工程の説明

一連の流れは、下記のとおりです。

  • ①データ取得
  • ②データ加工・投入

取得データは、最高気温・最低気温・日降水量の3つです。
これらはLooker Studioで日ごとの遷移を可視化することを想定して選定しました。
記事公開時点ではLooker Studioのダッシュボードを作成していないので、折角なら実装しようかとも思っています。

DataPipelineforWeatherInfomation.drawio.-1png.png

①データ取得

・トリガー
 Cloud Schedulerによる時間起動
・処理内容
 Cloud Functions内にて気象庁のサイトのダウンロードURLを指定して、Cloud Storage上の特定ディレクトリにファイルを格納しています。
 ディレクトリ構造は下記のとおりです。

download_file_jma(※bucket名)
├─maxtemperature
│  └─YYYY-MM-DD_mxtemsadext00_rct.csv
├─mintemperature
│  └─YYYY-MM-DD_mntemsadext00_rct.csv
└─predaily
   └─YYYY-MM-DD_predaily00_rct.csv

②データ加工・投入

・トリガー
 Cloud Storageの特定ディレクトリへのファイルアップロードによるイベント起動
・処理内容
 Cloud Functions内にてpandasを用いて必要な値を取捨選択し、スキーマ定義に沿った形へ加工しています。
 その後、BigQueryテーブルへのデータ登録も実行させています。
 取得データによってスキーマ定義が異なるため、それぞれに対応したFunctionsを用意しており、トリガーもこれらに対応しています。
 Functions名にセンスがないのは許しておくれ......

csv Cloud Functions
mxtemsadext00_rct.csv DataTransImportBqDwhMaxtemJMA
mntemsadext00_rct.csv DataTransImportBqDwhMintemJMA
predaily00_rct.csv DataTransImportBqDwhPredailyJMA

工夫した点

特定ディレクトリへのファイルアップロードを検知してCloud Functionsを実行する

トリガーは下記のとおり5つあるのですが、今回はEventarcトリガーを採用しました。

名称 分類 説明
HTTPトリガー HTTPトリガー HTTP(S) リクエストに応答する。
Pub/Subトリガー イベントトリガー Pub/SubトリガーによるPub/Subメッセージに応答する。
Cloud Storageトリガー イベントトリガー Cloud Storageの変更に応答する。
Firestoreトリガー イベントトリガー Firestore データベース内のイベントに応答する。
Eventarcトリガー イベントトリガー Eventarcでサポートされているイベントタイプに応答する。

前述したとおり、取得データに応じたFunctionsを実行させたいのですが、CloudStorageトリガーでは実現できません。
Cloud Storageトリガーの条件はバケット指定のみのため、今回のディレクトリ構造だとFunctionsに対応しないファイルアップロードを検知してしまいます。
ところが、Eventarcトリガーを用いると、より柔軟に各種イベントを契機に発火させることができ、今回の要件を満たすことができました。

main.tf
  event_trigger {
    trigger_region = "asia-northeast1"
    event_type     = "google.cloud.audit.log.v1.written"
    retry_policy   = "RETRY_POLICY_DO_NOT_RETRY"
    event_filters {
      attribute = "serviceName"
      value     = "storage.googleapis.com"
    }
    event_filters {
      attribute = "methodName"
      value     = "storage.objects.create"
    }
    event_filters {
      attribute = "resourceName"
      value     = "projects/_/buckets/download_file_jma/objects/maxtemperature/*.csv"
      operator  = "match-path-pattern"
    }

このように、パスパターン構文を用いて特定のディレクトリにあるファイルを指定できました。

Cloud FunctionsからCloud Loggingへのログ連携

標準出力だとログエクスプローラ上でのログの判別に困ると感じたので、Python標準のloggingライブラリに加え、Cloud Loggingライブラリを採用しました。
実装内容は下記の記事を参考にさせていただきました。

ログエクスプローラ上では下記のように表示され、エラー箇所の特定が容易になりました。
実装においては特に難しいこともなく、参考記事のとおりでのコーディングで事足りました。
ただ、見様見真似でしかないため、今後Cloud Loggingライブラリを用いたロギング処理について、本番運用に耐えうるような実装を模索してみたいところです。

image.png

main.py
import logging
import google.cloud.logging

@functions_framework.http
def DataTransImportBqDwhMaxtemJMA(request):
    # 標準Loggerの設定
    logging.basicConfig(
            format = "[%(asctime)s][%(levelname)s] %(message)s",
            level = logging.DEBUG
        )
    logger = logging.getLogger()

    # Cloud Loggingハンドラをloggerに接続
    logging_client = google.cloud.logging.Client()
    logging_client.setup_logging()

    # setup_logging()するとログレベルがINFOになるのでDEBUGに変更
    logger.setLevel(logging.DEBUG)

Terraform: for_each を使って冗長な記述を減らす

Cloud Storageのdownload_file_jmaバケット内に、取得データを格納する3つのディレクトリを用意しています。
Terraform定義上にて愚直に記述するとgoogle_storage_bucket_objectのresourceブロックが3つできてしまうことになるため、for_eachを用いて簡潔に記載することができました。

main.tf:for_eachを用いないパターン
#Cloud Storage
resource "google_storage_bucket_object" "download_file_jma_folder" {
  name     = "maxtemperature"
  content  = " "
  bucket   = google_storage_bucket.download_file_jma.id
}

resource "google_storage_bucket_object" "download_file_jma_folder" {
  name     = "mintemperature"
  content  = " "
  bucket   = google_storage_bucket.download_file_jma.id
}

resource "google_storage_bucket_object" "download_file_jma_folder" {
  name     = "predaily"
  content  = " "
  bucket   = google_storage_bucket.download_file_jma.id
}
main.tf:for_eachを用いるパターン
locals {
  cs_foldernames = {
    folder_max = "maxtemperature/"
    folder_min = "mintemperature/"
    folder_pre = "predaily/"
  }
}

#Cloud Storage
resource "google_storage_bucket_object" "download_file_jma_folder" {
  for_each = local.cs_foldernames
  name     = each.value
  content  = " "
  bucket   = google_storage_bucket.download_file_jma.id
}

こちらの実装はTerraformを学ぶために色々な記事を巡っていた際に、下記の記事にて登場しました。
まだまだTerraform初学者なので、とても参考になりました!

今後改善したい点

Cloud Storage上の最新作成日時のファイルを特定したい

main.py
~省略~
try:
    #実行日の日付を宣言する
    current_ymd = datetime.now().strftime("%Y-%m-%d")

    # Cloud Storageからダウンロードするファイルのパス
    gcs_uri = f"gs://download_file_jma/maxtemperature/{current_ymd}_mxtemsadext00_rct.csv"
~省略~

取得データには接頭辞にYYYY-MM-DDを付与させているため、実行日の日付を取得してファイルを特定しています。
ただ、この実装だと当日のファイルのみが起動の対象となってしまうため、前日以前のデータを加工⇒取込したいといった際に対応できません。

そのため、ファイルのメタデータを読み込み、作成日時が最新のファイルを対象とするような実装に改善したいなぁ等と考えています。

ETLに利用するサービスの調査・操作

下記のような理由があり、今回のETL処理にはCloud Functionsとpandasを採用しました。

  • 求人に応募する際のポートフォリオとして作成したいため、早めに実装したかった
  • pandasの操作にも慣れたかった

GCPのサービスにはETL処理を想定したものが複数あります。
 例)Dataproc, Dataflow, Data Fusion, Dataform, Batch
以前、データ基盤を構築するために使えるGCPのサービスを調査しましたが(下記記事)、当時は表面的にしか調べ切れていない・実践できていません。
そのため、どういった使い分けがあるか、実際に触ってみてどのような感触か把握できるようにしていきたいと思います。

terraformのディレクトリ構造/main.tfの構成

Terraform
└─GCP
    ├─.terraform
    ├─CloudFunctions
    │  ├─DataTransImportBqDwhMaxtemJMA
    │  ├─DataTransImportBqDwhMintemJMA
    │  ├─DataTransImportBqDwhPredailyJMA
    │  └─DownloadDataJMA
    ├─bigquery.tf
    ├─main.tf
    └─variable.tf

・ディレクトリ構造について
Git見てもわかるのですが、今回は上記のようなディレクトリ構造としました。
良い塩梅で整理できているため、今回の実装規模ではこの程度でよいかなと感じています。
実運用を想定すると開発環境用と本番環境用に分けた際には、どのような構造が適切かといった考え等していきたいところですね。

・main.tfの構成ついて
基本、main.tfにサービスの定義をしているのですが、BigQueryの定義のみ別ファイルに切り出しています。
3テーブル分と言えど、スキーマ定義がなかなか行数を食うことから、切り出すことで可読性向上を狙っています。

とはいえ、現状main.tfのCloud Functionsの記載が結構紙面を取っており、まだまだ冗長な箇所も多いです。
for_eachを用いて改善ができるのではないか......
いや、Functionsごとにresourceブロックを分けた現状のままとすべきか......
等と考えています。

雑感

今回のデータ基盤構築のきっかけになったのは、上記スライドでした。
この中で、下記のような記載があるのですが、まさしくそのとおりと感じました。

設計と実装の初期コスト(主に時間とキャッチアップ)は半端ないですが,投資した方が絶対に良い.

GCP・Terraformは全くの未経験、
Gitは業務上でブラウザからソースをダウンロードした程度で、コマンドを打ったことがありませんでした。
そのため、キャッチアップする知識の量も多く、何度も操作を繰り返すことで腑に落ちていくため、時間もかかりました。

とはいえ今回の実践のおかげで、確実に自分のスキルセット向上につながったと思いますので、良い経験だったと感じています。
この調子で今後もデータエンジニアとしての技術領域を学んでいこうと思います。

8
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
8
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?