LoginSignup
0
2

More than 3 years have passed since last update.

GCP: FunctionsとPub/Subを連携させる

Posted at

前置き

Google Cloud Functionsから別のFunctionsを呼び出す方法として、Pub/Subを使うやり方を試しました。

構成

  • Google Cloud Platform
    • Pub/Sub
    • Cloud Functions (Java11) image.png

やってみる

Pub/Subとは

イベント ドリブン システムとストリーミング分析のためにメッセージングと取り込みを行います。
Cloud Pub/Sub  |  Google Cloud

Pub/Sub は、イベントを処理するサービスとイベントを生成するサービスを切り離す非同期メッセージング サービスです。
Pub/Sub とは  |  Cloud Pub/Sub ドキュメント  |  Google Cloud

用語と登場人物の関係は公式ドキュメントが分かりやすいです。
パブリッシャーとサブスクライバーの関係

大まかな流れは下記のとおりです。
1. 事前にトピックを作成
2. パブリッシャー(いわゆる呼び出し元)がトピックにメッセージ(データ)を作成(publish)
3. サブスクライバー(いわゆる呼び出し先)によってサブスクリプションが作成され、メッセージを受信
4. メッセージがすべて受信されると、サブスクリプションが終了

パブリッシャーとサブスクライバーは、一対一ではなく一対多や多対多など自由に設定できます。

GCPで動かす

大まかな手順

  1. Pub/Subにトピックを作成する
  2. FunctionsにPub/Subトリガーを作成する
  3. トピックにメッセージをpublishするFunctionsを作成する

3のHTTPトリガー -> 1のトピック -> 2のFunctions
というフローになります。

1. Pub/Subにトピックを作成する

GCPコンソール > Pub/Sub > トピックから「トピックを作成」します。
トピックIDは任意の文字列を設定します。

2. FunctionsにPub/Subトリガーを作成する

GCPコンソール > Cloud Functionsから「関数を作成」します。
トリガータイプCloud Pub/Subにし、1 で作成したトピックを選択し、構成を保存します。

コードのランタイムは好きなものを選んでください。今回はJava11です。

サブスクライブ(Pub/Sub -> Functions)の検証

ここまで設定したら、1 のPub/Subトピックに戻ってトリガーされることを確認します。
GCPコンソール > Pub/Sub > トピックで対象のトピックを開き「メッセージをパブリッシュ」します。
確認したいだけなので「1つのメッセージを公開」で「1回」だけメッセージを追加します。

GCPコンソール > Cloud Functions で対象の関数の「ログ」の開きます。
うまくいっていれば、このようにパブリッシュしたメッセージが表示されます。
image.png

3. トピックにメッセージをpublishするFunctionsを作成する

公式ドキュメントのソースを参考に、gradleで作成しました。
トピックへのメッセージのパブリッシュ  |  Cloud Pub/Sub  |  Google Cloud

PublishFunction.java
import com.google.cloud.functions.HttpFunction;
import com.google.cloud.functions.HttpRequest;
import com.google.cloud.functions.HttpResponse;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonParseException;
import com.google.protobuf.ByteString;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutionException;

public class PublishFunction implements HttpFunction {
    private static final String PROJECT_ID = System.getenv("GOOGLE_CLOUD_PROJECT");
    private static final String TOPIC_NAME = System.getenv("GOOGLE_CLOUD_PUBSUB_TOPIC");

    @Override
    public void service(HttpRequest request, HttpResponse response) throws Exception {
        String message = request.getFirstQueryParameter("message").get();
        ByteString byteStr = ByteString.copyFrom(message, StandardCharsets.UTF_8);
        PubsubMessage pubsubApiMessage = PubsubMessage.newBuilder().setData(byteStr).build();

        try {
            Publisher publisher = Publisher.newBuilder(ProjectTopicName.of(PROJECT_ID, TOPIC_NAME)).build();
            try {
                publisher.publish(pubsubApiMessage).get();
                response.setStatusCode(200);
                response.getWriter().write(message);
            } finally {
                publisher.shutdown();
                publisher.awaitTermination(1, TimeUnit.MINUTES);
            }
        } catch (InterruptedException | ExecutionException e) {
            System.out.println("Error publishing Pub/Sub message: " + e.getMessage());
            response.setStatusCode(500);
            response.getWriter().write(message);
        }
    }
}

下記を設定

build.gradle
dependencies {
    implementation platform("com.google.cloud:libraries-bom:5.3.0");
    implementation("com.google.cloud:google-cloud-pubsub");
    implementation('com.google.protobuf:protobuf-java:3.13.0')
}

作ったソースをFunctionsに登録します(Cloud Buildからデプロイしました)。
トリガータイプはHTTPにしておきます。

また、ランタイム変数GOOGLE_CLOUD_PROJECTを設定する必要があります。
TOPIC_NAMEの方はコードに直接記述してもOKですが、今回はランタイム変数として設定しました。
環境変数の使用  |  Google Cloud Functions に関するドキュメント

パブリッシュ(Functions -> Pub/Sub)の検証

3 で作った関数のHTTPトリガーをコールします。
URLパラメータmessageを渡せるようにしているので、ここに任意の文字を設定します。
これが1 のPub/Subにメッセージとしてパブリッシュされ、最終的に 2 の関数に出力されれば成功です。

ローカルで試す

ローカルでGOOGLE_APPLICATION_CREDENTIALSの環境変数を設定しても、下記の例外が発生します。
ここはうまくやるやり方が分かっていません :thinking:

java.io.IOException: The Application Default Credentials are not available. 
They are available if running in Google Compute Engine. 
Otherwise, the environment variable GOOGLE_APPLICATION_CREDENTIALS must be defined pointing to a file defining the credentials. 
See https://developers.google.com/accounts/docs/application-default-credentials for more information.

最後に

Functionsを連携させるのに何がいいかなと探して、Pub/Subにたどり着きました。
Functionsは起動時間に対して課金されるので、非同期で処理できるのは都合がよさそうです。

0
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
2