LoginSignup
1
0

More than 1 year has passed since last update.

Amazon MQ for ActiveMQ 送受信サンプル(Python)

Last updated at Posted at 2023-02-16

Amazon MQ for ActiveMQ

ActiveMQでは5つのプロトコルのエンドポイントが作成される。
ここではActiveMQのQueueの実装サンプルを記載する。
なお実行環境はLambdaを前提としている。

OpenWire

後で書く

参考

STOMP

送信

概要

  • トリガーイベントから連携された内容をActiveMQにキューイングする
  • DynamoDB Streamをトリガーとする
  • イベントのうち、DynamoDBへのインサートだけを処理対象としている(こちらはフィルタリング設定でも可)
import stomp
import ssl
import json
import os

broker = os.environ['BROKER_URL']
port = os.environ['BROKER_PORT']
queue = os.environ['QUEUE_NAME']
username = os.environ['USER_NAME']
password = os.environ['PASSWORD']


def lambda_handler(event, context):
    
    record = event['Records'][0]
    print(json.dumps(record))

    # DynamodbにレコードがINSERTされた場合
    if event['Records'][0]['eventName'] == 'INSERT':
        record = event['Records'][0]['dynamodb']['NewImage']
        print('INSERT')


        # AmazonMQへenqueue
        conn = stomp.Connection([(broker, port)])
        conn.set_ssl(for_hosts=[(broker, port)],ssl_version=ssl.PROTOCOL_TLSv1_2)
        conn.connect(username, password, wait=True)
        ret = conn.send(body=json.dumps(record), destination=queue, headers={'message-id': 'test'})
        print(ret)
        conn.disconnect()

受信

概要

  • ActiveMQから5秒間Subscribeし、取得したキューの中身をファイル化してS3へPUTする
  • エンドポイントはSSLになっているためset_sslをしている
  • Lambdaにおいてグローバル変数を扱う場合には注意が必要(handler外は再利用される)
import boto3
import os
import time
import stomp
import ssl
import datetime
from logging import getLogger, INFO, DEBUG

# Logの設定
logger = getLogger(__name__)
logger.setLevel(os.environ['LOG_LEVEL'])

# MQの設定
broker = os.environ['BROKER_URL']
port = os.environ['BROKER_PORT']
queue = os.environ['QUEUE_NAME']
username = os.environ['USER_NAME']
password = os.environ['PASSWORD']
subscription_id = '1'
polling_time = 5

# S3の設定
s3 = boto3.resource('s3')
s3_bucket = s3.Bucket(os.environ['BUCKET_NAME'])
tmp_output = '/tmp/output.csv'


class MyListener(stomp.ConnectionListener):
    def __init__(self, conn):
        self.conn = conn

    def on_error(self, frame):
        logger.info('on_error: received an error "%s"' % frame.body)

    def on_message(self, frame):
        # debug log
        logger.debug('on_message: processed message')
        logger.debug('message-id: %s', frame.headers['message-id'])
        read_messages.append(frame.body)
        # ACKを明示的に行う
        self.conn.ack(frame.headers['message-id'], subscription_id)

def lambda_handler(event, context):
    # グローバル変数定義と初期化
    global read_messages
    read_messages = []
    
    # ブローカー接続とキュー取得
    conn = stomp.Connection([(broker, port)], heartbeats=(8000, 5000))
    conn.set_listener('', MyListener(conn))
    conn.set_ssl(for_hosts=[(broker, port)], ssl_version=ssl.PROTOCOL_TLSv1_2)
    conn.connect(username, password, wait=True)
    conn.subscribe(destination=queue, id=subscription_id, ack='client-individual')
    logger.info('mq connect.')
    time.sleep(polling_time)
    
    # ブローカーとの切断
    conn.disconnect()
    logger.info('mq disconnect.')

    # S3へCSVファイルをアップロード
    if len(read_messages) == 0:
        logger.info('queue is empty.')
    else:
        with open(tmp_output, mode='a') as f:
            for message in read_messages:
                print(message, file=f)

        now = datetime.datetime.now()
        s3_output = 'output_'+now.strftime('%Y%m%d%H%M%S')+'.csv'
        s3_bucket.upload_file(tmp_output, s3_output)
        logger.info('s3 upload complete. file_name: %s', s3_output)

なおサンプルでよく出てくる on_disconnected() などをLambdaで実装すると、セグフォを乱発するので注意(原因不明)

WSS

よくわからないので割愛

AMQP

RabbitMQ用?やれたらやる

MQTT

MQTTではPub/Sub方式しか対応できないため対象外

参考資料

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0