BigQueryのデータをClickHouseに転送する実践的な手順を、GCS経由のClickPipesを利用して解説します。設定から動作確認までを再現し、ETL構成とベストプラクティスを紹介します。
はじめに:BigQueryとClickHouseの連携シナリオ
ビッグデータ分析基盤を構築する際、BigQueryとClickHouseを併用することで、それぞれの強みを最大限に活かすことができます。
BigQueryの強み
- ペタバイト規模のデータウェアハウスとして機能
- 多様なクエリパターンに対応した柔軟な分析
- マネージドサービスとしての運用負荷の低さ
ClickHouseの強み
- サブ秒単位のリアルタイム分析
- 高速な集計処理とOLAP性能
- カラム指向ストレージによる高い圧縮率(BigQueryより約30%優位)
この記事で扱うGCS経由のClickPipesは、BigQueryからGoogle Cloud Storage(GCS)にデータをエクスポートし、ClickPipesのGCSコネクタを使ってClickHouseに取り込む2段階のアプローチです。
データフロー
BigQuery → GCS(エクスポート) → ClickPipes(GCSコネクタ) → ClickHouse
このアプローチのメリット
- 並列処理による高速転送:BigQueryは大規模テーブルを自動的に複数ファイルに分割(最大1GBごと)してエクスポートするため、並列読み込みが効率的
- 中継ストレージの活用:GCSを中継することで、転送のリトライやバッチ処理が容易
- ClickPipesの自動化機能:スキーマ推定、型変換、継続的なデータ取り込みを自動化
前提環境
本記事で使用する環境は以下の通りです。
必要なリソース
| リソース | 詳細 |
|---|---|
| BigQueryプロジェクト | テスト用データセット mydataset とテーブル synthetic_data を含む |
| Google Cloud Storage | BigQueryエクスポート用バケット gs://my-bq-export-bucket
|
| ClickHouse Cloud | クラスタが稼働中で、接続情報(ホスト、ポート、認証情報)を取得済み |
| ClickPipes | ClickHouse Cloudアカウントでアクセス可能 |
必要な権限
-
BigQuery:
roles/bigquery.dataViewer,roles/bigquery.jobUser -
GCS:
roles/storage.objectCreator,roles/storage.objectViewer - ClickHouse:データベースへの書き込み権限
ClickPipeの概要と仕組み
ClickPipesは、ClickHouse Cloudが提供する継続的なデータ取り込みサービスです。複数のソースからClickHouseへのデータ転送を自動化し、ETLパイプラインの構築を簡素化します。
ClickPipesの3工程
ClickPipesは内部で以下の3工程を実行します。
1. Extract(抽出)
- GCSバケットから自動的にファイルを検出
- JSON、CSV、Parquet、Avro形式をサポート
- 並列読み込みによる高速処理
2. Transform(変換)
- ソースファイルから型を自動推定
- ソースの型からClickHouseの型へ自動変換
- スキーママッピング画面で型を手動調整可能
3. Load(ロード)
- ClickHouseテーブルへのバッチ挿入
- 重複排除や更新処理のサポート
- エラーハンドリングとリトライ機能
GCSコネクタの特徴
- 自動ファイル検出:指定したGCSパスのファイルを継続的に監視
- 増分取り込み:新しいファイルのみを処理する増分ロード
- 圧縮サポート:gzip、snappy圧縮されたファイルに対応
ステップ1:BigQueryからGCSへのエクスポート
まず、BigQueryのデータをGCSにエクスポートします。
エクスポートクエリの実行
EXPORT DATA OPTIONS(
uri='gs://my-bq-export-bucket/synthetic_data/*.parquet',
format='PARQUET',
overwrite=true
) AS
SELECT * FROM mydataset.synthetic_data;
このクエリは以下を実行します。
- データをParquet形式でエクスポート
- ファイルサイズが大きい場合、自動的に複数ファイルに分割(例:
synthetic_data-000000000000.parquet,synthetic_data-000000000001.parquet) -
overwrite=trueでGCS上の既存ファイルを上書き
注意: overwrite=true はBigQueryからGCSへのエクスポートのみに有効です。ClickPipes側では「新しいファイルのみ」を処理するため、同じファイル名で上書きすると再取り込みされない可能性があります。
エクスポート確認
gsutil ls gs://my-bq-export-bucket/synthetic_data/
出力例
gs://my-bq-export-bucket/synthetic_data/synthetic_data-000000000000.parquet
リージョン間エクスポートの注意点
BigQueryデータセットとGCSバケットが異なるリージョンにある場合、以下の点に注意してください:
Pythonでのエクスポート例
from google.cloud import bigquery
client = bigquery.Client(project='your-project-id')
table_id = 'your-project.your-dataset.your-table'
destination_uri = 'gs://your-bucket/path/*.parquet'
# エクスポート設定
job_config = bigquery.ExtractJobConfig()
job_config.destination_format = bigquery.DestinationFormat.PARQUET
job_config.compression = bigquery.Compression.SNAPPY
# 重要:locationパラメータはデータセットのリージョンと一致させる
extract_job = client.extract_table(
table_id,
destination_uri,
location="asia-northeast1", # データセットのリージョンを指定
job_config=job_config,
)
extract_job.result() # ジョブ完了を待機
リージョン設定のポイント
-
locationパラメータは、BigQuery データセット のリージョンと一致させる必要があります - GCSバケットのリージョンは異なっていても動作します(ただしクロスリージョン転送料金が発生)
- ClickHouseのリージョンも異なっていて問題ありません(例:BigQuery=asia-northeast1、ClickHouse=us-east1)
コスト最適化
クロスリージョン転送を避けるため、可能であればBigQueryデータセット、GCSバケット、ClickHouseクラスタを同一リージョンに配置することを推奨します。
定期エクスポートの設定(オプション)
継続的な同期を実現するために、BigQueryのスケジュールクエリを設定できます。
-- 毎時間、過去75分から15分前のデータをエクスポート
EXPORT DATA OPTIONS(
uri='gs://my-bq-export-bucket/synthetic_data_incremental/${RUN_DATE}/*.parquet',
format='PARQUET'
) AS
SELECT * FROM mydataset.synthetic_data
WHERE event_time >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 75 MINUTE)
AND event_time < TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 15 MINUTE);
このオフセットウィンドウにより、データ遅延を吸収しながら重複を防止できます。
ステップ2:ClickPipesのセットアップ
ClickHouse CloudコンソールからClickPipesを設定します。
ClickPipesの作成手順
-
ClickHouse Cloudコンソールにログイン
- https://clickhouse.cloud/ にアクセス
- 対象のサービスを選択
-
ClickPipesの作成
- 左メニューから「ClickPipes」を選択
- 「Create ClickPipe」ボタンをクリック
-
ソースの選択
- データソースとして「Google Cloud Storage」を選択
GCSコネクタの接続設定
ClickPipesのGCSコネクタに以下の情報を設定します。
| パラメータ | 説明 | 例 |
|---|---|---|
| Name | パイプライン名 | bigquery_to_clickhouse |
| Path | GCSのフルURL(ワイルドカード対応) | https://storage.googleapis.com/my-bq-export-bucket/synthetic_data/*.parquet |
| File Type | ファイル形式 | Parquet |
| Access Key | HMAC Access Key ID | GOOG1E... |
| Secret Key | HMAC Secret | (自動生成される秘密鍵) |
重要: ClickPipesのGCSコネクタは、パスを gs:// ではなく https://storage.googleapis.com/ から始まるURL形式で指定する必要があります。
認証設定(HMAC認証)
ClickPipesのGCSコネクタは HMAC認証 を使用します。Service Account用のHMACキーを作成します。
# Service Accountの作成
gcloud iam service-accounts create clickpipes-gcs-reader \
--display-name="ClickPipes GCS Reader"
# GCSバケットへの権限付与
gsutil iam ch serviceAccount:clickpipes-gcs-reader@YOUR_PROJECT_ID.iam.gserviceaccount.com:roles/storage.objectViewer \
gs://my-bq-export-bucket
# HMACキーの作成
gcloud storage hmac create clickpipes-gcs-reader@YOUR_PROJECT_ID.iam.gserviceaccount.com
最後のコマンドで出力される Access ID と Secret をメモしてください。これらをClickPipesの設定画面で以下のように入力します。
-
Access Key:
GOOG1E...で始まるAccess ID - Secret Key: 表示されたSecret
ClickHouse出力先の設定
次に、ClickHouse側の設定を行います。
| パラメータ | 説明 | 推奨値 |
|---|---|---|
| Database | 出力先データベース | default |
| Table | 出力先テーブル名 | synthetic_data |
| Table Engine | テーブルエンジン | MergeTree |
| Ordering Key | ORDER BY句 | (event_time, user_id) |
| Permissions | ClickPipesユーザーの権限 |
Limited access(通常のデータ取り込みの場合) |
Permissionsについて
- Limited access: テーブルへのデータ書き込みのみ(推奨)
- Full access: マテリアライズドビューやディクショナリへのアクセスも必要な場合
ClickPipesは自動的にテーブルを作成するオプションがあります。また、事前にテーブルを作成することも可能です。
CREATE TABLE default.synthetic_data
(
id String,
user_id Int64,
event_time DateTime64(3),
event_type String,
amount Float64
)
ENGINE = MergeTree()
ORDER BY (event_time, user_id);
MergeTreeを推奨する理由
- 時系列データに最適
- 高速なクエリ性能
- 効率的なデータ圧縮
- パーティション管理が容易
ステップ3:スキーママッピングの確認
ClickPipesは自動的にスキーマを推定しますが、型変換ルールを確認・編集できます。
BigQueryとClickHouseの型対応表
| BigQuery型 | ClickHouse型 | 備考 |
|---|---|---|
| BOOL | Bool |
|
| INT64 | Int64 |
より小さい範囲なら Int8/Int16/Int32 も選択可 |
| FLOAT64 | Float64 |
|
| NUMERIC | Decimal(P, S) |
精度とスケールを指定 |
| STRING | String |
|
| BYTES | String |
Base64エンコードが必要な場合あり |
| DATE | Date |
|
| DATETIME | DateTime64(3) |
マイクロ秒精度が必要なら DateTime64(6)
|
| TIMESTAMP | DateTime64(3, 'UTC') |
タイムゾーン指定推奨 |
| TIME | String |
ClickHouseにTIME型はない |
| ARRAY | Array(T) |
内部型は対応する型に変換 |
| STRUCT |
Tuple または Nested
|
構造に応じて選択 |
| JSON | String |
JSON操作関数で処理 |
型最適化のベストプラクティス
BigQueryの INT64 をそのまま使うのではなく、実際のデータ範囲に応じて最適化することで、ストレージと性能が向上します。
| データ範囲 | 推奨型 | メモリ削減 |
|---|---|---|
| 0 ~ 255 | UInt8 |
87.5% |
| 0 ~ 65,535 | UInt16 |
75% |
| 0 ~ 4,294,967,295 | UInt32 |
50% |
| -128 ~ 127 | Int8 |
87.5% |
例:ユーザーIDが100万程度までなら UInt32 で十分です。
スキーマ設定の例
ClickPipesのスキーママッピング画面で以下のように設定します。
BigQuery列 → ClickHouse列
────────────────────────────────────
id (STRING) → id (String)
user_id (INT64) → user_id (UInt32) ← 最適化
event_time (TIMESTAMP) → event_time (DateTime64(3, 'UTC'))
event_type (STRING) → event_type (LowCardinality(String)) ← カーディナリティが低いため最適化
amount (FLOAT64) → amount (Float64)
LowCardinality は、カーディナリティが低いカラム(例:event_type)に適用すると圧縮率が向上します。
Nullable列について
ClickHouseでは Nullable 型は8バイトのオーバーヘッドがあるため、可能な限り避けることを推奨します。BigQueryで NULL が含まれる列は、デフォルト値を設定するか、Nullable を使用します。
-- NULLを避ける例
amount Float64 DEFAULT 0.0
-- Nullableを使う例(やむを得ない場合)
optional_field Nullable(String)
ClickPipesが自動追加するメタデータカラム
ClickPipesのGCSコネクタは、以下のメタデータカラムを自動的に追加します。
| カラム名 | 型 | 説明 |
|---|---|---|
_path |
LowCardinality(String) | GCSファイルのパス |
_file |
LowCardinality(String) | ファイル名 |
_size |
Nullable(UInt64) | ファイルサイズ(バイト) |
_time |
Nullable(DateTime) | ファイルの取り込み時刻 |
これらのカラムは、データの出所や取り込みタイミングを追跡するのに便利です。特定のファイルからのデータのみをフィルタリングする際にも使用できます。
-- 特定のファイルからのデータのみ取得
SELECT * FROM default.synthetic_data
WHERE _file = 'synthetic_data-000000000000.parquet';
-- 取り込み時刻でフィルタ
SELECT * FROM default.synthetic_data
WHERE _time >= '2024-01-15 10:00:00';
ステップ4:ジョブの実行と監視
ClickPipesの設定が完了したら、データ取り込みを開始します。
パイプラインの開始
ClickPipesコンソールで「Start」ボタンをクリックすると、データ取り込みが開始されます。
監視ダッシュボード
ClickPipesダッシュボードでは以下の情報を確認できます。
- Status:パイプラインの状態(Running / Paused / Error)
- Rows Ingested:取り込まれた行数
- Files Processed:処理されたファイル数
- Last Run:最後の実行時刻
- Error Messages:エラーが発生した場合のメッセージ
ログの確認
詳細なログを確認するには
- ClickPipesのパイプライン名をクリック
- 「Logs」タブを選択
- エラーや警告メッセージを確認
典型的なログ出力例
2024-01-15 10:30:01 [INFO] Starting ClickPipe: bigquery_to_clickhouse
2024-01-15 10:30:02 [INFO] Scanning GCS path: gs://my-bq-export-bucket/synthetic_data/*.parquet
2024-01-15 10:30:03 [INFO] Found 1 file(s) to process
2024-01-15 10:30:04 [INFO] Processing: synthetic_data-000000000000.parquet
2024-01-15 10:30:15 [INFO] Inserted 100,000 rows into default.synthetic_data
2024-01-15 10:30:15 [INFO] ClickPipe completed successfully
まとめ
この記事では、BigQueryからGCS経由でClickPipesを使ってClickHouseにデータを移行する実践的な手順を解説しました。
BigQueryの柔軟性とClickHouseの高速性を組み合わせることで、データウェアハウスとリアルタイム分析基盤のハイブリッド構成が実現できます。BigQueryでペタバイト級のデータを蓄積・分析しながら、ClickHouseでリアルタイム性が求められるダッシュボードやAPIを提供するといった、それぞれの強みを活かした使い分けが可能です。
ClickPipesのGCSコネクタを利用することで、スキーマ推定や型変換といった煩雑な作業を自動化し、継続的なデータ取り込みパイプラインを短時間で構築できます。BigQueryのスケジュールクエリと組み合わせれば、完全自動化された定期同期も実現できるでしょう。
今回紹介した基本パターンをベースに、データ規模やユースケースに応じてパーティション設計や圧縮設定を最適化し、さらに大規模なデータ基盤へと発展させていってください。
参考リンク