GYAOのtsです。
我々のチームは、オールパブリッククラウドで、Microservice Architectureを採用した次期バックエンドを設計中です。
経緯
前回の投稿で、DataFlowの前提知識をいくつかまとめたので、早速ハンズオンしてみる。
今回の目指すところ
今回もチーム内のメンバー全員でイベントチックに構築してみる。
やりたいことは以下。
- pubsubにrest形式でメッセージをpublish
- dataflowのstream modeで処理を行う
- メッセージをsubscribe
- ランダムな文字列を発行
- 発行した文字列をキーにCloudDataStoreに登録。
pubsubにrest形式でメッセージをpublishする前に、Apigeeを置いて連携したいが、それは後ほど。
準備
Cloud Storage
Cloud Storageに転送→同時にGCEのインスタンスが起動→deployのような形になっているようなので、転送先のバケットを作成する。(Eclipseからも作成可能。)
↓
↓
flow-stagingという名前で作成。
Cloud Pub/Sub
Cloud Pub/SubからStreamデータを購読してその内容をstoreするので、topicを作成する。
↓
↓
テストトピックを作成。
Cloud DataStore
特に準備の必要はなし。
setup
ここからはオフィシャルに沿ってすすめる。
Cloud SDK
![スクリーンショット 2016-10-30 0.07.40.png](https://qiita-user-contents.imgix.net/https%3A%2F%2Fqiita-image-store.s3.amazonaws.com%2F0%2F114329%2F66e3733b-4d2b-ee17-b816-7a0bbb6f9ca4.png?ixlib=rb-4.0.0&auto=format&gif-q=60&q=75&s=698bf93291a6e51693977f9cde294488)
- 1~3は普通に作成、有効にする。
- 4 Eclipseで開発するマシンにCloudSDKをインストール。
- install後、下記を実行。
gcloud beta auth application-default login
- eclipseからのauthの設定 こちらのオフィシャルを参照。
- install後、下記を実行。
- 5、6もその通りにEclipse Luna以上、JDK1.7以上の構成で。
Eclipse
今回はEclipse neonを使用。
jdk1.8
Eclipseにはアップデートサイトhttps://dl.google.com/dataflow/eclipse/ からプラグインをインストールしておく。
Pipeline作成
Eclipseプラグインのお陰で簡単にプロジェクトが作成できる。(Mavenでも構築可能)
すすめるとProjectIDとLocationをの入力画面があるので、入力する。
項目 | 概要 |
---|---|
ProjectID | 自分のプロジェクトのID |
Location | 先程作成したCloudStorageのLocation。この場合はgs://flow-staging プルダウンで出てきます。 |
Coding
Pipelineは指定されたクラスのmainメソッドの中で構築する。Google提供のサンプルを少しいじって以下のようにしてみた。
public static void main(String[] args) throws IOException {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
options.setDataset("xxxxxx"); //ProjectId
options.setKind("data"); //Datastoreのkind
options.setNamespace("default"); //Datastoreのnamespace
options.setJobName("store-job"); //Deployされたあと、一覧に出るJobの名前。
options.setStreaming(true); //streamingモードの有効化
Pipeline pipeline = Pipeline.create(options); //パイプライン作成
PCollection<String> input;
LOG.info("Reading from PubSub.");
input = pipeline.apply(PubsubIO.Read.named("subscriber").topic("projects/xxxxxxxxx/topics/test")) //Pub/Subの購読をapply
.apply(Window.<String>into(SlidingWindows.of(Duration.standardMinutes(1)).every(Duration.standardSeconds(10)))); //pubsubのinputはunboundedなので、boundedの処理を行うためにwindowを使用(1分間間隔のwindow。10秒毎に新しいwindowを生成)
input.apply(ParDo.of(new CreateEntityFn(options.getNamespace(), options.getKind()))) //
.apply(DatastoreIO.v1().write().withProjectId(options.getDataset()));
pipeline.run();
}
CreateEntityFnは下記の通り
static class CreateEntityFn extends DoFn<String, Entity> {
private final String namespace;
private final String kind;
private final com.google.datastore.v1.Key ancestorKey;
CreateEntityFn(String namespace, String kind) {
this.namespace = namespace;
this.kind = kind;
ancestorKey = makeAncestorKey(namespace, kind);
}
public Entity makeEntity(String content) {
Entity.Builder entityBuilder = Entity.newBuilder();
Key.Builder keyBuilder = makeKey(ancestorKey, kind, UUID.randomUUID().toString());
if (namespace != null) {
keyBuilder.getPartitionIdBuilder().setNamespaceId(namespace);
}
else {
//do nothing.
}
entityBuilder.setKey(keyBuilder.build());
entityBuilder.getMutableProperties().put("content", makeValue(content).build());
LOG.info("create Entity included content : " + content);
return entityBuilder.build();
}
@Override
public void processElement(ProcessContext c) {
c.output(makeEntity(c.element()));
}
}
Deploy
EclipseのRun→Run Configurationsで下記を作成
MainClassを指定。
projectIdと
staging Locationを指定。
DataflowPipelineRunnerを使用する。PipelineRunnerに関しては前回の投稿を参照
Run。以上。
CloudStorageの指定バケットにStagingとして保存される。
Dataflowのコンソールでjobを確認することができる。
実行
![スクリーンショット 2016-10-31 15.22.16.png](https://qiita-user-contents.imgix.net/https%3A%2F%2Fqiita-image-store.s3.amazonaws.com%2F0%2F114329%2Fd97acf94-3945-b4f9-0aaf-d14784ff7a7b.png?ixlib=rb-4.0.0&auto=format&gif-q=60&q=75&s=f7333db924db3673332b18a6d1ddf61a)
pubsubのtopicを見ると、subscriptionが登録されているのがわかる。
それでは実際にpubsubにpushしてみる。
![スクリーンショット 2016-10-31 15.23.33.png](https://qiita-user-contents.imgix.net/https%3A%2F%2Fqiita-image-store.s3.amazonaws.com%2F0%2F114329%2Fc2e8f095-37a5-c297-f2ae-633b6392e394.png?ixlib=rb-4.0.0&auto=format&gif-q=60&q=75&s=32d9996d911a80d2a5f01101a3a74f42)
![スクリーンショット 2016-10-31 15.25.13.png](https://qiita-user-contents.imgix.net/https%3A%2F%2Fqiita-image-store.s3.amazonaws.com%2F0%2F114329%2F1e6cdaca-77eb-f1b3-e21e-a2b136a089c4.png?ixlib=rb-4.0.0&auto=format&gif-q=60&q=75&s=814b421616ddff5c0af3ea069fffeda9)
![スクリーンショット 2016-10-31 15.26.05.png](https://qiita-user-contents.imgix.net/https%3A%2F%2Fqiita-image-store.s3.amazonaws.com%2F0%2F114329%2F61d006bf-bd7c-1a8d-8d1e-3c7aee01d864.png?ixlib=rb-4.0.0&auto=format&gif-q=60&q=75&s=86448f382ce6b2aea9e0fb4401188212)
![スクリーンショット 2016-10-31 15.28.26.png](https://qiita-user-contents.imgix.net/https%3A%2F%2Fqiita-image-store.s3.amazonaws.com%2F0%2F114329%2F3bb4b25c-75d2-02d8-f41c-aad0776812d2.png?ixlib=rb-4.0.0&auto=format&gif-q=60&q=75&s=096d8331485a39c4c46ea2eff7979a25)
![スクリーンショット 2016-10-31 15.30.09.png](https://qiita-user-contents.imgix.net/https%3A%2F%2Fqiita-image-store.s3.amazonaws.com%2F0%2F114329%2F2cb0acf7-9a89-2828-885b-c03bb2a6ca9f.png?ixlib=rb-4.0.0&auto=format&gif-q=60&q=75&s=5946f9dbbd0598c2f68ddb92b55b472b)
![スクリーンショット 2016-10-31 15.32.16.png](https://qiita-user-contents.imgix.net/https%3A%2F%2Fqiita-image-store.s3.amazonaws.com%2F0%2F114329%2F91248c23-9087-94cd-2abb-6cf719e2c2e5.png?ixlib=rb-4.0.0&auto=format&gif-q=60&q=75&s=47c3ed7d94440c99fb8b5674106954db)
次回
前回の投稿で、Cloud Pub/Subで順序性が保証されないことに気付いたので、設計を少し変えようかと。