0
0

More than 1 year has passed since last update.

GoogleCloudStorageにファイルを配置したことを契機にして、CloudFunctionを起動して、BigQueryにロードしてみた。

Last updated at Posted at 2022-10-27

はじめに

私はクラウドを使って仕事をすることが多いのですが、もっぱらAWSを使うことが多いです。
これはプライベートでもそうで、何かしらサービスを構築しようだの考えると1stChoiceはAWSになってしまいます。
この度GCPを使う機会に恵まれたので、GCPの使い所などや詰まったところ共有し、皆様の一助になればとのことで、今回こういった記事を書いてみることにします。

GCSからCloudFunctionを動かしてみたい。

AWSの中でもS3とLambdaが好きで、この二つあれば大概のサービスなんとかなるやん!と常々思ってます。であればそれをGoogleCloudでも実現したいと思っていたので、このお題にしてみました。

サービスの説明

GCSとは

GCSはAWSでいうS3で、オブジェクトストレージのことをさします。
基本的な機能はS3と同じで、高い耐久性や可用性を保持しており、機能としてさまざまなStorageクラスで保存することで、ユーザにとって最適なコストパフォーマンスでオブジェクトを保存することができます。詳しい機能の説明は公式のhttps://cloud.google.com/storage?hl=jaに譲ります。 

CloudFunctionとは

CloudFunctionはAWSのLambdaに近い機能を提供しております。ユーザが実行サーバを意識することなくコードを実行することができます。何かしらのGoogleCloudのサービスを利用したいが、わざわざサーバを立てるほどでもない、といった場合によく使われます。今回やろうとしていることがまさにです。
GCSにファイルを配置したことをきっかけに別のサービスを呼び出したりすることができます。イベントドリブンなアーキテクチャを構築するカナメと言っても過言ではありません。  
ただし重い機能やコードを実行することは得意ではありません。そう言った場合は素直にCloudRunやGKEやComputeEngineなどを利用しましょう。  
詳しい機能は公式のhttps://cloud.google.com/functions?hl=ja に譲ります。 
 

BigQueryとは

