Edited at

AWS CLI で定期的に AWS SQS からメッセージを取得する

AWS のマネージドサービス->SNS->SQS->一般的なログファイルでアラートを連携して、既存のログ監視ツールに読ませることになったのでサンプルを作ってみました。

OSは RHEL7.6 の想定なので jq など入っていません。

EPELリポジトリに jq あったので使って良いらしい。


sqs.sh

#!/bin/bash

set -eu

yyyymmdd=$(date +%F)
log=./log_$yyyymmdd
archive=./archive_$yyyymmdd
longPollingInterval=0
url=https://sqs.ap-northeast-1.amazonaws.com/1234567890/hoge
Filter=./filter.awk

# デキュー
# https://docs.aws.amazon.com/cli/latest/reference/sqs/
m=$(aws sqs receive-message --max-number-of-messages "10" --queue-url $url --wait-time-seconds $longPollingInterval)

# デキューの結果があるときは繰り返し
while $(echo $m | grep -E -q '[[:graph:]]'); do

# ラベリングしてログに保存、途中で加工前のデータをアーカイブとして保存。
echo $m \
| jq -r -c '.Messages[].Body' \
| tee -a ${archive} \
| awk -f ${Filter} \
> ${log}

# 削除(削除に失敗した場合はログ出力)
echo $m \
| jq -r -c '.Messages[]' \
| jq -r -c '{ Id: .MessageId, ReceiptHandle: .ReceiptHandle }' \
| jq -r -c -r '.' \
| xargs -0 -I@ aws sqs delete-message-batch -queue-url $url --entries "@" \
| jq -r -c '.Failed | fromjson? | .[]' \
| awk '{print strftime("%Y/%m/%d %H:%M:%S",systime()) ": メッセージ削除に失敗: " $0}' \
> ${log}

# もう一度デキューして残りが無いか確認する
m=$(aws sqs receive-message --max-number-of-messages "10" --queue-url $url --wait-time-seconds $longPollingInterval)
done
echo $(date '+%F %R:%S') cmpl >> $log


メッセージの中間処理には(case文より) awk の方が適していると思ったので処理を分けます。

function label(m){

print strftime("%Y/%m/%d %H:%M:%S",systime()) ":" m ": " $0
next
}
$0 ~ "pattern1" { label("pattern1") }
$0 ~ "pattern2" { label("pattern2") }
{ label("parse error") }

こいつを crond に登録して毎分動かします。大量のメッセージが来る想定ではないので、単体で毎分動かす程度で十分かと。今どきは systemdtimer を使うのかもしれませんが、よく分からないので。

PATH=/usr/bin:/usr/local/bin:$PATH

* * * * * /bin/bash /opt/sqs.sh

AWS SQS が落ちていたりすると /var/spool/mail/user に大量のエラーメールがたまる懸念がありますので注意が必要です。ローテーションに組み込んでおきましょう(MAILTO=""しても良いのですがエラーは追えたほうが良いので)。