0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Private SubnetにFargateを構築し、SQS・DynamoDBと連携する。

Last updated at Posted at 2023-07-21

はじめに

FargateをAPIバックエンド的な使い方をした構成を構築してみたかった。Webサービスは特にホストする予定はなく、APIのバックエンド処理をFargateのサービスタイプを利用した構成で試すことが目的。ちなみにこの構成を考える際に悩んだような部分は以下の点。

悩み

・Private SubnetにFargateを構築して色々なAWSサービスと統合するには、VPCエンドポイントが必要で、エンドポイント一つにつき月額10USDくらいかかってくる。
 ⇒思ったよりも作成するエンドポイントが多そうで、重なるとだいぶコストになる。。。
・Fargateをサービスタイプで起動すると、こちらもコストがかかってくる。
・でも実際にFargateをPrivate Subnetでサービスとして構築してみたい。
・Lambdaを利用した既存構成はできているから、一時的にLambda部分をFargateに置き換えて検証しよう。
・検証が終わったらLambdaの構成に戻し、Fargate関連のサービスは削除しよう。(悲しいけど)
・削除漏れがあると嫌なので、Terraformで構築しよう。(一気にクリーンアップできるし、今後の資産にもなる)
 ⇒この点は是非実践してみたかったが、Terraformを整備するのに時間がかかりそうで直ぐにはできなそうだったので、結局コンソールで作成することにした。代わりの他の構成部分をTerraformで整備していく方針。

構成

image.png
Subscriber Lambdaの代わりとなるSubscriber Fargateを構築する。ただしある程度検証が終わったらLambdaの利用に戻す。

ちなみにこれまでの構成については以下の記事などで記録している。

実施内容

・Subscriber用のDockerイメージを格納するリポジトリの作成
・Subscriber用のDockerイメージの作成
・Subscriber用のECSクラスターの作成
・ECR用VPCエンドポイントの作成
・S3用VPCエンドポイントの作成
・CloudWatchLogs用のエンドポイント+セキュリティグループの作成
・SQS用VPCエンドポイントの作成
・DynamoDB用VPCエンドポイントの作成
・IAMロールの設定確認
・ECS Task定義(Subscriber用)
・Fargateサービスのデプロイ
・動作確認

ECRリポジトリ作成

sub-repoとしてリポジトリを作成する。このリポジトリへ作成したDockerイメージをプッシュしていく。
image.png

Subscriber用のDockerイメージの作成

DockerFilemsg_subscriber.pyrequirement.txtをそれぞれ作成する

DockerFile
FROM python:3.8

WORKDIR /app

COPY requirements.txt ./
RUN pip install --no-cache-dir -r requirements.txt

COPY msg_subscriber.py ./

ENV PYTHONUNBUFFERED=1
ENTRYPOINT ["python", "./msg_subscriber.py"]

msg_subscriber.py
import json
import boto3
from datetime import datetime
import os

queue_name = os.environ['QueueName']
sqs_endpoint_url = os.environ['SQSEndpointUrl']

dynamodb = boto3.resource('dynamodb', region_name='ap-northeast-1')
table = dynamodb.Table('sns_sqs_lambda_table')
sqs = boto3.resource('sqs', region_name='ap-northeast-1', endpoint_url=sqs_endpoint_url)
queue = sqs.get_queue_by_name(QueueName=queue_name)

def logging(errorLv, funcName, errorMsg):
    loggingDateStr=(datetime.now()).strftime('%Y%m%d %H:%M:%S')
    print(loggingDateStr + " " + funcName + " " + "[" + errorLv + "] " + errorMsg)
    return

def main():
  
  logging("info", "subscriber fargate", "service started")
  print("subscribe fargate started..")
  
  message = queue.receive_messages(MaxNumberOfMessages=1)
  result = ''
  
  if message:
    print("fetched message: " + message[0].body)
    try:
        result = table.put_item(Item={
            'message_id': message[0].message_id
            'subject': 'fixed subject',
            'message': message[0].body,
            'timestamp': datetime.now().isoformat(timespec='seconds')
        })
      
    except Exception as e:
        print(e)
        raise e

  else:
    print("there is no message.")
  
  print(result)
  return result

if __name__ == '__main__':
  main()

シンプルにSQSからメッセージを1件取得してくる内容。QueueName,SQSEndpointUrlはECSTask定義を作成する際に環境変数に設定する。(一度これを忘れてエラーとなった)
boto3.resource('sqs')の仕様などは以下の公式ガイドなどで確認できる。

requirement.txt
boto3==1.9.183

リポジトリへプッシュ

記述DockerイメージをビルドしてECRリポジトリへプッシュする。
ECRにログインし、
image.png
イメージをビルドし、
image.png
イメージにタグ付けをし、
image.png
プッシュ実行
image.png

ECS Clusterの作成

次にコンテナを実行するクラスターを作成していく。といってもAWSコンソールで数クリックで完了する。
今回はNetworking onlyを選択し、
image.png
クラスター名は任意に設定しCreate
image.png

VPCエンドポイントの作成

今回はプライベートサブネットにFargateを作成し、各種のAWSサービスと連携させるため複数のVPCエンドポイントが必要になる。今回はFargateを利用するため、以下の公式ドキュメントにもあるようにECR用のエンドポイント(2種類)とS3エンドポイントがまずは必要。

ECR-dkr用エンドポイントの作成

com.amazonaws.region.ecr.dkrエンドポイントを作成。
image.png
image.png
image.png

ECR-api用エンドポイントの作成

com.amazonaws.region.ecr.apiエンドポイントを作成。
image.png
image.png
image.png

