はじめに
前回のpythonでrabbimqを扱うではRabbitMqで単純なメッセージのやり取りを行いましたが、RabbitMqは色々な機能があるため今回はその色々な機能と使用方法をまとめます。
環境
- python:3.6.5
- イメージ:rabbitmq:3-management
キューの確認設定
標準のpikaの設定ではRabbitMq内に送受信したいキューが存在しないときは自動的にキューを作成しますが、自動的にキューを作成せずにエラーしたい場合があります。その場合は、チャンネルのqueue_declare()
関数のpassive引数にTrueを与えるとキューがないときはエラーします。
シチュエーション例
プロデューサー側で全てのキューを管理して、コンシューマー側では純粋に接続のみ許したいときなど。
passiveをtrueにしたプロデューサー
前回の例として作成したプロデューサーのchannel.queue_declare()
にpassive=True
を追加しているだけです。キューが存在すればそのまま接続できますが、キューが存在しなければpika.exceptions.ChannelClosedByBroker
がエクセプションとして上がってきます。
import pika
pika_param = pika.ConnectionParameters('localhost')
connection = pika.BlockingConnection(pika_param)
channel = connection.channel()
try:
channel.queue_declare(queue='hello', passive=True)
except pika.exceptions.ChannelClosedByBroker as ex:
print(ex)
exit(1)
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
connection.close()
キューチェックの実行
ソースができたため、実行してみます。エクセプションが発生しました。発生したエクセプションを表示するとhelloのキューがないということがわかります。
PS C:\Users\xxxx\program\python\pika> python .\client_main.py
(404, "NOT_FOUND - no queue 'hello' in vhost '/'")
コネクションの排他設定
標準のpikaの設定では無条件にコネクションを受け付けますが、他のコネクションを受け付けないように排他制御をかけることができます。その場合は、チャンネルのqueue_declare()
関数のexclusive引数にTrueを与えると他にコネクションが接続しているときはエラーするため、排他のチェックに利用できます。コネクションをクローズすればまた別のコネクションを接続することができます。
シチュエーション例
コンシューマー側でメッセージを用意してRabbitMqに送るまでは、他のコンシューマからメッセージの受け付けをしたくないときなど。
コネクションの排他設定を有効にしたプロデューサー
channel.queue_declare()
にexclusive=True
を追加しているだけです。今回は、一度接続してからコネクションをクローズせずに新しくコネクションを作って接続しています。後から来たコネクションに対してはpika.exceptions.ChannelClosedByBroker
がエクセプションとして上がってきます。
import pika
pika_param = pika.ConnectionParameters('localhost')
connection = pika.BlockingConnection(pika_param)
channel = connection.channel()
channel.queue_declare(queue='hello', exclusive=True)
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
channel.close()
connection = pika.BlockingConnection(pika_param)
channel = connection.channel()
try:
channel.queue_declare(queue='hello')
except pika.exceptions.ChannelClosedByBroker as ex:
print('other connection access fail')
exit(1)
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
channel.close()
channel.close()
の下にconnection.close()
を入れると正常に終了します。
コネクションの排他の実行
ソースができたため、実行してみます。2つめのキュー接続時にエクセプションが発生しました。発生したエクセプションを表示するとhelloにアクセスロックがかかっていることがわかります。
PS C:\Users\xxxx\program\python\pika> python .\client_main.py
other connection access fail
(405, "RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'hello' in vhost '/'. It could be originally declared on
another connection or the exclusive property value does not match that of the original declaration.")
チャンネルの上限設定
標準のpikaの設定では無条件にチャンネルを作成できますが、チェンネルの上限を設定することができます。その場合は、pikaの pika.ConnectionParameters()
関数のchannel_max引数に上限値を与えると上限を設定することができます。上限を超えるチャンネルを作成しようとするとエラーを発生させます。
シチュエーション例
RabbitMqのサーバが潤沢ではないため、チャンネル生成数を加減するときなど
チャンネルの上限設定を有効にしたプロデューサー
pika.ConnectionParameters()
にchannel_max=2
を追加しているだけです。今回は、上限値を2にしているので無意味に3つのチャンネルを作成しています。その結果、3つめのチェンネルに対しては作成できずにpika.exceptions.ConnectionClosedByBroker
がエクセプションとして上がってきます。
import pika
pika_param = pika.ConnectionParameters('localhost', channel_max=2)
connection = pika.BlockingConnection(pika_param)
channel = connection.channel(1)
channel = connection.channel(2)
try:
channel = connection.channel(3)
except pika.exceptions.ConnectionClosedByBroker as ex:
print('channel crate error')
print(ex)
チャンネルの上限の実行
ソースができたため、実行してみます。3つめのチャンネル作成時にエクセプションが発生しました。発生したエクセプションを表示するとチャンネル上限が2であることがわかります。
PS C:\Users\xxxx\program\python\pika> python .\client_main.py
channel crate error
(530, 'NOT_ALLOWED - number of channels opened (2) has reached the negotiated channel_max (2)')
リトライの設定
pikaの接続に失敗したときにリトライする回数を設定することができます。その場合は、pikaの pika.ConnectionParameters()
関数のconnection_attempts引数にリトライ回数を与えると回数を設定することができます。
シチュエーション例
RabbitMqのサーバへのネットワークが不安定の場合など
リトライの設定を有効にしたプロデューサー
pika.ConnectionParameters()
にconnection_attempts=2
を追加しているだけです。今回は、リトライをしているのがわかるようにpikaのログを出すようにして、RabbitMqを落としています。上のlogger.xxx系はpika内のログを出すための設定です。
import pika
import datetime
import logging
logger = logging.getLogger('pika')
logger.setLevel(logging.ERROR)
logger.addHandler(logging.StreamHandler())
pika_param = pika.ConnectionParameters('localhost', connection_attempts=2)
try:
print('start connect {}'.format(datetime.datetime.now()))
connection = pika.BlockingConnection(pika_param)
except pika.exceptions.AMQPConnectionError as ex:
print('connect error {}'.format(datetime.datetime.now()))
print(ex)
リトライの実行
ソースができたため、実行してみます。ログが大量に出て分かり難いですが同じようなエラーが4回(2回エラー×設定値リトライ(2回))出ています。
PS C:\Users\xxxx\program\python\pika> python .\client_main.py
start connect 2020-03-01 21:46:18.549268
Socket failed to connect: <socket.socket fd=936, family=AddressFamily.AF_INET6, type=SocketKind.SOCK_STREAM, proto=6, laddr=('::', 38520, 0, 0)>; error=10061 (Unknown error)
TCP Connection attempt failed: ConnectionRefusedError(10061, 'Unknown error'); dest=(<AddressFamily.AF_INET6: 23>, <SocketKind.SOCK_STREAM: 1>, 6, '', ('::1', 5672, 0, 0))
AMQPConnector - reporting failure: AMQPConnectorSocketConnectError: ConnectionRefusedError(10061, 'Unknown error')
Socket failed to connect: <socket.socket fd=936, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('0.0.0.0', 38524)>; error=10061 (Unknown error)
TCP Connection attempt failed: ConnectionRefusedError(10061, 'Unknown error'); dest=(<AddressFamily.AF_INET: 2>, <SocketKind.SOCK_STREAM: 1>, 6, '', ('127.0.0.1', 5672))
AMQPConnector - reporting failure: AMQPConnectorSocketConnectError: ConnectionRefusedError(10061, 'Unknown error')
Socket failed to connect: <socket.socket fd=812, family=AddressFamily.AF_INET6, type=SocketKind.SOCK_STREAM, proto=6, laddr=('::', 38533, 0, 0)>; error=10061 (Unknown error)
TCP Connection attempt failed: ConnectionRefusedError(10061, 'Unknown error'); dest=(<AddressFamily.AF_INET6: 23>, <SocketKind.SOCK_STREAM: 1>, 6, '', ('::1', 5672, 0, 0))
AMQPConnector - reporting failure: AMQPConnectorSocketConnectError: ConnectionRefusedError(10061, 'Unknown error')
Socket failed to connect: <socket.socket fd=812, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('0.0.0.0', 38535)>; error=10061 (Unknown error)
TCP Connection attempt failed: ConnectionRefusedError(10061, 'Unknown error'); dest=(<AddressFamily.AF_INET: 2>, <SocketKind.SOCK_STREAM: 1>, 6, '', ('127.0.0.1', 5672))
AMQPConnector - reporting failure: AMQPConnectorSocketConnectError: ConnectionRefusedError(10061, 'Unknown error')
AMQP connection workflow failed: AMQPConnectionWorkflowFailed: 4 exceptions in all; last exception - AMQPConnectorSocketConnectError: ConnectionRefusedError(10061, 'Unknown error'); first exception - AMQPConnectorSocketConnectError: ConnectionRefusedError(10061, 'Unknown error').
AMQPConnectionWorkflow - reporting failure: AMQPConnectionWorkflowFailed: 4 exceptions in all; last exception - AMQPConnectorSocketConnectError: ConnectionRefusedError(10061, 'Unknown error'); first exception - AMQPConnectorSocketConnectError: ConnectionRefusedError(10061, 'Unknown error')
Connection workflow failed: AMQPConnectionWorkflowFailed: 4 exceptions in all; last exception - AMQPConnectorSocketConnectError:
ConnectionRefusedError(10061, 'Unknown error'); first exception - AMQPConnectorSocketConnectError: ConnectionRefusedError(10061,
'Unknown error')
Error in _create_connection().
Traceback (most recent call last):
File "C:\Users\minkl\AppData\Local\Programs\Python\Python36-32\lib\site-packages\pika\adapters\blocking_connection.py", line 450, in _create_connection
raise self._reap_last_connection_workflow_error(error)
pika.exceptions.AMQPConnectionError
connect error 2020-03-01 21:46:28.665843
おわりに
良く使いそうなRabbitMqの機能を使用してみました。必要な機能は一通りあると思いますが、どの設定がキューなのかコネクションなのかチャンネルなのかを理解して使用しないと混乱しそうに感じました。