RabbitMQ
openstack

OpenStackメッセージング機構を探ってみる

OpenStackメッセージング機構で活用されているRabbitMQ連携について、色々と探ってみたいと思います。

■ OpenStackメッセージング機構とは ...

  • オープンソースなMQ機構をベースに、メッセージ転送処理を実現する(主に、RabbitMQ)
  • Nova/Heatなど、OpenStack内部モジュール通信として、活用されている
  • OpenStack共通モジュールとして、oslo.messagingにてMQ機構のラッパー実装を提供する
  • RPC (Remote Procedure Call)モデルを提供する

といった特徴があります。

□ RPC (Remote Procedure Call)でのメッセージ種別

  • rpc.cast ( 特定ターゲットに通知し、応答を期待しないRPC通信モデル )
  • rpc.call ( 特定ターゲットからの応答を期待するRPC通信モデル )

昔、OpenStackメッセージング機能を調査した時には、fanout(複数ターゲットに、同報通知するRPC通信モデル)のメッセージ種別も存在していましたが、今では、あまり活用されていないみたいです。

□ RPC Callとは

rpc.callの概要については、OpenStackドキュメント"RPC Calls"の記載にて、わかりやすく解説されていますので、そのまま、以下、引用します。

RPC Calls

The diagram below shows the message flow during an rpc.call operation:

  1. A Topic Publisher is instantiated to send the message request to the queuing system; immediately before the publishing operation, a Direct Consumer is instantiated to wait for the response message.
  2. Once the message is dispatched by the exchange, it is fetched by the Topic Consumer dictated by the routing key (such as ‘topic.host’) and passed to the Worker in charge of the task.
  3. Once the task is completed, a Direct Publisher is allocated to send the response message to the queuing system.
  4. Once the message is dispatched by the exchange, it is fetched by the Direct Consumer dictated by the routing key (such as msg_id) and passed to the Invoker.

rpc-flow-1.png

上図では、MQ機構について、"RabbitMQ"と記載されていますが、oslo.messagingの設定によって、使用するMQ機構を自由に変更することが可能です。
ちなみに、昔は、Apache Qpidも存在していたのですが、今では対応していないようです。

  • RabbitMQ
  • ZeroMQ
  • Kafka ...

参考;「OpenStackドキュメント"oslo.messaging: Available Driver"」

■ OpenStackメッセージ機構の動作を試してみる

今回は、OpenStackメッセージング機構で、最も活用されている rpc.call に着目して、実際の動作を試してみます。
ただし、いきなり、更地から、oslo.messagingを活用した、RPC Client/Serverモデルを実現するのは、難易度が高すぎるので、ここでは、OpenStack Heatプロジェクトにおける実現方式をリファレンスとして、進めていきます。

□ OpenStack Heatとは

Heatとは、OpenStack Orchestrationを担うプロジェクトです。
テンプレートに実現したい構成要素を定義して、heat-api / engineを介してリソースをデプロイすることができます。

OpenStackメッセージング機構を探ってみる.004.jpeg

rpc.callのサンプルアプリの準備

サンプルアプリ内で定義したhealth_checkというリモート関数を通じて、RPC Client側となるheat-apiと、RPC Server側となるheat-engineとの間で、rpc.call通信を試すものになります。
"How are you?"という質問に対して、"I'm fine!"という回答をやりとりする処理を、延々と繰り返すだけの簡単な動作になります。

(1) RPC Client側サンプルアプリ

「OpenStackドキュメント"oslo.messaging reference: RPC Client"」を参考にしながら、Pike版OpenStack heatリポジトリをリファレンスとして、heat-apiのRPC通信部分をサンプルコード化しました。

heat-api.py
import os
import time
import logging
import oslo_messaging
from oslo_config import cfg

CONF = cfg.CONF
CONF(default_config_files=['conf/heat.conf'])



oslo_messaging.set_transport_defaults('heat')
TRANSPORT = oslo_messaging.get_transport(CONF)
ENGINE_TOPIC = 'engine'


def get_rpc_client(**kwargs):
    target = oslo_messaging.Target(**kwargs)
    return oslo_messaging.RPCClient(TRANSPORT, target)


