0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

はじめに

本記事では、HubSpotで管理している「コンタクト(contacts)」「会社(companies)」「取引(deals)」データを、BIツール(Looker Studioなど)のデータソースとなるBigQueryに転送する方法を紹介します。

これまでTROCCOというSaaSでHubSpotからスプレッドシートにデータを転送し、スプレッドシートからLooker Studioでデータを読み込んでいたのですが、TROCCOの料金プラン変更により、以前のように2時間に1回の頻繁なデータ更新が無料範囲で行えなくなってしまいました。加えて、スプレッドシートからのデータ読み込みが遅いという課題もあったため、TROCCOとスプレッドシートの代替としてGoogle Cloud Functions(GCF)とBigQueryを利用する新しい方法を採用しました。

GCP構成

  • プロジェクト名: hubspot-to-bigquery
  • Google Cloud Functions(GCF): sync-hubspot-to-bigquery
    • スクリプトはPythonで実装
  • Cloud Scheduler: sync-hubspot-to-bigquery-job
    • 毎時0分にジョブを実行
  • BigQuery:
    • データセット: hubspot_data
    • テーブル: contacts, companies, deals

データの流れ

  1. Cloud Schedulerが1時間に1回GCFをトリガーする
  2. GCFはHubSpotからのデータを抽出し、BigQueryに転送する
    ※データ更新のため、全レコードを削除後にレコードを新たに追加する
  3. Looker StudioはBigQueryのテーブルを参照してレポートを生成する

実装

HubSpotアクセストークンの発行

HubSpotの設定メニューから非公開アプリを作成し、必要なスコープ(crm.object.deals.read, crm.object.companies.read, crm.object.contacts.read)を設定します。非公開アプリの作成後、アクセストークンをコピーして後ほど使用します。

スクリーンショット 2024-07-06 15.37.38.png

スクリーンショット 2024-07-06 15.39.10.png

BigQueryテーブルの作成

BigQueryでhubspot_dataデータセットを作成し、deals, companies, contactsテーブルをクエリから作成します。以下はcompaniesテーブルの作成例です。

CREATE TABLE `hubspot-to-bigquery.hubspot_data.companies` (
  id INT64,
  created_at TIMESTAMP,
  updated_at TIMESTAMP,
  about_us STRING,
  ...
)

スクリーンショット 2024-07-06 15.50.16.png

GCFの作成

以下がスクリプトの全文です。

main.py
import os
import logging
from datetime import datetime, timezone, timedelta
from hubspot import HubSpot
from hubspot.crm.contacts import ApiException
from google.cloud import bigquery
from google.api_core.retry import Retry

logging.basicConfig(level=logging.INFO)

client = bigquery.Client()

contacts_table_id = "hubspot-to-bigquery.hubspot_data.contacts"
contacts_table = client.get_table("hubspot-to-bigquery.hubspot_data.contacts")
contacts_table_schema_keys = {field.name for field in contacts_table.schema}

companies_table_id = "hubspot-to-bigquery.hubspot_data.companies"
companies_table = client.get_table("hubspot-to-bigquery.hubspot_data.companies")
companies_table_schema_keys = {field.name for field in companies_table.schema}

deals_table_id = "hubspot-to-bigquery.hubspot_data.deals"
deals_table = client.get_table("hubspot-to-bigquery.hubspot_data.deals")
deals_table_schema_keys = {field.name for field in deals_table.schema}

contacts_properties = [
    "id", "created_at", "updated_at", "company_size", "date_of_birth",...
]

companies_properties = [
    "id", "created_at", "updated_at", "about_us",...
]

deals_properties = [
    "id", "created_at", "updated_at", "amount_in_home_currency",...
]


def sync_hubspot_to_bigquery(_):
    access_token = os.getenv("ACCESS_TOKEN")
    if not access_token:
        logging.error("Access token not found in environment variables")
        return "Access token not found in environment variables", 500

    # HubSpot APIクライアントの初期化
    api_client = HubSpot(access_token=access_token)

    try:
        # 全件更新のため各テーブルから全レコードを削除
        delete_table_records(contacts_table_id)
        delete_table_records(companies_table_id)
        delete_table_records(deals_table_id)

        # HubSpotからデータを取得
        contacts_fetched = api_client.crm.contacts.get_all(properties=contacts_properties)
        companies_fetched = api_client.crm.companies.get_all(properties=companies_properties)
        deals_fetched = api_client.crm.deals.get_all(properties=deals_properties)

        # 挿入レコードの作成
        contacts_rows = create_rows_to_insert(contacts_fetched, contacts_table_schema_keys)
        companies_rows = create_rows_to_insert(companies_fetched, companies_table_schema_keys)
        deals_rows = create_rows_to_insert(deals_fetched, deals_table_schema_keys)
        
        # BigQueryの各テーブルにデータ挿入
        insert_rows_bigquery(contacts_table_id, contacts_rows)
        insert_rows_bigquery(companies_table_id, companies_rows)
        insert_rows_bigquery(deals_table_id, deals_rows)

        success_message = f"Data synchronized successfully: {len(contacts_rows)} contacts, {len(companies_rows)} companies, and {len(deals_rows)} deals updated."
        logging.info(success_message)
        return success_message, 200

    except ApiException as e:
        error_message = f"Exception when requesting: {e}"
        logging.error(error_message)
        return error_message, 500

