2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

【AWS CloudWatch Logs】boto3 や AWS CLI でログイベントを取得、書き込みする

Last updated at Posted at 2022-06-26

ログイベントを取得する

boto3 の get_log_events メソッドについて

boto3 の get_log_events メソッドは、ログイベントが多いと一度にすべてのログイベントを取得することができません。

デフォルトでは、この操作は、1MB の応答サイズに収まるだけのログイベントを返します(最大 10,000 ログイベント)。

ですので、トークンを指定することで残りのログイベントを順次取得していく必要があります。このトークンは get_log_events メソッドの戻り値から取り出すことができます。

トークンを使用する場合、ログを古い順に取得した方がわかりやすいと思います。

古い順にログを取得したい場合、startFromHeadtrue にして、nextForwardTokenget_log_events メソッドの nextToken 引数に渡します。

この値がtrueの場合、最も古いログイベントが最初に返されます。この値がfalseの場合、最新のログ・イベントが最初に返されます。デフォルト値はfalseです。
この操作で以前の nextForwardToken 値を nextToken として使用する場合、startFromHead に true を指定する必要があります。

  • サンプルコード
def _get_custom_log_events(log_group_name, log_stream_name, next_token=''):
    if next_token == '':
        # 最も古いログを取得するときには next_token が渡されていないので、nextToken には何も指定しない
        response = client.get_log_events(
            logGroupName=log_group_name,
            logStreamName=log_stream_name,
            startFromHead=True
        )
    else:
        # next_token が渡されている場合、nextToken にそれを渡して続きのログイベントを取得する
        response = client.get_log_events(
            logGroupName=log_group_name,
            logStreamName=log_stream_name,
            startFromHead=True,
            nextToken=next_token
        )
    return response['events'], response['nextForwardToken']

def get_custom_log_events(log_group_name, log_stream_name):
    next_token = ''
    # 最初に最も古いログイベントを取得し、
    # その後はログイベントを取得できなくなったところで終了する
    log_events, next_token = _get_custom_log_events(
        log_group_name, log_stream_name, next_token=next_token)
    while len(log_events) > 0:
        # 最新のログイベントを取得するまでループする
        print(f'ログイベントの件数: {len(log_events)}')
        print(f"ログイベントの最初の時刻: {change_milli_to_datetime(log_events[0]['timestamp'], is_jst=True)}")
        log_events, next_token = _get_custom_log_events(
            log_group_name, log_stream_name, next_token=next_token)
  • 出力結果

以下の出力結果を見ると、ログイベントを古い順に取得していること、最大でも一度に取得できるログイベントの件数は 10,000 件であることがわかります。

ログイベントの件数: 10000
ログイベントの最初の時刻: 2022-06-25 09:52:38
ログイベントの件数: 10000
ログイベントの最初の時刻: 2022-06-25 10:40:56
ログイベントの件数: 6351
ログイベントの最初の時刻: 2022-06-25 11:16:58

エポックミリ秒について

boto3 の get_log_events で返される timestamp はエポックミリ秒になっています。これだと直感的にわかりにくいので、JST(日本時間)に変換するようにしています。この変換は change_milli_to_datetime メソッドで行っています。

def change_milli_to_datetime(milli_seconds, is_jst=False):
    if is_jst:
        utc_datetime = datetime.datetime.fromtimestamp(milli_seconds / 1000)
        return_datetime = utc_datetime + datetime.timedelta(hours=9)
    else:
        return_datetime = datetime.datetime.fromtimestamp(milli_seconds / 1000)
    return return_datetime.strftime('%Y-%m-%d %H:%M:%S')

aws logs get-log-events について

AWS CLI を使って指定したロググループ、ログストリームのログイベントを取得します。

$ aws logs get-log-events \
    --log-group-name my-log-group \
    --log-stream-name my-log-stream

出力結果は次のようになります。

{
    "events": [
        {
            "timestamp": 1656222984995,
            "message": "log event 1",
            "ingestionTime": 1656222986785
        },
        {
            "timestamp": 1656222996916,
            "message": "log event 2",
            "ingestionTime": 1656222997203
        }
    ],
    "nextForwardToken": "f/36935007046170486061077520900542879420822171613674733568/s",
    "nextBackwardToken": "b/36935006780323302549393962417700849724412482460423356416/s"
}

ログ形式に変換するには jq コマンドを使用します。

