LoginSignup
0
0

More than 1 year has passed since last update.

TerraformでAPIのレスポンスデータをBigQueryに抽出するテンプレート

Last updated at Posted at 2022-08-29

概要

本記事では、外部APIからBigQueryのテーブルへ定期的にデータを抽出するデータパイプラインのテンプレートソースコードを紹介します。
分析用途やサードパーティーのデータソースとの連携などで、APIから社内DBやBigQueryにデータを転送したいというニーズはよくあります。
そこで、このシンプルなワークフローを実現するために、GCP(Google Cloud Platform)とTerraformをベースにしたリポジトリを作成しました。

TerraformをIac(Infrasturucture as code)として採用することで、簡単に環境を再現し、検証を行うことができるようになりました。

本記事では、tmbd movie API dataを使用しています。このリポジトリを完全に再利用する場合は、tmdb apiリソースにアクセスするためのAPIキーを作成してください。

利用するAPIが変わっても、APIエンドポイントやBigQueryのテーブルスキーマをAPIレスポンスデータに合わせて変更するだけでよいので使い回すことができます。

このケースは小規模なプロジェクトに適しており、複雑なワークフローを管理する必要がない場合に適していると思います。中規模以上のプロジェクトであれば、AirflowArgo CIを利用すると、便利な機能が追加され、ワークフロー全体を容易に管理することができます。

全体のソースコード

使用した技術

データソース

本サンプルでは、trending endpointを使用しています。以下のtf設定ファイルの中で、BigQueryのテーブルスキーマを事前に定義していますので、このレスポンスのjsonスキーマを確認してください。
また、.env ファイルに TMDB_KEY 環境変数を追加してください。

システム構成

diagram.svg

処理自体はCloud Functionで書かれています。Cloud Functionに記述されたAPIを起動し、Publish to Pub/Subトピックで応答データをBigQueryに挿入します。

また、TerraformのStateファイルはどこかに保存しておく必要があります。
このサンプルでは、"collecting-tf-state"というバケットを作成し、このGCSバケットにtf stateファイルを保存しています。

着手する前に

  • GCPのプロジェクトが必要なので、なければ作成してください。

Terraform 設定ファイル

{$GCP_PROJECT_NAME} は適宜自分のGCPプロジェクトの名前に書き換えてください。

bigquery.tf

# BigQuery Database
resource "google_bigquery_dataset" "collecting_db" {
  dataset_id = "collecting_db"
  access {
    role          = "OWNER"
    user_by_email = data.google_service_account.terraform_service_account.email
  }
}

# BigQuery Table
resource "google_bigquery_table" "tmdb_trending" {
  dataset_id = google_bigquery_dataset.collecting_db.dataset_id
  table_id   = "tmdb_trending"
  time_partitioning {
    type = "DAY"
  }
  deletion_protection = false
  schema              = <<EOF
[
  {
    "name": "adult",
    "type": "BOOLEAN",
    "mode": "NULLABLE"
  },
  {
    "name": "backdrop_path",
    "type": "STRING",
    "mode": "NULLABLE"
  },
  {
    "name": "id",
    "type": "STRING",
    "mode": "NULLABLE"
  },
  {
    "name": "title",
    "type": "STRING",
    "mode": "NULLABLE"
  },
  {
    "name": "original_language",
    "type": "STRING",
    "mode": "NULLABLE"
  },
  {
    "name": "original_name",
    "type": "STRING",
    "mode": "NULLABLE"
  },
  {
    "name": "name",
    "type": "STRING",
    "mode": "NULLABLE"
  },
  {
    "name": "original_title",
    "type": "STRING",
    "mode": "NULLABLE"
  },
  {
    "name": "overview",
    "type": "STRING",
    "mode": "NULLABLE"
  },
  {
    "name": "poster_path",
    "type": "STRING",
    "mode": "NULLABLE"
  },
  {
    "name": "media_type",
    "type": "STRING",
    "mode": "NULLABLE"
  },
  {
    "name": "genre_ids",
    "type": "INTEGER",
    "mode": "REPEATED"
  },
  {
    "name": "popularity",
    "type": "FLOAT",
    "mode": "NULLABLE"
  },
  {
    "name": "release_date",
    "type": "DATE",
    "mode": "NULLABLE"
  },
  {
    "name": "first_air_date",
    "type": "DATE",
    "mode": "NULLABLE"
  },
  {
    "name": "video",
    "type": "BOOLEAN",
    "mode": "NULLABLE"
  },
  {
    "name": "vote_average",
    "type": "FLOAT",
    "mode": "NULLABLE"
  },
  {
    "name": "vote_count",
    "type": "INTEGER",
    "mode": "NULLABLE"
  },
  {
    "name": "origin_country",
    "type": "STRING",
    "mode": "REPEATED"
  }
]
EOF
}

cloudfunction.tf

resource "google_storage_bucket" "bucket" {
  name     = "collecting-tf"
  location = "asia-northeast1"
}

data "archive_file" "function_archive" {
  type        = "zip"
  source_dir  = "../src"       # directory for main.py,requirement.txt
  output_path = "../index.zip" # zipped file name
}

