はじめに
私はクラウドを使って仕事をすることが多いのですが、もっぱらAWSを使うことが多いです。
これはプライベートでもそうで、何かしらサービスを構築しようだの考えると1stChoiceはAWSになってしまいます。
この度GCPを使う機会に恵まれたので、GCPの使い所などや詰まったところ共有し、皆様の一助になればとのことで、今回こういった記事を書いてみることにします。
GCSからCloudFunctionを動かしてみたい。
AWSの中でもS3とLambdaが好きで、この二つあれば大概のサービスなんとかなるやん!と常々思ってます。であればそれをGoogleCloudでも実現したいと思っていたので、このお題にしてみました。
サービスの説明
GCSとは
GCSはAWSでいうS3で、オブジェクトストレージのことをさします。
基本的な機能はS3と同じで、高い耐久性や可用性を保持しており、機能としてさまざまなStorageクラスで保存することで、ユーザにとって最適なコストパフォーマンスでオブジェクトを保存することができます。詳しい機能の説明は公式のhttps://cloud.google.com/storage?hl=jaに譲ります。
CloudFunctionとは
CloudFunctionはAWSのLambdaに近い機能を提供しております。ユーザが実行サーバを意識することなくコードを実行することができます。何かしらのGoogleCloudのサービスを利用したいが、わざわざサーバを立てるほどでもない、といった場合によく使われます。今回やろうとしていることがまさにです。
GCSにファイルを配置したことをきっかけに別のサービスを呼び出したりすることができます。イベントドリブンなアーキテクチャを構築するカナメと言っても過言ではありません。
ただし重い機能やコードを実行することは得意ではありません。そう言った場合は素直にCloudRunやGKEやComputeEngineなどを利用しましょう。
詳しい機能は公式のhttps://cloud.google.com/functions?hl=ja に譲ります。
BigQueryとは
ビジネスのアジリティに対応して設計された、サーバーレスでスケーラビリティと費用対効果に優れたマルチクラウド データ ウェアハウスです。(https://cloud.google.com/bigquery/#section-10より引用)
GoogleCloudのサービスとして、中心的な役割を持つサービスです。触ったことがない人でも聞いたことはあるのではないかと思います。非常に優れた性能を持っております。また標準的なSQLで書くことができるので、SQLを触ったことがある人は使い勝手もよいのではないでしょうか。一方でストレージコストとクエリでスキャンしたGBごとにコストがかかってくるので、なんでもかんでもSelect * from hogehoge
してはだめで、コストという面には注意したいですね。
詳しい機能は公式のhttps://cloud.google.com/bigquery/ に譲ります。
やってみよう
アーキテクチャ
今回実装するアーキテクチャは以下のような形です。
CSVファイルをGCSのバケットに配置すると、それがトリガとなって、CSVファイルの中身をBigQueryにデータとして登録しようと思います。
業務上のイメージとしては、アプリケーションから出力されたCSVデータをGCSに配置し、それが自動的にBigQueryに登録されるみたいなイメージですね。
Step1 GCSの設定
バケット名はグローバルで一意な名前をつけて、作成してみます。
作成されたことを確認しましょう。
次にファイルを配置するフォルダを作成してみましょう。
GCSの設定は以上です。
Step2 BigQueryの設定
データセットの作成をしましょう。
ロケーションはGCSのバケットを作成した場所に合わせておきましょう。
テーブル作成時にGCSなどにデータがあれば、ここでインポートすることができます。
今回はCloudFunction経由で登録するので、空のテーブルにしておきます。
データセットは先ほど作成したものを使います。
スキーマはここで設定することもできますが、今回はCloudFunction内でスキーマも登録するので設定不要です。
Step3 CloudFunctionsの設定
CloudFunctionの設定をしていきます。
ここで第一世代ではなく、第二世代を設定しましょう。
関数名を設定します。
第二世代は第一世代と比べ機能拡張されているため、特に理由がない限り、第二世代を選択しましょう。
今回やりたいイベントドリブンな関数を作成する場合は第一世代でも可能ですが、第二世代のほうがイベントに反応できる、サービスが多くなります。
ここでGCSのトリガーを設定します。
イベントプロバイダは「Cloud Storge」
イベントは「storage.objects.create」をそれぞれ設定します。
次にリソースで、特定のリソース→完全なリソース名に下記の値を設定します。
projects/_/buckets/<step1で作成したバケット>/objects/<step1で作成したフォルダ>/<配置予定のcsv>
(余談ですが、ここの設定について公式ドキュメントがわかりづらく非常に苦しんだ記憶があります。いきなり_がパスに入っていて戸惑うんですよね。。)
次は処理の本丸ですね。
ランタイムはコードはPythonなので、Python3.10を設定してください。
またエントリポイントと関数名は一致している必要があるので、合わせてください。
(例)def fugafugaならエントリポイントはfugafugaになります。
今回設定する予定のコードは以下となります。
import functions_framework
from google.cloud import bigquery
@functions_framework.cloud_event
def sample(cloudevent):
# Construct a BigQuery client object.
client = bigquery.Client()
table_id = "your-project.your_dataset.your_table_name"
job_config = bigquery.LoadJobConfig(
schema=[
bigquery.SchemaField("name", "STRING"),
bigquery.SchemaField("age", "INT64"),
],
skip_leading_rows=1,
# The source format defaults to CSV, so the line below is optional.
source_format=bigquery.SourceFormat.CSV,
)
uri = "gs://your-bucket/your-folder/your-file.csv"
table_id
load_job = client.load_table_from_uri(
uri, table_id, job_config=job_config
) # Make an API request.
load_job.result() # Waits for the job to complete.
destination_table = client.get_table(table_id) # Make an API request.
print("Loaded {} rows.".format(destination_table.num_rows))
(引用元https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-csv から少し変更してます。)
※table_idのyour-project.<Step2で作成したデータセット>.<Step2で作成したテーブル>
uriのgs://<Step1で作成したバケット>/<Step1で作成したフォルダ>/<配置予定のファイル>.csv
は各個人の値に変換しましょう。
ここのコードを簡単に説明すると
1. BigQueryAPIのクライアントを呼び出し
2. JobConfigでスキーマを指定(ヘッダー行スキップ)
3. ロード予定のCSVファイルの指定
4. APIリクエストの実施
5. リクエスト結果を出力
という処理が記載されています。
また今回のコードではbigquery-apiの必要なので
requirements.txtに以下の値を追記しましょう。
bigquery==0.0.8
ここまで来たらすべて完了です。
Step4 テスト
先ほどGCS作成したバケットのフォルダ配下に配置予定のcsvを配置してみましょう。
私はこんなCSVを用意しました。
name | age |
---|---|
yamada | 25 |
tanaka | 29 |
takahashi | 65 |
sato | 55 |
あら不思議!スキーマが作成されています。
今回スキーマをコード内で直接設定しましたが、あらかじめJsonで記入しておいて、そのファイルを読み込ませることもできます。
selectしてみるとデータが作成されていることもわかります。
補足
今回イベント発火するためのトリガーにファイル名を直書きで指定しましたが、パスパターンを指定することで*を指定することもできます。
hogehoge**.csvなんて指定すれば、やれることの幅が広がりますね。
まとめ
簡単にイベントドリブンなサービスを構築することができました。
GCSからCloudFunctionの連携は鉄板だと思います。
データエンジニアとして、RawデータをGCSに配置して、様々なデータ分析サービスに連携すると幅が広がるので是非参考にしてください。
次はGCS→CloudFunction→Composer→DataFlowというものを書いてみようと思います。
これでデータパイプラインもイベントドリブンで動かすことができます。
お楽しみに。