11
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

DatabricksAdvent Calendar 2024

Day 8

Databricks と TROCCO の融合で実現するシームレスなデータパイプライン構築入門

Last updated at Posted at 2024-12-07

概要

Databricks と TROCCO を用いてデータの自動処理パイプラインを構築する手順を紹介します。具体的には、Salesforce の Case と Account データを TROCCO 経由で Databricks に蓄積し、オープン状態のケース情報をもつ分析用テーブル(opened_case)を生成します。その後、最終的にデータを Google スプレッドシート へ出力して即座に分析可能な環境を実現します。これらの処理は、Databricks Workflows により自動化します。

image.png

本記事の手順を実行すると、 Databricks Workflows 上で 5 つのタスクが自動的に実行され、 Salesforce のデータを素早くスプレッドシートで分析できるようになります。

image.png
image.png

また、別記事で紹介した自作ライブラリを利用することで、 TROCCO REST API を Python から直接操作可能になり、 Databricks Workflows 上でのワークフロー構築がさらに容易になります。

image.png

引用元:TROCCO REST API を Python で操作:GitHub Copilot で試してみた結果 #AI - Qiita

データパイプライン構築・実行手順

全体像

本記事で構築するデータパイプラインでは、以下のサービスを利用します。

  1. TROCCO
  2. Salesforce
  3. Azure Storage (ストレージ)
  4. Databricks

このワークフローは複数のタスクで構成され、順次実行されます。まず、 TROCCO の REST API を通じて Salesforce から取得したデータを Azure Storage へ転送(転送ジョブ)します。その後、 Databricks の Auto Loader を用いてストレージ上のデータを Databricks テーブルへロード(ロード処理)します。続いて、 CTAS ( Create Table As Select )による処理で分析用テーブルへデータを整形・更新(データ処理)し、最終的には整理済みの分析用データを再び TROCCO 経由で Google スプレッドシート へ出力することで、可視化・分析が可能な環境を実現します。

image.png

分類 タスク名 役割・処理内容
転送ジョブ salesforce_account_to_storage Salesforce の Account データをストレージへ転送
転送ジョブ salesforce_case_to_databricks Salesforce の Case データを Databricks へ転送
ロード処理 load_storage_to_databricks ストレージ上のデータを Databricks テーブルへロード
データ処理 ctas_in_databricks Databricks 内で CTAS 処理を実行し、分析用テーブルを生成
転送ジョブ databricks_to_google_spreadsheet Databricks 上の分析用テーブルを Google スプレッドシート へ出力

1. TROCCO 上での開発

1-1. 接続情報を定義

image.png

1-2. Salesforce の Case を Databricks に連携する転送ジョブを作成

image.png

1-3. Salesforce の Account をストレージ( Azure Storage )に連携する転送ジョブを作成

image.png

1-4. Databricks から Google スプレッドシート に連携するジョブを作成

image.png

スプレッドシート ID の取得方法は、 TROCCO のドキュメントで詳しく説明されています。

image.png
引用元:転送先 - Google Spreadsheets

1-5. TROCCO API KEY を取得

image.png

2. Databricks 上での開発

2-1. データベースオブジェクトを作成

カタログとスキーマを作成します。

%sql
CREATE CATALOG IF NOT EXISTS troccot_test_02;
CREATE SCHEMA IF NOT EXISTS troccot_test_02.schema_01;

image.png

Account テーブルと Volume を作成し、既存のテーブルがある場合は削除します。 Case テーブルは TROCCO の転送ジョブ実行時に、 Opened_case テーブルは Databricks の処理実行時にそれぞれ作成されます。

%sql
-- case テーブルを削除
DROP TABLE IF EXISTS troccot_test_02.schema_01.case;

-- account テーブルを作成( Auto Loader のロード先であるため先に作成 )
DROP TABLE IF EXISTS troccot_test_02.schema_01.account;
CREATE TABLE troccot_test_02.schema_01.account;

-- opened_case テーブルを削除
DROP TABLE IF EXISTS troccot_test_02.schema_01.opened_case;

-- Volume を作成
CREATE VOLUME IF NOT EXISTS troccot_test_02.schema_01.src_volume_01;

image.png

Databricks Auto Loader で利用するチェックポイントディレクトリを初期化します。

# Auto Loader で利用するチェックポイントの初期化
dbutils.fs.rm("/Volumes/troccot_test_02/schema_01/src_volume_01/checkpoint", True)

image.png