class EngineClient(object):

    BASE_RPC_API_VERSION = '1.0'

    def __init__(self):
        self._client = get_rpc_client(
            topic=ENGINE_TOPIC,
            version=self.BASE_RPC_API_VERSION)

    @staticmethod
    def make_msg(method, **kwargs):
        return method, kwargs

    def call(self, ctxt, msg, version=None, timeout=None):
        method, kwargs = msg

        if version is not None:
            client = self._client.prepare(version=version)
        else:
            client = self._client

        if timeout is not None:
            client = client.prepare(timeout=timeout)

        return client.call(ctxt, method, **kwargs)

    def health_check(self, ctxt, seqid, host, content):
        return self.call(ctxt, self.make_msg('health_check',
                                              seqid=seqid,
                                              host=host,
                                              req=content))


class StackController(object):

    def __init__(self):
        self.rpc_client = EngineClient()

    def health_check(self, seqid, host, content):
        (id, hostname, response) = self.rpc_client.health_check({}, seqid, host, content)
        logging.info("### Response: id=[{0}], host=[{1}], content=[{2}]"
                    .format(id, hostname, response))


if __name__ == '__main__':
    logging.basicConfig(format='%(asctime)s:%(levelname)s:%(message)s',
                        level=logging.INFO)

    sequence_id = 0
    myhost = os.uname()[1]
    client = StackController()
    while True:
        sequence_id += 1
        client.health_check(sequence_id, myhost, "How are you?")
        time.sleep(1)
conf/heat1.conf
[DEFAULT]
transport_url = rabbit://guest:guest@rabbit-1-server:5672

(2) RPC Server側サンプルアプリ

「OpenStackドキュメント"oslo.messaging reference: RPC Server"」を参考にしながら、Pike版OpenStack heatリポジトリをリファレンスとして、heat-engineのRPC通信部分をサンプルコード化しました。

heat-engine.py
import os
import logging
import eventlet
import oslo_messaging
from oslo_messaging.rpc import dispatcher
import datetime
from oslo_config import cfg
eventlet.monkey_patch()

CONF = cfg.CONF
CONF(default_config_files=['conf/heat.conf'])


oslo_messaging.set_transport_defaults('heat')
TRANSPORT = oslo_messaging.get_transport(CONF)
ENGINE_TOPIC = 'engine'


def get_rpc_server(target, endpoint):
    access_policy = dispatcher.DefaultRPCAccessPolicy
    return oslo_messaging.get_rpc_server(TRANSPORT, target, [endpoint],
                                         executor='eventlet',
                                         access_policy=access_policy)


class EngineService(object):

    RPC_API_VERSION = '1.35'

    def __init__(self, host, topic):
        self.host = host
        self.topic = topic

    def start(self):
        target = oslo_messaging.Target(
            version=self.RPC_API_VERSION, server=self.host,
            topic=self.topic)
        server = get_rpc_server(target, self)
        server.start()
        server.wait()

    def health_check(self, ctx, seqid, host, req):
        logging.info("### Request: id=[{0}], host=[{1}], content=[{2}]"
                    .format(seqid, host, req))
        myhost = os.uname()[1]
        response = "I'm fine!"
        return seqid, myhost, response




if __name__ == '__main__':
    logging.basicConfig(format='%(asctime)s:%(levelname)s:%(message)s',
                        level=logging.INFO)

    srv = EngineService("heat-engine", ENGINE_TOPIC)
    server = srv.start()
conf/heat1.conf
[DEFAULT]
transport_url = rabbit://guest:guest@rabbit-1-server:5672

■ お手軽に、rpc.callサンプルアプリを動作させてみた

□ Dockerベースによる環境セットアップ

上記のサンプルアプリを動作させるためには、別途、RabbitMQやoslo.messagingを事前にインストールしておく必要があります。
ここでは、お手軽な方法として、全て、Docker環境で実現する方法を紹介します。

  • OpenStackメッセージング勉強リポジトリをクローンする
$ git clone https://github.com/ttsubo/study_openstack_messaging
Cloning into 'study_openstack_messaging'...
remote: Counting objects: 54, done.
remote: Compressing objects: 100% (42/42), done.
remote: Total 54 (delta 12), reused 54 (delta 12), pack-reused 0
Unpacking objects: 100% (54/54), done.
Checking connectivity... done.
  • 各種Dockerイメージを作成する