$ aws logs get-log-events \
    --log-group-name my-log-group \
    --log-stream-name my-log-stream \
    | jq -r '.events[] | [(.timestamp / 1000 | strftime("%Y-%m-%d %H:%M:%S")), (."message")] | @tsv'

出力結果は次のようになります。

2022-06-26 05:56:24     log event 1
2022-06-26 05:56:36     log event 2

--limit オプションの説明に、1 回のコマンド実行で取得できるログイベントの最大件数が記載されています。こちらも boto3 と同様に 10,000 件が最大です。

返されるログイベントの最大数です。値を指定しない場合、応答サイズ1MBに収まる最大ログイベント数で、最大10,000ログイベントまでとなります。

ですので、次の残りのログを取得するにはトークンを使用します。以下ではトークンを取得しています。トークンは 24 時間で有効期限が切れます。以下では 24 時間以内に同一のトークンを指定した場合、同じトークンを返していることを確認しています。

$ aws logs get-log-events --log-group-name my-log-group --log-stream-name test-log-stream-5 --start-from-head | jq '.nextForwardToken'
"f/36932738147926504953624927261640687377691913435615789056/s"
$ aws logs get-log-events --log-group-name my-log-group --log-stream-name test-log-stream-5 --start-from-head | jq '.nextForwardToken'
"f/36932738147926504953624927261640687377691913435615789056/s"
$ aws logs get-log-events --log-group-name my-log-group --log-stream-name test-log-stream-5 --start-from-head | jq '.nextForwardToken'
"f/36932738147926504953624927261640687377691913435615789056/s"

トークンを使用してログイベントをすべて取得する

ログイベントをすべて取得したことを知るには、連続した呼び出しに対するトークンを比較します。

ストリームの終端に達した場合は、渡したのと同じトークンが返されます。

これを while 文の条件式としてサンプルコードを作成しました。

  • 使用例
$ sh get_log_events.sh [ロググループ名] [ログストリーム名]
  • サンプルコード
get_log_events.sh
function get_log_events_with_token () {
  # トークンを使用して次の 10,000 件までのログイベントを取得する
  log_group_name=${1}
  log_stream_name=${2}
  output_log_file=${3}
  next_token=${4}
  aws logs get-log-events \
    --log-group-name ${log_group_name} \
    --log-stream-name ${log_stream_name} \
    --start-from-head \
    --next-token ${next_token} \
    | jq -r '.events[] | [(.timestamp / 1000 | strftime("%Y-%m-%d %H:%M:%S")), (."message")] | @tsv' \
  >> ${output_log_file}
}

function get_log_events () {
  # 10,000 件までの最初のログイベントを取得する
  log_group_name=${1}
  log_stream_name=${2}
  output_log_file=${3}
  aws logs get-log-events \
    --log-group-name ${log_group_name} \
    --log-stream-name ${log_stream_name} \
    --start-from-head \
    | jq -r '.events[] | [(.timestamp / 1000 | strftime("%Y-%m-%d %H:%M:%S")), (."message")] | @tsv' \
  >> ${output_log_file}
}

function get_next_token_with_token () {
  # トークンを使用して次のログイベントを取得し、その中に含まれるトークンを取得する
  log_group_name=${1}
  log_stream_name=${2}
  next_token=${3}
  aws logs get-log-events \
    --log-group-name ${log_group_name} \
    --log-stream-name ${log_stream_name} \
    --start-from-head \
    --next-token ${next_token} \
    | jq '.nextForwardToken'
}

function get_next_token () {
  # 最初のログイベントに含まれるトークンを取得する
  log_group_name=${1}
  log_stream_name=${2}
  aws logs get-log-events \
    --log-group-name ${log_group_name} \
    --log-stream-name ${log_stream_name} \
    --start-from-head \
    | jq '.nextForwardToken'
}

# ロググループ名、ログストリーム名をそれぞれ第一引数、第二引数として渡す。
log_group_name=${1}
log_stream_name=${2}
output_log_file="${log_stream_name}.log"

if [ "${log_group_name}" = '' ] || [ "${log_stream_name}" = '' ]; then
  echo 'arg1: Log group name'
  echo 'arg2: Log stream name'
  exit 1
fi

next_forward_token='not set next token'
prev_token='not set prev token'

