目的
AWS IoT案件でログを確認する機会が増えたので備忘録として
今回はローカルにログを保存します。
IoTコンソール使いにくい。。。。
利用サービス・言語
- AWS-Boto3
- Python3
tl;dr
- ローカルフォルダから最新のログを取得
- Boto3でログを順次取得←ここ大事
- ログを読み込んでファイル出力
処理
ローカルフォルダから最新のログを取得
再帰などの処理は入れていません、また、ファイル名が年月日始まりではない場合もエラーとなります。
ポイントというほどのものではないですが
- globとos.path.basenameでファイル一覧取得
- sortedでソート
は今後も覚えておこうと思います。
def Get_LastEventTime():
date = 0
# フォルダからファイル一覧を取得
# データを文字コード順にソート
filepaths = glob.glob(PATH + '*')
# なにもファイルがない場合は0を返す
if not filepaths:
return date
files = list()
for name in filepaths:
files.append(os.path.basename(name))
files = sorted(files, reverse=True)
# 一番新しいファイルの日付を取得
date = int(files[0][:14])
return date
Boto3でログを順次取得
describe_log_streamsでログストリームを取得します。
ポイントはdescribe_log_streamsを(orderBy='LastEventTime',limit=1,nextToken=token)でコールすることで降順に一つずつ読み込みます。
起動直後だけは最新のもの(nextTokenなし)で呼び出し、以降はtokenを引数にします。
# ログストリームの取得
# ローカルに保存されているファイルより新しいものだけ保存する
client = boto3.client('logs')
def get_log_streams():
response = dict()
response['nextToken'] = ''
LatestTime = Get_LastEventTime()
while(1):
if response['nextToken'] == '':
# トークンが無い場合は最新を取得
response = client.describe_log_streams(
logGroupName=GROUP_NAME, orderBy='LastEventTime',
descending=True, limit=1)
else:
# トークンがある場合はトークンを指定して取得
response = client.describe_log_streams(
logGroupName=GROUP_NAME, orderBy='LastEventTime',
descending=True, limit=1, nextToken=token)
infos = response['logStreams'][0]
name = infos['logStreamName']
# ログの最終イベント発生時刻
last_time = infos['lastEventTimestamp']
# UNIXTIMEになっているため、指定フォーマットの文字列に変換
eventtime = datetime.datetime.fromtimestamp(int(last_time) / 1000)
eventtime = eventtime.strftime("%Y%m%d%H%M%S")
print('ログ日時 : ファイルの更新日時 -> ', eventtime, LatestTime)
# 最新のeventtimeを取得
if LatestTime >= int(eventtime):
# 新しいファイルがない場合は終了する
return
# ソートしやすいようにイベント時間をPrefixにする
filename = eventtime + '_' + name
# Win環境でバグるので'/'を削除
filename = filename.replace('/', '')
# ファイルの書き込み
fp = open(PATH + filename, 'w')
# 次で書きます。
get_log_file(name, fp)
fp.close()
# 次のトークンが無ければ終了
if not ('nextToken' in response):
return
token = response['nextToken']
ログを読み込んでファイル出力
describe_log_streamsで取得してきたログストリーム名からログストリームを取得します。
get_log_eventsでログストリームを取得します、今回はフィルタもつけています。
# 取得するログのタグ
# Loggerクラス使用し、デバッグ用のタグを打っておく
filters = ['[Center]',
'[Terminal1]',
'[Terminal2]',
'[Terminal3]',
'[Terminal4]'
]
# ファイルのデータを読み込む
def get_log_file(stream_name, fp):
response = client.get_log_events(
logGroupName=GROUP_NAME,
logStreamName=stream_name,
startFromHead=True
)
for event in response['events']:
# 特定ワード(Debug用タグ)が含まれない場合は書き込みしない
if not filtering(log_msg):
continue
# 改行を削除
log_msg = event['message'].replace('\n', '')
try:
# 文字列が長くなるので中間に入っている
# Token?を除外して書き込み
log_msg = log_msg[:20] + log_msg[63:]
fp.write(log_msg + '\n')
except Exception as e:
# 例外はすべて適当に投げてます
fp.write("[Except!!]{0}".format(e) + '\n')
さいごに
数が膨大な量になるので日ごとに1ファイルにまとめたり、いい感じにフィルタかけたりしたいです。
以下今回のコードすべて
All.py
# -*- coding:utf-8 -*-
import boto3
import json
import datetime
import sys
import glob
import os
# LogInfo
GROUP_NAME = '/aws/lambda/hoge'
# Save DirPath
PATH = 'C:\\Users\\fuga'
client = boto3.client('logs')
# 取得するログのタグ
# Loggerクラス使用し、デバッグ用のタグを打っておく
filters = ['[Center]',
'[Terminal1]',
'[Terminal2]',
'[Terminal3]',
'[Terminal4]'
]
# ファイルのデータを読み込む
def get_log_file(stream_name, fp):
response = client.get_log_events(
logGroupName=GROUP_NAME,
logStreamName=stream_name,
startFromHead=True
)
for event in response['events']:
# 特定ワード(Debug用タグ)が含まれない場合は書き込みしない
if not filtering(log_msg):
continue
# 改行を削除
log_msg = event['message'].replace('\n', '')
try:
# 文字列が長くなるので中間に入っている
# Token?を除外して書き込み
log_msg = log_msg[:20] + log_msg[63:]
fp.write(log_msg + '\n')
except Exception as e:
# 例外はすべて適当に投げてます
fp.write("[Except!!]{0}".format(e) + '\n')
# ログストリームの取得
# ローカルに保存されているファイルより新しいものだけ保存する
def get_log_streams():
response = dict()
response['nextToken'] = ''
LatestTime = Get_LastEventTime()
while(1):
if response['nextToken'] == '':
# トークンが無い場合は最新を取得
response = client.describe_log_streams(
logGroupName=GROUP_NAME, orderBy='LastEventTime',
descending=True, limit=1)
else:
# トークンがある場合はトークンを指定して取得
response = client.describe_log_streams(
logGroupName=GROUP_NAME, orderBy='LastEventTime',
descending=True, limit=1, nextToken=token)
infos = response['logStreams'][0]
name = infos['logStreamName']
# ログの最終イベント発生時刻
last_time = infos['lastEventTimestamp']
# UNIXTIMEになっているため、指定フォーマットの文字列に変換
eventtime = datetime.datetime.fromtimestamp(int(last_time) / 1000)
eventtime = eventtime.strftime("%Y%m%d%H%M%S")
print('ログ日時 : ファイルの更新日時 -> ', eventtime, LatestTime)
# 最新のeventtimeを取得
if LatestTime >= int(eventtime):
# 新しいファイルがない場合は終了する
return
# ソートしやすいようにイベント時間をPrefixにする
filename = eventtime + '_' + name
# Win環境でバグるので'/'を削除
filename = filename.replace('/', '')
# ファイルの書き込み
fp = open(PATH + filename, 'w')
get_log_file(name, fp)
fp.close()
# 次のトークンが無ければ終了
if not ('nextToken' in response):
return
token = response['nextToken']
# 特定ワードが含まれない場合は0を返す
def filtering(msg):
flg = 0
for tag in filters:
if tag in msg:
flg = 1
return flg
def Get_LastEventTime():
date = 0
# フォルダからファイル一覧を取得
# データを文字コード順にソート
filepaths = glob.glob(PATH + '*')
# なにもファイルがない場合は0を返す
if not filepaths:
return date
files = list()
for name in filepaths:
files.append(os.path.basename(name))
files = sorted(files, reverse=True)
# 一番新しいファイルの日付を取得
date = int(files[0][:14])
return date
if __name__ == '__main__':
get_log_streams()