2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Dataformに入門する

Last updated at Posted at 2024-08-08

はじめに

公式ドキュメントを読んでDataformをキャッチアップしたときのメモです。

Dataformとは

データ アナリストが BigQuery でデータ変換を行う複雑な SQL ワークフローを開発、テスト、バージョン管理、スケジュール設定するためのサービスです。

Dataform を使用すると、データ統合の ELT(抽出、読み込み、変換)プロセスにおけるデータ変換を管理できます。Dataform では、ソースシステムから抽出されたデータを BigQuery に読み込むと、明確に定義されたテスト済みで一連のデータテーブルに変換できます。

引用: Dataform の概要

Dataformを使うメリット

データパイプラインによる効率化と柔軟性向上

  • 分割されたコンポーネント化:
    • 巨大なSQLファイルの代わりに、Dataformではコンポーネント化されたモデルを使用します。これにより、各データソースが明確になり、依存関係も明確化されます。これにより、大規模なデータパイプラインを管理しやすくなります。
  • ETLプロセスの自動化:
    • データの抽出、変換、ロード(ETL)プロセスを自動化することで、手動でのタスク実行から解放され、時間を節約できます。これにより、データの品質や整合性を確保しながら、迅速にデータを処理することが可能です。
  • 柔軟でシンプルなデータモデル定義:
    • DataformではJavaScriptも併用してデータモデルを定義することで、柔軟性が高く、複雑なデータパイプラインもシンプルに記述できます。これにより、迅速かつ効率的にデータの加工や変換を行うことができます。
  • データリネージの可視化:
    • Dataformでは、データの流れや変換プロセスを視覚的に追跡できるデータリネージ機能を提供しています。これにより、データの出所や変換履歴を簡単に確認できるため、データの信頼性と透明性が向上します。エラーや問題のトラブルシューティングが容易になり、データの管理が一層効率的になります。

フルマネージドの利便性とセキュリティ

  • インフラの管理不要:
    • Dataformはサーバーレスで提供され、インフラの管理が不要です。これにより、利用料も無料でありながら、可視化やスケジューリング実行などの機能を提供します。

テストの自動化とバージョン管理の効果

  • テストのサポート:
    • ユニットテストフレームワークを使用して、データ品質を自動化されたテストで保証することができます。これにより、データの正確性と信頼性を維持しながら、開発プロセスを迅速化できます。
  • バージョン管理の利便性:
    • DataformはGitと連携することで、コードベースのバージョン管理と変更追跡が可能です。これにより、チームでの協力や運用が円滑に進み、過去の変更履歴を容易に確認できます。さらに、リリース管理やロールバックもシンプルに行えるため、開発サイクルの効率が向上します。

GCPとの深い統合で得られる利点

  • Google BigQueryとのシームレスな統合:
    • DataformはGoogle BigQueryをはじめとするGoogle Cloud Platform(GCP)サービスとシームレスに統合されており、大規模なデータウェアハウス環境での使用が推奨されます。この統合により、データの処理速度やスケーラビリティが向上し、コスト効率も高まります。
  • セキュリティの強化:
    • GCPのネイティブセキュリティ機能を活用することで、データの保護とコンプライアンスを確保できます。
  • 他のGCPサービスとの統合:
    • DataformはGoogle Cloud StorageやPub/Subなど、他のGCPサービスとも統合することができます。これにより、データの収集、保存、処理を一貫したワークフローで行うことができ、データエンジニアリングの効率が向上します。

利用手順例

前提、シチュエーション

  • 既存のBigQueryテーブル(dwh_customersdwh_orders)がある前提とします。
  • これらのソーステーブルを基にしたデータ集計を行い、最終的な出力テーブル**dm_customer_ordersを作成します。出力テーブルはorder_date**でパーティション化されます。
  • アサーションファイルを使用して、データ品質を検証します。

最終的に下記を作成します

スクリーンショット 2024-08-08 17.40.21.png

