はじめに
運用中のシステムでは、コンテナごとに大量のログが出力されます。
これらのログ件数を集計することで、システムの稼働状況や件数の急激な増減により障害を早期発見できます。
この記事では、Elasticsearchからログを取得し、コンテナごとのログの件数を集計する方法について説明します。
また、プログラムのソースコードは以下のGitHubリポジトリで公開しています。
https://github.com/c0a221667a/-
環境
- Python:3.12.3
 
ライブラリ
- elasticsearch:7.17.12
 - pandas:2.3.3
 - numpy:2.3.4
 - pytz:2025.2
 - tabulate:0.9.0
 
目的
目的は、ログの件数を集計し、システムの稼働状況の把握や障害の早期発見をすることです。
件数の増加率を時系列で見ることで、システムの稼働状況を確認でき、件数の急激な増減を確認することで、障害の早期発見が可能となります。
プログラムの内容
全体の構成
- Elasticsearchへの接続
 - ログを収集する期間の指定
 - コンテナ名とタイムスタンプの抽出
 - 1分単位での集計
 - コンテナごとに前時間帯との差分を計算
 - 集計結果をCSVファイルに出力
 
1. Elasticsearchへの接続
接続先のElasticsearchのIPアドレスやポート番号、対象のインデックスを設定します。
ES_HOST = '192.168.100.192'
ES_PORT = 30092
ES_INDEX_PATTERN = 'beats-*'
接続先のIPアドレスをES_HOST = に、ポート番号をES_PORT = に指定し、収集したいログのインデックスをES_INDEX_PATTERN =に指定してください。
2. ログを収集する期間の指定
プログラム実行時に収集する期間の終了時刻を入力します。
終了時刻を入力してください(例: 20250904-1400):
プログラムを実行するとターミナルに終了時刻を入力するように指示されます。入力した時刻から24時間前を開始時刻とし、入力した時刻を終了時刻としてログをElasticsearchから取得します。
終了時刻を入力する際は「YYYYMMDD-HHMM」の形式で入力します。
1月や2月、1日や2日のように1桁の月日は0を含んで入力する必要があり、年月日を合わせた8つの数字を入力します。
時刻は、:を含まずに時と分の合わせた数字を入力します。
終了時刻を入力してください(例: 20250904-1400):20251031-1200
3. コンテナ名とタイムスタンプの抽出
Elasticsearchのログからkubernetes.container.nameフィールドを使用しコンテナ名を、@timestampフィールドを使ってログの発生時刻を取得します。
source.get('kubernetes', {}).get('container', {}).get('name')
4. 1分単位での集計
取得したログのタイムスタンプを日本時間に変換し、1分単位に丸めます。
rounded_time_jst = timestamp_jst.replace(second=0, microsecond=0)
key = f"{rounded_time_jst.strftime('%Y-%m-%d %H:%M')} {container_name}"
counter[key] += 1
これにより、「2025-10-31 12:00 コンテナAのログ件数が〇件」で集計できます。
丸めるというのは、タイムスタンプの時間を一定の時間に集約することを表しています。
00:01:10や00:01:59の場合は00:01に集約され、12:00:27や12:00:48の場合は12:00に集約されます。実際の処理のイメージは下記のようになっています。
取得したデータ
| 時間帯 | ログ件数 | 
|---|---|
| 00:01:10 | 1 | 
| 00:01:59 | 1 | 
| 00:02:05 | 1 | 
| 00:02:40 | 1 | 
集約後のデータ
| 時間帯 | ログ件数 | 
|---|---|
| 00:01:00 | 2 | 
| 00:02:00 | 2 | 
5. コンテナごとに前の時間帯との差分を計算
同じコンテナの前の時間と件数を比較し、ログ件数の変化を算出します。
ログ件数の変化を確認することで、特定の時間帯にどのコンテナでログ件数が急増したかを把握できます。
change = current_count - previous_count
change_str = f"{change:+d}"
前回より10件増えている場合は、「+10」、減っている場合は「-10」とCSVファイルに出力されます。
出力される形式は以下のようになっています。
6. 集計結果をCSVファイルに出力
集計結果はプログラムの以下の部分で、自動でCSVファイルが作成され、集計結果が保存されます。
既に存在する場合、内容は上書きされます。
output_file = 'log_test.csv'
with open(output_file, mode='w', newline='', encoding='utf-8') as f:
    writer = csv.writer(f)
    ...
実行手順
実際にプログラムを実行して、ログを集計し、結果を出力するまでの流れを説明します。
手順
1. 仮想環境を作成、有効化し、elasticsearchモジュールとpandas、numpy、pytz、tabulateライブラリをインストールします。
# 仮想環境の作成
python3 -m venv venv
# 仮想環境の有効化
source venv/bin/activate
# elasticsearchモジュールのインストール
pip install elasticsearch
# pandas、numpy、pytz、tabulteライブラリのインストール
pip install pandas numpy pytz tabulate
2. スクリプトを実行します。
まず、下記のコードを入力すると、プログラムが実行されます。
python log_syukei.py
次に、「YYYYMMDD-HHMM」の形式で終了時刻を入力し、ログを集計する際の終了時間を決定します。
そして、終了時刻から24時間前を開始時刻としてElasticsearchからログを取得します。
最後に、集計が完了すると、出力先のCSVファイルのファイル名が表示されます。
↓実際の画面

↓log_test.csvの中身の一部

まとめ
Elasticsearchに保存されたコンテナログをPythonで集計する方法について紹介しました。
この方法により、運用中のシステムで発生するログを効率的に分析できます。
また、ログ件数の変動を可視化でき、システムの稼働状況の把握や障害の早期発見が可能となります。