S3用のエンドポイントの作成

com.amazonaws.region.s3を作成。ゲートウェイ型。
image.png
image.png
image.png

CloudWatchLog用のエンドポイントの作成

FargateからCloudWatchLogsへログを配信したいので、com.amazonaws.region.logsエンドポイントを作成。
image.png
image.png
image.png
image.png

CloudWatchLogs用エンドポイントに適用するセキュリティグループの設定

Fargateに適用したセキュリティグループを送信元として、443/tcpのインバウンド通信を許可する設定となればOK。
image.png
image.png

SQS用エンドポイントの作成

Private SubnetのFargateからSQSにアクセスするためにSQS用のVPCエンドポイントも作成。
image.png
image.png

DynamoDB用エンドポイントの作成

SQSから取得したメッセージの内容をDynamoDBにPutする処理が含まれているため、DynamoDBにアクセスさせるためにDynamoDB用のエンドポイントを作成。
image.png
image.png
image.png

ここまでの設定で合計6個のVPCエンドポイントが作成された。
image.png

IAMロール確認

ECSに付与しているIAMロールの権限を確認。DynamoDBのテーブルにPutItemするポリシーやSQSへのアクセスポリシーを追加している。
image.png

Task定義の作成

SubscriberのFargateを実行する際のTask定義をしていく。Image URIには作成したリポジトリを指定。
image.png
pythonスクリプトで参照している環境変数を設定する。
image.png
Commandにdateを記述しているがここは無くてもOK。
image.png
Fargateを指定し、Taskのサイズも最小を指定。Task roleとTask execution roleはひとまず先ほど確認した同一のロールを指定。
image.png
CloudWatchLogsへログが配信されるようにログ構成も設定する。
image.png

ECSサービスのデプロイ

作成したタスク定義からサービスを作成(デプロイ)していく。
image.png
Launch typeはFargateを指定。
image.png
Application TypeはServiceとする。
image.png
この辺はデフォルト値のまま
image.png
Private Subnetを選択し、Private Subnetのインスタンス用のSGを指定。
またPublic IPは不要しないように設定。
image.png
以下は一旦デフォルト値。
image.png

動作確認

PostmanからREST APIでリクエストを実施。
image.png

後続のLambdaによりSQS Queueにメッセージが追加された。
image.png

デプロイしたServiceによりTaskが起動。
image.png

PythonプログラムによるDynamoDBへの新規Item格納も無事に確認できた。
image.png

CloudWatchLogsでもログ出力を確認。
image.png

手元のメールボックスにもメール通知受信OK。

追記

これまでの実装だとメッセージを処理した後にメッセージがキューから消えずにデッドレターキューに残ってしまっていた。
image.png

公式ドキュメントにもあるがメッセージは処理した後に削除しないと、設定しているデッドレターキューに移動されてしまう。以下のドキュメントなども参考にしながら、処理したメッセージを削除するコードを追加した。

msg_subscriber.py
import json
import boto3
from datetime import datetime
import os

queue_name = os.environ['QueueName']
sqs_endpoint_url = os.environ['SQSEndpointUrl']

dynamodb = boto3.resource('dynamodb', region_name='ap-northeast-1')
table = dynamodb.Table('sns_sqs_lambda_table')
sqs = boto3.resource('sqs', region_name='ap-northeast-1', endpoint_url=sqs_endpoint_url)

queue = sqs.get_queue_by_name(QueueName=queue_name)

def logging(errorLv, funcName, errorMsg):
    loggingDateStr=(datetime.now()).strftime('%Y%m%d %H:%M:%S')
    print(loggingDateStr + " " + funcName + " " + "[" + errorLv + "] " + errorMsg)
    return

def main():
  while True:
  
    logging("info", "subscriber fargate", "service started")
    print('subscribe fargate started..')
    
    message = queue.receive_messages(MaxNumberOfMessages=1)
    result = ''
    
    if message:
      print("fetched message: ") 
      print(message[0])
      print("fetched message body: " + message[0].body)
      try:
          result = table.put_item(Item={
              'message_id':message[0].message_id,
              'subject': 'fixed subject',
              'message': message[0].body,
              'timestamp': datetime.now().isoformat(timespec='seconds')
          })
        
      except Exception as e:
          print(e)
          raise e
          
      message[0].delete()
      print("message has been deleted from sqs queue.")
  
    else:
      print("there is no message.")
    
    print(result)
    return result

if __name__ == '__main__':
  main()

終わりに

今回はPubSubバックエンドにAPIを処理する機構としてFargateのサービスタイプでの挙動を検証した。
検証期間は途中作業できない日もあったり、土日も挟んだので1週間ほどだったが、AWS料金が$15USDくらいいってしまった。やはりVPCエンドポイントは月額で固定料金がかかってくるので、この構成の維持は不要だし一旦Lambda利用に戻そうと思う。FargateはSpotを利用したりしてなるべくコスト軽減を試みたが、VPCエンドポイントの課金は避けられない。。Fargateのサービス起動という観点だけでなく、PrivateSubnetを利用して各AWSサービスに接続しにいくにはVPCエンドポイントも結構必要であることを再確認できたのはよかった。(NATを構築すればそれでも可能だと思うが、NATでも通信に応じた課金が発生する。)
image.png

ハマったこと

SQSをpythonから操作する場合にboto3ではboto3.client('sqs')でも操作できるし、boto3.resource('sqs')でも操作ができる。Lambdaでの実装はclientを使っていたが、Fargateでは参考サイトなども参照にした際にresourceの方を使っていたので、API仕様が異なっておりその部分を整理して取り組む必要があった。

参考サイト

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?