先の投稿ではdigdagを使ってBQのデータをAvroフォーマットでGCSに格納したが、BQのデータを加工したり、大規模なデータに対して並列で実行したい場合にはDataflowの方がマッチしていると考えられる。
DataflowはBigQuery,Avro,GCSのI/Oを備えているので、上記のことを容易に実現することが可能。
BigQueryからデータを取得してAvroに変換する処理の例がなくて困ったので、同じことではまっている人向けにここでシェア。
AvroIOを使うためには、最初にAvro schemaを定義することが必要。
String schemaString =
"{\"namespace\": \"example.avro\",\n"
+ "\"type\": \"record\",\n"
+ "\"name\": \"User\",\n"
+ "\"fields\": [\n"
+ " {\"name\": \"resource\", \"type\": \"string\"} \n "
+ "]\n"
+ "}";
schema = new Schema.Parser().parse(schemaString);
次に、BigQuery table rowからGenericRecordに変換する処理を定義
public static class TransformTableRowToAvroRecord extends DoFn<TableRow, GenericRecord> {
private static final long serialVersionUID = 1L;
public GenericRecord makeGenericRecord(TableRow content) {
GenericRecord record = new GenericData.Record(schema);
record.put("resource", (String) content.get("userid"));
return record;
}
@Override
public void processElement(ProcessContext c) {
TableRow row = c.element();
c.output(makeGenericRecord(row));
}
}
最後に、パイプラインに変換を適用してGCSに出力するようにする。
PCollection<TableRow> BigQueryReadingCollection = bp.apply(BigQueryIO.Read.withoutValidation().usingStandardSql().fromQuery(myQuery));
PCollection<GenericRecord> BigQueryTransformData = BigQueryReadingCollection.apply(ParDo.of(new TransformTableRowToAvroRecord())).setCoder(AvroCoder.of(GenericRecord.class, schema));
BigQueryTransformData.apply(AvroIO.Write.to("gs://MY_BUCKET/MY_PATH").withSchema(schema));
以上です。