Building Cloud Dataflow Templates in Scala

Introduction

I previously wrote the following article about Cloud Dataflow:
https://qiita.com/yuyu_hf/items/e8e738f542e1f30d7be4

In that article I used Java rather than Scala, since at the time I was not yet familiar with Apache Beam. This time, I thought it would be useful to have something that lets you compare Java, Scala, and Scala + Scio at a glance when deciding which to use, so I tried the same simple pipeline in each.

The pipeline is very simple: it extracts a specific column from BigQuery and writes it out to GCS.

Creating the Template (Java)

Creating the Project

mvn archetype:generate \
    -DarchetypeGroupId=org.apache.beam \
    -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
    -DgroupId=com.example \
    -DartifactId=bigquery-to-gcs \
    -Dversion="0.1" \
    -DinteractiveMode=false \
    -Dpackage=com.example

Library Management

pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<!--
    Licensed to the Apache Software Foundation (ASF) under one or more
    contributor license agreements.  See the NOTICE file distributed with
    this work for additional information regarding copyright ownership.
    The ASF licenses this file to You under the Apache License, Version 2.0
    (the "License"); you may not use this file except in compliance with
    the License.  You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.example</groupId>
  <artifactId>dataflow-bigquerytogcs</artifactId>
  <version>0.1</version>

  <packaging>jar</packaging>

  <properties>
    <beam.version>2.16.0</beam.version>

    <bigquery.version>v2-rev20181104-1.27.0</bigquery.version>
    <google-clients.version>1.27.0</google-clients.version>
    <hamcrest.version>2.1</hamcrest.version>
    <jackson.version>2.9.10</jackson.version>
    <joda.version>2.10.3</joda.version>
    <junit.version>4.13-beta-3</junit.version>
    <maven-compiler-plugin.version>3.7.0</maven-compiler-plugin.version>
    <maven-exec-plugin.version>1.6.0</maven-exec-plugin.version>
    <maven-jar-plugin.version>3.0.2</maven-jar-plugin.version>
    <maven-shade-plugin.version>3.1.0</maven-shade-plugin.version>
    <mockito.version>3.0.0</mockito.version>
    <pubsub.version>v1-rev20181105-1.27.0</pubsub.version>
    <slf4j.version>1.7.25</slf4j.version>
    <spark.version>2.4.4</spark.version>
    <hadoop.version>2.7.3</hadoop.version>
    <maven-surefire-plugin.version>2.21.0</maven-surefire-plugin.version>
    <nemo.version>0.1</nemo.version>
    <flink.artifact.name>beam-runners-flink-1.8</flink.artifact.name>
  </properties>

  <repositories>
    <repository>
      <id>apache.snapshots</id>
      <name>Apache Development Snapshot Repository</name>
      <url>https://repository.apache.org/content/repositories/snapshots/</url>
      <releases>
        <enabled>false</enabled>
      </releases>
      <snapshots>
        <enabled>true</enabled>
      </snapshots>
    </repository>
  </repositories>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>${maven-compiler-plugin.version}</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-plugin</artifactId>
        <version>${maven-surefire-plugin.version}</version>
        <configuration>
          <parallel>all</parallel>
          <threadCount>4</threadCount>
          <redirectTestOutputToFile>true</redirectTestOutputToFile>
        </configuration>
        <dependencies>
          <dependency>
            <groupId>org.apache.maven.surefire</groupId>
            <artifactId>surefire-junit47</artifactId>
            <version>${maven-surefire-plugin.version}</version>
          </dependency>
        </dependencies>
      </plugin>

      <!-- Ensure that the Maven jar plugin runs before the Maven
        shade plugin by listing the plugin higher within the file. -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-jar-plugin</artifactId>
        <version>${maven-jar-plugin.version}</version>
      </plugin>

      <!--
        Configures `mvn package` to produce a bundled jar ("fat jar") for runners
        that require this for job submission to a cluster.
      -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>${maven-shade-plugin.version}</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <finalName>${project.artifactId}-bundled-${project.version}</finalName>
              <filters>
                <filter>
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>META-INF/LICENSE</exclude>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                  </excludes>
                </filter>
              </filters>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
              </transformers>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>

    <pluginManagement>
      <plugins>
        <plugin>
          <groupId>org.codehaus.mojo</groupId>
          <artifactId>exec-maven-plugin</artifactId>
          <version>${maven-exec-plugin.version}</version>
          <configuration>
            <cleanupDaemonThreads>false</cleanupDaemonThreads>
          </configuration>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>

  <profiles>
    <profile>
      <id>direct-runner</id>
      <activation>
        <activeByDefault>true</activeByDefault>
      </activation>
      <!-- Makes the DirectRunner available when running a pipeline. -->
      <dependencies>
        <dependency>
          <groupId>org.apache.beam</groupId>
          <artifactId>beam-runners-direct-java</artifactId>
          <version>${beam.version}</version>
          <scope>runtime</scope>
        </dependency>
      </dependencies>
    </profile>

    <profile>
      <id>portable-runner</id>
      <activation>
        <activeByDefault>true</activeByDefault>
      </activation>
      <!-- Makes the PortableRunner available when running a pipeline. -->
      <dependencies>
        <dependency>
          <groupId>org.apache.beam</groupId>
          <artifactId>beam-runners-reference-java</artifactId>
          <version>${beam.version}</version>
          <scope>runtime</scope>
        </dependency>
      </dependencies>
    </profile>

    <profile>
      <id>apex-runner</id>
      <!-- Makes the ApexRunner available when running a pipeline. -->
      <dependencies>
        <dependency>
          <groupId>org.apache.beam</groupId>
          <artifactId>beam-runners-apex</artifactId>
          <version>${beam.version}</version>
          <scope>runtime</scope>
        </dependency>
        <!--
          Apex depends on httpclient version 4.3.6, project has a transitive dependency to httpclient 4.0.1 from
          google-http-client. Apex dependency version being specified explicitly so that it gets picked up. This
          can be removed when the project no longer has a dependency on a different httpclient version.
        -->
        <dependency>
          <groupId>org.apache.httpcomponents</groupId>
          <artifactId>httpclient</artifactId>
          <version>4.3.6</version>
          <scope>runtime</scope>
          <exclusions>
            <exclusion>
              <groupId>commons-codec</groupId>
              <artifactId>commons-codec</artifactId>
            </exclusion>
          </exclusions>
        </dependency>
        <!--
          Apex 3.6 is built against YARN 2.6. Version in the fat jar has to match
          what's on the cluster, hence we need to repeat the Apex Hadoop dependencies here.
        -->
        <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-yarn-client</artifactId>
          <version>${hadoop.version}</version>
          <scope>runtime</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-common</artifactId>
          <version>${hadoop.version}</version>
          <scope>runtime</scope>
        </dependency>
      </dependencies>
    </profile>

    <profile>
      <id>dataflow-runner</id>
      <!-- Makes the DataflowRunner available when running a pipeline. -->
      <dependencies>
        <dependency>
          <groupId>org.apache.beam</groupId>
          <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
          <version>${beam.version}</version>
          <scope>runtime</scope>
        </dependency>
      </dependencies>
    </profile>

    <profile>
      <id>flink-runner</id>
      <!-- Makes the FlinkRunner available when running a pipeline. -->
      <dependencies>
        <dependency>
          <groupId>org.apache.beam</groupId>
          <!-- Please see the Flink Runner page for an up-to-date list
               of supported Flink versions and their artifact names:
               https://beam.apache.org/documentation/runners/flink/ -->
          <artifactId>${flink.artifact.name}</artifactId>
          <version>${beam.version}</version>
          <scope>runtime</scope>
        </dependency>
      </dependencies>
    </profile>

    <profile>
      <id>spark-runner</id>
      <!-- Makes the SparkRunner available when running a pipeline. Additionally,
           overrides some Spark dependencies to Beam-compatible versions. -->
      <properties>
        <netty.version>4.1.17.Final</netty.version>
      </properties>
      <dependencies>
        <dependency>
          <groupId>org.apache.beam</groupId>
          <artifactId>beam-runners-spark</artifactId>
          <version>${beam.version}</version>
          <scope>runtime</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.beam</groupId>
          <artifactId>beam-sdks-java-io-hadoop-file-system</artifactId>
          <version>${beam.version}</version>
          <scope>runtime</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-streaming_2.11</artifactId>
          <version>${spark.version}</version>
          <scope>runtime</scope>
          <exclusions>
            <exclusion>
              <groupId>org.slf4j</groupId>
              <artifactId>jul-to-slf4j</artifactId>
            </exclusion>
          </exclusions>
        </dependency>
        <dependency>
          <groupId>com.fasterxml.jackson.module</groupId>
          <artifactId>jackson-module-scala_2.11</artifactId>
          <version>${jackson.version}</version>
          <scope>runtime</scope>
        </dependency>
        <!-- [BEAM-3519] GCP IO exposes netty on its API surface, causing conflicts with runners -->
        <dependency>
          <groupId>org.apache.beam</groupId>
          <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
          <version>${beam.version}</version>
          <exclusions>
            <exclusion>
              <groupId>io.grpc</groupId>
              <artifactId>grpc-netty</artifactId>
            </exclusion>
            <exclusion>
              <groupId>io.netty</groupId>
              <artifactId>netty-handler</artifactId>
            </exclusion>
          </exclusions>
        </dependency>
      </dependencies>
    </profile>
    <profile>
      <id>gearpump-runner</id>
      <dependencies>
        <dependency>
          <groupId>org.apache.beam</groupId>
          <artifactId>beam-runners-gearpump</artifactId>
          <version>${beam.version}</version>
          <scope>runtime</scope>
        </dependency>
      </dependencies>
    </profile>
    <profile>
      <id>samza-runner</id>
      <dependencies>
        <dependency>
          <groupId>org.apache.beam</groupId>
          <artifactId>beam-runners-samza</artifactId>
          <version>${beam.version}</version>
          <scope>runtime</scope>
        </dependency>
      </dependencies>
    </profile>
    <profile>
      <id>nemo-runner</id>
      <dependencies>
        <dependency>
          <groupId>org.apache.nemo</groupId>
          <artifactId>nemo-compiler-frontend-beam</artifactId>
          <version>${nemo.version}</version>
        </dependency>
        <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-common</artifactId>
          <version>${hadoop.version}</version>
          <exclusions>
            <exclusion>
              <groupId>org.slf4j</groupId>
              <artifactId>slf4j-api</artifactId>
            </exclusion>
            <exclusion>
              <groupId>org.slf4j</groupId>
              <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
          </exclusions>
        </dependency>
      </dependencies>
    </profile>

    <profile>
      <id>jet-runner</id>
      <dependencies>
        <dependency>
          <groupId>org.apache.beam</groupId>
          <artifactId>beam-runners-jet-experimental</artifactId>
          <version>${beam.version}</version>
          <scope>runtime</scope>
        </dependency>
      </dependencies>
    </profile>

  </profiles>

  <dependencies>
    <!-- Adds a dependency on the Beam SDK. -->
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-core</artifactId>
      <version>${beam.version}</version>
    </dependency>

    <!-- Adds a dependency on the Beam Google Cloud Platform IO module. -->
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
      <version>${beam.version}</version>
    </dependency>

    <!-- Dependencies below this line are specific dependencies needed by the examples code. -->
    <dependency>
      <groupId>com.google.api-client</groupId>
      <artifactId>google-api-client</artifactId>
      <version>${google-clients.version}</version>
      <exclusions>
        <!-- Exclude an old version of guava that is being pulled
             in by a transitive dependency of google-api-client -->
        <exclusion>
          <groupId>com.google.guava</groupId>
          <artifactId>guava-jdk5</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

    <dependency>
      <groupId>com.google.apis</groupId>
      <artifactId>google-api-services-bigquery</artifactId>
      <version>${bigquery.version}</version>
      <exclusions>
        <!-- Exclude an old version of guava that is being pulled
             in by a transitive dependency of google-api-client -->
        <exclusion>
          <groupId>com.google.guava</groupId>
          <artifactId>guava-jdk5</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

    <dependency>
      <groupId>com.google.http-client</groupId>
      <artifactId>google-http-client</artifactId>
      <version>${google-clients.version}</version>
      <exclusions>
        <!-- Exclude an old version of guava that is being pulled
             in by a transitive dependency of google-api-client -->
        <exclusion>
          <groupId>com.google.guava</groupId>
          <artifactId>guava-jdk5</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

    <dependency>
      <groupId>com.google.apis</groupId>
      <artifactId>google-api-services-pubsub</artifactId>
      <version>${pubsub.version}</version>
      <exclusions>
        <!-- Exclude an old version of guava that is being pulled
             in by a transitive dependency of google-api-client -->
        <exclusion>
          <groupId>com.google.guava</groupId>
          <artifactId>guava-jdk5</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

    <dependency>
      <groupId>joda-time</groupId>
      <artifactId>joda-time</artifactId>
      <version>${joda.version}</version>
    </dependency>

    <!-- Add slf4j API frontend binding with JUL backend -->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>${slf4j.version}</version>
    </dependency>

    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-jdk14</artifactId>
      <version>${slf4j.version}</version>
      <!-- When loaded at runtime this will wire up slf4j to the JUL backend -->
      <scope>runtime</scope>
    </dependency>

    <!-- Hamcrest and JUnit are required dependencies of PAssert,
         which is used in the main code of DebuggingWordCount example. -->
    <dependency>
      <groupId>org.hamcrest</groupId>
      <artifactId>hamcrest-core</artifactId>
      <version>${hamcrest.version}</version>
    </dependency>

    <dependency>
      <groupId>org.hamcrest</groupId>
      <artifactId>hamcrest-library</artifactId>
      <version>${hamcrest.version}</version>
    </dependency>

    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>${junit.version}</version>
    </dependency>

    <!-- The DirectRunner is needed for unit tests. -->
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-runners-direct-java</artifactId>
      <version>${beam.version}</version>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>org.mockito</groupId>
      <artifactId>mockito-core</artifactId>
      <version>${mockito.version}</version>
      <scope>test</scope>
    </dependency>
  </dependencies>
</project>

Template

BigqueryToGCS.java
package com.example;

import com.google.api.services.bigquery.model.TableRow;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;


public class BigqueryToGCS {

    public interface BigQueryToGCSOptions extends PipelineOptions {}

    public static void main(final String[] args) {
        final BigQueryToGCSOptions options = PipelineOptionsFactory.fromArgs(args)
                                                                   .withValidation()
                                                                   .as(BigQueryToGCSOptions.class);

        final Pipeline pipeline = Pipeline.create(options);

        pipeline.apply("Read from BigQuery", BigQueryIO.readTableRows()
                            .fromQuery("SELECT * FROM <DB>.<Table>"))
                .apply("Extract column", ParDo.of(new DoFn<TableRow, String>() {
                    @ProcessElement
                    public void processElement(final ProcessContext c) {
                        final TableRow row = c.element();
                        final String col = String.valueOf(row.get("<Column Name>"));
                        c.output(col);
                    }
                }))
                .apply("Write to GCS", TextIO.write().to("<GCS URL for output files>"));

        pipeline.run();
    }
}

Running the Template

mvn compile exec:java \
    -Dexec.mainClass=com.example.BigqueryToGCS \
    -Dexec.args="--project=<Project Name> \
    --tempLocation=<GCS URL> \
    --gcpTempLocation=<GCS URL> \
    --runner=DataflowRunner \
    --jobName=bigquery-to-gcs" \
    -Pdataflow-runner

Note that Dataflow job names must consist of lowercase letters, digits, and hyphens, hence the lowercased --jobName above.

Creating the Template (Scala)

Next, let's rewrite the project we built in Java in Scala.

Library Management

Scala projects use sbt, so libraries are managed in build.sbt. Since I only want the minimum set of libraries needed to run the pipeline, the configuration looks like this:

build.sbt
name := "<Project Name>"
version := "0.0.1"
scalaVersion := "2.12.10"

val beamVersion = "2.5.0"

libraryDependencies ++= Seq(
  "com.google.cloud.dataflow" % "google-cloud-dataflow-java-sdk-all" % beamVersion
)
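
For reference, google-cloud-dataflow-java-sdk-all is the older Dataflow distribution of the Beam SDK. If you prefer to depend on the plain Apache Beam artifacts instead (the same ones the Maven project above pulls in), here is a minimal sketch of the equivalent dependencies, with the version matching the pom above purely for illustration:

// Alternative sketch: plain Beam artifacts instead of the Dataflow SDK distribution.
val beamVersion = "2.16.0"

libraryDependencies ++= Seq(
  "org.apache.beam" % "beam-sdks-java-core" % beamVersion,
  "org.apache.beam" % "beam-sdks-java-io-google-cloud-platform" % beamVersion,
  "org.apache.beam" % "beam-runners-google-cloud-dataflow-java" % beamVersion
)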

Template

BigqueryToGCS.scala
package com.hello

import com.google.api.services.bigquery.model.TableRow
import org.apache.beam.runners.dataflow.DataflowRunner
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO
import org.apache.beam.sdk.Pipeline
import org.apache.beam.sdk.options.PipelineOptionsFactory
import org.apache.beam.sdk.transforms.{ DoFn, ParDo }
import DoFn.ProcessElement
import org.apache.beam.sdk.io.TextIO


// Extracts one column from each TableRow, like the anonymous DoFn in the Java version.
class ColumnDoFn extends DoFn[TableRow, String] {
    @ProcessElement
    def processElement(c: ProcessContext): Unit = {
        val input = c.element()
        val col = String.valueOf(input.get("<Column>"))
        c.output(col)
    }
}

object BigqueryToGCS {
    // DataflowPipelineOptions already extends PipelineOptions; keeping the trait
    // at the top level (rather than inside main) plays nicer with PipelineOptionsFactory.
    trait BigQueryToGCSOptions extends DataflowPipelineOptions

    def main(args: Array[String]): Unit = {
        val options = PipelineOptionsFactory.create().as(classOf[BigQueryToGCSOptions])

        options.setProject("<Project>")
        options.setRunner(classOf[DataflowRunner])
        options.setRegion("asia-northeast1")
        options.setZone("asia-northeast1-a")
        options.setNumWorkers(1)
        options.setAutoscalingAlgorithm(AutoscalingAlgorithmType.NONE)
        options.setWorkerMachineType("n1-standard-1")
        options.setStagingLocation("<GCS URL for staging>")
        options.setGcpTempLocation("<GCS URL for tmp>")
        options.setTempLocation("<GCS URL for tmp>")

        val p = Pipeline.create(options)

        p.apply("Read from BigQuery", BigQueryIO.readTableRows().fromQuery("SELECT * FROM <DB>.<Table>"))
         .apply("Extract column", ParDo.of(new ColumnDoFn))
         .apply("Write to GCS", TextIO.write().to("<GCS URL for output files>"))

        p.run()
    }
}
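
In this version the options are hard-coded with setters. If you would rather pass them on the command line the way the Java template does, PipelineOptionsFactory.fromArgs works the same way from Scala. A minimal sketch, assuming the imports and the BigQueryToGCSOptions trait above:

// Sketch: parse pipeline options from command-line args instead of setters,
// mirroring the Java template.
val options = PipelineOptionsFactory
  .fromArgs(args: _*)
  .withValidation()
  .as(classOf[BigQueryToGCSOptions])

With this variant, flags such as --project and --runner passed through sbt "runMain ..." actually take effect.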

Running the Template

sbt "runMain com.hello.BigqueryToGCS --project=<Project Name> --runner=DataflowRunner -Pdataflow-runner"

Creating the Template (Scala + Scio)

Scio is a Scala API for Apache Beam and Cloud Dataflow, developed by Spotify.
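
To get a feel for the API before looking at the template: Scio wraps Beam's PCollection in a Scala-collection-like SCollection, so transforms are plain lambdas rather than DoFn classes. A tiny illustrative sketch (the object name and output path are placeholders of mine, not part of the template below):

import com.spotify.scio.ContextAndArgs

object ScioFlavor {
  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)
    sc.parallelize(Seq("scala", "scio", "beam"))  // in-memory test input
      .map(_.length)                              // ordinary Scala lambda, no DoFn
      .saveAsTextFile("<output path>")
    sc.close()
  }
}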

Creating the Project

sbt new spotify/scio.g8

Library Management

The generated Scio template doesn't include the BigQuery library, so add scio-bigquery to it.

build.sbt
import sbt._
import Keys._

val scioVersion = "0.7.4"
val beamVersion = "2.11.0"
val scalaMacrosVersion = "2.1.1"

lazy val commonSettings = Defaults.coreDefaultSettings ++ Seq(
  organization := "example",
  // Semantic versioning http://semver.org/
  version := "0.1.0-SNAPSHOT",
  scalaVersion := "2.12.10",
  scalacOptions ++= Seq("-target:jvm-1.8",
                        "-deprecation",
                        "-feature",
                        "-unchecked"),
  javacOptions ++= Seq("-source", "1.8", "-target", "1.8")
)

lazy val paradiseDependency =
  "org.scalamacros" % "paradise" % scalaMacrosVersion cross CrossVersion.full
lazy val macroSettings = Seq(
  libraryDependencies += "org.scala-lang" % "scala-reflect" % scalaVersion.value,
  addCompilerPlugin(paradiseDependency)
)

lazy val root: Project = project
  .in(file("."))
  .settings(commonSettings)
  .settings(macroSettings)
  .settings(
    name := "yuyu_hf",
    description := "yuyu_hf",
    publish / skip := true,
    run / classLoaderLayeringStrategy := ClassLoaderLayeringStrategy.Flat,
    libraryDependencies ++= Seq(
      "com.spotify" %% "scio-core" % scioVersion,
      "com.spotify" %% "scio-test" % scioVersion % Test,
      "com.spotify" %% "scio-bigquery" % scioVersion,
      "org.apache.beam" % "beam-runners-direct-java" % beamVersion,
      // optional dataflow runner
      "org.apache.beam" % "beam-runners-google-cloud-dataflow-java" % beamVersion,
      "org.slf4j" % "slf4j-simple" % "1.7.25"
    )
  )
  .enablePlugins(PackPlugin)

lazy val repl: Project = project
  .in(file(".repl"))
  .settings(commonSettings)
  .settings(macroSettings)
  .settings(
    name := "repl",
    description := "Scio REPL for yuyu_hf",
    libraryDependencies ++= Seq(
      "com.spotify" %% "scio-repl" % scioVersion
    ),
    Compile / mainClass := Some("com.spotify.scio.repl.ScioShell"),
    publish / skip := true
  )
  .dependsOn(root)

Template

BigqueryToGCS.scala
package example

import com.spotify.scio.bigquery._
import com.spotify.scio.ContextAndArgs


object BigqueryToGCS {
  @BigQueryType.fromQuery("SELECT * FROM [<Project>:<DB>.<Table>]")
  class Row

  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)

    sc.typedBigQuery[Row]()
      // sepal_length is a column in the sample table this was tested against;
      // replace it with a column from your own table.
      .map(r => r.sepal_length.getOrElse(""))
      .saveAsTextFile("<GCS URL for output files>")

    sc.close()
  }
}

Running the Template

sbt "runMain example.BigqueryToGCS --project=<Project Name> --runner=DataflowRunner"

Impressions

Personally, it took me a while to get used to Apache Beam, so I initially wrote everything in Java, which has the most sample code available. Just starting by converting Java code to Scala is a good way in, and if you can write idiomatic Scala, you may be able to process intermediate data more cleanly than in Java. Whether to go further and adopt Scio is best decided by looking at how actively the library is updated and at your team's situation.
