1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

CloudWatch LogsのログをprivateなVPCにあるRedshiftに投入

Posted at

目的

ECSで稼働しているAPIサーバがJSON形式で出力したログがCloudWatchLogsに転送されている。転送されたログのうち必要なものだけを Redshiftに投入したい。

CloudWatchLogs2Redshift.png

Kinesis Firehose から 直接Redshiftに投入できなかった

Amazon Redshift クラスターへの VPC アクセス

Amazon Redshift クラスターが Virtual Private Cloud (VPC) にある場合、パブリック IP アドレスでパブリックにアクセス可能である必要があります。また、Kinesis Data Firehose の IP アドレスをブロック解除して、Amazon Redshift クラスターへのアクセス権を Kinesis Data Firehose に付与する必要があります。現在、Kinesis Data Firehose は利用可能なリージョンごとに 1 つの CIDR ブロックを使用します。

上の要件があるので、投入先のRedshiftはパブリックなアクセスを受け付けないVPCに配置してあるので、Kinesis Firehose の要件をみたせません。そのため、Kinesis FirehoseはS3に出力し、Redshiftが接続を受け付ける VPCサブネットに配置したEC2インスタンスで稼働するシェルスクリプトが Kinesis FirehoseがS3に出力したファイルを読み込んで、Redshiftに投入することにした。

Jenkinsを利用するのは既存の管理の仕組みの範囲内で行いたかったため。S3のファイルをRedshiftに投入するには別の方法もあるのは理解している。また、AWS Guleも 使ったことがないので面倒 不勉強なので運用に乗らないのでjqで加工することにした。

扱うリソース

  • CloudWatchLogs
    • ロググループ
    • IAMロール (Kinesis Firehoseに書き込める)
  • Kinesis Firehose
    • ストリーム (CloudWatchLogsのフィルターに合致したログが書き込まれる)
    • IAMロール (Kinesisのストリームを読み込めて、S3にPUTできる)
  • S3
    • JSONファイル(Kinesis Firehoseが読み込んだストリームの内容が書き込まれている)
    • CSVファイル(JSONを加工して、RedshiftでCOPYコマンドでテーブルに投入する)
    • イベント( S3のPUTイベントをSQSにエンキューする)
  • SQS
    • キュー (S3のPUTイベントがエンキューされる)
    • メッセージ ( S3にPUTされたオブジェクトの情報)
    • アクセス許可( S3のバケットのイベントでエンキューを許可する)
  • EC2
    • Redshiftが接続を受け付けるネットワークにあり、Jenkisが稼働している
  • Redshift
    • インスタンス情報( ホスト名 / DB名 / ユーザ名とパスワード )
    • テーブル

設定手順

CloudWatchLogsへのログ出力とRedshiftおよびJenkinsの設定が完了している状態で、主にKinesis Firehose 関係を設定する。

Kinesis Firehoseのストリーム作成

GUIを使ってポチポチした。

