LoginSignup
1
2

More than 3 years have passed since last update.

GCP - Pub/Sub サービスの利用

Last updated at Posted at 2020-05-19

GCP - Pub/Sub サービス構築チュートリアルからの続き
Pub/Sub サービスの作成と利用手順をまとめました。

サービス概要

これまでの知識を踏まえて下記の機能を実現したサービスを実装してみます。
1. トピックを作成する
2. サブスクリプションを作成する
3. サブスクリライブしているメッセージを取得する

なお、サンプルソースはGit Hubに上げてあるのでご自由のご確認ください。

開発環境設定

認証情報をセット

export GOOGLE_APPLICATION_CREDENTIALS=path/to/file.json

ディレクトリ構造

ソースはGit Hubにあります。

|-- minarai-pub-sub
|   |-- app.yaml
|   |-- main.py
|   |-- publisher.py
|   |-- subscriber.py
|   `-- requirements.txt

app.yaml

app.yaml
runtime: python37
env: standard
service: default
entrypoint: gunicorn -b :$PORT main:app

env_variables:
  # project id
  PROJECT_ID: 
  FLASK_ENV: development
  DEBUG: True

automatic_scaling:
  min_idle_instances: automatic
  max_idle_instances: automatic
  min_pending_latency: automatic
  max_pending_latency: automatic

requirements.txt

requirements.txt
Flask==1.1.2
google-cloud-pubsub==1.4.3
gunicorn==20.0.4; python_version > '3.0'
gunicorn==19.10.0; python_version < '3.0'

publisher.py

publisher.py
#!/usr/bin/env python
import logging
import argparse
import os
import time
from google.cloud import pubsub, pubsub_v1

def get_topics_list(project_id: str):
    """Lists all Pub/Sub topics in the given project

    Arguments:
        project_id {str} -- project id
    """
    # [START pubsub_list_topics]
    publisher = pubsub.PublisherClient()
    project_path = publisher.project_path(project_id)
    topic_list = publisher.list_topics(project_path)
    logging.error(vars(topic_list))
    return 1



    # print("befor for")
    # print(vars(topic_list))
    # for topic in topic_list:
    #     print("----")
    #     print(topic)
    # [END pubsub_list_topics]

def register_topic(project_id: str, topic_name: str):
    """Create a new Pub/Sub topic.

    Arguments:
        project_id {str} -- project jid
        topic_name {str} -- topic name
    """
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(project_id, topic_name)
    logging.info("topic_path: {}".format(topic_path))

    topic = publisher.create_topic(topic_path)

    logging.info("Topic created: {}".format(topic))

def delete(project_id: set, topic_name: str):
    """delete message

    Arguments:
        project_id {str} -- project id
        topic_name {str} -- topic_name
    """
    # [START pubsub_delete_topic]
    publisher = pubsub.PublisherClient()
    topic_path = publisher.topic_path(project_id, topic_name)

    publisher.delete_topic(topic_path)

    logging.info("Topic deleted: {}".format(topic_path))
    # [END pubsub_delete_topic]


def publish_message(project_id: str, topic_name: str, message: str):
    """Publishes message

    Arguments:
        project_id {str} -- project_id
        topic_name {str} -- topic_name
        message {str}    -- message
    """

    publisher = pubsub.PublisherClient()
    # The `topic_path` method creates a fully qualified identifier
    # in the form `projects/{project_id}/topics/{topic_name}`
    topic_path = publisher.topic_path(project_id, topic_name)

    data = message.encode("utf-8")
    logging.info(topic_path)
    # When you publish a message, the client returns a future.
    future = publisher.publish(topic_path, data=data)
    message_id = future.result()
    logging.info(message_id)


    logging.info("Published messages.")
    # [END pubsub_quickstart_publisher]
    # [END pubsub_publish]

def pub(project_id, topic_name, message):
    """Publishes a message to a Pub/Sub topic."""
    # [START pubsub_quickstart_pub_client]
    # Initialize a Publisher client.
    client = pubsub_v1.PublisherClient()
    # [END pubsub_quickstart_pub_client]
    # Create a fully qualified identifier in the form of
    # `projects/{project_id}/topics/{topic_name}`
    topic_path = client.topic_path(project_id, topic_name)

    # Data sent to Cloud Pub/Sub must be a bytestring.
    data = message.encode("utf-8")

    # Keep track of the number of published messages.
    ref = dict({"num_messages": 0})

    # When you publish a message, the client returns a future.
    api_future = client.publish(topic_path, data=data)
    api_future.add_done_callback(get_callback(api_future, data, ref))

    # Keep the main thread from exiting while the message future
    # gets resolved in the background.
    while api_future.running():
        time.sleep(0.5)
        logging.info("Published {} message(s).".format(ref["num_messages"]))

def get_callback(api_future, data, ref):
    """Wrap message data in the context of the callback function."""

    def callback(api_future):
        try:
            logging.info(
                "Published message {} now has message ID {}".format(
                    data, api_future.result()
                )
            )
            ref["num_messages"] += 1
        except Exception:
            logging.info(
                "A problem occurred when publishing {}: {}\n".format(
                    data, api_future.exception()
                )
            )
            raise

    return callback

if __name__ == "__main__":
    # project_id = "Google Cloud Project ID"
    project_id = os.environ.get('PROJECT_ID')

    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument("command", help="publisher action is get | create | delete | publish")

    parser.add_argument("--project_id")
    parser.add_argument("--topic_name")
    parser.add_argument("--message")

    args = parser.parse_args()

    if args.command == "get":
        if not args.project_id == None:
            project_id = args.project_id
        get_topics_list(project_id)
    elif args.command == "create":
        create_topic(project_id, args.topic_name)
    elif args.command == "delete":
        delete_topic(project_id, args.topic_name)
    elif args.command == "publish":
        pub(project_id, args.topic_name, args.message)

subscriber.py

subscriber.py
#!/usr/bin/env python

# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
# [START pubsub_quickstart_sub_all]
import argparse

# [START pubsub_quickstart_sub_deps]
from google.cloud import pubsub_v1

# [END pubsub_quickstart_sub_deps]

def register_subscription(project_id: str, topic_name: str, subscription_name: str):
    """create subscription

    Arguments:
        project_id {str} -- [description]
        topic_name {str} -- [description]
        subscription_name {str} -- [description]
    """
    subscriber = pubsub_v1.SubscriberClient()
    topic_path = subscriber.topic_path(project_id, topic_name)
    subscription_path = subscriber.subscription_path(
        project_id, subscription_name
    )

    subscription = subscriber.create_subscription(
        subscription_path, topic_path
    )

    logging.info("Subscription created: {}".format(subscription))

    subscriber.close()

def sub(project_id: str, subscription_name: str):
    """get message

    Arguments:
        project_id {str} -- [description]
        subscription_name {str} -- [description]

    Returns:
        list -- message.message_id and message.data
    """
    # [START pubsub_quickstart_sub_client]
    # Initialize a Subscriber client
    subscriber_client = pubsub_v1.SubscriberClient()
    # [END pubsub_quickstart_sub_client]
    # Create a fully qualified identifier in the form of
    # `projects/{project_id}/subscriptions/{subscription_name}`
    subscription_path = subscriber_client.subscription_path(
        project_id, subscription_name
    )

    timeout=5.0
    result = dict()
    def callback(message):
        logging.warning(
            "Received message {} of message ID {}\n".format(message, message.message_id)
        )
        # Acknowledge the message. Unack'ed messages will be redelivered.
        message.ack()
        return message

    streaming_pull_future = subscriber_client.subscribe(
        subscription_path, callback=callback
    )

    try:
        # Calling result() on StreamingPullFuture keeps the main thread from
        # exiting while messages get processed in the callbacks.
        result = streaming_pull_future.result()

        # logging.warning("streaming_pull_future.result: {}".format(result))
    except Exception as e:  # noqa
        logging.exception(
            "Listening for messages on {} threw an exception: {}.".format(
                subscription_name, e
            )
        )
        streaming_pull_future.cancel()


    logging.warning(result)

    subscriber_client.close()

    return result

def sync_sub(project_id: str, subscription_name: str):
    """同期取得

    Arguments:
        project_id {str} -- [description]
        subscription_name {str} -- [description]

    Returns:
        list -- public_message
    """
    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(
        project_id, subscription_name
    )

    NUM_MESSAGES = 3

    # The subscriber pulls a specific number of messages.
    response = subscriber.pull(subscription_path, max_messages=NUM_MESSAGES)

    if not response or len(response.received_messages) == 0:
        return []

    public_message = []
    ack_ids = []
    for received_message in response.received_messages:
        logging.info("Received: {}".format(received_message.message.data))
        ack_ids.append(received_message.ack_id)
        public_message.append(
            {
                "message_id": received_message.message.message_id,
                "message_data": received_message.message.data.decode("utf-8")
            }
        )

    # Acknowledges the received messages so they will not be sent again.
    subscriber.acknowledge(subscription_path, ack_ids)

    logging.info(
        "Received and acknowledged {} messages. Done.".format(
            len(response.received_messages)
        )
    )

    subscriber.close()

    return public_message


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument("project_id", help="Google Cloud project ID")
    parser.add_argument("subscription_name", help="Pub/Sub subscription name")

    args = parser.parse_args()

    sub(args.project_id, args.subscription_name)
# [END pubsub_quickstart_sub_all]

main.py

main.py
#!/usr/bin/env python
import json
import logging
import os

from flask import Flask, request, jsonify, Response
from publisher import register_topic, delete, get_topics_list, pub
from subscriber import sub, sync_sub, register_subscription

app = Flask(__name__)
app.config['JSON_AS_ASCII'] = False

ALLOWED_EXTENSIONS = {'csv'}

@app.route('/_ah/push-handlers//topic/create',  methods=['POST'])
def add_topic():
    payload = request.json
    topic_name = payload.get('topic_name')
    project_id = os.environ.get('PROJECT_ID')

    register_topic(project_id, topic_name)

    return jsonify({
        "result": "201 Created"
    })

@app.route('/_ah/push-handlers//push',  methods=['POST'])
def push_message():
    payload = request.json
    topic_name = payload.get('topic_name')
    message = payload.get('message')

    logging.info("topic : {}".format(topic_name))
    logging.info("message : {}".format(message))

    project_id = os.environ.get('PROJECT_ID')

    pub(project_id, topic_name, message)

    return jsonify({
        "result": "201 Created"
    })

@app.route('/_ah/push-handlers//get',  methods=['POST'])
def pull_messages():
    payload = request.json
    subscription_name = payload.get('subscription_name')
    logging.warning("test")
    project_id = os.environ.get('PROJECT_ID')

    message = sync_sub(project_id, subscription_name)

    return Response(json.dumps(message),  mimetype='application/json')

@app.route('/_ah/push-handlers//subscription/create',  methods=['POST'])
def add_subscription():
    payload = request.json
    topic_name = payload.get('topic_name')
    message = payload.get('subscription_name')

    project_id = os.environ.get('PROJECT_ID')

    register_subscription(project_id, topic_name, subscription_name)

if __name__ == '__main__':
  app.run()

モジュールインストール

Python venvの使い方
ローカルで実行する際はモジュールをインストールして実行します

% python3 -m venv minarai-pub-sub-env
% source minarai-pub-sub-env//bin/activate

% pip3 install -r requirements.txt
% pyton3 main.py

デプロイ

gcloud app deploy
1
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
2