0. データを準備する

  • データセット作成

    • dwh_sample
    • dm_sample
  • テーブル作成・データ挿入

    -- テーブルdwh_customersの作成
    CREATE TABLE `project_id.dwh_sample.dwh_customers` (
      customer_id INT64,
      customer_name STRING,
      customer_email STRING,
      created_at TIMESTAMP
    );
    
    -- テーブルdwh_ordersの作成
    CREATE TABLE `project_id.dwh_sample.dwh_orders` (
      order_id INT64,
      customer_id INT64,
      order_amount FLOAT64,
      order_date TIMESTAMP
    );
    
    -- データ挿入
    INSERT INTO `project_id.dwh_sample.dwh_customers` (customer_id, customer_name, customer_email, created_at) VALUES
    (1, 'Alice', 'alice@example.com', '2023-01-01 00:00:00 UTC'),
    (2, 'Bob', 'bob@example.com', '2023-02-01 00:00:00 UTC'),
    (3, 'Charlie', 'charlie@example.com', '2023-03-01 00:00:00 UTC');
    
    INSERT INTO `project_id.dwh_sample.dwh_orders` (order_id, customer_id, order_amount, order_date) VALUES
    (1, 1, 100.0, '2023-01-15 00:00:00 UTC'),
    (2, 2, 200.0, '2023-02-15 00:00:00 UTC'),
    (3, 1, 150.0, '2023-03-01 00:00:00 UTC'),
    (4, 3, 300.0, '2023-03-15 00:00:00 UTC');
    

1. コードを管理するためのリポジトリを作成する

  • コンソール>BigQuery>Dataform>Create repository
    • サービスアカウントを指定(デフォルトも指定可能)

2. 開発用のワークスペースを作成する

  • リポジトリを選択>Create development workspace
    • 開発者ごとのワークスペースを作成する
  • Initialize workspace
    • 初期設定ファイルが配置される

3.開発ワークスペースで SQL ワークフローを開発する

  • 後続の手順で下記を作成していく

    definitions/
        sources/
            dwh_customers.sqlx
            dwh_orders.sqlx
        intermediate/
        output/
            dm_customer_orders.sqlx
            dm_customer_orders_assertion.js
        extra/
    includes/
    dataform.json
    
  • sourcesディレクトリ

    • dwh_customers.sqlx

      • SELECTなくても動作する
      config {
        type: "declaration",
        database: "project_id",
        schema: "dwh_sample",
        name: "dwh_customers"
      }
      
      SELECT
        *
      FROM
        `project_id.dwh_sample.dwh_customers`
      
    • dwh_orders.sqlx

      config {
        type: "declaration",
        database: "project_id",
        schema: "dwh_sample",
        name: "dwh_orders"
      }
      
      SELECT
        *
      FROM
        `project_id.dwh_sample.dwh_orders`
      
  • outputディレクトリ

    • dm_customer_orders.sqlx

      • クエリの中でQUALIFYするか。ROW_NUMBERなど
      config {
        type: "table",
        database: "project_id",
        schema: "dm_sample",
        name: "dm_customer_orders",
        description: "Aggregated customer orders with partitioning by last_order_date",
        columns: {
          customer_id: "Customer ID",
          customer_name: "Customer Name",
          total_amount: "Total Order Amount",
          last_order_date: "Last Order Date"
        },
        tags: ["output"],
        uniqueKey: ["customer_id"],
        bigquery: {
          partitionBy: "DATE(last_order_date)"
        }
      }
      
      SELECT
        c.customer_id,
        c.customer_name,
        SUM(o.order_amount) AS total_amount,
        MAX(o.order_date) AS last_order_date
      FROM
        ${ref("dwh_customers")} AS c
      JOIN
        ${ref("dwh_orders")} AS o
      ON
        c.customer_id = o.customer_id
      GROUP BY
        c.customer_id,
        c.customer_name
      
      
    • dm_customer_orders_assertion.js

      module.exports = {
        assertions: {
          unique_keys: ["customer_id"],
          not_null: ["customer_id", "customer_name", "total_amount", "last_order_date"],
          custom: [
            {
              query: "SELECT COUNT(*) = 0 FROM dm_customer_orders WHERE total_amount <= 0",
              description: "Ensure total_amount is positive"
            },
            {
              query: "SELECT COUNT(*) = 0 FROM dm_customer_orders WHERE last_order_date IS NULL",
              description: "Ensure last_order_date is not null"
            }
          ]
        }
      }
      

