Amazon SQSへのキューイングの際、直接SQSを使っても良いですが、SNSを使うと抽象度が高くなり捗ることもあります。
CloudWatchを使ってログを残すこともできますし、便利です。
http://dev.classmethod.jp/cloud/aws/sns-topic-should-be-placed-behind-sqs-queue/
http://docs.aws.amazon.com/ja_jp/sns/latest/dg/SendMessageToSQS.html
そんなSNSトピックとSQSキュー、マネージドコンソールから作っても良いのですが、SDKを使って自動化することでデプロイを自動化したり、環境を作り直すときに便利です。
今回Ruby SDK V2での作り方を調べたのでそのメモです。
コード
sqs = Aws::SQS::Client.new
sns = Aws::SNS::Client.new
# 通知先SNSトピック作成
resp = sns.create_topic(
name: 'topic_name', # required
)
topic_arn = resp.topic_arn
# 通知先キュー作成
resp = sqs.create_queue(
queue_name: 'quque_name', # required
attributes: {
'ReceiveMessageWaitTimeSeconds': '10',
},
)
queue_url = resp.queue_url
# トピックへのメッセージを作成した通知先キューへ送るように購読の設定
# キューのARNを取得
resp = sqs.get_queue_attributes(
queue_url: queue_url,
attribute_names: %w(QueueArn)
)
queue_arn = resp.attributes['QueueArn']
# 取得したARNを使って購読申込
resp = sns.subscribe(
topic_arn: topic_arn, # required
protocol: 'sqs', # required
endpoint: queue_arn
)
subscription_arn = resp.subscription_arn
# 今回はSNSによるメタ情報は不要なので送信したメッセージをそのままキューに送る
sns.set_subscription_attributes(
subscription_arn: subscription_arn, # required
attribute_name: 'RawMessageDelivery',
attribute_value: 'true'
)
# SNSトピックからSQSキューへのメッセージの追加を許可
# http://docs.aws.amazon.com/ja_jp/sns/latest/dg/SendMessageToSQS.html#SendMessageToSQS.sqs.permissions
policy = {
'Version': '2012-10-17',
'Statement': [
{
'Sid':'NotificationsToSQS',
'Effect':'Allow',
'Principal': '*',
'Action':'sqs:SendMessage',
'Resource': queue_arn,
'Condition':{
'ArnEquals':{
'aws:SourceArn': topic_arn
}
}
}
]
}
sqs.set_queue_attributes(
queue_url: queue_url,
attributes: {
'Policy': policy.to_json
}
)
解説
順を追って見ていきます。
SNSトピックの作成
キューに入れるメッセージの送り先にするSNSトピックを作ります。
これにはAws::SNS::Client#create_topic
を使います。
# 通知先SNSトピック作成
resp = sns.create_topic(
name: 'topic_name', # required
)
topic_arn = resp.topic_arn
レスポンスに作成したトピックのARNがあるので取っておきます。Aws::SNS::Clientの各種操作で使います。
なお、このメソッドはすでにその名前のトピックがあるときは、既存のトピックを返すので、冪等です。
SQSキューの作成
実際にメッセージを蓄積するSQSキューを作ります。
これにはAws::SQS::Client#create_queue
です。
# 通知先キュー作成
resp = sqs.create_queue(
queue_name: 'quque_name', # required
attributes: {
'ReceiveMessageWaitTimeSeconds': '10', # ロングポーリングを使うなら設定
},
)
queue_url = resp.queue_url
レスポンスにはQueueのURLが入っています。Aws::SQS::Clientの各種操作で使います。
SNSトピックへのSQSキューの購読を設定
「さっそく購読していくかい?」
「せっかく作ったんだから 購読しなきゃ意味がねえやな」
というわけで、購読の設定をします。
購読は、Aws::SNS::Client#subscribe
を使います。
これにはSQSキューのARN(URLではない)が必要になので、まずAws::SQS::Client#get_queue_attributes
でキューのURLからARNを取得します。
# トピックへのメッセージを作成した通知先キューへ送るように購読の設定
# キューのARNを取得
resp = sqs.get_queue_attributes(
queue_url: queue_url,
attribute_names: %w(QueueArn)
)
queue_arn = resp.attributes['QueueArn']
続いて、Aws::SNS::Client#subscribe
です。
# 取得したARNを使って購読申込
resp = sns.subscribe(
topic_arn: topic_arn, # required
protocol: 'sqs', # required
endpoint: queue_arn
)
subscription_arn = resp.subscription_arn
SQSは確認(Aws::SNS::Client#confirm_subscription
)は不要なので、これで購読の登録はできました。
SNSトピックのRawMessageDeliveryオプションを設定
今回、SQSへの投入窓口としてSNSを使うにあたり、SNSのメタ情報は不要なので、RawMessageDeliveryオプションを設定しました。
【AWS発表】 Amazon SQSとSNSのペイロードを最大256KBに拡張。生のメッセージも利用可能に
http://aws.typepad.com/aws_japan/2013/06/larger-payloads-256-kb-for-amazon-sqs-and-sns.html
# 今回はSNSによるメタ情報は不要なので送信したメッセージをそのままキューに送る
sns.set_subscription_attributes(
subscription_arn: subscription_arn, # required
attribute_name: 'RawMessageDelivery',
attribute_value: 'true'
)
SNSトピックからのSQSキューへのアクセス許可
SNSトピックのSubcriberにSQSキューを登録するだけでは、SQSキューにメッセージを入れようとした際に、権限不足でエラーになります。
これはSNSにメッセージを送信したクライアントからはわからないので、気付きにくいです。
忘れずメッセージ送信許可を行います。
Amazon SQS キューにメッセージを送信する許可を Amazon SNS トピックに付与する
http://docs.aws.amazon.com/ja_jp/sns/latest/dg/SendMessageToSQS.html#SendMessageToSQS.sqs.permissions
これには、Aws::SQS::Client#set_queue_attributes
で、ポリシードキュメントを設定します。
# SNSトピックからSQSキューへのメッセージの追加を許可
# http://docs.aws.amazon.com/ja_jp/sns/latest/dg/SendMessageToSQS.html#SendMessageToSQS.sqs.permissions
policy = {
'Version': '2012-10-17',
'Statement': [
{
'Sid':'NotificationsToSQS',
'Effect':'Allow',
'Principal': '*',
'Action':'sqs:SendMessage',
'Resource': queue_arn,
'Condition':{
'ArnEquals':{
'aws:SourceArn': topic_arn
}
}
}
]
}
sqs.set_queue_attributes(
queue_url: queue_url,
attributes: {
'Policy': policy.to_json
}
)
動作確認
AWSマネジメントコンソールで各種リソースが作成・設定されていることを確認します。
SNSのトピック詳細画面から「Publish」して、SQSキューにメッセージが送信されたら成功です。デバッグにはDelivery Status設定が便利です。
[Amazon SNS] 配送ステータスが CloudWatch で確認できるようになりました!
http://dev.classmethod.jp/cloud/aws/sns-delivery-status-feature/
メッセージ投入のコード
SNSトピックのARNを使ってAws::SNS::Client#publish
します。トピック名から直接はダメで先にARNを取得しておかないといけないのがちょっと面倒。
sns = Aws::SNS::Client.new
sns.publish(
topic_arn: topic_arn,
message: {hoge: 'Fuga'}.to_json
)
あとはワーカー側でSQSからメッセージを取り出して必要な処理をしましょう。