GCP - Pub/Sub サービス構築チュートリアルからの続き
Pub/Sub サービスの作成と利用手順をまとめました。
サービス概要
これまでの知識を踏まえて下記の機能を実現したサービスを実装してみます。
- トピックを作成する
- サブスクリプションを作成する
- サブスクライブしているメッセージを取得する
なお、サンプルソースはGitHubに上げてあるのでご自由にご確認ください。
開発環境設定
認証情報をセット
export GOOGLE_APPLICATION_CREDENTIALS=path/to/file.json
ディレクトリ構造
ソースはGitHubにあります。
|-- minarai-pub-sub
| |-- app.yaml
| |-- main.py
| |-- publisher.py
| |-- subscriber.py
| `-- requirements.txt
app.yaml
app.yaml
runtime: python37
env: standard
service: default
entrypoint: gunicorn -b :$PORT main:app
env_variables:
# project id
PROJECT_ID:
FLASK_ENV: development
DEBUG: True
automatic_scaling:
min_idle_instances: automatic
max_idle_instances: automatic
min_pending_latency: automatic
max_pending_latency: automatic
requirements.txt
requirements.txt
Flask==1.1.2
google-cloud-pubsub==1.4.3
gunicorn==20.0.4; python_version > '3.0'
gunicorn==19.10.0; python_version < '3.0'
publisher.py
publisher.py
#!/usr/bin/env python
import logging
import argparse
import os
import time
from google.cloud import pubsub, pubsub_v1
def get_topics_list(project_id: str):
    """Request the Pub/Sub topic listing of a project and log it.

    The attributes of the object returned by ``list_topics`` are dumped to
    the log as-is; the individual topics are not iterated here.

    Arguments:
        project_id {str} -- project id

    Returns:
        int -- always 1
    """
    # [START pubsub_list_topics]
    client = pubsub.PublisherClient()
    topics = client.list_topics(client.project_path(project_id))
    logging.error(vars(topics))
    # [END pubsub_list_topics]
    return 1
def register_topic(project_id: str, topic_name: str):
    """Create a Pub/Sub topic and log both its path and the created topic.

    Arguments:
        project_id {str} -- project id
        topic_name {str} -- topic name
    """
    client = pubsub_v1.PublisherClient()
    # Fully qualified topic identifier built by the client from both parts.
    full_topic_path = client.topic_path(project_id, topic_name)
    logging.info("topic_path: {}".format(full_topic_path))

    created_topic = client.create_topic(full_topic_path)
    logging.info("Topic created: {}".format(created_topic))
def delete(project_id: str, topic_name: str):
    """Delete a Pub/Sub topic.

    Arguments:
        project_id {str} -- project id
        topic_name {str} -- name of the topic to delete
    """
    # [START pubsub_delete_topic]
    publisher = pubsub.PublisherClient()
    topic_path = publisher.topic_path(project_id, topic_name)
    publisher.delete_topic(topic_path)
    logging.info("Topic deleted: {}".format(topic_path))
    # [END pubsub_delete_topic]
def publish_message(project_id: str, topic_name: str, message: str):
    """Publish one message to a topic and log the resulting message ID.

    Arguments:
        project_id {str} -- project id
        topic_name {str} -- topic name
        message {str} -- text to publish; sent UTF-8 encoded
    """
    client = pubsub.PublisherClient()
    # `topic_path` builds the fully qualified identifier
    # `projects/{project_id}/topics/{topic_name}`.
    destination = client.topic_path(project_id, topic_name)
    payload = message.encode("utf-8")
    logging.info(destination)

    # publish() returns a future; its result is logged as the message ID.
    pending = client.publish(destination, data=payload)
    logging.info(pending.result())
    logging.info("Published messages.")
    # [END pubsub_quickstart_publisher]
    # [END pubsub_publish]
def pub(project_id, topic_name, message):
    """Publish a single message to a Pub/Sub topic and wait for completion.

    Arguments:
        project_id -- project id
        topic_name -- topic name
        message -- text to publish; sent UTF-8 encoded
    """
    # [START pubsub_quickstart_pub_client]
    publisher_client = pubsub_v1.PublisherClient()
    # [END pubsub_quickstart_pub_client]

    # Fully qualified identifier: `projects/{project_id}/topics/{topic_name}`
    destination = publisher_client.topic_path(project_id, topic_name)

    # Pub/Sub payloads have to be bytestrings.
    payload = message.encode("utf-8")

    # Counter shared with the done-callback, which increments it on success.
    ref = {"num_messages": 0}

    # publish() returns a future; the callback fires once it is resolved.
    publish_future = publisher_client.publish(destination, data=payload)
    publish_future.add_done_callback(get_callback(publish_future, payload, ref))

    # Block the calling thread until the background publish has finished.
    while publish_future.running():
        time.sleep(0.5)

    logging.info("Published {} message(s).".format(ref["num_messages"]))
def get_callback(api_future, data, ref):
    """Build a done-callback that records the outcome of one publish call.

    Arguments:
        api_future -- not used by the returned callback (it is shadowed by
            the callback's own parameter); kept so existing callers keep
            working
        data -- published payload, used only in the log messages
        ref {dict} -- shared counter; ``ref["num_messages"]`` is incremented
            after each successful publish

    Returns:
        function -- callback that takes the completed future
    """

    def callback(api_future):
        try:
            logging.info(
                "Published message {} now has message ID {}".format(
                    data, api_future.result()
                )
            )
            ref["num_messages"] += 1
        except Exception:
            # A failed publish is an error, not routine information: log it
            # at ERROR level (with traceback) before propagating it.
            logging.exception(
                "A problem occurred when publishing {}: {}\n".format(
                    data, api_future.exception()
                )
            )
            raise

    return callback
if __name__ == "__main__":
    # Command-line entry point: run one publisher action against a project.
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument("command", help="publisher action is get | create | delete | publish")
    parser.add_argument("--project_id")
    parser.add_argument("--topic_name")
    parser.add_argument("--message")
    args = parser.parse_args()

    # An explicit --project_id wins over the PROJECT_ID environment variable
    # for every command, not only for "get".
    if args.project_id is not None:
        project_id = args.project_id
    else:
        project_id = os.environ.get('PROJECT_ID')

    if args.command == "get":
        get_topics_list(project_id)
    elif args.command == "create":
        # The function defined in this module is register_topic().
        register_topic(project_id, args.topic_name)
    elif args.command == "delete":
        # The function defined in this module is delete().
        delete(project_id, args.topic_name)
    elif args.command == "publish":
        pub(project_id, args.topic_name, args.message)
    else:
        # Report a mistyped command instead of silently doing nothing.
        parser.error("unknown command: {}".format(args.command))
subscriber.py
subscriber.py
#!/usr/bin/env python
# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
# [START pubsub_quickstart_sub_all]
import argparse
# [START pubsub_quickstart_sub_deps]
from google.cloud import pubsub_v1
# [END pubsub_quickstart_sub_deps]
def register_subscription(project_id: str, topic_name: str, subscription_name: str):
    """Create a subscription on an existing topic.

    Arguments:
        project_id {str} -- project id
        topic_name {str} -- name of the topic to subscribe to
        subscription_name {str} -- name of the subscription to create
    """
    subscriber = pubsub_v1.SubscriberClient()
    try:
        topic_path = subscriber.topic_path(project_id, topic_name)
        subscription_path = subscriber.subscription_path(
            project_id, subscription_name
        )
        subscription = subscriber.create_subscription(
            subscription_path, topic_path
        )
        logging.info("Subscription created: {}".format(subscription))
    finally:
        # Close the client even when the API call raises.
        subscriber.close()
def sub(project_id: str, subscription_name: str, timeout: float = 5.0):
    """Listen on a subscription for a limited time and return what arrived.

    Messages are received through a streaming pull; each one is logged,
    recorded and acknowledged in a callback. Listening stops after
    ``timeout`` seconds or on the first unexpected error.

    Arguments:
        project_id {str} -- project id
        subscription_name {str} -- subscription to listen on
        timeout {float} -- seconds to listen before stopping (default 5.0)

    Returns:
        list -- one dict per received message with the keys
            ``message_id`` and ``message_data``
    """
    # Imported here so the block needs no new file-level import.
    from concurrent.futures import TimeoutError as FutureTimeoutError

    # [START pubsub_quickstart_sub_client]
    # Initialize a Subscriber client
    subscriber_client = pubsub_v1.SubscriberClient()
    # [END pubsub_quickstart_sub_client]
    # Create a fully qualified identifier in the form of
    # `projects/{project_id}/subscriptions/{subscription_name}`
    subscription_path = subscriber_client.subscription_path(
        project_id, subscription_name
    )
    received = []

    def callback(message):
        logging.warning(
            "Received message {} of message ID {}\n".format(message, message.message_id)
        )
        received.append(
            {
                "message_id": message.message_id,
                "message_data": message.data.decode("utf-8"),
            }
        )
        # Acknowledge the message. Unack'ed messages will be redelivered.
        message.ack()

    streaming_pull_future = subscriber_client.subscribe(
        subscription_path, callback=callback
    )
    try:
        # Without a timeout result() never returns on its own, so the
        # listening window is bounded explicitly.
        streaming_pull_future.result(timeout=timeout)
    except FutureTimeoutError:
        # The listening window elapsed: stop pulling, this is not an error.
        streaming_pull_future.cancel()
    except Exception as e:  # noqa
        logging.exception(
            "Listening for messages on {} threw an exception: {}.".format(
                subscription_name, e
            )
        )
        streaming_pull_future.cancel()
    finally:
        subscriber_client.close()
    logging.warning(received)
    return received
def sync_sub(project_id: str, subscription_name: str, max_messages: int = 3):
    """Synchronously pull a batch of messages and acknowledge them.

    Arguments:
        project_id {str} -- project id
        subscription_name {str} -- subscription to pull from
        max_messages {int} -- upper bound of messages requested in the
            single pull call (default 3)

    Returns:
        list -- one dict per pulled message with the keys ``message_id``
            and ``message_data`` (payload decoded as UTF-8); empty when
            nothing was pulled
    """
    subscriber = pubsub_v1.SubscriberClient()
    try:
        subscription_path = subscriber.subscription_path(
            project_id, subscription_name
        )
        # The subscriber pulls a specific number of messages.
        response = subscriber.pull(subscription_path, max_messages=max_messages)
        if not response or len(response.received_messages) == 0:
            return []

        public_message = []
        ack_ids = []
        for received_message in response.received_messages:
            logging.info("Received: {}".format(received_message.message.data))
            ack_ids.append(received_message.ack_id)
            public_message.append(
                {
                    "message_id": received_message.message.message_id,
                    "message_data": received_message.message.data.decode("utf-8")
                }
            )

        # Acknowledges the received messages so they will not be sent again.
        subscriber.acknowledge(subscription_path, ack_ids)
        logging.info(
            "Received and acknowledged {} messages. Done.".format(
                len(response.received_messages)
            )
        )
        return public_message
    finally:
        # Runs on every exit path, including the early return for an empty
        # pull and any exception, so the client is never left open.
        subscriber.close()
if __name__ == "__main__":
    # Command-line entry point: listen on the subscription named by the two
    # positional arguments.
    cli = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description=__doc__,
    )
    for arg_name, arg_help in (
        ("project_id", "Google Cloud project ID"),
        ("subscription_name", "Pub/Sub subscription name"),
    ):
        cli.add_argument(arg_name, help=arg_help)
    options = cli.parse_args()
    sub(options.project_id, options.subscription_name)
# [END pubsub_quickstart_sub_all]
main.py
main.py
#!/usr/bin/env python
import json
import logging
import os
from flask import Flask, request, jsonify, Response
from publisher import register_topic, delete, get_topics_list, pub
from subscriber import sub, sync_sub, register_subscription
# Flask application object; the route handlers in this file are registered on it.
app = Flask(__name__)
# Per the Flask documentation, disabling JSON_AS_ASCII lets jsonify() emit
# non-ASCII characters as-is instead of \u escapes.
app.config['JSON_AS_ASCII'] = False
# NOTE(review): not referenced in the visible part of this file — confirm
# whether it is still needed.
ALLOWED_EXTENSIONS = {'csv'}
@app.route('/_ah/push-handlers//topic/create', methods=['POST'])
def add_topic():
    """Create the topic named in the posted JSON body.

    Reads ``topic_name`` from the request JSON and ``PROJECT_ID`` from the
    environment, then delegates to ``register_topic``.

    Returns:
        JSON response ``{"result": "201 Created"}``
    """
    topic_name = request.json.get('topic_name')
    register_topic(os.environ.get('PROJECT_ID'), topic_name)
    outcome = {"result": "201 Created"}
    return jsonify(outcome)
@app.route('/_ah/push-handlers//push', methods=['POST'])
def push_message():
    """Publish the posted message to the posted topic.

    Reads ``topic_name`` and ``message`` from the request JSON, logs both,
    and publishes through ``pub`` using ``PROJECT_ID`` from the environment.

    Returns:
        JSON response ``{"result": "201 Created"}``
    """
    body = request.json
    topic_name, message = body.get('topic_name'), body.get('message')
    for line in ("topic : {}".format(topic_name), "message : {}".format(message)):
        logging.info(line)

    pub(os.environ.get('PROJECT_ID'), topic_name, message)
    outcome = {"result": "201 Created"}
    return jsonify(outcome)
@app.route('/_ah/push-handlers//get', methods=['POST'])
def pull_messages():
    """Pull messages from the posted subscription and return them as JSON.

    Reads ``subscription_name`` from the request JSON and ``PROJECT_ID``
    from the environment, then serialises whatever ``sync_sub`` returns.

    Returns:
        Response with mimetype ``application/json``
    """
    subscription_name = request.json.get('subscription_name')
    logging.warning("test")
    pulled = sync_sub(os.environ.get('PROJECT_ID'), subscription_name)
    body = json.dumps(pulled)
    return Response(body, mimetype='application/json')
@app.route('/_ah/push-handlers//subscription/create', methods=['POST'])
def add_subscription():
    """Create the subscription described in the posted JSON body.

    Reads ``topic_name`` and ``subscription_name`` from the request JSON and
    ``PROJECT_ID`` from the environment, then delegates to
    ``register_subscription``.

    Returns:
        JSON response ``{"result": "201 Created"}``
    """
    payload = request.json
    topic_name = payload.get('topic_name')
    # Bind the value under the name that is passed on below.
    subscription_name = payload.get('subscription_name')
    project_id = os.environ.get('PROJECT_ID')
    register_subscription(project_id, topic_name, subscription_name)
    # A Flask view has to return a response.
    return jsonify({
        "result": "201 Created"
    })
# Start the Flask application's own server when this file is executed directly.
if __name__ == '__main__':
    app.run()
モジュールインストール
Python venvの使い方
ローカルで実行する際はモジュールをインストールして実行します
% python3 -m venv minarai-pub-sub-env
% source minarai-pub-sub-env/bin/activate
% pip3 install -r requirements.txt
% python3 main.py
デプロイ
gcloud app deploy