4. Dataform コアを SQL にコンパイルする

  • コンソールで開発する場合、コーディングしながらコンパイルされている
    • リアルタイムでコード検証がされてエラーチェックをすることができます
    • COMPILED QUERIESタブで実行されるSQLを確認可能です
  • Dataform>COMPILED GRAPH で確認できる

5. 依存関係ツリーを実行する

  • XXX@gcp-sa-dataform.iam.gserviceaccount.com のアカウントで実行されるので、BigQueryの権限を付与しておく
  • Dataform>START EXECUTIONで実行する
    • tagごとに実行することも可能
  • Dataform>EXECUTIONSで結果を確認できる

Tips

サンプルスクリプト

typeの使い分け

主なファイル

  • dataform.json→workflow_settings.yaml(Dataform コア 3.0から変更)
    - Dataform ワークフロー設定を構成する

    warehouse: bigquery
    defaultProject: dev-431902
    defaultLocation: asia-northeast1
    defaultDataset: dataform
    defaultAssertionDataset: dataform_assertions
    dataformCoreVersion: 3.0.0-beta.4
    vars:
      env: production
    
  • includes/constants.js
    - Dataform に含まれる include 変数と関数を再利用する
    - 下記例はプロジェクトIDだが、よく使うカラムなどを定数としてexportしたりする

      const PROJECT_ID = "my_project_name";
      module.exports = { PROJECT_ID };
    
      config { type: "table" }
      SELECT * FROM ${constants.PROJECT_ID}.my_schema_name.my_table_name
    

Gitリポジトリ紐付け

開発環境

環境管理: dev→stg→prdのフロー

  • コード ライフサイクルの管理
    • 下記または、環境ごとの変数が複数ある場合はvarでenvを設定してconstで環境ごとの値を設定する、または環境ごとの設定ファイルを用意するなどがある
  • ①環境ごとの設定ファイルを準備

    • まず、dataform.jsonファイルで環境ごとのプロジェクトIDを変数として定義します。この変数を使用して、環境に応じてプロジェクトIDを切り替えます。
    • または、varsではenvだけ指定しておいて、constとかでプロジェクト名をenvから作る
    {
      "defaultSchema": "your_dataset_name",
      "assertionSchema": "dataform_assertions",
      "warehouse": "bigquery",
      "defaultDatabase": "dev-project-id",  // デフォルトは開発環境
      "defaultLocation": "your_region",
      "vars": {
        "project_id": "dev-project-id"  // 環境ごとのプロジェクトIDを変数で管理
      }
    }
    
  • ②SQLXファイルで変数を使用

    • SQLXファイル内で、プロジェクトIDを変数として参照します。これにより、環境ごとに異なるプロジェクトIDを使用できます。
    config {
      type: "view",
      schema: "${dataform.projectConfig.vars.project_id}.your_dataset_name"
    }
    
    SELECT * FROM `${dataform.projectConfig.vars.project_id}.your_table_name`
    
  • ③開発環境での開発

    • 開発者は、Dataformの開発ワークスペースを使用して開発を行います。開発が完了したら、Gitを使用してコードを管理し、PR(プルリクエスト)を作成してレビューと承認を受けます。
  • ④本番環境での実行

    • 本番環境では、Workflowsを使用してDataformを実行します。Workflowsから実行する際に、コンパイル変数を使用してプロジェクトIDを本番用に上書きします。
      CLIを使用する場合
    dataform run --vars '{"project_id":"prod-project-id"}'  # 本番環境用のプロジェクトIDを指定
    
    • Workflowsからの実行
      • Workflowsから実行する際には、Dataform APIを使用してコンパイル変数を設定します。APIリクエストでcompilationResults.createを呼び出し、CodeCompilationConfigでプロジェクトIDを指定します。
    {
      "workspace": "projects/your_project_id/locations/your_location/repositories/your_repository/workspaces/your_workspace",
      "codeCompilationConfig": {
        "vars": {
          "project_id": "prod-project-id"  // 本番環境用のプロジェクトID
        }
      }
    }
    