# 連続した get-log-events に対するトークンを比較。一致している場合終了する。
while [ "${prev_token}" != "${next_forward_token}" ];
do
  echo "next_forward_token: ${next_forward_token}  prev_token: ${prev_token}"
  prev_token=${next_forward_token}
  if [ "${next_forward_token}" != 'not set next token' ]; then
    get_log_events_with_token ${log_group_name} ${log_stream_name} ${output_log_file} ${next_forward_token}
    next_forward_token=`get_next_token_with_token ${log_group_name} ${log_stream_name} ${next_forward_token} | sed 's/"//g'`
  else
    get_log_events ${log_group_name} ${log_stream_name} ${output_log_file}
    next_forward_token=`get_next_token ${log_group_name} ${log_stream_name} | sed 's/"//g'`
  fi
done
  • 出力結果
next_forward_token: not set next token    prev_token: not set prev token
next_forward_token: f/36932738147926504953624927261640687377691913435615789056/s    prev_token: not set next token
next_forward_token: f/36932786362137624176832161876448689378099746692664131584/s    prev_token: f/36932738147926504953624927261640687377691913435615789056/s
next_forward_token: f/36933034034213799057932785197430955409514490627738238976/s    prev_token: f/36932786362137624176832161876448689378099746692664131584/s
next_forward_token: f/36933034034247250173728437345182295516585964633153568767/s    prev_token: f/36933034034213799057932785197430955409514490627738238976/s

ログイベントを書き込む

以下はログイベントを書き込むサンプルコードです。ログイベントを書き込むには boto3 の put_log_events メソッドを使用します。

import json
import time
import datetime
import boto3

client = boto3.client('logs')

def get_log_stream_token(log_group_name, log_stream_name):
    # ログイベントがログストリームに存在する場合、書き込むを行うにはログストリームのトークンが必要なのでそれを取得する
    response = client.describe_log_streams(
        logGroupName=log_group_name,
        logStreamNamePrefix=log_stream_name,
    )
    log_streams = response['logStreams']
    log_stream = log_streams[0]
    log_stream_token = log_stream['uploadSequenceToken']
    return log_stream_token

def put_custom_log_event(log_group_name, log_stream_name, log_stream_token):
    # ログストリームのトークンを元にログイベントを書き込む
    response = client.put_log_events(
        logGroupName=log_group_name,
        logStreamName=log_stream_name,
        logEvents=[
            {
                'timestamp': int(time.time()) * 1000,
                'message': 'test'
            },
        ],
        sequenceToken=log_stream_token
      )

def lambda_handler(event, context):
    log_group_name = 'my-log-group'
    log_stream_name = 'my-log-stream'
    log_stream_token= get_log_stream_token(log_group_name, log_stream_name)
    put_custom_log_event(log_group_name, log_stream_name, log_stream_token)

boto3 の put_log_events メソッドについて

このメソッドは、指定したログストリームに既存のログイベントがあると次のエラーで失敗します。トークンが不正であるというエラーなので、put_log_events の引数である sequenceToken にトークンを渡す必要があります。

An error occurred (InvalidSequenceTokenException) when calling the PutLogEvents operation: The given sequenceToken is invalid.

ただし指定したログストリームにログイベントが何も含まれていなければ成功します。

前のPutLogEvents呼び出しの応答から取得したシーケンス・トークン。新しく作成されたログ ストリームでのアップロードには、シーケンス トークンは必要ありません。また、DescribeLogStreams を使用して、シーケンス・トークンを取得することもできます。

上記の説明では、「PutLogEvents呼び出しの応答から取得したシーケンス・トークン」とありますが、「DescribeLogStreams を使用して、シーケンス・トークンを取得する」ことが可能です。

ですので、サンプルコードではまず get_log_stream_token メソッドでログストリームのトークンを取得します。このメソッドの内部では、boto3 の describe_log_streams メソッドを使用しています。

このトークンですが、describe_log_streams メソッドの戻り値の中の uploadSequenceToken を使用します。これを put_log_events メソッドの sequenceToken に渡します。

  • describe_log_streams メソッドの戻り値
{
    'logStreams': [
        {
            'logStreamName': 'string',
            'creationTime': 123,
            'firstEventTimestamp': 123,
            'lastEventTimestamp': 123,
            'lastIngestionTime': 123,
            'uploadSequenceToken': 'string',
            'arn': 'string',
            'storedBytes': 123
        },
    ],
    'nextToken': 'string'
}

put_log_events メソッドの sequenceToken に同じ値を用いる場合

例えば、以下のように for 文で put_log_events をループすると失敗します。

