前置き
Google Cloud Functionsから別のFunctionsを呼び出す方法として、Pub/Subを使うやり方を試しました。
構成
やってみる
Pub/Subとは
イベント ドリブン システムとストリーミング分析のためにメッセージングと取り込みを行います。
Cloud Pub/Sub | Google Cloud
Pub/Sub は、イベントを処理するサービスとイベントを生成するサービスを切り離す非同期メッセージング サービスです。
Pub/Sub とは | Cloud Pub/Sub ドキュメント | Google Cloud
用語と登場人物の関係は公式ドキュメントが分かりやすいです。
パブリッシャーとサブスクライバーの関係
大まかな流れは下記のとおりです。
- 事前にトピックを作成
- パブリッシャー(いわゆる呼び出し元)がトピックにメッセージ(データ)を作成(publish)
- サブスクライバー(いわゆる呼び出し先)によってサブスクリプションが作成され、メッセージを受信
- メッセージがすべて受信されると、サブスクリプションが終了
パブリッシャーとサブスクライバーは、一対一ではなく一対多や多対多など自由に設定できます。
GCPで動かす
大まかな手順
- Pub/Subにトピックを作成する
- FunctionsにPub/Subトリガーを作成する
- トピックにメッセージをpublishするFunctionsを作成する
3のHTTPトリガー -> 1のトピック -> 2のFunctions
というフローになります。
1. Pub/Subにトピックを作成する
GCPコンソール > Pub/Sub > トピックから「トピックを作成」します。
トピックIDは任意の文字列を設定します。
2. FunctionsにPub/Subトリガーを作成する
GCPコンソール > Cloud Functionsから「関数を作成」します。
トリガータイプCloud Pub/Sub
にし、1 で作成したトピックを選択し、構成を保存します。
コードのランタイムは好きなものを選んでください。今回はJava11です。
サブスクライブ(Pub/Sub -> Functions)の検証
ここまで設定したら、1 のPub/Subトピックに戻ってトリガーされることを確認します。
GCPコンソール > Pub/Sub > トピックで対象のトピックを開き「メッセージをパブリッシュ」します。
確認したいだけなので「1つのメッセージを公開」で「1回」だけメッセージを追加します。
GCPコンソール > Cloud Functions で対象の関数の「ログ」の開きます。
うまくいっていれば、このようにパブリッシュしたメッセージが表示されます。
3. トピックにメッセージをpublishするFunctionsを作成する
公式ドキュメントのソースを参考に、gradleで作成しました。
トピックへのメッセージのパブリッシュ | Cloud Pub/Sub | Google Cloud
import com.google.cloud.functions.HttpFunction;
import com.google.cloud.functions.HttpRequest;
import com.google.cloud.functions.HttpResponse;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonParseException;
import com.google.protobuf.ByteString;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutionException;
public class PublishFunction implements HttpFunction {
private static final String PROJECT_ID = System.getenv("GOOGLE_CLOUD_PROJECT");
private static final String TOPIC_NAME = System.getenv("GOOGLE_CLOUD_PUBSUB_TOPIC");
@Override
public void service(HttpRequest request, HttpResponse response) throws Exception {
String message = request.getFirstQueryParameter("message").get();
ByteString byteStr = ByteString.copyFrom(message, StandardCharsets.UTF_8);
PubsubMessage pubsubApiMessage = PubsubMessage.newBuilder().setData(byteStr).build();
try {
Publisher publisher = Publisher.newBuilder(ProjectTopicName.of(PROJECT_ID, TOPIC_NAME)).build();
try {
publisher.publish(pubsubApiMessage).get();
response.setStatusCode(200);
response.getWriter().write(message);
} finally {
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);
}
} catch (InterruptedException | ExecutionException e) {
System.out.println("Error publishing Pub/Sub message: " + e.getMessage());
response.setStatusCode(500);
response.getWriter().write(message);
}
}
}
下記を設定
dependencies {
implementation platform("com.google.cloud:libraries-bom:5.3.0");
implementation("com.google.cloud:google-cloud-pubsub");
implementation('com.google.protobuf:protobuf-java:3.13.0')
}
作ったソースをFunctionsに登録します(Cloud Buildからデプロイしました)。
トリガータイプはHTTPにしておきます。
また、ランタイム変数GOOGLE_CLOUD_PROJECT
を設定する必要があります。
TOPIC_NAME
の方はコードに直接記述してもOKですが、今回はランタイム変数として設定しました。
環境変数の使用 | Google Cloud Functions に関するドキュメント
パブリッシュ(Functions -> Pub/Sub)の検証
3 で作った関数のHTTPトリガーをコールします。
URLパラメータmessage
を渡せるようにしているので、ここに任意の文字を設定します。
これが1 のPub/Subにメッセージとしてパブリッシュされ、最終的に 2 の関数に出力されれば成功です。
ローカルで試す
ローカルでGOOGLE_APPLICATION_CREDENTIALS
の環境変数を設定しても、下記の例外が発生します。
ここはうまくやるやり方が分かっていません
java.io.IOException: The Application Default Credentials are not available.
They are available if running in Google Compute Engine.
Otherwise, the environment variable GOOGLE_APPLICATION_CREDENTIALS must be defined pointing to a file defining the credentials.
See https://developers.google.com/accounts/docs/application-default-credentials for more information.
最後に
Functionsを連携させるのに何がいいかなと探して、Pub/Subにたどり着きました。
Functionsは起動時間に対して課金されるので、非同期で処理できるのは都合がよさそうです。