includes/function.js

  • 他、テーブルサフィックスの取得などができる
// includes/function.js

/**
 * Returns the current timestamp in the format 'YYYY-MM-DD HH:MM:SS'.
 */
function getCurrentTimestamp() {
  const date = new Date();
  const year = date.getFullYear();
  const month = String(date.getMonth() + 1).padStart(2, '0');
  const day = String(date.getDate()).padStart(2, '0');
  const hours = String(date.getHours()).padStart(2, '0');
  const minutes = String(date.getMinutes()).padStart(2, '0');
  const seconds = String(date.getSeconds()).padStart(2, '0');
  return `${year}-${month}-${day} ${hours}:${minutes}:${seconds}`;
}

/**
 * Capitalizes the first letter of each word in a given string.
 * @param {string} text - The text to be capitalized.
 * @returns {string} - The capitalized text.
 */
function capitalizeWords(text) {
  return text.replace(/\b\w/g, char => char.toUpperCase());
}

/**
 * Safely retrieves a property from an object, returning a default value if the property is undefined.
 * @param {object} obj - The object to retrieve the property from.
 * @param {string} key - The key of the property to retrieve.
 * @param {*} defaultValue - The default value to return if the property is undefined.
 * @returns {*} - The value of the property or the default value.
 */
function getOrDefault(obj, key, defaultValue) {
  return obj.hasOwnProperty(key) ? obj[key] : defaultValue;
}

/**
 * Generates a random integer between min and max (inclusive).
 * @param {number} min - The minimum value.
 * @param {number} max - The maximum value.
 * @returns {number} - The random integer.
 */
function getRandomInt(min, max) {
  return Math.floor(Math.random() * (max - min + 1)) + min;
}

/**
 * Generates a unique identifier (UUID v4).
 * @returns {string} - The generated UUID.
 */
function generateUUID() {
  return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, function (c) {
    const r = (Math.random() * 16) | 0;
    const v = c === 'x' ? r : (r & 0x3) | 0x8;
    return v.toString(16);
  });
}

module.exports = {
  getCurrentTimestamp,
  capitalizeWords,
  getOrDefault,
  getRandomInt,
  generateUUID,
};
// my_table.sqlx

const { getCurrentTimestamp, capitalizeWords } = require('./includes/function.js');

config {
  type: "table",
  description: capitalizeWords("this table stores user data"),
}

select
  user_id,
  user_name,
  created_at,
  '${getCurrentTimestamp()}' as generated_at
from
  source("raw_data", "users")

冪等性担保のためのデータ削除

  • pre_operationsを使用して、冪等性を担保するためのデータ削除を行う例
// tables/my_table.sqlx

// 変数設定
const { target_date } = require('./includes/function.js');

// テーブルの設定
config {
  type: "table",
  description: "This table stores daily transaction data.",
  pre_operations: [
    `
    DELETE FROM ${self()} 
    WHERE date = '${target_date()}'
    `
  ]
}

// 新しいデータの挿入クエリ
select
  user_id,
  transaction_amount,
  transaction_type,
  date
from
  source('raw_data', 'transactions')
where
  date = '${target_date()}'
// includes/function.js

/**
 * Returns the target date to be used in the pre_operations and the main query.
 * You can set this dynamically based on your needs.
 * @returns {string} - The target date in the format 'YYYY-MM-DD'.
 */
function target_date() {
  // You can set this to any dynamic or static date as per your requirements
  return "2024-08-09"; // Example static date, or use logic to determine date
}

module.exports = {
  getCurrentTimestamp,
  capitalizeWords,
  getOrDefault,
  getRandomInt,
  generateUUID,
  target_date, // Add this function to the module exports
};

Assertionのプロパティ

  1. rowConditions
  2. uniqueKeyCondition
  3. dataFreshnessConditions
  4. globalAssertionParams
  5. fieldConditions
  6. nonNullConditions
  7. customSql
  8. expression
  9. assertionDependencies
2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?