概要
-
目的: 特定ラベルを持つ2つのPodを毎分チェックし、稼働状況をMySQLに記録
-
使用技術: Python, kubernetes-client, SQLAlchemy(MySQL), schedule
-
機能
- Kubernetes APIでPod一覧取得
- Pod数が2でない場合はSlack通知(任意)
- Podごとに
Active/InactiveをDBでINSERT or UPDATE
前提
- Python 3.9以上がインストール済み
- Kubernetesクラスタへアクセス可能(in‑cluster or
~/.kube/config) - MySQLサーバが稼働中
- 必要ライブラリをインストール
pip install kubernetes sqlalchemy pymysql schedule slack-sdk
テーブル定義(SQL例)
CREATE TABLE pod_status (
id INT AUTO_INCREMENT PRIMARY KEY,
pod_name VARCHAR(128) NOT NULL UNIQUE,
status VARCHAR(16) NOT NULL,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
コード解説
以下を pod_monitor.py として保存します。
import os
import logging
from datetime import datetime
import time
import schedule
import requests
from kubernetes import client, config
from sqlalchemy import (
create_engine, Column, String, Integer, DateTime
)
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
# --- 設定 & ログ ---
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Kubernetes
LABEL_SELECTOR = os.getenv("LABEL_SELECTOR", "app=my-app")
try:
config.load_incluster_config()
except:
config.load_kube_config()
v1 = client.CoreV1Api()
# DB
DB_URL = os.getenv("DB_URL", "mysql+pymysql://user:pass@host:3306/dbname")
engine = create_engine(DB_URL, echo=False)
Session = sessionmaker(bind=engine)
Base = declarative_base()
class PodStatus(Base):
__tablename__ = 'pod_status'
id = Column(Integer, primary_key=True)
pod_name = Column(String(128), unique=True, nullable=False)
status = Column(String(16), nullable=False)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
Base.metadata.create_all(engine)
# Slack通知 (WEBHOOK)
SLACK_WEBHOOK = os.getenv("SLACK_WEBHOOK")
def notify_slack(message: str):
if not SLACK_WEBHOOK:
return
payload = {"text": message}
try:
requests.post(SLACK_WEBHOOK, json=payload)
except Exception:
logger.exception("Slack通知失敗")
# Pod監視処理
def check_pods():
session = Session()
try:
pods = v1.list_pod_for_all_namespaces(
label_selector=LABEL_SELECTOR
).items
names = sorted([p.metadata.name for p in pods])
logger.info(f"Found pods: {names}")
# Pod数チェック
if len(pods) != 2:
msg = f"[WARN] Pod数が{len(pods)}件 (期待値:2)"
logger.warning(msg)
notify_slack(msg)
# ソート順で先頭をActive, 2番目をInactive
for idx, pod_name in enumerate(names):
target_status = 'Active' if idx == 0 else 'Inactive'
rec = (
session.query(PodStatus)
.filter_by(pod_name=pod_name)
.first()
)
if rec:
if rec.status != target_status:
logger.info(f"UPDATE {pod_name}: {rec.status} → {target_status}")
rec.status = target_status
else:
rec = PodStatus(
pod_name=pod_name,
status=target_status
)
session.add(rec)
logger.info(f"INSERT {pod_name} as {target_status}")
session.commit()
except Exception:
logger.exception("Podチェック中にエラー発生")
finally:
session.close()
# スケジュール実行
if __name__ == '__main__':
schedule.every(1).minutes.do(check_pods)
logger.info("=== Pod Monitor Started ===")
while True:
schedule.run_pending()
time.sleep(1)
Docker化例
FROM python:3.9-slim
WORKDIR /app
COPY pod_monitor.py ./
RUN pip install --no-cache-dir \
kubernetes sqlalchemy pymysql schedule slack-sdk
CMD ["python", "pod_monitor.py"]
Kubernetesデプロイ例
apiVersion: apps/v1
kind: Deployment
metadata:
name: pod-monitor
spec:
replicas: 1
template:
spec:
containers:
- name: monitor
image: your-registry/pod-monitor:latest
env:
- name: LABEL_SELECTOR
value: "app=my-app"
- name: DB_URL
valueFrom:
secretKeyRef:
name: db-secret
key: url
- name: SLACK_WEBHOOK
valueFrom:
secretKeyRef:
name: slack-secret
key: webhook
動作確認
- Pod Monitorを起動すると、ログに
Found pods: [...]が出力される - MySQLの
pod_statusテーブルでstatusがActive/Inactiveに更新される - Pod数が2以外ならSlack通知が飛ぶ
まとめ
- ホスト(Pod)ごとに一意レコードを持ち、状態が変化したときだけUPDATE
- Pod数異常時にはSlack等でアラート
- 軽量ライブラリのみで手軽に構築できる