LoginSignup
4
1

More than 5 years have passed since last update.

Dataflow、Java、ストリーミングをとりあえず動かす

Last updated at Posted at 2018-10-08

概要

タイトル通りの入門記事。PythonではなくJavaでコードを書き、バッジではなくストリーミングで処理を行い、ローカルではなくDataflowをランナーとして使う。

この記事では、PubSubトピックからデータを受信し、簡単な加工をした上で、別のPubSubトピックにデータを書き込むデータフローを作成する。

検証に使った環境

  • Macbook Pro(macOS High Sierra)
  • Java 1.8
  • IntelliJ

前提

  • Java 1.8がインストール済み
  • Maven 3.5.4がインストール済み
  • GCPのアカウントがある
  • Cloud SDK(gcloudコマンド)がインストール済み

手順

GCPリソースの作成

必要なGCPリソースを作成する。

  • サービスアカウント
    • サービスアカウントを一つ作成し、プロジェクトオーナーの権限をつける(検証用のため、強めの権限にしています)
    • キーをダウンロードする
  • GCSのバケット
  • PubSubのトピックを二つ
    • 一つがデータソース、もう一つがデータの出力先となる

依存関係の設定(Mavenを使用)

ここではMavenを使用する。

        <dependency>
            <groupId>com.google.cloud.dataflow</groupId>
            <artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
            <version>2.5.0</version>
        </dependency>

ソースコード

PubSubから受信したデータに、”Hello,”を追加し、別のPubSubに入れる。

AddHello.java
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;

public class AddHello {
    private static final String PROJECT = "[YOUR PROJECT]";
    private static final String STAGING_LOCATION = "gs://[YOUR GCS BACKET]/staging";
    private static final String TEMP_LOCATION = "gs://[YOUR GCS BACKET]/temp";
    private static final String SRC_PUBSUB_TOPIC = "projects/[YOUR PROJECT]/topics/[PUBSUB TOPIC 1]";
    private static final String DST_PUBSUB_TOPIC = "projects/[YOUR PROJECT]/topics/[PUBSUB TOPIC 2]";

    static class MyFn extends DoFn<String, String> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            c.output("Hello," + c.element());
        }
    }

    public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.create();
        DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
        dataflowOptions.setRunner(DataflowRunner.class);
        dataflowOptions.setProject(PROJECT);
        dataflowOptions.setStagingLocation(STAGING_LOCATION);
        dataflowOptions.setTempLocation(TEMP_LOCATION);
        dataflowOptions.setNumWorkers(1);

        Pipeline p = Pipeline.create(dataflowOptions);
        p.apply(PubsubIO.readStrings().fromTopic(SRC_PUBSUB_TOPIC))
                .apply(ParDo.of(new MyFn()))
                .apply(PubsubIO.writeStrings().to(DST_PUBSUB_TOPIC));
        p.run();
    }
}

デプロイ

環境変数にGOOGLE_APPLICATION_CREDENTIALS=/path/to/xxxxx.jsonを設定し、上記のコードを実行する。

GCPのWebコンソールからDataflowを選択し、デプロイされたことを確認する。

動作確認

データソースとなるPubSubトピックにデータを入れる。これはGCPのWebコンソールから可能。デプロイ後すぐはDataflowにデータが読み込まれない可能性があるので、少し時間を開けると良いかもしれない。

データ出力先のPubSubトピックにサブスクリプション(仮にmy-subscriptionとする)を作成し、データを取得する。

$ gcloud pubsub subscriptions pull my-subscription --auto-ack

参考資料

4
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
1