5
6

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

GoogleCloudで汎用Database構築2 - DataFlow2 -

Last updated at Posted at 2016-10-31

GYAOのtsです。
我々のチームは、オールパブリッククラウドで、Microservice Architectureを採用した次期バックエンドを設計中です。

経緯

前回の投稿で、DataFlowの前提知識をいくつかまとめたので、早速ハンズオンしてみる。

今回の目指すところ

今回もチーム内のメンバー全員でイベントチックに構築してみる。
やりたいことは以下。

  1. pubsubにrest形式でメッセージをpublish
  2. dataflowのstream modeで処理を行う
    1. メッセージをsubscribe
    2. ランダムな文字列を発行
    3. 発行した文字列をキーにCloudDataStoreに登録。

pubsubにrest形式でメッセージをpublishする前に、Apigeeを置いて連携したいが、それは後ほど。

準備

Cloud Storage

Cloud Storageに転送→同時にGCEのインスタンスが起動→deployのような形になっているようなので、転送先のバケットを作成する。(Eclipseからも作成可能。)
スクリーンショット 2016-10-29 23.06.16.png

スクリーンショット 2016-10-29 23.10.43.png

スクリーンショット 2016-10-29 23.16.15.png
flow-stagingという名前で作成。

Cloud Pub/Sub

Cloud Pub/SubからStreamデータを購読してその内容をstoreするので、topicを作成する。
スクリーンショット 2016-10-29 23.19.26.png

スクリーンショット 2016-10-29 23.22.26.png

スクリーンショット 2016-10-29 23.24.33.png
テストトピックを作成。

Cloud DataStore

特に準備の必要はなし。

setup

ここからはオフィシャルに沿ってすすめる。

Cloud SDK

スクリーンショット 2016-10-30 0.07.40.png Source: https://cloud.google.com/dataflow/docs/quickstarts/quickstart-java-eclipse
  • 1~3は普通に作成、有効にする。
  • 4 Eclipseで開発するマシンにCloudSDKをインストール。
  • 5、6もその通りにEclipse Luna以上、JDK1.7以上の構成で。

Eclipse

今回はEclipse neonを使用。
jdk1.8
Eclipseにはアップデートサイトhttps://dl.google.com/dataflow/eclipse/ からプラグインをインストールしておく。

Pipeline作成

Eclipseプラグインのお陰で簡単にプロジェクトが作成できる。(Mavenでも構築可能
スクリーンショット 2016-10-30 0.29.13.png
スクリーンショット 2016-10-30 0.33.36.png
すすめるとProjectIDとLocationをの入力画面があるので、入力する。

項目 概要
ProjectID 自分のプロジェクトのID
Location 先程作成したCloudStorageのLocation。この場合はgs://flow-staging プルダウンで出てきます。

Coding

Pipelineは指定されたクラスのmainメソッドの中で構築する。Google提供のサンプルを少しいじって以下のようにしてみた。

StoreJob.java

	public static void main(String[] args) throws IOException {

		Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
		options.setDataset("xxxxxx"); //ProjectId
		options.setKind("data"); //Datastoreのkind
		options.setNamespace("default"); //Datastoreのnamespace
		options.setJobName("store-job"); //Deployされたあと、一覧に出るJobの名前。
		options.setStreaming(true); //streamingモードの有効化
		Pipeline pipeline = Pipeline.create(options); //パイプライン作成

		PCollection<String> input;
		LOG.info("Reading from PubSub.");
		input = pipeline.apply(PubsubIO.Read.named("subscriber").topic("projects/xxxxxxxxx/topics/test")) //Pub/Subの購読をapply
						.apply(Window.<String>into(SlidingWindows.of(Duration.standardMinutes(1)).every(Duration.standardSeconds(10)))); //pubsubのinputはunboundedなので、boundedの処理を行うためにwindowを使用(1分間間隔のwindow。10秒毎に新しいwindowを生成)
		input.apply(ParDo.of(new CreateEntityFn(options.getNamespace(), options.getKind()))) //
				.apply(DatastoreIO.v1().write().withProjectId(options.getDataset()));
		pipeline.run();
	}

CreateEntityFnは下記の通り

CreateEntityFn.java
	static class CreateEntityFn extends DoFn<String, Entity> {

		private final String namespace;
		private final String kind;
		private final com.google.datastore.v1.Key ancestorKey;

		CreateEntityFn(String namespace, String kind) {
			this.namespace = namespace;
			this.kind = kind;
			ancestorKey = makeAncestorKey(namespace, kind);
		}

		public Entity makeEntity(String content) {
			Entity.Builder entityBuilder = Entity.newBuilder();
			Key.Builder keyBuilder = makeKey(ancestorKey, kind, UUID.randomUUID().toString());
			if (namespace != null) {
				keyBuilder.getPartitionIdBuilder().setNamespaceId(namespace);
			}
			else {
				//do nothing.
			}
			entityBuilder.setKey(keyBuilder.build());
			entityBuilder.getMutableProperties().put("content", makeValue(content).build());
			LOG.info("create Entity included content : " + content);
			return entityBuilder.build();
		}

		@Override
		public void processElement(ProcessContext c) {
			c.output(makeEntity(c.element()));
		}
	}

Deploy

EclipseのRun→Run Configurationsで下記を作成
スクリーンショット 2016-10-31 15.01.19.png
MainClassを指定。
スクリーンショット 2016-10-31 15.04.05.png
projectIdと
staging Locationを指定。
DataflowPipelineRunnerを使用する。PipelineRunnerに関しては前回の投稿を参照
Run。以上。
スクリーンショット 2016-10-31 15.18.03.png
CloudStorageの指定バケットにStagingとして保存される。
スクリーンショット 2016-10-31 15.19.45.png
Dataflowのコンソールでjobを確認することができる。
スクリーンショット 2016-10-31 15.21.07.png

実行

スクリーンショット 2016-10-31 15.22.16.png

pubsubのtopicを見ると、subscriptionが登録されているのがわかる。
それでは実際にpubsubにpushしてみる。

スクリーンショット 2016-10-31 15.23.33.png 公開ボタンから スクリーンショット 2016-10-31 15.25.13.png 適当にメッセージング スクリーンショット 2016-10-31 15.26.05.png Dataflowの画面では要素数がリアルタイムで1にかわる。 スクリーンショット 2016-10-31 15.28.26.png Logs→ワーカーログでログを見てみると、Entityが作成されたのがわかる。 スクリーンショット 2016-10-31 15.30.09.png データストアのdefault名前空間を見てみると、data kindに スクリーンショット 2016-10-31 15.32.16.png 保存を確認。

次回

前回の投稿で、Cloud Pub/Subで順序性が保証されないことに気付いたので、設計を少し変えようかと。

5
6
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
6

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?