blastengine APIに異常が発生したとき、メール送信を自動で止め、復旧したら再開する。本記事では、Zabbixの外形監視とWebhookを組み合わせて、この「止血・再開」を自動化する最小構成を実装します。
やりたいこと
以下のような構成で、blastengine APIの状態に応じて送信を自動制御します。
アプリ → 送信ゲートウェイ → blastengine
↑
Zabbix(Webhook)
- アプリ: メール送信リクエストを送信ゲートウェイに投げる
- 送信ゲートウェイ: 自社で用意する中継サーバー。モードに応じて送信/停止を切り替える
- blastengine: 実際のメール配信を行う外部サービス
- Zabbix: blastengine APIを外形監視し、異常検知時にWebhookでゲートウェイを制御
blastengineが429(レート制限)や5xx(サーバーエラー)を返し始めたら、ゲートウェイをSTOPモードに切り替えて送信を止める。復旧したらNORMALモードに戻して送信を再開する。これにより、無駄なリトライやエラーの連鎖を防ぎます。
送信ゲートウェイの最小実装
まず、送信モードを切り替えられるゲートウェイを実装します。
from flask import Flask, request, jsonify
import requests
import os
from datetime import datetime
app = Flask(__name__)
# 送信モード: NORMAL(通常送信)/ STOP(送信停止)
send_mode = "NORMAL"
mode_history = []
# blastengine設定(環境変数から取得)
BLASTENGINE_BASE_URL = os.getenv("BLASTENGINE_BASE_URL", "https://app.engn.jp/api/v1")
BLASTENGINE_BEARER_TOKEN = os.getenv("BLASTENGINE_BEARER_TOKEN", "")
FROM_ADDRESS = os.getenv("FROM_ADDRESS", "")
@app.route("/send", methods=["POST"])
def send_mail():
"""メール送信エンドポイント"""
global send_mode
# STOPモード時は即エラーを返す
if send_mode == "STOP":
return jsonify({
"error": "SERVICE_STOPPED",
"message": "Mail sending is temporarily stopped"
}), 503
data = request.get_json() or {}
to = data.get("to")
subject = data.get("subject")
body = data.get("body")
if not all([to, subject, body]):
return jsonify({"error": "Missing required fields"}), 400
# blastengine送信
try:
response = requests.post(
f"{BLASTENGINE_BASE_URL}/deliveries/transaction",
json={
"from": {"email": FROM_ADDRESS, "name": "MyApp"},
"to": to,
"subject": subject,
"text_part": body
},
headers={
"Authorization": f"Bearer {BLASTENGINE_BEARER_TOKEN}",
"Content-Type": "application/json"
},
timeout=10
)
response.raise_for_status()
return jsonify({"status": "sent", "delivery_id": response.json().get("delivery_id")})
except requests.RequestException as e:
status_code = e.response.status_code if hasattr(e, "response") and e.response else 500
return jsonify({"error": "SEND_FAILED", "message": str(e)}), status_code
@app.route("/control", methods=["POST"])
def control_mode():
"""モード制御エンドポイント(Zabbix Webhookから呼ばれる)"""
global send_mode, mode_history
data = request.get_json() or {}
new_mode = data.get("mode")
if new_mode not in ["NORMAL", "STOP"]:
return jsonify({"error": "Invalid mode. Use NORMAL or STOP"}), 400
previous_mode = send_mode
send_mode = new_mode
# 履歴に記録
mode_history.append({
"timestamp": datetime.now().isoformat(),
"from": previous_mode,
"to": new_mode,
"trigger": data.get("trigger", "")
})
mode_history = mode_history[-10:] # 直近10件のみ保持
print(f"[{datetime.now()}] Mode changed: {previous_mode} -> {new_mode}")
return jsonify({"status": "ok", "mode": send_mode, "previous": previous_mode})
@app.route("/status", methods=["GET"])
def get_status():
"""現在のモードと履歴を確認"""
return jsonify({"mode": send_mode, "history": mode_history[-5:]})
@app.route("/health", methods=["GET"])
def health_check():
"""ヘルスチェック(Zabbixの監視対象)"""
return jsonify({"status": "ok", "mode": send_mode})
if __name__ == "__main__":
app.run(host="0.0.0.0", port=3000)
このゲートウェイは以下のエンドポイントを持ちます。
| エンドポイント | 役割 |
|---|---|
| POST /send | メール送信。STOPモード時は503を返す |
| POST /control | モード切替。Zabbix Webhookから呼ばれる |
| GET /status | 現在のモードと切替履歴を確認 |
| GET /health | ヘルスチェック |
環境変数の設定例(.env)
BLASTENGINE_BASE_URL=https://app.engn.jp/api/v1
BLASTENGINE_BEARER_TOKEN=your_bearer_token_here
FROM_ADDRESS=noreply@example.com
Zabbixでblastengine APIを外形監視する
blastengine APIの状態を監視するため、ZabbixでHTTPエージェントアイテムを設定します。
監視アイテムの設定
| 項目 | 設定値 |
|---|---|
| 名前 | blastengine API status |
| タイプ | HTTPエージェント |
| キー | blastengine.api.status |
| URL | https://app.engn.jp/api/v1/deliveries/-/count |
| 要求メソッド | GET |
| タイムアウト | 15s |
| データ型 | テキスト |
| 監視間隔 | 30s |
| 必要な応答ステータスコード | (空欄:全てのステータスを受け入れる) |
HTTPエージェントでAPIを呼び出し、レスポンスボディをテキストとして取得します。正常時は {"status": "ok", ...} のようなJSONが返り、異常時は {"error": "..."} が返ります。
Triggerで「停止すべき状態」を定義する
レスポンスに "error" が含まれていたら異常と判断するシンプルなトリガーを定義します。
停止トリガー(PROBLEM)
| 項目 | 設定値 |
|---|---|
| 名前 | blastengine API critical |
| 深刻度 | 重度の障害 |
| 条件式 | find(/blastengine-monitor/blastengine.api.status,,"regexp","error")=1 |
| 復旧モード | 復旧条件式 |
| 復旧条件式 | find(/blastengine-monitor/blastengine.api.status,,"regexp","ok")=1 |
| 手動クローズを許可 | いいえ |
この設定により以下のような動きになります。
- レスポンスに "error" が含まれていたらPROBLEM(異常)
- レスポンスに "ok" が含まれていたらOK(正常)
Webhookで送信モードを切り替える
TriggerがPROBLEMになったらSTOP、OKになったらNORMALに切り替えるWebhookを設定します。
メディアタイプの作成
「Alarts」→「Media Types」から新規作成します。
| 項目 | 設定値 |
|---|---|
| 名前 | Gateway Control Webhook |
| タイプ | Webhook |
スクリプト
var params = JSON.parse(value);
var req = new HttpRequest();
req.addHeader('Content-Type: application/json');
// トリガーステータスに応じてモードを決定
// PROBLEM → STOP, OK → NORMAL
var mode = (params.trigger_status === 'PROBLEM') ? 'STOP' : 'NORMAL';
var payload = JSON.stringify({
mode: mode,
trigger: params.trigger_name,
host: params.host,
timestamp: params.timestamp
});
var response = req.post(params.gateway_url + '/control', payload);
Zabbix.log(4, 'Gateway control: mode=' + mode + ', response=' + response);
return response;
パラメータ設定
| パラメータ | 値 |
|---|---|
| gateway_url | http://your-gateway.example.com:3000 |
| trigger_status | {TRIGGER.STATUS} |
| trigger_name | {TRIGGER.NAME} |
| host | {HOST.NAME} |
| timestamp | {EVENT.DATE} {EVENT.TIME} |
メッセージテンプレート
「Message Templates」タブで以下を追加します(Webhookが正常に動作するために必要)
| メッセージタイプ | 件名 | メッセージ |
|---|---|---|
| 障害 | Problem: {TRIGGER.NAME} | {"event": "problem"} |
| 復旧 | Resolved: {TRIGGER.NAME} | {"event": "recovery"} |
アクションの設定
「Alarts」→「Actions」→「Trigger Actions」で設定します。
アクション: Control Gateway on blastengine issue
| 項目 | 設定値 |
|---|---|
| 名前 | Control Gateway on blastengine issue |
| 条件 | トリガー名 含む blastengine API critical |
実行内容(障害発生時)
- 実行内容タイプ: メッセージの送信
- 送信先ユーザーグループ: Zabbix administrators
- 送信方法: Gateway Control Webhook のみ
復旧時の実行内容
- 復旧メッセージを送信: 有効
- 送信先ユーザーグループ: Zabbix administrators
- 送信方法: Gateway Control Webhook のみ
これにより、PROBLEM発生時とOK復旧時の両方でWebhookが発火します。
動作確認
実際に異常状態を発生させて、自動制御が動作することを確認します。
テスト手順
1. 正常状態の確認
# ゲートウェイの状態確認
curl http://localhost:3000/status
# {"mode":"NORMAL","history":[]}
# メール送信テスト
curl -X POST http://localhost:3000/send \
-H "Content-Type: application/json" \
-d '{"to":"test@example.com","subject":"Test","body":"Hello"}'
# {"status":"sent","delivery_id":1234}
2. 異常発生のシミュレーション
Zabbixが監視しているAPIがエラーを返すと、トリガーがPROBLEMになります。
3. 自動停止の確認
Webhookが発火し、ゲートウェイがSTOPモードになります。
# ゲートウェイの状態確認
curl http://localhost:3000/status
# {"mode":"STOP","history":[{"from":"NORMAL","to":"STOP","trigger":"blastengine API critical",...}]}
# 送信テスト(503が返る)
curl -X POST http://localhost:3000/send \
-H "Content-Type: application/json" \
-d '{"to":"test@example.com","subject":"Test","body":"Hello"}'
# {"error":"SERVICE_STOPPED","message":"Mail sending is temporarily stopped"}
4. 復旧の確認
APIが正常に戻ると、トリガーがOKになり、自動でNORMALモードに復旧します。
curl http://localhost:3000/status
# {"mode":"NORMAL","history":[{"from":"STOP","to":"NORMAL","trigger":"blastengine API critical",...}]}
動作確認結果の例
実際にテストした結果は以下の通りです。
04:55:00 NORMAL → NORMAL (復旧Webhook発火)
04:55:30 NORMAL → STOP (異常検知、送信停止)
04:55:59 STOP → NORMAL (復旧、送信再開)
最小構成からの拡張(任意)
本記事の実装は最小構成です。本番運用に向けて、以下の拡張を検討してください。
- キュー化: STOPモード中のリクエストを破棄せずキューに溜め、復旧後に順次送信
- バックオフ: 復旧直後は送信レートを下げ、段階的に通常レートに戻す
- 優先度別制御: 重要な通知は別チャネル(SMS等)で送信し、STOPの影響を受けないようにする
- 通知: モード切替時にSlackやメールで管理者に通知
- ログ・メトリクス: モード切替履歴や停止時間をPrometheusなどに記録
まとめ
本記事では、blastengine APIの異常を検知して送信を自動制御する仕組みを構築しました。
| コンポーネント | 役割 |
|---|---|
| Zabbix | blastengine APIを外形監視し、異常を検知 |
| Webhook | Triggerの状態変化をゲートウェイに伝達 |
| 送信ゲートウェイ | 送信モード(NORMAL/STOP)に応じて送信可否を判断 |
この分業により、監視・判断・制御がそれぞれ独立し、柔軟な運用が可能になります。blastengineに限らず、外部APIに依存するシステム全般で応用できる構成です。