概要
タイトル通りの入門記事。PythonではなくJavaでコードを書き、バッジではなくストリーミングで処理を行い、ローカルではなくDataflowをランナーとして使う。
この記事では、PubSubトピックからデータを受信し、簡単な加工をした上で、別のPubSubトピックにデータを書き込むデータフローを作成する。
検証に使った環境
- Macbook Pro(macOS High Sierra)
- Java 1.8
- IntelliJ
前提
- Java 1.8がインストール済み
- Maven 3.5.4がインストール済み
- GCPのアカウントがある
- Cloud SDK(gcloudコマンド)がインストール済み
手順
GCPリソースの作成
必要なGCPリソースを作成する。
- サービスアカウント
- サービスアカウントを一つ作成し、プロジェクトオーナーの権限をつける(検証用のため、強めの権限にしています)
- キーをダウンロードする
- GCSのバケット
- PubSubのトピックを二つ
- 一つがデータソース、もう一つがデータの出力先となる
依存関係の設定(Mavenを使用)
ここではMavenを使用する。
<dependency>
<groupId>com.google.cloud.dataflow</groupId>
<artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
<version>2.5.0</version>
</dependency>
ソースコード
PubSubから受信したデータに、”Hello,”を追加し、別のPubSubに入れる。
AddHello.java
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
public class AddHello {
private static final String PROJECT = "[YOUR PROJECT]";
private static final String STAGING_LOCATION = "gs://[YOUR GCS BACKET]/staging";
private static final String TEMP_LOCATION = "gs://[YOUR GCS BACKET]/temp";
private static final String SRC_PUBSUB_TOPIC = "projects/[YOUR PROJECT]/topics/[PUBSUB TOPIC 1]";
private static final String DST_PUBSUB_TOPIC = "projects/[YOUR PROJECT]/topics/[PUBSUB TOPIC 2]";
static class MyFn extends DoFn<String, String> {
@ProcessElement
public void processElement(ProcessContext c) {
c.output("Hello," + c.element());
}
}
public static void main(String[] args) {
PipelineOptions options = PipelineOptionsFactory.create();
DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
dataflowOptions.setRunner(DataflowRunner.class);
dataflowOptions.setProject(PROJECT);
dataflowOptions.setStagingLocation(STAGING_LOCATION);
dataflowOptions.setTempLocation(TEMP_LOCATION);
dataflowOptions.setNumWorkers(1);
Pipeline p = Pipeline.create(dataflowOptions);
p.apply(PubsubIO.readStrings().fromTopic(SRC_PUBSUB_TOPIC))
.apply(ParDo.of(new MyFn()))
.apply(PubsubIO.writeStrings().to(DST_PUBSUB_TOPIC));
p.run();
}
}
デプロイ
環境変数にGOOGLE_APPLICATION_CREDENTIALS=/path/to/xxxxx.json
を設定し、上記のコードを実行する。
GCPのWebコンソールからDataflowを選択し、デプロイされたことを確認する。
動作確認
データソースとなるPubSubトピックにデータを入れる。これはGCPのWebコンソールから可能。デプロイ後すぐはDataflowにデータが読み込まれない可能性があるので、少し時間を開けると良いかもしれない。
データ出力先のPubSubトピックにサブスクリプション(仮にmy-subscriptionとする)を作成し、データを取得する。
$ gcloud pubsub subscriptions pull my-subscription --auto-ack