LoginSignup
1
3

More than 5 years have passed since last update.

CloudWatchを最近使うのでログダウンロード用処理かいた

Posted at

目的

AWS IoT案件でログを確認する機会が増えたので備忘録として
今回はローカルにログを保存します。

IoTコンソール使いにくい。。。。

利用サービス・言語

  • AWS-Boto3
  • Python3

tl;dr

  • ローカルフォルダから最新のログを取得
  • Boto3でログを順次取得←ここ大事
  • ログを読み込んでファイル出力

処理

ローカルフォルダから最新のログを取得

再帰などの処理は入れていません、また、ファイル名が年月日始まりではない場合もエラーとなります。
ポイントというほどのものではないですが

  • globとos.path.basenameでファイル一覧取得
  • sortedでソート

は今後も覚えておこうと思います。

def Get_LastEventTime():
    date = 0
    # フォルダからファイル一覧を取得
    # データを文字コード順にソート
    filepaths = glob.glob(PATH + '*')

    # なにもファイルがない場合は0を返す
    if not filepaths:
        return date
    files = list()
    for name in filepaths:
        files.append(os.path.basename(name))
    files = sorted(files, reverse=True)
    # 一番新しいファイルの日付を取得
    date = int(files[0][:14])

    return date

Boto3でログを順次取得

describe_log_streamsでログストリームを取得します。
ポイントはdescribe_log_streamsを(orderBy='LastEventTime',limit=1,nextToken=token)でコールすることで降順に一つずつ読み込みます。
起動直後だけは最新のもの(nextTokenなし)で呼び出し、以降はtokenを引数にします。

# ログストリームの取得
# ローカルに保存されているファイルより新しいものだけ保存する
client = boto3.client('logs')
def get_log_streams():
    response = dict()
    response['nextToken'] = ''

    LatestTime = Get_LastEventTime()
    while(1):
        if response['nextToken'] == '':
            # トークンが無い場合は最新を取得
            response = client.describe_log_streams(
                logGroupName=GROUP_NAME, orderBy='LastEventTime',
                descending=True, limit=1)
        else:
            # トークンがある場合はトークンを指定して取得
            response = client.describe_log_streams(
                logGroupName=GROUP_NAME, orderBy='LastEventTime',
                descending=True, limit=1, nextToken=token)

        infos = response['logStreams'][0]
        name = infos['logStreamName']
        # ログの最終イベント発生時刻
        last_time = infos['lastEventTimestamp']

        # UNIXTIMEになっているため、指定フォーマットの文字列に変換
        eventtime = datetime.datetime.fromtimestamp(int(last_time) / 1000)
        eventtime = eventtime.strftime("%Y%m%d%H%M%S")

        print('ログ日時 : ファイルの更新日時  ->  ', eventtime, LatestTime)
        # 最新のeventtimeを取得
        if LatestTime >= int(eventtime):
            # 新しいファイルがない場合は終了する
            return

        # ソートしやすいようにイベント時間をPrefixにする
        filename = eventtime + '_' + name
        # Win環境でバグるので'/'を削除
        filename = filename.replace('/', '')

        # ファイルの書き込み
        fp = open(PATH + filename, 'w')
        # 次で書きます。
        get_log_file(name, fp)
        fp.close()

        # 次のトークンが無ければ終了
        if not ('nextToken' in response):
            return
        token = response['nextToken']

ログを読み込んでファイル出力

describe_log_streamsで取得してきたログストリーム名からログストリームを取得します。
get_log_eventsでログストリームを取得します、今回はフィルタもつけています。

# 取得するログのタグ
# Loggerクラス使用し、デバッグ用のタグを打っておく
filters = ['[Center]',
           '[Terminal1]',
           '[Terminal2]',
           '[Terminal3]',
           '[Terminal4]'
           ]

# ファイルのデータを読み込む
def get_log_file(stream_name, fp):
    response = client.get_log_events(
        logGroupName=GROUP_NAME,
        logStreamName=stream_name,
        startFromHead=True
    )

    for event in response['events']:
        # 特定ワード(Debug用タグ)が含まれない場合は書き込みしない
        if not filtering(log_msg):
            continue
        # 改行を削除
        log_msg = event['message'].replace('\n', '')

        try:
            # 文字列が長くなるので中間に入っている
            # Token?を除外して書き込み
            log_msg = log_msg[:20] + log_msg[63:]
            fp.write(log_msg + '\n')
        except Exception as e:
            # 例外はすべて適当に投げてます
            fp.write("[Except!!]{0}".format(e) + '\n')

