こちら↓の記事とほぼ同等のことを、blastengine API 連携でも試してみました。
AWS に関する設定手順の大部分は前掲の記事と同じですので、必要に応じてそちらを参照してください。
サンプルの内容
「メール送信時に送信履歴用の DynamoDB テーブルに記録した情報」と「blastengine の Webhook で取得した情報」を突合し、Bounce Event に関する情報を取得するサンプルです。
今回のサンプルでは、メール送信部分を(SendGrid のサンプル同様)、
- メール送信用の DynamoDB テーブルに送信するメールの情報をレコード登録
- DynamoDB Streams 経由でメール送信用の Lambda 関数を呼び出し
- 同 Lambda 関数でメールを送信し送信履歴テーブルにレコード登録
という流れで処理していますが、この部分はメール送信を行うアプリケーションで実装したほうが良いかもしれません。
ただし、blastengine API 連携には 1 分間に 500 リクエスト以内 のレート制限があるため、リクエストの流量調整の実装をアプリケーションから独立させる目的でこのような設計にする手もあるかもしれません。
2023/3/24 追記:
今回のサンプルでは SendGrid のサンプルとは違い、メール送信処理後にメール送信用テーブルからレコードを削除する仕様に変更しました。
- blastengine の Webhook によって Bounce Event 取得用 API Gateway を呼び出し
- 同 API Gateway 経由で Bounce Event 取得用 Lambda 関数を実行
- 同 Lambda 関数 で受信したドロップおよびハードエラーのイベント情報と送信履歴用 DynamoDB テーブルのレコードを突合
- 突合した結果を Bounce Event 用 DynamoDB テーブルにレコード登録
という流れで Bounce Event を記録します。
こちらの GitHub リポジトリに実際のコードを置いてあります。
blastengine 側の設定 (1)
1. API キーの設定
「設定」 画面の左上、「API キー」 で確認しておきます。
API キーのほかにログイン ID も必要なのでログイン時に確認しておきます。
AWS 側の設定
1. Dynamo DB テーブルおよび KMS キーの作成
DynamoDB テーブルをいくつか作成していきます。
1-1. KMS キーの作成
最初に DynamoDB テーブル暗号化用 KMS キーを作成します。
Amazon 所有キーを使う場合はキーの作成は不要です。
「KMS」 - 「カスタマー管理型のキー」 画面で 「キーの作成」 をクリックします。
- エイリアス(キーの名前) : 任意(
testMailKey
など) - キーの管理者 : 適切な管理者を選択
キーの使用者(ユーザー)は各 Lambda 関数作成後に追加します。
1-2. メール送信用 DynamoDB テーブルの作成
「DynamoDB」(「ダッシュボード」 または 「テーブル」)画面で 「テーブルの作成」 をクリックします。
- テーブル名 : 任意(
blastMailSender
など) - パーティションキー :
id
(文字列) - 設定をカスタマイズ
- キャパシティーモード : オンデマンド
- 暗号化キーの管理 : 先ほど作成した KMS キーを選択
- Amazon 所有キーを使う場合は「Amazon DynamoDB が所有」を選択
1-3. メール送信履歴用 DynamoDB テーブルの作成
引き続きメール送信履歴用テーブルを作成します。
- テーブル名 : 任意(
blastMailSentLog
など) - パーティションキー :
deliveryId
(数値)
ほかの項目はメール送信用テーブルと同じです。
1-4. Bounce Event 用 DynamoDB テーブルの作成
最後に Bounce Event 用テーブルを作成します。
- テーブル名 : 任意(
blastMailBounce
など) - パーティションキー :
deliveryId
(数値)
ほかの項目はメール送信用テーブルと同じです。
実運用で使う場合、DynamoDB の各テーブルには TTL を設定すると良いでしょう。
2. Lambda 関数 / API Gateway / IAM Role および KMS キーの作成・設定
2-1. blastengine Python SDK を Lambda レイヤーとして登録
Amazon Linux 2 の EC2(Cloud9)を用意し、 blastengine Python SDK を.zip
化します。
mkdir python
pip3 install blastengine -t python
zip blastengine-sdk.zip -r python
「Lambda」 - 「レイヤー」 画面で 「レイヤーの作成」 をクリックします。
- 名前 : 任意(
blastengine
など) - 互換性のあるアーキテクチャ : x86_64・arm64 ともチェック
- ランタイム : Python 3.9
- 先ほど作った
.zip
ファイルをアップロード
2-2. メール送信用 Lambda 関数の作成
「Lambda」 - 「関数」 画面で 「関数の作成」 をクリックします。
- 関数名 : 任意(
blastMailSender
など) - ランタイム : Python 3.9
- アーキテクチャ : どちらでも可(ここでは arm64 を選択)
- 実行ロール : 基本的な Lambda アクセス権限で新しいロールを作成
「コード」 - 「レイヤー」 - 「レイヤーの追加」 :
- レイヤーソース : 「カスタムレイヤー」 を選択
- カスタムレイヤー : 2-1. で作成したものを選択
「コード」 - 「コードソース」 : lambda_function.py
import boto3
import datetime
import json
import os
import random
import time
import traceback
from base64 import b64decode
from blastengine.Client import Blastengine
from blastengine.Transaction import Transaction
from boto3.dynamodb.conditions import Key
from boto3.dynamodb.types import TypeDeserializer
deserializer = TypeDeserializer()
be_api_user = boto3.client('kms').decrypt(
CiphertextBlob=b64decode(os.environ['BLASTENGINE_API_USER']),
EncryptionContext={'LambdaFunctionName': os.environ['AWS_LAMBDA_FUNCTION_NAME']}
)['Plaintext'].decode('utf-8')
be_api_key = boto3.client('kms').decrypt(
CiphertextBlob=b64decode(os.environ['BLASTENGINE_API_KEY']),
EncryptionContext={'LambdaFunctionName': os.environ['AWS_LAMBDA_FUNCTION_NAME']}
)['Plaintext'].decode('utf-8')
table_sender = os.environ['TABLE_SENDER']
table_sent_log = os.environ['TABLE_SENT_LOG']
MAX_REQUEST_COUNT = 3
TIME_SLEEP = 60
TIME_JITTER = 5
def lambda_handler(event, context):
# blastengine初期化
Blastengine(be_api_user, be_api_key)
# テーブルの準備
dynamodb = boto3.resource('dynamodb')
tableLog = dynamodb.Table(table_sent_log)
tableSender = dynamodb.Table(table_sender)
# ストリームから受け取った変更後レコードリストをループ処理
for record in event['Records']:
eventName = record['eventName']
# DynamoDB StreamsのフィルタでもeventNameをフィルタしているが設定ミスでの課金を防ぐためeventNameを確認
if (eventName == 'INSERT' or eventName == 'MODIFY'):
image = record['dynamodb']['NewImage']
request_count = 0
mail_sent = 0
delivery_id = 0
while request_count < MAX_REQUEST_COUNT:
try:
# eventをdictに変換して値を抽出
item = deserialize(image)
message = Message(item)
# blastengineで送信
if (mail_sent == 0):
delivery_id = send(message)
mail_sent = 1
# 履歴用テーブルに転記
tb_response = store(tableLog, message, delivery_id)
print('Result table:', tb_response)
# 送信用テーブルからレコード削除
tableSender.delete_item(Key={'id': message.id})
# 送信 OK
request_count = MAX_REQUEST_COUNT
except Exception as e:
# 例外→ログを残す
print(traceback.format_exc())
request_count += 1
if (request_count >= MAX_REQUEST_COUNT):
# 連続失敗上限到達→スキップ
print(image)
print('Retry count exceeded. -> skip')
else:
# 60~64秒スリープしてリトライ
time.sleep(TIME_SLEEP + random.randrange(TIME_JITTER))
print('Retry:', request_count)
return 'Successfully processed {} records.'.format(len(event['Records']))
class Message():
# 送信メールメッセージクラス
def __init__(self, item):
# コンストラクタ
self.id = item['id']
self.from_address = item['fromAddress']
self.from_name = item['fromName']
self.subject = item['subject']
self.text_part = item['textPart']
self.to_address = item['toAddress']
self.to_name = item['toName']
def deserialize(image):
# dictに変換
d = {}
for key in image:
d[key] = deserializer.deserialize(image[key])
return d
def send(message):
# blastengineで通知
transaction = Transaction()
transaction.subject(message.subject)
transaction.text_part(message.text_part)
transaction.fromAddress(message.from_address, message.from_name)
transaction.to(message.to_address)
return transaction.send()
def store(table, message, delivery_id):
# 送信ログ用テーブルに転記
response = table.put_item(
Item={
'deliveryId' : delivery_id,
'fromAddress': message.from_address,
'fromName' : message.from_name,
'subject' : message.subject,
'toAddress' : message.to_address,
'toName' : message.to_name
}
)
if (response['ResponseMetadata']['HTTPStatusCode'] != 200):
# ステータスコードが200以外→エラー
print('DynamoDB put_item error:', response)
return response
「設定」 - 「一般設定」 - 「編集」 :
- タイムアウト : 10 分 0 秒
- 既存のロール : 「View the 【ロール名】」 のリンクをクリック
- ポリシーの 「編集」 をクリック
- アクション : ListStreams/リソース : *
- アクション : DeleteItem/リソース : メール送信用 DynamoDB テーブルの ARN(2023/3/24 追記)
- アクション : GetItem・Query・PutItem/リソース : メール送信履歴用 DynamoDB テーブルの ARN
- アクション : DescribeStream・GetRecords・GetShardIterator/リソース : メール送信用 DynamoDB テーブルの stream/*(ARN)を指定
- ポリシーの 「編集」 をクリック
- Lambda 関数の 「基本設定を編集」 画面に戻って 「保存」
環境変数は暗号化用の KMS キーを作成してから設定します。
「設定」 - 「トリガー」 - 「トリガーを追加」 :
- トリガーの設定 : 「DynamoDB」 を選択
- DynamoDB テーブル : メール送信用テーブルを選択
- バッチサイズ : 100
- 追加設定
- 再試行 : 0
- フィルタリング条件 :
{ "eventName": ["INSERT", "MODIFY"]}
(2023/3/24 追記)
エラー(例外)終了したときに無限にリトライを続けて、想定外の課金が積みあがるのを避けるために再試行を「0」にします。
2023/3/24 追記:
また、レコード削除時のREMOVE
イベント発生時に Lambda のトリガーを無駄に発火させないように、フィルタリングを行います。
2-3. Lambda 関数(メール送信)用 KMS キーの作成
1-1. と同様にキーを作成します。
- エイリアス : 任意の名前
- キーの使用者(ユーザー) : メール送信用 Lambda 関数の実行ロール
環境変数の暗号化にデフォルトのキーを使う場合は作成不要です。
2-4. メール送信用 Lambda 関数の環境変数を設定
メール送信用 Lambda 関数の画面で 「設定」 - 「環境変数」 - 「編集」 をクリックします。
- キー :
-
BLASTENGINE_API_USER
: blastengine のログイン ID -
BLASTENGINE_API_KEY
: blastengine 側の設定 (1) の 1. で発行した API キー -
TABLE_SENDER
: メール用 DynamoDB テーブルの名前(2023/3/24 追記) -
TABLE_SENT_LOG
: メール送信履歴用 DynamoDB テーブルの名前
-
- 転送時の暗号化 : 「転送時の暗号化に使用するヘルパーの有効化」 にチェック
- 保管時に暗号化する AWS KMS キー : 「カスタマーマスターキーの使用」 を選択
- カスタマーマスターキー : 2-3. で作成したキーを選択
- キー「
BLASTENGINE_API_USER
」の右側にある 「暗号化」 で値の暗号化- 先ほどと同じ AWS KMS キーを選択し、「暗号化」 をクリック
- キー「
BLASTENGINE_API_KEY
」についても同様に暗号化- 前の画面に戻ったら 「保存」 をクリックして完了
2-5. Bounce Event 取得用 Lambda 関数の作成
関数を作成します。
- 関数名 : 任意(
blastBounceReceiver
など) - ランタイム : Python 3.9
- アーキテクチャ : どちらでも可(ここでは arm64 を選択)
- 実行ロール : 基本的な Lambda アクセス権限で新しいロールを作成
- コード :
lambda_function.py
import json
import boto3
import os
import traceback
from botocore.exceptions import ClientError
table_sent_log = os.environ['TABLE_SENT_LOG']
table_bounce = os.environ['TABLE_BOUNCE']
def lambda_handler(event, context):
# 履歴用テーブル・バウンス用テーブルの準備
dynamodb = boto3.resource('dynamodb')
tableSentLog = dynamodb.Table(table_sent_log)
tableBounce = dynamodb.Table(table_bounce)
# リクエストから必要情報を取得
body = json.loads(event['body'])
events = body['events']
# バウンス用テーブルに記録
for bodyItem in events:
try:
bounceItem = BounceItem(tableSentLog, bodyItem['event'])
tb_response = store(tableBounce, bounceItem)
print('Result table:', tb_response)
except Exception as e:
# 例外→ログを残す
print(traceback.format_exc())
print(bounceItem)
# 異常終了
return {
'statusCode': 500,
'body' : json.dumps({'message':'NG'})
}
# 正常終了
return {
'statusCode': 200,
'body' : json.dumps({'message':'OK'})
}
class BounceItem():
# バウンス項目クラス
def __init__(self, table, item):
# コンストラクタ
# Webhookの情報を取得
self.type = item['type']
self.datetime = item['datetime']
detail = item['detail']
self.to_address = detail['mailaddress']
self.subject = detail['subject']
self.error_code = str(detail['error_code'])
self.error_message = detail['error_message']
self.delivery_id = detail['delivery_id']
# 送信履歴の情報を取得
response = table.get_item(Key={'deliveryId': self.delivery_id})
sentItem = response['Item']
self.from_address = sentItem['fromAddress']
self.from_name = sentItem['fromName']
self.to_name = sentItem['toName']
def store(table, item):
# バウンス用テーブルに転記
response = table.put_item(
Item={
'deliveryId' : item.delivery_id,
'fromAddress' : item.from_address,
'fromName' : item.from_name,
'toAddress' : item.to_address,
'toName' : item.to_name,
'subject' : item.subject,
'datetime' : item.datetime,
'type' : item.type,
'errorCode' : item.error_code,
'errorMessage': item.error_message
}
)
if (response['ResponseMetadata']['HTTPStatusCode'] != 200):
# ステータスコードが200以外→エラー
print('DynamoDB put_item error:', response)
return response
- 一般設定 - タイムアウト : 30 秒
- アクセス権限 : 選択されているポリシーに DynamoDB に関する権限を追加
- アクション : GetItem/リソース : メール送信履歴用 DynamoDB テーブルの ARN
- アクション : PutItem/リソース : Bounce Event 用 DymanoDB テーブルの ARN
- 環境変数 :
-
TABLE_BOUNCE
: Bounce Event 用 DynamoDB テーブルの名前 -
TABLE_SENT_LOG
: メール送信履歴用 DynamoDB テーブルの名前
-
トリガーは API Gateway 作成時に設定(追加)します(Lambda 統合の設定を行うことで追加される)。
2-6. Bounce Event 取得用 API Gateway の作成
「API Gateway」 - 「API」 画面で 「API を作成」 をクリックします。
-
「REST API」 の 「構築」 をクリック
- 任意の API 名(例 : 「blastEventHookReceiver」)を入力して 「API の作成」 をクリック
-
「リソース」 - 「アクション」 のメニューで 「リソースの作成」 を選択
- 任意のリソース名(悪用されにくいようランダムで長いもの)を入力して 「リソースの作成」 をクリック
- 同じリソースの 「アクション」 のメニューで 「メソッドの作成」 を選択
-
POST メソッドを選択
- 統合タイプ : Lambda 関数
- Lambda プロキシ統合の使用 : チェック
- Lambda 関数 : Bounce Event 取得用 Lambda 関数の名前
- 「保存」 → 「OK」 をクリック
-
「メソッドレスポンス」 のリンクをクリック
-
「レスポンスの追加」 をクリック
- ステータス 500 のレスポンスを追加(チェックアイコン をクリック)
-
「500 のレスポンス本文」 を登録
- コンテンツタイプ : application/json
- モデル : Empty
- チェックアイコン をクリックして登録(保存)
-
「レスポンスの追加」 をクリック
-
POST メソッドを選択
- 同じリソースの 「アクション」 のメニューで 「API のデプロイ」 を選択
- デプロイされるステージ : [新しいステージ]
- ステージ名 : v1
- 「デプロイ」 をクリックして API をデプロイ
- 同じリソースの 「アクション」 のメニューで 「メソッドの作成」 を選択
- 任意のリソース名(悪用されにくいようランダムで長いもの)を入力して 「リソースの作成」 をクリック
-
「リソース」 - 「アクション」 のメニューで 「リソースの作成」 を選択
- 任意の API 名(例 : 「blastEventHookReceiver」)を入力して 「API の作成」 をクリック
2-7. Dynamo DB テーブル用 KMS キーにユーザー(IAM Role)を追加
Amazon 所有キーを使う場合は不要です。
「KMS」 - 「カスタマー管理型のキー」 画面で DynamoDB テーブル用 KMS キーのエイリアスのリンクをクリックします。
-
「キーユーザー」 の 「追加」 をクリック
- 以下の Lambda 関数実行用の IAM Role を追加
- メール送信用 Lambda 関数
- Bounce Event 取得用 Lambda 関数
- 以下の Lambda 関数実行用の IAM Role を追加
3. Route 53 Public Hosted Zone(送信者メールアドレスのドメイン)に SPF・DKIM レコードを追加
それぞれ、リンク先を参考にレコードを登録します。
DKIM レコードの生成には、PowerDMARC の「DKIM レコードジェネレーター」を使うと便利です。鍵の長さがレコード長の制限を超える場合は 1024 で生成します。
blastengine 側の設定 (2)
1. DKIM 作成者署名の設定
「設定」 画面の左下、 「DKIM 作成者署名の設定」 で設定します。
- セレクタ : Route 53 の DKIM レコードとして登録したものに合わせる(
s1
など) - ドメイン : 送信者メールアドレスのドメイン
- 秘密鍵 : 先ほど生成した DKIM レコードの秘密鍵
2. Webhook の設定
「設定」 画面の右下、 「Webhook」 で設定します。
- 状態 : 有効(チェック)
- エンドポイント URL : Bounce Event 取得用 API Gateway の API エンドポイント URL
- 通知イベント : ドロップ・ハードエラー(チェック)
テスト実行
テストメールを送信してみます。
1. 存在するメールアドレスに送信してみる
「DynamoDB」 - 「項目」 画面でメール送信用 DynamoDB テーブルを選択し、「項目を作成」 をクリックします。
画面の各属性・値を入力して 「項目を作成」 をクリックすると、メールが送信されます。
メール送信履歴用 DynamoDB テーブルで送信内容を確認します。
1 〜 2 分ほど待って、Bounce Event 用 DynamoDB テーブルにレコードが登録されないことを確認します。
2. 存在しないメールアドレスに送信してみる
存在しないメールアドレスのドメインは、必ず自己所有ドメインにします。他人の所有ドメイン宛てには送信しないでください。
1 〜 2 分ほど待つと blastengine の配信ログに結果が表示されます(ステータス : ハードエラー)。
その後、Bounce Event 用 DynamoDB テーブルにレコードが登録されているのを確認します(type : HARDERROR)。