resource "google_storage_bucket_object" "archive" {
  name   = "../index.zip"
  bucket = google_storage_bucket.bucket.name
  source = data.archive_file.function_archive.output_path
}

resource "google_cloudfunctions_function" "function" {
  name        = "function-tdmb"
  description = "Digest tmdb data"
  runtime     = "python37"

  available_memory_mb   = 256
  source_archive_bucket = google_storage_bucket.bucket.name
  source_archive_object = google_storage_bucket_object.archive.name
  service_account_email = "terraform-service-account@gcp-compute-engine-343613.iam.gserviceaccount.com"

  event_trigger {
    event_type = "google.pubsub.topic.publish"
    resource   = "projects/gcp-compute-engine-343613/topics/streaming-topic"
  }
  entry_point = "insert_bq_tmdb_data"
}

pubsub.tf

resource "google_pubsub_topic" "topic" {
  name = "streaming-topic"

  labels = {
    purpose = "tmdb"
  }
}

scheduler.tf

このサンプルでは、毎日12時にAPIを呼び出して、毎日の映画のトレンドを取得しています。

resource "google_cloud_scheduler_job" "job" {
  name        = "cron-job"
  description = "cron job"
  schedule    = "0 12 * * *" // Every 12 o'clock
  time_zone   = "Asia/Tokyo"

  pubsub_target {
    # topic.id is the topic's full resource name.
    topic_name = google_pubsub_topic.topic.id
    data       = base64encode("test")
  }
}

variables.tf

variable "stages" {
  default = {
    collecting-dev  = "dev"
  }
}

variable "region" {
  default = "asia-northeast1"
}

variable "zone" {
  default = "asia-northeast1-a"
}


main.tf

terraform {
  required_version = "~> 0.13.5"
  backend "gcs" {
    bucket = "collecting-tf-state"
  }
}

provider "google" {
  region  = var.region
  version = "~> 3.49"
  project = {$GCP_PROJECT_NAME}
}

data "google_project" "collecting" {
  project_id = {$GCP_PROJECT_NAME}
}

output "workspace" {
  value = terraform.workspace
}

output "gcp_project" {
  value = data.google_project.collecting
}

output "gcp_region" {
  value = var.region
}

output "gcp_zone" {
  value = var.zone
}

data "google_service_account" "terraform_service_account" {
  account_id = "terraform-service-account"
}


Terraform Command


# Initialize terraform environment

$ terraform init

# See what change will be made

$ terraform plan

# execute terraform function

$ terraform apply

結果

このサンプルSQLを実行することができます。
このSQLは、毎日のユーザーレビューのスコアが最も高い映画を取得するためのものです。BigQueryコンソールで実行します。

WITH data AS (
  SELECT
      original_title,
      vote_average,
      _PARTITIONTIME as pt,
      ROW_NUMBER() OVER (PARTITION BY _PARTITIONTIME ORDER BY vote_average DESC) AS row_number
  FROM `{$GCP_PROJECT_NAME}.collecting_db.tmdb_trending`
  ORDER BY _PARTITIONTIME DESC, vote_average DESC
)
SELECT
    *
FROM
    data
WHERE
    row_number = 1

SQL Result

original_title	vote_average	pt	row_number
Top Gun: Maverick	8.371	2022-08-29T00:00:00Z	1
Top Gun: Maverick	8.371	2022-08-28T00:00:00Z	1
Top Gun: Maverick	8.383	2022-08-27T00:00:00Z	1
Top Gun: Maverick	8.384	2022-08-26T00:00:00Z	1
Top Gun: Maverick	8.365	2022-08-25T00:00:00Z	1
Top Gun: Maverick	8.344	2022-08-24T00:00:00Z	1
Top Gun: Maverick	8.328	2022-08-23T00:00:00Z	1
Top Gun: Maverick	8.325	2022-08-22T00:00:00Z	1
Top Gun: Maverick	8.325	2022-08-21T00:00:00Z	1
Top Gun: Maverick	8.331	2022-08-20T00:00:00Z	1
Top Gun: Maverick	8.325	2022-08-19T00:00:00Z	1
Top Gun: Maverick	8.335	2022-08-18T00:00:00Z	1
Top Gun: Maverick	8.339	2022-08-17T00:00:00Z	1
Top Gun: Maverick	8.338	2022-08-16T00:00:00Z	1
Purple Hearts	8.543	2022-08-15T00:00:00Z	1
Luck	8.229	2022-08-14T00:00:00Z	1
Top Gun: Maverick	8.339	2022-08-13T00:00:00Z	1
Purple Hearts	8.559	2022-08-12T00:00:00Z	1
Groot's Pursuit	8.821	2022-08-11T00:00:00Z	1
Purple Hearts	8.556	2022-08-10T00:00:00Z	1
Purple Hearts	8.557	2022-08-09T00:00:00Z	1
Purple Hearts	8.56	2022-08-08T00:00:00Z	1
Purple Hearts	8.539	2022-08-07T00:00:00Z	1

最近は「トップガン マーヴェリック」が人気みたいですね:)

以上です。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0