LoginSignup
3

More than 5 years have passed since last update.

Cloud Dataflowを使ってBigQueryのデータをAvroフォーマットでGCSに格納する

Posted at

先の投稿ではdigdagを使ってBQのデータをAvroフォーマットでGCSに格納したが、BQのデータを加工したり、大規模なデータに対して並列で実行したい場合にはDataflowの方がマッチしていると考えられる。

DataflowはBigQuery,Avro,GCSのI/Oを備えているので、上記のことを容易に実現することが可能。

BigQueryからデータを取得してAvroに変換する処理の例がなくて困ったので、同じことではまっている人向けにここでシェア。

AvroIOを使うためには、最初にAvro schemaを定義することが必要。

  String schemaString = 
  "{\"namespace\": \"example.avro\",\n"
   + "\"type\": \"record\",\n"
   + "\"name\": \"User\",\n"
   + "\"fields\": [\n"
   + "    {\"name\": \"resource\", \"type\": \"string\"} \n "
   + "]\n"
   + "}";
   schema = new Schema.Parser().parse(schemaString);

次に、BigQuery table rowからGenericRecordに変換する処理を定義

 public static class TransformTableRowToAvroRecord extends DoFn<TableRow, GenericRecord> {
  private static final long serialVersionUID = 1L;

  public GenericRecord makeGenericRecord(TableRow content) {   
   GenericRecord record = new GenericData.Record(schema);
   record.put("resource", (String) content.get("userid"));
   return record;
  }
  @Override
  public void processElement(ProcessContext c) {
   TableRow row = c.element();
   c.output(makeGenericRecord(row));
  }  
 }

最後に、パイプラインに変換を適用してGCSに出力するようにする。

PCollection<TableRow> BigQueryReadingCollection = bp.apply(BigQueryIO.Read.withoutValidation().usingStandardSql().fromQuery(myQuery));
PCollection<GenericRecord> BigQueryTransformData = BigQueryReadingCollection.apply(ParDo.of(new TransformTableRowToAvroRecord())).setCoder(AvroCoder.of(GenericRecord.class, schema));
BigQueryTransformData.apply(AvroIO.Write.to("gs://MY_BUCKET/MY_PATH").withSchema(schema)); 

以上です。

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3