さいごに

数が膨大な量になるので日ごとに1ファイルにまとめたり、いい感じにフィルタかけたりしたいです。

以下今回のコードすべて

All.py
# -*- coding:utf-8 -*-
import boto3
import json
import datetime
import sys
import glob
import os

# LogInfo
GROUP_NAME = '/aws/lambda/hoge'
# Save DirPath
PATH = 'C:\\Users\\fuga'


client = boto3.client('logs')

# 取得するログのタグ
# Loggerクラス使用し、デバッグ用のタグを打っておく
filters = ['[Center]',
           '[Terminal1]',
           '[Terminal2]',
           '[Terminal3]',
           '[Terminal4]'
           ]


# ファイルのデータを読み込む
def get_log_file(stream_name, fp):
    response = client.get_log_events(
        logGroupName=GROUP_NAME,
        logStreamName=stream_name,
        startFromHead=True
    )

    for event in response['events']:
        # 特定ワード(Debug用タグ)が含まれない場合は書き込みしない
        if not filtering(log_msg):
            continue
        # 改行を削除
        log_msg = event['message'].replace('\n', '')

        try:
            # 文字列が長くなるので中間に入っている
            # Token?を除外して書き込み
            log_msg = log_msg[:20] + log_msg[63:]
            fp.write(log_msg + '\n')
        except Exception as e:
            # 例外はすべて適当に投げてます
            fp.write("[Except!!]{0}".format(e) + '\n')


# ログストリームの取得
# ローカルに保存されているファイルより新しいものだけ保存する
def get_log_streams():
    response = dict()
    response['nextToken'] = ''

    LatestTime = Get_LastEventTime()
    while(1):
        if response['nextToken'] == '':
            # トークンが無い場合は最新を取得
            response = client.describe_log_streams(
                logGroupName=GROUP_NAME, orderBy='LastEventTime',
                descending=True, limit=1)
        else:
            # トークンがある場合はトークンを指定して取得
            response = client.describe_log_streams(
                logGroupName=GROUP_NAME, orderBy='LastEventTime',
                descending=True, limit=1, nextToken=token)

        infos = response['logStreams'][0]
        name = infos['logStreamName']
        # ログの最終イベント発生時刻
        last_time = infos['lastEventTimestamp']

        # UNIXTIMEになっているため、指定フォーマットの文字列に変換
        eventtime = datetime.datetime.fromtimestamp(int(last_time) / 1000)
        eventtime = eventtime.strftime("%Y%m%d%H%M%S")

        print('ログ日時 : ファイルの更新日時  ->  ', eventtime, LatestTime)
        # 最新のeventtimeを取得
        if LatestTime >= int(eventtime):
            # 新しいファイルがない場合は終了する
            return

        # ソートしやすいようにイベント時間をPrefixにする
        filename = eventtime + '_' + name
        # Win環境でバグるので'/'を削除
        filename = filename.replace('/', '')

        # ファイルの書き込み
        fp = open(PATH + filename, 'w')
        get_log_file(name, fp)
        fp.close()

        # 次のトークンが無ければ終了
        if not ('nextToken' in response):
            return
        token = response['nextToken']


# 特定ワードが含まれない場合は0を返す
def filtering(msg):
    flg = 0
    for tag in filters:
        if tag in msg:
            flg = 1
    return flg


def Get_LastEventTime():
    date = 0
    # フォルダからファイル一覧を取得
    # データを文字コード順にソート
    filepaths = glob.glob(PATH + '*')

    # なにもファイルがない場合は0を返す
    if not filepaths:
        return date
    files = list()
    for name in filepaths:
        files.append(os.path.basename(name))
    files = sorted(files, reverse=True)
    # 一番新しいファイルの日付を取得
    date = int(files[0][:14])

    return date


if __name__ == '__main__':
    get_log_streams()
1
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
3