LoginSignup
5
6

More than 5 years have passed since last update.

GoogleCloudで汎用Database構築2 - DataFlow2 -

Last updated at Posted at 2016-10-31

GYAOのtsです。
我々のチームは、オールパブリッククラウドで、Microservice Architectureを採用した次期バックエンドを設計中です。

経緯

前回の投稿で、DataFlowの前提知識をいくつかまとめたので、早速ハンズオンしてみる。

今回の目指すところ

今回もチーム内のメンバー全員でイベントチックに構築してみる。
やりたいことは以下。

  1. pubsubにrest形式でメッセージをpublish
  2. dataflowのstream modeで処理を行う
    1. メッセージをsubscribe
    2. ランダムな文字列を発行
    3. 発行した文字列をキーにCloudDataStoreに登録。

pubsubにrest形式でメッセージをpublishする前に、Apigeeを置いて連携したいが、それは後ほど。

準備

Cloud Storage

Cloud Storageに転送→同時にGCEのインスタンスが起動→deployのような形になっているようなので、転送先のバケットを作成する。(Eclipseからも作成可能。)
スクリーンショット 2016-10-29 23.06.16.png

スクリーンショット 2016-10-29 23.10.43.png

スクリーンショット 2016-10-29 23.16.15.png
flow-stagingという名前で作成。

Cloud Pub/Sub

Cloud Pub/SubからStreamデータを購読してその内容をstoreするので、topicを作成する。
スクリーンショット 2016-10-29 23.19.26.png

スクリーンショット 2016-10-29 23.22.26.png

スクリーンショット 2016-10-29 23.24.33.png
テストトピックを作成。

Cloud DataStore

特に準備の必要はなし。

setup

ここからはオフィシャルに沿ってすすめる。

Cloud SDK

スクリーンショット 2016-10-30 0.07.40.png
Source: https://cloud.google.com/dataflow/docs/quickstarts/quickstart-java-eclipse

  • 1~3は普通に作成、有効にする。
  • 4 Eclipseで開発するマシンにCloudSDKをインストール。
  • 5、6もその通りにEclipse Luna以上、JDK1.7以上の構成で。

Eclipse

今回はEclipse neonを使用。
jdk1.8
Eclipseにはアップデートサイトhttps://dl.google.com/dataflow/eclipse/ からプラグインをインストールしておく。

Pipeline作成

Eclipseプラグインのお陰で簡単にプロジェクトが作成できる。(Mavenでも構築可能
スクリーンショット 2016-10-30 0.29.13.png
スクリーンショット 2016-10-30 0.33.36.png
すすめるとProjectIDとLocationをの入力画面があるので、入力する。

項目 概要
ProjectID 自分のプロジェクトのID
Location 先程作成したCloudStorageのLocation。この場合はgs://flow-staging プルダウンで出てきます。

Coding

Pipelineは指定されたクラスのmainメソッドの中で構築する。Google提供のサンプルを少しいじって以下のようにしてみた。

StoreJob.java

    public static void main(String[] args) throws IOException {

        Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
        options.setDataset("xxxxxx"); //ProjectId
        options.setKind("data"); //Datastoreのkind
        options.setNamespace("default"); //Datastoreのnamespace
        options.setJobName("store-job"); //Deployされたあと、一覧に出るJobの名前。
        options.setStreaming(true); //streamingモードの有効化
        Pipeline pipeline = Pipeline.create(options); //パイプライン作成

        PCollection<String> input;
        LOG.info("Reading from PubSub.");
        input = pipeline.apply(PubsubIO.Read.named("subscriber").topic("projects/xxxxxxxxx/topics/test")) //Pub/Subの購読をapply
                        .apply(Window.<String>into(SlidingWindows.of(Duration.standardMinutes(1)).every(Duration.standardSeconds(10)))); //pubsubのinputはunboundedなので、boundedの処理を行うためにwindowを使用(1分間間隔のwindow。10秒毎に新しいwindowを生成)
        input.apply(ParDo.of(new CreateEntityFn(options.getNamespace(), options.getKind()))) //
                .apply(DatastoreIO.v1().write().withProjectId(options.getDataset()));
        pipeline.run();
    }

CreateEntityFnは下記の通り

CreateEntityFn.java
    static class CreateEntityFn extends DoFn<String, Entity> {

        private final String namespace;
        private final String kind;
        private final com.google.datastore.v1.Key ancestorKey;

        CreateEntityFn(String namespace, String kind) {
            this.namespace = namespace;
            this.kind = kind;
            ancestorKey = makeAncestorKey(namespace, kind);
        }

        public Entity makeEntity(String content) {
            Entity.Builder entityBuilder = Entity.newBuilder();
            Key.Builder keyBuilder = makeKey(ancestorKey, kind, UUID.randomUUID().toString());
            if (namespace != null) {
                keyBuilder.getPartitionIdBuilder().setNamespaceId(namespace);
            }
            else {
                //do nothing.
            }
            entityBuilder.setKey(keyBuilder.build());
            entityBuilder.getMutableProperties().put("content", makeValue(content).build());
            LOG.info("create Entity included content : " + content);
            return entityBuilder.build();
        }

        @Override
        public void processElement(ProcessContext c) {
            c.output(makeEntity(c.element()));
        }
    }

Deploy

EclipseのRun→Run Configurationsで下記を作成
スクリーンショット 2016-10-31 15.01.19.png
MainClassを指定。
スクリーンショット 2016-10-31 15.04.05.png
projectIdと
staging Locationを指定。
DataflowPipelineRunnerを使用する。PipelineRunnerに関しては前回の投稿を参照
Run。以上。
スクリーンショット 2016-10-31 15.18.03.png
CloudStorageの指定バケットにStagingとして保存される。
スクリーンショット 2016-10-31 15.19.45.png
Dataflowのコンソールでjobを確認することができる。
スクリーンショット 2016-10-31 15.21.07.png

実行

スクリーンショット 2016-10-31 15.22.16.png

pubsubのtopicを見ると、subscriptionが登録されているのがわかる。
それでは実際にpubsubにpushしてみる。

スクリーンショット 2016-10-31 15.23.33.png
公開ボタンから

スクリーンショット 2016-10-31 15.25.13.png
適当にメッセージング

スクリーンショット 2016-10-31 15.26.05.png
Dataflowの画面では要素数がリアルタイムで1にかわる。

スクリーンショット 2016-10-31 15.28.26.png
Logs→ワーカーログでログを見てみると、Entityが作成されたのがわかる。

スクリーンショット 2016-10-31 15.30.09.png
データストアのdefault名前空間を見てみると、data kindに

スクリーンショット 2016-10-31 15.32.16.png
保存を確認。

次回

前回の投稿で、Cloud Pub/Subで順序性が保証されないことに気付いたので、設計を少し変えようかと。

5
6
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
6