LoginSignup
5
6

More than 3 years have passed since last update.

CloudWatch LogsのログをRedshiftやAthenaで分析する

Last updated at Posted at 2019-05-09

目的

CloudWatch Logsのログを分析したい。

Kinesis Firehoseを使えばS3に転送できるが、そのままだと{json}{json}のように1行に複数のJSONオブジェクトが保存されてしまう。
どうやら自力で
Base64デコード=>gunzip=>logEventsを取り出す=>中のmessageに改行文字を追加して結合=>Base64エンコードして返り値として渡す
という処理が必要らしい

というわけで順番にやっていきます

CloudWatch Logsの作成

Log Groupの作成

CloudWatchのLogsから
Actions -> Create log group
スクリーンショット 2019-05-09 18.13.41.png

Log Streamの作成

上で作ったLog GroupをクリックしてCreate Log Stram
スクリーンショット 2019-05-09 18.15.15.png

試しに流してみる

put-log-events.sh
#!/bin/bash

export AWS_DEFAULT_REGION=ap-northeast-1

LogGroupName="redshift-experiment"
LogStreamName="command-stream"

Profile=$1
Mess=$2

Mess=`echo $Mess | sed -e "s/'//g"`

UploadSequenceToken=$(aws --profile $Profile logs describe-log-streams --log-group-name "$LogGroupName" --query 'logStreams[?logStreamName==`'$LogStreamName'`].[uploadSequenceToken]' --output text)

TimeStamp=`gdate "+%s%N" --utc`
TimeStamp=`expr $TimeStamp / 1000000`

if [ "$UploadSequenceToken" != "None" ]
then
  aws --profile $Profile logs put-log-events --log-group-name "$LogGroupName" --log-stream-name "$LogStreamName" --log-events timestamp=$TimeStamp,message="$Mess" --sequence-token $UploadSequenceToken
else
  aws --profile $Profile logs put-log-events --log-group-name "$LogGroupName" --log-stream-name "$LogStreamName" --log-events timestamp=$TimeStamp,message="$Mess"
fi

事前にaws cliをインストールして設定しておいてください。
あと$ brew install coreutilsでgdateを使えるようにしておきましょう。
awsアカウントって大抵複数持ってるだろうから、、こちらのスクリプトをコピーしてprofile渡せるようにしたり、macで動くようにdateをgdateに変えた感じです。

$ ./put-log-events.sh default hello

でログが増えていきます。

Kinesis Streamの作成

cloudwatch-logs-streamというstremを作ります。
お金は大切なので出来るだけ小さく作りましょう。
スクリーンショット 2019-05-09 18.24.45.png

roleの作成

こちらがとても参考になります。

cwl-role.json
{
  "Statement": {
    "Effect": "Allow",
    "Principal": { "Service": "logs.ap-northeast-1.amazonaws.com" },
    "Action": "sts:AssumeRole"
  }
}
cwl-kinesis-write.json
{
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "kinesis:PutRecord",
      "Resource": "arn:aws:kinesis:ap-northeast-1:xxxx:stream/cloudwatch-logs-stream"
    },
    {
      "Effect": "Allow",
      "Action": "iam:PassRole",
      "Resource": "arn:aws:iam::xxxx:role/cwl-to-kinesis"
    }
  ]
}

$ aws iam create-role --role-name cwl-to-kinesis --assume-role-policy-document file://./cwl-role.json --profile default
$ aws iam put-role-policy --role-name cwl-to-kinesis --policy-name cwl-kinesis-write --policy-document file://./cwl-kinesis-write.json --profile default

サブスクリプションフィルタを作成

aws logs \
    put-subscription-filter \
    --region ap-northeast-1 \
    --log-group-name "redshift-experiment" \
    --filter-name "All" \
    --filter-pattern "" \
    --role-arn "arn:aws:iam::xxxx:role/cwl-to-kinesis" \
    --destination-arn "arn:aws:kinesis:ap-northeast-1:xxxx:stream/cloudwatch-logs-stream" \
    --profile default

S3へ保存

S3に保存するためにKinesis Firehoseの設定
cloudwatch-logs-delivery-streamを作ります

Amazon KinesisのData FirehoseでCreate Delivery Streamをクリック
Step1でSourceにさっき作ったKinesis Streamを選択

スクリーンショット 2019-05-09 18.45.46.png

Step2 デフィオルトでOK

Step3 DestinationでS3を選択保存先のbucketやprefixは適当につけておきましょう。

Step4は基本デフォルトでOK。ただしStep4のBuffer intervalだけ60Secondを指定しておなかないとログを流してもなかなか反映されません。
IAM roleは検証環境とかであれば新しく作ってしまいましょう。

これで作ってログを流すとs3にファイルが置かれますが、、、
gzipされている上に拡張子がない、しかも改行もない。
参考までにこんな感じです

{"messageType":"DATA_MESSAGE","owner":"xxx","logGroup":"redshift-experiment","logStream":"command-stream","subscriptionFilters":["All"],"logEvents":[{"id":"xxx","timestamp":1557395415730,"message":"hello1"}]}{"messageType":"DATA_MESSAGE","owner":"xxx","logGroup":"redshift-experiment","logStream":"command-stream","subscriptionFilters":["All"],"logEvents":[{"id":"xxx","timestamp":1557395420146,"message":"hello2"}]}

というわけでlambdaで変換します

import base64
import gzip
import io
import json

def lambda_handler(event, context):
    records = [process_record(r) for r in event['records']]
    return {'records': records }

def process_record(record):
    record_id = record['recordId']
    data = base64.b64decode(record['data'])
    iodata = io.BytesIO(data)

    with gzip.GzipFile(fileobj=iodata, mode='r') as f:
        data = json.loads(f.read())

    processed_data = process_data(data) + '\n'
    return {
        'data': base64.b64encode(processed_data.encode('utf-8')).decode('utf-8'),
        'result': 'Ok',
        'recordId': record_id
    }

def process_data(data):
    return '\n'.join([format_log_event(json) for json in data['logEvents']])

def format_log_event(j):
    return json.dumps({'timestamp': j['timestamp'], 'message': j['message']})

こんな感じのscriptをAWS Lambda create functionで適当な名前をつけて保存してあげて、、
あとはfirehoseのeditのTransform source records with AWS Lambdaで保存したlambdaを指定すれば変換されます。

lambda設定するとIAM roleでおこられるのでCreate new or updateクリック

これでいい感じのファイルがs3に置かれるのでAthena使えます。

{"timestamp": 1557401834708, "message": "hello"}
{"timestamp": 1557401837275, "message": "hello"}

Redshift使いたい場合は こちらのSourceを最初につくったKinesis StreamにすればOK

参考サイト
https://dev.classmethod.jp/cloud/aws/put-cloudwatchlogs/
https://qiita.com/tilfin/items/150cd899077d3c6b7edd

5
6
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
6