RabbitMQでキュー内のメッセージを確認する方法のメモ。
既存のメッセージをACKなしでconsumeすることで、キュー内のメッセージに影響を与えずに済みます。
コード
以下のメソッドを実行すると全メッセージをACKなしで取得できる。
import json
import pandas as pd
import pika
def get_all_messages(user='guest', password='guest', host, topic):
"""
Queue内のMessagesをすべて取得する
"""
credentials = pika.PlainCredentials(user, password)
pika_param = pika.ConnectionParameters(host=host, credentials=credentials, channel_max=2)
dfs_ = list()
with pika.BlockingConnection(pika_param) as connection:
channel = connection.channel()
for method, properties, body in channel.consume(queue=topic, inactivity_timeout=10):
if (method, properties, body) == (None, None, None):
break
dfs_.append(json.loads(body))
return pd.DataFrame(dfs_)
実行例
messages = get_all_messages(host="test_host", topic="test_topic")
取得するメッセージ数が多い場合、適宜time.sleep()を入れたりが必要かも?
でもそうするとメッセージの取得に時間がかかりすぎるので、一括でdumpできる方法があればなぁと。
関連リンク