0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

UbuntuでPythonからRabbitMQを使って異なる物理サーバーに非同期的にメッセージを送信する

Posted at

下は送受信するサーバー両方で実行。24.04と22.02で異なるため注意

Ubuntu 24.04の場合.bash
echo 'deb https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-erlang/deb/ubuntu noble main' | sudo tee /etc/apt/sources.list.d/rabbitmq.list
# GPGキーを/etc/apt/keyrings/ディレクトリにダウンロード
curl -1sLf "https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-erlang/gpg.E495BB49CC4BBE5B.key" | sudo gpg --dearmor -o /etc/apt/keyrings/rabbitmq-erlang.gpg

# リポジトリの設定を追加
echo "deb [signed-by=/etc/apt/keyrings/rabbitmq-erlang.gpg] https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-erlang/deb/ubuntu noble main" | sudo tee /etc/apt/sources.list.d/rabbitmq.list

#その後、以下のコマンドでパッケージリストを更新できます:
sudo apt update
#これにより、Ubuntu 24.04用のRabbitMQとErlangパッケージをインストールする準備が整います。
Ubuntu 22.04の場合.bash
# GPGキーを/etc/apt/keyrings/ディレクトリにダウンロード
curl -1sLf "https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-erlang/gpg.E495BB49CC4BBE5B.key" | sudo gpg --dearmor -o /etc/apt/keyrings/rabbitmq-erlang.gpg

# リポジトリの設定を追加
echo "deb [signed-by=/etc/apt/keyrings/rabbitmq-erlang.gpg] https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-erlang/deb/ubuntu jammy main" | sudo tee /etc/apt/sources.list.d/rabbitmq.list

#その後、以下のコマンドでパッケージリストを更新できます:
sudo apt update
#これにより、Ubuntu 24.04用のRabbitMQとErlangパッケージをインストールする準備が整います。

以下のpassword1は設定したいPWに変えることを推奨

送受信サーバー両方.bash
sudo apt-get install erlang
sudo apt-get install rabbitmq-server -y
sudo systemctl status rabbitmq-server
#sudo rabbitmq-plugins enable rabbitmq_management
#ウェブ管理コンソールを有効にしたい場合は上のコメントを外す
pip install pika
sudo rabbitmqctl add_user user1 password1
sudo rabbitmqctl set_user_tags user1 administrator
sudo rabbitmqctl set_permissions -p / user1 ".*" ".*" ".*"
sudo ufw allow 5672

以下の'xxx.xxx.xxx.xxx'は送信先(受信したい側)サーバーIPに変えてください

送信する側プログラム.py
import pika
import json
import os
import ssl
import time
import datetime
def send_tweet_to_queue(msg):
    ssl_options = pika.SSLOptions(ssl_context, dest) 
    connection_parameters = pika.ConnectionParameters(
    host='xxx.xxx.xxx.xxx',  # 送信先のIPアドレス
    port=5672,
    credentials=credentials)   # RabbitMQのポート(デフォルトは5672)
    connection = pika.BlockingConnection(connection_parameters)
    channel = connection.channel()
    channel.queue_declare(queue='queue1')
    
    channel.basic_publish(exchange='', routing_key='queue1', body=json.dumps(msg))
    print(f"Sent {msg}")
    connection.close()


for _ in range(10):
    time.sleep(2)
    timestamp = time.time()
    dt = datetime.fromtimestamp(timestamp)
    formatted_time = dt.strftime('%Y-%m-%d %H:%M:%S.%f')
    message = [{"send_time": formatted_time, "raw_time":timestamp,"content": "test"}]
    send_tweet_to_queue(message)
受信する側プログラム.py
import pika
import json
import asyncio
import time

async def process_msg(msg):
    msg=msg[0]
    print("received msg",msg)
    print("遅延(秒)",time.time()-msg['send_raw_time'])
    print("content",msg['content'])
    print()

def callback(ch, method, properties, body):
    msg = json.loads(body)
    asyncio.run(process_msg(msg))

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='queue1')

# メッセージを受信して処理
channel.basic_consume(queue='queue1', on_message_callback=callback, auto_ack=True)

print('Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?