0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

blastengineの宛先一括登録ジョブを"事故らせない"小技3つ(部分成功・エラーCSV活用)

Posted at

blastengineで一斉配信を行う際、宛先アドレスの一括登録ジョブは避けて通れない機能です。しかし「CSVを投げたら一部だけ失敗した」「どの行がエラーだったか追えない」といったトラブルは現場で頻繁に起こります。

本記事では、運用全般ではなく実装者がハマりやすいポイントに絞って、3つの小技を紹介します。

前提:一括登録ジョブで宛先を流し込む

blastengineの一斉配信(BULK)では、以下の流れで宛先を登録します。

  1. CSVファイルを POST /deliveries/{delivery_id}/emails/import で投げる
  2. job_id が返る
  3. GET /deliveries/-/emails/import/{job_id} で結果を確認する
  4. 失敗があれば error_file_url からエラーCSV(zip)を取得する

詳細なフローは公式ドキュメントに譲りますが、この「投げる→待つ→結果を見る→エラーを取る」という基本サイクルを押さえておけばOKです。


小技1:まずは ignore_errors で"部分成功"に倒す

一括登録ジョブでは、1行でもバリデーションエラーがあるとジョブ全体が失敗するのがデフォルトの挙動です。これは安全側に倒した設計ですが、実運用では「正しい9,999件を止めてまで、1件のエラーを優先する」必要がないケースも多いでしょう。

そこで活用したいのが ignore_errors パラメータです。

{
  "ignore_errors": true
}

このオプションを true にすると、バリデーションエラーが存在してもエラー行を除外して登録を続行できます。正常な行は先に登録され、失敗行だけが error_file_url に出力されます。

部分成功を前提にした設計にしておくと、復旧フローが圧倒的にシンプルになります。

どんなときに false にするか

  • 「全件登録できなければ配信自体を中止したい」ケース
  • テスト環境で、エラーを見逃さず全件チェックしたいケース

通常の本番運用では ignore_errors=true をデフォルトにしておき、上記のような特殊ケースだけ false に切り替えるのがおすすめです。

実装メモ

import requests
import json

BASE_URL = "https://app.engn.jp/api/v1"
BEARER_TOKEN = "your_bearer_token"

def start_import_job(delivery_id: int, csv_path: str, ignore_errors: bool = True) -> int:
    """
    宛先一括登録ジョブを開始し、job_id を返す
    """
    url = f"{BASE_URL}/deliveries/{delivery_id}/emails/import"

    # オプション用のJSONファイルを作成
    options = {"ignore_errors": ignore_errors}

    with open(csv_path, "rb") as csv_file:
        files = {
            "file": ("addresses.csv", csv_file, "text/csv"),
            "data": ("options.json", json.dumps(options), "application/json"),
        }
        response = requests.post(
            url,
            headers={"Authorization": f"Bearer {BEARER_TOKEN}"},
            files=files,
        )

    response.raise_for_status()
    job_id = response.json()["job_id"]

    # ログに残す(後述の小技3で活用)
    logger.info(f"Import job started: delivery_id={delivery_id}, job_id={job_id}, ignore_errors={ignore_errors}")

    return job_id

小技2:エラーCSVを"修正用入力"として再利用する

ジョブ完了後、失敗行があると error_file_url にエラーCSVのダウンロードURLが発行されます。多くの現場ではこのCSVを「人が目視で確認して手修正」という運用になりがちですが、エラーCSVをそのまま再投入の素材として扱う発想に切り替えると自動化が進みます。

エラーCSVには元の行データに加えてエラー理由が付与されています。これを機械的に処理して「修正済みCSV」を生成し、再度ジョブに投入するフローを組みましょう。

よくあるエラーパターンと自動修正の例

エラー内容 自動修正の例
メールアドレス形式エラー 前後の空白除去、全角→半角変換
必須項目が空 デフォルト値で補完、または除外リストへ
重複アドレス 2件目以降を除外
文字コードエラー UTF-8に正規化

実装メモ

import requests
import zipfile
import csv
import io
import re

def download_and_parse_error_csv(error_file_url: str) -> list:
    """
    エラーCSV(zip)をダウンロードしてパースする
    """
    response = requests.get(
        error_file_url,
        headers={"Authorization": f"Bearer {BEARER_TOKEN}"},
    )
    response.raise_for_status()

    # zipを展開
    with zipfile.ZipFile(io.BytesIO(response.content)) as zf:
        csv_filename = zf.namelist()[0]
        with zf.open(csv_filename) as f:
            # BOM付きUTF-8で読み込み
            reader = csv.DictReader(io.TextIOWrapper(f, encoding="utf-8-sig"))
            return list(reader)


