Amazon MQ for ActiveMQ
ActiveMQでは5つのプロトコルのエンドポイントが作成される。
ここではActiveMQのQueueの実装サンプルを記載する。
なお実行環境はLambdaを前提としている。
OpenWire
後で書く
参考
- http://engmng.blog.fc2.com/blog-entry-107.html
- https://simplesassim.wordpress.com/2013/12/30/how-to-send-a-message-to-an-apache-activemq-queue-with-jython/
- https://simplesassim.wordpress.com/2013/12/30/how-to-receive-a-message-from-an-apache-activemq-queue-with-jython/
- https://docs.confluent.io/ja-jp/platform/7.0/clients/kafka-jms-client/installation.html#cjms-installation
STOMP
送信
概要
- トリガーイベントから連携された内容をActiveMQにキューイングする
- DynamoDB Streamをトリガーとする
- イベントのうち、DynamoDBへのインサートだけを処理対象としている(こちらはフィルタリング設定でも可)
import stomp
import ssl
import json
import os
broker = os.environ['BROKER_URL']
port = os.environ['BROKER_PORT']
queue = os.environ['QUEUE_NAME']
username = os.environ['USER_NAME']
password = os.environ['PASSWORD']
def lambda_handler(event, context):
record = event['Records'][0]
print(json.dumps(record))
# DynamodbにレコードがINSERTされた場合
if event['Records'][0]['eventName'] == 'INSERT':
record = event['Records'][0]['dynamodb']['NewImage']
print('INSERT')
# AmazonMQへenqueue
conn = stomp.Connection([(broker, port)])
conn.set_ssl(for_hosts=[(broker, port)],ssl_version=ssl.PROTOCOL_TLSv1_2)
conn.connect(username, password, wait=True)
ret = conn.send(body=json.dumps(record), destination=queue, headers={'message-id': 'test'})
print(ret)
conn.disconnect()
受信
概要
- ActiveMQから5秒間Subscribeし、取得したキューの中身をファイル化してS3へPUTする
- エンドポイントはSSLになっているため
set_ssl
をしている - Lambdaにおいてグローバル変数を扱う場合には注意が必要(handler外は再利用される)
import boto3
import os
import time
import stomp
import ssl
import datetime
from logging import getLogger, INFO, DEBUG
# Logの設定
logger = getLogger(__name__)
logger.setLevel(os.environ['LOG_LEVEL'])
# MQの設定
broker = os.environ['BROKER_URL']
port = os.environ['BROKER_PORT']
queue = os.environ['QUEUE_NAME']
username = os.environ['USER_NAME']
password = os.environ['PASSWORD']
subscription_id = '1'
polling_time = 5
# S3の設定
s3 = boto3.resource('s3')
s3_bucket = s3.Bucket(os.environ['BUCKET_NAME'])
tmp_output = '/tmp/output.csv'
class MyListener(stomp.ConnectionListener):
def __init__(self, conn):
self.conn = conn
def on_error(self, frame):
logger.info('on_error: received an error "%s"' % frame.body)
def on_message(self, frame):
# debug log
logger.debug('on_message: processed message')
logger.debug('message-id: %s', frame.headers['message-id'])
read_messages.append(frame.body)
# ACKを明示的に行う
self.conn.ack(frame.headers['message-id'], subscription_id)
def lambda_handler(event, context):
# グローバル変数定義と初期化
global read_messages
read_messages = []
# ブローカー接続とキュー取得
conn = stomp.Connection([(broker, port)], heartbeats=(8000, 5000))
conn.set_listener('', MyListener(conn))
conn.set_ssl(for_hosts=[(broker, port)], ssl_version=ssl.PROTOCOL_TLSv1_2)
conn.connect(username, password, wait=True)
conn.subscribe(destination=queue, id=subscription_id, ack='client-individual')
logger.info('mq connect.')
time.sleep(polling_time)
# ブローカーとの切断
conn.disconnect()
logger.info('mq disconnect.')
# S3へCSVファイルをアップロード
if len(read_messages) == 0:
logger.info('queue is empty.')
else:
with open(tmp_output, mode='a') as f:
for message in read_messages:
print(message, file=f)
now = datetime.datetime.now()
s3_output = 'output_'+now.strftime('%Y%m%d%H%M%S')+'.csv'
s3_bucket.upload_file(tmp_output, s3_output)
logger.info('s3 upload complete. file_name: %s', s3_output)
なおサンプルでよく出てくる on_disconnected()
などをLambdaで実装すると、セグフォを乱発するので注意(原因不明)
WSS
よくわからないので割愛
AMQP
RabbitMQ用?やれたらやる
MQTT
MQTTではPub/Sub方式しか対応できないため対象外
参考資料
- ActiveMQ公式ドキュメントのサンプル
- STOMP/RabbitMQでのSSLの実装サンプル