$ cd study_openstack_messaging
$ docker build -t ttsubo/ubuntu:latest dockerfile/ubuntu/.
$ docker build -t ttsubo/rabbitmq:latest dockerfile/rabbitmq/.
$ docker build -t ttsubo/haproxy:latest dockerfile/haproxy/.

□ rpc.callサンプルアプリを実行してみる

Dockerイメージが作成できたところで、rpc.callサンプルアプリを動作させてみます。
RPC Client側となるheat-apiと、RPC Server側となるheat-engineとの間で、rpc.call通信を試すものになります。
"How are you?"という質問に対して、"I'm fine!"という回答をやりとりする処理を、延々と繰り返すだけの簡単な動作になります。

  • Dockerコンテナを起動する
$ docker-compose -f docker-compose-single.yaml up -d
... (snip)
  • heat-api側Dockerコンテナ動作を確認する
$ tail -f log/heat-1-api.log
2018-01-20 04:24:47,947:INFO:### Response: id=[1], host=[heat-engine], content=[I'm fine!]
2018-01-20 04:24:48,961:INFO:### Response: id=[2], host=[heat-engine], content=[I'm fine!]
2018-01-20 04:24:49,975:INFO:### Response: id=[3], host=[heat-engine], content=[I'm fine!]
2018-01-20 04:24:50,988:INFO:### Response: id=[4], host=[heat-engine], content=[I'm fine!]
2018-01-20 04:24:52,001:INFO:### Response: id=[5], host=[heat-engine], content=[I'm fine!]
2018-01-20 04:24:53,014:INFO:### Response: id=[6], host=[heat-engine], content=[I'm fine!]
2018-01-20 04:24:54,030:INFO:### Response: id=[7], host=[heat-engine], content=[I'm fine!]
2018-01-20 04:24:55,045:INFO:### Response: id=[8], host=[heat-engine], content=[I'm fine!]
2018-01-20 04:24:56,058:INFO:### Response: id=[9], host=[heat-engine], content=[I'm fine!]
2018-01-20 04:24:57,070:INFO:### Response: id=[10], host=[heat-engine], content=[I'm fine!]
2018-01-20 04:24:58,083:INFO:### Response: id=[11], host=[heat-engine], content=[I'm fine!]
2018-01-20 04:24:59,096:INFO:### Response: id=[12], host=[heat-engine], content=[I'm fine!]
2018-01-20 04:25:00,108:INFO:### Response: id=[13], host=[heat-engine], content=[I'm fine!]
2018-01-20 04:25:01,123:INFO:### Response: id=[14], host=[heat-engine], content=[I'm fine!]
... (snip)
  • heat-engine側Dockerコンテナ動作を確認する
$ tail -f log/heat-1-engine.log
2018-01-20 04:24:47,916:INFO:### Request: id=[1], host=[heat-api], content=[How are you?]
2018-01-20 04:24:48,955:INFO:### Request: id=[2], host=[heat-api], content=[How are you?]
2018-01-20 04:24:49,969:INFO:### Request: id=[3], host=[heat-api], content=[How are you?]
2018-01-20 04:24:50,983:INFO:### Request: id=[4], host=[heat-api], content=[How are you?]
2018-01-20 04:24:51,996:INFO:### Request: id=[5], host=[heat-api], content=[How are you?]
2018-01-20 04:24:53,009:INFO:### Request: id=[6], host=[heat-api], content=[How are you?]
2018-01-20 04:24:54,025:INFO:### Request: id=[7], host=[heat-api], content=[How are you?]
2018-01-20 04:24:55,040:INFO:### Request: id=[8], host=[heat-api], content=[How are you?]
2018-01-20 04:24:56,051:INFO:### Request: id=[9], host=[heat-api], content=[How are you?]
2018-01-20 04:24:57,065:INFO:### Request: id=[10], host=[heat-api], content=[How are you?]
2018-01-20 04:24:58,077:INFO:### Request: id=[11], host=[heat-api], content=[How are you?]
2018-01-20 04:24:59,091:INFO:### Request: id=[12], host=[heat-api], content=[How are you?]
2018-01-20 04:25:00,104:INFO:### Request: id=[13], host=[heat-api], content=[How are you?]
2018-01-20 04:25:01,116:INFO:### Request: id=[14], host=[heat-api], content=[How are you?]
... (snip)