Step1: Name and source
![screencapture-ap-northeast-1-console-aws-amazon-firehose-home-2019-05-04-13_46_18.png](https://qiita-image-store.s3.ap-northeast-1.amazonaws.com/0/145742/87d129d8-8440-c3ce-406d-eadc5177d174.png)
ここでは、作成するストリーム名を指定する。
  • ストリーム名: example-firehose-stream
Step2: Process record
![screencapture-ap-northeast-1-console-aws-amazon-firehose-home-2019-05-04-13_46_35.png](https://qiita-image-store.s3.ap-northeast-1.amazonaws.com/0/145742/795b2b7e-da05-6ad1-4dee-c78d4bcda916.png)
必要なら、LambdaかAWS Guleでのデータ加工を行える。今回は使わない。
Step3: Choose destination
![screencapture-ap-northeast-1-console-aws-amazon-firehose-home-2019-05-06-14_16_00.png](https://qiita-image-store.s3.ap-northeast-1.amazonaws.com/0/145742/79768619-9feb-ca3f-9dac-89617ff6e4bc.png)
Kinesis Firehoseの出力先になるS3の設定。バケット名とプレフィックスを指定する。また、エラーが発生した情報を記載したファイルのプレフィックスを指定する。
  • バケット名: example-bucket
  • S3 prefix: app-logs/from_CloudWatchLogs/
  • S3 error prefix: app-logs/from_CloudWatchLogs-error/
Step4: Configure settings
![screencapture-ap-northeast-1-console-aws-amazon-firehose-home-2019-05-06-14_18_20.png](https://qiita-image-store.s3.ap-northeast-1.amazonaws.com/0/145742/8a376097-e73b-86b1-f9c8-4efacf5efdaf.png)

Kinesis Firehose 自体の設定を行う。

  • S3 Buffer conditions
    • S3に書き込むバッファの大きさ
    • Mbyte単位の容量と秒単位の書き込み間隔を指定する
    • この値はAWS利用料金と更新頻のトレードオフになる
    • 今回はデフォルト値のまま
  • S3 compression and encryption
  • Error logging
    • Kinesis Firenoseでエラーが発生したらログの書き込みを行うかを指定する
    • 書き込み先は Step3 の S3 error prefix が使われる
    • 今回は Enabled にした
  • Tags(optional)
    • お好みで
  • IAM role
    • 上記の IAMロール (Kinesisのストリームを読み込めて、S3にPUTできる) を指定する。
    • 「Create new or choose」をクリックして、新規にIAMロールを作成するか、既存の条件を満たすIAMをロールを選択する。
    • 今回は作成した。
Step5: Preview
![screencapture-ap-northeast-1-console-aws-amazon-firehose-home-2019-05-06-14_19_14.png](https://qiita-image-store.s3.ap-northeast-1.amazonaws.com/0/145742/fdf89a8d-1c2b-6c2f-e8fd-ab400fc99066.png)

これまでに指定した値のプレビューが表示されるので、確認する。修正が必要であれば、 Edit ボタンで修正画面に遷移する

Kinesis Firehoseの完成を待つ
![screencapture-ap-northeast-1-console-aws-amazon-firehose-home-2019-05-06-14_53_50.png](https://qiita-image-store.s3.ap-northeast-1.amazonaws.com/0/145742/920e7de2-fde6-1c1a-b9cc-ab76b10644b2.png)

作成完了するのを待つ。

SQS

キューを作成
![screencapture-console-aws-amazon-sqs-home-2019-05-06-14_26_04.png](https://qiita-image-store.s3.ap-northeast-1.amazonaws.com/0/145742/7f16c60d-bea8-037f-7aa2-d34ff6c07630.png)

AWSコンソールから標準キューを作成する。

  • キュー名: put-s3-event-queue
S3のPutイベントで書き込める権限

作成したキューの アクセス許可 タブの ポリシードキュメントの編集 で権限設定を行う

{
  "Version": "2012-10-17",
  "Id": "arn:aws:sqs:ap-northeast-1:123456789012:put-s3-event-queue/SQSDefaultPolicy",
  "Statement": [
    {
      "Sid": "allow enqeue from s3 bucket of example-bucket",
      "Effect": "Allow",
      "Principal": {
        "AWS": "*"
      },
      "Action": "SQS:SendMessage",
      "Resource": "arn:aws:sqs:ap-northeast-1:123456789012:put-s3-event-queue",
      "Condition": {
        "ArnLike": {
          "aws:SourceArn": "arn:aws:s3:*:*:example-bucket"
        }
      }
    }
  ]
}

S3バケットにPUTイベントを設定

Eventsに追加
![screencapture-console-aws-amazon-s3-buckets-dev-kfc-jobs-batch-files-2019-05-06-14_29_21.png](https://qiita-image-store.s3.ap-northeast-1.amazonaws.com/0/145742/aa227c38-070e-05f2-6450-17230ba52bfc.png)

AWS Consoleで example-bucket バケットを選択して、プロパティの Events を追加。下の値を設定した。

  • 名前: 任意の名前
  • イベント: PUT だけをチェックする
  • プレフィックス: app-logs/from_CloudWatchLogs/ (Kinesis Firehoseの 「S3 prefix」と同じにした)
  • サフィックス: 空白
  • 通知先: SQSキュー
  • SQS: put-s3-event-queue (上で作成した標準キュー)

CloudWatch Logsのサブスクリプションフィルタ

AWS Consoleでは指定できないので、CLIで行う。

上記の 「IAMロール (Kinesis Firehoseに書き込める) 」を作成

まずは、 TrustPolicyForCWL.json に 信頼ポリシーを記載する

{
  "Statement": {
    "Effect": "Allow",
    "Principal": { "Service": "logs.ap-northeast-1.amazonaws.com" },
    "Action": "sts:AssumeRole"
  }
}

それを利用して、IAMロールを作成する。

$ aws iam create-role \
      --role-name CWLtoKinesisFirehoseRole \
      --assume-role-policy-document file://./TrustPolicyForCWL.json

次に、PermissionsForCWL.json に権限ポリシーを記載する

{
    "Statement":[
      {
        "Effect":"Allow",
        "Action":["firehose:*"],
        "Resource":["arn:aws:firehose:ap-northeast-1:123456789012:deliverystream/example-firehose-stream"]
      },
      {
        "Effect":"Allow",
        "Action":["iam:PassRole"],
        "Resource":["arn:aws:iam::123456789012:role/CWLtoKinesisFirehoseRole"]
      }
    ]
}

そして、作成したIAMロール CWLtoKinesisFirehoseRole にポリシーをアタッチする

$ aws iam put-role-policy --role-name CWLtoKinesisFirehoseRole \
    --policy-name Permissions-Policy-For-CWL \
    --policy-document file://./PermissionsForCWL.json

CloudWatch Logsのロググループにサブスクリプションフィルターを設定

$ aws logs put-subscription-filter \
 --log-group-name "my-app-group" \
 --filter-name "to-redshift-only-filter" \
 --filter-pattern '{$.context.log_name = "specific-log"}' \
 --destination-arn "arn:aws:firehose:ap-northeast-1:123456789012:deliverystream/example-firehose-stream" \
 --role-arn "arn:aws:iam::123456789012:role/CWLtoKinesisFirehoseRole"

パラメータの意味

  • log-group-name
    • 目的のロググループ名
  • filter-name
    • フィルター名
  • filter-pattern
    • ログストリームに出力されるログをフィルターする条件
    • AWS Consoleであらかじめ確認しておく
  • destination-arn
    • Kinesis Firehoseのストリームのarn
  • role-arn
    • 上で作成したIAMロール

Jenkinsで実行するシェルスクリプト

Redshiftにアクセスできるsubnetに構築した Jenkins にジョブを登録する。スケジュールで定期的に実行する。

シェルで行なっていることは下の通り。

  1. SQSのキューをデキューしてメッセージを取得する
  2. メッセージがあれば、メッセージからKinesis FirehoseがS3に書き込んだファイルのパスを取得する
  3. S3のファイルを取得して、Redshiftの COPY コマンドで扱えるCSVに変換して、S3に上げ直す。この際に、Kinesis Firehoseの prefix とは異なるprefixにする。そうしないと、PUTイベントが無限ループする。
  4. Redshiftに接続してCOPYコマンドを実行して目的のテーブルにロードする
  5. SQSのキューからメッセージを削除する
ジョブのシェルスクリプト
#!/bin/bash

SQS_QUEUE_URL="https://sqs.ap-northeast-1.amazonaws.com/123456789012/put-s3-event-queue"
REDSHIFT_HOST="my-redshift-host"
REDSHIFT_DB_NAME="my_db"
REDSHIFT_USER="db_user"
# パスワードは . pgpass に書いてある

set -e -u

msg_file="/tmp/s3-to-redshift.sh.$$"
trap "rm -f ${msg_file}" EXIT


cmd="aws sqs receive-message --queue-url ${SQS_QUEUE_URL}"
echo $cmd
msg=$(eval $cmd > ${msg_file})

if [ ! -s "${msg_file}" ]
then
    echo "no found message on SQS queue"
    exit 1
fi
s3url=$(cat $msg_file  | jq -r '.Messages[].Body' | jq -r  '["s3:/", .Records[].s3.bucket.name, .Records[].s3.object.key] | join("/")' )
handle=$(cat $msg_file | jq -r '.Messages[].ReceiptHandle')

echo "ReceiptHandle=${handle}"

csvpath=$(echo ${s3url} | sed -e 's/from_CloudWatchLogs/to_redshift/')
echo "convert logfile. ${s3url} to ${tsvpath}"
aws s3 cp ${s3url} - \
       	|zcat \
       	| jq  -r '.logEvents[].message' \
      	| jq  -r '[ 〜いい感じに加工する〜 ] |@csv' \
      	| aws s3 cp - ${csvpath}
	

sql="copy integrated_user_log
from '${csvpath}'
iam_role 'arn:aws:iam::123456789012:role/redshift-aim-role' 
FORMAT CSV 
TIMEFORMAT 'auto'
\;
"

echo "run SQL ..."
echo $sql
echo $sql | psql -h ${REDSHIFT_HOST} -U ${REDSHIFT_USER} -d ${REDSHIFT_DB_NAME} -p 5439

echo "remove Queue message"
cmd="aws sqs delete-message --queue-url ${SQS_QUEUE_URL} --receipt-handle ${handle} "
echo $cmd
eval "$cmd"

パラメータ調整 - 料金と更新間隔のトレードオフ

Kinesis Firehoseの料金は 5Kbyte単位に切り上げされます。

料金は Amazon Kinesis Data Firehose に取り込まれたデータの量に基づきます。データの量は、このサービスに送信したデータレコードの数に、直近の 5 KB の倍数に切り上げた各レコードのサイズを乗算した値として計算されます。例えば、各データレコードが 42 KB の場合、Kinesis Data Firehose では、45 KB のデータ取り込みとしてそれぞれカウントされます。

そのため、S3に出力される1回あたりのファイルサイズが小さいと、過大な請求が発生する可能性があるので、 「Configure settings」で指定するバッファサイズを大きくして、インターバルを長くする方が料金を抑えることができます。

一方で、S3に出力される間隔が長くなるので、CloudWatchLogsに書き込まれてからRedshiftに反映されるまでのタイムラグが長くなります。

利用用途とデータ量ならびに更新頻度でバランスを取る必要があります。

参考記事

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?