はじめに
FargateをAPIバックエンド的な使い方をした構成を構築してみたかった。Webサービスは特にホストする予定はなく、APIのバックエンド処理をFargateのサービスタイプを利用した構成で試すことが目的。ちなみにこの構成を考える際に悩んだような部分は以下の点。
悩み
・Private SubnetにFargateを構築して色々なAWSサービスと統合するには、VPCエンドポイントが必要で、エンドポイント一つにつき月額10USDくらいかかってくる。
⇒思ったよりも作成するエンドポイントが多そうで、重なるとだいぶコストになる。。。
・Fargateをサービスタイプで起動すると、こちらもコストがかかってくる。
・でも実際にFargateをPrivate Subnetでサービスとして構築してみたい。
・Lambdaを利用した既存構成はできているから、一時的にLambda部分をFargateに置き換えて検証しよう。
・検証が終わったらLambdaの構成に戻し、Fargate関連のサービスは削除しよう。(悲しいけど)
・削除漏れがあると嫌なので、Terraformで構築しよう。(一気にクリーンアップできるし、今後の資産にもなる)
⇒この点は是非実践してみたかったが、Terraformを整備するのに時間がかかりそうで直ぐにはできなそうだったので、結局コンソールで作成することにした。代わりの他の構成部分をTerraformで整備していく方針。
構成
Subscriber Lambdaの代わりとなるSubscriber Fargateを構築する。ただしある程度検証が終わったらLambdaの利用に戻す。
ちなみにこれまでの構成については以下の記事などで記録している。
↓
実施内容
・Subscriber用のDockerイメージを格納するリポジトリの作成
・Subscriber用のDockerイメージの作成
・Subscriber用のECSクラスターの作成
・ECR用VPCエンドポイントの作成
・S3用VPCエンドポイントの作成
・CloudWatchLogs用のエンドポイント+セキュリティグループの作成
・SQS用VPCエンドポイントの作成
・DynamoDB用VPCエンドポイントの作成
・IAMロールの設定確認
・ECS Task定義(Subscriber用)
・Fargateサービスのデプロイ
・動作確認
ECRリポジトリ作成
sub-repo
としてリポジトリを作成する。このリポジトリへ作成したDockerイメージをプッシュしていく。
Subscriber用のDockerイメージの作成
DockerFile
、msg_subscriber.py
、requirement.txt
をそれぞれ作成する
FROM python:3.8
WORKDIR /app
COPY requirements.txt ./
RUN pip install --no-cache-dir -r requirements.txt
COPY msg_subscriber.py ./
ENV PYTHONUNBUFFERED=1
ENTRYPOINT ["python", "./msg_subscriber.py"]
import json
import boto3
from datetime import datetime
import os
queue_name = os.environ['QueueName']
sqs_endpoint_url = os.environ['SQSEndpointUrl']
dynamodb = boto3.resource('dynamodb', region_name='ap-northeast-1')
table = dynamodb.Table('sns_sqs_lambda_table')
sqs = boto3.resource('sqs', region_name='ap-northeast-1', endpoint_url=sqs_endpoint_url)
queue = sqs.get_queue_by_name(QueueName=queue_name)
def logging(errorLv, funcName, errorMsg):
loggingDateStr=(datetime.now()).strftime('%Y%m%d %H:%M:%S')
print(loggingDateStr + " " + funcName + " " + "[" + errorLv + "] " + errorMsg)
return
def main():
logging("info", "subscriber fargate", "service started")
print("subscribe fargate started..")
message = queue.receive_messages(MaxNumberOfMessages=1)
result = ''
if message:
print("fetched message: " + message[0].body)
try:
result = table.put_item(Item={
'message_id': message[0].message_id
'subject': 'fixed subject',
'message': message[0].body,
'timestamp': datetime.now().isoformat(timespec='seconds')
})
except Exception as e:
print(e)
raise e
else:
print("there is no message.")
print(result)
return result
if __name__ == '__main__':
main()
シンプルにSQSからメッセージを1件取得してくる内容。QueueName
,SQSEndpointUrl
はECSTask定義を作成する際に環境変数に設定する。(一度これを忘れてエラーとなった)
boto3.resource('sqs')
の仕様などは以下の公式ガイドなどで確認できる。
boto3==1.9.183
リポジトリへプッシュ
記述DockerイメージをビルドしてECRリポジトリへプッシュする。
ECRにログインし、
イメージをビルドし、
イメージにタグ付けをし、
プッシュ実行
ECS Clusterの作成
次にコンテナを実行するクラスターを作成していく。といってもAWSコンソールで数クリックで完了する。
今回はNetworking onlyを選択し、
クラスター名は任意に設定しCreate
。
VPCエンドポイントの作成
今回はプライベートサブネットにFargateを作成し、各種のAWSサービスと連携させるため複数のVPCエンドポイントが必要になる。今回はFargateを利用するため、以下の公式ドキュメントにもあるようにECR用のエンドポイント(2種類)とS3エンドポイントがまずは必要。
ECR-dkr用エンドポイントの作成
com.amazonaws.region.ecr.dkr
エンドポイントを作成。
ECR-api用エンドポイントの作成
com.amazonaws.region.ecr.api
エンドポイントを作成。
S3用のエンドポイントの作成
com.amazonaws.region.s3
を作成。ゲートウェイ型。
CloudWatchLog用のエンドポイントの作成
FargateからCloudWatchLogsへログを配信したいので、com.amazonaws.region.logs
エンドポイントを作成。
CloudWatchLogs用エンドポイントに適用するセキュリティグループの設定
Fargateに適用したセキュリティグループを送信元として、443/tcpのインバウンド通信を許可する設定となればOK。
SQS用エンドポイントの作成
Private SubnetのFargateからSQSにアクセスするためにSQS用のVPCエンドポイントも作成。
DynamoDB用エンドポイントの作成
SQSから取得したメッセージの内容をDynamoDBにPutする処理が含まれているため、DynamoDBにアクセスさせるためにDynamoDB用のエンドポイントを作成。
ここまでの設定で合計6個のVPCエンドポイントが作成された。
IAMロール確認
ECSに付与しているIAMロールの権限を確認。DynamoDBのテーブルにPutItemするポリシーやSQSへのアクセスポリシーを追加している。
Task定義の作成
SubscriberのFargateを実行する際のTask定義をしていく。Image URIには作成したリポジトリを指定。
pythonスクリプトで参照している環境変数を設定する。
Commandにdate
を記述しているがここは無くてもOK。
Fargateを指定し、Taskのサイズも最小を指定。Task roleとTask execution roleはひとまず先ほど確認した同一のロールを指定。
CloudWatchLogsへログが配信されるようにログ構成も設定する。
ECSサービスのデプロイ
作成したタスク定義からサービスを作成(デプロイ)していく。
Launch typeはFargateを指定。
Application TypeはServiceとする。
この辺はデフォルト値のまま
Private Subnetを選択し、Private Subnetのインスタンス用のSGを指定。
またPublic IPは不要しないように設定。
以下は一旦デフォルト値。
動作確認
後続のLambdaによりSQS Queueにメッセージが追加された。
PythonプログラムによるDynamoDBへの新規Item格納も無事に確認できた。
手元のメールボックスにもメール通知受信OK。
追記
これまでの実装だとメッセージを処理した後にメッセージがキューから消えずにデッドレターキューに残ってしまっていた。
公式ドキュメントにもあるがメッセージは処理した後に削除しないと、設定しているデッドレターキューに移動されてしまう。以下のドキュメントなども参考にしながら、処理したメッセージを削除するコードを追加した。
import json
import boto3
from datetime import datetime
import os
queue_name = os.environ['QueueName']
sqs_endpoint_url = os.environ['SQSEndpointUrl']
dynamodb = boto3.resource('dynamodb', region_name='ap-northeast-1')
table = dynamodb.Table('sns_sqs_lambda_table')
sqs = boto3.resource('sqs', region_name='ap-northeast-1', endpoint_url=sqs_endpoint_url)
queue = sqs.get_queue_by_name(QueueName=queue_name)
def logging(errorLv, funcName, errorMsg):
loggingDateStr=(datetime.now()).strftime('%Y%m%d %H:%M:%S')
print(loggingDateStr + " " + funcName + " " + "[" + errorLv + "] " + errorMsg)
return
def main():
while True:
logging("info", "subscriber fargate", "service started")
print('subscribe fargate started..')
message = queue.receive_messages(MaxNumberOfMessages=1)
result = ''
if message:
print("fetched message: ")
print(message[0])
print("fetched message body: " + message[0].body)
try:
result = table.put_item(Item={
'message_id':message[0].message_id,
'subject': 'fixed subject',
'message': message[0].body,
'timestamp': datetime.now().isoformat(timespec='seconds')
})
except Exception as e:
print(e)
raise e
message[0].delete()
print("message has been deleted from sqs queue.")
else:
print("there is no message.")
print(result)
return result
if __name__ == '__main__':
main()
終わりに
今回はPubSubバックエンドにAPIを処理する機構としてFargateのサービスタイプでの挙動を検証した。
検証期間は途中作業できない日もあったり、土日も挟んだので1週間ほどだったが、AWS料金が$15USDくらいいってしまった。やはりVPCエンドポイントは月額で固定料金がかかってくるので、この構成の維持は不要だし一旦Lambda利用に戻そうと思う。FargateはSpotを利用したりしてなるべくコスト軽減を試みたが、VPCエンドポイントの課金は避けられない。。Fargateのサービス起動という観点だけでなく、PrivateSubnetを利用して各AWSサービスに接続しにいくにはVPCエンドポイントも結構必要であることを再確認できたのはよかった。(NATを構築すればそれでも可能だと思うが、NATでも通信に応じた課金が発生する。)
ハマったこと
SQSをpythonから操作する場合にboto3ではboto3.client('sqs')
でも操作できるし、boto3.resource('sqs')
でも操作ができる。Lambdaでの実装はclient
を使っていたが、Fargateでは参考サイトなども参照にした際にresource
の方を使っていたので、API仕様が異なっておりその部分を整理して取り組む必要があった。
参考サイト