def put_custom_log_event(log_group_name, log_stream_name, log_stream_token):
    for i in range(10):
        response = client.put_log_events(
            logGroupName=log_group_name,
            logStreamName=log_stream_name,
            logEvents=[
                {
                    'timestamp': int(time.time()) * 1000,
                    'message': 'test'
                },
            ],
            sequenceToken=log_stream_token
        )

上記の実行結果、以下のエラーが発生しました。

An error occurred (DataAlreadyAcceptedException) when calling the PutLogEvents operation: The given batch of log events has already been accepted

ドキュメントには以下のように記載があります。

sequenceTokenに同じ値を使用して狭い時間内に2回PutLogEventsを呼び出すと、両方の呼び出しが成功する場合もあれば、1つの呼び出しが拒否される場合もあります。

これを回避するために、put_log_events メソッドの戻り値の nextSequenceTokensequenceToken に渡します。

def put_custom_log_event(log_group_name, log_stream_name, log_stream_token):
    next_token = ''
    for i in range(10):
        if next_token == '':
            # next_token にトークンがセットされていない場合、log_stream_token を使用する
            response = client.put_log_events(
                logGroupName=log_group_name,
                logStreamName=log_stream_name,
                logEvents=[
                    {
                        'timestamp': int(time.time()) * 1000,
                        'message': f'test {i}'
                    },
                ],
                sequenceToken=log_stream_token
            )
        else:
            # next_token にトークンがセットされている場合、next_token を使用する
            response = client.put_log_events(
                logGroupName=log_group_name,
                logStreamName=log_stream_name,
                logEvents=[
                    {
                        'timestamp': int(time.time()) * 1000,
                        'message': f'test {i}'
                    },
                ],
                sequenceToken=next_token
            )
        # next_token をセット
        next_token = response['nextSequenceToken']

ログイベントを時間指定して取得する

get_log_eventsstartTime または endTime にエポックミリ秒を渡すと範囲を指定してログイベントを取得することができます。

エポックミリ秒だとわかりにくいので、'%Y-%m-%d %H:%M:%S' の形式で渡せると良いと思います。

def change_datetime_to_milli(string_time):
    dtime = datetime.datetime.strptime(string_time, '%Y-%m-%d %H:%M:%S') - datetime.timedelta(hours=9)
    milli_seconds = int(time.mktime(dtime.timetuple()) * 1000)
    return milli_seconds

Step Functions で Lambda を呼び出してログを書き込む

boto3 の get_log_events メソッドが、10,000件を超えてログを取得できないことを確認するために、大量のログイベントを生成しました。しかし、一度に 10,000 件のログを書き込もうとするとスロットリングエラーや Lambda の実行時間制限によりうまくいかなかったので、Step Functions を使用しました。

  1. Lambda Invoke では、ログイベントを書き込む Lambda 関数を実行する。
  2. Choice Continue では、指定した回数よりも多く Lambda 関数を呼び出したか確認する。
  3. 指定した回数よりも多く Lambda 関数を呼び出した場合、Before End Process に進む(処理終了)。そうでない場合、Lambda Invoke に進む。
  4. 処理終了

image.png

  • ステートマシンの定義

※以下のステートマシンの定義では、1 回の処理で 50 件ログイベントを書き込む Lambda 関数を 200 回呼び出していますが、スロットリングエラーが発生しました。3 回ステートマシンを実行し、83回目の Lambda 関数実行時、83回目の Lambda 関数実行時、192回目の Lambda 関数実行時にスロットリングエラーが発生しました。この 3 回のステートマシン実行後、ログイベントの件数が 10,000 件を超えたので良しとしました。

{
  "Comment": "A description of my state machine",
  "StartAt": "Lambda Invoke",
  "States": {
    "Lambda Invoke": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "OutputPath": "$.Payload",
      "Parameters": {
        "Payload.$": "$",
        "FunctionName": "arn:aws:lambda:ap-northeast-1:ACCOUNT_ID:function:create-log-event:$LATEST"
      },
      "Next": "Choice Continue"
    },
    "Choice Continue": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.Count",
          "NumericGreaterThan": 200,
          "Next": "Before End Process"
        }
      ],
      "Default": "Lambda Invoke"
    },
    "Before End Process": {
      "Type": "Pass",
      "Result": "This flow will end.",
      "ResultPath": "$.Result",
      "End": true
    }
  }
}

サンプルコード

今回作成したサンプルコードは以下に置きました。

参考資料

2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?