def delete_table_records(table_id):
    delete_query = f"""
    DELETE FROM `{table_id}` WHERE TRUE
    """

    try:
        query_job = client.query(delete_query)
        query_job.result()
        logging.info(f"All records have been deleted from {table_id}.")
    except Exception as e:
        logging.error(f"Failed to delete records from {table_id}: {e}")

def convert_utc_to_jst(timestamp):
    # 日本標準時(JST)に変換
    jst_zone = timezone(timedelta(hours=9))
    jst_time = timestamp.astimezone(jst_zone)
    logging.debug(f"Converted {timestamp} to {jst_time}")
    return jst_time.isoformat()

def create_rows_to_insert(fetched_data, table_schema_keys):
    rows_to_insert = []
    for data in fetched_data:
        data_properties = data.properties
        
        # BigQueryに挿入するための行データを作成
        row = {
            "id": data.id,
            "created_at": convert_utc_to_jst(data.created_at) if data.created_at else None,
            "updated_at": convert_utc_to_jst(data.updated_at) if data.updated_at else None
        }

        for key, prop in data_properties.items():
            if key in table_schema_keys:  # スキーマに存在するカラムのみ処理
                value = prop if prop != '' and prop is not None else None

                # datetimeオブジェクトだった場合、JSTに変換してISOフォーマットにする
                if isinstance(value, datetime):
                    value = convert_utc_to_jst(value)
                
                row[key] = value

        rows_to_insert.append(row)
    return rows_to_insert

def insert_rows_bigquery(table_id, rows_to_insert, batch_size=100):
    # リトライポリシーの設定
    custom_retry = Retry(
        initial=1.0,  # 初期遅延時間 1 秒
        maximum=10.0,  # 最大遅延時間 10 秒
        multiplier=2.0,  # 遅延時間の倍増係数
        deadline=1200.0  # 全体の最大試行時間 1200 秒
    )
    
    # 分割してデータを挿入
    for i in range(0, len(rows_to_insert), batch_size):
        batch = rows_to_insert[i:i + batch_size]

        try:
            errors = client.insert_rows_json(table_id, batch, retry=custom_retry)
            if errors:
                logging.error(f"Errors occurred in batch {i // batch_size + 1}: {errors}")
            else:
                logging.info(f"Batch {i // batch_size + 1} inserted successfully into {table_id}.")
        except Exception as e:
            logging.error(f"Error inserting data into {table_id}: {e}")
requirements.txt
functions-framework==3.*
hubspot-api-client
google-cloud-bigquery

ログ設定とBigQueryクライアントの初期化

ロギングは情報レベルで設定され、BigQueryクライアントはデフォルトのプロジェクト設定で初期化されます。また、使用するBigQueryテーブルのIDとスキーマのキーを取得し、後のデータ挿入で使用します。

logging.basicConfig(level=logging.INFO)
client = bigquery.Client()

contacts_table_id = "hubspot-to-bigquery.hubspot_data.contacts"
contacts_table = client.get_table(contacts_table_id)
contacts_table_schema_keys = {field.name for field in contacts_table.schema}
# 同様に会社と取引データテーブルの設定も行います。

データ同期関数の定義

sync_hubspot_to_bigquery関数では、環境変数からHubSpotのアクセストークンを取得し、APIクライアントを初期化します。その後、指定されたプロパティでHubSpotのデータを抽出し、BigQueryに挿入する一連のステップを実行します。

def sync_hubspot_to_bigquery(_):
    access_token = os.getenv("ACCESS_TOKEN")
    if not access_token:
        logging.error("Access token not found in environment variables")
        return "Access token not found in environment variables", 500

    api_client = HubSpot(access_token=access_token)
    # データ抽出と挿入の詳細な手順は後述します。

データ挿入とリトライポリシー

抽出したデータはBigQueryにバッチで挿入されます。リトライポリシーは失敗した挿入操作を自動的に再試行するために設定され、データ整合性を保ちます。

def insert_rows_bigquery(table_id, rows_to_insert, batch_size=100):
    custom_retry = Retry(initial=1.0, maximum=10.0, multiplier=2.0, deadline=1200.0)
    for i in range(0, len(rows_to_insert), batch_size):
        batch = rows_to_insert[i:i + batch_size]
        errors = client.insert_rows_json(table_id, batch, retry=custom_retry)
        if errors:
            logging.error(f"Errors occurred in batch {i // batch_size + 1}: {errors}")

BigQueryのデータ更新と時刻変換の注意点

  • ストリーミングバッファの制限: BigQueryのストリーミングバッファ仕様により、データ更新頻度を1時間より短く設定するとエラーになってしまいました。TROCCO使用時の更新頻度は2時間に1回だったので、むしろ今回は頻度を短くできたのですが、もっと短い頻度が必要である場合は注意が必要です。
  • 時刻データの変換必要性: BigQueryではTIMESTAMP型のデータがUTCで保管されるため、日本時間に変換する処理が必要です。Looker Studioでデータを分析する際には、この時刻変換を適切に行うことで、時差の影響を受けずに正確なレポート作成が可能です。

Cloud Schedulerの作成

Cloud Schedulerを使用して、GCFを毎時0分に自動的に実行するジョブを設定します。このスケジュールにより、HubSpotのデータが定期的に更新され、常に最新の情報がBigQueryに保持されます。スケジューラのCron設定は0 * * * *で、これで毎時0分にジョブがトリガーされます。
スクリーンショット 2024-07-06 15.57.04.png

参考

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?