17
18

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

データエンジニアリングに興味が湧いてきたので、趣味のデータで簡単なデータ基盤を作ってみました。

利用データ

今回はグライダーのフライトログデータを気象データと組み合わせるシンプルなデータ基盤を作ってみます。

グライダーに専用のGPSを載せて飛ぶと、飛行した軌跡(3次元座標 × 時間)が記録され、そのデータはIGCファイルという標準のフォーマットとして出力することができます。グライダーは上昇気流にうまく乗ることで長時間飛行を続けることができますが、その上昇気流の発生条件は気象要素(気温、風向、風速、日照時間など)に大きく依存します。フライトログデータを気象データと紐づけて分析すれば、どんな気象条件でどのようなポイントに上昇気流が発生しやすいか、またそれを踏まえてどのような飛び方をすればよいかの手掛かりが得られるかもしれません。

Untitled.png
フライトログの例

作ったもの

作成した簡易的なデータ基盤は以下の通りです。ソースコードはGitHubで公開しています。

image.png

データソースはフライトログデータと気象データの2種類です。フライトログデータ(IGCファイル)はバケットへの投入をトリガーとして、Cloud Functionsを用いてパースした後、BigQueryにエクスポートします。気象データについても同じくCloud Functionsにより、日次で気象庁HPからスクレイピングしてBigQueryにエクスポートします(スケジューリングにはCloud Schedulerを利用)。

BigQuery上のテーブルの作成にはdbtを利用します。GitHub Actionsでdbtの実行をスケジュールし、週次でフライトログデータと気象データを紐づけたテーブルの更新を行います。

実装

Terraformによるインフラ構成

GCP上のインフラはTerraformを用いて構成します。Terraformにはmoduleという機能があり、システムを構成するコンポーネントをモジュールとして分割して定義することができます。そして、メインモジュール上でそれらを統合することで、全体のシステムを構成することができます。

今回は以下の3つのコンポーネントをモジュールとして定義しました。

  • BigQuery
  • バケット投入をトリガーとするCloud Functions(フライトログデータの処理に利用)
  • Cloud Schedulerにより定期実行するCloud Functions(気象データの処理に利用)

以下はフライトログデータを処理するCloud Functionsのモジュールとそれを呼び出すメインモジュールの実装コードです(一部抜粋)。

# infra/modules/storage_triggered_function/main.tf

resource "google_storage_bucket" "source_bucket" {
  name          = "${var.source_bucket_prefix}_${random_id.suffix.hex}"
  location      = var.location
  force_destroy = true
}

resource "google_storage_bucket" "function_bucket" {
  name          = "${var.function_bucket_prefix}_${random_id.suffix.hex}"
  location      = var.location
  force_destroy = true
}

resource "google_storage_bucket_object" "function_source" {
  name   = "function.zip"
  bucket = google_storage_bucket.function_bucket.name
  source = data.archive_file.function_archive.output_path
}

resource "google_cloudfunctions_function" "function" {
  name                  = var.function_name
  runtime               = "python312"
  available_memory_mb   = var.function_available_memory_mb
  source_archive_bucket = google_storage_bucket.function_bucket.name
  source_archive_object = google_storage_bucket_object.function_source.name
  entry_point           = "main"
  event_trigger {
    event_type = "google.storage.object.finalize"
    resource   = google_storage_bucket.source_bucket.id
  }
  environment_variables = {
    DATASET_ID = var.dataset_id
    TABLE_ID   = var.table_id
  }
}
# infra/main.tf

module "igc_file_processor" {
  source                       = "./modules/storage_triggered_function"
  source_bucket_prefix         = "igc_files"
  function_bucket_prefix       = "igc_file_processor"
  function_name                = "igc_file_processor"
  function_available_memory_mb = 256
  dataset_id                   = "data_lake"
  table_id                     = "flight_log"
}

dbtの定期実行

dbtはデータベース上のテーブルの変換処理(ELTのT)を多機能にサポートしてくれるツールです。dbtにはOSSとしてのdbt CoreとSaaSとしてのdbt Cloudの2種類がありますが、今回はdbt Coreを利用しました。dbt Coreの基本的な使い方は公式のチュートリアルが参考になりました。

dbtでは、作成したいテーブルをselect文で記述した.sqlファイルをdbtプロジェクトのmodels/に配置し、dbt runを実行することで、BigQuery上で変換処理が走りテーブルが作成されます。

今回はGitHub Actionsでdbt runを週次で実行するワークフローを定義しました。GCPの認証情報は、サービスアカウントのJSONファイルの中身をGitHubのシークレットとして登録し、ワークフロー実行時に読み込む形にしています。

name: run dbt

on:
  schedule:
    - cron: '0 4 * * 0'
  workflow_dispatch:

jobs:
  run_dbt:

    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.12"
      - run: pip install dbt-bigquery
      - run: echo '${{ secrets.GCP_CREDENTIALS }}' > secrets/gcloud.json
      - run: dbt run
        working-directory: dbt/flight_log/
        env:
          GCP_PROJECT_ID: ${{ secrets.GCP_PROJECT_ID }}
      - run: rm secrets/gcloud.json

dbtドキュメントの自動公開

dbtを利用する嬉しさの一つにドキュメントの自動生成機能があります。dbt docs generateを実行すると、定義した.sqlファイルをもとに、各テーブルのスキーマやテーブル間の依存関係を表すグラフ(Lineage)などの情報を含んだドキュメントファイルが生成されます。そして、dbt docs serve を実行するとローカルでサーバーが起動し、dbtドキュメントにアクセスすることができます。

今回はGitHub Actionsを用いて、mainブランチの更新をトリガーとして、GitHub Pagesでdbtドキュメントを自動ホスティングするワークフローを作成しました。GitHub Pagesへのデプロイには、Marketplace上で公開されているアクションactions-gh-pagesを利用しました。

name: publish dbt docs

on:
  push:
    branches:
      - main

jobs:
  publish_dbt_docs:
    
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.12"
      - run: pip install dbt-bigquery
      - run: echo '${{ secrets.GCP_CREDENTIALS }}' > secrets/gcloud.json 
      - run: dbt docs generate
        working-directory: dbt/flight_log/
        env:
          GCP_PROJECT_ID: ${{ secrets.GCP_PROJECT_ID }}
      - uses: peaceiris/actions-gh-pages@v4
        with:
          github_token: ${{ secrets.GITHUB_TOKEN }}
          publish_dir: dbt/flight_log/target/
      - run: rm secrets/gcloud.json

最終的に構築した基盤のテーブル間の依存関係は以下のようなグラフで表示されます。フライトログデータと気象データをデータソースとして、それらを紐づけたテーブルを作成できました。

Untitled.png

おわりに

今回はdbtとBigQueryを中心とした簡易的なデータ基盤の作成を行いました。開発を通して、Terraformやdbtの基本的な使い方を身に付けることができた気がします。

データ基盤は作っても活用しないと意味がないので、今後はDashを用いたWebアプリケーション化などにも取り組んでみたいと思います。

17
18
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
17
18

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?