def normalize_email(email: str) -> str:
    """メールアドレスを正規化"""
    # 前後の空白除去
    email = email.strip()
    # 全角→半角変換(@など)
    email = email.translate(str.maketrans("@.", "@."))
    return email.lower()


def create_retry_csv(error_rows: list, output_path: str) -> int:
    """
    エラー行を修正し、再投入用CSVを生成する
    戻り値は出力した行数
    """
    seen_emails = set()
    valid_rows = []

    for row in error_rows:
        email = normalize_email(row.get("email", ""))

        # メールアドレス形式チェック
        if not re.match(r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$", email):
            logger.warning(f"Invalid email skipped: {email}")
            continue

        # 重複チェック
        if email in seen_emails:
            logger.warning(f"Duplicate email skipped: {email}")
            continue
        seen_emails.add(email)

        # 修正した行を追加
        row["email"] = email
        valid_rows.append(row)

    # CSVに書き出し
    if valid_rows:
        fieldnames = valid_rows[0].keys()
        with open(output_path, "w", encoding="utf-8", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(valid_rows)

    return len(valid_rows)

小技3:job_id と投入元IDのひも付けを先に決める

障害発生時に「このjob_idはどの配信の、どのバッチ処理で投入されたものか?」がすぐに分からないと、原因究明に時間がかかります。ログ設計を事前に固めておきましょう。

最低限ログに残すべき項目

項目 説明
delivery_id blastengine側の配信ID
job_id 一括登録ジョブのID
batch_id / request_id 自社システム側の一意なID(バッチID、リクエストIDなど)
total_count 投入した総件数
success_count 成功件数
failed_count 失敗件数
source 投入元の識別子(「会員DB同期」「キャンペーン登録」など)

実装メモ

import json
import logging
from dataclasses import dataclass
from datetime import datetime
from typing import Optional

logger = logging.getLogger(__name__)

@dataclass
class ImportJobLog:
    """一括登録ジョブのログ構造"""
    timestamp: str
    delivery_id: int
    job_id: int
    batch_id: str           # 自社システム側のID
    source: str             # 投入元の識別子
    total_count: int
    success_count: Optional[int] = None
    failed_count: Optional[int] = None
    status: str = "STARTED"
    error_file_url: Optional[str] = None

    def to_dict(self) -> dict:
        return {k: v for k, v in self.__dict__.items() if v is not None}


def log_job_start(delivery_id: int, job_id: int, batch_id: str, source: str, total_count: int):
    """ジョブ開始時のログ"""
    log = ImportJobLog(
        timestamp=datetime.now().isoformat(),
        delivery_id=delivery_id,
        job_id=job_id,
        batch_id=batch_id,
        source=source,
        total_count=total_count,
        status="STARTED",
    )
    logger.info(f"import_job: {json.dumps(log.to_dict(), ensure_ascii=False)}")


def log_job_complete(delivery_id: int, job_id: int, batch_id: str, source: str,
                     total_count: int, success_count: int, failed_count: int,
                     error_file_url: Optional[str] = None):
    """ジョブ完了時のログ"""
    log = ImportJobLog(
        timestamp=datetime.now().isoformat(),
        delivery_id=delivery_id,
        job_id=job_id,
        batch_id=batch_id,
        source=source,
        total_count=total_count,
        success_count=success_count,
        failed_count=failed_count,
        status="FINISHED" if failed_count == 0 else "PARTIAL_SUCCESS",
        error_file_url=error_file_url,
    )
    logger.info(f"import_job: {json.dumps(log.to_dict(), ensure_ascii=False)}")

ログ出力例(JSON形式)

{
  "timestamp": "2025-01-30T10:00:00+09:00",
  "delivery_id": 12345,
  "job_id": 67890,
  "batch_id": "campaign_2025_spring_001",
  "source": "キャンペーン登録",
  "total_count": 10000,
  "success_count": 9850,
  "failed_count": 150,
  "status": "PARTIAL_SUCCESS",
  "error_file_url": "https://app.engn.jp/api/v1/deliveries/-/emails/import/67890/errorinfo/download"
}

このログがあれば、「batch_id=campaign_2025_spring_001 で150件失敗した」→「エラーCSVを取得して原因調査」→「修正して再投入」という流れがスムーズに回ります。


最小コード例:一括登録の全体フロー

ここまでの小技を統合した、一括登録ジョブの骨格コードです。

import time
import requests
import json
import csv
import io
import zipfile
import re
import logging
from dataclasses import dataclass
from datetime import datetime
from typing import Optional

# ログ設定
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# 設定値
BASE_URL = "https://app.engn.jp/api/v1"
BEARER_TOKEN = "your_bearer_token"
POLL_INTERVAL = 5  # 秒
MAX_POLL_COUNT = 60  # 最大5分待機


# ===========================================
# ジョブ操作
# ===========================================

def start_import_job(delivery_id: int, csv_path: str, ignore_errors: bool = True) -> int:
    """宛先一括登録ジョブを開始し、job_id を返す"""
    url = f"{BASE_URL}/deliveries/{delivery_id}/emails/import"
    options = {"ignore_errors": ignore_errors}

    with open(csv_path, "rb") as csv_file:
        files = {
            "file": ("addresses.csv", csv_file, "text/csv"),
            "data": ("options.json", json.dumps(options), "application/json"),
        }
        response = requests.post(
            url,
            headers={"Authorization": f"Bearer {BEARER_TOKEN}"},
            files=files,
        )

    response.raise_for_status()
    job_id = response.json()["job_id"]
    logger.info(f"Import job started: delivery_id={delivery_id}, job_id={job_id}, ignore_errors={ignore_errors}")
    return job_id


def wait_for_job_completion(job_id: int) -> dict:
    """ジョブの完了をポーリングで待機"""
    url = f"{BASE_URL}/deliveries/-/emails/import/{job_id}"

    for _ in range(MAX_POLL_COUNT):
        response = requests.get(
            url,
            headers={"Authorization": f"Bearer {BEARER_TOKEN}"},
        )
        response.raise_for_status()
        result = response.json()

        logger.info(f"Job {job_id} status: {result['status']} ({result.get('percentage', 0)}%)")

        if result["status"] == "FINISHED":
            return result
        elif result["status"] in ("FAILED", "SYSTEM_ERROR", "TIMEOUT"):
            raise Exception(f"Job failed: {result['status']}")

        time.sleep(POLL_INTERVAL)

    raise TimeoutError(f"Job {job_id} did not complete in time")


def get_job_status(job_id: int) -> dict:
    """ジョブのステータスを取得"""
    url = f"{BASE_URL}/deliveries/-/emails/import/{job_id}"
    response = requests.get(
        url,
        headers={"Authorization": f"Bearer {BEARER_TOKEN}"},
    )
    response.raise_for_status()
    return response.json()


# ===========================================
# エラーCSV処理
# ===========================================

def download_and_parse_error_csv(error_file_url: str) -> list:
    """エラーCSV(zip)をダウンロードしてパースする"""
    response = requests.get(
        error_file_url,
        headers={"Authorization": f"Bearer {BEARER_TOKEN}"},
    )
    response.raise_for_status()

    with zipfile.ZipFile(io.BytesIO(response.content)) as zf:
        csv_filename = zf.namelist()[0]
        with zf.open(csv_filename) as f:
            reader = csv.DictReader(io.TextIOWrapper(f, encoding="utf-8-sig"))
            return list(reader)


def normalize_email(email: str) -> str:
    """メールアドレスを正規化"""
    email = email.strip()
    email = email.translate(str.maketrans("@.", "@."))
    return email.lower()


def create_retry_csv(error_rows: list, output_path: str) -> int:
    """エラー行を修正し、再投入用CSVを生成する"""
    seen_emails = set()
    valid_rows = []

    for row in error_rows:
        email = normalize_email(row.get("email", ""))

        if not re.match(r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$", email):
            logger.warning(f"Invalid email skipped: {email}")
            continue

        if email in seen_emails:
            logger.warning(f"Duplicate email skipped: {email}")
            continue
        seen_emails.add(email)

        row["email"] = email
        valid_rows.append(row)

    if valid_rows:
        fieldnames = valid_rows[0].keys()
        with open(output_path, "w", encoding="utf-8", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(valid_rows)

    return len(valid_rows)


# ===========================================
# ログ
# ===========================================

@dataclass
class ImportJobLog:
    """一括登録ジョブのログ構造"""
    timestamp: str
    delivery_id: int
    job_id: int
    batch_id: str
    source: str
    total_count: int
    success_count: Optional[int] = None
    failed_count: Optional[int] = None
    status: str = "STARTED"
    error_file_url: Optional[str] = None

    def to_dict(self) -> dict:
        return {k: v for k, v in self.__dict__.items() if v is not None}


def log_job_start(delivery_id: int, job_id: int, batch_id: str, source: str, total_count: int):
    """ジョブ開始時のログ"""
    log = ImportJobLog(
        timestamp=datetime.now().isoformat(),
        delivery_id=delivery_id,
        job_id=job_id,
        batch_id=batch_id,
        source=source,
        total_count=total_count,
        status="STARTED",
    )
    logger.info(f"import_job: {json.dumps(log.to_dict(), ensure_ascii=False)}")


def log_job_complete(delivery_id: int, job_id: int, batch_id: str, source: str,
                     total_count: int, success_count: int, failed_count: int,
                     error_file_url: Optional[str] = None):
    """ジョブ完了時のログ"""
    log = ImportJobLog(
        timestamp=datetime.now().isoformat(),
        delivery_id=delivery_id,
        job_id=job_id,
        batch_id=batch_id,
        source=source,
        total_count=total_count,
        success_count=success_count,
        failed_count=failed_count,
        status="FINISHED" if failed_count == 0 else "PARTIAL_SUCCESS",
        error_file_url=error_file_url,
    )
    logger.info(f"import_job: {json.dumps(log.to_dict(), ensure_ascii=False)}")


# ===========================================
# メインフロー
# ===========================================

def run_bulk_import(
    delivery_id: int,
    csv_path: str,
    batch_id: str,
    source: str,
) -> dict:
    """
    宛先一括登録の全体フロー
    1. CSV生成(呼び出し元で実施)
    2. ジョブ起動
    3. 完了待ち
    4. 失敗があればエラーCSVを取得して再投入
    """

    # CSVの行数をカウント
    with open(csv_path, "r", encoding="utf-8") as f:
        total_count = sum(1 for _ in f) - 1  # ヘッダー除く

    # 1. ジョブ起動(ignore_errors=Trueで部分成功を許容)
    job_id = start_import_job(delivery_id, csv_path, ignore_errors=True)
    log_job_start(delivery_id, job_id, batch_id, source, total_count)

    # 2. 完了待ち(ポーリング)
    job_result = wait_for_job_completion(job_id)

    # 3. 結果をログ出力
    log_job_complete(
        delivery_id=delivery_id,
        job_id=job_id,
        batch_id=batch_id,
        source=source,
        total_count=job_result["total_count"],
        success_count=job_result["success_count"],
        failed_count=job_result["failed_count"],
        error_file_url=job_result.get("error_file_url"),
    )

    # 4. 失敗行があればエラーCSVを取得してリトライ
    if job_result["failed_count"] > 0 and job_result.get("error_file_url"):
        logger.info("=== エラーCSVをダウンロード ===")
        error_rows = download_and_parse_error_csv(job_result["error_file_url"])
        logger.info(f"エラー行数: {len(error_rows)}")

        # リトライCSVを生成
        retry_csv_path = f"/tmp/retry_{job_id}.csv"
        retry_count = create_retry_csv(error_rows, retry_csv_path)
        logger.info(f"リトライ可能な行数: {retry_count}")

        if retry_count > 0:
            # 再投入
            new_job_id = start_import_job(delivery_id, retry_csv_path, ignore_errors=True)
            retry_result = wait_for_job_completion(new_job_id)
            logger.info(f"Retry completed: job_id={new_job_id}, success={retry_result['success_count']}, failed={retry_result['failed_count']}")

    return job_result

使用例

# メイン処理
result = run_bulk_import(
    delivery_id=12345,
    csv_path="/path/to/addresses.csv",
    batch_id="campaign_2025_spring_001",
    source="キャンペーン登録",
)

print(f"成功: {result['success_count']}件 / 失敗: {result['failed_count']}")

まとめ

blastengineの宛先一括登録ジョブを安全に運用するための3つの小技を紹介しました。

  1. ignore_errors=true で止めない - 部分成功を許容し、正常行は先に登録
  2. エラーCSVを再投入素材にする - 人力確認ではなく、自動修正→再投入のフローを構築
  3. job_id と投入元IDをログで結ぶ - 障害時の追跡を可能にする最低限のログ設計

これだけで、一括登録まわりの「あの配信、どうなったっけ?」という手戻りが大幅に減らせます。

blastengineのAPIはシンプルで扱いやすい設計になっていますが、エラーハンドリングとログ設計は利用者側で作り込む必要があります。ぜひ本記事のパターンを参考に、堅牢な実装を目指してください。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?