□ RabbitMQのWebポータル画面を確認してみる

Dockerコンテナを動作させているDockerホスト側IPアドレスの、15672ポートに対して、Webアクセスしてみると、RabbitMQのWebポータル管理画面を閲覧することができます。
(ログイン・パスワードは、"guest/guest"です。)

(1) rpc.callモデルにおける、Exchange情報
RPC Clientサンプルアプリで定義した'heat'というExchangeが一覧中に表示されていることが確認できます。

OpenStackメッセージング機構を探ってみる.001.jpeg

(2) rpc.callモデルにおける、Queue情報
RPC Clientサンプルアプリで定義した'engine'というQueueが一覧中に表示されていることが確認できます。

OpenStackメッセージング機構を探ってみる.002.jpeg

これらの情報から、今回のサンプルアプリ動作によって、構成されたrpc.callモデルは、以下のようになります。

OpenStackメッセージング機構を探ってみる.003.jpeg

なお、RabbitMQのWebポータル画面を使用せずとも、RabbitMQ上のコマンドプロンプトから、"rabbitmqctl"コマンドを実行しても、構成を確認することは可能です。

$ docker exec -it rabbit-1-server rabbitmqctl list_bindings
Listing bindings for vhost /...
    exchange    engine  queue   engine  []
    exchange    engine.heat-engine  queue   engine.heat-engine  []
    exchange    engine_fanout_cb41acf66c2e48beb2cbd5b185003648  queue   engine_fanout_cb41acf66c2e48beb2cbd5b185003648  []
    exchange    reply_fd28765a397044ee9d67f3709692cde4  queue   reply_fd28765a397044ee9d67f3709692cde4  []
engine_fanout   exchange    engine_fanout_cb41acf66c2e48beb2cbd5b185003648  queue   engine  []
heat    exchange    engine  queue   engine  []
heat    exchange    engine.heat-engine  queue   engine.heat-engine  []
reply_fd28765a397044ee9d67f3709692cde4  exchange    reply_fd28765a397044ee9d67f3709692cde4  queue   reply_fd28765a397044ee9d67f3709692cde4  []
$ docker exec -it rabbit-1-server rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
engine_fanout_cb41acf66c2e48beb2cbd5b185003648  0
engine.heat-engine  0
reply_fd28765a397044ee9d67f3709692cde4  0
engine  0

■ 終わりに、

oslo.messagingを活用して、RabbitMQベースのrpc.call通信モデルを試してみました。
RPCは、"Remote Procedure Call"なので、今回のように、一度、RPC通信部分を実装してしまえば、新たにリモート関数を追加することは、とても簡単に対応できるのが、Good!ですよね。

■ 補足:heat-engine内部動作のDeepDive

heat-engine側で、rpc.callを受け付けた際の動作メカニズムを調査してみたので、そのメモです。

すでに、EOLになってしまっていますが、JUNO版のoslo.messagingからの引用です。
細かい説明は、省略しますが、肝になっている箇所はこちらになります。

oslo.messaging/oslo/messaging/_executors/impl_eventlet.py
class EventletExecutor(base.ExecutorBase):

    """A message executor which integrates with eventlet.

    This is an executor which polls for incoming messages from a greenthread
    and dispatches each message in its own greenthread.

    The stop() method kills the message polling greenthread and the wait()
    method waits for all message dispatch greenthreads to complete.
    """

    def __init__(self, conf, listener, dispatcher):
        super(EventletExecutor, self).__init__(conf, listener, dispatcher)
        self.conf.register_opts(_eventlet_opts)
        self._thread = None
        self._greenpool = greenpool.GreenPool(self.conf.rpc_thread_pool_size)

    def start(self):
        if self._thread is not None:
            return

        @excutils.forever_retry_uncaught_exceptions
        def _executor_thread():
            try:
                while True:
                    incoming = self.listener.poll()
                    spawn_with(ctxt=self.dispatcher(incoming),
                               pool=self._greenpool)
            except greenlet.GreenletExit:
                return

        self._thread = eventlet.spawn(_executor_thread)

