概要
気象庁の気象情報を取得し、加工した上でBigqueryに登録するデータパイプラインです。
上記構成図の内、赤枠内のサービスをTerraformで定義しています。
また、今回のデータ基盤定義をしたTerraformを上記Gitリポジトリにて公開しています。
実装した背景
現在無職なので、転職用のポートフォリオとして作成しました。
また、GCP・Terraform(あとGit)も触ったことがなかったので、技術スキルを高めることも目的の一つです。
各工程の説明
一連の流れは、下記のとおりです。
- ①データ取得
- ②データ加工・投入
取得データは、最高気温・最低気温・日降水量の3つです。
これらはLooker Studioで日ごとの遷移を可視化することを想定して選定しました。
記事公開時点ではLooker Studioのダッシュボードを作成していないので、折角なら実装しようかとも思っています。
①データ取得
・トリガー
Cloud Schedulerによる時間起動
・処理内容
Cloud Functions内にて気象庁のサイトのダウンロードURLを指定して、Cloud Storage上の特定ディレクトリにファイルを格納しています。
ディレクトリ構造は下記のとおりです。
download_file_jma(※bucket名)
├─maxtemperature
│ └─YYYY-MM-DD_mxtemsadext00_rct.csv
├─mintemperature
│ └─YYYY-MM-DD_mntemsadext00_rct.csv
└─predaily
└─YYYY-MM-DD_predaily00_rct.csv
②データ加工・投入
・トリガー
Cloud Storageの特定ディレクトリへのファイルアップロードによるイベント起動
・処理内容
Cloud Functions内にてpandasを用いて必要な値を取捨選択し、スキーマ定義に沿った形へ加工しています。
その後、BigQueryテーブルへのデータ登録も実行させています。
取得データによってスキーマ定義が異なるため、それぞれに対応したFunctionsを用意しており、トリガーもこれらに対応しています。
Functions名にセンスがないのは許しておくれ......
csv | Cloud Functions |
---|---|
mxtemsadext00_rct.csv | DataTransImportBqDwhMaxtemJMA |
mntemsadext00_rct.csv | DataTransImportBqDwhMintemJMA |
predaily00_rct.csv | DataTransImportBqDwhPredailyJMA |
工夫した点
特定ディレクトリへのファイルアップロードを検知してCloud Functionsを実行する
トリガーは下記のとおり5つあるのですが、今回はEventarcトリガーを採用しました。
名称 | 分類 | 説明 |
---|---|---|
HTTPトリガー | HTTPトリガー | HTTP(S) リクエストに応答する。 |
Pub/Subトリガー | イベントトリガー | Pub/SubトリガーによるPub/Subメッセージに応答する。 |
Cloud Storageトリガー | イベントトリガー | Cloud Storageの変更に応答する。 |
Firestoreトリガー | イベントトリガー | Firestore データベース内のイベントに応答する。 |
Eventarcトリガー | イベントトリガー | Eventarcでサポートされているイベントタイプに応答する。 |
前述したとおり、取得データに応じたFunctionsを実行させたいのですが、CloudStorageトリガーでは実現できません。
Cloud Storageトリガーの条件はバケット指定のみのため、今回のディレクトリ構造だとFunctionsに対応しないファイルアップロードを検知してしまいます。
ところが、Eventarcトリガーを用いると、より柔軟に各種イベントを契機に発火させることができ、今回の要件を満たすことができました。
event_trigger {
trigger_region = "asia-northeast1"
event_type = "google.cloud.audit.log.v1.written"
retry_policy = "RETRY_POLICY_DO_NOT_RETRY"
event_filters {
attribute = "serviceName"
value = "storage.googleapis.com"
}
event_filters {
attribute = "methodName"
value = "storage.objects.create"
}
event_filters {
attribute = "resourceName"
value = "projects/_/buckets/download_file_jma/objects/maxtemperature/*.csv"
operator = "match-path-pattern"
}
このように、パスパターン構文を用いて特定のディレクトリにあるファイルを指定できました。
Cloud FunctionsからCloud Loggingへのログ連携
標準出力だとログエクスプローラ上でのログの判別に困ると感じたので、Python標準のloggingライブラリに加え、Cloud Loggingライブラリを採用しました。
実装内容は下記の記事を参考にさせていただきました。
ログエクスプローラ上では下記のように表示され、エラー箇所の特定が容易になりました。
実装においては特に難しいこともなく、参考記事のとおりでのコーディングで事足りました。
ただ、見様見真似でしかないため、今後Cloud Loggingライブラリを用いたロギング処理について、本番運用に耐えうるような実装を模索してみたいところです。
import logging
import google.cloud.logging
@functions_framework.http
def DataTransImportBqDwhMaxtemJMA(request):
# 標準Loggerの設定
logging.basicConfig(
format = "[%(asctime)s][%(levelname)s] %(message)s",
level = logging.DEBUG
)
logger = logging.getLogger()
# Cloud Loggingハンドラをloggerに接続
logging_client = google.cloud.logging.Client()
logging_client.setup_logging()
# setup_logging()するとログレベルがINFOになるのでDEBUGに変更
logger.setLevel(logging.DEBUG)
Terraform: for_each
を使って冗長な記述を減らす
Cloud Storageのdownload_file_jmaバケット内に、取得データを格納する3つのディレクトリを用意しています。
Terraform定義上にて愚直に記述するとgoogle_storage_bucket_objectのresourceブロックが3つできてしまうことになるため、for_each
を用いて簡潔に記載することができました。
#Cloud Storage
resource "google_storage_bucket_object" "download_file_jma_folder" {
name = "maxtemperature"
content = " "
bucket = google_storage_bucket.download_file_jma.id
}
resource "google_storage_bucket_object" "download_file_jma_folder" {
name = "mintemperature"
content = " "
bucket = google_storage_bucket.download_file_jma.id
}
resource "google_storage_bucket_object" "download_file_jma_folder" {
name = "predaily"
content = " "
bucket = google_storage_bucket.download_file_jma.id
}
locals {
cs_foldernames = {
folder_max = "maxtemperature/"
folder_min = "mintemperature/"
folder_pre = "predaily/"
}
}
#Cloud Storage
resource "google_storage_bucket_object" "download_file_jma_folder" {
for_each = local.cs_foldernames
name = each.value
content = " "
bucket = google_storage_bucket.download_file_jma.id
}
こちらの実装はTerraformを学ぶために色々な記事を巡っていた際に、下記の記事にて登場しました。
まだまだTerraform初学者なので、とても参考になりました!
今後改善したい点
Cloud Storage上の最新作成日時のファイルを特定したい
~省略~
try:
#実行日の日付を宣言する
current_ymd = datetime.now().strftime("%Y-%m-%d")
# Cloud Storageからダウンロードするファイルのパス
gcs_uri = f"gs://download_file_jma/maxtemperature/{current_ymd}_mxtemsadext00_rct.csv"
~省略~
取得データには接頭辞にYYYY-MM-DDを付与させているため、実行日の日付を取得してファイルを特定しています。
ただ、この実装だと当日のファイルのみが起動の対象となってしまうため、前日以前のデータを加工⇒取込したいといった際に対応できません。
そのため、ファイルのメタデータを読み込み、作成日時が最新のファイルを対象とするような実装に改善したいなぁ等と考えています。
ETLに利用するサービスの調査・操作
下記のような理由があり、今回のETL処理にはCloud Functionsとpandasを採用しました。
- 求人に応募する際のポートフォリオとして作成したいため、早めに実装したかった
- pandasの操作にも慣れたかった
GCPのサービスにはETL処理を想定したものが複数あります。
例)Dataproc, Dataflow, Data Fusion, Dataform, Batch
以前、データ基盤を構築するために使えるGCPのサービスを調査しましたが(下記記事)、当時は表面的にしか調べ切れていない・実践できていません。
そのため、どういった使い分けがあるか、実際に触ってみてどのような感触か把握できるようにしていきたいと思います。
terraformのディレクトリ構造/main.tfの構成
Terraform
└─GCP
├─.terraform
├─CloudFunctions
│ ├─DataTransImportBqDwhMaxtemJMA
│ ├─DataTransImportBqDwhMintemJMA
│ ├─DataTransImportBqDwhPredailyJMA
│ └─DownloadDataJMA
├─bigquery.tf
├─main.tf
└─variable.tf
・ディレクトリ構造について
Git見てもわかるのですが、今回は上記のようなディレクトリ構造としました。
良い塩梅で整理できているため、今回の実装規模ではこの程度でよいかなと感じています。
実運用を想定すると開発環境用と本番環境用に分けた際には、どのような構造が適切かといった考え等していきたいところですね。
・main.tfの構成ついて
基本、main.tfにサービスの定義をしているのですが、BigQueryの定義のみ別ファイルに切り出しています。
3テーブル分と言えど、スキーマ定義がなかなか行数を食うことから、切り出すことで可読性向上を狙っています。
とはいえ、現状main.tfのCloud Functionsの記載が結構紙面を取っており、まだまだ冗長な箇所も多いです。
for_each
を用いて改善ができるのではないか......
いや、Functionsごとにresourceブロックを分けた現状のままとすべきか......
等と考えています。
雑感
今回のデータ基盤構築のきっかけになったのは、上記スライドでした。
この中で、下記のような記載があるのですが、まさしくそのとおりと感じました。
設計と実装の初期コスト(主に時間とキャッチアップ)は半端ないですが,投資した方が絶対に良い.
GCP・Terraformは全くの未経験、
Gitは業務上でブラウザからソースをダウンロードした程度で、コマンドを打ったことがありませんでした。
そのため、キャッチアップする知識の量も多く、何度も操作を繰り返すことで腑に落ちていくため、時間もかかりました。
とはいえ今回の実践のおかげで、確実に自分のスキルセット向上につながったと思いますので、良い経験だったと感じています。
この調子で今後もデータエンジニアとしての技術領域を学んでいこうと思います。