下は送受信するサーバー両方で実行。24.04と22.02で異なるため注意
Ubuntu 24.04の場合.bash
echo 'deb https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-erlang/deb/ubuntu noble main' | sudo tee /etc/apt/sources.list.d/rabbitmq.list
# GPGキーを/etc/apt/keyrings/ディレクトリにダウンロード
curl -1sLf "https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-erlang/gpg.E495BB49CC4BBE5B.key" | sudo gpg --dearmor -o /etc/apt/keyrings/rabbitmq-erlang.gpg
# リポジトリの設定を追加
echo "deb [signed-by=/etc/apt/keyrings/rabbitmq-erlang.gpg] https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-erlang/deb/ubuntu noble main" | sudo tee /etc/apt/sources.list.d/rabbitmq.list
#その後、以下のコマンドでパッケージリストを更新できます:
sudo apt update
#これにより、Ubuntu 24.04用のRabbitMQとErlangパッケージをインストールする準備が整います。
Ubuntu 22.04の場合.bash
# GPGキーを/etc/apt/keyrings/ディレクトリにダウンロード
curl -1sLf "https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-erlang/gpg.E495BB49CC4BBE5B.key" | sudo gpg --dearmor -o /etc/apt/keyrings/rabbitmq-erlang.gpg
# リポジトリの設定を追加
echo "deb [signed-by=/etc/apt/keyrings/rabbitmq-erlang.gpg] https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-erlang/deb/ubuntu jammy main" | sudo tee /etc/apt/sources.list.d/rabbitmq.list
#その後、以下のコマンドでパッケージリストを更新できます:
sudo apt update
#これにより、Ubuntu 24.04用のRabbitMQとErlangパッケージをインストールする準備が整います。
以下のpassword1は設定したいPWに変えることを推奨
送受信サーバー両方.bash
sudo apt-get install erlang
sudo apt-get install rabbitmq-server -y
sudo systemctl status rabbitmq-server
#sudo rabbitmq-plugins enable rabbitmq_management
#ウェブ管理コンソールを有効にしたい場合は上のコメントを外す
pip install pika
sudo rabbitmqctl add_user user1 password1
sudo rabbitmqctl set_user_tags user1 administrator
sudo rabbitmqctl set_permissions -p / user1 ".*" ".*" ".*"
sudo ufw allow 5672
以下の'xxx.xxx.xxx.xxx'は送信先(受信したい側)サーバーIPに変えてください
送信する側プログラム.py
import pika
import json
import os
import ssl
import time
import datetime
def send_tweet_to_queue(msg):
ssl_options = pika.SSLOptions(ssl_context, dest)
connection_parameters = pika.ConnectionParameters(
host='xxx.xxx.xxx.xxx', # 送信先のIPアドレス
port=5672,
credentials=credentials) # RabbitMQのポート(デフォルトは5672)
connection = pika.BlockingConnection(connection_parameters)
channel = connection.channel()
channel.queue_declare(queue='queue1')
channel.basic_publish(exchange='', routing_key='queue1', body=json.dumps(msg))
print(f"Sent {msg}")
connection.close()
for _ in range(10):
time.sleep(2)
timestamp = time.time()
dt = datetime.fromtimestamp(timestamp)
formatted_time = dt.strftime('%Y-%m-%d %H:%M:%S.%f')
message = [{"send_time": formatted_time, "raw_time":timestamp,"content": "test"}]
send_tweet_to_queue(message)
受信する側プログラム.py
import pika
import json
import asyncio
import time
async def process_msg(msg):
msg=msg[0]
print("received msg",msg)
print("遅延(秒)",time.time()-msg['send_raw_time'])
print("content",msg['content'])
print()
def callback(ch, method, properties, body):
msg = json.loads(body)
asyncio.run(process_msg(msg))
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='queue1')
# メッセージを受信して処理
channel.basic_consume(queue='queue1', on_message_callback=callback, auto_ack=True)
print('Waiting for messages. To exit press CTRL+C')
channel.start_consuming()