本記事は、VISITS Advent Calendar 2019の10日目の記事です。
昨日は、@muraokaz によるクラスタリング+二次元マッピングによる可視化を定量評価する でした。
VISITS Technologiesでインフラエンジニアをしている@syogunです。
本日は、Cloud Composerを使用してVISITSでデータ分析基盤を構築(in progress)した時の話をさせて頂ければと思います。
主にCloudSQL(MySQL)→GCS→BigQuery
の部分について書きます。
人様の情報がベースで恐縮ですが、こちらの記事を参考にして構築を進めました。
Cloud ComposerがGKE上で動いているため、ちょっとGKEの話も出てきます。
では参りましょう。
1. 構成図
今後変更はあるかもしれませんが、想定している構成図はこんな感じです。(GAEのログ転送は含まない)
データマスキングの部分は、GCPのDLPを使用してメールアドレスや電話番号を判別して、
その情報を元にデータをマスキングする方法を検討中です。
現在進行形なので、仕組みができたらまた書こうと思います。
2. 背景
ユーザーや弊社サービス利用企業のActivity等を分析し、プロダクトの改善に活かしていきたいというニーズがあり、データ分析基盤を作ろう!という話になりました。
具体的には、アプリケーションログ(GAE)、Google Analytics、DB(MySQL)をDataSourceとし、
そのDataSourceに必要なマスキング処理を行い、DWHにロードするといった構成を考えました。
弊社のクラウドは主にGCPを使っているため、DWHは文句なしでBigQueryを使うことにしました。
3. いろいろ試しました
そもそも「何を使えばいいだ?」というレベルから始まったのいろいろ試行錯誤しました。
おそらく一番の課題は、データを抽出、加工するところだと考え、
最初は、Cloud DataPrepやCloud DataFusionを使用して、ノンプログラミングで楽することを考えました。
しかしその淡い夢は、以下の理由で絶たれれました。。
Cloud DataPrepについて
DataPrepとは、公式ドキュメントを見ると
ブラウザ環境の簡単なドラッグ&ドロップ操作で、分析用の多様なデータセットを視覚的に探索、クリーニング、準備できるインテリジェントなデータ準備およびクレンジング サービスです。
とあります。
DataSourceのマスキングにこのDataPrepを使うことを検討したんですが、
- そもそも対応しているDataSourceがGCSとBigQueryしかなく、Cloud SQLに対応していなかった。
- Cloud SQLからのデータ抽出は、別途CircleCI等のCIツールから
gcloud sql export csv
を叩いてGCSにCloud SQLのデータを出力することも検討したが、負けた感が強くて途中でやめた。
という理由で無念の撤退。
Cloud DataFusionについて
DataFusionとは、公式ドキュメントを見ると
ETL および ELT のデータ パイプラインを効率的に構築して管理できる、フルマネージドかつクラウド ネイティブなデータ統合サービスです。
とあります。
次にDataPrepと比較して様々なDataSourceに対応しているDataFusionを検討したんですが、
- DataFusionからCloud SQLへの接続がどうしてもできなかった。。涙
- こちら記事だとRDSはできそうだった..
という理由で無念の撤退。
余談ですが、DataFusionは、DataSourceとしてS3とかにも対応しており、
S3->BigQueryが実現できそうでした。これはAWSユーザにとっては嬉しいですね。
4. 最終的に何を使ったのか?
そんなこんなで最終的にCloud Composerにたどり着きました。
理由としては、
- ワークフローを定義できるのでマスキング等の処理を任意のタイミングで挟める
-
MySqlToGoogleCloudStorageOperator
やGoogleCloudStorageToBigQueryOperator
といったOperator(特定のプログラムを実行するためのテンプレートのようなもの)が用意されている - すでにCloudSQL(MySQL)→GCS→BigQueryをやっている人がおり、サンプルのDAGファイルもあった。
- Airflowは、個人的になかなか学習コストが高かったので助かった
が挙げられます。
5. Cloud Composerセットアップな大まかな流れ
以下のような流れでセットアップできます。
- Cloud Composer環境を構築
- Cloud SQL ProxyをGKE上にデプロイ
- Airflow connectionsの設定
- DAGファイルの作成、設置
6. Cloud ComposerでCloudSQL(MySQL)→GCS→BigQuery
を実現するためのポイント
ここでは、私がハマった箇所をご紹介します。
1. Cloud Composer環境を構築
当初は、以下のgcloudコマンドでCloud Composer環境を構築しました。
$ gcloud composer environments create <your env name> --location asia-northeast1
しかしながら、Composer Backend timed out.
なるエラーが出てコマンドがこけます。
原因は、Cloud Composer環境作成の際、サービスアカウントを明示的に指定しないと
デフォルトでは、<your project number>-compute@developer.gserviceaccount.com
というサービスアカウントが使用されます。
このサービスアカウントにComposer ワーカー
権限がないため、Composerの作成にこけるようです。(エラーメッセージはなんとかしてほしい)
デフォルトのサービスアカウントは使いたくなかったので、
今回は別途cloud-composer@<your project name>.iam.gserviceaccount.com
を作成し、そちらでComposerを作成しました。以下、コマンド例。
$ gcloud composer environments create <your env name> --service-account=cloud-composer@<your project name>.iam.gserviceaccount.com --location asia-northeast1
サービスアカウント権限は、
Composer ワーカー
BigQuery 管理者
クラウドSQL編集者
ストレージ オブジェクト管理者
をつけました。
2. Cloud SQL ProxyをGKE上にデプロイ
Cloud ComposerからCloud SQLへの接続は、Cloud SQL Proxyを使用します。
Cloud Composer環境を作成すると、裏でGCEが起動し、その上でKubernetesが起動することになります。
よって、Cloud SQL Proxy用のDeployment
,Service
を作成し、DBの接続設定をCloud SQL ProxyのEndpointに向けることでCloud SQLへの接続が可能となります。
まず、kubectlコマンドでサービウアカウントのsecretを作成します。
$ mv <your service account file> credentials.json
$ kubectl create secret generic <your secret name> --from-file=key.json=credentials.json
次にこちらのManifest Fileを参考にCloud SQL Proxy用のDeploymentを作成します。
こちらのManifest Fileに少し手を加えて、さきほど作成したsecret(サービスアカウントキー)を埋め込んでます。
参考までに手を加えた部分を抜粋します。
spec:
volumes:
- name: ssl-certs
hostPath:
path: /etc/ssl/certs
- name: google-cloud-key
secret:
secretName: <your secret name> # secret名を指定
containers:
- image: gcr.io/cloudsql-docker/gce-proxy:1.11
volumeMounts:
- name: ssl-certs
mountPath: /etc/ssl/certs
- name: google-cloud-key # secretをmount
mountPath: /var/secrets/google
env:
- name: GOOGLE_APPLICATION_CREDENTIALS # GOOGLE_APPLICATION_CREDENTIALSにサービスアカウントを設定(不要かも?)
value: /var/secrets/google/key.json
command:
[
"/cloud_sql_proxy",
"-instances=visits-for-innovators-prd:asia-northeast1:innovators-production-replica=tcp:0.0.0.0:3306",
"-credential_file=/var/secrets/google/key.json", # credentialを明示的に指定
]
次に、こちらのManifest Fileをkubectl apply
してServiceリソースを作成します。
ここで作成したServiceがCloud SQL ProxyのEndpointになります。
3. Airflow connectionsの設定
GCPのCloud ComposerからAirflowのGUIに接続できます。
このAirflowのGUIからCloud SQLのConnections設定を行います。
ここでは文字コードを指定しないとExportしたjsonが文字化けすることがあったんですが、以下のようにutf8
を明示的に指定することで回避できました。
なお、Conn ID
には、上記で作成したServiceリソース名.default
(例 mysql-dvh-sqlproxy-service.default)を指定すればOKです。
あとは、KubernetsのKube-DNSが名前解決してくれるはずです。
4. DAGファイルの作成、設置
Cloud Composerを作成すると自動でDAGファイルを配置するGCSバケットが作成されます。
このGCSバケットにDAGファイルを配置すれば、ワークフローが読み込まれます。
ここでのハマりポイントは、DAGファイルでSchedule設定しているのに、当該時刻が過ぎてもワークフローが走らないことがありました。
AirflowのSchedule設定のポイントは以下2つでした。
スケジュール実行
start_dateを設定する
まず、start_date
、ワークフローの開始日付を設定しないと動きません。
# 設定例
default_args = {
'owner': '<your name>',
'depends_on_past': False,
# Change this to a literal date to make scheduler work.
'start_date': datetime(2019, 11, 25), # ここをちゃんと設定する
'email': ['<your email address>'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 2,
'retry_delay': timedelta(minutes=1),
}
スケジュール実行時間の計算方式を理解する
Scheduleを設定してもジョブが動かないず、少しハマりました。
詳細は、こちらの記事の解説がすごく分かりやすかったですが、ポイントは¥
start_date + schedule_interval が経過したらスケジュールされる
スケジュールされるタイミングはschedule_intervalの経過後
とのことでした。
このあたり把握せず、デバッグしようと思うとハマりますのでお気をつけを。
BigQueryOperatorを使ったデータマートの作成
BigQuery上のTableをjoinして、データマート的なものを作ろうと考えました。
これには、AirflowのBigQueryOperator
を使用することを考えています。
具体的には、既存のBigqueryのTableに対しsql='sql/template.sql',
で実行したSQLの結果を、destination_dataset_table
で指定したDatasetのTableに出力するというものです。
ところが、上記のQuery実行結果が、destination_dataset_table
で指定したDatasetのTableにコピーされない事象がありました。
これは、create_disposition='CREATE_IF_NEEDED',
というパラメータをつけることで解決しました。
以下サンプルDAGです。
なお、ここでしているSQLは、DAGファイルと同じGCS上のディレクトリに配置すればOKです。
def gen_transform_table_task(table_config):
transform_task = BigQueryOperator(
task_id='transform_of_{}'.format(table_config.params['export_table']),
sql='sql/template.sql',
destination_dataset_table="{}.{}.{}".format(table_config.params['gcp_project'],
table_config.params['stage_dataset'] +
"_summary",
table_config.params['stage_table'] + "_{{ ds_nodash }}"),
params={"project_nm": "<your gcp project name>",
"dataset_nm": "innovators",
"table_nm": "{}".format(table_config.params['export_table'])},
write_disposition='WRITE_TRUNCATE',
create_disposition='CREATE_IF_NEEDED', # ここ
bigquery_conn_id='<your bigquery conn id>',
location='US',
use_legacy_sql=False,
dag=dag)
transform_task.doc_md = """\
# Import table from storage to bigquery
task documentation
"""
return transform_task
7. まとめ
Cloud Composerは、いろんなOperatorが用意してあるので、使い方をマスターできればかなり便利そうな感じですね。
これからいろいろキャッチアップしていきたいです。
明日は、@ken_hikita による「kintoneの承認経路を作ってみたときのお話し」です!
8. 参考情報
以下の文献を参考にさせていただきました。