cloudpack あら便利カレンダー 2017 の21日目です!
Python使ってCloudwatch LogsからLambdaのログを取得していたのですが、
最近取得しようと思ったら、エラーになってしまい、ちょっと直してみた話です。
半年ぐらい前の話
以前、業務でCloudwatch LogsにあるLambdaのログをサーバー(EC2)に転送して、色々と調査しようと思って、
CloudWatchLogsからログを取得(期間指定してみた)
を参考に作って、開発中のシステムの調査に使っていました。
こちら非常に参考になりました。ありがとうございました!
この場を借りて御礼申し上げます。
久々に使ってみたら問題発生
しばらく使っていなかったのですが、
ユーザーから
ちょっとこの日のこの時間のログ取得してよ
と言われて、さすがにコンソールから一気に取るのは面倒だなぁー。
前に作ったの使うかー。と、久々に使ってみたのですが。。。
いつまでたっても終わらない。
1分指定にしても終わらない。
何でやねん?!と、ログを差し込んでみて、調べてみたら。。。
全期間のストリームから取ろうとしてる。。。
稼働して3ヶ月ぐらいのシステムで、それなりにログがあった状態で、
その全期間のログを総なめしてたもので、そりゃ時間かかるわ。っていう状態でした。
いざ改修
このままでは取得できないぞ・・・ということで、改修を決意し、いろいろ調べて、最終的にSDKのドキュメントを眺めていたら、
ログストリームを取得するメソッドの
describe_log_streams
の引数として、
log Stream NameをPrefix指定できることに気づきました。
[Boto 3 Documentation - describe_log_streams](https://boto3.readthedocs.io/en/latest/reference/services/logs.html#CloudWatchLogs.Client.describe_log_streams"Boto 3 Documentation - describe_log_streams")
Lambdaのログなら、ここに日付指定すれば、絞り込みができる!
ということで、実行時の引数に日付を追加(開発スピード重視)して、
describe_log_streamsの引数に追加する改修を実施し、
実行!・・・あっという間に終了(というほどあっという間ではないですが)しました。
改修後のソース
# ! /usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
import os
import sys
import codecs
import boto3
import time
from datetime import datetime
from datetime import timedelta
sys.stdout = codecs.getwriter('utf-8')(sys.stdout)
log_group = sys.argv[1]
date = datetime.now().strftime("%Y%m%d-%H%M%S")
# 出力先ディレクトリ
log_directory_base ='./cwlogs'
slice = log_group[0:1]
if (slice == '/'):
log_directory = log_directory_base + "%s/%s" % (log_group, date)
else:
log_directory = log_directory_base + "/%s/%s" % (log_group, date)
def validate():
if(len(sys.argv) != 5):
print '[ ERROR ] Few arguments.'
print 'Please specify the argument.'
print 'First: LogGroupName, Second: StartTime, Third: EndTime, Fourth: profileName., Fifth: logPrefixName'
sys.exit()
try:
epoc = datetime(1970, 1, 1)
global starttime
starttime = int((datetime.strptime(sys.argv[2], '%Y-%m-%dT%H:%M:%S') - epoc).total_seconds()) * 1000
global endtime
endtime = int((datetime.strptime(sys.argv[3], '%Y-%m-%dT%H:%M:%S') - epoc).total_seconds()) * 1000
durationtime = int(endtime - starttime)
print 'durationtime "%s"' % durationtime
if 86400000 < durationtime:
print '[ ERROR ] Too long duration.'
print 'Max duration is 1 day.'
sys.exit()
global log_stream_name_prefix
log_stream_name_prefix = sys.argv[4]
except ValueError:
print '[ ERROR ] Failed StartTime or EndTime format.'
print 'Date Format is "%Y-%m-%dT%H:%M:%S".'
sys.exit()
def find_streams(token, log_stream_name_prefix):
def is_valid_stream(stream):
return starttime < stream.get('lastIngestionTime')
try:
if token is None:
data = cwlogs.describe_log_streams(logGroupName = log_group, logStreamNamePrefix = log_stream_name_prefix)
else:
data = cwlogs.describe_log_streams(logGroupName = log_group, logStreamNamePrefix = log_stream_name_prefix, nextToken = token)
except ValueError:
print '[ ERROR ] Failed LogGroupName is invalid.'
print '"%s" is not found.' % log_group
sys.exit()
streams = filter(is_valid_stream, data['logStreams'])
if 'nextToken' not in data:
return streams
time.sleep(0.5)
streams.extend(find_streams(data['nextToken'], log_stream_name_prefix))
return streams
def find_events(token, last_token, stream):
print 'logGroupName = "%s", logStreamName = "%s", startTime = "%s", endTime = "%s"' % (log_group, stream, starttime, endtime)
if token is None:
data = cwlogs.get_log_events(logGroupName = log_group, logStreamName = stream, startTime = starttime, endTime = endtime, startFromHead = True)
else:
data = cwlogs.get_log_events(logGroupName = log_group, logStreamName = stream, startTime = starttime, endTime = endtime, startFromHead = True, nextToken = token)
events = data['events']
if events:
write_logs(events, stream)
del events[:]
if data['nextForwardToken'] != last_token:
time.sleep(0.5)
find_events(data['nextForwardToken'], token, stream)
def write_logs(events, stream):
streams = stream.split(']');
if (len(streams) > 1):
# Lambdaのログ
with codecs.open("%s/%s.log" % (log_directory, streams[1]), "a", "utf-8") as f:
for e in events:
f.write("%s\n" % e['message'])
else:
# それ以外のログ
with codecs.open("%s/%s.log" % (log_directory, streams[0]), "a", "utf-8") as f:
for e in events:
f.write("%s\n" % e['message'])
def main():
validate()
global cwlogs
cwlogs = boto3.client('logs')
streams = [stream['logStreamName'] for stream in find_streams(None, log_stream_name_prefix)]
os.makedirs("%s/" %)
print(log_directory)
for stream in streams:
find_events(None, None, stream)
if __name__ == '__main__':
main()
# Lambdaの場合
python getCWLogs.py ロググループ名 開始日時(UTC) 終了日時(UTC) ログストリーム名先頭の日付
# Lambda以外(API Gatewayとか)の場合
python getCWLogs.py ロググループ名 開始日時(UTC) 終了日時(UTC) ログストリーム名
今の使い所
一定期間のLambdaのログ取得して、grepで検索したり、Excel(になっちゃうんですよね)で集計したいときに使ってます。
ちょっと久々に書いてみましたが、
他の方の役に立つのかな?的なネタでした。