こちらからも、コード内容が確認できます -> oslo.messaging / oslo / messaging / executors / impl_eventlet.py

(1) 事前準備:EventletExecutorクラスが呼び出されるメカニズム

まずは、この理由を説明するためには、OpenStackのstevedoreが提供する、DriverManager の仕組みを理解することが必要不可欠になります。
こちらのブログサイトでは、サンプルを用いて丁寧に解説されているので、興味のある方は、ご覧ください
How to OpenStack load Drivers Extensions and Plugins using stevedore

それでは、rpc_serverとして、EventletExecutorクラスが呼び出されるメカニズムを解説します。
まず、着目する箇所は、heat-engineプログラムから呼び出されるget_rpc_serverメソッドのreturnバリューとして起動される msg_server.MessageHandlingServerの部分です。

oslo.messaging/blob/juno-eol/oslo/messaging/rpc/server.py
def get_rpc_server(transport, target, endpoints,
                   executor='blocking', serializer=None):

...

    dispatcher = rpc_dispatcher.RPCDispatcher(target, endpoints, serializer)
    return msg_server.MessageHandlingServer(transport, dispatcher, executor)

こちらからも、コード内容が確認できます -> oslo.messaging / oslo / messaging / rpc / server.py

それでは、msg_server.MessageHandlingServerクラスのコンストラクタ部分に着目してみましょう。

oslo.messaging/oslo/messaging/server.py
class MessageHandlingServer(object):
    """Server for handling messages.

    Connect a transport to a dispatcher that knows how process the
    message using an executor that knows how the app wants to create
    new tasks.
    """

    def __init__(self, transport, dispatcher, executor='blocking'):
        """Construct a message handling server.

        The dispatcher parameter is a callable which is invoked with context
        and message dictionaries each time a message is received.

        The executor parameter controls how incoming messages will be received
        and dispatched. By default, the most simple executor is used - the
        blocking executor.

        :param transport: the messaging transport
        :type transport: Transport
        :param dispatcher: a callable which is invoked for each method
        :type dispatcher: callable
        :param executor: name of message executor - for example
                         'eventlet', 'blocking'
        :type executor: str
        """
        self.conf = transport.conf

        self.transport = transport
        self.dispatcher = dispatcher
        self.executor = executor

        try:
            mgr = driver.DriverManager('oslo.messaging.executors',
                                       self.executor)
        except RuntimeError as ex:
            raise ExecutorLoadFailure(self.executor, ex)
        else:
            self._executor_cls = mgr.driver
            self._executor = None

        super(MessageHandlingServer, self).__init__()

こちらからも、コード内容が確認できます -> oslo.messaging / oslo / messaging / server.py

ここで、stevedoreのDriverManager動作メカニズムが関係してきます。
なお、ここでのself.executorには、"eventlet"が指定されています。これは、heat-engine.pyプログラムからの呼び出し時に引数として、executor='eventlet'と指定された内容そのものですね。

mgr = driver.DriverManager('oslo.messaging.executors', self.executor)

続いて、stevedoreのDriverManagerが参照する、setup.cfgを確認しておきます。

oslo.messaging/setup.cfg
[entry_points]

...

oslo.messaging.executors =
    blocking = oslo.messaging._executors.impl_blocking:BlockingExecutor
    eventlet = oslo.messaging._executors.impl_eventlet:EventletExecutor

...

ここでのentry_pointsとして、oslo.messaging.executors名前空間には、
"eventlet = oslo.messaging._executors.impl_eventlet:EventletExecutor"と定義されているので、eventletドライバ名に対応するEventletExecutorクラスが呼び出されることになります。

早速、pythonインタプリタを活用して、実際のstevedoreのDriverManager動作を見て行きましょう!

# python
Python 2.7.6 (default, Nov 23 2017, 15:49:48) 
[GCC 4.8.4] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> from stevedore import driver
>>> mgr = driver.DriverManager('oslo.messaging.executors', "eventlet")
>>> mgr.driver
<class 'oslo.messaging._executors.impl_eventlet.EventletExecutor'>
>>> 

ご覧の通り、oslo.messaging._executors.impl_eventlet.EventletExecutorクラスが呼び出されている様子が確認できました。

