概要
本記事では、外部APIからBigQueryのテーブルへ定期的にデータを抽出するデータパイプラインのテンプレートソースコードを紹介します。
分析用途やサードパーティーのデータソースとの連携などで、APIから社内DBやBigQueryにデータを転送したいというニーズはよくあります。
そこで、このシンプルなワークフローを実現するために、GCP(Google Cloud Platform)とTerraformをベースにしたリポジトリを作成しました。
TerraformをIac(Infrasturucture as code)として採用することで、簡単に環境を再現し、検証を行うことができるようになりました。
本記事では、tmbd movie API dataを使用しています。このリポジトリを完全に再利用する場合は、tmdb apiリソースにアクセスするためのAPIキーを作成してください。
利用するAPIが変わっても、APIエンドポイントやBigQueryのテーブルスキーマをAPIレスポンスデータに合わせて変更するだけでよいので使い回すことができます。
このケースは小規模なプロジェクトに適しており、複雑なワークフローを管理する必要がない場合に適していると思います。中規模以上のプロジェクトであれば、AirflowやArgo CIを利用すると、便利な機能が追加され、ワークフロー全体を容易に管理することができます。
全体のソースコード
使用した技術
- Cloud Function (Python3.7): API クライアントを書くための Python コードを作成
- BigQuery: 生のAPIデータをBigQueryに保存する
- Cloud Pub/Sub: データの中継地として使われるキュー
- Cloud Scheduler: 定期的に実行するために設定する
- Google Cloud Storage: tf ファイルを保存しておくバケット
データソース
本サンプルでは、trending endpointを使用しています。以下のtf設定ファイルの中で、BigQueryのテーブルスキーマを事前に定義していますので、このレスポンスのjsonスキーマを確認してください。
また、.env
ファイルに TMDB_KEY 環境変数を追加してください。
システム構成
処理自体はCloud Functionで書かれています。Cloud Functionに記述されたAPIを起動し、Publish to Pub/Subトピックで応答データをBigQueryに挿入します。
また、TerraformのStateファイルはどこかに保存しておく必要があります。
このサンプルでは、"collecting-tf-state"というバケットを作成し、このGCSバケットにtf stateファイルを保存しています。
着手する前に
- GCPのプロジェクトが必要なので、なければ作成してください。
Terraform 設定ファイル
{$GCP_PROJECT_NAME} は適宜自分のGCPプロジェクトの名前に書き換えてください。
bigquery.tf
# BigQuery Database
resource "google_bigquery_dataset" "collecting_db" {
dataset_id = "collecting_db"
access {
role = "OWNER"
user_by_email = data.google_service_account.terraform_service_account.email
}
}
# BigQuery Table
resource "google_bigquery_table" "tmdb_trending" {
dataset_id = google_bigquery_dataset.collecting_db.dataset_id
table_id = "tmdb_trending"
time_partitioning {
type = "DAY"
}
deletion_protection = false
schema = <<EOF
[
{
"name": "adult",
"type": "BOOLEAN",
"mode": "NULLABLE"
},
{
"name": "backdrop_path",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "id",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "title",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "original_language",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "original_name",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "name",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "original_title",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "overview",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "poster_path",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "media_type",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "genre_ids",
"type": "INTEGER",
"mode": "REPEATED"
},
{
"name": "popularity",
"type": "FLOAT",
"mode": "NULLABLE"
},
{
"name": "release_date",
"type": "DATE",
"mode": "NULLABLE"
},
{
"name": "first_air_date",
"type": "DATE",
"mode": "NULLABLE"
},
{
"name": "video",
"type": "BOOLEAN",
"mode": "NULLABLE"
},
{
"name": "vote_average",
"type": "FLOAT",
"mode": "NULLABLE"
},
{
"name": "vote_count",
"type": "INTEGER",
"mode": "NULLABLE"
},
{
"name": "origin_country",
"type": "STRING",
"mode": "REPEATED"
}
]
EOF
}
cloudfunction.tf
resource "google_storage_bucket" "bucket" {
name = "collecting-tf"
location = "asia-northeast1"
}
data "archive_file" "function_archive" {
type = "zip"
source_dir = "../src" # directory for main.py,requirement.txt
output_path = "../index.zip" # zipped file name
}
resource "google_storage_bucket_object" "archive" {
name = "../index.zip"
bucket = google_storage_bucket.bucket.name
source = data.archive_file.function_archive.output_path
}
resource "google_cloudfunctions_function" "function" {
name = "function-tdmb"
description = "Digest tmdb data"
runtime = "python37"
available_memory_mb = 256
source_archive_bucket = google_storage_bucket.bucket.name
source_archive_object = google_storage_bucket_object.archive.name
service_account_email = "terraform-service-account@gcp-compute-engine-343613.iam.gserviceaccount.com"
event_trigger {
event_type = "google.pubsub.topic.publish"
resource = "projects/gcp-compute-engine-343613/topics/streaming-topic"
}
entry_point = "insert_bq_tmdb_data"
}
pubsub.tf
resource "google_pubsub_topic" "topic" {
name = "streaming-topic"
labels = {
purpose = "tmdb"
}
}
scheduler.tf
このサンプルでは、毎日12時にAPIを呼び出して、毎日の映画のトレンドを取得しています。
resource "google_cloud_scheduler_job" "job" {
name = "cron-job"
description = "cron job"
schedule = "0 12 * * *" // Every 12 o'clock
time_zone = "Asia/Tokyo"
pubsub_target {
# topic.id is the topic's full resource name.
topic_name = google_pubsub_topic.topic.id
data = base64encode("test")
}
}
variables.tf
variable "stages" {
default = {
collecting-dev = "dev"
}
}
variable "region" {
default = "asia-northeast1"
}
variable "zone" {
default = "asia-northeast1-a"
}
main.tf
terraform {
required_version = "~> 0.13.5"
backend "gcs" {
bucket = "collecting-tf-state"
}
}
provider "google" {
region = var.region
version = "~> 3.49"
project = {$GCP_PROJECT_NAME}
}
data "google_project" "collecting" {
project_id = {$GCP_PROJECT_NAME}
}
output "workspace" {
value = terraform.workspace
}
output "gcp_project" {
value = data.google_project.collecting
}
output "gcp_region" {
value = var.region
}
output "gcp_zone" {
value = var.zone
}
data "google_service_account" "terraform_service_account" {
account_id = "terraform-service-account"
}
Terraform Command
# Initialize terraform environment
$ terraform init
# See what change will be made
$ terraform plan
# execute terraform function
$ terraform apply
結果
このサンプルSQLを実行することができます。
このSQLは、毎日のユーザーレビューのスコアが最も高い映画を取得するためのものです。BigQueryコンソールで実行します。
WITH data AS (
SELECT
original_title,
vote_average,
_PARTITIONTIME as pt,
ROW_NUMBER() OVER (PARTITION BY _PARTITIONTIME ORDER BY vote_average DESC) AS row_number
FROM `{$GCP_PROJECT_NAME}.collecting_db.tmdb_trending`
ORDER BY _PARTITIONTIME DESC, vote_average DESC
)
SELECT
*
FROM
data
WHERE
row_number = 1
SQL Result
original_title vote_average pt row_number
Top Gun: Maverick 8.371 2022-08-29T00:00:00Z 1
Top Gun: Maverick 8.371 2022-08-28T00:00:00Z 1
Top Gun: Maverick 8.383 2022-08-27T00:00:00Z 1
Top Gun: Maverick 8.384 2022-08-26T00:00:00Z 1
Top Gun: Maverick 8.365 2022-08-25T00:00:00Z 1
Top Gun: Maverick 8.344 2022-08-24T00:00:00Z 1
Top Gun: Maverick 8.328 2022-08-23T00:00:00Z 1
Top Gun: Maverick 8.325 2022-08-22T00:00:00Z 1
Top Gun: Maverick 8.325 2022-08-21T00:00:00Z 1
Top Gun: Maverick 8.331 2022-08-20T00:00:00Z 1
Top Gun: Maverick 8.325 2022-08-19T00:00:00Z 1
Top Gun: Maverick 8.335 2022-08-18T00:00:00Z 1
Top Gun: Maverick 8.339 2022-08-17T00:00:00Z 1
Top Gun: Maverick 8.338 2022-08-16T00:00:00Z 1
Purple Hearts 8.543 2022-08-15T00:00:00Z 1
Luck 8.229 2022-08-14T00:00:00Z 1
Top Gun: Maverick 8.339 2022-08-13T00:00:00Z 1
Purple Hearts 8.559 2022-08-12T00:00:00Z 1
Groot's Pursuit 8.821 2022-08-11T00:00:00Z 1
Purple Hearts 8.556 2022-08-10T00:00:00Z 1
Purple Hearts 8.557 2022-08-09T00:00:00Z 1
Purple Hearts 8.56 2022-08-08T00:00:00Z 1
Purple Hearts 8.539 2022-08-07T00:00:00Z 1
最近は「トップガン マーヴェリック」が人気みたいですね:)
以上です。