2-2. Databricks シークレットのスコープとシークレットを作成

%pip install databricks-sdk --upgrade -q
dbutils.library.restartPython()

image.png

from databricks.sdk import WorkspaceClient
from databricks.sdk.core import DatabricksError

w = WorkspaceClient()

image.png

# scope 名をセット
scope_name = 'trocco'

# scope を作成
try:
    w.secrets.create_scope(scope=scope_name)
except DatabricksError:
    print(f"Scope `{scope_name}` already exists!")

image.png

# シークレットのキーと値をセット
scope_key_and_values = {
    "api_key": "DkXvDxgowM482F5M7X6n9swK1dBF9deNpaFa4V7zzzzz",
}
# シークレットを登録
for key,value in scope_key_and_values.items():
    w.secrets.put_secret(
        scope=scope_name,
        key=key,
        string_value=value,
    )

image.png

# シークレットを取得し確認
print(dbutils.secrets.get(scope_name,"api_key"))

image.png

2-3. TROCCO REST API の実行クラスを有するファイル( trocco_api.py )を作成

コードはこちら
import time
import requests
from typing import Optional

class TroccoAPIClient:
    """Trocco APIクライアントクラス。

    Args:
        api_key (str): Trocco APIのAPIキー。
        wait_time (int, optional): リクエスト間の待機時間(秒)。デフォルトは30秒。
        max_wait_time (int, optional): 最大待機時間(秒)。デフォルトは3600秒(60分)。

    Attributes:
        api_key (str): Trocco APIのAPIキー。
        base_url (str): Trocco APIのベースURL。
        headers (dict): リクエストヘッダー。
        wait_time (int): リクエスト間の待機時間(秒)。
        max_wait_time (int): 最大待機時間(秒)。
    """

    def __init__(self, api_key: str, wait_time: int = 30, max_wait_time: int = 3600) -> None:
        self.api_key = api_key
        self.base_url = "https://trocco.io/api"
        self.headers = {
            "Authorization": f"Token {self.api_key}",
            "Content-Type": "application/json"
        }
        self.wait_time = wait_time
        self.max_wait_time = max_wait_time

    def post_job(self, payload: dict) -> Optional[dict]:
        """ジョブを作成する。

        Args:
            payload (dict): ジョブ作成に必要なデータ。

        Returns:
            Optional[dict]: APIからのレスポンス。失敗した場合はNone。
        """
        url = f"{self.base_url}/jobs"
        try:
            response = requests.post(url, json=payload, headers=self.headers)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            print(f"エラーが発生しました: {e}")
            return None

    def get_job(self, job_id: str) -> Optional[dict]:
        """特定のジョブの詳細を取得する。

        Args:
            job_id (str): 取得したいジョブのID。

        Returns:
            Optional[dict]: ジョブの詳細。失敗した場合はNone。
        """
        url = f"{self.base_url}/jobs/{job_id}"
        try:
            response = requests.get(url, headers=self.headers)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            print(f"エラーが発生しました: {e}")
            return None

    def get_jobs(self, params: dict = {}) -> Optional[dict]:
        """ジョブの一覧を取得する。

        Args:
            params (dict, optional): クエリパラメータ。

        Returns:
            Optional[dict]: ジョブの一覧。失敗した場合はNone。
        """
        url = f"{self.base_url}/jobs"
        try:
            response = requests.get(url, headers=self.headers, params=params)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            print(f"エラーが発生しました: {e}")
            return None

    def execute_job_and_wait(self, payload: dict) -> Optional[dict]:
        """ジョブを作成し、完了するまで待機して結果を取得する。

        Args:
            payload (dict): ジョブ作成に必要なデータ。

        Returns:
            Optional[dict]: 最終的なジョブの結果。失敗した場合はNone。
        """
        job = self.post_job(payload)
        if not job:
            return None

        job_id = job.get("id")
        if not job_id:
            print("ジョブIDを取得できませんでした。")
            return None

        start_time = time.time()
        while True:
            job_status = self.get_job(job_id)
            if job_status:
                status = job_status.get("status")
                if status == "succeeded":
                    print("ジョブが成功しました。")
                    return job_status
                elif status == "error":
                    print("ジョブが失敗しました。")
                    return job_status
                else:
                    print(f"ジョブのステータス: {status}。待機します。")
            else:
                print("ジョブステータスの取得に失敗しました。リトライします。")

            if time.time() - start_time > self.max_wait_time:
                print("最大待機時間を超えました。")
                return None

            time.sleep(self.wait_time)