(2) rpc_server動作メカニズム

では、具体的に、EventletExecutorクラスの中身を見て行きます。
heat-engine内部で動作するrpc_serverが起動すると、EventletExecutorクラスのstartメソッドが実行されます。
そして、以降の処理の流れは、こんな感じです。

  • Consume処理として、RabbitMQの"engine"キューに、incomingなメッセージが存在しているかをチェックする
  • もし、incomingなメッセージが存在しているのであれば、新たなグリーンスレッドにおいて、RPC処理として、self.dispatcher(incoming)を起動する
  • 最後に、Publisher処理として、health_checkメソッドの戻り値を、RabbitMQの"reply_xxx"キューに、Publishingする

rpc.png

では、self.dispatcher(incoming)とは、「具体的に何を指し示しているのか?」というと
heat-engine.pyプログラムから、get_rpc_serverメソッドが呼び出されることによって、生成されるRPCDispatcherクラスのインスタンスそのものになります。

oslo.messaging/blob/juno-eol/oslo/messaging/rpc/server.py
def get_rpc_server(transport, target, endpoints,
                   executor='blocking', serializer=None):

...

    dispatcher = rpc_dispatcher.RPCDispatcher(target, endpoints, serializer)
    return msg_server.MessageHandlingServer(transport, dispatcher, executor)

こちらからも、コード内容が確認できます -> oslo.messaging / oslo / messaging / rpc / server.py

そして、RPCDispatcherクラスが呼び出されることによって、__dispatch_and_replyメソッドが呼び出されることになります。

oslo.messaging/blob/juno-eol/oslo/messaging/rpc/dispatcher.py
class RPCDispatcher(object):
...

    @contextlib.contextmanager
    def __call__(self, incoming):
        incoming.acknowledge()
        yield lambda: self._dispatch_and_reply(incoming)

    def _dispatch_and_reply(self, incoming):
        try:
            incoming.reply(self._dispatch(incoming.ctxt,
                                          incoming.message))
        except ExpectedException as e:
            LOG.debug(u'Expected exception during message handling (%s)',
                      e.exc_info[1])
            incoming.reply(failure=e.exc_info, log_failure=False)
        except Exception as e:
            # sys.exc_info() is deleted by LOG.exception().
            exc_info = sys.exc_info()
            LOG.error(_('Exception during message handling: %s'), e,
                      exc_info=exc_info)
            incoming.reply(failure=exc_info)
            # NOTE(dhellmann): Remove circular object reference
            # between the current stack frame and the traceback in
            # exc_info.
            del exc_inf

こちらからも、コード内容が確認できます -> oslo.messaging / oslo / messaging / rpc / dispatcher.py

そして、_dispatch_and_replyメソッドの中身では、incomingメッセージの中身を取り出して、

result = getattr(endpoint, method)(ctxt, **new_args)

を実行します。
なお、getattrの第一引数の"endpoint"とは、heat-engine.pyプログラムで定義したEngineServiceクラスそのものです。すなわち、get_rpc_serverを呼びだす際に、endpointに、selfとして代入していることが確認できますね。

heat-engine.py
...

def get_rpc_server(target, endpoint):
    return oslo.messaging.get_rpc_server(TRANSPORT, target, [endpoint],
                                         executor='eventlet')
...

class EngineService(object):

    RPC_API_VERSION = '1.35'

    def __init__(self, host, topic):
        self.host = host
        self.topic = topic

    def start(self):
        target = oslo_messaging.Target(
            version=self.RPC_API_VERSION, server=self.host,
            topic=self.topic)
        server = get_rpc_server(target, self)
        server.start()
        server.wait()

    def health_check(self, ctx, seqid, host, req):
        logging.info("### Request: id=[{0}], host=[{1}], content=[{2}]"
                    .format(seqid, host, req))
        myhost = os.uname()[1]
        response = "I'm fine!"
        return seqid, myhost, response

...

getattrの第二引数の"method"とは、incomingメッセージの中身を取り出して入手した、"health_check"という値そのものになります。

以上より、heat-engineは、RabbitMQの"engine"キューから、incomingなメッセージを取得して、health_checkメソッドを起動することが可能になるわけです。