ビジネスのアジリティに対応して設計された、サーバーレスでスケーラビリティと費用対効果に優れたマルチクラウド データ ウェアハウスです。(https://cloud.google.com/bigquery/#section-10より引用)
GoogleCloudのサービスとして、中心的な役割を持つサービスです。触ったことがない人でも聞いたことはあるのではないかと思います。非常に優れた性能を持っております。また標準的なSQLで書くことができるので、SQLを触ったことがある人は使い勝手もよいのではないでしょうか。一方でストレージコストとクエリでスキャンしたGBごとにコストがかかってくるので、なんでもかんでもSelect * from hogehogeしてはだめで、コストという面には注意したいですね。
詳しい機能は公式のhttps://cloud.google.com/bigquery/ に譲ります。 

やってみよう

アーキテクチャ

今回実装するアーキテクチャは以下のような形です。
アーキテクチャ.jpg
CSVファイルをGCSのバケットに配置すると、それがトリガとなって、CSVファイルの中身をBigQueryにデータとして登録しようと思います。
業務上のイメージとしては、アプリケーションから出力されたCSVデータをGCSに配置し、それが自動的にBigQueryに登録されるみたいなイメージですね。

Step1 GCSの設定

バケット名はグローバルで一意な名前をつけて、作成してみます。
image.png
作成されたことを確認しましょう。
バケット_素材1.png
次にファイルを配置するフォルダを作成してみましょう。
image.png

GCSの設定は以上です。

Step2 BigQueryの設定

データセットの作成をしましょう。
データセット.png
ロケーションはGCSのバケットを作成した場所に合わせておきましょう。

次にテーブルを作成します。
17b20590-ca2c-db3f-e179-68ed1087436f.png

テーブル作成時にGCSなどにデータがあれば、ここでインポートすることができます。
今回はCloudFunction経由で登録するので、空のテーブルにしておきます。
データセットは先ほど作成したものを使います。
スキーマはここで設定することもできますが、今回はCloudFunction内でスキーマも登録するので設定不要です。

Step3 CloudFunctionsの設定

CloudFunctionの設定をしていきます。
ここで第一世代ではなく、第二世代を設定しましょう。
関数名を設定します。
image.png
第二世代は第一世代と比べ機能拡張されているため、特に理由がない限り、第二世代を選択しましょう。
今回やりたいイベントドリブンな関数を作成する場合は第一世代でも可能ですが、第二世代のほうがイベントに反応できる、サービスが多くなります。

そのあとEventArcトリガーを選択してください。
image.png

ここでGCSのトリガーを設定します。
イベントプロバイダは「Cloud Storge」
イベントは「storage.objects.create」をそれぞれ設定します。

次にリソースで、特定のリソース→完全なリソース名に下記の値を設定します。

projects/_/buckets/<step1で作成したバケット>/objects/<step1で作成したフォルダ>/<配置予定のcsv>

(余談ですが、ここの設定について公式ドキュメントがわかりづらく非常に苦しんだ記憶があります。いきなり_がパスに入っていて戸惑うんですよね。。)

次は処理の本丸ですね。
image.png
ランタイムはコードはPythonなので、Python3.10を設定してください。
またエントリポイントと関数名は一致している必要があるので、合わせてください。
(例)def fugafugaならエントリポイントはfugafugaになります。

今回設定する予定のコードは以下となります。

import functions_framework
from google.cloud import bigquery

@functions_framework.cloud_event
def sample(cloudevent):
    # Construct a BigQuery client object.
    client = bigquery.Client()

    table_id = "your-project.your_dataset.your_table_name"

    job_config = bigquery.LoadJobConfig(
        schema=[
            bigquery.SchemaField("name", "STRING"),
            bigquery.SchemaField("age", "INT64"),
        ],
        skip_leading_rows=1,
        # The source format defaults to CSV, so the line below is optional.
        source_format=bigquery.SourceFormat.CSV,
    )
    uri = "gs://your-bucket/your-folder/your-file.csv"
    table_id 
    load_job = client.load_table_from_uri(
        uri, table_id, job_config=job_config
    )  # Make an API request.

    load_job.result()  # Waits for the job to complete.

    destination_table = client.get_table(table_id)  # Make an API request.
    print("Loaded {} rows.".format(destination_table.num_rows))

(引用元https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-csv から少し変更してます。)

※table_idのyour-project.<Step2で作成したデータセット>.<Step2で作成したテーブル>
 uriのgs://<Step1で作成したバケット>/<Step1で作成したフォルダ>/<配置予定のファイル>.csv
 は各個人の値に変換しましょう。

ここのコードを簡単に説明すると
1. BigQueryAPIのクライアントを呼び出し
2. JobConfigでスキーマを指定(ヘッダー行スキップ)
3. ロード予定のCSVファイルの指定
4. APIリクエストの実施
5. リクエスト結果を出力
という処理が記載されています。

また今回のコードではbigquery-apiの必要なので
requirements.txtに以下の値を追記しましょう。
bigquery==0.0.8

ここまで来たらすべて完了です。

Step4 テスト
先ほどGCS作成したバケットのフォルダ配下に配置予定のcsvを配置してみましょう。
私はこんなCSVを用意しました。

name age
yamada 25
tanaka 29
takahashi 65
sato 55

以下のように配置すると
image.png

あら不思議!スキーマが作成されています。
image.png
今回スキーマをコード内で直接設定しましたが、あらかじめJsonで記入しておいて、そのファイルを読み込ませることもできます。

selectしてみるとデータが作成されていることもわかります。
image.png

補足

今回イベント発火するためのトリガーにファイル名を直書きで指定しましたが、パスパターンを指定することで*を指定することもできます。
hogehoge**.csvなんて指定すれば、やれることの幅が広がりますね。

まとめ

簡単にイベントドリブンなサービスを構築することができました。
GCSからCloudFunctionの連携は鉄板だと思います。
データエンジニアとして、RawデータをGCSに配置して、様々なデータ分析サービスに連携すると幅が広がるので是非参考にしてください。
次はGCS→CloudFunction→Composer→DataFlowというものを書いてみようと思います。
これでデータパイプラインもイベントドリブンで動かすことができます。
お楽しみに。

引用まとめ

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0