Dataflow Template "BigQuery to TFRecord"のソースコードを読んでみる
Dataflow Template は、Google Cloud の分散処理サービスであるCloud Dataflowをコードを書かずに動かすことができるシロモノです。いくつかの一般的な処理のテンプレートが提供されており、今回はそのうちの「BigQuery to TFRecord」です。機械学習の前処理のひとつです。
ソースコードはこちら
main処理
大きな流れとしては、
- 入力されたSQLを使ってBigQueryからデータを読み、その際にTensorFlowのtf.Futureに変換して、tf.ExampleにシリアライズしたPCollectionを生成("RecordToExample")
- 指定されたtrain/test/validの割合に応じて、Partition変換でデータを分割する
- 指定されたGoogle Cloud Storageのディレクトリにtrain/test/valというディレクトリを切って、TFRecord形式のファイルを出力("WriteTFTrainingRecord"/"WriteTFTestingRecord"/"WriteTFValidationRecord")
ちょっとしたポイント
BigQuery StorageAPIでREAD
BigQueryからの読み込みは、.withMethod(BigQueryIO.TypedRead.Method.DIRECT_READ)
で指定。
PCollection<byte[]> bigQueryToExamples =
pipeline
.apply(
"RecordToExample",
BigQueryIO.read(BigQueryToTFRecord::record2Example)
.fromQuery(options.getReadQuery())
.withCoder(ByteArrayCoder.of())
.withTemplateCompatibility()
.withoutValidation()
.usingStandardSql()
.withMethod(BigQueryIO.TypedRead.Method.DIRECT_READ)
// Enable BigQuery Storage API
).apply("ReshuffleResults", Reshuffle.viaRandomKey());
Futureに変換する際の型変換
- "STRING"/"TIME"/"DATE" -> Bytes(Array考慮)
- "BYTES" -> Bytes
- "INTEGER"/"INT64"/"TIMESTAMP" -> Int64
- "FLOAT"/"FLOAT64" -> Float
- "BOOLEAN"/"BOOL" -> Int64(1 or 0)
- その他 -> Error(Unsuported)
データ分割の指定がない場合
train/test/validの割合指定がない場合、全部trainになる。1が、100%。
void setOutputSuffix(ValueProvider<String> outputSuffix);
@Description("The training percentage split for TFRecord Files")
@Default.Float(1)
ValueProvider<Float> getTrainingPercentage();