何のメモ
Cloud Dataflowを初めて触ってみたときの記録。
どんな時に役立つ?
次回Cloud Dataflowを使って何かする時にどんな感じだったかを思い出すのに使う個人的なメモ。
Cloud Dataflowとは
公式:https://cloud.google.com/dataflow/?hl=ja
自分まとめ:https://qiita.com/risa0320/items/ff6aa5ec87c84e14560e#cloud-dataflow
※ https://cloud.google.com/dataflow/?hl=ja より
触ってみる
今回はGoogleの既存テンプレートであるText Files on Cloud Storage to BigQuery
を使って処理していく。
Jobの内容は簡単には、GCS上のファイルに対してUDFを使ってデータ加工をし、BQ上のテーブルにInsertする感じ。
準備
https://console.cloud.google.com/dataflow/
https://cloud.google.com/dataflow/docs/guides/templates/provided-batch?hl=ja#gcstexttobigquery
Cloud Dataflowを有効して(Dataflowを試す)、必要なAPIを有効化しておく。
事前に以下を作成しておく。
- GCSにバケットと各種フォルダを作成
- バケット
- tmpフォルダ
- データソースとなるファイル
- sample.csv
- GCS上に配置する
- BQ上に空テーブルを作成
- sample.csvのデータを格納するテーブル
- BQテーブルのschemaファイル
- schema.json
- GCS上に配置する
- UDFファイル
- my_function.js
- データ加工時に使用するUDFを定義したjsファイル
- GCS上に配置する
- 空のファイル
- process.txt
- GCS上に配置
1, "Taro", "2000/10/01 03:00:00 UTC", 19, "Tokyo", true
2, "Hanako", "1994/07/25 12:56:00 UTC", 26, "Osaka", false
3, "Kumi", "1999/01/10 23:44:00 UTC", 20, "Osaka", true
function transform(line) {
function strToTimestamp(str_datetime) {
if (str_datetime.indexOf('/') >= 0){
str_datetime = str_datetime.replace(/\\/g, "-");
}
return new Date(str_datetime);
};
function booleanToInt(b) {
if (b) {
return 1;
}
return 0;
};
var words = line.split(',');
var obj = new Object();
obj.id = Number(words[0]);
obj.name = words[1];
obj.birthday = strToTimestamp(words[2]);
obj.age = Number(words[3]);
obj.address = words[4];
obj.has_dog = booleanToInt(words[5]);
return JSON.stringify(obj);
}
本題
今回はストリーミングより安いバッチ処理を作成する。
Job作成
「テンプレートからジョブを作成」からJobを作成していく。
Cloud Dataflowのテンプレートにはいろいろあるけど、今回はGCS上のファイルをデータ加工してBQに突っ込む処理をやってみようと思うので、
Text Files on Cloud Storage to BigQuery
を選択。
テンプレートを選択すると必要なパラメータボックスが出てくるので、埋めていく。
- GCS location of your JavaScript UDF
- GCS上のUDFファイルPath
- gs://test-job-1/test-job-1/job_1.js
- GCS location of your BigQuery schema file, described as a JSON
- GCS上のBQテーブルSchemaファイルPath
- gs://test-job-1/test-job-1/schema.json
- The name of the JavaScript function you wish to call as your UDF
- UDFファイル内で定義され、Dataflow内でJobとして実行する関数名を指定
- transform
- The fully qualified BigQuery table
- BQテーブル名
- for-my-study:datafolw_job_1.job_1
- The GCS location of the text you'd like to process
- gs://test-job-1/process.txt
- Temporary directory for BigQuery loading process
- gs://test-job-1/test-job-1/tmp
- 一時的なロケーション
- gs://test-job-1/test-job-1/tmp
上記を設定し、「ジョブを実行」。
処理時間の目安はこのテンプレートの場合は以下のよう。
このバッチ パイプラインの費用は、処理するデータの量に応じて変わります。一般的な 10 MB の CSV ファイルの処理時間は 1 つの Dataflow ワーカーで 5 分未満、料金は 1 時間あたり\$0.10になります。500 MB の CSV ファイルの処理時間は 4 つの Dataflow ワーカーで約 10 分、料金は 1 時間あたり\$0.40 になります。
Job実行結果を確認
エラーが出た。
赤くなっているBoxの「▽」を押すと更に詳細なエラーポイントが見れる。
感想
Jobフローをカスタマイズするにもそのテンプレートを作成しないといけなかったり、
既存Jobを修正する方法がGUI画面上では見つからなかったり、慣れるまでは結構大変かも。
ただ、DataflowからGCP上のいろんなサービスに面倒な認証なしで連携できて、
処理状態を視覚的に簡単に把握できるのはいいなと思った。