image.png

2-4. TROCCO REST API による転送ジョブ実行用ノートブック(00_execute_trocco_api)

job_definition_id をパラメータで受け取り、そのジョブを実行し完了まで待機します。

from trocco_api import TroccoAPIClient

dbutils.widgets.text("job_definition_id", "", "Enter Job Definition ID")
job_definition_id = dbutils.widgets.get("job_definition_id")

scope_name = "trocco"
secret_name = "api_key"
api_key = dbutils.secrets.get(scope_name, secret_name)

client = TroccoAPIClient(api_key)

payload = {
    "job_definition_id": job_definition_id,
}

result = client.execute_job_and_wait(payload)

if result:
    print("ジョブの結果:")
    print(result)
else:
    print("ジョブの取得に失敗しました。")

image.png

2-5. Databricks Auto Loader によるデータ取り込みノートブック(01_load_storage_to_databricks)

src_dir = "/Volumes/troccot_test_02/schema_01/src_volume_01/src_files"
checkpoint_dir = "/Volumes/troccot_test_02/schema_01/src_volume_01/checkpoint"
target_table_name = "troccot_test_02.schema_01.account"

schema = """
Id STRING,
IsDeleted BOOLEAN,
MasterRecordId STRING,
Name STRING,
Type STRING,
ParentId STRING,
BillingStreet STRING,
BillingCity STRING,
BillingState STRING,
BillingPostalCode BIGINT,
BillingCountry STRING,
BillingLatitude STRING,
BillingLongitude STRING,
BillingGeocodeAccuracy STRING,
ShippingStreet STRING,
ShippingCity STRING,
ShippingState STRING,
ShippingPostalCode BIGINT,
ShippingCountry STRING,
ShippingLatitude STRING,
ShippingLongitude STRING,
ShippingGeocodeAccuracy STRING,
Phone STRING,
Fax STRING,
AccountNumber STRING,
Website STRING,
PhotoUrl STRING,
Sic BIGINT,
Industry STRING,
AnnualRevenue DOUBLE,
NumberOfEmployees BIGINT,
Ownership STRING,
TickerSymbol STRING,
Description STRING,
Rating STRING,
Site STRING,
OwnerId STRING,
CreatedDate TIMESTAMP,
CreatedById STRING,
LastModifiedDate TIMESTAMP,
LastModifiedById STRING,
SystemModstamp TIMESTAMP,
LastActivityDate STRING,
LastViewedDate STRING,
LastReferencedDate STRING,
Jigsaw STRING,
JigsawCompanyId STRING,
CleanStatus STRING,
AccountSource STRING,
DunsNumber STRING,
Tradestyle STRING,
NaicsCode STRING,
NaicsDesc STRING,
YearStarted STRING,
SicDesc STRING,
DandbCompanyId STRING,
CustomerPriority__c STRING,
SLA__c STRING,
Active__c BOOLEAN,
NumberofLocations__c DOUBLE,
UpsellOpportunity__c STRING,
SLASerialNumber__c BIGINT,
SLAExpirationDate__c TIMESTAMP
"""

df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("header", "true")
    .option("multiLine", "true")
    .option("cloudFiles.schemaLocation", src_dir)
    .option("cloudFiles.schemaHints", schema)
    .load(src_dir)
)

(
    df.writeStream.format("delta")
    .option("checkpointLocation", checkpoint_dir)
    .option("mergeSchema", "true")
    .trigger(availableNow=True)
    .toTable(target_table_name)
)

image.png

2-6. CTAS によるデータ処理ノートブック(02_ctas_in_databricks)

source_table_name_01 = "troccot_test_02.schema_01.account"
source_table_name_02 = "troccot_test_02.schema_01.case"
target_table_name = "troccot_test_02.schema_01.opened_case"

sql = f"""
CREATE OR REPLACE TABLE {target_table_name} 
AS
SELECT 
  act.Industry,
  act.Name,
  cs.status,
  cs.priority,
  cs.Reason,
  cs.Subject
FROM {source_table_name_01} act
JOIN {source_table_name_02} cs
ON act.Id = cs.AccountId

WHERE 
  cs.isClosed IS FALSE
"""

print(sql)
resuult_df = spark.sql(sql)
resuult_df.display()

image.png

3. ワークフローの実行と結果確認

3-1. Databricks Workflow のジョブを作成

