上限の10,000件
CloudWatch のコンソールから10,000件を超えるログはダウンロードできません。なので、コンソール上からできることは、取得できたログから次の10,000件の範囲を絞り込んで、10,000件のログを取得、、、以下繰り返し。といったことくらいです。
これはちょっとめんどくさいのでPythonの boto3
を使ってらくに取得しましょう。
AWS SDK for Python (Boto3) を使用すると、AWS の使用を迅速に開始できます。Boto3 を使用することで、Python のアプリケーション、ライブラリ、スクリプトを AWS の各種サービス(Amazon S3、Amazon EC2、Amazon DynamoDB など)と容易に統合できます。
実際のソース
プログラムは下記のリンクのものを参考につくりました。変更点は主に次の通りです。
- 出力の方法をCSVからデータフレームに変更
- クライアントを生成時にアクセスキー、シークレットキー、リージョンを指定
- プログレスバーを導入
今回はログを分析してすぐに捨てる予定だったので、直接データフレームに変換しました。
import boto3
from tqdm import tqdm
import pandas as pd
import time
from datetime import datetime
def get_log(client, log_group_name, timestamps, query, limit):
interval = 60 # タイムスタンプで60秒
total_intervals = int((timestamps[1] - timestamps[0]) / interval)
period_start = timestamps[0]
dfs = []
with tqdm(total=total_intervals) as pbar:
while period_start < timestamps[1]:
period_end = min(period_start + interval, timestamps[1])
df = get_log_period(client, log_group_name, period_start, period_end, query, limit)
dfs.append(df)
period_start = period_end
pbar.update()
return pd.concat(dfs, ignore_index=True)
def get_log_period(client, log_group_name, timestamp_start, timestamp_end, query, limit):
query_id = client.start_query(
logGroupName=log_group_name,
startTime=timestamp_start,
endTime=timestamp_end,
queryString=query,
limit=limit,
)['queryId']
res = None
while res == None or res['status'] == 'Running':
time.sleep(1)
res = client.get_query_results(
queryId=query_id
)
if len(res['results']) == 0:
return
result_data = [
[item['value'] for item in entry] for entry in res['results']
]
columns_names = [item['field'] for item in res['results'][0]]
df = pd.DataFrame(result_data, columns=columns_names)
df['@timestamp'] = pd.to_datetime(df['@timestamp'])
return df
def main():
client = boto3.client(
'logs',
aws_access_key_id=<your_aws_access_key_id>,
aws_secret_access_key=<your_aws_secret_access_key>,
region_name=<region_name>
)
log_group_name = <your_log_group_name>
timestamps = [
int(time.time())
int(time.time()) + 120 # 実行時から120秒後
]
query = <your_query>
df = get_log(client, log_group_name, timestamps, query)
df.head(5)
if __name__ == '__main__':
main()
ログの取得方法
この後のログの取得に関わっていくつか似た用語が出てくるので先に紹介しておきます。ここでの用語はCloudWatch上の階層関係を表しただけで、正式な用語でない可能性があることをあらかじめお断りしておきます。
ECSを例に出して考えていきます。Aサービスの中にX1, X2, X3という風に同じタスクを3つ作成したとします。この時、ログの階層は下記のようになります。ログイベントが実際にタスクが垂れ流すログを表示しているところです。
ログ
│
└── Aロググループ
├── X1ログストリーム
├── X2ログストリーム
│ │
│ ├── 20240101_X2ログイベント // ログのレコード
│ └── 20240102_X2ログイベント
│
└── X3ログストリーム
では、上記を参考に実際にログを取得している部分を見ていきましょう。ログの取得は client.start_query
で行っています。ここで指定するのはロググループ名とログの範囲、そしてクエリです。
ここで、
「あれ?ロググループしか選択しなくて、どこのログイベントを取ってくるんだい」
となったかたがいると思います。私も最初同じことを考えました。実は、ログストリームの指定は クエリ で行うしかないのです。実際にコンソール上でも、クエリの欄に filter
がかかって、ログストリームを指定しているかと思います。ですので、特定のログイベントを取得したい場合は、そのログイベントが属するログストリームをクエリにて指定しなくてはいけません。
その他注意点
- プログラムは
<your_hoge_hoge>
部分を自分の値に修正してください -
query
は CloudWatch Logs Insights クエリ を指していますが、コンソール上で動作したクエリをそのまま持ってくるとうまく動きません- 私がはまったのはコンソール上で
display
というメソッドが使えたのですが、boto3に読み込ませることはできませんでした
- 私がはまったのはコンソール上で
- データフレームに読み込まれる値はすべてstring型になっている
- プロットしようとしたら数値じゃねーじゃんと怒られるので注意しましょう
おわりに
初めてログのインサイトでクエリを発行したときこんなに便利なものがあるのかと感動したものです。parse
を使えばログの成形も簡単に行えます。いままで sed
とかつかって置換していたので感動しました。(凄く速いしね!!)
ちなみに料金はスキャンしたデータ1 GB あたり 0.005USDとなっています。お手軽?なのかな?