目的
ECSで稼働しているAPIサーバがJSON形式で出力したログがCloudWatchLogsに転送されている。転送されたログのうち必要なものだけを Redshiftに投入したい。
Kinesis Firehose から 直接Redshiftに投入できなかった
Amazon Redshift クラスターへの VPC アクセス
Amazon Redshift クラスターが Virtual Private Cloud (VPC) にある場合、パブリック IP アドレスでパブリックにアクセス可能である必要があります。また、Kinesis Data Firehose の IP アドレスをブロック解除して、Amazon Redshift クラスターへのアクセス権を Kinesis Data Firehose に付与する必要があります。現在、Kinesis Data Firehose は利用可能なリージョンごとに 1 つの CIDR ブロックを使用します。
上の要件があるので、投入先のRedshiftはパブリックなアクセスを受け付けないVPCに配置してあるので、Kinesis Firehose の要件をみたせません。そのため、Kinesis FirehoseはS3に出力し、Redshiftが接続を受け付ける VPCサブネットに配置したEC2インスタンスで稼働するシェルスクリプトが Kinesis FirehoseがS3に出力したファイルを読み込んで、Redshiftに投入することにした。
Jenkinsを利用するのは既存の管理の仕組みの範囲内で行いたかったため。S3のファイルをRedshiftに投入するには別の方法もあるのは理解している。また、AWS Guleも 使ったことがないので面倒 不勉強なので運用に乗らないのでjqで加工することにした。
扱うリソース
- CloudWatchLogs
- ロググループ
- IAMロール (Kinesis Firehoseに書き込める)
- Kinesis Firehose
- ストリーム (CloudWatchLogsのフィルターに合致したログが書き込まれる)
- IAMロール (Kinesisのストリームを読み込めて、S3にPUTできる)
- S3
- JSONファイル(Kinesis Firehoseが読み込んだストリームの内容が書き込まれている)
- CSVファイル(JSONを加工して、RedshiftでCOPYコマンドでテーブルに投入する)
- イベント( S3のPUTイベントをSQSにエンキューする)
- SQS
- キュー (S3のPUTイベントがエンキューされる)
- メッセージ ( S3にPUTされたオブジェクトの情報)
- アクセス許可( S3のバケットのイベントでエンキューを許可する)
- EC2
- Redshiftが接続を受け付けるネットワークにあり、Jenkisが稼働している
- Redshift
- インスタンス情報( ホスト名 / DB名 / ユーザ名とパスワード )
- テーブル
設定手順
CloudWatchLogsへのログ出力とRedshiftおよびJenkinsの設定が完了している状態で、主にKinesis Firehose 関係を設定する。
Kinesis Firehoseのストリーム作成
GUIを使ってポチポチした。
Step1: Name and source
- ストリーム名: example-firehose-stream
Step2: Process record
Step3: Choose destination
- バケット名: example-bucket
- S3 prefix: app-logs/from_CloudWatchLogs/
- S3 error prefix: app-logs/from_CloudWatchLogs-error/
Step4: Configure settings
Kinesis Firehose 自体の設定を行う。
- S3 Buffer conditions
- S3に書き込むバッファの大きさ
- Mbyte単位の容量と秒単位の書き込み間隔を指定する
- この値はAWS利用料金と更新頻のトレードオフになる
- 今回はデフォルト値のまま
- S3 compression and encryption
- 圧縮とKMSを利用した暗号化を指定する
- 圧縮については、gzip レベル6で圧縮されているので、Kinesis Firehoseで圧縮する必要がない。 (https://docs.aws.amazon.com/ja_jp/AmazonCloudWatch/latest/logs/SubscriptionFilters.html#FirehoseExample)
- 今回はどちらも利用しないので、 Disabled にした
- Error logging
- Kinesis Firenoseでエラーが発生したらログの書き込みを行うかを指定する
- 書き込み先は Step3 の S3 error prefix が使われる
- 今回は Enabled にした
- Tags(optional)
- お好みで
- IAM role
- 上記の IAMロール (Kinesisのストリームを読み込めて、S3にPUTできる) を指定する。
- 「Create new or choose」をクリックして、新規にIAMロールを作成するか、既存の条件を満たすIAMをロールを選択する。
- 今回は作成した。
Step5: Preview
これまでに指定した値のプレビューが表示されるので、確認する。修正が必要であれば、 Edit ボタンで修正画面に遷移する
Kinesis Firehoseの完成を待つ
作成完了するのを待つ。
SQS
キューを作成
AWSコンソールから標準キューを作成する。
- キュー名: put-s3-event-queue
S3のPutイベントで書き込める権限
作成したキューの アクセス許可 タブの ポリシードキュメントの編集 で権限設定を行う
{
"Version": "2012-10-17",
"Id": "arn:aws:sqs:ap-northeast-1:123456789012:put-s3-event-queue/SQSDefaultPolicy",
"Statement": [
{
"Sid": "allow enqeue from s3 bucket of example-bucket",
"Effect": "Allow",
"Principal": {
"AWS": "*"
},
"Action": "SQS:SendMessage",
"Resource": "arn:aws:sqs:ap-northeast-1:123456789012:put-s3-event-queue",
"Condition": {
"ArnLike": {
"aws:SourceArn": "arn:aws:s3:*:*:example-bucket"
}
}
}
]
}
S3バケットにPUTイベントを設定
Eventsに追加
AWS Consoleで example-bucket バケットを選択して、プロパティの Events を追加。下の値を設定した。
- 名前: 任意の名前
- イベント: PUT だけをチェックする
- プレフィックス: app-logs/from_CloudWatchLogs/ (Kinesis Firehoseの 「S3 prefix」と同じにした)
- サフィックス: 空白
- 通知先: SQSキュー
- SQS: put-s3-event-queue (上で作成した標準キュー)
CloudWatch Logsのサブスクリプションフィルタ
AWS Consoleでは指定できないので、CLIで行う。
上記の 「IAMロール (Kinesis Firehoseに書き込める) 」を作成
まずは、 TrustPolicyForCWL.json
に 信頼ポリシーを記載する
{
"Statement": {
"Effect": "Allow",
"Principal": { "Service": "logs.ap-northeast-1.amazonaws.com" },
"Action": "sts:AssumeRole"
}
}
それを利用して、IAMロールを作成する。
$ aws iam create-role \
--role-name CWLtoKinesisFirehoseRole \
--assume-role-policy-document file://./TrustPolicyForCWL.json
次に、PermissionsForCWL.json
に権限ポリシーを記載する
{
"Statement":[
{
"Effect":"Allow",
"Action":["firehose:*"],
"Resource":["arn:aws:firehose:ap-northeast-1:123456789012:deliverystream/example-firehose-stream"]
},
{
"Effect":"Allow",
"Action":["iam:PassRole"],
"Resource":["arn:aws:iam::123456789012:role/CWLtoKinesisFirehoseRole"]
}
]
}
そして、作成したIAMロール CWLtoKinesisFirehoseRole
にポリシーをアタッチする
$ aws iam put-role-policy --role-name CWLtoKinesisFirehoseRole \
--policy-name Permissions-Policy-For-CWL \
--policy-document file://./PermissionsForCWL.json
CloudWatch Logsのロググループにサブスクリプションフィルターを設定
$ aws logs put-subscription-filter \
--log-group-name "my-app-group" \
--filter-name "to-redshift-only-filter" \
--filter-pattern '{$.context.log_name = "specific-log"}' \
--destination-arn "arn:aws:firehose:ap-northeast-1:123456789012:deliverystream/example-firehose-stream" \
--role-arn "arn:aws:iam::123456789012:role/CWLtoKinesisFirehoseRole"
パラメータの意味
- log-group-name
- 目的のロググループ名
- filter-name
- フィルター名
- filter-pattern
- ログストリームに出力されるログをフィルターする条件
- AWS Consoleであらかじめ確認しておく
- destination-arn
- Kinesis Firehoseのストリームのarn
- role-arn
- 上で作成したIAMロール
Jenkinsで実行するシェルスクリプト
Redshiftにアクセスできるsubnetに構築した Jenkins にジョブを登録する。スケジュールで定期的に実行する。
シェルで行なっていることは下の通り。
- SQSのキューをデキューしてメッセージを取得する
- メッセージがあれば、メッセージからKinesis FirehoseがS3に書き込んだファイルのパスを取得する
- S3のファイルを取得して、Redshiftの
COPY
コマンドで扱えるCSVに変換して、S3に上げ直す。この際に、Kinesis Firehoseの prefix とは異なるprefixにする。そうしないと、PUTイベントが無限ループする。 - Redshiftに接続してCOPYコマンドを実行して目的のテーブルにロードする
- SQSのキューからメッセージを削除する
ジョブのシェルスクリプト
#!/bin/bash
SQS_QUEUE_URL="https://sqs.ap-northeast-1.amazonaws.com/123456789012/put-s3-event-queue"
REDSHIFT_HOST="my-redshift-host"
REDSHIFT_DB_NAME="my_db"
REDSHIFT_USER="db_user"
# パスワードは . pgpass に書いてある
set -e -u
msg_file="/tmp/s3-to-redshift.sh.$$"
trap "rm -f ${msg_file}" EXIT
cmd="aws sqs receive-message --queue-url ${SQS_QUEUE_URL}"
echo $cmd
msg=$(eval $cmd > ${msg_file})
if [ ! -s "${msg_file}" ]
then
echo "no found message on SQS queue"
exit 1
fi
s3url=$(cat $msg_file | jq -r '.Messages[].Body' | jq -r '["s3:/", .Records[].s3.bucket.name, .Records[].s3.object.key] | join("/")' )
handle=$(cat $msg_file | jq -r '.Messages[].ReceiptHandle')
echo "ReceiptHandle=${handle}"
csvpath=$(echo ${s3url} | sed -e 's/from_CloudWatchLogs/to_redshift/')
echo "convert logfile. ${s3url} to ${tsvpath}"
aws s3 cp ${s3url} - \
|zcat \
| jq -r '.logEvents[].message' \
| jq -r '[ 〜いい感じに加工する〜 ] |@csv' \
| aws s3 cp - ${csvpath}
sql="copy integrated_user_log
from '${csvpath}'
iam_role 'arn:aws:iam::123456789012:role/redshift-aim-role'
FORMAT CSV
TIMEFORMAT 'auto'
\;
"
echo "run SQL ..."
echo $sql
echo $sql | psql -h ${REDSHIFT_HOST} -U ${REDSHIFT_USER} -d ${REDSHIFT_DB_NAME} -p 5439
echo "remove Queue message"
cmd="aws sqs delete-message --queue-url ${SQS_QUEUE_URL} --receipt-handle ${handle} "
echo $cmd
eval "$cmd"
パラメータ調整 - 料金と更新間隔のトレードオフ
Kinesis Firehoseの料金は 5Kbyte単位に切り上げされます。
料金は Amazon Kinesis Data Firehose に取り込まれたデータの量に基づきます。データの量は、このサービスに送信したデータレコードの数に、直近の 5 KB の倍数に切り上げた各レコードのサイズを乗算した値として計算されます。例えば、各データレコードが 42 KB の場合、Kinesis Data Firehose では、45 KB のデータ取り込みとしてそれぞれカウントされます。
そのため、S3に出力される1回あたりのファイルサイズが小さいと、過大な請求が発生する可能性があるので、 「Configure settings」で指定するバッファサイズを大きくして、インターバルを長くする方が料金を抑えることができます。
一方で、S3に出力される間隔が長くなるので、CloudWatchLogsに書き込まれてからRedshiftに反映されるまでのタイムラグが長くなります。
利用用途とデータ量ならびに更新頻度でバランスを取る必要があります。
参考記事
- CloudWatch LogsのログをS3へ【Kinesis Firehose編】
- CloudWatch Logs ユーザーガイド Amazon Kinesis Data Firehose のサブスクリプションフィルタ