Databricks Workflows にて、上記ノートブックを順次実行するジョブを作成します。転送ジョブを実行するノートブックは共通化しており、実行する転送ジョブの ID ( job_definition_id )をパラメータで渡します。

image.png

コードはこちら
resources:
  jobs:
    databricks_with_trocco:
      name: databricks_with_trocco
      tasks:
        - task_key: salesforce_account_to_storage
          notebook_task:
            notebook_path: /Workspace/Shared/databricks_with_trocco/01_pipelines/00_execute_trocco_api
            base_parameters:
              job_definition_id: "291021"
            source: WORKSPACE
        - task_key: load_storage_to_databricks
          depends_on:
            - task_key: salesforce_account_to_storage
          notebook_task:
            notebook_path: /Workspace/Shared/databricks_with_trocco/01_pipelines/01_load_storage_to_databricks
            source: WORKSPACE
        - task_key: salesforce_case_to_databricks
          notebook_task:
            notebook_path: /Workspace/Shared/databricks_with_trocco/01_pipelines/00_execute_trocco_api
            base_parameters:
              job_definition_id: "287317"
            source: WORKSPACE
        - task_key: ctas_in_databricks
          depends_on:
            - task_key: load_storage_to_databricks
            - task_key: salesforce_case_to_databricks
          notebook_task:
            notebook_path: /Workspace/Shared/databricks_with_trocco/01_pipelines/02_ctas_in_databricks
            source: WORKSPACE
        - task_key: databricks_to_google_spreadsheet_
          depends_on:
            - task_key: ctas_in_databricks
          notebook_task:
            notebook_path: /Workspace/Shared/databricks_with_trocco/01_pipelines/00_execute_trocco_api
            base_parameters:
              job_definition_id: "290996"
            source: WORKSPACE
      queue:
        enabled: true

3-2. Databricks Workflow のジョブを実行

Databricks Workflows を実行すると、処理状況が可視化され、正常に完了したことを確認できます。

image.png

ノートブック実行結果は内部に保存されており、エラー発生時にも原因特定が容易なため、システム運用をスムーズに行えます。

image.png
image.png

TROCCO にて REST API 実行結果を確認

image.png
image.png
image.png

3-3. Google スプレッドシート にてデータが連携されたことを確認

image.png

Next Action

1. Databricks におけるデータアーキテクチャの設計

本記事では単純なデータアーキテクチャを扱いましたが、実運用に際しては、より適切なアーキテクチャの検討が必要です。

image.png

引用元:誰も教えてくれないメダリオンアーキテクチャの デザインメソッド:JEDA データエンジニア分科会 #1 #Python - Qiita

本記事では TROCCO と Databricks を連携する 2 つの手法を紹介しましたが、実際には運用要件やシステム構成に応じた手法選定が必要です。システム間を疎結合にし、ファイル連携時に Databricks の Auto Loader でファイル追加を容易に行う点を考慮すると、筆者は 2 番目の方法(ストレージ経由)が適するケースが多いと考えています。

  1. TROCCO から Databricks へ直接連携する方法
  2. TROCCO から Databricks へストレージ経由で連携する方法

image.png

TROCCO にて Databricks と直接連携する方法として下記の方法が提供されています。マネージドデータ転送もサポートされており、ソースシステムから複数のオブジェクトを連携する際には便利です。

2. パイプラインの構築方法の検討

本記事ではノートブックでのパイプラインを構築しましたが、 dbt(Data Build Tool)や Databricks が開発している Delta live tables という機能により構築する方法があり、組織のスキルや状況に応じてパイプラインの構築方法を検討してください。

3. TROCCO REST API 実行ライブラリの改修

TROCCO REST API を実行するライブラリの改修も検討すべきです。例えば、今回利用した際には TROCCO 側の転送ジョブがエラーとなっても Databricks Workflows 上のタスクがエラーとして認識されませんでした。 GitHub Copilot の Copilot Edits 機能などを用いて、 API のステータスチェック処理を強化し、エラーステータス時に適切に例外をスローするように改修してください。

status には下記のものが存在し、エラーとして扱うべきステータスが返却された場合には処理エラーとして例外を発生させることが望まれます。

queued / setting_up / executing / interrupting / succeeded / error / canceled / skipped / timeout
                    return job_status
-                elif status == "error":
-                    print("ジョブが失敗しました。")
+                elif status in ["error", "canceled", "skipped", "timeout"]:
+                    raise Exception(f"ジョブが失敗しました。ステータス: {status}")
                    return job_status

image.png

11
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
11
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?