SNSピリカのサービスでは App EngineからPub/SubにメッセージをPublish → サブスクリプションフィルタで絞り込み → Cloud FunctionsからCloud Tasks経由で再度App Engineに戻して処理するケースがあります。
ローカル環境で開発するとき、Pub/SubからCloud FunctionsをPushサブスクリプション経由で呼び出したいことがあります。
その際のやり方について、備忘録として掲載します。
前提
以下の構成で動作することを想定しています。
(Inbound Message)
\--> [Pub/Sub]<port=8085>
- トピック: projects/alice/topics/bob
- サブスクリプション: projects/alice/subscriptions/charley
\--> [Cloud Functions]<http://127.0.0.1/emily:8080>
- プロジェクトID: alice
- Cloud Functions
- 関数名: emily
- ローカル環境での動作ポート: 8080
- Pub/Sub
- トピック名: bob
- サブスクリプション名: charley
- ローカル環境での動作ポート: 8085
- pushConfig: 呼び出したいCloud FunctionsのURL(今回はhttp://127.0.0.1:8080)
また、Functionsで立ち上げる関数は、以下の様にメッセージのみを表示するものとします。
import flask
import json
def emily(request: flask.Request):
print("received message!: " + json.dumps(request.get_json()))
return ""
環境構築
以下のフローで動作環境を構築します(本ケースでは、Python3.7, pipenvを利用します)。
- functions-frameworkのインストール
- functions-frameworkを立ち上げる
- gcloudのPub/Sub Emulatorを立ち上がる
- PubSub Emulator上でトピック、サブスクリプションを作成する
なお、2., 3., 4.はいずれも別のターミナル上で実施する必要があります(functions-framework、Pub/Sub Emulatorともにフォアグラウンドにプロセスが動作し続けるため)
1. functions-frameworkのインストール
pipenvを使う場合、以下の様にfunctions-frameworkをPipfileを記載した上でpipenv install
を実行します。
[[source]]
url = "https://pypi.org/simple"
verify_ssl = true
name = "pypi"
[packages]
flask = "*"
[dev-packages]
flake8 = "*"
autopep8 = "*"
functions-framework = "*"
[scripts]
functions="functions-framework --target emily --debug"
2. functions-frameworkを立ち上げる
pipenvの仮想環境を使って、functions-frameworkを立ち上げます。
pipenv run functions
3. gcloudのPub/Sub Emulatorを立ち上がる
gcloudを使って、Pub/Subのエミュレータを立ち上げます。
gcloud beta emulators pubsub start --project alice
4. Pub/Sub Emulator上でトピック、サブスクリプションを作成する
以下の通りシェルスクリプト等でトピック、サブスクリプションを作成します。
PROJECT="alice"
TOPIC="bob"
SUBSCRIBER="charley"
PUBSUBPORT="8085"
FUNCTIONPORT="8080"
api_url="http://127.0.0.1:${PUBSUBPORT}/v1/projects/${PROJECT}"
# トピックを作成する
curl -X PUT ${api_url}/topics/${TOPIC}
# サブスクリプションを追加する
curl -s -X PUT ${api_url}/subscriptions/${SUBSCRIBER} \
-H "content-type: application/json" \
--data '{
"topic": "projects/'${PROJECT}'/topics/'${TOPIC}'",
"pushConfig":{"pushEndpoint": "http://127.0.0.1:'${FUNCTIONPORT}'"}
}'
# 追加したサブスクリプションを表示する
curl -X GET ${api_url}/subscriptions
問題なく設定できていれば、コンソール上に以下の様に設定内容が出力されます。
{
"subscriptions": [{
"name": "projects/alice/subscriptions/charley",
"topic": "projects/alice/topics/bob",
"pushConfig": {
"pushEndpoint": "http://127.0.0.1:8080"
},
"ackDeadlineSeconds": 10,
"messageRetentionDuration": "604800s"
}]
}
Pub/Sub経由でFunctionsを呼び出す
Pub/Subへのモックメッセージを作成し、Pub/SubのAPIに送信します。
以下の通りシェルスクリプト等でメッセージを作成し、curlでPub/Sub EmulatorにメッセージをPublishします。
PROJECT="alice"
TOPIC="bob"
data='
{
"my_message": "hello world!"
}
'
data_enc=$(echo $data | base64 ) # data should be base64 encoded.
message='
{
"messages": [
{
"data": "'$data_enc'",
"attributes": '$data'
}
]
}
'
url="http://127.0.0.1:8085/v1/projects/$PROJECT/topics/$TOPIC:publish"
curl $url \
-H "Content-Type: application/json" \
-d "$message"
送信後、functions-frameworkを立ち上げているコンソールで以下のメッセージを確認できます。
received message!: {"subscription": "projects/alice/subscriptions/charley", "message": {"data": "eyAibXlfbWVzc2FnZSI6ICJoZWxsbyB3b3JsZCEiIH0K", "messageId": "1", "attributes": {"my_message": "hello world!"}}}
付記: ローカルのApp EngineのPub/Subへの向け先をローカルのものにするには?
以下の通り、App Engine立ち上げ前に環境変数を設定しておくとOKです(リファレンス)。
自動設定する場合
以下コマンドにより、PUBSUB_EMULATOR_HOST=localhost:8085が環境変数に設定されます。
$(gcloud beta emulators pubsub env-init)
手動設定する場合
Pub/Subのローカルエミュレータのホストやポートを変更している場合、以下の様に手動設定します。
export PUBSUB_EMULATOR_HOST=localhost:8085
export PUBSUB_PROJECT_ID=alice
以上になります。ご覧いただきありがとうございます!