はじめに
業務でデータ基盤の構築をしています。
今回はCloud Composer
を用いて、BigQuery
でcsvデータをロードしてみたいと思います。
使用する諸々はこちら
TerraformによるComposer環境の構築
Terraform
を使用して、クラウドリソースを作成すると管理が楽です。
そのため今回はTerraform
を使用して、リソース作成を行います。
サービスアカウントの作成
Terraform
でComposer
の構築をするために、サービスアカウントからキーを発行しておいてください
サービスアカウント キーの作成と管理
Terraformaのインストール
公式サイトからzipを落とせますが、homebrewでインストールすることも出来ます
$ brew install terraform
tfファイルの作成
次にComposerを立ち上げるための.tf
ファイルを作成します。
# google認証
provider "google-beta" {
credentials = "${file("account.json")}"
project = "project_id"
region = "asia-northeast1"
}
# 作成するComposerの内容
resource "google_composer_environment" "composer-environment" {
name = "sample"
project = "project_id"
region = "asia-northeast1"
config {
node_count = 3
node_config {
zone = "asia-northeast1-c"
machine_type = "n1-standard-1"
disk_size_gb = 20
}
}
}
provider.credentials
には、発行したキーファイル(.json)のパスを指定します。
ただこの状態だとファイルにセキュアな情報が記載されてしまいます。git管理するときにもめんどそうです。
そのためTerraformでも勧められているように、以下のように環境変数をセットした方が利便性が高いです。可能ならこちらを実施してください。
$ export GOOGLE_CLOUD_KEYFILE_JSON={{path}}
ちなみprovider
でgoogle-beta
を使用しているのは、python3をComposer
で使いたかったからです。
特段こだわりなければgoogle-beta
ではなく、googleで問題ありません。
リソースの作成
上記で作成した.tf
ファイルがあるディレクトリに移動して、init
> apply
を行います。
$ terraform init
$ terraform apply
...
Do you want to perform these actions?
Terraform will perform the actions described above.
Only 'yes' will be accepted to approve.
Enter a value:
yes
でリソース作成を行います。
Enter a value: yes
google_composer_environment.composer-environment: Still creating... (10s elapsed)
google_composer_environment.composer-environment: Still creating... (20s elapsed)
google_composer_environment.composer-environment: Still creating... (30s elapsed)
.
.
.
Apply complete! Resources: 1 added, 0 changed, 0 destroyed.
Composer
が作成されるまで10分ほど掛かります。気長に待ちましょう!
作成されるとGCPのコンソールから「Composer」を選択すると、下記のように環境が作成されているのを確認できます。
Dagファイルの作成
Composer (Airflow)
では、ワークフローの定義をDagファイルというものに記載します。ただのpythonファイルです。
今回はgcs
にあるcsv
ファイルをBigQuery
に投入するだけのタスクを持つ、ワークフローを作成します。
やりたいことはBigQuery
にデータを投入することなので、Dataflow
をComposer
から呼び出すような実装にします。
Dataflow
はGCPで用意されている、フルマネージドなデータ処理サービスです。
Dataflow
にはgoogle側でいくつかテンプレートが用意されており、今回はgcs
にあるcsvをBigQueryへロードするため、Text Files on Cloud Storage to BigQuery
を使います。
上記のテンプレートをComposer
から呼び出して、データのインポート作業はDataflow
に任せます。
Dagファイルは以下のようになります。
from airflow import DAG
from airflow.contrib.operators.dataflow_operator import DataflowTemplateOperator
from datetime import datetime, timedelta
from airflow import models
# gcpのプロジェクトID
PROJECT_ID = models.Variable.get('project_id')
# gcsのパス
BUCKET_NAME = models.Variable.get('gcs_bucket_name')
# dag名
DAG_NAME = 'csv_to_bigquery'
yesterday = datetime.combine(
datetime.today() - timedelta(days=1) + timedelta(hours=9),
datetime.min.time()
)
default_dag_args = {
'owner': 'HagaSpa',
'start_date': yesterday,
'project_id': PROJECT_ID,
'dataflow_default_options': {
'project': PROJECT_ID,
}
}
with models.DAG(
dag_id=DAG_NAME,
schedule_interval="@once",
default_args=default_dag_args) as dag:
t1 = DataflowTemplateOperator(
task_id='task1',
template='gs://dataflow-templates/latest/GCS_Text_to_BigQuery',
parameters={
# udf.jsファイルにある呼び出したいメソッド名
'javascriptTextTransformFunctionName': 'transform',
# bqのスキーマ定義ファイルのgcsパス
'JSONPath': 'gs://{}/composer/schema/schema.json'.format(BUCKET_NAME),
# udf.jsファイルのgcsパス
'javascriptTextTransformGcsPath': 'gs://{}/composer/udf/udf.js'.format(BUCKET_NAME),
# bqへ投入するcsvファイルのgcsパス
'inputFilePattern': 'gs://{}/composer/csv/sample.csv'.format(BUCKET_NAME),
# 保存するbqのプロジェクトid:データセット名.テーブル名
'outputTable': '{}:my_dataset.sample'.format(PROJECT_ID),
# bqへのロード中のtempディレクトリ
'bigQueryLoadingTemporaryDirectory': 'gs://{}/composer/temp'.format(BUCKET_NAME),
},
)
-
まず
PROJECT_ID = models.Variable.get('project_id')
ですが、model.Variables
はAirflow
で定義できる環境変数のことです。プロジェクトIDやgcs
のバケット名など、外だししておく方が運用が楽な値は環境変数に定義します。
各値は自身の環境に合わせて、Airflow
の環境変数をセットして下さい。
「環境変数はどうやって定義すんのや?」って方はこちらを参照ください。 -
with models.DAG()
がairflowにおける、1つのDagになります。
SubDag?とか使えば、複数記載が出来るようですが今回はこれだけになります。 -
DataflowTemplateOperator()
が、Dataflowのテンプレートを呼び出している箇所になります。
parameters
に記載している内容は、Dataflowのテンプレート(GCS_Text_to_BigQuery)を使用する上で必須なものです。
Dataflowによるものです。 -
udf.js
は、「ユーザが定義した関数ファイル」です。具体的には「ファイルの行を引数とした、jsファイル」となっており、BigQueryへcsvファイルを取り込む際のカラム名とフィールド値をマッピングします。
function transform(line) {
var values = line.split(',');
var obj = new Object();
obj.id = values[0];
obj.name = values[1];
obj.age = values[2];
var jsonString = JSON.stringify(obj);
return jsonString;
}
// id,name,age
1,hagaspa,26
2,anyone,99
3,hoge,17
上記2ファイルをDag
ファイルに記載された、gcs
のパスに配置してください。
その後Composer
のコンソールから「Dag のフォルダ」をクリックし、表示されたgcs
にDagファイルをアップロードしてください。
Dagの起動
Dagファイルをアップロードしてから、しばらく経つとAirflow
の画面に以下のようにDagが表示されます。
(おそらく初回は自動的に開始されているはずです。開始されていない場合は、右側のRunをクリックして下さい。)
Recent Tasksがグリーンになると成功です。
Airflow
がDataflow
を起動し、BigQueryへデータをインポートしているか確認しましょう!
まずはDataflowですが、成功していると下記のようになっているはずです。
基本的にオールグリーンになっていれば成功ですが、ありがちなミスとしてgcs
のファイル名を間違えているとエラーにならずに0KBを読み取るということをするので、Read from source
のフェーズは要チェックしましょう。
最後にBigQuery
にデータをインポート出来ているか確認します。
csvデータがインポート出来ました!
まとめ
データ基盤を作る上では、しばしばテーブルの更新状態に依存することがあります。
今回は単純にデータをロードしただけですが実業務だと、
- 前日データを使ってテーブルAにデータをロードする。
-
テーブルAが更新されているなら、
Tableau
データソースへ適用や集計処理などを実施。 - テーブルAへの取り込みがエラーなら、後続処理を止めSlack通知を行う。
などのワークフローを組むことが多いです。
そのためAirflow
もとい、Composer
を使用してデータ基盤構築フローを組むことはとても便利だと感じました。
困ったら
基本はドキュメントに記載されています。またossなので最悪ソースコードを読めます。
Dataflow
以外にも色んなサービスをAirflow
で使用できるので、operatorsを参照してやりたいことがAirflow
で実現できるか考慮しましょう!