はじめに
運用中のシステムでは、コンテナごとに大量のログが出力されます。
これらのログ件数を集計することで、システムの稼働状況や件数の急激な増減により障害を早期発見できます。
この記事では、Elasticsearchからログを取得し、コンテナごとのログの件数を集計する方法について説明します。
また、プログラムのソースコードは以下のGitHubリポジトリで公開しています。
https://github.com/c0a221667a/-
環境
- Python:3.12.3
ライブラリ
- elasticsearch:7.17.12
- pandas:2.3.3
- numpy:2.3.4
- pytz:2025.2
- tabulate:0.9.0
目的
目的は、ログの件数を集計し、システムの稼働状況の把握や障害の早期発見をすることです。
件数の増加率を時系列で見ることで、システムの稼働状況を確認でき、件数の急激な増減を確認することで、障害の早期発見が可能となります。
プログラムの内容
全体の構成
- Elasticsearchへの接続
- ログを収集する期間の指定
- コンテナ名とタイムスタンプの抽出
- 1分単位での集計
- コンテナごとに前時間帯との差分を計算
- 集計結果をCSVファイルに出力
1. Elasticsearchへの接続
接続先のElasticsearchのIPアドレスやポート番号、対象のインデックスを設定します。
ES_HOST = '192.168.100.192'
ES_PORT = 30092
ES_INDEX_PATTERN = 'beats-*'
接続先のIPアドレスをES_HOST = に、ポート番号をES_PORT = に指定し、収集したいログのインデックスをES_INDEX_PATTERN =に指定してください。
2. ログを収集する期間の指定
プログラム実行時に収集する期間の終了時刻を入力します。
終了時刻を入力してください(例: 20250904-1400):
プログラムを実行するとターミナルに終了時刻を入力するように指示されます。入力した時刻から24時間前を開始時刻とし、入力した時刻を終了時刻としてログをElasticsearchから取得します。
終了時刻を入力する際は「YYYYMMDD-HHMM」の形式で入力します。
1月や2月、1日や2日のように1桁の月日は0を含んで入力する必要があり、年月日を合わせた8つの数字を入力します。
時刻は、:を含まずに時と分の合わせた数字を入力します。
終了時刻を入力してください(例: 20250904-1400):20251031-1200
3. コンテナ名とタイムスタンプの抽出
Elasticsearchのログからkubernetes.container.nameフィールドを使用しコンテナ名を、@timestampフィールドを使ってログの発生時刻を取得します。
source.get('kubernetes', {}).get('container', {}).get('name')
4. 1分単位での集計
取得したログのタイムスタンプを日本時間に変換し、1分単位に丸めます。
rounded_time_jst = timestamp_jst.replace(second=0, microsecond=0)
key = f"{rounded_time_jst.strftime('%Y-%m-%d %H:%M')} {container_name}"
counter[key] += 1
これにより、「2025-10-31 12:00 コンテナAのログ件数が〇件」で集計できます。
丸めるというのは、タイムスタンプの時間を一定の時間に集約することを表しています。
00:01:10や00:01:59の場合は00:01に集約され、12:00:27や12:00:48の場合は12:00に集約されます。実際の処理のイメージは下記のようになっています。
取得したデータ
| 時間帯 | ログ件数 |
|---|---|
| 00:01:10 | 1 |
| 00:01:59 | 1 |
| 00:02:05 | 1 |
| 00:02:40 | 1 |
集約後のデータ
| 時間帯 | ログ件数 |
|---|---|
| 00:01:00 | 2 |
| 00:02:00 | 2 |
5. コンテナごとに前の時間帯との差分を計算
同じコンテナの前の時間と件数を比較し、ログ件数の変化を算出します。
ログ件数の変化を確認することで、特定の時間帯にどのコンテナでログ件数が急増したかを把握できます。
change = current_count - previous_count
change_str = f"{change:+d}"
前回より10件増えている場合は、「+10」、減っている場合は「-10」とCSVファイルに出力されます。
出力される形式は以下のようになっています。
6. 集計結果をCSVファイルに出力
集計結果はプログラムの以下の部分で、自動でCSVファイルが作成され、集計結果が保存されます。
既に存在する場合、内容は上書きされます。
output_file = 'log_test.csv'
with open(output_file, mode='w', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
...
実行手順
実際にプログラムを実行して、ログを集計し、結果を出力するまでの流れを説明します。
手順
1. 仮想環境を作成、有効化し、elasticsearchモジュールとpandas、numpy、pytz、tabulateライブラリをインストールします。
# 仮想環境の作成
python3 -m venv venv
# 仮想環境の有効化
source venv/bin/activate
# elasticsearchモジュールのインストール
pip install elasticsearch
# pandas、numpy、pytz、tabulteライブラリのインストール
pip install pandas numpy pytz tabulate
2. スクリプトを実行します。
まず、下記のコードを入力すると、プログラムが実行されます。
python log_syukei.py
次に、「YYYYMMDD-HHMM」の形式で終了時刻を入力し、ログを集計する際の終了時間を決定します。
そして、終了時刻から24時間前を開始時刻としてElasticsearchからログを取得します。
最後に、集計が完了すると、出力先のCSVファイルのファイル名が表示されます。
↓実際の画面

↓log_test.csvの中身の一部

まとめ
Elasticsearchに保存されたコンテナログをPythonで集計する方法について紹介しました。
この方法により、運用中のシステムで発生するログを効率的に分析できます。
また、ログ件数の変動を可視化でき、システムの稼働状況の把握や障害の早期発見が可能となります。
プログラムのソースコード
この記事で紹介したPythonでのログ件数の集計を行うプログラムのソースコードは以下となります。
import csv
from datetime import datetime, timedelta
from collections import defaultdict
import os
from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan
import pytz
import sys
# Elasticsearch接続設定
# 🚨 IPアドレスとポートはあなたの環境に合わせてください
ES_HOST = '192.168.100.192'
ES_PORT = 30092
ES_USER = None
ES_PASSWORD = None
# 検索対象のElasticsearchインデックスパターン
# 🚨 あなたの環境に合わせて 'syslog-*' に変更
ES_INDEX_PATTERN = 'beats-*'
# 出力ファイル名
output_file = 'log_test.csv'
# 日本標準時 (JST) タイムゾーンを定義
JST = pytz.timezone('Asia/Tokyo')
def get_logs_from_elasticsearch(start_time, end_time):
"""
指定された期間のログをElasticsearchから取得します。
"""
# 🚨 Elasticsearchクライアントをバージョン8系に合わせた設定
es = Elasticsearch(
hosts=[{'host': ES_HOST, 'port': ES_PORT, 'scheme': 'http'}],
http_auth=(ES_USER, ES_PASSWORD) if ES_USER and ES_PASSWORD else None,
request_timeout=60,
verify_certs=False,
)
# JSTで指定された時間をUTCに変換してElasticsearchに渡します
start_time_utc = start_time.astimezone(pytz.utc)
end_time_utc = end_time.astimezone(pytz.utc)
query = {
"query": {
"range": {
"@timestamp": {
"gte": start_time_utc.isoformat(timespec='milliseconds').replace('+00:00', 'Z'),
"lte": end_time_utc.isoformat(timespec='milliseconds').replace('+00:00', 'Z')
}
}
},
"sort": [
{"@timestamp": {"order": "asc"}}
]
}
print(f"✅ Elasticsearch ({ES_HOST}:{ES_PORT}, Index: {ES_INDEX_PATTERN}) からログを取得中...")
logs = []
try:
# scanヘルパーを使用して大量のログを効率的に取得
for hit in scan(es,
query=query,
index=ES_INDEX_PATTERN,
scroll='2m',
timeout='60s'
):
source = hit['_source']
# 'kubernetes.container.name' フィールドを取得
container_name = source.get('kubernetes', {}).get('container', {}).get('name')
timestamp = source.get('@timestamp')
# コンテナ名とタイムスタンプが存在する場合のみ処理
if container_name and timestamp:
logs.append({
'container_name': container_name,
'@timestamp': timestamp
})
except Exception as e:
print(f"❌ Elasticsearchからのログ取得中にエラーが発生しました: {e}", file=sys.stderr)
return []
print(f"✅ {len(logs)} 件のログを取得しました。")
return logs
if __name__ == "__main__":
counter = defaultdict(int)
# ユーザーに入力を求める
end_input_str = input("終了時刻を入力してください(例: 20250904-1400): ")
try:
# 'YYYYMMDD-HH:MM' 形式で入力された時刻をパースしてナイーブなdatetimeオブジェクトを作成
end_time_naive = datetime.strptime(end_input_str, '%Y%m%d-%H%M')
# ナイーブなdatetimeオブジェクトにJSTタイムゾーンを明示的に付与
end_time_jst = JST.localize(end_time_naive)
# 終了時刻から24時間前の時間を開始時刻として計算 (元のコードのまま)
start_time_jst = end_time_jst - timedelta(hours=24)
print(f"✅ ログ検索期間: {start_time_jst.strftime('%Y-%m-%d %H:%M:%S')} から {end_time_jst.strftime('%Y-%m-%d %H:%M:%S')}")
except ValueError:
print("❌ 入力された時刻の形式が正しくありません。'YYYYMMDD-HHMM'の形式で入力してください。", file=sys.stderr)
sys.exit(1)
log_entries = get_logs_from_elasticsearch(start_time_jst, end_time_jst)
for row in log_entries:
timestamp_str = row.get('@timestamp')
container_name = row.get('container_name')
if not timestamp_str or not container_name:
continue
try:
timestamp_utc = datetime.fromisoformat(timestamp_str.replace('Z', '+00:00')).astimezone(pytz.utc)
timestamp_jst = timestamp_utc.astimezone(JST)
# 1分単位に切り捨て (元のコードのロジックを維持)
minute = (timestamp_jst.minute // 1) * 1
rounded_time_jst = timestamp_jst.replace(minute=minute, second=0, microsecond=0)
key = f"{rounded_time_jst.strftime('%Y-%m-%d %H:%M')} {container_name}"
counter[key] += 1
except ValueError as e:
print(f"エラー: タイムスタンプのパースに失敗しました - {e} → {timestamp_str}", file=sys.stderr)
continue
last_interval_counts = {}
with open(output_file, mode='w', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
writer.writerow(['時間帯', 'コンテナ名', 'ログ件数', '前時間帯からの変化'])
for key, current_count in sorted(counter.items()):
time_str, container_name = key.rsplit(' ', 1)
change_str = "-"
if container_name in last_interval_counts:
previous_count = last_interval_counts[container_name]
change = current_count - previous_count
change_str = f"{change:+d}"
last_interval_counts[container_name] = current_count
writer.writerow([time_str, container_name, current_count, change_str])
print(f"✅ 集計が完了しました → {output_file}")




